Data synchronization sounds straightforward until it isn't.

A system that pulls new records daily works beautifully — until a source system retroactively modifies a historical record. A pipeline that inserts fresh rows performs well — until the storage bill arrives and the duplicate count has quietly ballooned to eight million rows. The architecture that impressed the team in the design review breaks under the weight of a two-week data backfill that was never tested.

This article addresses the engineering challenges that sit behind every "just sync the data" requirement: how to pull only what changed, how to recognize when the source has backdated a change, and how to store everything without growing your database like a tumor. The code examples use Python and SQL, and the patterns apply whether your data source is a REST API, a WebSocket stream, or a file export.

The Three Synchronization Problems

Before diving into solutions, it helps to separate the problem space. Data synchronization between a local cache and a remote source involves three distinct failure modes.

Missing new records. Your source generates new data. Your local system does not yet have it. This is the canonical incremental update problem.

Duplicate existing records. Your source returns a record that your local system has already stored. A naive insert-only strategy triggers a primary key violation or, worse, silently duplicates the row if you use soft deletes or an append-only schema without a proper uniqueness constraint.

Stale updated records. Your source modified a historical record retroactively. Your local system still holds the old version. This is the most dangerous failure mode because it is silent — the data looks correct, and no error fires.

Each problem requires a different mechanism. Attempting to solve all three with a single approach produces brittle code. The sections below address each in turn.
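To make the three cases concrete, here is a minimal sketch that sorts an incoming batch into them. It assumes records are plain dictionaries with a hypothetical `id` key and that the local store is an in-memory dict; neither assumption comes from a particular API.

```python
from typing import Any, Dict, List


def classify_records(
    local: Dict[str, Dict[str, Any]],
    incoming: List[Dict[str, Any]],
) -> Dict[str, List[str]]:
    """Sort incoming record ids into the three synchronization cases."""
    result: Dict[str, List[str]] = {"missing": [], "duplicate": [], "stale": []}
    for record in incoming:
        key = record["id"]  # hypothetical natural key
        stored = local.get(key)
        if stored is None:
            result["missing"].append(key)    # new at the source, absent locally
        elif stored == record:
            result["duplicate"].append(key)  # already stored, identical content
        else:
            result["stale"].append(key)      # same key, content changed upstream
    return result


local_store = {
    "a": {"id": "a", "close": 100.0},
    "b": {"id": "b", "close": 200.0},
}
remote_batch = [
    {"id": "a", "close": 100.0},  # unchanged
    {"id": "b", "close": 205.0},  # retroactively corrected
    {"id": "c", "close": 300.0},  # brand new
]
print(classify_records(local_store, remote_batch))
```

The stale case is the only one that needs a content comparison; the other two can be decided from the key alone.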

Incremental Updates with Cursor-Based Pagination

The simplest incremental update pattern uses a cursor: a timestamp, sequence number, or opaque token that marks your last-read position. On each sync cycle, you request records where the cursor has advanced beyond your stored position.

For a market data API, the cursor typically maps to a timestamp field. The following Python example shows a polling loop that fetches new kline (candlestick) data from a remote source while respecting rate limits and retrying transient failures with exponential backoff.

import random
import time
import requests
import logging
from datetime import datetime, timezone
from typing import Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class IncrementalKlineFetcher:
    """
    Fetches kline data incrementally using timestamp-based cursoring.
    Handles reconnection, rate limits, and backpressure.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.tickdb.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self._last_cursor: Optional[datetime] = None

    def _fetch_page(self, cursor: Optional[datetime] = None, limit: int = 1000):
        """
        Fetch one page of kline data.
        Returns (records, next_cursor) where next_cursor is the open time
        of the last record in this page, or None if the page is empty.
        """
        headers = {"X-API-Key": self.api_key}
        params = {
            "symbol": "BTC.BTC",  # example symbol
            "interval": "1h",
            "limit": limit,
        }

        if cursor:
            params["start_time"] = int(cursor.timestamp() * 1000)

        response = requests.get(
            f"{self.base_url}/market/kline",
            headers=headers,
            params=params,
            timeout=(3.05, 10),
        )

        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 5))
            logger.warning(f"Rate limited. Sleeping {retry_after}s.")
            time.sleep(retry_after)
            return self._fetch_page(cursor, limit)
        elif response.status_code != 200:
            raise RuntimeError(f"Kline fetch failed: {response.status_code} {response.text}")

        data = response.json()
        records = data.get("data", [])
        next_cursor = None
        if records:
            next_cursor = datetime.fromtimestamp(records[-1]["open_time"] / 1000, tz=timezone.utc)

        return records, next_cursor

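    def sync(self, batch_size: int = 1000, max_iterations: Optional[int] = None, max_retries: int = 5):
        """
        Run one incremental sync cycle. Yields records in batches.
        """
        iterations = 0  # pages successfully fetched
        failures = 0    # consecutive fetch errors, used only for backoff
        current_cursor = self._last_cursor

        while True:
            if max_iterations and iterations >= max_iterations:
                logger.info("Max iterations reached. Breaking.")
                break

            try:
                records, next_cursor = self._fetch_page(cursor=current_cursor, limit=batch_size)
            except Exception as e:
                failures += 1
                if failures > max_retries:
                    raise  # give up instead of retrying a permanent error forever
                delay = min(60, (2 ** failures) + random.uniform(0, 1))
                logger.error(f"Fetch error: {e}. Retrying in {delay:.1f}s.")
                time.sleep(delay)
                continue
            failures = 0

            # An empty page, or a page that does not move the cursor (the API
            # returned only the boundary record again), means we are caught up.
            if not records or next_cursor == current_cursor:
                logger.info("No new records. Sync complete.")
                break

            yield records

            # Advance only after the consumer has come back for the next batch,
            # and track the last record's timestamp rather than the wall clock.
            current_cursor = next_cursor
            self._last_cursor = current_cursor
            iterations += 1

The key design choices here: a cursor expressed as a UTC timestamp, batched fetching with pagination, and a sync() method that yields records rather than loading everything into memory. The _last_cursor attribute holds the open time of the last record handed to the caller, so the next call to sync() on the same object picks up where this one left off. Because it lives only in memory, it does not survive a restart on its own.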

Detecting and Handling Duplicates

Cursor-based fetching solves the "missing records" problem. Duplicate handling requires a different strategy that depends on how your schema is designed.

The UPSERT Pattern

An upsert — update if exists, insert if new — is the standard mechanism for idempotent data ingestion. PostgreSQL and MySQL support this via ON CONFLICT and ON DUPLICATE KEY UPDATE respectively. The following example assumes a klines table with a composite unique constraint on (symbol, interval, open_time).

-- PostgreSQL example
INSERT INTO klines (symbol, interval, open_time, open, high, low, close, volume, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
ON CONFLICT (symbol, interval, open_time)
DO UPDATE SET
    open      = EXCLUDED.open,
    high      = EXCLUDED.high,
    low       = EXCLUDED.low,
    close     = EXCLUDED.close,
    volume    = EXCLUDED.volume,
    updated_at = NOW();

The updated_at field records when the row was last written on your local side, which matters for the backfill problem discussed later in this article. Note that it is set from the local clock, so it will not match any source-side timestamp and cannot by itself tell you whether the source changed a record. To skip writes for rows whose values are unchanged, compare the incoming values with the stored ones, for example by adding a WHERE condition to the DO UPDATE branch.
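A conditional upsert along those lines might look like the sketch below. It uses Python's built-in sqlite3 module and a trimmed-down klines table so that it runs anywhere; the column subset and the literal timestamps are simplifications for illustration. SQLite spells the null-safe comparison IS NOT, where PostgreSQL uses IS DISTINCT FROM.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE klines (
        symbol     TEXT NOT NULL,
        interval   TEXT NOT NULL,
        open_time  INTEGER NOT NULL,
        close      REAL NOT NULL,
        volume     REAL NOT NULL,
        updated_at TEXT NOT NULL,
        UNIQUE (symbol, interval, open_time)
    )
    """
)

# The WHERE clause turns the update into a no-op when nothing changed,
# so updated_at only moves when the content actually differs.
UPSERT = """
INSERT INTO klines (symbol, interval, open_time, close, volume, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (symbol, interval, open_time)
DO UPDATE SET
    close = excluded.close,
    volume = excluded.volume,
    updated_at = excluded.updated_at
WHERE klines.close IS NOT excluded.close
   OR klines.volume IS NOT excluded.volume
"""


def upsert(row, now):
    """Insert the row, or update it only if close/volume differ."""
    conn.execute(UPSERT, (*row, now))


def stored_updated_at(open_time):
    query = "SELECT updated_at FROM klines WHERE open_time = ?"
    return conn.execute(query, (open_time,)).fetchone()[0]


upsert(("BTC", "1h", 1000, 50.0, 10.0), "t1")  # first insert
upsert(("BTC", "1h", 1000, 50.0, 10.0), "t2")  # identical values: skipped
after_replay = stored_updated_at(1000)
upsert(("BTC", "1h", 1000, 51.0, 10.0), "t3")  # close changed: updated
after_correction = stored_updated_at(1000)
print(after_replay, after_correction)
```

With this shape, updated_at becomes a reliable "last changed" marker rather than a "last touched" one.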

Deduplication via Hash Checksums

UPSERT handles duplicates at the record level, but sometimes you need deduplication across a larger data window. Consider a scenario where your source has backfilled three months of historical data in a single API response. A cursor-based approach would process each batch correctly, but you may want a faster check: has this record already been processed, regardless of cursor position?

A content-based hash provides this check. Compute a deterministic hash from the record's content fields, store the hash alongside the record, and reject any incoming record whose hash already exists locally.

import hashlib
import json
from typing import Any, Dict

def compute_record_hash(record: Dict[str, Any], fields: list[str]) -> str:
    """
    Compute a deterministic SHA-256 hash from selected fields.
    Fields should be ordered consistently to ensure reproducibility.
    """
    canonical = {
        k: record[k]
        for k in sorted(fields)  # Sort keys for deterministic ordering
        if k in record
    }
    payload = json.dumps(canonical, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Example usage in a batch insert pipeline
def dedup_and_insert(records: list[Dict], db_connection, table: str, fields_for_hash: list[str]):
    """
    Check each record's hash against the local hash table before inserting.
    Returns (inserted_count, skipped_count). Uses SQLite syntax ("?" placeholders,
    INSERT OR IGNORE). The table name is interpolated into the SQL, so it must
    come from trusted code, never from user input.
    """
    if not records:
        return 0, 0

    hash_col = f"{table}_hash"
    # Hash each record once; keying by hash also collapses duplicates within the batch.
    by_hash = {compute_record_hash(r, fields_for_hash): r for r in records}

    placeholders = ",".join(["?"] * len(by_hash))
    existing_hashes = {
        row[0]
        for row in db_connection.execute(
            f"SELECT {hash_col} FROM {table}_hashes WHERE {hash_col} IN ({placeholders})",
            list(by_hash),
        ).fetchall()
    }

    to_insert = []
    to_hash = []

    for h, record in by_hash.items():
        if h not in existing_hashes:
            to_insert.append(record)
            to_hash.append(h)

    if to_insert:
        db_connection.executemany(
            f"INSERT INTO {table} (...) VALUES (...)",  # column list elided
            [format_record(r) for r in to_insert],  # format_record: helper not shown here
        )
        db_connection.executemany(
            f"INSERT OR IGNORE INTO {table}_hashes ({hash_col}) VALUES (?)",
            [(h,) for h in to_hash],
        )
        db_connection.commit()

    return len(to_insert), len(records) - len(to_insert)

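This pattern shines when your ingestion pipeline is replaying historical data or when a backfill run might re-process records that were already synced in an earlier session. The hash table adds a modest storage overhead: a SHA-256 digest is 32 bytes in raw form, or 64 characters when stored as the hex string that compute_record_hash returns. In exchange, records that are already present are filtered out with one lookup per batch before any write is attempted, which can shorten backfill runs considerably when most of the replayed data is unchanged.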
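One practical caveat with content hashes: the same value can arrive as 100, 100.0, or "100.00" depending on the endpoint or export format, and json.dumps serializes each of those differently, so the hashes will not match. The sketch below normalizes numeric fields before hashing. The helper names are hypothetical, and it assumes every listed field is numeric and present in the record.

```python
import hashlib
import json
from decimal import Decimal
from typing import Any, Dict, List


def normalize_number(value: Any) -> str:
    """Render ints, floats, and numeric strings in one canonical decimal form."""
    # Going through str() uses the float's short repr instead of its
    # full binary expansion, so 1.5 becomes "1.5".
    return format(Decimal(str(value)).normalize(), "f")


def normalized_hash(record: Dict[str, Any], numeric_fields: List[str]) -> str:
    """SHA-256 over the normalized numeric fields of a record."""
    canonical = {k: normalize_number(record[k]) for k in numeric_fields}
    payload = json.dumps(canonical, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


fields = ["close", "volume"]
a = normalized_hash({"close": 100, "volume": "1.50"}, fields)
b = normalized_hash({"close": 100.0, "volume": 1.5}, fields)
c = normalized_hash({"close": 100.01, "volume": 1.5}, fields)
print(a == b, a == c)
```

Whatever normalization you choose, apply it identically when computing stored hashes and incoming hashes; changing it later invalidates every stored hash.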

Detecting Source Backfills: The Version Number Problem

This is the most subtle of the three problems. A source system that retroactively modifies historical data — a backfill — breaks the assumption that "old" data is stable. Your local store may contain stale records that no longer match the source.

Common triggers for source backfills include: exchange methodology corrections, dividend and split adjustments, regulatory data reprocessing, and error corrections in upstream aggregators. If your trading strategy relies on historical data integrity, a missed backfill corrupts your backtest and potentially your live signal.

The Version Number Strategy

The most reliable detection mechanism is a version field on the source record. When the source modifies a record, it increments the version. Your local system stores the version alongside the data and compares it on each sync.

from datetime import datetime
from typing import Optional


def sync_with_version_check(
    local_cursor: datetime,
    remote_cursor: Optional[datetime],
    local_version: int,
    remote_version: int,
):
    """
    Determine the appropriate sync strategy based on version comparison.
    """
    if remote_version > local_version:
        # Source has been modified. Full refresh required for this record.
        return "UPDATE"
    if remote_version < local_version:
        # Version is lower: the source rolled back. This is anomalous but possible.
        # Treat as an update to match the source.
        return "FORCE_UPDATE"
    # Versions are equal from here on.
    if remote_cursor is not None and remote_cursor > local_cursor:
        # New record. Standard incremental insert.
        return "INSERT"
    # Record unchanged since last sync. A missing remote cursor is treated the
    # same way here (an assumption), since comparing None with a datetime would raise.
    return "SKIP"

Not all APIs expose version numbers. When they do not, you fall back to content comparison: recompute the hash of the incoming record and compare it against the stored hash. If they differ, the content has changed even if the primary key is identical.
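The content-comparison fallback can be sketched as follows. This is a minimal illustration, not a drop-in component: the function names are hypothetical, and it assumes the stored hash was computed over the same field list with the same serialization.

```python
import hashlib
import json
from typing import Any, Dict, List, Optional


def content_hash(record: Dict[str, Any], fields: List[str]) -> str:
    """Deterministic SHA-256 over the selected fields of a record."""
    canonical = {k: record[k] for k in fields if k in record}
    payload = json.dumps(canonical, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def decide_by_hash(
    stored_hash: Optional[str],
    incoming: Dict[str, Any],
    fields: List[str],
) -> str:
    """Return INSERT, UPDATE, or SKIP for a source without version numbers."""
    if stored_hash is None:
        return "INSERT"  # nothing stored under this key yet
    if stored_hash != content_hash(incoming, fields):
        return "UPDATE"  # same key, different content: the source changed it
    return "SKIP"        # identical content


fields = ["open", "high", "low", "close", "volume"]
original = {"open": 1.0, "high": 2.0, "low": 0.5, "close": 1.5, "volume": 10.0}
corrected = dict(original, close=1.6)  # upstream correction to one field

stored = content_hash(original, fields)
print(decide_by_hash(None, original, fields))     # first sight of the record
print(decide_by_hash(stored, original, fields))   # replayed, unchanged
print(decide_by_hash(stored, corrected, fields))  # changed at the source
```

Only content fields go into the hash here; the key fields identify which stored hash to compare against.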

Handling Backfills Gracefully

A full backfill of historical data can overwhelm a sync pipeline. Two strategies mitigate this.

First, batch by time window rather than by cursor. Instead of fetching record-by-record, request data in fixed time blocks — 30 days at a time — and use the upsert pattern to handle any records that were already present. This dramatically reduces round-trip count.
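A window generator for this kind of batching could look like the sketch below. The function name and the half-open [start, end) convention are choices made for illustration; how each window maps onto request parameters depends on the API in use.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def time_windows(
    start: datetime,
    end: datetime,
    days: int = 30,
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield contiguous half-open [window_start, window_end) blocks covering start..end."""
    if days <= 0:
        raise ValueError("days must be positive")
    step = timedelta(days=days)
    window_start = start
    while window_start < end:
        window_end = min(window_start + step, end)  # last window may be shorter
        yield window_start, window_end
        window_start = window_end


windows = list(
    time_windows(
        datetime(2024, 1, 1, tzinfo=timezone.utc),
        datetime(2024, 3, 15, tzinfo=timezone.utc),
    )
)
for window_start, window_end in windows:
    print(window_start.date(), "->", window_end.date())
```

Because consecutive windows share a boundary, a record sitting exactly on it may be fetched twice if the API treats both ends as inclusive; the upsert absorbs that.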

Second, add a circuit breaker. If the incoming data volume exceeds a threshold — say, 10× the normal daily ingestion rate — pause the sync, alert the operator, and require manual confirmation before resuming. Backfills that proceed silently can corrupt a data store before anyone notices.

def detect_backfill(
    records: list[Dict],
    baseline_daily_rate: int = 10_000,
    backfill_threshold_multiplier: float = 10.0,
):
    """
    Detect whether an incoming batch looks like a backfill rather than
    normal incremental flow.
    """
    incoming_count = len(records)
    threshold = baseline_daily_rate * backfill_threshold_multiplier

    if incoming_count > threshold:
        logger.warning(
            f"Backfill detected: {incoming_count} records received "
            f"against a baseline of {baseline_daily_rate}/day. "
            f"Threshold: {threshold:.0f}. Review recommended."
        )
        return True
    return False
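Because a paginated fetcher delivers data in pages that are far smaller than a daily threshold, a per-batch check on its own may never fire. One way to get the pause-and-confirm behaviour described above is to count records cumulatively across a sync run. The class and exception names below are hypothetical, and how an operator sets the confirmation flag is left open.

```python
from typing import Iterable, Iterator, List


class BackfillPaused(RuntimeError):
    """Raised when ingestion volume looks like a backfill and nobody has confirmed it."""


class BackfillCircuitBreaker:
    def __init__(self, baseline_daily_rate: int = 10_000, multiplier: float = 10.0):
        self.threshold = baseline_daily_rate * multiplier
        self.seen = 0           # records counted so far in this sync run
        self.confirmed = False  # set by an operator to let the run continue

    def check(self, batch_size: int) -> None:
        """Count a batch and raise if the running total crosses the threshold."""
        self.seen += batch_size
        if self.seen > self.threshold and not self.confirmed:
            raise BackfillPaused(
                f"{self.seen} records exceeds threshold {self.threshold:.0f}"
            )


def guarded(
    batches: Iterable[List[dict]],
    breaker: BackfillCircuitBreaker,
) -> Iterator[List[dict]]:
    """Pass batches through until the breaker trips."""
    for batch in batches:
        breaker.check(len(batch))
        yield batch


breaker = BackfillCircuitBreaker(baseline_daily_rate=1_000, multiplier=2.0)
batches = [[{}] * 1_000 for _ in range(3)]
passed = []
try:
    for batch in guarded(batches, breaker):
        passed.append(batch)
except BackfillPaused as exc:
    print(f"Paused after {len(passed)} batches: {exc}")
```

The batch that trips the breaker is counted but not passed on, so a resumed run needs to fetch it again from the last committed cursor.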

Storing Sync Metadata

The patterns above require a small metadata store alongside your main data tables. Track at least three fields per sync job: the last cursor value, the last sync timestamp, and the last seen version.

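CREATE TABLE sync_metadata (
    source_name    TEXT NOT NULL,
    symbol         TEXT NOT NULL,
    cursor_value   TIMESTAMP,
    last_version   BIGINT,
    last_sync_at   TIMESTAMP,
    record_count   BIGINT,
    status         TEXT,  -- 'OK', 'BACKFILL_DETECTED', 'RATE_LIMITED'
    -- One row per (source, symbol); the pipeline's ON CONFLICT clause targets this key.
    PRIMARY KEY (source_name, symbol)
);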

Update this table at the end of every successful sync cycle. On startup, read it to restore the cursor state rather than relying on the in-memory _last_cursor attribute used in the earlier example. With the state held in the database, the worker process itself is stateless and can resume after a restart.
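Restoring and saving the cursor might be sketched like this, using sqlite3 so the example is self-contained. The table is reduced to the cursor-related columns, timestamps are stored as ISO 8601 text, and the function names are hypothetical.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional


def init_metadata(conn: sqlite3.Connection) -> None:
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS sync_metadata (
            source_name  TEXT NOT NULL,
            symbol       TEXT NOT NULL,
            cursor_value TEXT,
            last_sync_at TEXT,
            PRIMARY KEY (source_name, symbol)
        )
        """
    )


def load_cursor(conn: sqlite3.Connection, source: str, symbol: str) -> Optional[datetime]:
    """Return the stored cursor, or None if this job has never run."""
    row = conn.execute(
        "SELECT cursor_value FROM sync_metadata WHERE source_name = ? AND symbol = ?",
        (source, symbol),
    ).fetchone()
    if row is None or row[0] is None:
        return None
    return datetime.fromisoformat(row[0])


def save_cursor(conn: sqlite3.Connection, source: str, symbol: str, cursor: datetime) -> None:
    """Insert or overwrite the cursor for one (source, symbol) job."""
    conn.execute(
        """
        INSERT INTO sync_metadata (source_name, symbol, cursor_value, last_sync_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (source_name, symbol) DO UPDATE SET
            cursor_value = excluded.cursor_value,
            last_sync_at = excluded.last_sync_at
        """,
        (source, symbol, cursor.isoformat(), datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
init_metadata(conn)
print(load_cursor(conn, "tickdb", "BTC.BTC"))  # no row yet
save_cursor(conn, "tickdb", "BTC.BTC", datetime(2024, 1, 1, tzinfo=timezone.utc))
print(load_cursor(conn, "tickdb", "BTC.BTC"))
```

Saving the cursor in the same transaction as the data writes keeps the two from drifting apart if the process dies between them.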

Putting It Together: A Full Sync Pipeline

Combining the elements above produces a sync pipeline that handles incremental updates, deduplication, and backfill detection in a single orchestration loop.

def run_sync_pipeline(
    fetcher: IncrementalKlineFetcher,
    db_conn,
    symbol: str,
    interval: str,
    fields_for_hash: list[str],
    baseline_daily_rate: int = 10_000,
):
    """
    Full incremental sync pipeline with deduplication and backfill detection.
    get_local_hash, insert_klines, and update_klines are helpers not shown here.
    """
    metadata = db_conn.execute(
        "SELECT * FROM sync_metadata WHERE source_name = 'tickdb' AND symbol = ?",
        (symbol,),
    ).fetchone()

    last_cursor = metadata["cursor_value"] if metadata else None
    last_version = metadata["last_version"] if metadata else 0

    # Resume from the persisted cursor (assumes the driver returns a datetime).
    fetcher._last_cursor = last_cursor

    received = []
    new_records = []
    updated_records = []

    for batch in fetcher.sync(batch_size=1000):
        received.extend(batch)

        for record in batch:
            remote_version = record.get("version", 1)
            record_hash = compute_record_hash(record, fields_for_hash)
            local_hash = get_local_hash(db_conn, record, symbol)

            if local_hash is None:
                new_records.append(record)      # not stored yet
            elif remote_version > last_version or local_hash != record_hash:
                updated_records.append(record)  # stored, but changed upstream
            # Otherwise the record is already stored and unchanged: skip it.

    if not new_records and not updated_records:
        logger.info("No changes detected.")
        return 0, 0

    # Check the cumulative volume: a single 1000-row page can never exceed the threshold.
    is_backfill = detect_backfill(received, baseline_daily_rate)
    if is_backfill:
        logger.warning("Backfill volume received. Proceeding with version-aware merge.")
    status = "BACKFILL_DETECTED" if is_backfill else "OK"

    inserted = insert_klines(db_conn, new_records)
    updated = update_klines(db_conn, updated_records)

    changed = new_records + updated_records
    new_version = max(last_version, max(r.get("version", 1) for r in changed))

    db_conn.execute(
        """
        INSERT INTO sync_metadata (source_name, symbol, cursor_value, last_version, last_sync_at, record_count, status)
        VALUES ('tickdb', ?, ?, ?, CURRENT_TIMESTAMP, ?, ?)
        ON CONFLICT (source_name, symbol) DO UPDATE SET
            cursor_value = EXCLUDED.cursor_value,
            last_version = EXCLUDED.last_version,
            last_sync_at = EXCLUDED.last_sync_at,
            record_count = EXCLUDED.record_count,
            status = EXCLUDED.status
        """,
        (symbol, fetcher._last_cursor, new_version, len(changed), status),
    )
    db_conn.commit()

    logger.info(f"Sync complete. Inserted: {inserted}, Updated: {updated}.")
    return inserted, updated

Choosing the Right Pattern

Not every sync pipeline needs every mechanism described above. The table below maps patterns to the problem they solve and the tradeoffs they introduce.

| Pattern | Solves | Tradeoff | When to use |
| --- | --- | --- | --- |
| Cursor-based polling | Missing new records | Requires timestamp or sequence field on source | Always, as the baseline mechanism |
| UPSERT (ON CONFLICT) | Duplicate records | Requires unique constraint on natural key | Always, as the safety net |
| Hash-based dedup | Cross-batch duplicates, replay safety | One SHA-256 digest per record (32 bytes raw, 64 characters as hex); hash computation cost | When replaying historical data or running frequent backfills |
| Version number check | Source backfill detection | Requires version field on source; adds column to schema | When source API exposes version or last_modified |
| Content hash diff | Backfill detection without version field | Requires hash column; recomputes on every sync | When source API lacks versioning |
| Backfill circuit breaker | Silent data corruption from large backfills | May delay ingestion during legitimate backfills | When data integrity is critical for trading decisions |

Start with cursor-based polling and UPSERT. Add hash deduplication when you begin running backfill jobs. Introduce version checking when your source API provides the necessary metadata and your use case demands historical data integrity. Each layer adds complexity, so layer only when the specific failure mode is a real risk rather than a theoretical one.

Next Steps

If your sync pipeline handles multiple symbols or data types, consider parameterizing the fetcher and metadata store to support heterogeneous sources under a single framework. Abstracting the fetch logic, the dedup strategy, and the write strategy behind a common interface lets you add new data sources without duplicating the orchestration code.
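One possible shape for such an interface is sketched below using typing.Protocol. The protocol names, the in-memory implementations, and the hypothetical `id` key are illustrative only; a real fetcher and writer would wrap the API client and database code shown earlier.

```python
from typing import Any, Dict, Iterable, Iterator, List, Protocol

Record = Dict[str, Any]


class Fetcher(Protocol):
    def sync(self) -> Iterable[List[Record]]: ...


class Writer(Protocol):
    def write(self, records: List[Record]) -> int: ...


def run(fetcher: Fetcher, writer: Writer) -> int:
    """Source-agnostic orchestration: pull batches, write them, count changes."""
    changed = 0
    for batch in fetcher.sync():
        changed += writer.write(batch)
    return changed


class ListFetcher:
    """Stand-in fetcher that replays pre-built batches."""

    def __init__(self, batches: List[List[Record]]):
        self._batches = batches

    def sync(self) -> Iterator[List[Record]]:
        yield from self._batches


class DictWriter:
    """Stand-in writer with upsert semantics on a single key field."""

    def __init__(self, key: str):
        self.key = key
        self.rows: Dict[Any, Record] = {}

    def write(self, records: List[Record]) -> int:
        changed = 0
        for record in records:
            if self.rows.get(record[self.key]) != record:
                self.rows[record[self.key]] = record
                changed += 1
        return changed


writer = DictWriter(key="id")
fetcher = ListFetcher([
    [{"id": 1, "v": 1}, {"id": 2, "v": 1}],
    [{"id": 2, "v": 1}, {"id": 3, "v": 1}],  # id 2 is a replayed duplicate
])
print(run(fetcher, writer))
```

Since Protocol relies on structural typing, existing classes satisfy the interface as long as they expose matching methods, with no inheritance required.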

For teams running production data pipelines at scale, the patterns in this article provide a foundation. Each implementation will need to adapt to specific schema constraints, API rate limits, and downstream consumer requirements. The core discipline remains constant: treat data synchronization as a stateful engineering problem rather than a fire-and-forget ETL task.


This article does not constitute investment advice. Market data ingestion involves technical and operational complexity; ensure thorough testing of any sync pipeline before deployment in production environments.