Every data pipeline eventually faces the same quiet crisis: the local database grows fat with duplicates, the sync job takes longer each week, and somewhere between the third failed cron job and the manual DELETE FROM statements, someone asks the question that should have been asked at the start.
What happens when the data source changes its mind?
This is the problem space of incremental updates and deduplication. It is unglamorous work — no flashy dashboards, no real-time adrenaline — but it is the load-bearing infrastructure beneath every reliable data product. Get it wrong and your backtests decay. Get it right and your system breathes: only changed records move across the wire, storage stays lean, and your pipeline survives the data source's inevitable corrections, backfills, and restatements.
This article dissects three complementary strategies — version-based tracking, hash validation, and UPSERT operations — and shows how they compose into a production-grade sync layer. All code examples use TickDB as the upstream data source and SQLite as the local store, but the patterns transfer to PostgreSQL, ClickHouse, or any SQL-backed destination.
The Core Problem: Three Sync Failure Modes
Before diving into solutions, we need to crystallize what we are solving. There are three distinct failure modes in data synchronization.
Duplication: The same logical record appears multiple times in the local store. This happens when the sync job is idempotent-in-name-only — it inserts new rows without checking whether a record with that primary key already exists. Duplication inflates storage, distorts aggregations, and silently corrupts any analysis built on top.
Staleness: The local store reflects an older version of the truth. This occurs when the sync job pulls only new records (by appending on a monotonic sequence or timestamp) but the data source retroactively modifies an existing record. The new data never reaches the local store because it is not "new" by the pipeline's definition.
Drift: The local store diverges from the source in a way that is neither duplication nor staleness — the schema changes, the record key changes, or the source restructures its identifiers entirely. Drift is the hardest failure mode to detect and recover from.
Each failure mode demands a different defense. Duplication is prevented by uniqueness constraints and idempotent write operations. Staleness is prevented by version-aware tracking and update operations, not just inserts. Drift is detected by hash validation and reconciled by selective replacement.
The strategies in this article address all three.
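To make the staleness failure concrete, here is a minimal sketch in plain Python. The `source` and `local` dicts and the `append_only_sync` helper are hypothetical stand-ins for a real source and store; the values are made up.

```python
def append_only_sync(source: dict, local: dict, last_seen: int) -> int:
    """Copy only records whose key (a timestamp) is newer than last_seen."""
    for ts in sorted(source):
        if ts > last_seen:
            local[ts] = source[ts]
            last_seen = ts
    return last_seen


source = {1: 100.0, 2: 101.0}
local = {}
last_seen = append_only_sync(source, local, last_seen=0)

# The source restates record 1 in place and publishes a new record 3.
source[1] = 99.5
source[3] = 102.0
last_seen = append_only_sync(source, local, last_seen)

# Record 1 is now stale locally: it was never "new" again, so it never moved.
stale = {ts for ts in local if local[ts] != source[ts]}
print(stale)  # {1}
```

The append-only job picks up record 3 but never revisits record 1, which is exactly the gap that hash validation is meant to close.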
Strategy 1: Version-Based Tracking
The Principle
Every record in the source has a logical version identifier — a timestamp, a sequence number, a monotonic counter, or a composite of multiple fields. The local store maintains a watermark: the highest version value seen for each ticker or entity. On each sync run, only records whose version exceeds the watermark are fetched and written.
Version-based tracking is the most widely applicable strategy. It works for any data source that exposes a monotonic ordering on records — OHLCV klines with an interval close time, order book snapshots with a sequence number, or trade records with an exchange-assigned ID.
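The filtering step can be sketched without any database at all. This is an illustrative helper, assuming each record is a dict with hypothetical `symbol` and `version` keys; it is not TickDB's record shape.

```python
def select_new_records(records: list[dict], watermarks: dict) -> tuple[list[dict], dict]:
    """Return records newer than their symbol's watermark, plus the advanced watermarks."""
    fresh = []
    advanced = dict(watermarks)  # never mutate the caller's watermark state
    for rec in records:
        symbol, version = rec["symbol"], rec["version"]
        seen = watermarks.get(symbol)
        if seen is None or version > seen:
            fresh.append(rec)
            advanced[symbol] = max(advanced.get(symbol, version), version)
    return fresh, advanced


records = [
    {"symbol": "AAPL.US", "version": 3},
    {"symbol": "AAPL.US", "version": 5},
    {"symbol": "NVDA.US", "version": 1},
]
fresh, advanced = select_new_records(records, {"AAPL.US": 4})
```

Only the AAPL record with version 5 and the first-ever NVDA record pass the filter, and the returned watermarks should be persisted only after the write succeeds.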
The Watermark Table
The watermark lives in its own metadata table, separate from the data:
import sqlite3
import os
from datetime import datetime, timezone

DB_PATH = os.path.expanduser("~/tickdb_local.db")


def init_watermark_table(db_path: str = DB_PATH):
    """Create the watermark metadata table if it doesn't exist."""
    with sqlite3.connect(db_path) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS sync_watermarks (
                source_table TEXT PRIMARY KEY,
                last_version TEXT NOT NULL,
                updated_at   TEXT NOT NULL
            )
        """)
        conn.commit()


def get_watermark(db_path: str, table_name: str) -> str | None:
    """Retrieve the last synced version for a given source table."""
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(
            "SELECT last_version FROM sync_watermarks WHERE source_table = ?",
            (table_name,)
        ).fetchone()
        return row[0] if row else None


def set_watermark(db_path: str, table_name: str, version: str):
    """Update the watermark after a successful sync."""
    with sqlite3.connect(db_path) as conn:
        conn.execute("""
            INSERT INTO sync_watermarks (source_table, last_version, updated_at)
            VALUES (?, ?, ?)
            ON CONFLICT(source_table) DO UPDATE SET
                last_version = excluded.last_version,
                updated_at = excluded.updated_at
        """, (table_name, version, datetime.now(timezone.utc).isoformat()))
        conn.commit()
The ON CONFLICT(source_table) DO UPDATE clause is SQLite's UPSERT syntax: it inserts a new watermark row if none exists, or updates the existing row if the source table key already exists. This makes the write idempotent: setting the same watermark twice in succession leaves the same state as setting it once.
Watermark Granularity: Per-Ticker vs. Global
A critical design decision is whether the watermark is global (one value for the entire dataset) or per-ticker (one value per symbol). Global watermarks are simpler but less resilient: if you sync NVDA and AMD on the same schedule and AMD's data is restated, the global watermark advances past AMD's old version range and you lose the correction.
Per-ticker watermarks solve this at the cost of operational complexity. For a focused pipeline — say, a single strategy tracking five aerospace tickers — per-ticker watermarks are worth the overhead. For a broad-market pipeline with 500 tickers, a global watermark with hash validation as a fallback is more practical.
def init_watermark_table_per_ticker(db_path: str = DB_PATH):
    """Per-ticker watermarks for fine-grained sync control."""
    with sqlite3.connect(db_path) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS sync_watermarks (
                source_table TEXT NOT NULL,
                symbol       TEXT NOT NULL,
                last_version TEXT NOT NULL,
                updated_at   TEXT NOT NULL,
                PRIMARY KEY (source_table, symbol)
            )
        """)
        conn.commit()
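The trade-off between the two granularities shows up in a few lines of plain Python. This is a toy comparison with made-up version numbers, assuming records carry hypothetical `symbol` and `version` keys.

```python
records = [
    {"symbol": "NVDA.US", "version": 10},
    {"symbol": "AMD.US", "version": 7},  # AMD lags: its newest record is older
]

# One watermark for the whole dataset, already advanced by NVDA's progress.
global_watermark = 9
# One watermark per symbol, each advanced only by that symbol's records.
per_symbol_watermark = {"NVDA.US": 9, "AMD.US": 6}

picked_global = [r["symbol"] for r in records if r["version"] > global_watermark]
picked_per_symbol = [
    r["symbol"] for r in records if r["version"] > per_symbol_watermark[r["symbol"]]
]
```

With the global watermark the AMD record is filtered out even though it was never synced; the per-symbol watermark picks up both.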
Strategy 2: Hash Validation
The Principle
Version-based tracking works when the source exposes a monotonic version field. It fails when the source does not — or when it does, but the versioning is per-page rather than per-record. In these cases, hash validation provides a fallback detection mechanism.
The idea is straightforward: compute a content hash over the record's canonical fields. Store the hash alongside the data in the local store. On each sync run, recompute the hash from the fetched record and compare it to the stored hash. A mismatch means the record changed, triggering an update.
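As a rough sketch of that comparison loop, the helper below sorts fetched records into new, changed, and unchanged. The names `content_hash` and `classify`, and the `t`/`c`/`v` record keys, are illustrative assumptions rather than part of any library.

```python
import hashlib
import json


def content_hash(record: dict, fields: list[str]) -> str:
    """SHA-256 over the chosen fields, serialized with sorted keys."""
    payload = json.dumps({f: record[f] for f in fields}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def classify(fetched: list[dict], stored_hashes: dict, key_field: str, fields: list[str]) -> dict:
    """Compare each fetched record's hash with the hash stored under its key."""
    result = {"new": [], "changed": [], "unchanged": []}
    for rec in fetched:
        key = rec[key_field]
        if key not in stored_hashes:
            result["new"].append(key)
        elif stored_hashes[key] != content_hash(rec, fields):
            result["changed"].append(key)
        else:
            result["unchanged"].append(key)
    return result


fields = ["c", "v"]
stored = {1: content_hash({"c": 10.0, "v": 5.0}, fields)}

first = classify([{"t": 1, "c": 10.0, "v": 5.0}, {"t": 2, "c": 11.0, "v": 6.0}], stored, "t", fields)
second = classify([{"t": 1, "c": 10.5, "v": 5.0}], stored, "t", fields)
```

Only the `changed` and `new` buckets need to be written, which keeps the write volume proportional to what actually moved at the source.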
Content Hash Implementation
import hashlib
import json


def compute_record_hash(record: dict, fields: list[str]) -> str:
    """
    Compute a deterministic SHA-256 hash over specified fields.

    Args:
        record: A dict representing the data record.
        fields: The list of field names to include in the hash. Fields
            are sorted alphabetically to ensure deterministic ordering.

    Returns:
        A hexadecimal hash string.

    Engineering note: Sorting fields by name before hashing prevents
    order-dependent hash divergence. If your data source returns
    dicts with varying key ordering, this matters.
    """
    canonical = {k: record[k] for k in sorted(fields) if k in record}
    normalized = json.dumps(canonical, sort_keys=True, default=str)
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


# Example: hash the o, h, l, c, v fields of a kline
def compute_kline_hash(kline: dict) -> str:
    """Hash only the price and volume fields; ignore metadata like fetch time."""
    return compute_record_hash(kline, fields=["o", "h", "l", "c", "v"])
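One caveat worth checking before relying on stored hashes: JSON serialization is sensitive to value types, so `1000`, `1000.0`, and `"1000"` all hash differently. The sketch below is self-contained and uses a hypothetical `kline_hash` helper with an optional float cast; whether your source returns numbers or strings is an assumption you need to verify against real responses.

```python
import hashlib
import json

FIELDS = ("o", "h", "l", "c", "v")


def kline_hash(kline: dict, normalize: bool = False) -> str:
    """SHA-256 over the OHLCV fields; optionally cast every value to float first."""
    values = {f: (float(kline[f]) if normalize else kline[f]) for f in FIELDS}
    payload = json.dumps(values, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Same candle, once as it might arrive over the wire (mixed types, arbitrary key
# order) and once as it comes back from REAL columns in SQLite (all floats).
from_api = {"v": 1000, "c": "101.5", "o": 100.0, "h": 102.0, "l": 99.5}
from_db = {"o": 100.0, "h": 102.0, "l": 99.5, "c": 101.5, "v": 1000.0}

raw_match = kline_hash(from_api) == kline_hash(from_db)
normalized_match = kline_hash(from_api, normalize=True) == kline_hash(from_db, normalize=True)
```

Key order does not matter, but types do: the raw hashes disagree while the normalized ones agree. Normalizing before hashing avoids false mismatches when a hash is later recomputed from stored rows.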
Schema Hash for Structural Drift Detection
For detecting drift — changes to the source schema or identifier structure — we compute a schema hash: a hash over the set of field names present in a data batch. If the schema hash changes between sync runs, something structural shifted in the source, and the pipeline should alert for manual review before proceeding.
def compute_schema_hash(records: list[dict]) -> str:
    """Detect structural schema changes across a batch of records."""
    field_names = sorted(set(k for r in records for k in r.keys()))
    schema_sig = json.dumps(field_names)
    return hashlib.sha256(schema_sig.encode("utf-8")).hexdigest()
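A hash tells you that the schema changed but not how. If you also keep the previous field list, a small diff makes the alert actionable. This is a sketch with a hypothetical `schema_diff` helper and invented field names.

```python
def schema_diff(previous_fields: list[str], records: list[dict]) -> tuple[list[str], list[str]]:
    """Return (added, removed) field names relative to the previous sync run."""
    current = {k for r in records for k in r}
    previous = set(previous_fields)
    return sorted(current - previous), sorted(previous - current)


# Hypothetical change: the source renames "v" to "vol" between two runs.
added, removed = schema_diff(["c", "o", "v"], [{"o": 1.0, "c": 2.0, "vol": 3.0}])
```

Here the diff reports `vol` as added and `v` as removed, which points at a rename rather than a dropped column.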
Strategy 3: UPSERT — The Write Operation That Replaces Duplication with Synchronization
The Principle
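UPSERT (UPDATE if exists, INSERT if not) is the atomic write primitive that makes incremental sync possible without read-before-write round trips. Most major SQL databases support some form of it: PostgreSQL uses ON CONFLICT DO UPDATE, SQLite uses the same ON CONFLICT DO UPDATE syntax (available since version 3.24), and MySQL uses ON DUPLICATE KEY UPDATE. SQLite's older INSERT OR REPLACE is not a true equivalent: it deletes the conflicting row and inserts a fresh one, so any column you do not supply is reset to its default.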
Without UPSERT, the sync flow is: SELECT to check existence → conditional INSERT or UPDATE. This is two round trips per record, it creates a race condition between the SELECT and the write, and it is slow at scale. With UPSERT, it is one atomic statement.
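A minimal, self-contained illustration of both paths, using an in-memory SQLite database. The `prices` table, the `DEMO` symbol, and the values are invented for the example, and the ON CONFLICT syntax needs SQLite 3.24 or newer.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE prices (symbol TEXT, day TEXT, close REAL, PRIMARY KEY (symbol, day))"
)

UPSERT = """
    INSERT INTO prices (symbol, day, close) VALUES (?, ?, ?)
    ON CONFLICT(symbol, day) DO UPDATE SET close = excluded.close
"""

conn.execute(UPSERT, ("DEMO", "2024-01-02", 10.0))  # key is new: INSERT path
conn.execute(UPSERT, ("DEMO", "2024-01-02", 10.5))  # key exists: UPDATE path
conn.execute(UPSERT, ("DEMO", "2024-01-02", 10.5))  # replay: state is unchanged

rows = conn.execute("SELECT symbol, day, close FROM prices").fetchall()
conn.close()
```

Three writes leave exactly one row holding the latest value, with no SELECT issued before any of them.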
UPSERT for OHLCV Data
The canonical use case is storing candlestick (kline) data. Each kline is uniquely identified by its symbol, interval, and close time. If a new kline arrives (a new candle closes), we insert it. If the source backfills and modifies an existing candle's close price or volume, we update it. Same statement, both cases.
def upsert_kline(db_path: str, kline: dict):
    """
    Atomically insert or update a kline record.

    The record is identified by (symbol, interval, close_time).
    If the record already exists, UPDATE overwrites; if not, INSERT creates.

    Engineering note: The conflict resolution target is the key
    (symbol, interval, close_time), not the OHLCV values. This is safe
    for kline data because the combination of symbol + interval +
    close_time is guaranteed unique by the exchange. For other data
    types, ensure your uniqueness constraint is enforced at the source
    before relying on UPSERT semantics.
    """
    with sqlite3.connect(db_path) as conn:
        conn.execute("""
            INSERT INTO klines (symbol, interval, open_time, close_time,
                                o, h, l, c, v, hash, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(symbol, interval, close_time) DO UPDATE SET
                o = excluded.o,
                h = excluded.h,
                l = excluded.l,
                c = excluded.c,
                v = excluded.v,
                hash = excluded.hash,
                updated_at = excluded.updated_at
        """, (
            kline["symbol"],
            kline["interval"],
            kline["open_time"],
            kline["close_time"],
            kline["o"],
            kline["h"],
            kline["l"],
            kline["c"],
            kline["v"],
            compute_kline_hash(kline),
            datetime.now(timezone.utc).isoformat()
        ))
        conn.commit()
Batch UPSERT with Transaction Batching
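Single-record UPSERT is correct but slow: the function above opens a connection and commits once per record, and every SQLite commit has to be flushed to disk. For a pipeline syncing thousands of records per run, we batch writes into transactions. Grouping many statements under one explicit BEGIN/COMMIT typically cuts write time by an order of magnitude or more, though the exact gain depends on storage and journal settings, so measure it on your own hardware.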
def upsert_klines_batch(db_path: str, klines: list[dict], batch_size: int = 500):
    """
    Batch upsert kline records with transactional grouping.

    Args:
        klines: List of kline dicts to upsert.
        batch_size: Number of records per transaction commit.

    Engineering note: Batch size is a performance knob. Too large and
    you risk holding locks too long or exhausting memory on a
    constrained device. Too small and transaction overhead dominates.
    500 is a reasonable starting point for SQLite; PostgreSQL
    can tolerate larger batches.
    """
    for i in range(0, len(klines), batch_size):
        batch = klines[i:i + batch_size]
        with sqlite3.connect(db_path) as conn:
            try:
                conn.execute("BEGIN")
                for kline in batch:
                    conn.execute("""
                        INSERT INTO klines (symbol, interval, open_time, close_time,
                                            o, h, l, c, v, hash, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        ON CONFLICT(symbol, interval, close_time) DO UPDATE SET
                            o = excluded.o,
                            h = excluded.h,
                            l = excluded.l,
                            c = excluded.c,
                            v = excluded.v,
                            hash = excluded.hash,
                            updated_at = excluded.updated_at
                    """, (
                        kline["symbol"], kline["interval"],
                        kline["open_time"], kline["close_time"],
                        kline["o"], kline["h"], kline["l"], kline["c"], kline["v"],
                        compute_kline_hash(kline),
                        datetime.now(timezone.utc).isoformat()
                    ))
                conn.execute("COMMIT")
            except Exception as e:
                conn.execute("ROLLBACK")
                raise RuntimeError(f"Batch {i // batch_size} failed: {e}") from e
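A more compact variant hands the whole batch to `executemany` inside a `with conn:` block, which commits on success and rolls back on an exception. The sketch below assumes a simplified three-column table; `klines_demo` and `upsert_rows` are illustrative names, not part of the pipeline above.

```python
import sqlite3

UPSERT_SQL = """
    INSERT INTO klines_demo (symbol, close_time, c) VALUES (?, ?, ?)
    ON CONFLICT(symbol, close_time) DO UPDATE SET c = excluded.c
"""


def upsert_rows(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    """Write the whole batch in one transaction via executemany."""
    with conn:  # commit if the block succeeds, roll back if it raises
        conn.executemany(UPSERT_SQL, rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE klines_demo (symbol TEXT, close_time TEXT, c REAL, "
    "PRIMARY KEY (symbol, close_time))"
)

# The third row repeats the first row's key, so it takes the UPDATE path.
upsert_rows(conn, [("DEMO", "t1", 1.0), ("DEMO", "t2", 2.0), ("DEMO", "t1", 1.5)])
stored = conn.execute(
    "SELECT close_time, c FROM klines_demo ORDER BY close_time"
).fetchall()
conn.close()
```

Note that `with conn:` manages the transaction only; it does not close the connection, so long-running jobs should call `conn.close()` explicitly.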
Putting It Together: A Complete Sync Job
With the three strategies defined, here is how they compose into a complete, daily sync job.
TickDB Integration
The code below demonstrates a daily OHLCV sync from TickDB using the /v1/market/kline endpoint. It reads the last synced close_time as the watermark, fetches only newer klines, computes hashes, and upserts them in batches.
import os
import time
import random
import sqlite3
import requests
from datetime import datetime, timezone, timedelta

API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1"
DB_PATH = os.path.expanduser("~/tickdb_local.db")


def init_tables(db_path: str = DB_PATH):
    """Initialize the kline storage and watermark tables."""
    with sqlite3.connect(db_path) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS klines (
                symbol     TEXT NOT NULL,
                interval   TEXT NOT NULL,
                open_time  TEXT NOT NULL,
                close_time TEXT NOT NULL,
                o REAL NOT NULL, h REAL NOT NULL,
                l REAL NOT NULL, c REAL NOT NULL,
                v REAL NOT NULL,
                hash       TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                PRIMARY KEY (symbol, interval, close_time)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS sync_watermarks (
                symbol          TEXT PRIMARY KEY,
                last_close_time TEXT NOT NULL,
                updated_at      TEXT NOT NULL
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_klines_close ON klines(close_time)")
        conn.commit()
def fetch_klines_since(symbol: str, interval: str, since: datetime) -> list[dict]:
    """
    Fetch klines from TickDB since a given timestamp.

    Uses the 'startTime' parameter on the kline endpoint to request
    only records with close_time >= the given threshold. This is
    the server-side filter that makes incremental sync possible.
    """
    since_ms = int(since.timestamp() * 1000)
    limit = 1000
    fetched = []
    while True:
        resp = requests.get(
            f"{BASE_URL}/market/kline",
            headers={"X-API-Key": API_KEY},
            params={
                "symbol": symbol,
                "interval": interval,
                "startTime": since_ms,
                "limit": limit
            },
            timeout=(3.05, 10)
        )
        # HTTP-level rate limit: wait as instructed, then retry the same page.
        if resp.status_code == 429:
            retry_after = int(resp.headers.get("Retry-After", 5))
            time.sleep(retry_after)
            continue
        data = resp.json()
        # Application-level rate limit signalled in the response body.
        if data.get("code") == 3001:
            retry_after = int(data.get("headers", {}).get("Retry-After", 5))
            time.sleep(retry_after)
            continue
        if data.get("code") != 0:
            raise RuntimeError(f"TickDB API error {data.get('code')}: {data.get('message')}")
        batch = data["data"]
        fetched.extend(batch)
        # A short page means there is nothing further to fetch.
        if len(batch) < limit:
            break
        # Advance past the last candle; close_time is treated as epoch milliseconds.
        last_close = batch[-1]["close_time"]
        since_ms = last_close + 1
    return fetched
def sync_symbol(symbol: str, interval: str = "1d", db_path: str = DB_PATH):
    """
    Sync a single symbol's kline data incrementally since the last watermark.

    1. Read the watermark (last synced close_time) from local store.
    2. Fetch only newer klines from TickDB using the 'startTime' filter.
    3. Upsert each kline (updates if the close time matches, inserts if new).
    4. Update the watermark to the max close_time of fetched records.
    """
    watermark = get_watermark(db_path, symbol)
    if watermark:
        # The watermark holds close_time as epoch milliseconds (the same unit
        # the pagination in fetch_klines_since assumes), read back as text.
        since = datetime.fromtimestamp(int(watermark) / 1000, tz=timezone.utc)
    else:
        since = datetime.now(timezone.utc) - timedelta(days=365 * 10)
    klines = fetch_klines_since(symbol, interval, since)
    if not klines:
        return 0
    for i in range(0, len(klines), 500):
        batch = klines[i:i + 500]
        with sqlite3.connect(db_path) as conn:
            conn.execute("BEGIN")
            for kline in batch:
                # Key rows by the function arguments so they always match the
                # watermark key, even if the response omits symbol/interval.
                conn.execute("""
                    INSERT INTO klines (symbol, interval, open_time, close_time,
                                        o, h, l, c, v, hash, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(symbol, interval, close_time) DO UPDATE SET
                        o = excluded.o, h = excluded.h,
                        l = excluded.l, c = excluded.c, v = excluded.v,
                        hash = excluded.hash, updated_at = excluded.updated_at
                """, (
                    symbol, interval,
                    kline["open_time"], kline["close_time"],
                    kline["o"], kline["h"], kline["l"], kline["c"], kline["v"],
                    compute_kline_hash(kline),
                    datetime.now(timezone.utc).isoformat()
                ))
            conn.execute("COMMIT")
    max_close_time = max(k["close_time"] for k in klines)
    set_watermark(db_path, symbol, max_close_time)
    return len(klines)
def get_watermark(db_path: str, symbol: str) -> str | None:
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(
            "SELECT last_close_time FROM sync_watermarks WHERE symbol = ?",
            (symbol,)
        ).fetchone()
        return row[0] if row else None


def set_watermark(db_path: str, symbol: str, close_time: int):
    with sqlite3.connect(db_path) as conn:
        conn.execute("""
            INSERT INTO sync_watermarks (symbol, last_close_time, updated_at)
            VALUES (?, ?, ?)
            ON CONFLICT(symbol) DO UPDATE SET
                last_close_time = excluded.last_close_time,
                updated_at = excluded.updated_at
        """, (symbol, close_time, datetime.now(timezone.utc).isoformat()))
        conn.commit()


# ─── Daily sync job entry point ────────────────────────────────────────────────
if __name__ == "__main__":
    init_tables()
    symbols = ["AAPL.US", "NVDA.US", "TSLA.US", "SPY.US"]
    for symbol in symbols:
        try:
            count = sync_symbol(symbol)
            print(f"[{datetime.now(timezone.utc).isoformat()}] Synced {count} "
                  f"klines for {symbol}")
        except Exception as e:
            print(f"[ERROR] Sync failed for {symbol}: {e}")
Handling the Hard Case: Data Source Backfills
Version-based tracking handles ongoing incremental updates cleanly. The hard case is when the data source retroactively modifies records — a backfill, a restatement, a corporate action adjustment. The source does not re-emit these records with a newer version timestamp; it modifies the record in place, and the version field — if one exists at all — may be unchanged.
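The defense against backfill-driven staleness is a two-layer check: version-based filtering as the fast path, combined with hash validation as the correctness guard. The hash check can only fire on records the job actually re-fetches, so the fetch has to start a lookback window before the watermark rather than at the watermark itself; how wide that window should be depends on how far back your source is known to restate.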
The Backfill Detection Flow
Sync Job Starts
      │
      ▼
Fetch records where close_time >= watermark - lookback window
      │
      ▼
For each fetched record:
      │
      ├─ Record exists in local store? ──No──► UPSERT (INSERT path), continue
      │
     Yes
      │
      ├─ New version > stored version? ──Yes──► UPSERT (UPDATE path), continue
      │
     No (version unchanged)
      │
      ├─ Hash mismatch? ──Yes──► UPSERT (UPDATE path: source corrected data),
      │                          flag for review log
      │
     No (hash matches)
      │
      └─ No action needed. Record is identical.
The hash mismatch path is the safety net. Even if the version field does not change (because the source updated the record without incrementing the version), the content hash will differ, triggering an update. This catches the corporate action adjustments, the exchange corrections, and the restatements that version fields alone would miss.
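For the safety net to trigger, already-synced records have to be fetched again. One way to arrange that is to start each fetch a fixed lookback window before the watermark. The helper below is a sketch: `fetch_start`, the 30-day default, and the epoch-millisecond watermark are all assumptions to adapt, not values taken from TickDB.

```python
from datetime import datetime, timedelta, timezone


def fetch_start(watermark_ms, lookback_days=30, default_history_days=3650, now=None):
    """Return the datetime from which the next sync should fetch.

    watermark_ms: last synced close_time in epoch milliseconds, or None on first run.
    lookback_days: how far before the watermark to re-fetch for hash comparison.
    """
    now = now or datetime.now(timezone.utc)
    if watermark_ms is None:
        # First run: no watermark yet, so pull the full default history.
        return now - timedelta(days=default_history_days)
    watermark = datetime.fromtimestamp(watermark_ms / 1000, tz=timezone.utc)
    return watermark - timedelta(days=lookback_days)


wm = datetime(2024, 3, 1, tzinfo=timezone.utc)
start = fetch_start(int(wm.timestamp() * 1000), lookback_days=30)
print(start.isoformat())  # 2024-01-31T00:00:00+00:00
```

A wider window catches older restatements at the cost of more fetched records per run; the hash comparison keeps the extra records from turning into extra writes.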
def sync_with_backfill_detection(symbol: str, db_path: str = DB_PATH,
                                 lookback_days: int = 30):
    """
    Sync with explicit backfill detection via hash comparison.

    For each record that already exists locally, we compare hashes before
    deciding whether to update. This is more expensive than version-only
    sync (it requires a SELECT per record), so it should be reserved for
    markets or data types where backfills are common.

    Engineering note: For TickDB's kline data, backfills are infrequent
    but not unknown; exchange corrections happen. If your use case
    tolerates occasional staleness in exchange-sourced data (most do),
    version-only sync is sufficient. If you need bulletproof accuracy
    (e.g., regulatory reporting), enable hash comparison.
    """
    watermark = get_watermark(db_path, symbol)
    if watermark:
        # The watermark holds close_time as epoch milliseconds. Start the fetch
        # lookback_days earlier so already-synced candles are re-fetched and
        # can be hash-compared; 30 days is an illustrative default.
        since = (datetime.fromtimestamp(int(watermark) / 1000, tz=timezone.utc)
                 - timedelta(days=lookback_days))
    else:
        since = datetime.now(timezone.utc) - timedelta(days=365 * 10)
    klines = fetch_klines_since(symbol, "1d", since)
    updated = 0
    inserted = 0
    with sqlite3.connect(db_path) as conn:
        for kline in klines:
            close_time = kline["close_time"]
            new_hash = compute_kline_hash(kline)
            # Look the record up by its full primary key, interval included.
            row = conn.execute(
                "SELECT hash FROM klines "
                "WHERE symbol = ? AND interval = ? AND close_time = ?",
                (symbol, "1d", close_time)
            ).fetchone()
            if row is None:
                conn.execute("""
                    INSERT INTO klines (symbol, interval, open_time, close_time,
                                        o, h, l, c, v, hash, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    symbol, "1d", kline["open_time"], close_time,
                    kline["o"], kline["h"], kline["l"], kline["c"], kline["v"],
                    new_hash, datetime.now(timezone.utc).isoformat()
                ))
                inserted += 1
            elif row[0] != new_hash:
                conn.execute("""
                    UPDATE klines SET
                        o = ?, h = ?, l = ?, c = ?, v = ?,
                        hash = ?, updated_at = ?
                    WHERE symbol = ? AND interval = ? AND close_time = ?
                """, (
                    kline["o"], kline["h"], kline["l"], kline["c"], kline["v"],
                    new_hash, datetime.now(timezone.utc).isoformat(),
                    symbol, "1d", close_time
                ))
                updated += 1
        conn.commit()
    if klines:
        max_close_time = max(k["close_time"] for k in klines)
        set_watermark(db_path, symbol, max_close_time)
    print(f"{symbol}: {inserted} inserted, {updated} backfill corrections detected")
Production Considerations
Error Handling and Idempotency
The sync job must be idempotent: running it twice with the same input should produce the same state as running it once. The UPSERT pattern achieves this at the write level, but job-level idempotency depends on the watermark update being atomic with the data write. After a crash and restart, the job should ideally not re-fetch data that it already stored, and it must never skip data that it failed to store.
The watermark must be updated atomically with the data. If the watermark advances before the write completes, a restart will skip the records that failed to write. If the watermark advances after the write but the update fails, a restart will re-fetch data that was already written (wasted work, but not a correctness failure, thanks to UPSERT).
For production pipelines, wrap the entire sync-and-watermark cycle in a single transaction:
def sync_symbol_atomic(symbol: str, db_path: str = DB_PATH):
    """Atomic sync: data write and watermark update in a single transaction."""
    since = datetime.now(timezone.utc) - timedelta(days=365 * 10)
    watermark = get_watermark(db_path, symbol)
    if watermark:
        # The watermark holds close_time as epoch milliseconds, read back as text.
        since = datetime.fromtimestamp(int(watermark) / 1000, tz=timezone.utc)
    klines = fetch_klines_since(symbol, "1d", since)
    if not klines:
        return 0
    with sqlite3.connect(db_path) as conn:
        conn.execute("BEGIN")
        try:
            for kline in klines:
                # Key rows by the function argument so they match the watermark key.
                conn.execute("""
                    INSERT INTO klines (symbol, interval, open_time, close_time,
                                        o, h, l, c, v, hash, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(symbol, interval, close_time) DO UPDATE SET
                        o = excluded.o, h = excluded.h,
                        l = excluded.l, c = excluded.c, v = excluded.v,
                        hash = excluded.hash, updated_at = excluded.updated_at
                """, (
                    symbol, "1d",
                    kline["open_time"], kline["close_time"],
                    kline["o"], kline["h"], kline["l"], kline["c"], kline["v"],
                    compute_kline_hash(kline),
                    datetime.now(timezone.utc).isoformat()
                ))
            max_close_time = max(k["close_time"] for k in klines)
            # Same transaction as the data rows: both commit or neither does.
            conn.execute("""
                INSERT INTO sync_watermarks (symbol, last_close_time, updated_at)
                VALUES (?, ?, ?)
                ON CONFLICT(symbol) DO UPDATE SET
                    last_close_time = excluded.last_close_time,
                    updated_at = excluded.updated_at
            """, (symbol, max_close_time, datetime.now(timezone.utc).isoformat()))
            conn.execute("COMMIT")
            return len(klines)
        except Exception as e:
            conn.execute("ROLLBACK")
            raise RuntimeError(f"Atomic sync failed for {symbol}: {e}") from e
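The all-or-nothing property is easy to check in isolation. The sketch below uses an in-memory database with deliberately simplified tables; `write_batch` is a hypothetical helper, and the second batch is made to fail on a NOT NULL constraint.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE klines (close_time INTEGER PRIMARY KEY, c REAL NOT NULL)")
conn.execute(
    "CREATE TABLE sync_watermarks (symbol TEXT PRIMARY KEY, last_close_time INTEGER NOT NULL)"
)


def write_batch(conn, symbol, rows):
    """Upsert rows and advance the watermark in one transaction."""
    with conn:  # commit on success, roll back if anything inside raises
        conn.executemany(
            "INSERT INTO klines (close_time, c) VALUES (?, ?) "
            "ON CONFLICT(close_time) DO UPDATE SET c = excluded.c",
            rows,
        )
        conn.execute(
            "INSERT INTO sync_watermarks (symbol, last_close_time) VALUES (?, ?) "
            "ON CONFLICT(symbol) DO UPDATE SET last_close_time = excluded.last_close_time",
            (symbol, max(r[0] for r in rows)),
        )


write_batch(conn, "DEMO", [(1, 10.0), (2, 11.0)])
try:
    write_batch(conn, "DEMO", [(3, 12.0), (4, None)])  # second row violates NOT NULL
except sqlite3.IntegrityError:
    pass  # the whole batch, including row 3, is rolled back

kline_count = conn.execute("SELECT COUNT(*) FROM klines").fetchone()[0]
watermark = conn.execute(
    "SELECT last_close_time FROM sync_watermarks WHERE symbol = 'DEMO'"
).fetchone()[0]
conn.close()
```

After the failed batch the store still holds two rows and the watermark still reads 2, so a restart would re-fetch from the right place.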
Monitoring: The Log That Saves You at 3 AM
Every sync job should emit structured logs that make the pipeline's health inspectable without querying the database directly:
def log_sync_event(symbol: str, event: str, detail: str):
    """Structured logging for sync pipeline observability."""
    ts = datetime.now(timezone.utc).isoformat()
    print(f"[{ts}] SYNC {symbol} {event}: {detail}")
Log events to capture: sync started, records fetched, records upserted (insert vs. update breakdown), backfill corrections detected, watermark advanced, errors encountered. Over time, these logs form a historical record that makes anomaly detection straightforward — a spike in backfill corrections suggests the source restated historical data; a steady trickle of updates between sync runs suggests the source publishes regular corrections.
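If the logs are going to feed anomaly detection, one JSON object per line is easier to parse than free text. The following sketch uses only the standard library; `log_sync_event_json`, the logger name, and the field names are illustrative choices, and the `StringIO` buffer merely stands in for a real log destination.

```python
import io
import json
import logging
from datetime import datetime, timezone


def log_sync_event_json(logger: logging.Logger, symbol: str, event: str, **fields) -> str:
    """Emit one JSON object per sync event and return the serialized line."""
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "symbol": symbol,
        "event": event,
        **fields,
    }
    line = json.dumps(record, sort_keys=True)
    logger.info(line)
    return line


buffer = io.StringIO()  # stand-in for a file or log shipper
logger = logging.getLogger("sync_demo")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(buffer))
logger.propagate = False

log_sync_event_json(logger, "DEMO", "upserted", inserted=3, updated=1)
parsed = json.loads(buffer.getvalue())
```

Counting `updated` per day across these lines gives the backfill-correction trend described above without touching the database.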
Database Maintenance: Periodic Integrity Checks
Even with correct UPSERT logic, local databases accumulate cruft: rows written by runs that failed before the watermark was recorded, index bloat, hash inconsistencies from manual interventions. Schedule a monthly integrity check:
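def run_integrity_check(db_path: str = DB_PATH):
    """Monthly integrity check: detect hash mismatches and orphan records."""
    with sqlite3.connect(db_path) as conn:
        # SQLite has no built-in row-hash function, so recompute in Python.
        # Caveat: REAL columns come back as floats. If the stored hash was
        # computed from differently typed API values (ints or strings),
        # normalize types before hashing or this reports false mismatches.
        rows = conn.execute("""
            SELECT symbol, close_time, hash, updated_at, o, h, l, c, v
            FROM klines
        """).fetchall()
        mismatches = [
            row[:4] for row in rows
            if row[2] != compute_kline_hash(
                dict(zip(("o", "h", "l", "c", "v"), row[4:]))
            )
        ]
        # Orphans: kline rows for symbols that have no watermark row.
        orphan_count = conn.execute("""
            SELECT COUNT(*) FROM klines k
            WHERE NOT EXISTS (
                SELECT 1 FROM sync_watermarks w
                WHERE w.symbol = k.symbol
            )
        """).fetchone()[0]
    print(f"Integrity check: {len(mismatches)} hash mismatches, "
          f"{orphan_count} orphaned records")
    return mismatches, orphan_count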
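If you would rather run the hash comparison inside SQL, Python's `sqlite3` module lets you register a Python function as a SQL function with `Connection.create_function`. The sketch below is self-contained; the `ohlcv_hash` name, the simplified table, and the values are assumptions for illustration.

```python
import hashlib
import json
import sqlite3


def ohlcv_hash(o, h, l, c, v) -> str:
    """SHA-256 over the five OHLCV values, serialized with sorted keys."""
    payload = json.dumps({"o": o, "h": h, "l": l, "c": c, "v": v}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


conn = sqlite3.connect(":memory:")
conn.create_function("ohlcv_hash", 5, ohlcv_hash)  # now callable from SQL
conn.execute(
    "CREATE TABLE klines (close_time INTEGER PRIMARY KEY, "
    "o REAL, h REAL, l REAL, c REAL, v REAL, hash TEXT)"
)

values = (1.0, 2.0, 0.5, 1.5, 100.0)
insert = "INSERT INTO klines VALUES (?, ?, ?, ?, ?, ?, ?)"
conn.execute(insert, (1, *values, ohlcv_hash(*values)))   # consistent row
conn.execute(insert, (2, *values, "tampered"))           # hash edited by hand

mismatched = [
    row[0] for row in conn.execute(
        "SELECT close_time FROM klines WHERE hash != ohlcv_hash(o, h, l, c, v)"
    )
]
conn.close()
```

The registered function exists only on the connection that registered it, so every process running the check has to call `create_function` first.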
Decision Framework: Which Strategy to Use When
| Data source behavior | Recommended strategy | Reasoning |
|---|---|---|
| Monotonic version field; infrequent backfills | Version-based only | Fast, simple, minimal overhead |
| Monotonic version field; frequent backfills | Version-based + hash validation | Hash catches corrections that version misses |
| No version field; batch-level changes | Hash-based comparison | Content hash detects any change |
| Schema changes occasionally | Schema hash + alert | Detects structural drift before it corrupts data |
| High-frequency updates (>1000 records/minute) | Per-ticker watermark + batch UPSERT | Reduces contention and write amplification |
Most market data pipelines — including TickDB-sourced OHLCV data — fall into the first or second category. Version-based tracking with hash validation as a fallback is sufficient for virtually every equity, crypto, and forex kline use case.
Closing
The three strategies — version-based watermarks, content hash validation, and atomic UPSERT writes — are not mutually exclusive. They compose into a layered defense: the watermark filters what to fetch (fast path), the hash detects whether the content changed (correctness guard), and the UPSERT writes atomically without read-before-write overhead (efficiency).
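The critical insight is that each strategy addresses a different failure mode. Watermarks keep fetch volume low and carry re-versioned records across the wire. Hash validation catches the in-place corrections and backfills that leave the version untouched, and a schema hash flags drift. UPSERT, backed by a uniqueness constraint, prevents duplication and ensures that even if the job restarts mid-flight, the outcome is deterministic and clean.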
None of this is glamorous. But a data pipeline that runs every morning and is never the reason someone pages you at 3 AM — that is the highest engineering compliment your data can pay you.
Next Steps
If you want to implement this sync logic today: Sign up at tickdb.ai to get a free API key (no credit card required), set TICKDB_API_KEY, and adapt the code above for your symbols. The /v1/market/kline endpoint supports server-side startTime filtering, making incremental sync a single API call per symbol per day.
If you need a production-ready data pipeline: TickDB's Professional plan includes WebSocket streaming for real-time updates alongside the REST endpoint used here — giving you both the daily batch sync described in this article and live order book depth via the depth channel for intraday strategies.
If you use AI coding assistants: Search for and install the tickdb-market-data SKILL in your AI tool's marketplace to get TickDB API integration code and context injected directly into your workflow.
This article does not constitute investment advice. Market data and historical information are provided for informational purposes only. Past performance of any strategy described herein does not guarantee future results. TickDB is a market data provider; use of its data does not imply endorsement of any trading strategy.