"Price is the effect. The order book is the cause."
Imagine this: your system is ingesting real-time NVIDIA options flow through a WebSocket stream. At 14:47:23 UTC, a network partition severs the connection. You reconnect at 14:52:11. Nearly five minutes of data, gone. The bid-ask spread has since mean-reverted. The volume profile has shifted. And your downstream strategy, running on a 30-second rolling window of order book pressure ratios, has been making decisions on a phantom baseline.
This is not a hypothetical. It is one of the most common failure modes in real-time market data pipelines, and most teams hit it sooner or later, usually on a Friday afternoon during a high-volatility event.
The good news: it is solvable. TickDB provides both the WebSocket stream for live data and the REST API for historical retrieval, and the two are designed to work together in exactly this scenario. This article walks through the complete reconnection and data-recovery architecture — from detecting the disconnect to reconstructing a locally consistent data state.
1. Understanding the Disconnect Problem in Real-Time Data Systems
A WebSocket connection is stateful and persistent. It is also fragile. TCP keepalives will tell you whether the underlying socket is alive, but they do not tell you whether the application on the other end is still emitting data. Market data providers may silently drop a stale connection. A load balancer may recycle a connection without notification. A brief network glitch may not trigger an error — it may simply stop delivering messages.
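Because a stalled stream raises no error, the consumer has to notice the silence itself. Here is a minimal sketch. StreamWatchdog, watch, and the 5-second threshold are all illustrative choices, not a TickDB API. It records the arrival time of every message on a monotonic clock and treats the stream as stale once nothing has arrived for longer than the threshold. The right threshold depends on how often the feed normally publishes, so a quiet symbol needs a longer one.

```python
import asyncio
import time
from typing import Callable


class StreamWatchdog:
    """Flags a stream as stale when no message has arrived for max_silence seconds."""

    def __init__(self, max_silence: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_silence = max_silence
        self._clock = clock        # monotonic: unaffected by wall-clock adjustments
        self._last_seen = clock()

    def touch(self) -> None:
        """Call on every received message, including heartbeat replies."""
        self._last_seen = self._clock()

    def silence(self) -> float:
        """Seconds elapsed since the last message."""
        return self._clock() - self._last_seen

    def is_stale(self) -> bool:
        return self.silence() > self.max_silence


async def watch(watchdog: StreamWatchdog, on_stale: Callable[[], None],
                poll_interval: float = 1.0) -> None:
    """Poll the watchdog and fire on_stale once, as soon as the stream goes silent."""
    while True:
        await asyncio.sleep(poll_interval)
        if watchdog.is_stale():
            on_stale()
            return
```

A natural on_stale action is to close the socket, so the regular disconnect-and-reconnect path takes over.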
The consequences depend on what you are consuming:
| Data type | Impact of 5-min gap | Recoverable? |
|---|---|---|
| Price quotes (OHLCV ticks) | Strategy executes on stale prices | Yes — via REST /kline |
| Order book depth | Pressure ratio diverges from reality | Partial — reconstruct from depth snapshots |
| Trade flow | Volume-weighted signals distorted | Yes — via REST /trades (where supported) |
| Options Greeks | Delta/gamma hedges drift | Yes — via REST /kline |
The key insight is that a WebSocket stream and a REST API serve complementary roles. The WebSocket is your live ingestion path — low latency, high frequency. The REST API is your reconciliation path — reliable, timestamped, batch-retrievable. A production-grade system uses both, in sequence, every time a connection is re-established.
2. The Three-Phase Reconnection Architecture
A complete reconnection and recovery cycle has three phases:
Phase 1: Connection Recovery
Before you recover data, you need a stable connection. This is not simply calling connect() again. A naive reconnect loop — retry immediately, retry again — creates a thundering herd that can take down a provider's endpoint during an outage.
A production-grade reconnect strategy uses exponential backoff with jitter:
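import asyncio
import json
import random

import websockets


class ReconnectingWebSocketClient:
    def __init__(self, api_key: str, on_message, on_disconnect):
        self.api_key = api_key
        self.on_message = on_message        # called with each decoded message (dict)
        self.on_disconnect = on_disconnect  # called once each time the stream drops
        self.ws = None
        self._base_delay = 1.0  # seconds
        self._max_delay = 60.0  # seconds
        self._retry_count = 0
        self._tasks = set()     # keep references so background tasks are not garbage-collected

    async def connect(self):
        """Establish the WebSocket connection, retrying with backoff until it succeeds."""
        uri = f"wss://api.tickdb.ai/v1/ws?api_key={self.api_key}"
        while True:
            try:
                ws = await websockets.connect(
                    uri,
                    ping_interval=20,  # protocol-level ping every 20 s
                    ping_timeout=10,   # declare the link dead if no pong arrives within 10 s
                    close_timeout=5,
                )
                break
            except (OSError, asyncio.TimeoutError,
                    websockets.exceptions.WebSocketException) as exc:
                print(f"[Reconnect] Connect failed: {exc!r}")
                await self._schedule_reconnect()  # loop, not recursion: no unbounded stack
        self.ws = ws
        self._retry_count = 0  # reset backoff only after a successful handshake
        # Pass the connection explicitly so loops from an old socket never touch a new one
        for coro in (self._receive_loop(ws), self._heartbeat_loop(ws)):
            task = asyncio.create_task(coro)
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def _schedule_reconnect(self):
        """Exponential backoff with jitter: prevents a thundering herd."""
        delay = min(self._base_delay * (2 ** self._retry_count), self._max_delay)
        jitter = random.uniform(0, delay * 0.1)  # add up to 10% random jitter
        wait_time = delay + jitter
        self._retry_count += 1
        print(f"[Reconnect] Attempt {self._retry_count}: waiting {wait_time:.2f}s")
        await asyncio.sleep(wait_time)

    async def _receive_loop(self, ws):
        """Decode each frame, hand it to on_message, and report the drop once."""
        try:
            async for raw in ws:
                self.on_message(json.loads(raw))
        except websockets.exceptions.ConnectionClosed:
            pass  # iteration raises on an abnormal close and ends quietly on a normal one
        self.on_disconnect()

    async def _heartbeat_loop(self, ws):
        """Application-level ping every 20 seconds, on top of the protocol-level pings."""
        while True:
            await asyncio.sleep(20)
            try:
                await ws.send('{"cmd": "ping"}')
            except websockets.exceptions.ConnectionClosed:
                return  # _receive_loop reports the disconnect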
Engineering note: The ping_interval=20 parameter is critical. TickDB's server-side idle timeout is 60 seconds. Pinging every 20 seconds puts three pings inside each idle window, and with ping_timeout=10 a dead connection is detected within roughly 30 seconds (one interval plus the timeout), well before the server would close it.
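The backoff formula in _schedule_reconnect is easiest to judge by its output. The sketch below factors it into a standalone function that uses the client's defaults: a 1 s base, a 60 s cap, and up to 10% added jitter. backoff_delay is a hypothetical helper, not part of the client above.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter_fraction: float = 0.1, rng=random) -> float:
    """Delay before retry `attempt` (0-based): capped exponential plus up to jitter_fraction extra."""
    delay = min(base * (2 ** attempt), cap)
    return delay + rng.uniform(0, delay * jitter_fraction)


# Deterministic schedule with jitter switched off
schedule = [backoff_delay(n, jitter_fraction=0) for n in range(8)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]

# With jitter, each delay lands between delay and 1.1 * delay
rng = random.Random(7)
print([round(backoff_delay(n, rng=rng), 2) for n in range(8)])
```

The cap does real work. Without it, attempt index 10 would wait 1,024 seconds, over 17 minutes. Because this jitter only adds up to 10%, reconnecting clients stay fairly synchronized. Drawing the whole delay uniformly between zero and the capped value (often called "full jitter") spreads them out further, at the cost of less predictable waits.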
Phase 2: Gap Detection — What Did We Miss?
Once the connection is re-established, the first task is to determine the exact boundaries of the data gap. You need two timestamps:
- last_timestamp: The timestamp of the last message you successfully processed before the disconnect.
- server_time: The current server timestamp, which you receive as the first message after reconnecting (or via a REST call).
The gap is then:
gap_start = last_timestamp + 1 # microsecond after the last received tick
gap_end = server_time # current server time
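As a worked example, take the outage from the introduction. The last tick was processed at 14:47:23 UTC, and the server reports 14:52:11 UTC after reconnecting. The calendar date below (2 January 2024) is hypothetical; only the arithmetic matters. Converting through integer timedelta division avoids the float rounding that datetime.timestamp() can introduce at microsecond precision.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_US = timedelta(microseconds=1)


def to_unix_us(dt: datetime) -> int:
    """Exact integer microseconds since the Unix epoch (no float rounding)."""
    return (dt - EPOCH) // ONE_US


last_timestamp = to_unix_us(datetime(2024, 1, 2, 14, 47, 23, tzinfo=timezone.utc))
server_time = to_unix_us(datetime(2024, 1, 2, 14, 52, 11, tzinfo=timezone.utc))

gap_start = last_timestamp + 1   # first microsecond not yet seen
gap_end = server_time            # inclusive upper bound
gap_seconds = (gap_end - gap_start + 1) / 1_000_000

print(last_timestamp, gap_start, gap_end, gap_seconds)
# 1704206843000000 1704206843000001 1704207131000000 288.0
```

The "five minutes" in the introduction is therefore 4 minutes 48 seconds: 288,000,000 microseconds of stream time to backfill.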
Critical: Always use server timestamps, not local clock timestamps, for gap boundaries. Client clocks drift, especially in containerized environments. A 500ms clock skew can cause you to miss or duplicate the boundary tick.
import os

import requests


class GapDetector:
    def __init__(self, client: "ReconnectingWebSocketClient"):
        self.client = client
        self.last_processed_ts: int = 0  # Unix microseconds

    def record_message(self, message: dict):
        """Call this on every successfully processed message."""
        if "ts" in message:
            self.last_processed_ts = max(self.last_processed_ts, message["ts"])

    def get_gap_bounds(self) -> tuple[int, int]:
        """
        Returns (gap_start, gap_end) in Unix microseconds.
        Both bounds are inclusive: gap_start is the first microsecond after
        the last processed message; gap_end is the server's current time.
        """
        gap_start = self.last_processed_ts + 1
        gap_end = self._fetch_server_time()
        return gap_start, gap_end

    def _fetch_server_time(self) -> int:
        """Fetch server time via REST API, not the local clock."""
        # Fail loudly if the key is missing instead of sending an unauthenticated request
        headers = {"X-API-Key": os.environ["TICKDB_API_KEY"]}
        response = requests.get(
            "https://api.tickdb.ai/v1/server/time",
            headers=headers,
            timeout=(3.05, 10),  # (connect, read) timeouts in seconds
        )
        response.raise_for_status()
        return response.json()["data"]["server_time"]
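Reading server time over HTTP has its own latency. By the time the response arrives, the value is already part of a round trip old. When the pipeline needs server time between REST calls, one standard remedy is Cristian's algorithm. You record the local clock before and after the request, assume the server read its clock at the midpoint, and keep the difference as an offset. In this minimal sketch, fetch_server_time_us stands for any callable that returns server time in Unix microseconds, for example a thin wrapper around _fetch_server_time above.

```python
import time
from typing import Callable, Optional, Tuple


def local_time_us() -> int:
    """Local wall-clock time in Unix microseconds."""
    return time.time_ns() // 1_000


def estimate_clock_offset_us(fetch_server_time_us: Callable[[], int],
                             samples: int = 3) -> Tuple[int, int]:
    """Return (offset_us, rtt_us) from the fastest of several round trips.

    Add offset_us to the local clock to approximate server time. Because the
    server read its clock somewhere inside the round trip, the error is
    bounded by roughly rtt_us / 2.
    """
    if samples < 1:
        raise ValueError("samples must be at least 1")
    best: Optional[Tuple[int, int]] = None
    for _ in range(samples):
        t0 = local_time_us()
        server_us = fetch_server_time_us()
        t1 = local_time_us()
        rtt = t1 - t0
        offset = server_us - (t0 + rtt // 2)  # assume the server read its clock mid-flight
        if best is None or rtt < best[1]:
            best = (offset, rtt)  # the shortest round trip bounds the error most tightly
    return best


def server_now_us(offset_us: int) -> int:
    """Approximate current server time without another network call."""
    return local_time_us() + offset_us
```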
Phase 3: Data Reconciliation — Filling the Gap
With the gap boundaries established, you retrieve the missing data from the REST API and merge it into your local state. This is the most nuanced phase, because you must handle several edge cases:
- Boundary overlap: The first REST response may contain the last tick you already processed. Deduplicate.
- Partial coverage: Some REST endpoints have a minimum granularity (e.g., 1-minute klines vs. second-level ticks). Reconcile at the coarsest granularity that both sources provide.
- Timestamp alignment: Market data from different endpoints may use different timestamp conventions. Normalize everything to Unix microseconds before merging.
import os
import time
from typing import Any, Dict, List

import requests


class DataReconciler:
    def __init__(self, api_key: str, gap_detector: GapDetector):
        self.api_key = api_key
        self.gap_detector = gap_detector
        self.local_buffer: List[Dict[str, Any]] = []

    def reconcile(self, symbol: str, channel: str) -> List[Dict[str, Any]]:
        """
        Main entry point: detect gap, fetch missing data, merge into local buffer.
        Returns the reconciled data list.
        """
        gap_start, gap_end = self.gap_detector.get_gap_bounds()
        if gap_end - gap_start < 1_000:  # Less than 1 ms: no meaningful gap
            print("[Reconcile] Gap too small, skipping fetch.")
            return self.local_buffer
        print(f"[Reconcile] Fetching gap: {gap_start} → {gap_end}")
        # _fetch_depth_snapshots and _fetch_trades follow the same request
        # pattern as _fetch_klines and are not shown here.
        if channel == "kline":
            fetched = self._fetch_klines(symbol, gap_start, gap_end)
        elif channel == "depth":
            fetched = self._fetch_depth_snapshots(symbol, gap_start, gap_end)
        elif channel == "trades":
            fetched = self._fetch_trades(symbol, gap_start, gap_end)
        else:
            raise ValueError(f"Unsupported channel: {channel}")
        return self._merge_into_buffer(fetched)

    def _fetch_klines(self, symbol: str, gap_start: int, gap_end: int,
                      max_retries: int = 5) -> List[Dict]:
        """Fetch 1-minute klines covering the gap via REST API."""
        headers = {"X-API-Key": self.api_key}
        params = {
            "symbol": symbol,
            "interval": "1m",
            "start": gap_start,
            "end": gap_end,
            "limit": 1000,
        }
        for _ in range(max_retries):
            response = requests.get(
                "https://api.tickdb.ai/v1/market/kline",
                headers=headers,
                params=params,
                timeout=(3.05, 30),
            )
            # Handle rate limiting before raise_for_status(): code 3001 in the body,
            # or HTTP 429 if the API also signals the limit at the HTTP level (assumption)
            rate_limited = response.status_code == 429
            data: Dict[str, Any] = {}
            if not rate_limited:
                response.raise_for_status()
                data = response.json()
                rate_limited = data.get("code") == 3001
            if rate_limited:
                retry_after = int(response.headers.get("Retry-After", 5))
                print(f"[RateLimit] Waiting {retry_after}s before retry.")
                time.sleep(retry_after)
                continue  # bounded retry loop instead of unbounded recursion
            return data.get("data", [])
        raise RuntimeError(f"Still rate-limited after {max_retries} attempts")

    def _merge_into_buffer(self, fetched: List[Dict]) -> List[Dict]:
        """
        Merge fetched records into the local buffer using timestamp-based deduplication.
        Both fetched and local records must have a 'ts' field in Unix microseconds.
        """
        seen = {r["ts"] for r in self.local_buffer}  # build the lookup set once, not per record
        for record in fetched:
            if record["ts"] not in seen:
                self.local_buffer.append(record)
                seen.add(record["ts"])  # also drops duplicates within the fetched batch
        # Sort by timestamp ascending: essential for time-series downstream
        self.local_buffer.sort(key=lambda r: r["ts"])
        return self.local_buffer
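The local buffer and each REST page are both already in timestamp order. That means the merge can also be done in one linear pass with heapq.merge, instead of appending and re-sorting the whole buffer on every reconciliation. Below is a minimal sketch. The key parameter defaults to ts to match _merge_into_buffer. A trades feed in which several prints can share one microsecond would need a finer key, such as a trade ID (a hypothetical field here).

```python
import heapq
from typing import Any, Callable, Dict, Hashable, Iterable, List

Record = Dict[str, Any]


def merge_sorted_dedup(live: Iterable[Record], fetched: Iterable[Record],
                       key: Callable[[Record], Hashable] = lambda r: r["ts"]) -> List[Record]:
    """Merge two ts-sorted streams in one pass, keeping the first record seen per key."""
    merged: List[Record] = []
    seen = set()
    # heapq.merge is stable: on equal ts, records from `live` (listed first) come out first
    for record in heapq.merge(live, fetched, key=lambda r: r["ts"]):
        k = key(record)
        if k in seen:
            continue  # duplicate of a record already emitted
        seen.add(k)
        merged.append(record)
    return merged
```

Listing the live records first means that, on a tie, the copy the strategy already processed is the one kept.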
3. Handling Rate Limits During Bulk Fetch
A 5-minute gap in a high-frequency stream can represent thousands of messages. A single REST call returns at most limit records (1,000 in these examples), so the fetch has to paginate. Firing those page requests back to back will likely hit TickDB's rate limit (error code 3001). Your fetch logic must therefore implement pagination with backoff.
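import time
from typing import Any, Dict, List

import requests


def _fetch_klines_paginated(self, symbol: str, gap_start: int, gap_end: int,
                            max_rate_limit_waits: int = 10) -> List[Dict]:
    """Paginate through a large time range, handling rate limits gracefully.

    Written as a DataReconciler method (hence `self`).
    """
    all_records: List[Dict] = []
    current_start = gap_start
    headers = {"X-API-Key": self.api_key}
    waits = 0
    while current_start < gap_end:
        params = {
            "symbol": symbol,
            "interval": "1m",
            "start": current_start,
            "end": gap_end,
            "limit": 1000,
        }
        response = requests.get(
            "https://api.tickdb.ai/v1/market/kline",
            headers=headers,
            params=params,
            timeout=(3.05, 30),
        )
        # Rate limit: code 3001 in the body, or HTTP 429 if also used (assumption)
        rate_limited = response.status_code == 429
        data: Dict[str, Any] = {}
        if not rate_limited:
            response.raise_for_status()
            data = response.json()
            rate_limited = data.get("code") == 3001
        if rate_limited:
            waits += 1
            if waits > max_rate_limit_waits:
                raise RuntimeError("Rate limit persisted; aborting pagination")
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"[RateLimit] Pausing {retry_after}s.")
            time.sleep(retry_after)
            continue  # retry the same page
        records = data.get("data", [])
        if not records:
            break
        all_records.extend(records)
        next_start = records[-1]["ts"] + 1  # Move to next microsecond
        if next_start <= current_start:
            break  # no forward progress: avoid an infinite loop
        current_start = next_start
        # Respect rate limits between pagination calls
        time.sleep(0.1)
    return all_records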
Warning: Do not use asyncio.gather to parallelize paginated fetches for the same symbol. The REST API enforces per-API-key rate limits. Parallel requests will trigger 3001 errors and slow down the entire reconciliation process.
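Both rate-limit handlers above parse Retry-After with int(). HTTP (RFC 9110) allows that header to carry either a number of seconds or an HTTP-date. int() raises ValueError on the date form, which would abort the very reconciliation the retry was meant to protect. Below is a more defensive parser, sketched with the standard library only. The 5-second fallback mirrors the examples above, and whether TickDB ever sends the date form is an open assumption.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(value: Optional[str], default: float = 5.0,
                        now: Optional[datetime] = None) -> float:
    """Seconds to wait, from a Retry-After header in delta-seconds or HTTP-date form."""
    if not value:
        return default
    value = value.strip()
    if value.isdecimal():
        return float(value)  # delta-seconds form, e.g. "5"
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default  # unparseable: fall back instead of crashing the reconciler
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)  # HTTP-dates are expressed in GMT
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())  # a date in the past means retry now
```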
4. Timestamp Alignment Across Data Sources
One of the subtlest sources of bugs in multi-source data pipelines is timestamp misalignment. When you combine data from a WebSocket stream and a REST API, the two sources may use different conventions:
| Source | Timestamp convention | Example |
|---|---|---|
| TickDB WebSocket | Unix microseconds (ts) | 1704067200000000 |
| TickDB REST /kline | Unix microseconds (ts) | 1704067200000000 |
| Third-party WebSocket | Unix milliseconds (ts) | 1704067200000 |
| Third-party REST | Unix milliseconds (ts) | 1704067200000 |
Normalize everything to a single unit — Unix microseconds — at ingestion time. A single utility function at the entry point of your data pipeline prevents an entire class of off-by-one-thousand bugs:
from typing import Any


def normalize_timestamp(ts: Any, unit: str = "ms") -> int:
    """
    Normalize a timestamp to Unix microseconds.
    Args:
        ts: Timestamp value (int, float, or integer string).
        unit: Source unit: 'ms' (milliseconds) or 'us' (microseconds).
    Returns:
        Unix timestamp in microseconds.
    """
    ts = int(ts)
    if unit == "ms":
        return ts * 1000
    elif unit == "us":
        return ts
    else:
        raise ValueError(f"Unknown timestamp unit: {unit}")
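When a source's documentation is silent about its unit, a magnitude check at ingestion can catch a wrong declaration before it corrupts a merge. This is a heuristic, not a TickDB feature. For dates between September 2001 and November 2286, Unix seconds have 10 digits, milliseconds 13, microseconds 16, and nanoseconds 19. It misfires for dates outside that window, so it works best as an assertion next to normalize_timestamp rather than as a silent converter. infer_unit and checked_to_unix_us are hypothetical helpers.

```python
def infer_unit(ts: int) -> str:
    """Guess the unit of a present-day Unix timestamp from its digit count."""
    digits = len(str(abs(int(ts))))
    units = {10: "s", 13: "ms", 16: "us", 19: "ns"}
    if digits not in units:
        raise ValueError(f"Cannot infer the unit of {ts!r} ({digits} digits)")
    return units[digits]


# (multiplier, divisor) that converts each unit to microseconds
_TO_US = {"s": (1_000_000, 1), "ms": (1_000, 1), "us": (1, 1), "ns": (1, 1_000)}


def checked_to_unix_us(ts: int, declared_unit: str) -> int:
    """Convert to Unix microseconds, failing loudly if the magnitude contradicts declared_unit."""
    unit = infer_unit(ts)
    if unit != declared_unit:
        raise ValueError(f"{ts} looks like '{unit}' but was declared '{declared_unit}'")
    multiplier, divisor = _TO_US[unit]
    return int(ts) * multiplier // divisor
```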
5. Order Book Depth Reconstruction
Order book data presents a unique challenge during gap recovery. Unlike OHLCV ticks, which are point-in-time snapshots, the order book is a stateful structure. A gap of five minutes means the entire book may have shifted — new price levels have appeared, old levels have been consumed, and the accumulated size at each level has changed.
TickDB's depth channel delivers snapshots, not individual order events: each message represents the current state of the order book. This means reconstruction is simpler than it appears:
- On reconnect, fetch the most recent depth snapshot at or before gap_start.
- Re-apply all subsequent depth snapshots from gap_start to gap_end.
- The final state represents the order book at gap_end.
import os
from typing import Dict, Optional

import requests


def reconstruct_depth_book(symbol: str, gap_start: int, gap_end: int,
                           base_book: Optional[Dict] = None) -> Dict:
    """
    Reconstruct the order book state at gap_end.
    base_book is the book held locally at the disconnect (the state at or before
    gap_start); it is returned unchanged if no snapshot falls inside the gap.
    Returns {"bids": {price: size}, "asks": {price: size}} for all available levels.
    """
    headers = {"X-API-Key": os.environ["TICKDB_API_KEY"]}
    params = {
        "symbol": symbol,
        "channel": "depth",
        "start": gap_start,
        "end": gap_end,
        "limit": 500,  # a long gap may exceed one page: paginate as in Section 3
    }
    response = requests.get(
        "https://api.tickdb.ai/v1/market/depth",
        headers=headers,
        params=params,
        timeout=(3.05, 30),
    )
    response.raise_for_status()
    snapshots = response.json().get("data", [])
    book = base_book if base_book is not None else {"bids": {}, "asks": {}}
    # Apply snapshots in timestamp order (fast markets can deliver them out of order).
    # Each snapshot is the full book state, so it replaces the local book
    # rather than being merged into it.
    for snapshot in sorted(snapshots, key=lambda s: s["ts"]):
        book = {"bids": {}, "asks": {}}
        for side in ("bids", "asks"):
            for level in snapshot.get(side, []):
                price, size = float(level["p"]), float(level["v"])
                if size > 0:  # zero-size levels carry no liquidity
                    book[side][price] = size
    return book
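Once the book is rebuilt, the live stream has to keep it current. This minimal sketch follows this section's description of each depth message as the current state of the book. It adds the out-of-order guard recommended in the pitfalls table: a snapshot older than the last one applied is ignored. The payload shape ("ts", "bids", "asks", with "p"/"v" levels) is taken from the code above. The pressure_ratio definition (bid share of resting size over the top N levels) is an illustrative assumption, because the article does not define its ratio.

```python
from typing import Any, Dict, Optional, Tuple


class DepthState:
    """Latest full depth snapshot, ignoring snapshots that arrive out of order."""

    def __init__(self) -> None:
        self.book: Dict[str, Dict[float, float]] = {"bids": {}, "asks": {}}
        self.last_ts = -1

    def apply(self, snapshot: Dict[str, Any]) -> bool:
        """Replace the book with `snapshot`; return False if it is older than the current one."""
        if snapshot["ts"] < self.last_ts:
            return False  # stale snapshot delivered late: keep the newer book
        self.book = {
            side: {float(lvl["p"]): float(lvl["v"])
                   for lvl in snapshot.get(side, []) if float(lvl["v"]) > 0}
            for side in ("bids", "asks")
        }
        self.last_ts = snapshot["ts"]
        return True

    def best_bid_ask(self) -> Tuple[Optional[float], Optional[float]]:
        bids, asks = self.book["bids"], self.book["asks"]
        return (max(bids) if bids else None, min(asks) if asks else None)

    def pressure_ratio(self, levels: int = 5) -> Optional[float]:
        """Bid share of resting size over the top `levels` on each side, in [0, 1]."""
        top_bids = sorted(self.book["bids"].items(), reverse=True)[:levels]
        top_asks = sorted(self.book["asks"].items())[:levels]
        bid_size = sum(size for _, size in top_bids)
        ask_size = sum(size for _, size in top_asks)
        total = bid_size + ask_size
        return bid_size / total if total else None
```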
6. Integration: Putting It All Together
The complete reconnection lifecycle — connection recovery, gap detection, data reconciliation — ties together in a single orchestrator class:
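import asyncio


class MarketDataPipeline:
    """
    Production-grade market data pipeline with automatic reconnection
    and gap-filling reconciliation.
    Workflow:
    1. Connect to WebSocket for live data.
    2. On disconnect, reconnect with backoff, holding new live messages aside.
    3. Detect the gap boundaries and fetch missing data from the REST API.
    4. Merge reconciled data into the local buffer.
    5. Replay the held live messages and resume normal ingestion.
    """
    def __init__(self, api_key: str, symbol: str, channel: str):
        self.api_key = api_key
        self.symbol = symbol
        self.channel = channel
        self.ws_client = ReconnectingWebSocketClient(
            api_key=api_key,
            on_message=self._handle_message,
            on_disconnect=self._handle_disconnect,
        )
        # Build the detector once and share it: the reconciler must read the same
        # last_processed_ts that _handle_message updates
        self.gap_detector = GapDetector(self.ws_client)
        self.reconciler = DataReconciler(api_key, self.gap_detector)
        self._recovering = False
        self._pending: list = []    # live messages received while the gap is being filled
        self._recovery_task = None  # reference keeps the task from being garbage-collected

    def start(self):
        asyncio.run(self._run())

    async def _run(self):
        await self.ws_client.connect()
        await asyncio.Event().wait()  # keep the loop alive; the client's tasks do the work

    def _handle_message(self, message: dict):
        """Process incoming WebSocket message."""
        if self._recovering:
            self._pending.append(message)  # hold until the REST backfill has been merged
            return
        self._ingest(message)

    def _ingest(self, message: dict):
        self.gap_detector.record_message(message)
        self.reconciler.local_buffer.append(message)
        # Emit to downstream strategy: this is your live data path
        self._dispatch_to_strategy(message)

    def _handle_disconnect(self):
        """Triggered inside the event loop when the WebSocket connection is lost."""
        print("[Pipeline] Disconnect detected: initiating recovery.")
        # asyncio.run() cannot be nested in a running loop, so schedule a task instead
        self._recovering = True
        self._recovery_task = asyncio.get_running_loop().create_task(self._recover())

    async def _recover(self):
        try:
            # Phase 1: reconnect first; the client handles backoff internally
            await self.ws_client.connect()
            # Phases 2 and 3: the REST calls block, so run them off the event loop
            reconciled = await asyncio.to_thread(
                self.reconciler.reconcile, self.symbol, self.channel
            )
            print(f"[Pipeline] Reconciled {len(reconciled)} total records.")
        except Exception as exc:
            # A raise inside a task would go unnoticed, so report the failure explicitly
            print(f"[Pipeline] Reconciliation failed: {exc}")
        finally:
            self._recovering = False
            seen = {r.get("ts") for r in self.reconciler.local_buffer}
            pending, self._pending = self._pending, []
            for message in pending:
                if message.get("ts") not in seen:  # skip ticks the backfill already covered
                    self._ingest(message)

    def _dispatch_to_strategy(self, message: dict):
        """Route reconciled data to the strategy layer."""
        # Implementation depends on your downstream architecture
        pass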
⚠️ Production warning: This architecture handles single-connection reconnection. If you run multiple parallel streams (e.g., for 50 symbols simultaneously), each stream must maintain its own last_timestamp state independently. A shared global timestamp will cause silent data loss during concurrent disconnects.
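One way to keep that state independent is to key it by stream. This minimal sketch assumes a stream is identified by its (symbol, channel) pair. PerStreamWatermarks is a hypothetical class that would stand in for GapDetector's single last_processed_ts when one process handles many streams, and the symbols in the usage are illustrative.

```python
from typing import Dict, Optional, Tuple

StreamKey = Tuple[str, str]  # (symbol, channel)


class PerStreamWatermarks:
    """Last processed ts (Unix microseconds), tracked separately for every stream."""

    def __init__(self) -> None:
        self._last_ts: Dict[StreamKey, int] = {}

    def record(self, symbol: str, channel: str, ts: int) -> None:
        key = (symbol, channel)
        # max() keeps a late or replayed message from rewinding the watermark
        self._last_ts[key] = max(self._last_ts.get(key, ts), ts)

    def gap_start(self, symbol: str, channel: str) -> Optional[int]:
        """First microsecond not yet seen, or None if the stream has no history yet."""
        last = self._last_ts.get((symbol, channel))
        return None if last is None else last + 1
```

A None result signals that there is no baseline to fill from, so that stream needs a fresh snapshot rather than a gap fill.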
7. Deployment Recommendations by Scale
| User type | Recommended approach | Key consideration |
|---|---|---|
| Individual quant | Single-threaded asyncio pipeline with one MarketDataPipeline per symbol | Start here; add complexity only when you hit scale |
| Quant team (3–10 streams) | One process per asset class, shared GapDetector state in a Redis instance | Redis TTL must exceed your maximum expected disconnect duration |
| Institutional (50+ streams) | Kafka or Pulsar as the ingestion buffer; REST reconciliation writes to Kafka; strategy layer consumes from Kafka | Kafka consumer group offset management replaces manual timestamp tracking |
For teams running more than 20 concurrent streams, consider offloading the reconciliation state (last timestamp per symbol) to a durable store (Redis, PostgreSQL) rather than keeping it in-memory. This provides two benefits: resilience to process restarts and a single source of truth for coordinated multi-process rebalancing.
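As a minimal, self-contained stand-in for Redis or PostgreSQL, the sketch below uses SQLite from the Python standard library. The upsert only ever moves a stream's watermark forward, so a late or replayed write cannot rewind it. The table and column names are illustrative. ON CONFLICT ... DO UPDATE requires SQLite 3.24 or newer. PostgreSQL accepts the same upsert form, with GREATEST in place of SQLite's two-argument MAX.

```python
import sqlite3
from typing import Optional


class SqliteWatermarkStore:
    """Durable last-processed ts (Unix microseconds) per (symbol, channel)."""

    def __init__(self, path: str) -> None:
        self.conn = sqlite3.connect(path)
        with self.conn:
            self.conn.execute(
                """CREATE TABLE IF NOT EXISTS watermarks (
                       symbol  TEXT    NOT NULL,
                       channel TEXT    NOT NULL,
                       ts      INTEGER NOT NULL,
                       PRIMARY KEY (symbol, channel))"""
            )

    def advance(self, symbol: str, channel: str, ts: int) -> None:
        """Move the watermark forward; a smaller ts never overwrites a larger one."""
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                """INSERT INTO watermarks (symbol, channel, ts) VALUES (?, ?, ?)
                   ON CONFLICT(symbol, channel) DO UPDATE SET ts = MAX(ts, excluded.ts)""",
                (symbol, channel, ts),
            )

    def get(self, symbol: str, channel: str) -> Optional[int]:
        row = self.conn.execute(
            "SELECT ts FROM watermarks WHERE symbol = ? AND channel = ?",
            (symbol, channel),
        ).fetchone()
        return row[0] if row else None

    def close(self) -> None:
        self.conn.close()
```

Writing on every tick would turn the store into a bottleneck. Advancing the watermark every second or so keeps the write rate bounded. The cost is that, after a restart, up to that second of data is re-fetched and then removed by deduplication.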
8. Common Pitfalls and How to Avoid Them
| Pitfall | Why it happens | Fix |
|---|---|---|
| Processing the same tick twice | First tick after reconnect overlaps with the first REST-fetched tick | Deduplicate by timestamp before merging (see _merge_into_buffer) |
| Off-by-one timestamp | Client clock skew when calculating gap boundaries | Always fetch server_time from the REST API |
| Rate limit cascade | Parallel REST calls during bulk reconciliation | Paginate sequentially; respect Retry-After header |
| Unbounded local buffer | local_buffer grows without limit during a long session | Implement a circular buffer or flush-to-disk strategy after N records |
| Heartbeat timeout mismatch | Client ping interval is too close to (or longer than) the server-side idle timeout | Ping every 20 seconds when the server timeout is 60 seconds; keep the timeout-to-interval ratio at 2:1 or higher |
| Depth reconstruction failure | Out-of-order depth snapshots during fast markets | Sort snapshots by timestamp before applying; discard if ts < last applied ts |
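For the unbounded-buffer pitfall, Python's collections.deque with maxlen is a ready-made circular buffer: once it is full, each append silently evicts the oldest record. The capacity of 100,000 records is an arbitrary example, so size it to the longest lookback your strategy reads. BoundedBuffer is a hypothetical wrapper. Note that deque indexing away from the ends is O(n). If the strategy needs random access into the middle of the window, a preallocated ring array may serve better.

```python
from collections import deque
from typing import Any, Deque, Dict, List

Record = Dict[str, Any]


class BoundedBuffer:
    """Keeps only the most recent `capacity` records, in arrival order."""

    def __init__(self, capacity: int = 100_000) -> None:
        self._records: Deque[Record] = deque(maxlen=capacity)

    def append(self, record: Record) -> None:
        self._records.append(record)  # evicts the oldest record when full

    def since(self, ts: int) -> List[Record]:
        """Records with ts at or after the given Unix-microsecond timestamp."""
        return [r for r in self._records if r["ts"] >= ts]

    def __len__(self) -> int:
        return len(self._records)
```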
Closing
The gap between "we lost the connection" and "we recovered all the data" is not a mystery. It is a deterministic sequence of steps: reconnect with exponential backoff, fetch gap boundaries from the server's clock, pull the missing records from the REST API in paginated batches, and merge by timestamp with deduplication.
Every one of these steps has a correct implementation — and a plausible-looking incorrect one. The incorrect versions usually involve a client clock, a missing deduplication step, or an optimistic retry loop with no backoff. They work in testing. They fail in production, at 2 AM, during an earnings release.
Build it correctly the first time. The reconciliation logic is not the interesting part of your trading system — but it is the part that determines whether your interesting strategy ever gets to run on real data.
Next Steps
If you're building a single-stream pipeline and want to get started quickly, the free tier at tickdb.ai provides WebSocket access and REST historical data for development and backtesting. No credit card required.
If you're running multiple concurrent streams and need a durable reconciliation state store, reach out to enterprise@tickdb.ai for architectural guidance on multi-stream gap management and Redis-backed timestamp state.
If you're building an AI-assisted trading system, search for and install the tickdb-market-data SKILL in your AI coding assistant's marketplace — it provides pre-built reconnection and reconciliation templates that conform to the patterns described in this article.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.