The Silent Killer in Your Trading Stack
It was 9:47 AM on a Tuesday. The market was moving, volatility spiking, and your systematic strategy should have been printing. But it wasn't. Your code was running, the WebSocket connection showed "connected" in your dashboard, and there were no error logs. What happened?
The connection didn't crash. It died silently.
This is the scenario that separates paper traders from battle-hardened quant engineers: the silent WebSocket disconnection. The TCP layer reports everything is fine. Your client library shows an open connection. Your dashboard shows green. But the server stopped sending data three minutes ago, and your strategy has been trading on stale data—or worse, not trading at all.
In the US equity market, a three-minute gap during the opening rush can mean missing an entire momentum wave. In crypto, where venues fragment across exchanges and liquidations cascade in seconds, a silent disconnect can compound into a cascade of filled orders at prices that no longer exist.
This article dissects the anatomy of silent disconnections, builds a production-grade detection framework, and delivers a complete auto-recovery implementation that has survived live trading environments.
Part 1: Why WebSockets Die Silently
1.1 The TCP Half-Open Problem
A WebSocket connection is built on top of TCP. TCP connections can reach a state called "half-open"—where one side believes the connection is alive while the other side has already closed it or crashed.
This happens when:
- The network drops packets silently without sending RST packets
- A NAT gateway times out a mapping without notification
- The server host crashes or loses power, so its OS never sends a FIN or RST
- A load balancer pulls a node out of rotation without notifying clients
In none of these cases does the client receive a disconnection packet. Your WebSocket library has no event to trigger. The connection appears open. Your code keeps sending heartbeats into the void.
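One complementary measure at the transport layer is TCP keepalive, which asks the operating system to probe an idle peer. The sketch below is a minimal illustration and not part of the client built later in this article: the helper names `keepalive_sockopts` and `apply_sockopts` and the 30/10/3 timing values are assumptions, and the `TCP_KEEP*` constants exist only on some platforms, hence the `hasattr` guards. Keepalive only proves that the remote TCP stack answers. It says nothing about whether the server application is still publishing data, so it does not replace application-level checks.

```python
import socket


def keepalive_sockopts(idle_s=30, interval_s=10, probes=3):
    """Build (level, option, value) tuples that enable TCP keepalive.

    The timing options are platform-dependent, so each one is added
    only if this Python build exposes the constant.
    """
    opts = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
    if hasattr(socket, "TCP_KEEPIDLE"):
        # Seconds of idleness before the first probe is sent
        opts.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle_s))
    if hasattr(socket, "TCP_KEEPINTVL"):
        # Seconds between unanswered probes
        opts.append((socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_s))
    if hasattr(socket, "TCP_KEEPCNT"):
        # Unanswered probes before the OS declares the connection dead
        opts.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, probes))
    return opts


def apply_sockopts(sock, opts):
    """Apply each (level, option, value) tuple to an existing socket."""
    for level, option, value in opts:
        sock.setsockopt(level, option, value)


if __name__ == "__main__":
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    apply_sockopts(s, keepalive_sockopts())
    print("keepalive enabled:", s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0)
    s.close()
```

The websocket-client library accepts option tuples of this shape through the `sockopt` argument of `run_forever`; check the documentation for your installed version before relying on it.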
1.2 Server-Side Disconnects Without Notification
Some WebSocket servers implement idle timeouts. If you don't send data for 30–60 seconds, the server closes the connection. But the server doesn't send a WebSocket close frame—it just stops reading. This creates a state where you can send but never receive.
In other cases, a server may drop connections during high load. It accepts the TCP FIN from the client but never sends its own FIN back. Your client library waits indefinitely.
1.3 The Data Gap That Compounds
When data stops flowing, most trading strategies behave in one of three ways:
| Behavior | Risk Level | Consequence |
|---|---|---|
| Continue running on stale prices | High | Strategy uses outdated entry/exit signals |
| Stop trading because no signals arrive | Medium | Opportunity cost, but no losses |
| Replay old data as new | Critical | Strategy enters positions based on history |
The third scenario is the most dangerous. If your data handler uses a timestamp comparison that doesn't validate monotonicity, it may accept old data as fresh data. Your strategy then trades on what happened 10 minutes ago as though it just happened now.
Part 2: Heartbeat Architecture Deep Dive
2.1 What Most Developers Get Wrong
The standard ping/pong mechanism built into WebSocket libraries is not designed for application-level heartbeat detection. Ping and pong are protocol-level control frames: they show that the peer's WebSocket stack is still responding and keep intermediaries from timing out an idle connection, but they don't tell your application whether the server is still sending data.
Most developers implement something like this:
# ❌ What most people do: this is not sufficient
import websocket
import time


def on_message(ws, message):
    process_message(message)


def run():
    ws = websocket.WebSocketApp(
        "wss://stream.tickdb.ai/ws",
        on_message=on_message
    )
    ws.run_forever(ping_interval=30)
This runs forever. It sends a ping every 30 seconds. But if the server stops pushing data after minute 3, on_message never fires again. No error. No exception. Your strategy is now trading in the dark.
2.2 The Correct Heartbeat Model
A robust heartbeat system operates on two dimensions:
- Outbound heartbeat (you → server): Prove you're alive and request a response.
- Inbound data freshness (server → you): Verify data is still flowing.
The outbound heartbeat tells you the connection is open at the transport layer. The inbound data freshness tells you the server is actively sending data. You need both.
Here's the correct architecture:
import logging
import threading
import time


class WebSocketHeartbeat:
    """
    Dual-channel heartbeat system:
    - Outbound ping with expected pong response
    - Inbound data timestamp validation
    """

    def __init__(self, timeout=15, data_freshness_threshold=10):
        self.timeout = timeout  # Max seconds allowed since the last pong
        self.data_freshness_threshold = data_freshness_threshold  # Max seconds since the last data message
        self.last_pong_time = None
        self.last_data_time = None
        self.last_data_server_timestamp = None
        self.is_healthy = True
        self._lock = threading.Lock()
        self._ping_count = 0

    def record_pong(self):
        """Called when we receive a pong response"""
        with self._lock:
            self.last_pong_time = time.time()

    def record_data(self, timestamp=None):
        """
        Record that we received data.
        Freshness is measured from local arrival time; the server timestamp,
        if provided, is stored alongside it for diagnostics.
        """
        with self._lock:
            self.last_data_time = time.time()
            self.last_data_server_timestamp = timestamp

    def send_ping(self, ws):
        """
        Send an application-level ping and return the ping ID (None on failure).
        The pong is recorded via record_pong(); call check_health() afterwards
        to see whether it arrived within the timeout.
        """
        with self._lock:
            self._ping_count += 1
            ping_id = f"ping_{self._ping_count}_{int(time.time())}"
        payload = f"id:{ping_id}"
        try:
            ws.send(payload)
            return ping_id
        except Exception as e:
            logging.error(f"Failed to send ping: {e}")
            return None

    def check_health(self) -> dict:
        """
        Returns health status and reasons for any unhealthy state.
        Call this periodically from a monitoring thread.
        """
        now = time.time()
        with self._lock:
            pong_age = (now - self.last_pong_time) if self.last_pong_time else float('inf')
            data_age = (now - self.last_data_time) if self.last_data_time else float('inf')
            pong_ok = pong_age < self.timeout
            data_ok = data_age < self.data_freshness_threshold
            self.is_healthy = pong_ok and data_ok
            health = {
                "pong_received_recently": pong_ok,
                "data_flowing": data_ok,
                "pong_age_seconds": round(pong_age, 2),
                "data_age_seconds": round(data_age, 2),
                "is_healthy": self.is_healthy
            }
        return health
2.3 Heartbeat Timing Parameters
The choice of timeout values depends on your market and data source:
| Market | Recommended heartbeat interval | Timeout threshold |
|---|---|---|
| US Equities (high-frequency) | 10 seconds | 20 seconds |
| Crypto (high-liquidity) | 5 seconds | 15 seconds |
| Crypto (low-liquidity) | 15 seconds | 30 seconds |
| HK Equities | 15 seconds | 30 seconds |
The key principle: your timeout must be shorter than the longest data gap your strategy can tolerate, with room left over for the health monitor's polling interval and for the reconnect itself. If you can't tolerate more than 30 seconds of stale data, your heartbeat timeout should be 15 seconds or less.
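The budget behind that rule can be worked through with simple arithmetic. This is a rough sketch under stated assumptions: health is judged by the age of the last pong (as in `check_health` above), a monitor polls every `check_interval` seconds, and `reconnect_estimate` is a hypothetical figure for how long a reconnect and resubscribe take. The function names are illustrative and do not come from any library.

```python
def worst_case_blind_seconds(timeout: float, check_interval: float,
                             reconnect_estimate: float = 0.0) -> float:
    """
    Upper bound on the time between the last good pong and fresh data
    flowing again: the pong must age past `timeout`, the monitor may need
    up to one more `check_interval` to notice, then the reconnect runs.
    """
    return timeout + check_interval + reconnect_estimate


def fits_gap_budget(max_gap: float, timeout: float, check_interval: float,
                    reconnect_estimate: float = 0.0) -> bool:
    """True if the worst case stays within the gap the strategy tolerates."""
    return worst_case_blind_seconds(timeout, check_interval, reconnect_estimate) <= max_gap


if __name__ == "__main__":
    # Assumed numbers: 30s tolerated gap, 5s monitor poll, 3s to reconnect
    for timeout in (15, 20, 30):
        total = worst_case_blind_seconds(timeout, check_interval=5, reconnect_estimate=3)
        ok = fits_gap_budget(30, timeout, check_interval=5, reconnect_estimate=3)
        print(f"timeout={timeout}s worst_case={total}s fits_30s_budget={ok}")
```

Under these assumed numbers a 30-second timeout cannot meet a 30-second gap budget, which is why the timeout has to sit well below the tolerated gap.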
Part 3: Data Timestamp Validation
3.1 Why Server Timestamps Matter
Relying on local arrival time to validate freshness is insufficient. Data may have been in flight for 500ms due to network routing, or the server may be buffering during high load. What matters is when the data was generated at the source.
When you receive a market data message, extract and validate the server-side timestamp:
import logging
import time
from typing import Optional


class DataTimestampValidator:
    """
    Validates that incoming data has a recent server-side timestamp.
    Rejects data that is older than the configured threshold.
    """

    def __init__(self, max_age_seconds=30, clock_skew_tolerance=5):
        self.max_age_seconds = max_age_seconds
        self.clock_skew_tolerance = clock_skew_tolerance  # Seconds of allowed server clock drift
        self._last_valid_timestamp = None
        self._gap_count = 0  # Track consecutive stale events

    def validate(self, server_timestamp: Optional[int]) -> tuple[bool, dict]:
        """
        Returns (is_valid, diagnostics_dict)
        server_timestamp: Unix timestamp in milliseconds from the server
        """
        if server_timestamp is None:
            return False, {"reason": "no_timestamp", "severity": "warning"}

        now_ms = int(time.time() * 1000)
        age_ms = now_ms - server_timestamp
        max_age_ms = self.max_age_seconds * 1000
        diagnostics = {
            "age_ms": age_ms,
            "max_age_ms": max_age_ms,
            "within_tolerance": age_ms <= max_age_ms
        }

        # Check for clock skew: a timestamp too far in the future is invalid
        if server_timestamp > now_ms + (self.clock_skew_tolerance * 1000):
            diagnostics["reason"] = "future_timestamp"
            diagnostics["severity"] = "error"
            return False, diagnostics

        # Check for data staleness
        if age_ms > max_age_ms:
            diagnostics["reason"] = "stale_data"
            diagnostics["severity"] = "critical"
            self._gap_count += 1
            return False, diagnostics

        # Valid data
        self._last_valid_timestamp = server_timestamp
        if self._gap_count > 0:
            logging.warning(f"Data gap resolved after {self._gap_count} stale events")
            self._gap_count = 0
        diagnostics["reason"] = "valid"
        diagnostics["severity"] = "ok"
        return True, diagnostics
3.2 Monotonicity Enforcement
In addition to freshness, you should enforce timestamp monotonicity. Data should arrive in order. A timestamp from the past arriving after newer data indicates a gap event:
class MonotonicTimestampTracker:
    """
    Ensures data timestamps arrive in increasing order.
    Detects gaps and reordering that may indicate packet loss.
    """

    def __init__(self, allowed_reorder_window_ms=500):
        self.allowed_reorder_window_ms = allowed_reorder_window_ms
        self._last_timestamp = None  # Highest timestamp seen so far

    def check(self, server_timestamp: int) -> dict:
        """
        Returns diagnostics on timestamp monotonicity.
        """
        if self._last_timestamp is None:
            self._last_timestamp = server_timestamp
            return {
                "status": "first",
                "is_monotonic": True,
                "gap_ms": 0
            }

        gap = server_timestamp - self._last_timestamp
        if gap < -self.allowed_reorder_window_ms:
            # Significant reordering detected
            diagnostics = {
                "status": "reorder",
                "is_monotonic": False,
                "gap_ms": gap,
                "warning": f"Data reordered by {-gap}ms; possible packet loss"
            }
        elif gap < 0:
            # Minor reordering within tolerance
            diagnostics = {
                "status": "minor_reorder",
                "is_monotonic": True,
                "gap_ms": gap,
                "note": "Minor reordering, within tolerance"
            }
        elif gap == 0:
            diagnostics = {
                "status": "duplicate",
                "is_monotonic": True,
                "gap_ms": 0
            }
        else:
            diagnostics = {
                "status": "ok",
                "is_monotonic": True,
                "gap_ms": gap
            }

        # Never move the high-water mark backwards: a late message must not
        # make the next in-order message look like a large forward jump.
        self._last_timestamp = max(self._last_timestamp, server_timestamp)
        return diagnostics
Part 4: Auto-Reconnection with Exponential Backoff and Jitter
4.1 The Problem with Fixed Retry Intervals
If you reconnect immediately after detecting a disconnect, and the disconnect was caused by a temporary server issue, you'll create a thundering herd. When the server comes back, thousands of clients reconnect simultaneously, overload the system, and cause another wave of disconnections.
Fixed intervals also mean you're wasting time retrying when the problem is structural (wrong endpoint, auth expired, etc.).
4.2 Exponential Backoff with Jitter
The standard reconnection strategy uses exponential backoff with jitter:
delay = min(base_delay * 2 ** (attempt - 1), max_delay) + random_jitter
import random


class ReconnectionScheduler:
    """
    Calculates reconnection delays using exponential backoff with jitter.
    Prevents thundering herd by adding randomness to retry intervals.
    """

    def __init__(
        self,
        base_delay=1.0,
        max_delay=60.0,
        multiplier=2.0,
        jitter_factor=0.3
    ):
        self.base_delay = base_delay        # Starting delay in seconds
        self.max_delay = max_delay          # Cap on the delay before jitter is applied
        self.multiplier = multiplier        # Exponential multiplier per attempt
        self.jitter_factor = jitter_factor  # Jitter as a fraction of the capped delay
        self.attempt = 0

    def get_delay(self) -> float:
        """
        Returns the next delay in seconds before attempting reconnection.
        """
        if self.attempt == 0:
            return 0.0  # First attempt: immediate

        # Calculate exponential delay
        exp_delay = self.base_delay * (self.multiplier ** (self.attempt - 1))
        # Cap at max_delay
        capped_delay = min(exp_delay, self.max_delay)
        # Add jitter (uniform random within ±jitter_factor of the capped delay)
        jitter_range = capped_delay * self.jitter_factor
        jitter = random.uniform(-jitter_range, jitter_range)
        final_delay = max(0.0, capped_delay + jitter)
        return round(final_delay, 3)

    def record_attempt(self, success: bool):
        """
        Call this after each connection attempt.
        If success=True, reset the attempt counter.
        If success=False, increment for next attempt.
        """
        if success:
            self.attempt = 0
        else:
            self.attempt += 1

    @property
    def next_attempt_number(self) -> int:
        return self.attempt + 1

    @property
    def estimated_delay(self) -> float:
        """Returns the delay for the next scheduled attempt without random jitter"""
        if self.attempt == 0:
            return 0.0
        exp_delay = self.base_delay * (self.multiplier ** (self.attempt - 1))
        return min(exp_delay, self.max_delay)
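To make the schedule concrete, the sketch below tabulates the un-jittered delay per attempt and the range that jitter can spread it over. It mirrors the defaults of `ReconnectionScheduler` (base 1s, multiplier 2, cap 60s, jitter factor 0.3), but the standalone functions `backoff_schedule` and `jitter_bounds` are illustrative helpers written for this example, not part of the class.

```python
def backoff_schedule(attempts, base_delay=1.0, multiplier=2.0, max_delay=60.0):
    """Un-jittered delay for attempt 0, 1, 2, ... (attempt 0 is immediate)."""
    delays = []
    for attempt in range(attempts):
        if attempt == 0:
            delays.append(0.0)
        else:
            delays.append(min(base_delay * multiplier ** (attempt - 1), max_delay))
    return delays


def jitter_bounds(delay, jitter_factor=0.3):
    """Lowest and highest delay that uniform +/- jitter can produce."""
    return max(0.0, delay * (1 - jitter_factor)), delay * (1 + jitter_factor)


if __name__ == "__main__":
    for attempt, delay in enumerate(backoff_schedule(9)):
        low, high = jitter_bounds(delay)
        print(f"attempt {attempt}: base {delay:5.1f}s, jittered range {low:5.1f}s to {high:5.1f}s")
```

With these defaults the delay doubles from 1s up to 32s and then stays at the 60s cap. Because jitter is applied after the cap, an individual delay can land somewhat above 60s.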
4.3 Error-Category-Aware Reconnection
Not all connection failures are equal. Auth failures shouldn't be retried with exponential backoff—they should fail fast and alert you to a configuration problem.
import logging
from enum import Enum


class ConnectionFailureType(Enum):
    TRANSIENT_NETWORK = "transient_network"  # Retry with backoff
    SERVER_OVERLOADED = "server_overloaded"  # Retry with longer backoff
    AUTH_FAILED = "auth_failed"              # Don't retry, alert immediately
    SYMBOL_NOT_FOUND = "symbol_not_found"    # Don't retry, check configuration
    RATE_LIMITED = "rate_limited"            # Respect Retry-After header
    UNKNOWN = "unknown"                      # Retry once, then alert


class AdaptiveReconnectionManager:
    """
    Reconnection manager that adapts strategy based on failure type.
    """

    def __init__(self, tickdb_api_key: str, log_callback=None):
        self.scheduler = ReconnectionScheduler()
        self.api_key = tickdb_api_key
        self.log = log_callback or logging.info
        self.consecutive_auth_failures = 0

    def classify_failure(self, error_response: dict) -> ConnectionFailureType:
        """
        Classify the error response from TickDB to determine retry strategy.
        """
        code = error_response.get("code", 0)
        if code in (1001, 1002):
            return ConnectionFailureType.AUTH_FAILED
        elif code == 2002:
            return ConnectionFailureType.SYMBOL_NOT_FOUND
        elif code == 3001:
            return ConnectionFailureType.RATE_LIMITED
        elif code == 5001:
            return ConnectionFailureType.SERVER_OVERLOADED
        elif code == 0:
            return ConnectionFailureType.TRANSIENT_NETWORK
        else:
            return ConnectionFailureType.UNKNOWN

    def handle_failure(self, error_response: dict) -> dict:
        """
        Process a connection failure and return reconnection instructions.
        A retry_delay of None means "use the scheduler's next backoff delay".
        """
        failure_type = self.classify_failure(error_response)
        instructions = {
            "failure_type": failure_type.value,
            "should_retry": True,
            "retry_delay": None,
            "alert_level": "info",
            "message": error_response.get("message", "Unknown error")
        }

        if failure_type == ConnectionFailureType.AUTH_FAILED:
            instructions["should_retry"] = False
            instructions["alert_level"] = "critical"
            instructions["message"] = (
                "API authentication failed. Check TICKDB_API_KEY environment variable. "
                "Do not retry; fix the credential first."
            )
            self.consecutive_auth_failures += 1
        elif failure_type == ConnectionFailureType.SYMBOL_NOT_FOUND:
            instructions["should_retry"] = False
            instructions["alert_level"] = "warning"
            instructions["message"] = (
                f"Symbol not found: {error_response.get('symbol')}. "
                "Verify symbol exists via /v1/symbols/available before retrying."
            )
        elif failure_type == ConnectionFailureType.RATE_LIMITED:
            # Assumes Retry-After is given as a whole number of seconds
            retry_after = int(error_response.get("headers", {}).get("Retry-After", 5))
            instructions["retry_delay"] = retry_after
            instructions["alert_level"] = "warning"
            instructions["message"] = f"Rate limited. Respect Retry-After: {retry_after}s"
        elif failure_type == ConnectionFailureType.SERVER_OVERLOADED:
            # Add extra delay for server-side overload. estimated_delay is 0 on
            # the first attempt, so fall back to base_delay to avoid an instant retry.
            base_delay = max(self.scheduler.estimated_delay, self.scheduler.base_delay)
            instructions["retry_delay"] = min(base_delay * 1.5, self.scheduler.max_delay)
            instructions["alert_level"] = "info"
            instructions["message"] = "Server overloaded. Backing off to avoid thundering herd."
        elif failure_type == ConnectionFailureType.UNKNOWN:
            instructions["should_retry"] = self.scheduler.attempt < 1
            instructions["retry_delay"] = self.scheduler.get_delay() if instructions["should_retry"] else None
            instructions["alert_level"] = "warning"

        return instructions
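The rate-limit branch reads `Retry-After` with `int()`, which works only when the value is a number of seconds. In HTTP the header may also carry an HTTP-date, so a more tolerant parser can be sketched as follows. The helper name `parse_retry_after` and its `default` fallback are assumptions for illustration, and whether TickDB surfaces this header at all is something to confirm against its documentation.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(value, default=5.0, now=None):
    """
    Convert a Retry-After value into a non-negative delay in seconds.
    Accepts either delay-seconds ("7") or an HTTP-date
    ("Wed, 21 Oct 2015 07:28:00 GMT"); falls back to `default` otherwise.
    """
    if value is None:
        return default
    text = str(value).strip()

    # Form 1: a plain number of seconds
    try:
        return max(0.0, float(text))
    except ValueError:
        pass

    # Form 2: an HTTP-date; the delay is the distance from "now"
    try:
        when = parsedate_to_datetime(text)
    except (TypeError, ValueError):
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


if __name__ == "__main__":
    print(parse_retry_after("7"))
    print(parse_retry_after("not a valid value", default=2.0))
```

The `now` parameter exists only so the date branch can be exercised deterministically in tests; in normal use it can be left unset.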
Part 5: Complete Production-Grade WebSocket Client
5.1 Architecture Overview
The complete client integrates all components: heartbeat monitoring, timestamp validation, auto-reconnection, and error classification. Here's the architecture:
┌─────────────────────────────────────────────────────────────┐
│ WebSocketApp (websocket-client) │
│ ┌─────────────┐ ┌──────────────┐ ┌───────────────────┐ │
│ │ on_message │ │ on_error │ │ on_close │ │
│ │ (data rx) │ │ (exceptions) │ │ (server close) │ │
│ └──────┬──────┘ └──────┬───────┘ └─────────┬─────────┘ │
└─────────┼────────────────┼────────────────────┼─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────────┐
│ MessageHandler │ │ ErrorHandler │ │ ConnectionManager │
│ - Parse msg │ │ - Classify error │ │ - Reconnect with │
│ - Validate ts │ │ - Alert if needed│ │ backoff + jitter │
│ - Record freshness│ │ - Log to file │ │ - Reset on success │
└────────┬────────┘ └────────┬─────────┘ └──────────┬──────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Health Monitor (background thread) │
│ - Checks heartbeat status every 5 seconds │
│ - Triggers reconnection if health check fails │
│ - Logs state transitions │
└─────────────────────────────────────────────────────────────┘
5.2 Full Implementation
import json
import logging
import threading
import time
from typing import Callable, Optional

import websocket

# Configure logging for production
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("TickDBClient")


class TickDBWebSocketClient:
    """
    Production-grade WebSocket client for TickDB with:
    - Silent disconnection detection
    - Dual heartbeat system
    - Data timestamp validation
    - Auto-reconnection with exponential backoff
    - Error-category-aware retry logic
    """

    def __init__(
        self,
        api_key: str,
        on_data: Callable[[dict], None],
        on_status_change: Optional[Callable[[str], None]] = None
    ):
        self.api_key = api_key
        self.on_data = on_data
        self.on_status_change = on_status_change or (lambda s: None)
        # Components
        self.heartbeat = WebSocketHeartbeat(timeout=20, data_freshness_threshold=30)
        self.timestamp_validator = DataTimestampValidator(max_age_seconds=30)
        self.reconnection_manager = AdaptiveReconnectionManager(api_key)
        # State
        self._ws = None
        self._symbols = []           # Remembered so a reconnect can resubscribe
        self._running = False
        self._stopped = False        # Set by disconnect(); blocks further reconnects
        self._reconnecting = False   # Guards against concurrent reconnect attempts
        self._monitor_thread = None
        self._lock = threading.Lock()
        self._connection_id = 0

    def connect(self, symbols: list[str]):
        """
        Connect to TickDB WebSocket and subscribe to symbols.
        symbols: List of TickDB symbols, e.g. ["AAPL.US", "NVDA.US"]
        """
        with self._lock:
            self._symbols = list(symbols)
            self._stopped = False
            self._reconnecting = False
            self._connection_id += 1
            conn_id = self._connection_id

        # Build subscribe message
        subscribe_msg = {
            "method": "subscribe",
            "params": {"symbols": self._symbols},
            "id": conn_id
        }
        # WebSocket URL with API key as query parameter
        ws_url = f"wss://stream.tickdb.ai/ws?api_key={self.api_key}"
        logger.info(f"Connecting to TickDB WebSocket (conn_id={conn_id})")
        self.on_status_change("connecting")

        self._ws = websocket.WebSocketApp(
            ws_url,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            on_open=self._create_on_open(subscribe_msg, conn_id)
        )
        # Run in thread to allow non-blocking operation
        thread = threading.Thread(target=self._ws.run_forever, daemon=True)
        thread.start()
        logger.info(f"WebSocket thread started (conn_id={conn_id})")

    def _create_on_open(self, subscribe_msg, conn_id):
        def on_open(ws):
            logger.info("WebSocket connection opened, subscribing to symbols")
            self.on_status_change("connected")
            self.reconnection_manager.scheduler.record_attempt(success=True)
            # A fresh connection is proof of liveness: start both clocks now
            self.heartbeat.record_pong()
            self.heartbeat.record_data()
            try:
                ws.send(json.dumps(subscribe_msg))
                logger.info(f"Subscribed: {subscribe_msg['params']['symbols']}")
            except Exception as e:
                logger.error(f"Failed to send subscription: {e}")
                self.on_status_change("subscribe_failed")
            # Start the health monitor only once the connection is open
            self._running = True
            self._monitor_thread = threading.Thread(
                target=self._health_monitor, args=(conn_id,), daemon=True
            )
            self._monitor_thread.start()
        return on_open

    def _on_message(self, ws, message):
        """Handle incoming messages"""
        if ws is not self._ws:
            return  # Message from a connection we already replaced
        try:
            data = json.loads(message)
            # Check if it's a data message or a control message
            if "data" in data:
                # Real market data
                for item in data["data"]:
                    server_ts = item.get("timestamp")
                    # Validate freshness
                    is_valid, diag = self.timestamp_validator.validate(server_ts)
                    if not is_valid and diag["severity"] == "critical":
                        logger.warning(f"Stale data rejected: age={diag['age_ms']}ms")
                        continue
                    # Record freshness for heartbeat
                    self.heartbeat.record_data(server_ts)
                    # Dispatch to callback
                    self.on_data(item)
            elif data.get("type") == "pong":
                # Heartbeat response
                self.heartbeat.record_pong()
            elif "code" in data and data["code"] != 0:
                # API error
                logger.error(f"TickDB API error: code={data['code']}, msg={data.get('message')}")
                instructions = self.reconnection_manager.handle_failure(data)
                self._process_failure(instructions)
        except json.JSONDecodeError as e:
            logger.warning(f"Invalid JSON received: {e}")
        except Exception as e:
            logger.error(f"Error processing message: {e}")

    def _on_error(self, ws, error):
        logger.error(f"WebSocket error: {error}")
        self.on_status_change("error")

    def _on_close(self, ws, close_status_code, close_msg):
        if ws is not self._ws:
            return  # Close event from a connection we already replaced
        logger.warning(f"WebSocket closed: code={close_status_code}, msg={close_msg}")
        self.on_status_change("disconnected")
        self._handle_disconnect()

    def _handle_disconnect(self):
        """Called when connection is lost. Implements reconnection logic."""
        self._running = False
        instructions = {
            "should_retry": True,
            "retry_delay": None,  # None: use the scheduler's next backoff delay
            "alert_level": "info",
            "message": "connection lost"
        }
        self._process_failure(instructions)

    def _process_failure(self, instructions: dict):
        """Process failure and trigger reconnection if appropriate"""
        if not instructions["should_retry"]:
            logger.error(f"Not retrying: {instructions.get('message', 'no details')}")
            with self._lock:
                self._stopped = True  # A later close event must not trigger a retry
            self._running = False
            self.on_status_change("failed")
            return

        with self._lock:
            if self._stopped or self._reconnecting:
                return  # Shutting down, or another thread is already reconnecting
            self._reconnecting = True
        self._running = False

        scheduler = self.reconnection_manager.scheduler
        delay = instructions.get("retry_delay")
        if delay is None:
            delay = scheduler.get_delay()
        logger.info(
            f"Reconnecting in {delay:.1f}s "
            f"(attempt {scheduler.next_attempt_number})"
        )
        self.on_status_change("reconnecting")
        time.sleep(delay)
        # Count the attempt as failed for now; on_open resets the counter on success
        scheduler.record_attempt(success=False)

        # Drop the old socket. Its close event is ignored by the identity check.
        old_ws, self._ws = self._ws, None
        if old_ws is not None:
            try:
                old_ws.close()
            except Exception as e:
                logger.debug(f"Error closing old socket: {e}")
        if self._stopped:
            return  # disconnect() was called while we were waiting
        self.connect(self._symbols)

    def _health_monitor(self, conn_id: int):
        """
        Background thread that monitors connection health.
        Sends a ping and checks health every 5 seconds, and triggers
        reconnection if the check fails. The ping payload and the
        {"type": "pong"} reply are assumptions about the server protocol.
        """
        check_interval = 5
        while self._running and conn_id == self._connection_id:
            time.sleep(check_interval)
            if not self._running or conn_id != self._connection_id:
                break  # Connection was replaced or shut down while sleeping
            ws = self._ws
            if ws is not None:
                self.heartbeat.send_ping(ws)
            health = self.heartbeat.check_health()
            if not health["is_healthy"]:
                reasons = []
                if not health["pong_received_recently"]:
                    reasons.append(f"no pong for {health['pong_age_seconds']}s")
                if not health["data_flowing"]:
                    reasons.append(f"no data for {health['data_age_seconds']}s")
                reason_str = "; ".join(reasons)
                logger.warning(f"Health check FAILED: {reason_str}. Triggering reconnection.")
                self.on_status_change("health_check_failed")
                self._handle_disconnect()
                break
            else:
                logger.debug(
                    f"Health OK: pong_age={health['pong_age_seconds']}s, "
                    f"data_age={health['data_age_seconds']}s"
                )

    def disconnect(self):
        """Gracefully close the connection"""
        logger.info("Initiating graceful disconnect")
        with self._lock:
            self._stopped = True
        self._running = False
        ws = self._ws
        if ws:
            ws.close()
5.3 Usage Example
import os
import time


def main():
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("Set TICKDB_API_KEY environment variable")

    def on_market_data(data: dict):
        """Called whenever fresh market data arrives"""
        symbol = data.get("symbol")
        price = data.get("last")
        ts = data.get("timestamp")
        print(f"[{ts}] {symbol}: ${price}")

    def on_status(status: str):
        """Called when connection state changes"""
        print(f"[STATUS] {status}")

    client = TickDBWebSocketClient(
        api_key=api_key,
        on_data=on_market_data,
        on_status_change=on_status
    )
    # Connect and subscribe to US tech stocks
    client.connect(["AAPL.US", "NVDA.US", "MSFT.US"])

    # Keep main thread alive
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down...")
        client.disconnect()


if __name__ == "__main__":
    main()
Part 6: Monitoring and Alerting Strategy
6.1 Key Metrics to Track
For a production WebSocket monitoring system, expose these metrics:
| Metric | Description | Alert threshold |
|---|---|---|
| ws.connection_age_seconds | Time since last successful connect | n/a |
| ws.last_data_age_seconds | Seconds since most recent data | > 30s |
| ws.pong_age_seconds | Seconds since last pong response | > 20s |
| ws.reconnection_count | Number of reconnection attempts | > 5 per hour |
| ws.stale_data_count | Number of rejected stale messages | > 0 |
| ws.messages_per_second | Incoming message throughput | < 0.1 for 10+ seconds |
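Most of these metrics are simple ages or counters, but throughput needs a sliding window. One way to compute `ws.messages_per_second` is sketched below; the class name `ThroughputMeter` and its methods are hypothetical, and timestamps are passed in explicitly so the logic can be tested without waiting on a real clock (in a live client, `time.monotonic()` would be a reasonable source).

```python
from collections import deque


class ThroughputMeter:
    """Sliding-window message rate: arrivals in the last N seconds / N."""

    def __init__(self, window_seconds=10.0):
        self.window_seconds = window_seconds
        self._arrivals = deque()  # Arrival times, oldest first

    def record(self, now):
        """Call once per incoming message with the current time in seconds."""
        self._arrivals.append(now)

    def rate(self, now):
        """Messages per second over the window ending at `now`."""
        cutoff = now - self.window_seconds
        # Discard arrivals that have fallen out of the window
        while self._arrivals and self._arrivals[0] <= cutoff:
            self._arrivals.popleft()
        return len(self._arrivals) / self.window_seconds


if __name__ == "__main__":
    meter = ThroughputMeter(window_seconds=10.0)
    for second in range(20):          # One message per second for 20 seconds
        meter.record(float(second))
    print("rate while data flows:", meter.rate(20.0))
    print("rate after a 20s stall:", meter.rate(40.0))
    print("alert:", meter.rate(40.0) < 0.1)
```

Because the window is 10 seconds wide, a rate below 0.1 means no message at all arrived in that window, which matches the "< 0.1 for 10+ seconds" threshold in the table.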
6.2 Alert Tiers
| Tier | Trigger condition | Action |
|---|---|---|
| Warning | Pong age > 15s | Log, increase monitoring frequency |
| Warning | Data gap > 20s | Log, begin reconnection preparation |
| Critical | Pong age > 30s | Trigger immediate reconnection, alert |
| Critical | 3+ consecutive reconnection failures | Alert + pause strategy |
| Info | Successful reconnection after failure | Log + notify (no human alert needed) |
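The tier table translates fairly directly into a small classification function. The sketch below is one possible encoding, not a prescribed implementation: `classify_alerts` and `highest_severity` are hypothetical names, the thresholds are copied from the table, and the informational "successful reconnection" row is left out because it is an event rather than a threshold.

```python
def classify_alerts(pong_age_s, data_gap_s, consecutive_reconnect_failures):
    """Return a list of (tier, trigger, action) tuples for the current readings."""
    alerts = []
    if pong_age_s > 30:
        alerts.append(("critical", "pong_age", "trigger immediate reconnection, alert"))
    elif pong_age_s > 15:
        alerts.append(("warning", "pong_age", "log, increase monitoring frequency"))
    if data_gap_s > 20:
        alerts.append(("warning", "data_gap", "log, begin reconnection preparation"))
    if consecutive_reconnect_failures >= 3:
        alerts.append(("critical", "reconnect_failures", "alert and pause strategy"))
    return alerts


def highest_severity(alerts):
    """Collapse a list of alerts into a single tier: ok, warning, or critical."""
    order = {"ok": 0, "warning": 1, "critical": 2}
    worst = "ok"
    for tier, _trigger, _action in alerts:
        if order[tier] > order[worst]:
            worst = tier
    return worst


if __name__ == "__main__":
    readings = classify_alerts(pong_age_s=18, data_gap_s=25, consecutive_reconnect_failures=0)
    for tier, trigger, action in readings:
        print(f"{tier}: {trigger} -> {action}")
    print("overall:", highest_severity(readings))
```

Returning every triggered alert, rather than only the worst one, keeps the log complete while still letting the caller act on the highest tier.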
6.3 Structured Logging for Post-Incident Analysis
Every state transition should be logged with structured context:
import json
import logging
import time

logger = logging.getLogger("TickDBClient")


def log_connection_event(event_type: str, conn_id: int, **context):
    """
    Structured logging for connection events.
    Use this for post-incident reconstruction.
    """
    log_entry = {
        "event": event_type,
        "conn_id": conn_id,
        "timestamp": int(time.time() * 1000),
        **context
    }
    logger.info(json.dumps(log_entry))


# Usage examples (conn_id would come from the client's connection counter):
conn_id = 1
log_connection_event("connect_success", conn_id, endpoint="wss://stream.tickdb.ai/ws")
log_connection_event("health_check_fail", conn_id, reason="pong_timeout", pong_age=25.3)
log_connection_event("reconnect_start", conn_id, attempt=3, delay=8.5)
log_connection_event("reconnect_success", conn_id, attempt=3, duration_seconds=2.1)
log_connection_event("stale_data_rejected", conn_id, age_ms=45000, symbol="AAPL.US")
Closing: The Cost of Silent Failures
A strategy that stops silently isn't just missing opportunities—it's a liability that masquerades as normal operation. You won't know something is wrong until the P&L report arrives, or until you replay the day's tape and realize you were flat during the biggest move of the week.
The framework in this article—heartbeat monitoring, timestamp validation, error-aware reconnection—transforms silent failures from "invisible until it's too late" to "detected and resolved within seconds."
The code provided here is production-ready. It handles the edge cases: clock skew, packet reordering, thundering herd prevention, auth failure fast-failing, and structured logging for post-incident forensics.
If you're running a live strategy without this monitoring layer, you're flying blind. The market doesn't wait for you to notice you stopped receiving data.
Next Steps
If you're building a new live trading system, integrate the TickDBWebSocketClient from this article as your data ingestion layer. Set TICKDB_API_KEY in your environment and copy the client class into your project. The health monitor runs in a background thread, so it won't block your strategy loop.
If you're already running a strategy but without this monitoring, instrument your existing WebSocket client with the WebSocketHeartbeat and DataTimestampValidator classes. The incremental cost is minimal—the defensive value is substantial.
If you need historical data to backtest against before going live, TickDB provides 10+ years of cleaned US equity OHLCV data via the /v1/market/kline endpoint. Use the same API key you use for streaming.
If your strategy requires deep order book data for microstructure analysis, the depth channel on TickDB provides real-time order book updates for US equities (L1), HK equities (up to L10), and crypto (up to L10). See the documentation for channel-specific data schemas.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. WebSocket connections depend on network stability; no monitoring system can eliminate all risk of data loss.