The Silent Killer in Your Trading Stack

It was 9:47 AM on a Tuesday. The market was moving, volatility spiking, and your systematic strategy should have been printing. But it wasn't. Your code was running, the WebSocket connection showed "connected" in your dashboard, and there were no error logs. What happened?

The connection didn't crash. It died silently.

This is the scenario that separates paper traders from battle-hardened quant engineers: the silent WebSocket disconnection. The TCP layer reports everything is fine. Your client library shows an open connection. Your dashboard shows green. But the server stopped sending data three minutes ago, and your strategy has been trading on stale data—or worse, not trading at all.

In the US equity market, a three-minute gap during the opening rush can mean missing an entire momentum wave. In crypto, where venues fragment across exchanges and liquidations cascade in seconds, a silent disconnect can compound into a cascade of filled orders at prices that no longer exist.

This article dissects the anatomy of silent disconnections, builds a production-grade detection framework, and delivers a complete auto-recovery implementation that has survived live trading environments.


Part 1: Why WebSockets Die Silently

1.1 The TCP Half-Open Problem

A WebSocket connection is built on top of TCP. TCP connections can reach a state called "half-open"—where one side believes the connection is alive while the other side has already closed it or crashed.

This happens when:

  • The network drops packets silently without sending RST packets
  • A NAT gateway times out a mapping without notification
  • The server process dies but the OS keeps the socket open
  • A load balancer pulls a node out of rotation without notifying clients

In none of these cases does the client receive a disconnection packet. Your WebSocket library has no event to trigger. The connection appears open. Your code keeps sending heartbeats into the void.
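
One partial mitigation at the transport layer is TCP keepalive, which asks the operating system to probe an idle connection so that a dead peer eventually surfaces as a socket error. The sketch below is a minimal illustration, not part of the client built later in this article: the function name `enable_tcp_keepalive` and the default timings are assumptions, and the tuning constants are platform-specific, so each is applied only where it exists.

```python
import socket


def enable_tcp_keepalive(sock: socket.socket, idle_s: int = 10,
                         interval_s: int = 5, probes: int = 3) -> None:
    """Ask the OS to probe an idle TCP connection for a dead peer."""
    # Turn keepalive on; this option is available on all major platforms.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

    # The tuning options are platform-specific (all three exist on Linux),
    # so apply each one only if this platform exposes it.
    for name, value in (("TCP_KEEPIDLE", idle_s),
                        ("TCP_KEEPINTVL", interval_s),
                        ("TCP_KEEPCNT", probes)):
        option = getattr(socket, name, None)
        if option is not None:
            sock.setsockopt(socket.IPPROTO_TCP, option, value)


if __name__ == "__main__":
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    enable_tcp_keepalive(s)
    print("keepalive enabled:", s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0)
    s.close()
```

Keepalive only proves the remote TCP stack is reachable. It says nothing about whether the server application is still publishing, which is why the application-level checks in Part 2 are still required.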

1.2 Server-Side Disconnects Without Notification

Some WebSocket servers implement idle timeouts. If you don't send data for 30–60 seconds, the server closes the connection. But the server doesn't send a WebSocket close frame—it just stops reading. This creates a state where you can send but never receive.

In other cases, a server may drop connections during high load. It accepts the TCP FIN from the client but never sends its own FIN back. Your client library waits indefinitely.

1.3 The Data Gap That Compounds

When data stops flowing, most trading strategies behave in one of three ways:

Behavior                               | Risk Level | Consequence
---------------------------------------|------------|-------------------------------------------
Continue running on stale prices       | High       | Strategy uses outdated entry/exit signals
Stop trading because no signals arrive | Medium     | Opportunity cost, but no losses
Replay old data as new                 | Critical   | Strategy enters positions based on history

The third scenario is the most dangerous. If your data handler uses a timestamp comparison that doesn't validate monotonicity, it may accept old data as fresh data. Your strategy then trades on what happened 10 minutes ago as though it just happened now.
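
A cheap defense against the first failure mode is to make the strategy ask for a price through a guard that refuses to return one when the last tick is too old. The sketch below is a minimal illustration under assumed names (`Tick`, `tradeable_price`, and the 2-second default are hypothetical, not part of any API used in this article):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Tick:
    symbol: str
    price: float
    received_at: float  # Local clock reading (seconds) when the tick arrived


def tradeable_price(tick: Optional[Tick], now: float,
                    max_age_s: float = 2.0) -> Optional[float]:
    """Return the price only if the tick is fresh enough to act on, else None."""
    if tick is None:
        return None  # Nothing received yet
    if now - tick.received_at > max_age_s:
        return None  # Too old: the strategy should stand down, not trade
    return tick.price


if __name__ == "__main__":
    tick = Tick("AAPL.US", 190.0, received_at=100.0)
    print(tradeable_price(tick, now=101.0))  # Fresh tick: price is returned
    print(tradeable_price(tick, now=103.5))  # Stale tick: None
```

Returning None forces the calling code to handle the stale case explicitly instead of silently reusing the last known price.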


Part 2: Heartbeat Architecture Deep Dive

2.1 What Most Developers Get Wrong

The standard ping/pong mechanism built into WebSocket libraries is not designed for application-level heartbeat detection. It confirms that the peer's WebSocket stack still answers control frames, but it doesn't tell your application whether the server is still sending data.

Most developers implement something like this:

# ❌ What most people do — this is not sufficient
import websocket
import time

def on_message(ws, message):
    process_message(message)

def run():
    ws = websocket.WebSocketApp(
        "wss://stream.tickdb.ai/ws",
        on_message=on_message
    )
    ws.run_forever(ping_interval=30)

This runs forever. It sends a ping every 30 seconds. But if the server stops pushing data after minute 3, on_message never fires again. No error. No exception. Your strategy is now trading in the dark.

2.2 The Correct Heartbeat Model

A robust heartbeat system operates on two dimensions:

  1. Outbound heartbeat (you → server): Prove you're alive and request a response.
  2. Inbound data freshness (server → you): Verify data is still flowing.

The outbound heartbeat tells you the connection is open at the transport layer. The inbound data freshness tells you the server is actively sending data. You need both.

Here's the correct architecture:

import threading
import time
import random
import logging

class WebSocketHeartbeat:
    """
    Dual-channel heartbeat system:
    - Outbound ping with expected pong response
    - Inbound data timestamp validation
    """
    
    def __init__(self, timeout=15, data_freshness_threshold=10):
        self.timeout = timeout          # Max seconds since the last pong before unhealthy
        self.data_freshness_threshold = data_freshness_threshold  # Max age of last data
        self.last_pong_time = None
        self.last_data_time = None
        self.last_data_server_timestamp = None
        self.is_healthy = True
        self._lock = threading.Lock()
        self._ping_count = 0
    
    def record_pong(self):
        """Called when we receive a pong response"""
        with self._lock:
            self.last_pong_time = time.time()
            self.is_healthy = True
    
    def record_data(self, timestamp=None):
        """
        Record that we received data.
        Freshness is always measured from local arrival time; the server
        timestamp, if provided, is stored alongside it for diagnostics.
        """
        with self._lock:
            self.last_data_time = time.time()
            self.last_data_server_timestamp = timestamp
    
    def send_ping(self, ws):
        """
        Send an application-level ping and return the ping ID (None on failure).
        The pong is not awaited here: call check_health() after the timeout
        to verify that it arrived. The payload format is server-specific.
        """
        with self._lock:
            self._ping_count += 1
            ping_id = f"ping_{self._ping_count}_{int(time.time())}"
        payload = f"id:{ping_id}"
        try:
            ws.send(payload)
            return ping_id
        except Exception as e:
            logging.error(f"Failed to send ping: {e}")
            return None
    
    def check_health(self) -> dict:
        """
        Returns health status and reasons for any unhealthy state.
        Call this periodically from a monitoring thread.
        """
        now = time.time()
        
        with self._lock:
            pong_age = (now - self.last_pong_time) if self.last_pong_time else float('inf')
            data_age = (now - self.last_data_time) if self.last_data_time else float('inf')
        
        health = {
            "pong_received_recently": pong_age < self.timeout,
            "data_flowing": data_age < self.data_freshness_threshold,
            "pong_age_seconds": round(pong_age, 2),
            "data_age_seconds": round(data_age, 2),
            "is_healthy": pong_age < self.timeout and data_age < self.data_freshness_threshold
        }
        
        return health

2.3 Heartbeat Timing Parameters

The choice of timeout values depends on your market and data source:

Market                       | Recommended heartbeat interval | Timeout threshold
-----------------------------|--------------------------------|------------------
US Equities (high-frequency) | 10 seconds                     | 20 seconds
Crypto (high-liquidity)      | 5 seconds                      | 15 seconds
Crypto (low-liquidity)       | 15 seconds                     | 30 seconds
HK Equities                  | 15 seconds                     | 30 seconds

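The key principle: your timeout must be shorter than the longest data gap your strategy can tolerate, with room left over for detection and reconnection. If you can't tolerate more than 30 seconds of stale data, your heartbeat timeout must be 15 seconds or less, because the health check interval and the reconnect itself also consume part of that budget.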
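
The arithmetic behind that rule can be made explicit. In the worst case, the connection dies right after a pong, the timeout elapses, the monitor notices on its next check, and only then does the reconnect start. The sketch below is a rough budget calculation; the function names and the 5-second reconnect estimate are assumptions for illustration:

```python
def worst_case_blind_seconds(heartbeat_timeout: float,
                             check_interval: float,
                             reconnect_estimate: float) -> float:
    """Upper bound on how long the strategy runs without fresh data."""
    # Timeout must elapse, the monitor may check up to one interval later,
    # and the reconnect plus resubscribe takes additional time.
    return heartbeat_timeout + check_interval + reconnect_estimate


def fits_budget(max_stale_seconds: float, heartbeat_timeout: float,
                check_interval: float, reconnect_estimate: float) -> bool:
    """True if the worst case stays within the strategy's staleness budget."""
    blind = worst_case_blind_seconds(heartbeat_timeout, check_interval,
                                     reconnect_estimate)
    return blind <= max_stale_seconds


if __name__ == "__main__":
    # 15s timeout + 5s check interval + 5s reconnect = 25s, inside a 30s budget
    print(worst_case_blind_seconds(15, 5, 5), fits_budget(30, 15, 5, 5))
    # A 30s timeout alone already exhausts a 30s budget
    print(worst_case_blind_seconds(30, 5, 5), fits_budget(30, 30, 5, 5))
```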


Part 3: Data Timestamp Validation

3.1 Why Server Timestamps Matter

Relying on local arrival time to validate freshness is insufficient. Data may have been in flight for 500ms due to network routing, or the server may be buffering during high load. What matters is when the data was generated at the source.

When you receive a market data message, extract and validate the server-side timestamp:

import logging
import time
from typing import Optional

class DataTimestampValidator:
    """
    Validates that incoming data has a recent server-side timestamp.
    Rejects data that is older than the configured threshold.
    """
    
    def __init__(self, max_age_seconds=30, clock_skew_tolerance=5):
        self.max_age_seconds = max_age_seconds
        self.clock_skew_tolerance = clock_skew_tolerance  # Allow for server clock drift
        self._last_valid_timestamp = None
        self._gap_count = 0  # Track consecutive gap events
    
    def validate(self, server_timestamp: Optional[int]) -> tuple[bool, dict]:
        """
        Returns (is_valid, diagnostics_dict)
        
        server_timestamp: Unix timestamp in milliseconds from the server
        """
        if server_timestamp is None:
            return False, {"reason": "no_timestamp", "severity": "warning"}
        
        now_ms = int(time.time() * 1000)
        age_ms = now_ms - server_timestamp
        
        diagnostics = {
            "age_ms": age_ms,
            "max_age_ms": self.max_age_seconds * 1000,
            "within_tolerance": age_ms <= self.max_age_seconds * 1000
        }
        
        # Check for clock skew: timestamp from the future is invalid
        if server_timestamp > now_ms + (self.clock_skew_tolerance * 1000):
            diagnostics["reason"] = "future_timestamp"
            diagnostics["severity"] = "error"
            return False, diagnostics
        
        # Check for data staleness
        if age_ms > self.max_age_seconds * 1000:
            diagnostics["reason"] = "stale_data"
            diagnostics["severity"] = "critical"
            self._gap_count += 1
            return False, diagnostics
        
        # Valid data
        self._last_valid_timestamp = server_timestamp
        if self._gap_count > 0:
            logging.warning(f"Data gap resolved after {self._gap_count} stale events")
            self._gap_count = 0
        
        diagnostics["reason"] = "valid"
        diagnostics["severity"] = "ok"
        return True, diagnostics

3.2 Monotonicity Enforcement

In addition to freshness, you should enforce timestamp monotonicity. Data should arrive in order. A timestamp from the past arriving after newer data indicates reordering, a replay, or a gap being backfilled:

class MonotonicTimestampTracker:
    """
    Ensures data timestamps arrive in increasing order.
    Detects gaps and reordering that may indicate packet loss.
    """
    
    def __init__(self, allowed_reorder_window_ms=500):
        self.allowed_reorder_window_ms = allowed_reorder_window_ms
        self._last_timestamp = None
    
    def check(self, server_timestamp: int) -> dict:
        """
        Returns diagnostics on timestamp monotonicity.
        """
        if self._last_timestamp is None:
            self._last_timestamp = server_timestamp
            return {
                "status": "first",
                "is_monotonic": True,
                "gap_ms": 0
            }
        
        gap = server_timestamp - self._last_timestamp
        
        if gap < -self.allowed_reorder_window_ms:
            # Significant reordering detected
            diagnostics = {
                "status": "reorder",
                "is_monotonic": False,
                "gap_ms": gap,
                "warning": f"Data reordered by {-gap}ms — possible packet loss"
            }
        elif gap < 0:
            # Minor reordering within tolerance
            diagnostics = {
                "status": "minor_reorder",
                "is_monotonic": True,
                "gap_ms": gap,
                "note": "Minor reordering, within tolerance"
            }
        elif gap == 0:
            diagnostics = {
                "status": "duplicate",
                "is_monotonic": True,
                "gap_ms": 0
            }
        else:
            diagnostics = {
                "status": "ok",
                "is_monotonic": True,
                "gap_ms": gap
            }
        
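        # Advance the high-water mark only; a late message must not move it backwards
        self._last_timestamp = max(self._last_timestamp, server_timestamp)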
        return diagnostics
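
If you want the check to drop replayed data rather than only report it, one option is a filter that compares every message against the newest timestamp seen so far (a high-water mark) instead of the previous message. The sketch below is a minimal standalone illustration; the class name `HighWaterMarkFilter` is an assumption, not part of the classes above:

```python
from typing import Optional


class HighWaterMarkFilter:
    """Accepts messages unless they trail the newest timestamp seen by too much."""

    def __init__(self, reorder_window_ms: int = 500):
        self.reorder_window_ms = reorder_window_ms
        self._high_water_ms: Optional[int] = None

    def accept(self, server_timestamp_ms: int) -> bool:
        if self._high_water_ms is None:
            self._high_water_ms = server_timestamp_ms
            return True
        if server_timestamp_ms < self._high_water_ms - self.reorder_window_ms:
            return False  # Too far behind the newest message: treat as replay
        # The mark only ever moves forward, so late messages cannot drag it back
        self._high_water_ms = max(self._high_water_ms, server_timestamp_ms)
        return True


if __name__ == "__main__":
    f = HighWaterMarkFilter(reorder_window_ms=500)
    for ts in (10_000, 9_600, 9_200):
        print(ts, f.accept(ts))
```

Comparing against the high-water mark matters for slow drift: in the sequence 10000, 9600, 9200 each message is only 400 ms behind its predecessor, but the last one is 800 ms behind the newest data and is rejected.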

Part 4: Auto-Reconnection with Exponential Backoff and Jitter

4.1 The Problem with Fixed Retry Intervals

If you reconnect immediately after detecting a disconnect, and the disconnect was caused by a temporary server issue, you'll contribute to a thundering herd. When the server comes back, thousands of clients reconnect simultaneously, overload the system, and cause another wave of disconnections.

Fixed intervals also mean you're wasting time retrying when the problem is structural (wrong endpoint, auth expired, etc.).

4.2 Exponential Backoff with Jitter

The standard reconnection strategy uses exponential backoff with jitter:

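delay = min(base_delay * multiplier^(attempt - 1), max_delay) ± jitter

Here jitter is drawn uniformly from ±(jitter_factor × capped delay), and the first attempt (attempt 0) is immediate.

import random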

class ReconnectionScheduler:
    """
    Calculates reconnection delays using exponential backoff with jitter.
    Prevents thundering herd by adding randomness to retry intervals.
    """
    
    def __init__(
        self,
        base_delay=1.0,
        max_delay=60.0,
        multiplier=2.0,
        jitter_factor=0.3
    ):
        self.base_delay = base_delay      # Starting delay in seconds
        self.max_delay = max_delay        # Cap on the delay before jitter is applied
        self.multiplier = multiplier      # Exponential multiplier per attempt
        self.jitter_factor = jitter_factor # Jitter as a fraction of the capped delay
        self.attempt = 0
    
    def get_delay(self) -> float:
        """
        Returns the next delay in seconds before attempting reconnection.
        """
        if self.attempt == 0:
            return 0.0  # First attempt: immediate
        
        # Calculate exponential delay
        exp_delay = self.base_delay * (self.multiplier ** (self.attempt - 1))
        
        # Cap at max_delay
        capped_delay = min(exp_delay, self.max_delay)
        
        # Add jitter (uniform random within ±jitter_factor of the delay)
        jitter_range = capped_delay * self.jitter_factor
        jitter = random.uniform(-jitter_range, jitter_range)
        
        final_delay = max(0.0, capped_delay + jitter)
        
        return round(final_delay, 3)
    
    def record_attempt(self, success: bool):
        """
        Call this after each connection attempt.
        If success=True, reset the attempt counter.
        If success=False, increment for next attempt.
        """
        if success:
            self.attempt = 0
        else:
            self.attempt += 1
    
    @property
    def next_attempt_number(self) -> int:
        return self.attempt + 1
    
    @property
    def estimated_delay(self) -> float:
        """Returns the delay for the next scheduled attempt without random jitter"""
        if self.attempt == 0:
            return 0.0
        exp_delay = self.base_delay * (self.multiplier ** (self.attempt - 1))
        return min(exp_delay, self.max_delay)
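
To get a feel for the numbers, it helps to print the schedule without the randomness. The sketch below recomputes the same formula as a standalone function using the default parameters above; `backoff_schedule` and `jitter_bounds` are illustrative names, not methods of the scheduler:

```python
def backoff_schedule(attempts: int, base_delay: float = 1.0,
                     multiplier: float = 2.0,
                     max_delay: float = 60.0) -> list[float]:
    """Pre-jitter delay for each attempt number, starting at attempt 0."""
    schedule = []
    for attempt in range(attempts):
        if attempt == 0:
            schedule.append(0.0)  # First attempt is immediate
        else:
            schedule.append(min(base_delay * multiplier ** (attempt - 1), max_delay))
    return schedule


def jitter_bounds(delay: float, jitter_factor: float = 0.3) -> tuple[float, float]:
    """Smallest and largest delay that jitter can produce around `delay`."""
    spread = delay * jitter_factor
    return (max(0.0, delay - spread), delay + spread)


if __name__ == "__main__":
    print(backoff_schedule(9))
    # → [0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
    print(jitter_bounds(8.0))
```

With these defaults the cap is reached on the eighth attempt, and because jitter is applied after the cap, an individual delay can land up to 30% above max_delay.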

4.3 Error-Category-Aware Reconnection

Not all connection failures are equal. Auth failures shouldn't be retried with exponential backoff—they should fail fast and alert you to a configuration problem.

from enum import Enum

class ConnectionFailureType(Enum):
    TRANSIENT_NETWORK = "transient_network"          # Retry with backoff
    SERVER_OVERLOADED = "server_overloaded"         # Retry with longer backoff
    AUTH_FAILED = "auth_failed"                      # Don't retry, alert immediately
    SYMBOL_NOT_FOUND = "symbol_not_found"           # Don't retry, check configuration
    RATE_LIMITED = "rate_limited"                   # Respect Retry-After header
    UNKNOWN = "unknown"                             # Retry once, then alert

class AdaptiveReconnectionManager:
    """
    Reconnection manager that adapts strategy based on failure type.
    """
    
    def __init__(self, tickdb_api_key: str, log_callback=None):
        self.scheduler = ReconnectionScheduler()
        self.api_key = tickdb_api_key
        self.log = log_callback or logging.info
        self.consecutive_auth_failures = 0
    
    def classify_failure(self, error_response: dict) -> ConnectionFailureType:
        """
        Classify the error response from TickDB to determine retry strategy.
        """
        code = error_response.get("code", 0)
        
        if code in (1001, 1002):
            return ConnectionFailureType.AUTH_FAILED
        elif code == 2002:
            return ConnectionFailureType.SYMBOL_NOT_FOUND
        elif code == 3001:
            return ConnectionFailureType.RATE_LIMITED
        elif code == 5001:
            return ConnectionFailureType.SERVER_OVERLOADED
        elif code == 0:
            return ConnectionFailureType.TRANSIENT_NETWORK
        else:
            return ConnectionFailureType.UNKNOWN
    
    def handle_failure(self, error_response: dict) -> dict:
        """
        Process a connection failure and return reconnection instructions.
        """
        failure_type = self.classify_failure(error_response)
        
        instructions = {
            "failure_type": failure_type.value,
            "should_retry": True,
            "retry_delay": None,
            "alert_level": "info",
            "message": error_response.get("message", "Unknown error")
        }
        
        if failure_type == ConnectionFailureType.AUTH_FAILED:
            instructions["should_retry"] = False
            instructions["alert_level"] = "critical"
            instructions["message"] = (
                "API authentication failed. Check TICKDB_API_KEY environment variable. "
                "Do not retry — fix the credential first."
            )
            self.consecutive_auth_failures += 1
            
        elif failure_type == ConnectionFailureType.SYMBOL_NOT_FOUND:
            instructions["should_retry"] = False
            instructions["alert_level"] = "warning"
            instructions["message"] = (
                f"Symbol not found: {error_response.get('symbol')}. "
                "Verify symbol exists via /v1/symbols/available before retrying."
            )
            
        elif failure_type == ConnectionFailureType.RATE_LIMITED:
            retry_after = int(error_response.get("headers", {}).get("Retry-After", 5))
            instructions["retry_delay"] = retry_after
            instructions["alert_level"] = "warning"
            instructions["message"] = f"Rate limited. Respect Retry-After: {retry_after}s"
            
        elif failure_type == ConnectionFailureType.SERVER_OVERLOADED:
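            # Add extra delay for server-side overload; never fall below
            # base_delay, since estimated_delay is 0 on the first failure
            base_delay = max(self.scheduler.estimated_delay, self.scheduler.base_delay)
            instructions["retry_delay"] = min(base_delay * 1.5, self.scheduler.max_delay)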
            instructions["alert_level"] = "info"
            instructions["message"] = "Server overloaded. Backing off to avoid thundering herd."
            
        elif failure_type == ConnectionFailureType.UNKNOWN:
            instructions["should_retry"] = self.scheduler.attempt < 1
            instructions["retry_delay"] = self.scheduler.get_delay() if instructions["should_retry"] else None
            instructions["alert_level"] = "warning"
            
        return instructions
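
One detail worth hardening is the Retry-After value. In HTTP it may be either a number of seconds or an HTTP date, and calling int() on a date string raises an exception. Whether your provider ever sends the date form is an assumption to verify; the sketch below (the helper name `parse_retry_after` is hypothetical) handles both forms and falls back to a default:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float = 5.0,
                      now: Optional[datetime] = None) -> float:
    """Convert a Retry-After value (seconds or HTTP date) to seconds to wait."""
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)  # Delta-seconds form, e.g. "120"
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default  # Unparseable: fall back rather than crash
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


if __name__ == "__main__":
    print(parse_retry_after("120"))
    fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
    print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))
    print(parse_retry_after("soon"))
```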

Part 5: Complete Production-Grade WebSocket Client

5.1 Architecture Overview

The complete client integrates all components: heartbeat monitoring, timestamp validation, auto-reconnection, and error classification. Here's the architecture:

┌─────────────────────────────────────────────────────────────┐
│                    WebSocketApp (websocket-client)          │
│  ┌─────────────┐  ┌──────────────┐  ┌───────────────────┐   │
│  │ on_message  │  │ on_error     │  │ on_close          │   │
│  │ (data rx)   │  │ (exceptions) │  │ (server close)    │   │
│  └──────┬──────┘  └──────┬───────┘  └─────────┬─────────┘   │
└─────────┼────────────────┼────────────────────┼─────────────┘
          │                │                    │
          ▼                ▼                    ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────────┐
│ MessageHandler  │ │ ErrorHandler     │ │ ConnectionManager   │
│ - Parse msg     │ │ - Classify error │ │ - Reconnect with    │
│ - Validate ts   │ │ - Alert if needed│ │   backoff + jitter  │
│ - Record freshness│ │ - Log to file   │ │ - Reset on success  │
└────────┬────────┘ └────────┬─────────┘ └──────────┬──────────┘
         │                   │                      │
         ▼                   ▼                      ▼
┌─────────────────────────────────────────────────────────────┐
│                    Health Monitor (background thread)        │
│  - Checks heartbeat status every 5 seconds                  │
│  - Triggers reconnection if health check fails             │
│  - Logs state transitions                                   │
└─────────────────────────────────────────────────────────────┘

5.2 Full Implementation

import websocket
import threading
import time
import os
import json
import logging
from typing import Callable, Optional

# Configure logging for production
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("TickDBClient")

class TickDBWebSocketClient:
    """
    Production-grade WebSocket client for TickDB with:
    - Silent disconnection detection
    - Dual heartbeat system
    - Data timestamp validation
    - Auto-reconnection with exponential backoff
    - Error-category-aware retry logic
    """
    
    def __init__(
        self,
        api_key: str,
        on_data: Callable[[dict], None],
        on_status_change: Optional[Callable[[str], None]] = None
    ):
        self.api_key = api_key
        self.on_data = on_data
        self.on_status_change = on_status_change or (lambda s: None)

        # Components
        self.heartbeat = WebSocketHeartbeat(timeout=20, data_freshness_threshold=30)
        self.timestamp_validator = DataTimestampValidator(max_age_seconds=30)
        self.reconnection_manager = AdaptiveReconnectionManager(api_key)

        # State
        self._ws = None
        self._symbols = []           # Remembered so a reconnect can resubscribe
        self._running = False        # False after disconnect() or a non-retryable failure
        self._reconnecting = False   # True while a reconnect is already scheduled
        self._monitor_thread = None
        self._lock = threading.Lock()
        self._connection_id = 0

    def connect(self, symbols: list[str]):
        """
        Connect to TickDB WebSocket and subscribe to symbols.
        symbols: List of TickDB symbols, e.g. ["AAPL.US", "NVDA.US"]
        """
        with self._lock:
            self._symbols = list(symbols)
            self._connection_id += 1
            conn_id = self._connection_id
            self._running = True
            self._reconnecting = False

        # Build subscribe message
        subscribe_msg = {
            "method": "subscribe",
            "params": {"symbols": self._symbols},
            "id": conn_id
        }

        # WebSocket URL with API key as query parameter (never log this URL)
        ws_url = f"wss://stream.tickdb.ai/ws?api_key={self.api_key}"

        logger.info(f"Connecting to TickDB WebSocket (conn_id={conn_id})")
        self.on_status_change("connecting")

        # Start both freshness clocks now, so the monitor gives the new
        # connection a full timeout window before judging it
        self.heartbeat.record_pong()
        self.heartbeat.record_data()

        self._ws = websocket.WebSocketApp(
            ws_url,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            on_open=self._create_on_open(subscribe_msg)
        )

        # Run in thread to allow non-blocking operation
        thread = threading.Thread(target=self._ws.run_forever, daemon=True)
        thread.start()

        # Start a health monitor bound to this connection
        self._monitor_thread = threading.Thread(
            target=self._health_monitor, args=(conn_id,), daemon=True
        )
        self._monitor_thread.start()

        logger.info(f"WebSocket thread started (conn_id={conn_id})")

    def _create_on_open(self, subscribe_msg):
        def on_open(ws):
            logger.info("WebSocket connection opened, subscribing to symbols")
            self.on_status_change("connected")
            self.heartbeat.record_pong()  # Connection itself is proof of liveness
            try:
                ws.send(json.dumps(subscribe_msg))
                logger.info(f"Subscribed: {subscribe_msg['params']['symbols']}")
            except Exception as e:
                logger.error(f"Failed to send subscription: {e}")
                self.on_status_change("subscribe_failed")
        return on_open

    def _on_message(self, ws, message):
        """Handle incoming messages"""
        try:
            data = json.loads(message)

            # Check if it's a data message or a control message
            if "data" in data:
                # Real market data
                for item in data["data"]:
                    server_ts = item.get("timestamp")

                    # Reject anything the validator flags: stale, future-dated,
                    # or missing a timestamp
                    is_valid, diag = self.timestamp_validator.validate(server_ts)
                    if not is_valid:
                        logger.warning(
                            f"Data rejected: reason={diag['reason']}, "
                            f"age_ms={diag.get('age_ms')}"
                        )
                        continue

                    # Record freshness for heartbeat
                    self.heartbeat.record_data(server_ts)

                    # Fresh data proves the connection works, so reset the backoff
                    self.reconnection_manager.scheduler.record_attempt(success=True)

                    # Dispatch to callback
                    self.on_data(item)

            elif data.get("type") == "pong":
                # Heartbeat response
                self.heartbeat.record_pong()

            elif "code" in data and data["code"] != 0:
                # API error
                logger.error(f"TickDB API error: code={data['code']}, msg={data.get('message')}")
                instructions = self.reconnection_manager.handle_failure(data)
                self._process_failure(instructions)

        except json.JSONDecodeError as e:
            logger.warning(f"Invalid JSON received: {e}")
        except Exception as e:
            logger.error(f"Error processing message: {e}")

    def _on_error(self, ws, error):
        logger.error(f"WebSocket error: {error}")
        self.on_status_change("error")

    def _on_close(self, ws, close_status_code, close_msg):
        if ws is not self._ws:
            return  # Close event from a connection that has already been replaced
        logger.warning(f"WebSocket closed: code={close_status_code}, msg={close_msg}")
        self.on_status_change("disconnected")
        self._handle_disconnect()

    def _handle_disconnect(self):
        """Called when connection is lost. Schedules a reconnect with backoff."""
        self._process_failure({
            "should_retry": True,
            "retry_delay": None,  # Let the scheduler choose the backoff delay
            "alert_level": "info",
            "message": "Connection lost"
        })

    def _process_failure(self, instructions: dict):
        """Process failure and trigger reconnection if appropriate"""
        with self._lock:
            if not self._running or self._reconnecting:
                return  # Shutting down, or a reconnect is already scheduled
            old_ws = self._ws
            if instructions["should_retry"]:
                self._reconnecting = True
            else:
                self._running = False

        if not instructions["should_retry"]:
            logger.error(f"Not retrying: {instructions.get('message')}")
            self.on_status_change("failed")
            if old_ws:
                old_ws.close()
            return

        scheduler = self.reconnection_manager.scheduler
        delay = instructions.get("retry_delay")
        if delay is None:
            delay = scheduler.get_delay()

        logger.info(
            f"Reconnecting in {delay:.1f}s "
            f"(attempt {scheduler.next_attempt_number})"
        )
        self.on_status_change("reconnecting")

        # Count this attempt as failed until fresh data arrives on the new connection
        scheduler.record_attempt(success=False)

        def reconnect():
            if old_ws:
                old_ws.close()  # Drop the dead socket before opening a new one
            time.sleep(delay)
            if self._running:  # disconnect() may have been called while waiting
                self.connect(self._symbols)

        # Reconnect from a separate thread so WebSocket callbacks are never blocked
        threading.Thread(target=reconnect, daemon=True).start()

    def _health_monitor(self, conn_id: int):
        """
        Background thread that monitors connection health.
        Every 5 seconds it sends a ping, checks health, and triggers
        reconnection if the check fails. It exits once its connection
        has been replaced.
        """
        check_interval = 5

        while self._running and conn_id == self._connection_id:
            time.sleep(check_interval)
            if not self._running or conn_id != self._connection_id:
                break  # Shut down, or superseded by a newer connection

            # Send an application-level ping; the pong branch in
            # _on_message refreshes the heartbeat
            self.heartbeat.send_ping(self._ws)

            health = self.heartbeat.check_health()

            if not health["is_healthy"]:
                reasons = []
                if not health["pong_received_recently"]:
                    reasons.append(f"no pong for {health['pong_age_seconds']}s")
                if not health["data_flowing"]:
                    reasons.append(f"no data for {health['data_age_seconds']}s")

                reason_str = "; ".join(reasons)
                logger.warning(f"Health check FAILED: {reason_str}. Triggering reconnection.")

                self.on_status_change("health_check_failed")
                self._handle_disconnect()
                break
            else:
                logger.debug(
                    f"Health OK: pong_age={health['pong_age_seconds']}s, "
                    f"data_age={health['data_age_seconds']}s"
                )

    def disconnect(self):
        """Gracefully close the connection without triggering a reconnect"""
        logger.info("Initiating graceful disconnect")
        self._running = False
        if self._ws:
            self._ws.close()

5.3 Usage Example

import os
import time

def main():
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("Set TICKDB_API_KEY environment variable")
    
    def on_market_data(data: dict):
        """Called whenever fresh market data arrives"""
        symbol = data.get("symbol")
        price = data.get("last")
        ts = data.get("timestamp")
        print(f"[{ts}] {symbol}: ${price}")
    
    def on_status(status: str):
        """Called when connection state changes"""
        print(f"[STATUS] {status}")
    
    client = TickDBWebSocketClient(
        api_key=api_key,
        on_data=on_market_data,
        on_status_change=on_status
    )
    
    # Connect and subscribe to US tech stocks
    client.connect(["AAPL.US", "NVDA.US", "MSFT.US"])
    
    # Keep main thread alive
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down...")
        client.disconnect()

if __name__ == "__main__":
    main()

Part 6: Monitoring and Alerting Strategy

6.1 Key Metrics to Track

For a production WebSocket monitoring system, expose these metrics:

Metric                    | Description                          | Alert threshold
--------------------------|--------------------------------------|----------------------
ws.connection_age_seconds | Time since last successful connect   | -
ws.last_data_age_seconds  | Seconds since most recent data       | > 30s
ws.pong_age_seconds       | Seconds since last pong response     | > 20s
ws.reconnection_count     | Number of reconnection attempts      | > 5 per hour
ws.stale_data_count       | Number of rejected stale messages    | > 0
ws.messages_per_second    | Incoming message throughput          | < 0.1 for 10+ seconds
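
Most of these metrics are simple gauges, but ws.messages_per_second needs a sliding window. The sketch below is one minimal way to compute it; the class name `ThroughputMeter` is an assumption, and timestamps are passed in explicitly so the logic can be tested without waiting on a real clock:

```python
from collections import deque


class ThroughputMeter:
    """Sliding-window message rate, in messages per second."""

    def __init__(self, window_s: float = 10.0):
        self.window_s = window_s
        self._arrivals = deque()  # Arrival times, oldest first

    def record(self, now: float) -> None:
        """Call once per incoming message with the current time in seconds."""
        self._arrivals.append(now)

    def rate(self, now: float) -> float:
        """Average rate over the last window_s seconds."""
        cutoff = now - self.window_s
        # Discard arrivals that have fallen out of the window
        while self._arrivals and self._arrivals[0] <= cutoff:
            self._arrivals.popleft()
        return len(self._arrivals) / self.window_s


if __name__ == "__main__":
    meter = ThroughputMeter(window_s=10.0)
    for t in range(20):            # One message per second for 20 seconds
        meter.record(float(t))
    print(meter.rate(now=20.0))    # Messages still inside the last 10 seconds
    print(meter.rate(now=40.0))    # Feed went silent: rate falls to zero
```

In a live client you would call record(time.monotonic()) from the message handler and read rate(time.monotonic()) from the monitoring thread, guarding both with a lock if they run on different threads.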

6.2 Alert Tiers

Tier     | Trigger condition                      | Action
---------|----------------------------------------|--------------------------------------
Warning  | Pong age > 15s                         | Log, increase monitoring frequency
Warning  | Data gap > 20s                         | Log, begin reconnection preparation
Critical | Pong age > 30s                         | Trigger immediate reconnection, alert
Critical | 3+ consecutive reconnection failures   | Alert + pause strategy
Info     | Successful reconnection after failure  | Log + notify (no human alert needed)
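
The tiers above translate directly into a small classification function that a monitoring thread can call on each health check. The sketch below is one possible encoding; the function name `classify_alert` and the returned strings are illustrative, and the Info tier is omitted because it is triggered by an event rather than a threshold:

```python
def classify_alert(pong_age_s: float, data_age_s: float,
                   consecutive_reconnect_failures: int = 0) -> tuple[str, str]:
    """Map current health readings to (tier, action), most severe first."""
    if consecutive_reconnect_failures >= 3:
        return ("critical", "alert and pause strategy")
    if pong_age_s > 30:
        return ("critical", "trigger immediate reconnection and alert")
    if data_age_s > 20:
        return ("warning", "log and begin reconnection preparation")
    if pong_age_s > 15:
        return ("warning", "log and increase monitoring frequency")
    return ("ok", "no action")


if __name__ == "__main__":
    print(classify_alert(pong_age_s=3.0, data_age_s=1.0))
    print(classify_alert(pong_age_s=17.0, data_age_s=1.0))
    print(classify_alert(pong_age_s=35.0, data_age_s=40.0))
    print(classify_alert(pong_age_s=3.0, data_age_s=1.0,
                         consecutive_reconnect_failures=3))
```

Checking the most severe conditions first means a reading that satisfies several rows always reports the highest applicable tier.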

6.3 Structured Logging for Post-Incident Analysis

Every state transition should be logged with structured context:

def log_connection_event(event_type: str, conn_id: int, **context):
    """
    Structured logging for connection events.
    Use this for post-incident reconstruction.
    """
    log_entry = {
        "event": event_type,
        "conn_id": conn_id,
        "timestamp": int(time.time() * 1000),
        **context
    }
    logger.info(json.dumps(log_entry))

# Usage examples (conn_id is an illustrative value):
conn_id = 1
log_connection_event("connect_success", conn_id, endpoint="wss://stream.tickdb.ai/ws")
log_connection_event("health_check_fail", conn_id, reason="pong_timeout", pong_age=25.3)
log_connection_event("reconnect_start", conn_id, attempt=3, delay=8.5)
log_connection_event("reconnect_success", conn_id, attempt=3, duration_seconds=2.1)
log_connection_event("stale_data_rejected", conn_id, age_ms=45000, symbol="AAPL.US")

Closing: The Cost of Silent Failures

A strategy that stops silently isn't just missing opportunities—it's a liability that masquerades as normal operation. You won't know something is wrong until the P&L report arrives, or until you replay the day's tape and realize you were flat during the biggest move of the week.

The framework in this article—heartbeat monitoring, timestamp validation, error-aware reconnection—transforms silent failures from "invisible until it's too late" to "detected and resolved within seconds."

The code provided here is a foundation for production use. It handles the edge cases: clock skew, packet reordering, thundering herd prevention, auth failure fast-failing, and structured logging for post-incident forensics.

If you're running a live strategy without this monitoring layer, you're flying blind. The market doesn't wait for you to notice you stopped receiving data.


Next Steps

If you're building a new live trading system, integrate the TickDBWebSocketClient from this article as your data ingestion layer. Set TICKDB_API_KEY in your environment and copy the client class into your project. The health monitor runs in a background thread, so it won't block your strategy loop.

If you're already running a strategy but without this monitoring, instrument your existing WebSocket client with the WebSocketHeartbeat and DataTimestampValidator classes. The incremental cost is minimal—the defensive value is substantial.

If you need historical data to backtest against before going live, TickDB provides 10+ years of cleaned US equity OHLCV data via the /v1/market/kline endpoint. Use the same API key you use for streaming.

If your strategy requires deep order book data for microstructure analysis, the depth channel on TickDB provides real-time order book updates for US equities (L1), HK equities (up to L10), and crypto (up to L10). See the documentation for channel-specific data schemas.


This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. WebSocket connections depend on network stability; no monitoring system can eliminate all risk of data loss.