Every quant trading system eventually meets the same enemy: the data source that stops responding at the worst possible moment.

It is 2:47 AM. Your momentum strategy is running smoothly. The market data stream from Polygon — the source your entire pipeline depends on — suddenly goes silent. No warning. No explanation. Your system keeps submitting orders based on stale prices that are now 3.2 seconds old.

You have 90 seconds before your risk management module flags the discrepancy. But you have already lost three ticks, and one of your positions is now deeply underwater.

This is not a hypothetical. It is a scenario that every systematic trader engineers against — and the difference between a system that survives and one that bleeds money comes down to one question: How fast can your pipeline detect failure and switch to a backup source?

In this article, we build a production-grade failover system in Python. It detects a failing primary source, validates the backup, and switches to TickDB as the backup source, with detection in roughly 30 seconds at the default settings. The code includes heartbeat detection, a three-state circuit breaker, rate-limit handling, and a data consistency validator that flags suspicious backup prices during the switch.

The Problem: Why Most Failover Systems Fail

Before diving into the solution, we need to understand why naive failover approaches cause more problems than they solve.

The Three Failure Modes of Market Data Sources

When a primary data source like Polygon goes down, it does not announce itself with a clear error code. Instead, it manifests in three ambiguous ways:

| Failure mode | Symptom | Detection difficulty |
|---|---|---|
| Hard failure | Connection drops entirely; no response to keep-alive pings | Easy: connection errors and request timeouts surface as exceptions |
| Soft failure | Connection stays open and the heartbeat still responds, but new data stops arriving | Moderate: requires an application-level timeout on data freshness |
| Degraded response | Data arrives, but with 500 ms+ latency, so prices are stale | Hard: the latency threshold must be calibrated against normal variance |

Most failover implementations detect only the first failure mode. They wrap the data fetch in a simple try/except block and switch sources on any exception. This works for hard failures but leaves your system vulnerable to the more insidious soft failures and degraded responses.
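To make the three modes concrete, here is a minimal sketch that sorts a single heartbeat observation into one of them. The `classify_heartbeat` function, its inputs, and both thresholds are illustrative assumptions (the 500 ms figure comes from the table above); in practice the latency threshold should be calibrated against the source's normal variance.

```python
from enum import Enum
from typing import Optional


class FailureMode(Enum):
    HEALTHY = "healthy"
    HARD = "hard"          # no response at all
    SOFT = "soft"          # responds, but the data has stopped moving
    DEGRADED = "degraded"  # data moves, but arrives too slowly


def classify_heartbeat(
    responded: bool,
    latency_ms: Optional[float],
    data_age_s: Optional[float],
    max_latency_ms: float = 500.0,  # from the table; calibrate per source
    max_data_age_s: float = 30.0,   # hypothetical freshness limit
) -> FailureMode:
    """Map one heartbeat observation to a failure mode (illustrative only)."""
    if not responded or latency_ms is None:
        return FailureMode.HARD
    # Staleness first: a fast reply carrying old (or no) data is a soft failure
    if data_age_s is None or data_age_s > max_data_age_s:
        return FailureMode.SOFT
    if latency_ms > max_latency_ms:
        return FailureMode.DEGRADED
    return FailureMode.HEALTHY
```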

The Validation Problem

Even when a failure is detected correctly, switching blindly to a backup source introduces a new risk: data inconsistency. The backup source may use a different normalization scheme, a different timestamp convention, or a different update frequency. Without validation, you risk swapping one source of bad data for another.

A robust failover system must answer three questions before committing to a switch:

  1. Is the primary source genuinely unavailable, or is this a transient network glitch?
  2. Is the backup source healthy and returning consistent data?
  3. Does the backup data align with the last known state from the primary?

Architecture: The Fail-Safe Pipeline Design

Our solution uses a three-layer architecture: a Health Monitor, a Circuit Breaker, and a Data Validator. Each layer addresses one part of the problem (detecting failure, choosing a source, vetting the data), which keeps a single failing source from cascading into bad orders.

┌───────────────────────────────────────────────────────────────┐
│                      Client Application                       │
└───────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌───────────────────────────────────────────────────────────────┐
│                     Circuit Breaker Layer                     │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────────┐  │
│  │ State: CLOSED │  │ State: OPEN   │  │ State: HALF-OPEN  │  │
│  │ → Normal      │  │ → Failing,    │  │ → Testing         │  │
│  │   flow        │  │   switch to   │  │   primary         │  │
│  │               │  │   backup      │  │   recovery        │  │
│  └───────────────┘  └───────────────┘  └───────────────────┘  │
└───────────────────────────────────────────────────────────────┘
                                │
               ┌────────────────┴────────────────┐
               ▼                                 ▼
┌─────────────────────────────┐   ┌─────────────────────────────┐
│   Primary Source            │   │   Backup Source             │
│   (Polygon)                 │   │   (TickDB)                  │
│                             │   │                             │
│   - REST polling            │   │   - REST + WebSocket        │
│   - 10s heartbeat           │   │   - depth channel available │
│   - US equity + crypto      │   │   - 10+yr OHLCV backtest    │
└─────────────────────────────┘   └─────────────────────────────┘

Layer 1: Health Monitor

The Health Monitor runs a continuous heartbeat check against the primary source every 10 seconds. It tracks three metrics:

  • Response time: Whether the source answers within RESPONSE_TIMEOUT (5 seconds); a timeout counts as an error
  • Stale count: Number of consecutive heartbeats that return identical timestamps
  • Error count: Number of consecutive failed requests

When any metric exceeds its threshold, the monitor triggers a circuit breaker state change.
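Of the three metrics, response time is the hardest to threshold, because "slow" only makes sense relative to a source's normal behavior. This goes beyond the pipeline code below, which only enforces a fixed RESPONSE_TIMEOUT. One option, sketched here under assumptions (the `LatencyBaseline` name, the 50-sample window, k = 3 and the 50 ms floor are all illustrative), is to keep a rolling window of recent latencies and flag a sample that sits well above the window's mean.

```python
from collections import deque
from statistics import mean, pstdev


class LatencyBaseline:
    """Flags response times that sit far above a rolling baseline."""

    def __init__(self, window: int = 50, k: float = 3.0, floor_ms: float = 50.0,
                 min_samples: int = 10):
        self.samples = deque(maxlen=window)  # oldest samples fall off automatically
        self.k = k                # standard deviations above the mean that count as slow
        self.floor_ms = floor_ms  # minimum margin, so a very steady feed is not hair-trigger
        self.min_samples = min_samples

    def is_degraded(self, latency_ms: float) -> bool:
        """Judge latency_ms against the baseline, then fold healthy samples in."""
        degraded = False
        if len(self.samples) >= self.min_samples:
            margin = max(self.k * pstdev(self.samples), self.floor_ms)
            degraded = latency_ms > mean(self.samples) + margin
        if not degraded:
            self.samples.append(latency_ms)
        return degraded
```

Only healthy samples feed the baseline, so a burst of slow replies cannot inflate it; the trade-off is that a permanent latency shift keeps being flagged until the window is reset.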

Layer 2: Circuit Breaker

The Circuit Breaker implements the circuit breaker pattern with three states:

| State | Behavior | Transition trigger |
|---|---|---|
| CLOSED | Normal operation; requests go to primary | Opens after 3 consecutive failed requests (MAX_ERROR_COUNT) or 3 consecutive stale timestamps (MAX_STALE_COUNT) |
| OPEN | All requests diverted to backup; primary receives only heartbeat probes | Moves to HALF-OPEN after a 30-second cooldown (CIRCUIT_COOLDOWN) |
| HALF-OPEN | Test requests sent to both sources; backup validated | Closes (back to CLOSED) after 3 consecutive successful primary responses (CIRCUIT_CLOSE_THRESHOLD); re-opens on any primary failure |

This pattern prevents a failing source from consuming all system resources (the "open circuit" effect) while also testing whether a recovered source is genuinely healthy before restoring it.
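The transition table is small enough to capture in a network-free state machine, which makes the rules easy to unit-test. The sketch below is illustrative: the `CircuitBreaker` class and its injectable `clock` are assumptions, not part of any library. It re-opens on any failure during HALF-OPEN, counts recovery successes from the moment HALF-OPEN begins, and ignores failures while OPEN so they do not extend the cooldown.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Three-state breaker that follows the transition table above."""

    def __init__(self, max_failures: int = 3, cooldown_s: float = 30.0,
                 close_after: int = 3, clock: Callable[[], float] = time.monotonic):
        self.max_failures = max_failures
        self.cooldown_s = cooldown_s
        self.close_after = close_after
        self.clock = clock  # injectable, so tests need not sleep
        self.state = State.CLOSED
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def current_state(self) -> State:
        # OPEN -> HALF_OPEN is time-driven, so it is checked on every read
        if self.state is State.OPEN and self.clock() - self.opened_at >= self.cooldown_s:
            self.state = State.HALF_OPEN
            self.successes = 0  # count recovery from the start of HALF_OPEN
        return self.state

    def record_success(self) -> None:
        self.failures = 0
        if self.current_state() is State.HALF_OPEN:
            self.successes += 1
            if self.successes >= self.close_after:
                self.state = State.CLOSED

    def record_failure(self) -> None:
        state = self.current_state()
        self.successes = 0
        self.failures += 1
        # Any failure while probing re-opens; CLOSED waits for a streak.
        # Failures while OPEN are ignored so they do not extend the cooldown.
        if state is State.HALF_OPEN or (
            state is State.CLOSED and self.failures >= self.max_failures
        ):
            self.state = State.OPEN
            self.opened_at = self.clock()
```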

Layer 3: Data Validator

Before committing to backup data, the Data Validator should run three consistency checks (the reference implementation below covers the second):

  1. Compare the latest timestamp from the backup against the last known timestamp from the primary
  2. Verify that the new price lies within 3 standard deviations of the rolling mean of recent primary prices
  3. Check that the order book depth data, if available, shows no anomalous spikes

If validation fails, the system keeps the backup source active but flags the discrepancy in logs for manual review.
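Scoring the price level against a short rolling mean can misfire in a steady trend, where every new print legitimately sits away from the window's average. A variant worth considering scores the size of each price change instead. This is a sketch of that alternative under stated assumptions (the `ReturnZScoreValidator` name and simple returns as the measure of change), not what the pipeline code below implements.

```python
from collections import deque
from statistics import mean, pstdev
from typing import Optional


class ReturnZScoreValidator:
    """Flags a price whose step from the last trusted price is unusually large."""

    def __init__(self, window: int = 20, threshold: float = 3.0, min_history: int = 5):
        self.returns = deque(maxlen=window)  # recent simple returns
        self.threshold = threshold
        self.min_history = min_history
        self.last_price: Optional[float] = None

    def add_price(self, price: float) -> None:
        """Record a trusted (primary) price and the return it implies."""
        if self.last_price:  # skips the first price and guards against a zero
            self.returns.append(price / self.last_price - 1.0)
        self.last_price = price

    def is_consistent(self, new_price: float) -> bool:
        if not self.last_price or len(self.returns) < self.min_history:
            return True  # not enough history; pass through
        step = new_price / self.last_price - 1.0
        sigma = pstdev(self.returns)
        if sigma == 0:
            return True  # flat history; pass through, as DataValidator does
        return abs(step - mean(self.returns)) / sigma <= self.threshold
```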

Production-Grade Code: Implementing the Fail-Safe Pipeline

The following implementation uses Python with the requests library for synchronous HTTP checks; the WebSocket extension later in this article uses asyncio and the websockets library. All credentials are loaded from environment variables, and ⚠️ comments mark production considerations.

import os
import time
import random
import logging
import threading
from datetime import datetime, timezone
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Callable
import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger("failover_pipeline")


# ============================================================
# CONFIGURATION — Load from environment variables
# ============================================================
PRIMARY_API_KEY = os.environ.get("POLYGON_API_KEY")
BACKUP_API_KEY = os.environ.get("TICKDB_API_KEY")

PRIMARY_BASE_URL = "https://api.polygon.io/v2"
BACKUP_BASE_URL = "https://api.tickdb.ai/v1"

# Health check thresholds
HEARTBEAT_INTERVAL = 10  # seconds
RESPONSE_TIMEOUT = 5     # seconds
MAX_STALE_COUNT = 3      # consecutive identical timestamps before flagging
MAX_ERROR_COUNT = 3      # consecutive failures before circuit opens
CIRCUIT_COOLDOWN = 30    # seconds before attempting to close circuit
CIRCUIT_CLOSE_THRESHOLD = 3  # consecutive successes to close circuit

# Data validation thresholds
PRICE_STD_DEV_THRESHOLD = 3.0  # standard deviations; flag anomaly if exceeded


class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, using backup
    HALF_OPEN = "half_open"  # Testing primary recovery


@dataclass
class HealthMetrics:
    """Tracks health indicators for a data source."""
    error_count: int = 0
    stale_count: int = 0
    last_timestamp: Optional[str] = None
    last_price: Optional[float] = None
    consecutive_successes: int = 0


@dataclass
class DataSnapshot:
    """A validated snapshot of market data."""
    timestamp: str
    symbol: str
    price: float
    bid: float
    ask: float
    spread: float
    source: str
    validated: bool = False


class DataValidator:
    """
    Validates data consistency between primary and backup sources.
    ⚠️ For production: consider Welford's online algorithm to update the
       rolling mean and variance incrementally instead of recomputing them
       over the whole window on every price.
    """
    
    def __init__(self, std_threshold: float = PRICE_STD_DEV_THRESHOLD):
        self.std_threshold = std_threshold
        self.price_history = []
        self.window_size = 20  # rolling window for volatility calc
    
    def add_price(self, price: float) -> None:
        self.price_history.append(price)
        if len(self.price_history) > self.window_size:
            self.price_history.pop(0)
    
    def is_consistent(self, new_price: float) -> bool:
        """Check if new_price is within normal distribution of history."""
        if len(self.price_history) < 5:
            return True  # Not enough history; pass through
        
        mean = sum(self.price_history) / len(self.price_history)
        variance = sum((p - mean) ** 2 for p in self.price_history) / len(self.price_history)
        std_dev = variance ** 0.5
        
        if std_dev == 0:
            return True  # No variance; price unchanged is normal
        
        z_score = abs(new_price - mean) / std_dev
        return z_score <= self.std_threshold


class FailSafePipeline:
    """
    Multi-source data pipeline with automatic failover.
    
    Architecture:
    - Health Monitor: Heartbeat checks against primary
    - Circuit Breaker: State machine managing source selection
    - Data Validator: Cross-source consistency check
    
    ⚠️ Engineering notes:
    - This implementation uses threading for health checks.
      For HFT workloads (>1000 req/s), migrate to asyncio with uvloop.
    - The circuit breaker is per-symbol; for cross-symbol correlations,
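      implement a global circuit breaker with dependency tracking.
    - Network calls run while holding self._lock, so a hanging primary can
      block get_price() for up to RESPONSE_TIMEOUT seconds (longer during
      rate-limit waits).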
    """
    
    def __init__(
        self,
        primary_url: str,
        backup_url: str,
        primary_key: str,
        backup_key: str,
        symbol: str = "AAPL"
    ):
        self.primary_url = primary_url
        self.backup_url = backup_url
        self.primary_key = primary_key
        self.backup_key = backup_key
        self.symbol = symbol
        
        self.circuit_state = CircuitState.CLOSED
        self.primary_health = HealthMetrics()
        self.backup_health = HealthMetrics()
        self.validator = DataValidator()
        
        self.last_primary_data: Optional[DataSnapshot] = None
        self.last_backup_data: Optional[DataSnapshot] = None
        
        self._lock = threading.Lock()
        self._running = False
        self._health_thread: Optional[threading.Thread] = None
        
        self._request_count = 0
        self._rate_limit_remaining = 100  # Start conservative
        self._rate_limit_reset = 0
        self._circuit_open_time = 0.0  # set by _open_circuit()
    
    def start(self) -> None:
        """Start the health monitoring thread."""
        self._running = True
        self._health_thread = threading.Thread(target=self._health_monitor_loop, daemon=True)
        self._health_thread.start()
        logger.info(f"Fail-safe pipeline started for {self.symbol}")
    
    def stop(self) -> None:
        """Stop the health monitoring thread."""
        self._running = False
        if self._health_thread:
            self._health_thread.join(timeout=5)
        logger.info("Fail-safe pipeline stopped")
    
    def _health_monitor_loop(self) -> None:
        """Continuous heartbeat check loop. Runs every HEARTBEAT_INTERVAL seconds."""
        while self._running:
            try:
                self._check_primary_health()
            except Exception as e:
                logger.error(f"Health monitor error: {e}")
            time.sleep(HEARTBEAT_INTERVAL)
    
    def _check_primary_health(self) -> None:
        """Send heartbeat to primary and update health metrics."""
        with self._lock:
            try:
                # Check rate limits first
                if self._rate_limit_remaining <= 0 and time.time() < self._rate_limit_reset:
                    logger.warning(f"Rate limited; waiting until {self._rate_limit_reset}")
                    time.sleep(max(0, self._rate_limit_reset - time.time()))
                
                # Polygon heartbeat: poll the last-trade endpoint for self.symbol
                url = f"{self.primary_url}/last/trade/{self.symbol}"
                response = requests.get(
                    url,
                    params={"apiKey": self.primary_key},
                    timeout=RESPONSE_TIMEOUT
                )
                
                self._handle_rate_limit(response)
                
                if response.status_code != 200:
                    raise RuntimeError(f"Primary returned {response.status_code}")
                
                data = response.json()
                new_timestamp = data.get("results", {}).get("t")
                new_price = data.get("results", {}).get("p")
                
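                # Update health metrics
                # ⚠️ Outside trading hours the last trade legitimately stops
                #    changing; gate the stale check on market hours in production.
                if new_timestamp == self.primary_health.last_timestamp:
                    self.primary_health.stale_count += 1
                    # A stale reply is not evidence of recovery
                    self.primary_health.consecutive_successes = 0
                    logger.warning(f"Stale data detected: {self.primary_health.stale_count} consecutive")
                else:
                    self.primary_health.stale_count = 0
                    self.primary_health.consecutive_successes += 1
                
                self.primary_health.last_timestamp = new_timestamp
                self.primary_health.last_price = new_price
                self.primary_health.error_count = 0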
                
                # State transitions
                self._evaluate_circuit_transition()
                
            except requests.Timeout:
                self.primary_health.error_count += 1
                self.primary_health.consecutive_successes = 0
                logger.error("Primary heartbeat timed out")
                self._evaluate_circuit_transition()
                
            except Exception as e:
                self.primary_health.error_count += 1
                self.primary_health.consecutive_successes = 0
                logger.error(f"Primary heartbeat failed: {e}")
                self._evaluate_circuit_transition()
    
    def _evaluate_circuit_transition(self) -> None:
        """Evaluate and execute circuit breaker state transitions."""
        if self.circuit_state == CircuitState.CLOSED:
            if self.primary_health.error_count >= MAX_ERROR_COUNT:
                self._open_circuit()
            elif self.primary_health.stale_count >= MAX_STALE_COUNT:
                self._open_circuit()
                
        elif self.circuit_state == CircuitState.OPEN:
            if time.time() - self._circuit_open_time >= CIRCUIT_COOLDOWN:
                self._half_open_circuit()
                
        elif self.circuit_state == CircuitState.HALF_OPEN:
            if self.primary_health.consecutive_successes >= CIRCUIT_CLOSE_THRESHOLD:
                self._close_circuit()
            elif self.primary_health.error_count >= 1:
                self._open_circuit()
    
    def _open_circuit(self) -> None:
        self.circuit_state = CircuitState.OPEN
        self._circuit_open_time = time.time()
        logger.warning(
            f"CIRCUIT OPENED at {datetime.now(timezone.utc).isoformat()}. "
            f"Switching to backup source. Error count: {self.primary_health.error_count}"
        )
    
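    def _half_open_circuit(self) -> None:
        self.circuit_state = CircuitState.HALF_OPEN
        # Count recovery successes from HALF-OPEN entry, not from before the outage
        self.primary_health.consecutive_successes = 0
        logger.info("Circuit entering HALF-OPEN state; testing primary recovery")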
    
    def _close_circuit(self) -> None:
        self.circuit_state = CircuitState.CLOSED
        logger.info(
            f"CIRCUIT CLOSED at {datetime.now(timezone.utc).isoformat()}. "
            f"Primary source verified healthy. Switched back from backup."
        )
    
    def _handle_rate_limit(self, response: requests.Response) -> None:
        """Handle rate limit headers from both Polygon and TickDB."""
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            self._rate_limit_reset = time.time() + retry_after
            self._rate_limit_remaining = 0
            raise RuntimeError(f"Rate limited; retry after {retry_after}s")
        
        # TickDB uses code 3001 for rate limits
        if hasattr(response, "json"):
            try:
                body = response.json()
                if body.get("code") == 3001:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    logger.warning(f"TickDB rate limit (code 3001); cooldown {retry_after}s")
                    time.sleep(retry_after)
                    return
            except Exception:
                pass
        
        # Update remaining from headers if available
        remaining = response.headers.get("X-RateLimit-Remaining")
        if remaining:
            self._rate_limit_remaining = int(remaining)
    
    def get_price(self) -> DataSnapshot:
        """
        Get current market data. Automatically selects source based on circuit state.
        
        Returns:
            DataSnapshot with validated price data
            
        Raises:
            RuntimeError or requests.RequestException: If both primary and backup sources fail
        """
        with self._lock:
            self._request_count += 1
            
            # Choose source based on circuit state
            if self.circuit_state == CircuitState.CLOSED:
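                # Normal: try primary first, fall back to backup on failure
                try:
                    snapshot = self._fetch_from_primary()
                    self.primary_health.error_count = 0  # the failure streak is broken
                    return snapshot
                except Exception as e:
                    logger.error(f"Primary fetch failed: {e}. Attempting backup.")
                    self.primary_health.error_count += 1
                    self.primary_health.consecutive_successes = 0
                    self._evaluate_circuit_transition()
                    # Propagates the backup's exception if it fails too
                    return self._fetch_from_backup()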
                    
            elif self.circuit_state == CircuitState.OPEN:
                # Circuit open: use backup exclusively
                return self._fetch_from_backup()
                
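            elif self.circuit_state == CircuitState.HALF_OPEN:
                # Test: fetch from both; serve primary if its probe succeeds
                try:
                    backup_data = self._fetch_from_backup()
                except Exception as e:
                    logger.warning(f"Backup fetch failed in HALF-OPEN: {e}")
                    backup_data = None
                
                # Probe primary (a blocking request, not a cached read)
                try:
                    primary_data = self._fetch_from_primary()
                    self.primary_health.error_count = 0
                    self.primary_health.consecutive_successes += 1
                    self._evaluate_circuit_transition()
                    return primary_data
                except Exception as e:
                    logger.warning(f"Primary still failing in HALF-OPEN: {e}")
                    self.primary_health.error_count += 1
                    self.primary_health.consecutive_successes = 0
                    self._evaluate_circuit_transition()  # re-opens the circuit
                    if backup_data is None:
                        raise RuntimeError("Both primary and backup sources failed")
                    return backup_data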
    
    def _fetch_from_primary(self) -> DataSnapshot:
        """Fetch data from Polygon. ⚠️ Polygon requires REST polling for trades."""
        url = f"{self.primary_url}/last/trade/{self.symbol}"
        response = requests.get(
            url,
            params={"apiKey": self.primary_key},
            timeout=RESPONSE_TIMEOUT
        )
        
        self._handle_rate_limit(response)
        
        if response.status_code != 200:
            raise RuntimeError(f"Primary returned status {response.status_code}")
        
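        data = response.json()
        results = data.get("results") or {}
        price = results.get("p")
        if price is None:
            # Never feed a placeholder 0 into the validator's price history
            raise RuntimeError("Primary response has no trade price")
        
        # ⚠️ The last-trade payload has no bid/ask fields; zeros mean "not
        #    available". Query Polygon's NBBO (last quote) endpoint for quotes.
        snapshot = DataSnapshot(
            timestamp=str(results.get("t")),
            symbol=self.symbol,
            price=float(price),
            bid=0.0,
            ask=0.0,
            spread=0.0,
            source="polygon"
        )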
        
        self.validator.add_price(snapshot.price)
        self.last_primary_data = snapshot
        return snapshot
    
    def _fetch_from_backup(self) -> DataSnapshot:
        """
        Fetch data from TickDB.
        
        ⚠️ TickDB note: `trades` endpoint does not support US equities or A-shares.
          For US equity OHLCV data, use the `/kline/latest` endpoint instead.
          This example uses kline for demonstration; adjust endpoint per your data needs.
        """
        headers = {"X-API-Key": self.backup_key}
        
        # For US stocks: use kline (1-minute) as price proxy
        # For crypto/HK: use trades for tick-level
        url = f"{self.backup_url}/market/kline/latest"
        response = requests.get(
            url,
            headers=headers,
            params={"symbol": f"{self.symbol}.US", "interval": "1m"},
            timeout=RESPONSE_TIMEOUT
        )
        
        self._handle_rate_limit(response)
        
        if response.status_code != 200:
            raise RuntimeError(f"Backup returned status {response.status_code}")
        
        body = response.json()
        if body.get("code") != 0:
            raise RuntimeError(f"TickDB error: code={body.get('code')}, msg={body.get('message')}")
        
        data = body.get("data", {})
        
        close = data.get("c")  # close price from the latest kline
        if close is None:
            raise RuntimeError("Backup kline has no close price")
        
        snapshot = DataSnapshot(
            timestamp=str(data.get("t") or data.get("timestamp")),
            symbol=self.symbol,
            price=float(close),
            # Klines carry no bid/ask; the close serves as a zero-spread placeholder
            bid=float(close),
            ask=float(close),
            spread=0.0,
            source="tickdb"
        )
        
        # Validate against historical pattern
        if self.validator.is_consistent(snapshot.price):
            snapshot.validated = True
        else:
            logger.warning(
                f"Backup data flagged: price {snapshot.price} deviates from historical pattern. "
                f"Logging for review but returning data."
            )
        
        self.last_backup_data = snapshot
        return snapshot
    
    def get_status(self) -> dict:
        """Return current pipeline status for monitoring dashboards."""
        with self._lock:
            return {
                "symbol": self.symbol,
                "circuit_state": self.circuit_state.value,
                "primary_health": {
                    "error_count": self.primary_health.error_count,
                    "stale_count": self.primary_health.stale_count,
                    "last_price": self.primary_health.last_price,
                    "consecutive_successes": self.primary_health.consecutive_successes
                },
                "backup_health": {
                    "last_price": self.backup_health.last_price
                },
                "total_requests": self._request_count,
                "rate_limit_remaining": self._rate_limit_remaining,
                "last_primary_data": {
                    "price": self.last_primary_data.price if self.last_primary_data else None,
                    "source": self.last_primary_data.source if self.last_primary_data else None
                } if self.circuit_state != CircuitState.OPEN else None
            }


# ============================================================
# USAGE EXAMPLE
# ============================================================
def main():
    """
    Demonstrate the fail-safe pipeline in operation.
    
    Run this with:
    export POLYGON_API_KEY=your_polygon_key
    export TICKDB_API_KEY=your_tickdb_key
    python failover_pipeline.py
    """
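    if not PRIMARY_API_KEY or not BACKUP_API_KEY:
        raise SystemExit("Set POLYGON_API_KEY and TICKDB_API_KEY before running.")
    
    pipeline = FailSafePipeline(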
        primary_url=PRIMARY_BASE_URL,
        backup_url=BACKUP_BASE_URL,
        primary_key=PRIMARY_API_KEY,
        backup_key=BACKUP_API_KEY,
        symbol="AAPL"
    )
    
    pipeline.start()
    
    try:
        logger.info("Fetching prices for 60 seconds (simulating normal operation)...")
        start_time = time.time()
        
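        while time.time() - start_time < 60:
            try:
                snapshot = pipeline.get_price()
            except (RuntimeError, requests.RequestException) as e:
                # Both sources failed; skip this tick instead of crashing the loop
                logger.error(f"No data this tick: {e}")
                time.sleep(1)
                continue
            
            status = pipeline.get_status()
            logger.info(
                f"[{status['circuit_state']}] "
                f"Price: ${snapshot.price:.2f} | "
                f"Source: {snapshot.source} | "
                f"Validated: {snapshot.validated}"
            )
            
            # Simulate normal polling interval (1 second).
            # ⚠️ 1 request/s plus heartbeats far exceeds Polygon's free-tier
            #    limit of 5 requests/min; use a paid plan or poll more slowly.
            time.sleep(1)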
            
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    finally:
        pipeline.stop()
        
        # Final status report
        final_status = pipeline.get_status()
        logger.info(f"Final status: {final_status}")
        logger.info(f"Total requests: {final_status['total_requests']}")


if __name__ == "__main__":
    main()

The Data Comparison Layer: Ensuring Consistency Across Sources

The hardest part of failover is not the switching — it is proving that the backup data is trustworthy. We implement a two-phase validation strategy:

Phase 1: Volatility Check

The DataValidator class maintains a rolling window of the 20 most recent primary prices. When a new price arrives from the backup source, it computes the Z-score:

Z-score = |new_price - rolling_mean| / rolling_std_dev

If Z-score exceeds 3.0 (three standard deviations), the price is flagged but not rejected. This is intentional: during high-volatility events (earnings, macroeconomic releases), the backup source may correctly report prices that appear anomalous compared to the recent history.

The flag triggers an alert for manual review but allows the trading system to continue operating.
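The `DataValidator` docstring suggests Welford's online algorithm for production. A sliding-window variant is sketched below as an illustration (the `RollingWelford` name is hypothetical): each price updates the mean and the sum of squared deviations in O(1), and the oldest price is "un-added" when it leaves the window. It uses the population standard deviation, matching the Z-score formula above.

```python
import math
from collections import deque


class RollingWelford:
    """Sliding-window mean and standard deviation with O(1) Welford updates."""

    def __init__(self, window: int = 20):
        self.window = window
        self.values = deque()  # prices currently inside the window
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean

    def _add(self, x: float) -> None:
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    def _remove(self, x: float) -> None:
        # Reverse of _add: recover the state as if x had never been added
        if self.n == 1:
            self.n, self.mean, self.m2 = 0, 0.0, 0.0
            return
        old_mean = self.mean
        self.n -= 1
        self.mean = old_mean - (x - old_mean) / self.n
        # Clamp at zero to absorb floating-point round-off
        self.m2 = max(0.0, self.m2 - (x - self.mean) * (x - old_mean))

    def push(self, x: float) -> None:
        self.values.append(x)
        self._add(x)
        if len(self.values) > self.window:
            self._remove(self.values.popleft())

    def z_score(self, x: float) -> float:
        """|x - mean| / population std; 0.0 when there is no variance yet."""
        if self.n == 0:
            return 0.0
        std = math.sqrt(self.m2 / self.n)
        return abs(x - self.mean) / std if std > 0 else 0.0
```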

Phase 2: Cross-Source Timestamp Alignment

When the circuit breaker transitions from OPEN to HALF-OPEN, a cross-source comparison should confirm that the two sources agree in time (with both timestamps converted to the same unit first):

time_delta = |backup_timestamp - primary_last_timestamp|
valid_if: time_delta <= 60 seconds (one bar of the backup's 1-minute klines)

If the backup data is within 60 seconds of the primary's last known state, the data is considered temporally consistent.
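The pipeline code above does not implement this check yet, so here is a minimal sketch. The main trap is units: Polygon's trade timestamp `t` is an integer count of nanoseconds since the Unix epoch, while the backup's unit depends on its API (milliseconds in the usage example is an assumption). The helper names are hypothetical.

```python
from datetime import datetime, timezone

# Divisors that turn an integer epoch timestamp into seconds
UNIT_DIVISORS = {"s": 1, "ms": 1_000, "us": 1_000_000, "ns": 1_000_000_000}


def to_utc(epoch_value: int, unit: str) -> datetime:
    """Convert an integer epoch timestamp in `unit` to an aware UTC datetime."""
    return datetime.fromtimestamp(epoch_value / UNIT_DIVISORS[unit], tz=timezone.utc)


def timestamps_aligned(
    backup_ts: int, backup_unit: str,
    primary_ts: int, primary_unit: str,
    max_delta_s: float = 60.0,
) -> bool:
    """True if the backup's latest timestamp is within max_delta_s of the primary's."""
    delta = to_utc(backup_ts, backup_unit) - to_utc(primary_ts, primary_unit)
    return abs(delta.total_seconds()) <= max_delta_s
```

For example, `timestamps_aligned(1_700_000_030_000, "ms", 1_700_000_000_000_000_000, "ns")` compares a millisecond backup timestamp with a nanosecond Polygon one and returns True, since they are 30 seconds apart.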

Comparing Primary vs. Backup: Source Capabilities

The following table compares Polygon (primary) and TickDB (backup) across the dimensions relevant to a fail-safe trading system:

| Capability | Polygon (primary) | TickDB (backup) | Notes |
|---|---|---|---|
| US equity trades | Real-time, tick-level | Not supported (the trades endpoint does not cover US equities) | Use /kline for OHLCV as a fallback |
| US equity OHLCV | Historical, 1-minute+ granularity | 10+ years of cleaned, aligned OHLCV | TickDB is better for backtesting; Polygon is better for live trading |
| HK equity / crypto | Limited | Full tick-level trade data | TickDB advantage |
| Order book depth | L1 only | L1–L10 (HK, crypto) | TickDB advantage for depth analysis |
| REST latency | ~100–300 ms | ~50–150 ms | Comparable |
| WebSocket push | Available | Available | Both support real-time streaming |
| Rate limits | 5 requests/min (free tier) | Configurable per plan | TickDB gives more headroom on paid plans |
| API authentication | URL parameter (apiKey=) | Header (X-API-Key) | Different auth patterns; the code handles both |

Key takeaway: TickDB is not a direct replacement for Polygon's US equity trade data. However, it provides a viable fallback for OHLCV-based strategies and excels for HK equity, crypto, and depth data. The failover system should route based on data type: US equity trades → primary only; HK/crypto trades + depth → backup available.
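That routing rule can be written down as a guard the pipeline consults before failing over. A minimal sketch follows; the `DataType` enum, the market codes, and `backup_allowed` are hypothetical names. A strategy that can accept a 1-minute close in place of live trades, as the AAPL example in the pipeline does, would request OHLCV rather than trades when the primary is down.

```python
from enum import Enum


class DataType(Enum):
    TRADES = "trades"
    OHLCV = "ohlcv"
    DEPTH = "depth"


# Markets where the comparison table shows backup coverage for every data type
FULL_BACKUP_MARKETS = {"HK", "CRYPTO"}


def backup_allowed(market: str, data_type: DataType) -> bool:
    """Return True if failing over to the backup is acceptable for this request."""
    if market == "US":
        # No tick-level US equity trades on the backup; OHLCV (klines) only
        return data_type is DataType.OHLCV
    if market in FULL_BACKUP_MARKETS:
        return True
    # Unknown market: stay on the primary rather than fail over blindly
    return False
```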

Deployment Guide: Matching Configuration to Your Scale

| User type | Recommended configuration | Key settings |
|---|---|---|
| Individual quant | Single pipeline instance, 10 s heartbeat | HEARTBEAT_INTERVAL=10, MAX_ERROR_COUNT=3 |
| Small team (2–5 strategies) | Per-strategy pipeline instances, shared backup quota | HEARTBEAT_INTERVAL=5, MAX_ERROR_COUNT=2, CIRCUIT_COOLDOWN=15 |
| Institutional quant desk | Centralized health monitor + per-strategy circuit breakers | Global circuit breaker with symbol dependency tracking; ALERT_WEBHOOK_URL for PagerDuty integration |

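For individual users, the default configuration detects a failure in roughly 30 seconds (3 errors × 10 s heartbeat interval). Each probe that hangs until RESPONSE_TIMEOUT adds up to 5 seconds more, while failed get_price() calls also count toward the error threshold, so an actively polling strategy usually trips the breaker sooner. Teams should reduce the interval to 5 seconds and the error threshold to 2 for faster detection at the cost of higher API usage.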
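The detection-time arithmetic is easy to get wrong, so here it is spelled out. A minimal sketch, assuming the heartbeat loop from the pipeline (probe, then sleep for the interval) and ignoring errors counted by get_price(); `detection_window` is a hypothetical helper:

```python
def detection_window(max_errors: int, interval_s: float, timeout_s: float,
                     requests_hang: bool) -> tuple[float, float]:
    """Best- and worst-case seconds from primary failure to circuit OPEN.

    Assumes the heartbeat loop probes, then sleeps interval_s, and that only
    heartbeat errors count (get_price() failures would shorten this).
    """
    # A refused connection fails almost instantly; a hung one waits for the timeout
    probe_s = timeout_s if requests_hang else 0.0
    # Best case: the failure lands just before a probe starts
    best = max_errors * probe_s + (max_errors - 1) * interval_s
    # Worst case: it lands just after a successful probe, adding one full sleep
    worst = max_errors * (probe_s + interval_s)
    return best, worst
```

With the defaults (3 errors, 10 s interval, 5 s timeout), refused connections open the circuit in 20–30 s, but hanging requests take 35–45 s, which is why the timeout deserves as much tuning attention as the interval.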

Measuring Failover Performance

A production failover system should emit metrics that let you answer three questions in post-incident reviews:

| Metric | What it measures | Target |
|---|---|---|
| Detection time | Time from primary failure to circuit OPEN | < 30 s for the individual config; < 10 s for the team config |
| Recovery time | Time from primary recovery to circuit CLOSED | < 45 s (allows for validation) |
| Data gap | Number of ticks missed during the transition | Minimize; critical for high-frequency strategies |
| False positive rate | Times the circuit opened for transient issues | < 5% of total circuit state changes |

Track these metrics in your monitoring system (Prometheus, Datadog, or CloudWatch) and alert on detection time exceeding 2× your target.
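Detection and recovery times need a reference point the pipeline itself does not have: when the primary actually failed and recovered, which you would take from an independent probe or the provider's status page. A minimal sketch of the post-incident arithmetic, with hypothetical `Incident` and `summarize` names; note it divides false positives by circuit openings rather than by all state changes, which gives a stricter figure than the table's definition.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Incident:
    """One OPEN -> CLOSED cycle of the breaker, annotated after the fact.

    failed_at / recovered_at come from an independent source of truth; they
    stay None when the primary never really failed (a false positive).
    """
    opened_at: float
    closed_at: float
    failed_at: Optional[float] = None
    recovered_at: Optional[float] = None


def summarize(incidents: List[Incident]) -> dict:
    real = [i for i in incidents if i.failed_at is not None]
    detection = [i.opened_at - i.failed_at for i in real]
    recovery = [i.closed_at - i.recovered_at for i in real if i.recovered_at is not None]
    return {
        "max_detection_s": max(detection, default=None),
        "max_recovery_s": max(recovery, default=None),
        # Openings with no real outage behind them, as a share of all openings
        "false_positive_rate": (len(incidents) - len(real)) / len(incidents) if incidents else 0.0,
    }
```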

Extending the Pipeline: WebSocket Support

The REST-based implementation above suits strategies polling at 1–10 second intervals. For sub-second requirements, move the health check onto a persistent WebSocket connection with an application-level heartbeat (ping/pong):

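import asyncio
import json
import logging
import random

import websockets

logger = logging.getLogger("failover_pipeline")


async def websocket_health_monitor(
    uri: str,
    api_key: str,
    interval: float = 10.0,
    timeout: float = 5.0,
):
    """
    WebSocket health monitor with an application-level ping/pong.
    
    Detection latency is roughly interval + timeout, so sub-second
    detection needs both well under a second (e.g. 0.25 each).
    
    ⚠️ Note: This is async code. Do not mix with the sync pipeline above
       in the same process without an event loop bridge.
    ⚠️ The {"cmd": "ping"} / {"type": "pong"} messages and the api_key
       query parameter are provider-specific; match your feed's protocol.
    """
    attempt = 0
    while True:
        try:
            async with websockets.connect(f"{uri}?api_key={api_key}") as ws:
                attempt = 0  # Connected: reset the backoff
                while True:
                    # Send heartbeat
                    await ws.send(json.dumps({"cmd": "ping"}))
                    
                    # Wait for any message; a pong or a data tick both prove liveness
                    try:
                        message = await asyncio.wait_for(ws.recv(), timeout=timeout)
                    except asyncio.TimeoutError:
                        logger.warning("WebSocket heartbeat timed out; connection may be stale")
                        break  # Exit inner loop to reconnect
                    data = json.loads(message)
                    if isinstance(data, dict) and data.get("type") == "pong":
                        logger.debug("WebSocket heartbeat OK")
                    
                    await asyncio.sleep(interval)  # Heartbeat interval
                    
        except websockets.ConnectionClosed:
            logger.error("WebSocket connection closed; reconnecting...")
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
        
        # Exponential backoff with full jitter, capped at 60 seconds
        attempt += 1
        await asyncio.sleep(random.uniform(0, min(60.0, 2 ** attempt)))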

Closing: Engineering for the Moment You Cannot Predict

The quant trading industry is littered with systems that failed not because the strategy was wrong, but because the data infrastructure collapsed at the worst possible moment.

A well-engineered failover system does not eliminate risk; it transforms a catastrophic failure into a recoverable event. With the pipeline built in this article, your system detects primary source failure in roughly 30 seconds with the default settings, checks backup prices against recent history, and switches sources automatically, all without human intervention.

The circuit breaker pattern ensures that you do not hammer a failing primary source while the backup is active; only the periodic heartbeat keeps probing it. The data validator flags backup prices that stray from recent history. And the tiered configuration lets the system scale from an individual quant's single-strategy setup to an institutional desk's multi-strategy monitoring.

The 2:47 AM scenario we opened with — a silent data source, a momentum strategy running blind — has a clear engineering solution. Implement it before you need it.


Next Steps

If you are running a single-strategy system and need reliable OHLCV data with a fallback option, sign up at tickdb.ai for a free API key (no credit card required). The free tier provides access to 10+ years of historical data and the WebSocket depth channel for HK equity and crypto strategies.

If you are building a multi-strategy infrastructure and need a centralized health monitoring system, reach out to enterprise@tickdb.ai for custom rate limits, SLA guarantees, and dedicated support for circuit-breaker integration patterns.

If you are implementing this failover pattern in a production environment, consider installing the tickdb-market-data SKILL in your AI coding assistant to access pre-built integration templates and configuration examples.


This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. When implementing failover systems, ensure your configuration accounts for your specific latency requirements and risk tolerance.