"Price is the effect. The order book is the cause."

Imagine this: your system is ingesting real-time NVIDIA options flow through a WebSocket stream. At 14:47:23 UTC, a network partition severs the connection. You reconnect at 14:52:11 — five minutes of data, gone. The bid-ask spread has since mean-reverted. The volume profile has shifted. And your downstream strategy, running on a 30-second rolling window of order book pressure ratios, has been making decisions on a phantom baseline.

This is not a hypothetical. It is the most common failure mode in real-time market data pipelines, and it happens to every team at least once — usually on a Friday afternoon, during a high-volatility event.

The good news: it is solvable. TickDB provides both the WebSocket stream for live data and the REST API for historical retrieval, and the two are designed to work together in exactly this scenario. This article walks through the complete reconnection and data-recovery architecture — from detecting the disconnect to reconstructing a locally consistent data state.


1. Understanding the Disconnect Problem in Real-Time Data Systems

A WebSocket connection is stateful and persistent. It is also fragile. TCP keepalives will tell you whether the underlying socket is alive, but they do not tell you whether the application on the other end is still emitting data. Market data providers may silently drop a stale connection. A load balancer may recycle a connection without notification. A brief network glitch may not trigger an error — it may simply stop delivering messages.
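
Because a stalled stream may never raise an error, one common guard is an application-level staleness check that looks at message arrival times rather than socket state. The following is a minimal sketch of that idea, not part of TickDB's API; the class name, the 5-second threshold, and the injectable clock are illustrative assumptions.

```python
import time
from typing import Callable, Optional


class StalenessWatchdog:
    """Flags a stream as stale when no message has arrived for max_silence seconds."""

    def __init__(self, max_silence: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_silence = max_silence
        self._clock = clock                      # injectable so the logic can be tested
        self._last_seen: Optional[float] = None  # None until the first message arrives

    def touch(self) -> None:
        """Call on every message received from the stream."""
        self._last_seen = self._clock()

    def is_stale(self) -> bool:
        """True once the silence since the last message exceeds the threshold."""
        if self._last_seen is None:
            return False  # nothing received yet; leave that case to the connect timeout
        return self._clock() - self._last_seen > self.max_silence


# Demonstration with a fake clock (hypothetical values)
now = [100.0]
watchdog = StalenessWatchdog(max_silence=5.0, clock=lambda: now[0])
watchdog.touch()
now[0] = 104.0
fresh = watchdog.is_stale()   # 4 s of silence: still within the threshold
now[0] = 106.0
stale = watchdog.is_stale()   # 6 s of silence: past the threshold
```

The right threshold depends on how quiet the instrument normally is; an illiquid symbol can legitimately go silent for longer than a liquid one.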

The consequences depend on what you are consuming:

| Data type | Impact of 5-min gap | Recoverable? |
|---|---|---|
| Price quotes (OHLCV ticks) | Strategy executes on stale prices | Yes, via REST /kline |
| Order book depth | Pressure ratio diverges from reality | Partial: reconstruct from depth snapshots |
| Trade flow | Volume-weighted signals distorted | Yes, via REST /trades (where supported) |
| Options Greeks | Delta/gamma hedges drift | Yes, via REST /kline |

The key insight is that a WebSocket stream and a REST API serve complementary roles. The WebSocket is your live ingestion path — low latency, high frequency. The REST API is your reconciliation path — reliable, timestamped, batch-retrievable. A production-grade system uses both, in sequence, every time a connection is re-established.


2. The Three-Phase Reconnection Architecture

A complete reconnection and recovery cycle has three phases:

Phase 1: Connection Recovery

Before you recover data, you need a stable connection. This is not simply calling connect() again. A naive reconnect loop — retry immediately, retry again — creates a thundering herd that can take down a provider's endpoint during an outage.

A production-grade reconnect strategy uses exponential backoff with jitter:

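import asyncio
import json
import random
import time

import websockets  # third-party package: pip install websockets

class ReconnectingWebSocketClient:
    def __init__(self, api_key: str, on_message, on_disconnect):
        self.api_key = api_key
        self.on_message = on_message
        self.on_disconnect = on_disconnect
        self.ws = None
        self._base_delay = 1.0        # seconds
        self._max_delay = 60.0        # seconds
        self._retry_count = 0
        self._connected = False
        self._tasks = []              # hold references so the loops are not garbage-collected

    async def connect(self):
        """Connect, retrying with backoff until the handshake succeeds."""
        uri = f"wss://api.tickdb.ai/v1/ws?api_key={self.api_key}"
        while True:
            try:
                self.ws = await websockets.connect(
                    uri,
                    ping_interval=20,
                    ping_timeout=10,
                    close_timeout=5
                )
                break
            except Exception as exc:
                print(f"[Reconnect] Connection failed: {exc}")
                await self._backoff()
        self._retry_count = 0
        self._connected = True
        self._tasks = [
            asyncio.create_task(self._receive_loop(self.ws)),
            asyncio.create_task(self._heartbeat_loop(self.ws)),
        ]

    async def _backoff(self):
        """Exponential backoff with jitter, which prevents a thundering herd."""
        delay = min(self._base_delay * (2 ** self._retry_count), self._max_delay)
        jitter = random.uniform(0, delay * 0.1)  # adds 0 to 10% on top of the delay
        wait_time = delay + jitter
        self._retry_count += 1

        print(f"[Reconnect] Attempt {self._retry_count}: waiting {wait_time:.2f}s")
        await asyncio.sleep(wait_time)

    async def _receive_loop(self, ws):
        """Decode each message (assumed to be JSON) and hand it to on_message."""
        try:
            async for raw in ws:
                self.on_message(json.loads(raw))
        except websockets.ConnectionClosed:
            pass
        self._notify_disconnect(ws)

    async def _heartbeat_loop(self, ws):
        """Send an application-level ping every 20 seconds to keep the connection alive."""
        while self.ws is ws:  # stop once a newer connection has replaced this one
            await asyncio.sleep(20)
            try:
                await ws.send('{"cmd": "ping"}')
            except Exception:
                self._notify_disconnect(ws)
                return

    def _notify_disconnect(self, ws):
        """Fire on_disconnect once per connection, whichever loop notices first."""
        if ws is self.ws and self._connected:
            self._connected = False
            self.on_disconnect()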

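Engineering note: The ping_interval=20 parameter is critical. TickDB's server-side idle timeout is 60 seconds; a ping every 20 seconds fits three keepalives into each timeout window, so the connection never looks idle to the server. On the client side, ping_timeout=10 means a ping that receives no pong within 10 seconds marks the connection as dead, well before the server would close it.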
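
To make the retry schedule concrete, the backoff formula can be pulled out into a pure function and evaluated for the first few attempts. This is a minimal sketch using the same 1-second base and 60-second cap as the client above; the function name and the jitter_fraction parameter are illustrative, not part of any library.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter_fraction: float = 0.1,
                  rng: Optional[random.Random] = None) -> float:
    """Delay before retry number `attempt` (0-based): capped exponential plus additive jitter."""
    source = rng or random
    delay = min(base * (2 ** attempt), cap)
    return delay + source.uniform(0, delay * jitter_fraction)


# Jitter disabled to show the deterministic part of the schedule
schedule = [backoff_delay(n, jitter_fraction=0.0) for n in range(8)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The cap is reached on the seventh attempt, after roughly a minute of cumulative waiting. A wider jitter range (for example, a delay drawn uniformly between zero and the capped value) spreads clients out more aggressively, at the cost of a less predictable recovery time.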

Phase 2: Gap Detection — What Did We Miss?

Once the connection is re-established, the first task is to determine the exact boundaries of the data gap. You need two timestamps:

  • last_timestamp: The timestamp of the last message you successfully processed before the disconnect.
  • server_time: The current server timestamp, which you receive as the first message after reconnecting (or via a REST call).

The gap is then:

gap_start = last_timestamp + 1  # microsecond after the last received tick
gap_end   = server_time         # current server time

Critical: Always use server timestamps, not local clock timestamps, for gap boundaries. Client clocks drift, especially in containerized environments. A 500ms clock skew can cause you to miss or duplicate the boundary tick.

import os
import requests

class GapDetector:
    def __init__(self, client: ReconnectingWebSocketClient):
        self.client = client
        self.last_processed_ts: int = 0  # Unix microseconds; 0 means nothing processed yet

    def record_message(self, message: dict):
        """Call this on every successfully processed message."""
        if "ts" in message:
            self.last_processed_ts = max(self.last_processed_ts, message["ts"])

    def get_gap_bounds(self) -> tuple[int, int]:
        """
        Returns (gap_start, gap_end) in Unix microseconds, both inclusive.
        gap_start is the first microsecond after the last processed message.
        """
        gap_start = self.last_processed_ts + 1
        gap_end = self._fetch_server_time()
        return gap_start, gap_end

    def _fetch_server_time(self) -> int:
        """Fetch server time via REST API, not the local clock."""
        headers = {"X-API-Key": os.environ.get("TICKDB_API_KEY")}
        response = requests.get(
            "https://api.tickdb.ai/v1/server/time",
            headers=headers,
            timeout=(3.05, 10)
        )
        response.raise_for_status()
        return response.json()["data"]["server_time"]

Phase 3: Data Reconciliation — Filling the Gap

With the gap boundaries established, you retrieve the missing data from the REST API and merge it into your local state. This is the most nuanced phase, because you must handle several edge cases:

  1. Boundary overlap: The first REST response may contain the last tick you already processed. Deduplicate.
  2. Partial coverage: Some REST endpoints have minimum granularity (e.g., 1-minute klines vs. second-level ticks). Reconcile at the lowest common denominator.
  3. Timestamp alignment: Market data from different endpoints may use different timestamp conventions. Normalize everything to Unix microseconds before merging.

import os
import time
import requests
from typing import List, Dict, Any

class DataReconciler:
    def __init__(self, api_key: str, gap_detector: GapDetector):
        self.api_key = api_key
        self.gap_detector = gap_detector
        self.local_buffer: List[Dict[str, Any]] = []

    def reconcile(self, symbol: str, channel: str) -> List[Dict[str, Any]]:
        """
        Main entry point: detect gap, fetch missing data, merge into local buffer.
        Returns the reconciled data list.
        """
        gap_start, gap_end = self.gap_detector.get_gap_bounds()

        if gap_end - gap_start < 1_000:  # Less than 1 ms — no meaningful gap
            print("[Reconcile] Gap too small, skipping fetch.")
            return self.local_buffer

        print(f"[Reconcile] Fetching gap: {gap_start} → {gap_end}")

        if channel == "kline":
            fetched = self._fetch_klines(symbol, gap_start, gap_end)
        elif channel == "depth":
            fetched = self._fetch_depth_snapshots(symbol, gap_start, gap_end)
        elif channel == "trades":
            fetched = self._fetch_trades(symbol, gap_start, gap_end)
        else:
            raise ValueError(f"Unsupported channel: {channel}")

        merged = self._merge_into_buffer(fetched)
        return merged

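    def _fetch_klines(self, symbol: str, gap_start: int, gap_end: int,
                      max_retries: int = 5) -> List[Dict]:
        """Fetch 1-minute klines covering the gap via REST API."""
        headers = {"X-API-Key": self.api_key}
        params = {
            "symbol": symbol,
            "interval": "1m",
            "start": gap_start,
            "end": gap_end,
            "limit": 1000
        }
        for _ in range(max_retries + 1):
            response = requests.get(
                "https://api.tickdb.ai/v1/market/kline",
                headers=headers,
                params=params,
                timeout=(3.05, 30)
            )
            # Check for rate limiting before raise_for_status(), in case the limit
            # is signalled as HTTP 429 rather than as code 3001 in the body.
            if response.status_code != 429:
                response.raise_for_status()
                data = response.json()
                if data.get("code") != 3001:
                    return data.get("data", [])

            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"[RateLimit] Waiting {retry_after}s before retry.")
            time.sleep(retry_after)

        # Bounded retries: fail loudly instead of recursing forever
        raise RuntimeError("Rate-limit retries exhausted while fetching klines")

    def _fetch_depth_snapshots(self, symbol: str, gap_start: int, gap_end: int) -> List[Dict]:
        """Placeholder: depth recovery is covered in the order book section."""
        raise NotImplementedError("depth recovery is not implemented in this class")

    def _fetch_trades(self, symbol: str, gap_start: int, gap_end: int) -> List[Dict]:
        """Placeholder: trade recovery via REST is not covered in this article."""
        raise NotImplementedError("trade recovery is not implemented in this class")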

    def _merge_into_buffer(self, fetched: List[Dict]) -> List[Dict]:
        """
        Merge fetched records into the local buffer using timestamp-based deduplication.
        Both fetched and local records must have a 'ts' field in Unix microseconds.
        """
        fetched_ts = {r["ts"]: r for r in fetched}
        # Build the lookup set once instead of once per fetched record
        seen = {r["ts"] for r in self.local_buffer}

        for ts, record in fetched_ts.items():
            if ts not in seen:
                self.local_buffer.append(record)

        # Sort by timestamp ascending, which time-series consumers downstream rely on
        self.local_buffer.sort(key=lambda r: r["ts"])
        return self.local_buffer
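
The partial-coverage edge case above has a practical consequence: gap boundaries rarely fall on bar boundaries, so a request for 1-minute klines should be widened to whole bars and the overlap left to deduplication. The following is a rough sketch of that alignment step; the helper name is hypothetical, and the date used for the example timestamps is arbitrary.

```python
from typing import Tuple

MINUTE_US = 60 * 1_000_000  # one minute in microseconds


def align_to_bars(gap_start: int, gap_end: int,
                  interval_us: int = MINUTE_US) -> Tuple[int, int]:
    """Widen [gap_start, gap_end] to whole bars: floor the start, ceil the end."""
    aligned_start = (gap_start // interval_us) * interval_us
    aligned_end = -(-gap_end // interval_us) * interval_us  # ceiling division
    return aligned_start, aligned_end


# 14:47:23 to 14:52:11 UTC on an arbitrary day, in Unix microseconds
day = 1_704_067_200_000_000  # 2024-01-01 00:00:00 UTC
start = day + (14 * 3600 + 47 * 60 + 23) * 1_000_000
end = day + (14 * 3600 + 52 * 60 + 11) * 1_000_000

a_start, a_end = align_to_bars(start, end)
bars = (a_end - a_start) // MINUTE_US  # the 14:47 through 14:52 bars
```

A gap of under five minutes touches six 1-minute bars here, because both the first and the last bar are only partially covered by live data.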

3. Handling Rate Limits During Bulk Fetch

A 5-minute gap in a high-frequency stream can represent thousands of messages. A single REST call returns at most limit records, so a large gap takes several calls, and issuing them back to back will likely hit TickDB's rate limit (error code 3001). Your fetch logic must implement pagination with backoff.

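# Method of DataReconciler (shown here without the class indentation)
def _fetch_klines_paginated(self, symbol: str, gap_start: int, gap_end: int,
                            max_retries: int = 5) -> List[Dict]:
    """Paginate through a large time range, handling rate limits gracefully."""
    all_records: List[Dict] = []
    current_start = gap_start
    headers = {"X-API-Key": self.api_key}
    retries = 0

    while current_start < gap_end:
        params = {
            "symbol": symbol,
            "interval": "1m",
            "start": current_start,
            "end": gap_end,
            "limit": 1000
        }

        response = requests.get(
            "https://api.tickdb.ai/v1/market/kline",
            headers=headers,
            params=params,
            timeout=(3.05, 30)
        )

        # Detect rate limiting before raise_for_status(), in case the limit is
        # signalled as HTTP 429 rather than as code 3001 in the body.
        rate_limited = response.status_code == 429
        if not rate_limited:
            response.raise_for_status()
            data = response.json()
            rate_limited = data.get("code") == 3001

        if rate_limited:
            retries += 1
            if retries > max_retries:
                raise RuntimeError("Rate-limit retries exhausted during pagination")
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"[RateLimit] Pausing {retry_after}s.")
            time.sleep(retry_after)
            continue
        retries = 0

        records = data.get("data", [])
        if not records:
            break

        all_records.extend(records)
        next_start = records[-1]["ts"] + 1  # Move to next microsecond
        if next_start <= current_start:
            break  # no forward progress; stop rather than loop forever
        current_start = next_start

        # Respect rate limits between pagination calls
        time.sleep(0.1)

    return all_records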

Warning: Do not use asyncio.gather to parallelize reconciliation fetches, whether they are pages for one symbol or requests for several symbols under the same key. The REST API enforces per-API-key rate limits, so parallel requests will trigger 3001 errors and slow down the entire reconciliation process.
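
One way to keep sequential calls under a rate limit without scattering fixed sleeps through the code is a small throttle that enforces a minimum spacing between requests. This is a sketch under the assumption that a fixed minimum interval is enough; the class name is hypothetical, and the actual spacing should come from the provider's published limits.

```python
import time
from typing import Callable, Optional


class MinIntervalThrottle:
    """Enforces a minimum spacing between consecutive calls to wait()."""

    def __init__(self, min_interval: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.min_interval = min_interval
        self._clock = clock    # injectable for testing
        self._sleep = sleep    # injectable for testing
        self._last_call: Optional[float] = None

    def wait(self) -> float:
        """Sleep just long enough to honour the spacing; returns the time slept."""
        now = self._clock()
        pause = 0.0
        if self._last_call is not None:
            pause = max(0.0, self.min_interval - (now - self._last_call))
        if pause > 0:
            self._sleep(pause)
        self._last_call = now + pause
        return pause


# Demonstration with a fake clock so nothing actually sleeps
fake_now = [0.0]
slept = []

def fake_sleep(seconds: float) -> None:
    slept.append(seconds)
    fake_now[0] += seconds

throttle = MinIntervalThrottle(1.0, clock=lambda: fake_now[0], sleep=fake_sleep)
first = throttle.wait()    # first call: nothing to wait for
fake_now[0] += 0.25        # a request that took 0.25 s
second = throttle.wait()   # waits out the remaining 0.75 s
```

Calling throttle.wait() before each page request replaces the fixed pause between pagination calls: a slow response already counts toward the spacing, so the loop only sleeps when it would otherwise be too fast.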


4. Timestamp Alignment Across Data Sources

One of the subtlest sources of bugs in multi-source data pipelines is timestamp misalignment. When you combine data from a WebSocket stream and a REST API, the two sources may use different conventions:

| Source | Timestamp convention | Example |
|---|---|---|
| TickDB WebSocket | Unix microseconds (ts) | 1704067200000000 |
| TickDB REST /kline | Unix microseconds (ts) | 1704067200000000 |
| Third-party WebSocket | Unix milliseconds (ts) | 1704067200000 |
| Third-party REST | Unix milliseconds (ts) | 1704067200000 |

Normalize everything to a single unit — Unix microseconds — at ingestion time. A single utility function at the entry point of your data pipeline prevents an entire class of off-by-one-thousand bugs:

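from decimal import Decimal
from typing import Any

def normalize_timestamp(ts: Any, unit: str = "ms") -> int:
    """
    Normalize a timestamp to Unix microseconds.

    Args:
        ts: Timestamp value (int, float, or numeric string).
        unit: Source unit: 'ms' (milliseconds) or 'us' (microseconds).

    Returns:
        Unix timestamp in microseconds.
    """
    # Decimal handles ints, floats, and strings such as "1704067200000.5",
    # and keeps the sub-millisecond fraction until after the unit conversion.
    value = Decimal(str(ts))

    if unit == "ms":
        return int(value * 1000)
    elif unit == "us":
        return int(value)
    else:
        raise ValueError(f"Unknown timestamp unit: {unit}")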
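
When a source does not document its unit, the magnitude of the value is a usable hint: present-day Unix timestamps have 10 digits in seconds, 13 in milliseconds, 16 in microseconds, and 19 in nanoseconds. The sketch below applies that heuristic as a sanity check. Both helper names are hypothetical, and the digit-count rule only holds for dates between late 2001 and 2286, so passing an explicit unit remains the safer default.

```python
def infer_timestamp_unit(ts: int) -> str:
    """Guess the unit of a Unix timestamp from its number of digits (heuristic)."""
    digits = len(str(abs(int(ts))))
    if digits <= 10:
        return "s"
    if digits <= 13:
        return "ms"
    if digits <= 16:
        return "us"
    return "ns"


_TO_US = {"s": 1_000_000, "ms": 1_000, "us": 1}


def to_microseconds(ts: int) -> int:
    """Convert a timestamp of unknown unit to Unix microseconds."""
    unit = infer_timestamp_unit(ts)
    if unit == "ns":
        return int(ts) // 1_000  # drops the sub-microsecond remainder
    return int(ts) * _TO_US[unit]


# The same instant expressed in four units
seconds = to_microseconds(1_704_067_200)
millis = to_microseconds(1_704_067_200_000)
micros = to_microseconds(1_704_067_200_000_000)
nanos = to_microseconds(1_704_067_200_000_000_000)
```

All four calls return 1704067200000000, which makes the helper useful as an assertion at ingestion time: if the inferred unit disagrees with the unit a source claims, something upstream has changed.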

5. Order Book Depth Reconstruction

Order book data presents a unique challenge during gap recovery. Unlike OHLCV ticks, which are point-in-time snapshots, the order book is a stateful structure. A gap of five minutes means the entire book may have shifted — new price levels have appeared, old levels have been consumed, and the accumulated size at each level has changed.

TickDB's depth channel delivers snapshots rather than individual order events: each message represents the current state of the order book. This means reconstruction is simpler than it appears:

  1. On reconnect, fetch the most recent depth snapshot at or before gap_start.
  2. Re-apply all subsequent depth snapshots from gap_start to gap_end.
  3. The final state represents the order book at gap_end.

def reconstruct_depth_book(symbol: str, gap_start: int, gap_end: int) -> Dict:
    """
    Reconstruct the order book state at gap_end.
    Returns {"bids": {price: size}, "asks": {price: size}} for all available levels.
    """
    headers = {"X-API-Key": os.environ.get("TICKDB_API_KEY")}
    params = {
        "symbol": symbol,
        "channel": "depth",
        "start": gap_start,
        "end": gap_end,
        "limit": 500
    }

    response = requests.get(
        "https://api.tickdb.ai/v1/market/depth",
        headers=headers,
        params=params,
        timeout=(3.05, 30)
    )
    response.raise_for_status()
    snapshots = response.json().get("data", [])

    # Apply in timestamp order so an out-of-order older snapshot cannot win
    snapshots.sort(key=lambda s: s.get("ts", 0))

    book = {"bids": {}, "asks": {}}

    for snapshot in snapshots:
        # Each snapshot is the full book state, so it replaces the local state
        # rather than being merged into it; levels absent from the newer
        # snapshot must not survive from an older one.
        book = {"bids": {}, "asks": {}}
        for side in ("bids", "asks"):
            for level in snapshot.get(side, []):
                price, size = float(level["p"]), float(level["v"])
                if size > 0:  # zero-size levels are empty and are left out
                    book[side][price] = size

    return book
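
A reconstructed book is worth a quick sanity check before the strategy consumes it. The sketch below reads the top of the book from the dictionary shape returned above, with price keys mapping to sizes; the helper name is hypothetical. A crossed book, where the best bid is at or above the best ask, usually indicates that stale levels from different snapshots were mixed.

```python
from typing import Dict, Optional, Tuple


def top_of_book(book: Dict[str, Dict[float, float]]) -> Optional[Tuple[float, float, float]]:
    """Return (best_bid, best_ask, spread), or None if either side is empty."""
    if not book["bids"] or not book["asks"]:
        return None
    best_bid = max(book["bids"])   # highest price a buyer will pay
    best_ask = min(book["asks"])   # lowest price a seller will accept
    return best_bid, best_ask, best_ask - best_bid


def is_crossed(book: Dict[str, Dict[float, float]]) -> bool:
    """True when the best bid is at or above the best ask."""
    top = top_of_book(book)
    return top is not None and top[0] >= top[1]


# Hypothetical book with two levels per side
book = {
    "bids": {100.0: 5.0, 99.5: 12.0},
    "asks": {100.5: 3.0, 101.0: 7.0},
}
top = top_of_book(book)      # (100.0, 100.5, 0.5)
crossed = is_crossed(book)   # False
```

If is_crossed returns True after a recovery, discarding the reconstruction and waiting for the next live snapshot is generally safer than trading on it.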

6. Integration: Putting It All Together

The complete reconnection lifecycle — connection recovery, gap detection, data reconciliation — ties together in a single orchestrator class:

class MarketDataPipeline:
    """
    Production-grade market data pipeline with automatic reconnection
    and gap-filling reconciliation.

    Workflow:
      1. Connect to WebSocket for live data.
      2. On disconnect, reconnect with backoff.
      3. Detect the gap boundaries and fetch missing data from REST API.
      4. Merge reconciled data into the local buffer.
      5. Resume WebSocket ingestion.
    """

    def __init__(self, api_key: str, symbol: str, channel: str):
        self.api_key = api_key
        self.symbol = symbol
        self.channel = channel
        self.ws_client = ReconnectingWebSocketClient(
            api_key=api_key,
            on_message=self._handle_message,
            on_disconnect=self._handle_disconnect
        )
        # One detector, shared with the reconciler, so the timestamps recorded
        # in _handle_message are the ones used to compute the gap bounds.
        self.gap_detector = GapDetector(self.ws_client)
        self.reconciler = DataReconciler(api_key, self.gap_detector)
        self._recovery_task = None

    def start(self):
        asyncio.run(self._run())

    async def _run(self):
        await self.ws_client.connect()
        # connect() returns once its background loops are scheduled, so wait
        # here to keep the event loop (and those loops) alive.
        await asyncio.Event().wait()

    def _handle_message(self, message: dict):
        """Process incoming WebSocket message."""
        self.gap_detector.record_message(message)
        self.reconciler.local_buffer.append(message)

        # Emit to downstream strategy; this is your live data path
        self._dispatch_to_strategy(message)

    def _handle_disconnect(self):
        """Triggered when WebSocket connection is lost."""
        print("[Pipeline] Disconnect detected. Initiating recovery.")
        # This runs inside the event loop, where asyncio.run() would raise,
        # so recovery is scheduled as a task instead.
        self._recovery_task = asyncio.create_task(self._recover())

    async def _recover(self):
        """Reconnect first, then fill the gap from the REST API."""
        try:
            # The ReconnectingWebSocketClient handles backoff internally
            await self.ws_client.connect()

            # Reconcile before yielding to the event loop: no live message is
            # processed in between, so last_processed_ts still marks the start
            # of the gap. The REST calls block the loop while they run.
            reconciled = self.reconciler.reconcile(self.symbol, self.channel)
            print(f"[Pipeline] Reconciled {len(reconciled)} total records.")

        except Exception as exc:
            print(f"[Pipeline] Reconciliation failed: {exc}")
            raise

    def _dispatch_to_strategy(self, message: dict):
        """Route reconciled data to the strategy layer."""
        # Implementation depends on your downstream architecture
        pass

⚠️ Production warning: This architecture handles single-connection reconnection. If you run multiple parallel streams (e.g., for 50 symbols simultaneously), each stream must maintain its own last_timestamp state independently. A shared global timestamp will cause silent data loss during concurrent disconnects.
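
To illustrate why shared state is dangerous, the sketch below keeps one timestamp per (symbol, channel) stream and compares the resulting gap start with what a single global timestamp would produce. The class name and the symbols are illustrative assumptions.

```python
from typing import Dict, Tuple


class StreamWatermarks:
    """Tracks the last processed timestamp separately for each (symbol, channel) stream."""

    def __init__(self) -> None:
        self._last_ts: Dict[Tuple[str, str], int] = {}

    def record(self, symbol: str, channel: str, ts: int) -> None:
        """Advance the stream's watermark; it never moves backwards."""
        key = (symbol, channel)
        self._last_ts[key] = max(self._last_ts.get(key, 0), ts)

    def gap_start(self, symbol: str, channel: str) -> int:
        """First microsecond not yet processed for this stream."""
        return self._last_ts.get((symbol, channel), 0) + 1


marks = StreamWatermarks()
marks.record("NVDA", "kline", 1_000)   # NVDA stalled early
marks.record("AAPL", "kline", 5_000)   # AAPL kept flowing

nvda_start = marks.gap_start("NVDA", "kline")   # 1_001
shared_start = max(1_000, 5_000) + 1            # 5_001 with one global timestamp
skipped = shared_start - nvda_start             # NVDA data that would never be requested
```

With a global timestamp, the NVDA recovery would start 4,000 microseconds too late, and nothing in the pipeline would report the missing range.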


7. Deployment Recommendations by Scale

| User type | Recommended approach | Key consideration |
|---|---|---|
| Individual quant | Single-threaded asyncio pipeline with one MarketDataPipeline per symbol | Start here; add complexity only when you hit scale |
| Quant team (3–10 streams) | One process per asset class, shared GapDetector state in a Redis instance | Redis TTL must exceed your maximum expected disconnect duration |
| Institutional (50+ streams) | Kafka or Pulsar as the ingestion buffer; REST reconciliation writes to Kafka; strategy layer consumes from Kafka | Kafka consumer group offset management replaces manual timestamp tracking |

For teams running more than 20 concurrent streams, consider offloading the reconciliation state (last timestamp per symbol) to a durable store (Redis, PostgreSQL) rather than keeping it in-memory. This provides two benefits: resilience to process restarts and a single source of truth for coordinated multi-process rebalancing.
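
The durable-store idea can be prototyped without any infrastructure. The sketch below uses SQLite from the Python standard library purely as a stand-in for Redis or PostgreSQL; the table name, schema, and class name are assumptions made for illustration.

```python
import sqlite3


class WatermarkStore:
    """Persists the last processed timestamp per symbol so it survives restarts."""

    def __init__(self, path: str = ":memory:") -> None:
        # Pass a file path instead of ":memory:" to persist across processes
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS watermarks ("
            "symbol TEXT PRIMARY KEY, last_ts INTEGER NOT NULL)"
        )
        self._db.commit()

    def save(self, symbol: str, ts: int) -> None:
        """Store ts for the symbol, never moving the watermark backwards."""
        self._db.execute(
            "INSERT OR IGNORE INTO watermarks (symbol, last_ts) VALUES (?, 0)",
            (symbol,),
        )
        self._db.execute(
            "UPDATE watermarks SET last_ts = MAX(last_ts, ?) WHERE symbol = ?",
            (ts, symbol),
        )
        self._db.commit()

    def load(self, symbol: str) -> int:
        """Return the stored watermark, or 0 if the symbol has never been seen."""
        row = self._db.execute(
            "SELECT last_ts FROM watermarks WHERE symbol = ?", (symbol,)
        ).fetchone()
        return row[0] if row else 0


store = WatermarkStore()
store.save("NVDA", 1_704_067_200_000_000)
store.save("NVDA", 1_704_067_100_000_000)   # older value: ignored
restored = store.load("NVDA")
```

Committing on every message would be too slow for a high-frequency stream; batching the writes, for example once per second, trades a slightly larger re-fetch after a crash for far less write load.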


8. Common Pitfalls and How to Avoid Them

| Pitfall | Why it happens | Fix |
|---|---|---|
| Processing the same tick twice | REST-fetched records overlap with ticks already received over the WebSocket | Deduplicate by timestamp before merging (see _merge_into_buffer) |
| Off-by-one timestamp | Client clock skew when calculating gap boundaries | Always fetch server_time from the REST API |
| Rate limit cascade | Parallel REST calls during bulk reconciliation | Paginate sequentially; respect Retry-After header |
| Unbounded local buffer | local_buffer grows without limit during a long session | Implement a circular buffer or flush-to-disk strategy after N records |
| Heartbeat timeout mismatch | Client-side ping interval is too close to, or longer than, the server-side idle timeout | Ping every 20 seconds when server timeout is 60 seconds; keep the timeout at least twice the ping interval |
| Depth reconstruction failure | Out-of-order depth snapshots during fast markets | Sort snapshots by timestamp before applying; discard if ts < last applied ts |
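
For the buffer-growth pitfall, a bounded deque from the standard library gives circular-buffer behaviour with very little code. The sketch below also shows one way to merge backfilled records into such a buffer while keeping it sorted; the helper name and the tiny maxlen are illustrative assumptions.

```python
from collections import deque
from typing import Deque, Dict, Iterable


def merge_bounded(buffer: Deque[Dict], fetched: Iterable[Dict]) -> Deque[Dict]:
    """Merge backfilled records, re-sort by ts, and keep only the newest maxlen records."""
    by_ts = {r["ts"]: r for r in buffer}
    for record in fetched:
        by_ts.setdefault(record["ts"], record)   # keep the copy already in the buffer
    ordered = sorted(by_ts.values(), key=lambda r: r["ts"])
    return deque(ordered, maxlen=buffer.maxlen)  # a full deque drops the oldest entries


live: Deque[Dict] = deque(maxlen=4)
for ts in (1, 2, 6, 7):          # ticks 3 to 5 were lost in a disconnect
    live.append({"ts": ts})

live = merge_bounded(live, [{"ts": 2}, {"ts": 3}, {"ts": 4}, {"ts": 5}])
kept = [r["ts"] for r in live]   # [4, 5, 6, 7]
```

The gap is filled, the duplicate at ts 2 is ignored, and the oldest records fall off the front once the buffer is full. A real buffer would size maxlen from the longest lookback window the strategy needs.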

Closing

The gap between "we lost the connection" and "we recovered all the data" is not a mystery. It is a deterministic sequence of steps: reconnect with exponential backoff, fetch gap boundaries from the server's clock, pull the missing records from the REST API in paginated batches, and merge by timestamp with deduplication.

Every one of these steps has a correct implementation — and a plausible-looking incorrect one. The incorrect versions usually involve a client clock, a missing deduplication step, or an optimistic retry loop with no backoff. They work in testing. They fail in production, at 2 AM, during an earnings release.

Build it correctly the first time. The reconciliation logic is not the interesting part of your trading system — but it is the part that determines whether your interesting strategy ever gets to run on real data.


Next Steps

If you're building a single-stream pipeline and want to get started quickly, the free tier at tickdb.ai provides WebSocket access and REST historical data for development and backtesting. No credit card required.

If you're running multiple concurrent streams and need a durable reconciliation state store, reach out to enterprise@tickdb.ai for architectural guidance on multi-stream gap management and Redis-backed timestamp state.

If you're building an AI-assisted trading system, search for and install the tickdb-market-data SKILL in your AI coding assistant's marketplace — it provides pre-built reconnection and reconciliation templates that conform to the patterns described in this article.


This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.