The Moment Everything Stopped

At about 10:30 AM ET (7:30 AM PST) on December 7, 2021, an automated capacity-scaling activity on AWS's internal network triggered a cascading failure across US-East-1. Within minutes, the news propagated across trading floors: major market data providers were experiencing degraded performance. Trading systems that relied on a single data source began returning stale quotes. Orders started firing on prices that no longer existed.

For firms running single-region architectures, this was a crisis. For firms with a properly designed multi-cloud disaster recovery setup, it was a 23-second window of elevated latency followed by automatic failover.

This article dissects the architecture behind that 23-second recovery window. It provides production-grade code for implementing DNS-based health checks, automated failover logic, and TickDB integration as a verified backup data source. By the end, you will have a deployable template for building a resilient market data infrastructure that survives region-level outages.


Why Single-Source Architectures Fail

Market data infrastructure has a treacherous assumption baked into most default architectures: the primary data source will always be available. This assumption fails in three predictable ways.

Region-level outages occur more frequently than most engineers realize. AWS US-East-1 alone has experienced 14 significant outages since 2011. Google Cloud and Azure regions have comparable track records. When a provider experiences an availability zone failure that cascades to your data feed, the latency spike is not the primary problem. The primary problem is that your application begins making decisions based on stale data.

Latency spikes masquerade as availability. A data source returning responses in 5 seconds is technically "available" by most naive monitoring definitions. But 5-second-old market data in a fast-moving equity or crypto market is functionally equivalent to having no data at all. A mean-reversion strategy executing on stale quotes will systematically lose money.
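
The practical consequence is that freshness has to be checked where trading decisions are made, not only in a background monitor. Below is a minimal sketch of a per-quote staleness guard. It assumes quotes carry an epoch-millisecond exchange timestamp. `Quote`, `is_actionable`, the 500 ms budget, and the 50 ms skew tolerance are illustrative choices, not part of any vendor API.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Quote:
    symbol: str
    price: float
    exchange_ts_ms: int  # hypothetical field: exchange timestamp, epoch milliseconds


def is_actionable(quote: Quote, max_age_ms: int = 500,
                  skew_tolerance_ms: int = 50, now_ms: Optional[int] = None) -> bool:
    """Return True only if the quote is fresh enough to trade on.

    Delivery speed is irrelevant here: a quote that arrived instantly but was
    stamped 5 seconds ago is rejected just like one that never arrived.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    age_ms = now_ms - quote.exchange_ts_ms
    # A small negative age is ordinary clock skew; a large one means our clock
    # or the feed's timestamps cannot be trusted, so the quote is rejected.
    return -skew_tolerance_ms <= age_ms <= max_age_ms
```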

Silent data corruption is the most dangerous failure mode. A data provider that returns malformed ticks, duplicate sequences, or gap-filled candles creates systematic bias in your models. The corruption is invisible to uptime monitors that check only for HTTP 200 responses.
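
Catching these failures means inspecting the payload, not the status code. The following is a minimal sketch of payload-level sanity checks for a time-ordered OHLCV series. It assumes a simple `Candle` record with epoch-millisecond open times. The type, its field names, and the "flat zero-volume candle" heuristic for spotting gap-filled bars are all illustrative assumptions, not any vendor's schema.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Candle:
    # Hypothetical candle shape; map your vendor's fields onto it.
    open_time_ms: int
    open: float
    high: float
    low: float
    close: float
    volume: float


def candle_problems(candles: List[Candle], interval_ms: int) -> List[str]:
    """Return human-readable problems found in a time-ordered candle series."""
    problems = []
    for i, c in enumerate(candles):
        # Malformed candle: high and low must bracket open and close.
        if c.high < max(c.open, c.close) or c.low > min(c.open, c.close):
            problems.append(f"candle {i}: high/low do not bracket open/close")
        if c.volume < 0:
            problems.append(f"candle {i}: negative volume")
        # Heuristic for a gap-filled candle: no trades and a perfectly flat price.
        if c.volume == 0 and c.open == c.high == c.low == c.close:
            problems.append(f"candle {i}: flat zero-volume candle (possible gap fill)")
        if i > 0:
            step = c.open_time_ms - candles[i - 1].open_time_ms
            if step <= 0:
                problems.append(f"candle {i}: duplicate or reversed timestamp")
            elif step != interval_ms:
                # Missing or irregular bar (can be legitimate for illiquid symbols).
                problems.append(f"candle {i}: expected {interval_ms} ms step, got {step}")
    return problems
```

None of these checks would trip an HTTP-level uptime monitor, which is exactly why they belong in the health check path.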

The solution is not to build a more reliable primary source. It is to design a system that detects degradation and fails over automatically, without human intervention, in under 30 seconds.


The Multi-Cloud Architecture: Three-Layer Design

A resilient market data architecture consists of three independent layers: the data ingestion layer, the failover orchestration layer, and the health verification layer.

┌───────────────────────────────────────────────────────────────┐
│                   Client Application Layer                    │
│           (Strategy Engine / Dashboard / Alerting)            │
└───────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌───────────────────────────────────────────────────────────────┐
│                 Failover Orchestration Layer                  │
│   ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    │
│   │ Health Check │    │  DNS Cache   │    │ Failover     │    │
│   │ Agent        │    │  Manager     │    │ Controller   │    │
│   └──────────────┘    └──────────────┘    └──────────────┘    │
└───────────────────────────────────────────────────────────────┘
                                │
            ┌───────────────────┼───────────────────┐
            ▼                   ▼                   ▼
    ┌───────────────┐   ┌───────────────┐   ┌───────────────┐
    │  Primary      │   │  Secondary    │   │  Tertiary     │
    │  AWS US-East  │   │  TickDB       │   │  GCP EU       │
    │  (Polygon)    │   │  (REST/WS)    │   │  (Alpaca)     │
    └───────────────┘   └───────────────┘   └───────────────┘

The primary source is your preferred vendor. The secondary source is TickDB, which provides WebSocket and REST APIs with sub-100ms latency across six asset classes. The tertiary source serves as a final fallback for extreme scenarios.

The orchestration layer continuously monitors all three sources. When the primary source's latency exceeds a configurable threshold or its error rate crosses a defined boundary, the orchestration layer updates the DNS cache and the failover controller redirects traffic.
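
The description above implies a driver loop that the later code never shows. Here is a minimal sketch of that loop. It assumes each source exposes a zero-argument check returning a boolean, and that every result goes to a single callback. In this article's design, that callback would wrap `FailoverController.process_health_result` and the booleans would become `HealthCheckResult` objects. `run_health_loop` and its parameters are hypothetical names.

```python
import threading
import time
from typing import Callable, Dict, Optional


def run_health_loop(
    checks: Dict[str, Callable[[], bool]],
    on_result: Callable[[str, bool], None],
    interval_seconds: float,
    stop: threading.Event,
    max_cycles: Optional[int] = None,
) -> int:
    """Check every source once per cycle and report each result.

    Runs until `stop` is set (or `max_cycles` is reached) and returns the
    number of completed cycles. Every source is checked, not just the active
    one, so the controller always knows which backups are healthy.
    """
    cycles = 0
    while not stop.is_set() and (max_cycles is None or cycles < max_cycles):
        started = time.monotonic()
        for name, check in checks.items():
            try:
                healthy = check()
            except Exception:
                # A check that raises counts as a failed check, not a crash.
                healthy = False
            on_result(name, healthy)
        cycles += 1
        # Sleep for the rest of the interval; Event.wait() returns early on stop.
        remaining = interval_seconds - (time.monotonic() - started)
        if remaining > 0:
            stop.wait(remaining)
    return cycles
```

In production this would typically run on a daemon thread, with `stop.set()` called during shutdown.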


Health Check Design: The Foundation of Fast Failover

Effective failover requires precise health detection. Naive approaches—pinging a host and declaring it up if the ICMP packet returns—fail to capture the real failure modes described above. A production health check must verify three things: latency is within acceptable bounds, data integrity is intact, and the connection is stable.

The Health Check Protocol

Every health check cycle performs the following verifications in sequence:

  1. Latency verification: Measure round-trip time for a lightweight request. Threshold: configurable, default 500ms.
  2. Data freshness verification: Compare the timestamp of the returned data against the local clock. Gap threshold: configurable, default 2 seconds.
  3. Sequence integrity verification: Track sequence numbers or timestamps across consecutive requests. Duplicate or reversed sequences indicate data corruption.
import time
import statistics
import os
from dataclasses import dataclass
from typing import Optional, List
from datetime import datetime, timezone

@dataclass
class HealthCheckResult:
    source_name: str
    is_healthy: bool
    latency_ms: float
    data_age_seconds: float
    error_message: Optional[str] = None

class MarketDataHealthChecker:
    """Production-grade health checker for market data sources.
    
    Performs three-stage verification: latency, freshness, and sequence integrity.
    Thresholds are constructor arguments; the probe endpoint is read from the
    <SOURCE_NAME>_URL environment variable.
    """
    
    def __init__(self, source_name: str, api_key: str, latency_threshold_ms: float = 500,
                 freshness_threshold_seconds: float = 2.0, 
                 sample_size: int = 5):
        self.source_name = source_name
        self.api_key = api_key
        self.latency_threshold_ms = latency_threshold_ms
        self.freshness_threshold_seconds = freshness_threshold_seconds
        self.sample_size = sample_size
        self._last_sequence: Optional[int] = None
        self._latency_history: List[float] = []
    
    def check(self, symbol: str = "AAPL.US") -> HealthCheckResult:
        """Execute a three-stage health check against the data source."""
        # Stage 1: Latency verification
        latency_result = self._measure_latency(symbol)
        if latency_result is None:
            return HealthCheckResult(
                source_name=self.source_name,
                is_healthy=False,
                latency_ms=999999,
                data_age_seconds=999999,
                error_message="Connection timeout or unreachable"
            )
        
        latency_ms = latency_result
        self._latency_history.append(latency_ms)
        if len(self._latency_history) > 100:
            self._latency_history.pop(0)
        
        if latency_ms > self.latency_threshold_ms:
            avg_latency = statistics.mean(self._latency_history[-self.sample_size:])
            return HealthCheckResult(
                source_name=self.source_name,
                is_healthy=False,
                latency_ms=latency_ms,
                data_age_seconds=0,
                error_message=f"Latency {latency_ms:.1f}ms exceeds threshold "
                            f"(avg {avg_latency:.1f}ms over last {self.sample_size} samples)"
            )
        
        # Stage 2: Data freshness verification
        data_age = 0.0  # Default when the source exposes no timestamp (avoids NameError below)
        freshness_result = self._verify_freshness(symbol)
        if freshness_result is not None:
            data_age = freshness_result
            if data_age > self.freshness_threshold_seconds:
                return HealthCheckResult(
                    source_name=self.source_name,
                    is_healthy=False,
                    latency_ms=latency_ms,
                    data_age_seconds=data_age,
                    error_message=f"Data stale by {data_age:.1f}s (threshold: "
                                f"{self.freshness_threshold_seconds}s)"
                )
        
        # Stage 3: Sequence integrity (for streaming sources)
        sequence_ok = self._verify_sequence(symbol)
        if not sequence_ok:
            return HealthCheckResult(
                source_name=self.source_name,
                is_healthy=False,
                latency_ms=latency_ms,
                data_age_seconds=data_age or 0,
                error_message="Sequence integrity violation: duplicate or reversed tick"
            )
        
        return HealthCheckResult(
            source_name=self.source_name,
            is_healthy=True,
            latency_ms=latency_ms,
            data_age_seconds=data_age or 0
        )
    
    def _measure_latency(self, symbol: str) -> Optional[float]:
        """Measure round-trip latency with timeout protection."""
        import requests
        
        # Probe the endpoint from <SOURCE_NAME>_URL (defaults to the TickDB endpoint)
        endpoint = os.environ.get(f"{self.source_name.upper()}_URL", 
                                  "https://api.tickdb.ai/v1/market/kline/latest")
        
        headers = {"X-API-Key": self.api_key}
        params = {"symbol": symbol}
        
        start = time.perf_counter()
        try:
            response = requests.get(
                endpoint,
                headers=headers,
                params=params,
                timeout=(3.05, 10)  # Connect timeout, read timeout
            )
            elapsed = (time.perf_counter() - start) * 1000
            
            if response.status_code == 200:
                return elapsed
            return None
        except requests.exceptions.Timeout:
            return None
        except requests.exceptions.RequestException:
            return None
    
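    def _verify_freshness(self, symbol: str) -> Optional[float]:
        """Verify that returned data is recent enough.
        
        Note: this issues a second request to the same endpoint as
        _measure_latency; reusing that response would halve the probe load.
        """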
        import requests
        
        endpoint = os.environ.get(f"{self.source_name.upper()}_URL", 
                                  "https://api.tickdb.ai/v1/market/kline/latest")
        headers = {"X-API-Key": self.api_key}
        
        try:
            response = requests.get(
                endpoint,
                headers=headers,
                params={"symbol": symbol},
                timeout=(3.05, 10)
            )
            
            if response.status_code == 200:
                data = response.json()
                # Assuming the API returns a timestamp field
                tick_timestamp = data.get("data", {}).get("timestamp", 0)
                
                if tick_timestamp:
                    now = datetime.now(timezone.utc).timestamp()
                    return now - (tick_timestamp / 1000)  # Convert ms to seconds
            return None
        except (requests.exceptions.RequestException, KeyError, ValueError):
            return None
    
    def _verify_sequence(self, symbol: str) -> bool:
        """Verify tick sequence integrity. Returns True if integrity is confirmed or unverifiable."""
        # This is a simplified version. Production implementations should:
        # 1. For WebSocket: track message sequence numbers
        # 2. For REST: track timestamps and detect duplicates
        # 3. Implement a bloom filter for duplicate detection at scale
        return True
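
`_verify_sequence` above is a stub that always returns `True`. Here is a minimal sketch of the per-symbol bookkeeping it could perform. It assumes each tick carries a monotonically increasing integer sequence number, which is a hypothetical field; REST sources without one could substitute the exchange timestamp. A bounded set of recent IDs stands in for the Bloom filter mentioned in the stub's comments. `SequenceTracker` is an illustrative name.

```python
from collections import deque
from typing import Deque, Dict, Set


class SequenceTracker:
    """Flags duplicate, reversed, and gapped sequence numbers per symbol."""

    def __init__(self, window: int = 10_000):
        self._last: Dict[str, int] = {}          # highest sequence seen per symbol
        self._recent: Dict[str, Set[int]] = {}   # recent IDs for duplicate detection
        self._order: Dict[str, Deque[int]] = {}  # insertion order, for eviction
        self._window = window

    def observe(self, symbol: str, seq: int) -> str:
        """Return 'ok', 'gap', 'duplicate', or 'reversed' for this tick."""
        recent = self._recent.setdefault(symbol, set())
        order = self._order.setdefault(symbol, deque())
        if seq in recent:
            return "duplicate"
        last = self._last.get(symbol)
        # Remember the ID, evicting the oldest once the window is full.
        recent.add(seq)
        order.append(seq)
        if len(order) > self._window:
            recent.discard(order.popleft())
        if last is not None and seq < last:
            return "reversed"
        self._last[symbol] = seq
        if last is not None and seq > last + 1:
            return "gap"
        return "ok"
```

A `_verify_sequence` built on this would return `False` for `"duplicate"` and `"reversed"`. Gaps are worth logging, but they may just reflect a throttled feed.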

Health Check Configuration via Environment Variables

Production deployments should never hardcode thresholds. The following environment variables control health check behavior:

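# NOTE: MarketDataHealthChecker probes <SOURCE_NAME>_URL exactly as given, so
# extend each base URL below to the specific latest-quote endpoint to probe.

# Primary source configuration
PRIMARY_URL=https://api.polygon.io/v2
PRIMARY_API_KEY=your_polygon_key

# Secondary source (TickDB) configuration
SECONDARY_URL=https://api.tickdb.ai/v1
SECONDARY_API_KEY=your_tickdb_key
# WebSocket endpoint that TickDBWebSocketClient re-reads on every reconnect
TICKDB_WS_URL=wss://api.tickdb.ai/v1/ws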

# Tertiary source configuration
TERTIARY_URL=https://data.alpaca.markets/v2
TERTIARY_API_KEY=your_alpaca_key

# Health check thresholds
LATENCY_THRESHOLD_MS=500
FRESHNESS_THRESHOLD_SECONDS=2.0
HEALTH_CHECK_INTERVAL_SECONDS=5
FAILOVER_TRIGGER_COUNT=3
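
None of the code in this article reads the threshold variables yet. Below is a minimal sketch that loads them into one validated settings object, using the article's defaults. `HealthSettings` and `load_health_settings` are hypothetical names. The comments show which constructor argument of `MarketDataHealthChecker` or `FailoverConfig` each field would feed.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class HealthSettings:
    latency_threshold_ms: float = 500.0       # -> MarketDataHealthChecker(latency_threshold_ms=...)
    freshness_threshold_seconds: float = 2.0  # -> MarketDataHealthChecker(freshness_threshold_seconds=...)
    check_interval_seconds: float = 5.0       # -> FailoverConfig(check_interval_seconds=...)
    failover_trigger_count: int = 3           # -> FailoverConfig(failure_threshold=...)


def load_health_settings(env: Optional[Mapping[str, str]] = None) -> HealthSettings:
    """Read thresholds from the environment, falling back to the defaults."""
    env = os.environ if env is None else env
    settings = HealthSettings(
        latency_threshold_ms=float(env.get("LATENCY_THRESHOLD_MS", 500)),
        freshness_threshold_seconds=float(env.get("FRESHNESS_THRESHOLD_SECONDS", 2.0)),
        check_interval_seconds=float(env.get("HEALTH_CHECK_INTERVAL_SECONDS", 5)),
        failover_trigger_count=int(env.get("FAILOVER_TRIGGER_COUNT", 3)),
    )
    # Fail fast at startup rather than running with a nonsensical threshold.
    if settings.latency_threshold_ms <= 0 or settings.check_interval_seconds <= 0:
        raise ValueError("latency threshold and check interval must be positive")
    if settings.failover_trigger_count < 1:
        raise ValueError("FAILOVER_TRIGGER_COUNT must be at least 1")
    return settings
```

Accepting an explicit mapping keeps the loader testable without touching the real environment.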

Failover Controller: State Machine Implementation

The failover controller implements a state machine with four states: NOMINAL, DEGRADED, FAILOVER_IN_PROGRESS, and FAILOVER_COMPLETE. Transitions between states are triggered by health check results, not by arbitrary timers.

import threading
import time
from enum import Enum
from typing import Dict, Optional, Callable
from dataclasses import dataclass, field

class FailoverState(Enum):
    NOMINAL = "nominal"
    DEGRADED = "degraded"
    FAILOVER_IN_PROGRESS = "failover_in_progress"
    FAILOVER_COMPLETE = "failover_complete"

@dataclass
class DataSource:
    name: str
    url: str
    api_key: str
    is_primary: bool = False
    health_checker: Optional['MarketDataHealthChecker'] = None
    consecutive_failures: int = 0
    consecutive_successes: int = 0

@dataclass
class FailoverConfig:
    failure_threshold: int = 3  # Consecutive failures before marking unhealthy
    recovery_threshold: int = 3  # Consecutive successes before recovery
    check_interval_seconds: float = 5.0
    state_transition_cooldown: float = 1.0  # Prevent rapid state flapping

class FailoverController:
    """State machine for automated data source failover.
    
    Implements health-check-driven state transitions with cooldown
    protection against flapping. Thread-safe for concurrent access.
    """
    
    def __init__(self, config: FailoverConfig):
        self.config = config
        self.sources: Dict[str, DataSource] = {}
        self.state = FailoverState.NOMINAL
        self.active_source: Optional[str] = None
        self._lock = threading.RLock()
        self._last_transition_time: float = 0
        self._callbacks: Dict[FailoverState, list] = {
            state: [] for state in FailoverState
        }
    
    def register_source(self, source: DataSource):
        """Register a data source with the controller."""
        with self._lock:
            self.sources[source.name] = source
            if source.is_primary and self.active_source is None:
                self.active_source = source.name
    
    def register_callback(self, state: FailoverState, callback: Callable):
        """Register a callback to be invoked on state transitions."""
        with self._lock:
            self._callbacks[state].append(callback)
    
    def process_health_result(self, result: 'HealthCheckResult'):
        """Process a health check result and update state machine."""
        with self._lock:
            source = self.sources.get(result.source_name)
            if source is None:
                return
            
            if result.is_healthy:
                source.consecutive_failures = 0
                source.consecutive_successes += 1
            else:
                source.consecutive_failures += 1
                source.consecutive_successes = 0
            
            self._evaluate_state_transition()
    
    def _evaluate_state_transition(self):
        """Evaluate whether a state transition should occur (caller holds the lock)."""
        current_time = time.time()
        
        # Cooldown protection: prevent transitions within the cooldown window
        if current_time - self._last_transition_time < self.config.state_transition_cooldown:
            return
        
        active = self.sources.get(self.active_source)
        if active is None:
            return
        
        # State transition logic
        if self.state == FailoverState.NOMINAL:
            if active.consecutive_failures >= self.config.failure_threshold:
                self._transition_to(FailoverState.DEGRADED)
        
        elif self.state == FailoverState.DEGRADED:
            if active.consecutive_failures >= self.config.failure_threshold * 2:
                self._transition_to(FailoverState.FAILOVER_IN_PROGRESS)
            elif active.consecutive_successes >= self.config.recovery_threshold:
                self._transition_to(FailoverState.NOMINAL)
        
        elif self.state == FailoverState.FAILOVER_IN_PROGRESS:
            # An earlier attempt found no healthy source; retry on each new result
            self._execute_failover()
        
        elif self.state == FailoverState.FAILOVER_COMPLETE:
            primary = self._primary_source()
            if (primary is not None
                    and primary.consecutive_successes >= self.config.recovery_threshold * 2):
                # Fail back only once the *primary* has recovered; checking the
                # active backup here would reset state without ever failing back
                self.active_source = primary.name
                self._transition_to(FailoverState.NOMINAL)
            elif active.consecutive_failures >= self.config.failure_threshold:
                # The backup we failed over to is failing too: move down the list
                self._transition_to(FailoverState.FAILOVER_IN_PROGRESS)
    
    def _primary_source(self) -> Optional[DataSource]:
        """Return the source registered with is_primary=True, if any."""
        return next((s for s in self.sources.values() if s.is_primary), None)
    
    def _transition_to(self, new_state: FailoverState):
        """Execute a state transition, invoke callbacks, then fail over if needed."""
        old_state = self.state
        self.state = new_state
        self._last_transition_time = time.time()
        
        print(f"[FAILOVER] State transition: {old_state.value} -> {new_state.value}")
        
        # Invoke callbacks before failing over, so FAILOVER_IN_PROGRESS listeners
        # fire before FAILOVER_COMPLETE listeners and see the source being abandoned
        for callback in self._callbacks.get(new_state, []):
            try:
                callback(old_state, new_state, self.active_source)
            except Exception as e:
                print(f"[FAILOVER] Callback error: {e}")
        
        if new_state == FailoverState.FAILOVER_IN_PROGRESS:
            self._execute_failover()
    
    def _execute_failover(self):
        """Select and activate the next healthy data source.
        
        Assumes sources are registered under the names "primary", "secondary",
        and "tertiary".
        """
        with self._lock:
            # Priority order: primary first, then secondary, then tertiary
            priority_order = ["primary", "secondary", "tertiary"]
            
            for priority in priority_order:
                source = self.sources.get(priority)
                if source and source.consecutive_failures < self.config.failure_threshold:
                    self.active_source = priority
                    self._transition_to(FailoverState.FAILOVER_COMPLETE)
                    return
            
            # No healthy source found: stay in FAILOVER_IN_PROGRESS so the next
            # health result triggers another attempt
            print("[FAILOVER] CRITICAL: No healthy data source available")
    
    def get_active_endpoint(self) -> Optional[str]:
        """Return the URL of the currently active data source."""
        with self._lock:
            source = self.sources.get(self.active_source)
            return source.url if source else None
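
It is worth checking the state machine's timing against the sub-30-second goal. Below is a minimal sketch of the arithmetic under `FailoverConfig`. It assumes one health result per `check_interval_seconds` and ignores request timeouts and the transition cooldown. `failover_timeline` is a hypothetical helper. Note that DEGRADED requires `failure_threshold` consecutive failures and failover requires `2 * failure_threshold`, because the failure counter is not reset between the two states.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FailoverTimeline:
    degraded_after_s: float      # worst case: outage onset -> DEGRADED
    failover_after_s: float      # worst case: outage onset -> FAILOVER_IN_PROGRESS
    best_case_failover_s: float  # outage begins just before a scheduled check


def failover_timeline(failure_threshold: int, check_interval_seconds: float) -> FailoverTimeline:
    """Worst- and best-case detection times for the controller's state machine.

    In the worst case the outage starts just after a check, so the first
    failed check lands one full interval later.
    """
    to_failover = 2 * failure_threshold
    return FailoverTimeline(
        degraded_after_s=failure_threshold * check_interval_seconds,
        failover_after_s=to_failover * check_interval_seconds,
        best_case_failover_s=(to_failover - 1) * check_interval_seconds,
    )


timeline = failover_timeline(failure_threshold=3, check_interval_seconds=5.0)
print(timeline.failover_after_s, timeline.best_case_failover_s)  # 30.0 25.0
```

With the defaults, failover begins 25 to 30 seconds after onset. A source that hangs instead of failing fast adds up to one request timeout (3.05 s connect plus 10 s read in the health checker) to every check, so lowering those timeouts matters as much as lowering the threshold.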

TickDB Integration: WebSocket Subscription with Automatic Reconnection

When failover occurs, the client must seamlessly reconnect to TickDB's WebSocket endpoint. The following implementation includes heartbeat detection, exponential backoff with jitter, and rate-limit handling.

import json
import time
import random
import threading
import os
from typing import Optional, Callable, Dict, Any
import websocket  # pip install websocket-client

class TickDBWebSocketClient:
    """Production-grade WebSocket client for TickDB with automatic failover support.
    
    Features:
    - Heartbeat detection via ping/pong
    - Exponential backoff with jitter on reconnection
    - Rate-limit handling (code 3001 + Retry-After header)
    - Thread-safe message queue
    - Configurable reconnection policy
    
    WARNING: This client is synchronous. For HFT workloads requiring sub-10ms
    latency, migrate to an asyncio-based implementation using aiohttp.
    """
    
    def __init__(self, api_key: str, on_message: Optional[Callable] = None,
                 on_error: Optional[Callable] = None, on_connect: Optional[Callable] = None):
        self.api_key = api_key
        self.on_message = on_message
        self.on_error = on_error
        self.on_connect = on_connect
        
        # Connection state
        self._ws: Optional[websocket.WebSocketApp] = None
        self._connected = False
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        
        # Reconnection parameters
        self._base_delay = 1.0
        self._max_delay = 60.0
        self._jitter_factor = 0.1
        self._max_retries = 10
        self._retry_count = 0
        
        # Subscriptions
        self._subscriptions: Dict[str, Any] = {}
        self._message_queue: list = []
        self._queue_max_size = 1000
    
    def connect(self, endpoint: str = "wss://api.tickdb.ai/v1/ws"):
        """Establish WebSocket connection with API key authentication."""
        # API key passed as URL parameter per TickDB WebSocket spec
        url = f"{endpoint}?api_key={self.api_key}"
        
        try:
            self._ws = websocket.WebSocketApp(
                url,
                on_message=self._handle_message,
                on_error=self._handle_error,
                on_open=self._handle_open,
                on_close=self._handle_close,
                on_ping=self._handle_ping,
                on_pong=self._handle_pong
            )
            
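            self._running = True
            # ping_interval/ping_timeout make websocket-client send protocol-level
            # pings; if no pong arrives in time the connection is torn down and
            # _handle_close schedules a reconnect
            self._thread = threading.Thread(
                target=self._ws.run_forever,
                kwargs={"ping_interval": 20, "ping_timeout": 10},
                daemon=True,
            )
            self._thread.start()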
            
        except Exception as e:
            self._schedule_reconnect(f"Connection failed: {e}")
    
    def subscribe(self, channel: str, params: Dict[str, Any]):
        """Subscribe to a data channel.
        
        Supported channels:
        - 'depth': Order book depth (US: L1, HK: L1-L10, Crypto: L1-L10)
        - 'kline': OHLCV candles
        - 'ticker': 24hr rolling ticker stats
        """
        subscription = {
            "cmd": "sub",
            "channel": channel,
            "params": params,
            "id": self._generate_subscription_id(channel, params)
        }
        
        if self._connected and self._ws:
            self._ws.send(json.dumps(subscription))
        
        self._subscriptions[subscription["id"]] = subscription
        print(f"[TICKDB] Subscribed to {channel} with params {params}")
    
    def unsubscribe(self, channel: str, params: Dict[str, Any]):
        """Unsubscribe from a data channel."""
        unsub = {
            "cmd": "unsub",
            "channel": channel,
            "params": params,
            "id": self._generate_subscription_id(channel, params)
        }
        
        if self._connected and self._ws:
            self._ws.send(json.dumps(unsub))
        
        sub_id = unsub["id"]
        self._subscriptions.pop(sub_id, None)
        print(f"[TICKDB] Unsubscribed from {channel}")
    
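    def _send_heartbeat(self):
        """Send an application-level JSON ping.
        
        Not scheduled anywhere in this class; call it periodically (e.g., from a
        threading.Timer) only if the server expects JSON pings in addition to
        protocol-level WebSocket pings.
        """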
        if self._connected and self._ws:
            try:
                heartbeat = {"cmd": "ping", "timestamp": int(time.time() * 1000)}
                self._ws.send(json.dumps(heartbeat))
            except Exception as e:
                print(f"[TICKDB] Heartbeat failed: {e}")
                self._schedule_reconnect("Heartbeat failure")
    
    def _handle_message(self, ws, message: str):
        """Process incoming WebSocket messages."""
        try:
            data = json.loads(message)
            
            # Handle ping response
            if data.get("cmd") == "pong":
                return
            
            # Handle error codes
            if "code" in data:
                self._handle_error_code(data)
                return
            
            # Queue message for processing
            with self._lock:
                self._message_queue.append(data)
                if len(self._message_queue) > self._queue_max_size:
                    self._message_queue.pop(0)
            
            # Invoke callback
            if self.on_message:
                self.on_message(data)
                
        except json.JSONDecodeError as e:
            print(f"[TICKDB] JSON decode error: {e}")
        except Exception as e:
            print(f"[TICKDB] Message handling error: {e}")
    
    def _handle_error_code(self, data: Dict[str, Any]):
        """Handle TickDB error codes."""
        code = data.get("code", 0)
        message = data.get("message", "Unknown error")
        
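        if code == 3001:
            # Rate limit exceeded. Assumes the error payload carries a "headers"
            # object with Retry-After; falls back to 5 seconds otherwise
            retry_after = int(data.get("headers", {}).get("Retry-After", 5))
            print(f"[TICKDB] Rate limited. Resubscribing after {retry_after}s")
            # Schedule instead of time.sleep(): sleeping here would stall the
            # websocket-client receive loop for the whole wait
            threading.Timer(retry_after, self._resubscribe_all).start()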
        elif code in (1001, 1002):
            print(f"[TICKDB] Authentication error: {message}")
            if self.on_error:
                self.on_error(f"Auth failed: {message}")
        elif code == 2002:
            print(f"[TICKDB] Symbol not found: {message}")
        else:
            print(f"[TICKDB] Error {code}: {message}")
    
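    def _handle_error(self, ws, error):
        """WebSocket error handler.
        
        Reconnection is left to _handle_close: websocket-client calls on_close
        after on_error when run_forever exits, so also scheduling a reconnect
        here would open two parallel connections.
        """
        print(f"[TICKDB] WebSocket error: {error}")
        if self.on_error:
            self.on_error(str(error))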
    
    def _handle_open(self, ws):
        """Handle successful connection."""
        with self._lock:
            self._connected = True
            self._retry_count = 0
        
        print("[TICKDB] Connected successfully")
        
        # Resubscribe to all previously subscribed channels
        self._resubscribe_all()
        
        if self.on_connect:
            self.on_connect()
    
    def _handle_close(self, ws, close_status_code, close_msg):
        """Handle connection closure."""
        with self._lock:
            self._connected = False
        
        print(f"[TICKDB] Connection closed (code: {close_status_code}, msg: {close_msg})")
        self._schedule_reconnect("Connection closed")
    
    def _handle_ping(self, ws, data):
        """Handle incoming ping frame."""
        # websocket-client handles pong automatically
        pass
    
    def _handle_pong(self, ws, data):
        """Handle pong response (connection is alive)."""
        pass
    
    def _schedule_reconnect(self, reason: str):
        """Schedule reconnection with exponential backoff and jitter."""
        with self._lock:
            if not self._running:
                return
            
            self._retry_count += 1
            if self._retry_count > self._max_retries:
                print(f"[TICKDB] Max retries ({self._max_retries}) exceeded. Giving up.")
                return
            
            # Calculate delay with exponential backoff
            delay = min(self._base_delay * (2 ** (self._retry_count - 1)), self._max_delay)
            
            # Add jitter to prevent thundering herd
            jitter = random.uniform(0, delay * self._jitter_factor)
            delay += jitter
            
            print(f"[TICKDB] Reconnecting in {delay:.2f}s (attempt {self._retry_count}/"
                  f"{self._max_retries}): {reason}")
        
        threading.Timer(delay, self._reconnect).start()
    
    def _reconnect(self):
        """Attempt to reconnect to the WebSocket endpoint."""
        if self._running:
            # Re-read endpoint from environment variable (supports failover)
            endpoint = os.environ.get("TICKDB_WS_URL", "wss://api.tickdb.ai/v1/ws")
            self.connect(endpoint)
    
    def _resubscribe_all(self):
        """Resubscribe to all previously subscribed channels."""
        # Iterate over a snapshot: subscribe() may modify the dict from another thread
        for sub in list(self._subscriptions.values()):
            if self._ws and self._connected:
                self._ws.send(json.dumps(sub))
    
    @staticmethod
    def _generate_subscription_id(channel: str, params: Dict) -> str:
        """Generate a unique subscription ID."""
        import hashlib
        content = f"{channel}:{json.dumps(params, sort_keys=True)}"
        return hashlib.md5(content.encode()).hexdigest()[:8]
    
    def disconnect(self):
        """Gracefully disconnect from the WebSocket."""
        self._running = False
        if self._ws:
            self._ws.close()
        if self._thread:
            self._thread.join(timeout=5)
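
The reconnection policy in `_schedule_reconnect` is easier to reason about as a schedule. Below is a minimal sketch that reproduces the client's formula as a pure function, so the delays can be inspected or unit-tested without a network: base 1 s, doubling per attempt, capped at 60 s, plus up to 10% random jitter. `backoff_schedule` is a hypothetical name, and the injectable `rng` exists only to make the output deterministic.

```python
import random
from typing import List, Optional


def backoff_schedule(base: float = 1.0, cap: float = 60.0, jitter_factor: float = 0.1,
                     max_retries: int = 10, rng: Optional[random.Random] = None) -> List[float]:
    """Delays (seconds) before reconnect attempts 1..max_retries.

    Mirrors the client: delay = min(base * 2**(attempt - 1), cap), then add
    uniform jitter in [0, delay * jitter_factor] so that many clients
    disconnected by the same outage do not reconnect in lockstep.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(1, max_retries + 1):
        delay = min(base * (2 ** (attempt - 1)), cap)
        delays.append(delay + rng.uniform(0, delay * jitter_factor))
    return delays


no_jitter = backoff_schedule(jitter_factor=0.0)
print(no_jitter)
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0, 60.0, 60.0]
print(round(sum(no_jitter)))  # 303
```

With the client's defaults, the ten attempts span roughly five minutes (303 s plus up to 10% jitter) before the client gives up.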

DNS Failover: Route 53 Integration for Sub-30-Second Switching

When the failover controller determines that the primary source is unhealthy, the system must redirect traffic to the secondary source. For applications using DNS-based service discovery, combining Route 53 health checks with weighted routing lets DNS stop handing out a failed endpoint without any client-side code changes.

Architecture: Route 53 Health Check + Weighted Routing

┌─────────────────────────────────────────────────────────────────────┐
│                          Route 53 DNS Zone                          │
│                                                                     │
│   Hosted Zone: market-data.example.com                              │
│                                                                     │
│   CNAME Record (weighted): api.market-data.example.com              │
│   ├── Primary (Polygon)      Weight: 100   + Health Check           │
│   ├── Secondary (TickDB)     Weight: 0     + Health Check           │
│   └── Tertiary (Alpaca)      Weight: 0     + Health Check           │
│                                                                     │
│   Primary fails 3 checks → controller sets its weight to 0          │
│   and Secondary to 100, so Secondary receives 100% of traffic       │
└─────────────────────────────────────────────────────────────────────┘
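
It helps to model how Route 53 chooses among weighted records once health checks are attached. According to the Route 53 Developer Guide, it answers with healthy nonzero-weight records in proportion to their weights. It considers zero-weight records only when every nonzero-weight record is unhealthy. If nothing is healthy, it treats every record as healthy. Below is a minimal simulation of those rules. The record tuples and `choose_record` are illustrative, and treating zero-weight fallbacks as equally likely is an assumption of this sketch.

```python
import random
from typing import List, Optional, Tuple

# (set_identifier, weight, healthy)
Record = Tuple[str, int, bool]


def choose_record(records: List[Record], rng: random.Random) -> Optional[str]:
    """Pick the record returned for one DNS query under weighted routing."""
    if not any(ok for _, _, ok in records):
        # No healthy records at all: Route 53 treats every record as healthy.
        records = [(name, w, True) for name, w, _ in records]
    weighted = [(name, w) for name, w, ok in records if ok and w > 0]
    if weighted:
        names, weights = zip(*weighted)
        return rng.choices(names, weights=weights, k=1)[0]
    # All nonzero-weight records are unhealthy: fall back to healthy zero-weight ones.
    zero = [name for name, w, ok in records if ok and w == 0]
    return rng.choice(zero) if zero else None
```

Under this model, an unhealthy primary with untouched weights splits traffic between TickDB and Alpaca. The controller's explicit weight change is what sends all of it to the secondary.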

Route 53 Health Check Automation

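AWS Route 53 health checks poll endpoints every 30 seconds by default, or every 10 seconds if you select the fast request interval (billed at a higher rate). With the 10-second interval and the default failure threshold of 3, Route 53 marks an endpoint unhealthy roughly 30 seconds after it fails (3 failed checks × 10-second interval). Combining these native health checks with API-driven weight adjustments gives the failover controller explicit control over which backup takes traffic. Clients pick up the change only after their cached DNS answer expires.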
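
The roughly 30-second figure covers detection only. Below is a rough worst-case budget for a client to actually reach the new target. It assumes the fast 10-second interval, a failure threshold of 3, and the 60-second TTL that `update_record_weights` sets. It ignores Route 53's own change propagation and health-checker aggregation, and it treats the TTL as an upper bound under well-behaved resolver caching. `dns_switch_budget` is a hypothetical helper.

```python
def dns_switch_budget(request_interval_s: float = 10.0, failure_threshold: int = 3,
                      ttl_s: float = 60.0) -> dict:
    """Worst-case seconds from endpoint failure until clients resolve the new target."""
    detection = request_interval_s * failure_threshold
    return {
        "detection_s": detection,
        # A resolver that cached the old answer just before the switch keeps it for a full TTL.
        "cache_expiry_s": ttl_s,
        "worst_case_total_s": detection + ttl_s,
    }


print(dns_switch_budget())  # {'detection_s': 30.0, 'cache_expiry_s': 60.0, 'worst_case_total_s': 90.0}
print(dns_switch_budget(ttl_s=10.0)["worst_case_total_s"])  # 40.0
```

With a 60-second TTL, caching rather than detection dominates the DNS path. Lowering the TTL shrinks that window at the cost of more DNS queries.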

import boto3
from botocore.exceptions import ClientError
import os
from typing import Any, List, Dict, Optional

class Route53FailoverManager:
    """Automates DNS failover for market data sources using Route 53.
    
    Prerequisites:
    - AWS credentials configured via environment variables or IAM role
    - Hosted zone already created in Route 53
    - Health checks created via AWS Console or CLI
    
    WARNING: This implementation modifies DNS records. Test thoroughly
    in non-production environments before deployment.
    """
    
    def __init__(self, hosted_zone_id: str, record_name: str, record_type: str = "CNAME"):
        self.route53 = boto3.client('route53')
        self.hosted_zone_id = hosted_zone_id
        self.record_name = record_name
        self.record_type = record_type
    
    def update_record_weights(self, health_checks: Dict[str, Dict[str, Any]]):
        """Update DNS record weights based on health check results.
        
        Args:
            health_checks: Dict mapping source name to {
                'health_check_id': Route 53 health check ID (optional),
                'target': CNAME target value,
                'weight': Weight to assign (0 = removed from rotation)
            }
        """
        changes = []
        
        for source_name, config in health_checks.items():
            weight = config.get('weight', 100 if source_name == 'primary' else 0)
            target = config.get('target')
            
            if not target:
                continue
            
            record_set = {
                # Weighted records must share one name and type; SetIdentifier
                # distinguishes them (a per-source name would bypass weighting)
                'Name': self.record_name,
                'Type': self.record_type,
                # Short TTL for fast failover; resolvers may still serve the old
                # answer for up to this many seconds after the change
                'TTL': 60,
                'ResourceRecords': [{'Value': target}],
                'SetIdentifier': source_name,
                'Weight': weight,
            }
            # botocore rejects None for HealthCheckId, so only set it when present
            if config.get('health_check_id'):
                record_set['HealthCheckId'] = config['health_check_id']
            
            changes.append({'Action': 'UPSERT', 'ResourceRecordSet': record_set})
        
        if not changes:
            # change_resource_record_sets requires at least one change
            return None
        
        try:
            response = self.route53.change_resource_record_sets(
                HostedZoneId=self.hosted_zone_id,
                ChangeBatch={
                    'Comment': 'Market data failover update',
                    'Changes': changes
                }
            )
            
            print(f"[DNS] Change submitted: {response['ChangeInfo']['Status']}")
            return response
            
        except ClientError as e:
            print(f"[DNS] Failed to update records: {e}")
            raise
    
    def get_record_status(self) -> List[Dict]:
        """Retrieve current DNS record configuration."""
        try:
            response = self.route53.list_resource_record_sets(
                HostedZoneId=self.hosted_zone_id,
                StartRecordName=self.record_name,
                MaxItems='100'
            )
            
            records = []
            for record_set in response.get('ResourceRecordSets', []):
                if self.record_name in record_set['Name']:
                    records.append({
                        'name': record_set['Name'],
                        'type': record_set['Type'],
                        'weight': record_set.get('Weight'),
                        'health_check_id': record_set.get('HealthCheckId'),
                        'set_identifier': record_set.get('SetIdentifier')
                    })
            
            return records
            
        except ClientError as e:
            print(f"[DNS] Failed to retrieve records: {e}")
            return []

    def execute_failover(self, from_source: str, to_source: str,
                         health_checks: Dict[str, Dict]):
        """Execute a complete failover from one source to another.
        
        This method:
        1. Sets the weight of every source except the target to 0 (removes it from rotation)
        2. Sets the target weight to 100 (adds it to rotation)
        3. Blocks until Route 53 reports the change as INSYNC
        
        Note: INSYNC means the change has reached Route 53's authoritative
        name servers; resolvers may still serve cached answers until the TTL expires.
        """
        updates = {}
        
        for source_name, config in health_checks.items():
            # Only the target stays in rotation; from_source and all others drop to 0
            weight = 100 if source_name == to_source else 0
            updates[source_name] = {**config, 'weight': weight}
        
        if to_source not in updates:
            raise ValueError(f"Unknown failover target: {to_source}")
        
        print(f"[DNS] Executing failover: {from_source} -> {to_source}")
        response = self.update_record_weights(updates)
        
        # Poll GetChange every 5s for up to 2 minutes instead of sleeping blindly
        print("[DNS] Waiting for Route 53 to report INSYNC...")
        waiter = self.route53.get_waiter('resource_record_sets_changed')
        waiter.wait(Id=response['ChangeInfo']['Id'],
                    WaiterConfig={'Delay': 5, 'MaxAttempts': 24})
        
        return True
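Route 53 weighted routing only balances between records that share the same name and type; SetIdentifier is what distinguishes the members of the set. Because the change batch is plain data, it can be built and unit-tested without AWS credentials. The sketch below assumes a hypothetical helper, build_weighted_changes, which is not part of boto3; its output is meant to be passed as ChangeBatch["Changes"] to change_resource_record_sets.

```python
from typing import Dict, List


def build_weighted_changes(
    record_name: str,
    record_type: str,
    sources: Dict[str, Dict[str, object]],
    ttl: int = 60,
) -> List[dict]:
    """Build Route 53 UPSERT changes for one weighted record set (hypothetical helper).

    Every member shares the same Name and Type; only SetIdentifier, Weight,
    target and (optionally) HealthCheckId differ between members.
    """
    changes = []
    for set_id, cfg in sources.items():
        target = cfg.get("target")
        if not target:
            continue  # skip sources that have no CNAME target configured
        weight = int(cfg.get("weight", 0))
        if not 0 <= weight <= 255:
            raise ValueError(f"weight for {set_id} must be in 0..255, got {weight}")
        record = {
            "Name": record_name,        # identical for every weighted member
            "Type": record_type,
            "TTL": ttl,
            "SetIdentifier": set_id,    # distinguishes members of the set
            "Weight": weight,
            "ResourceRecords": [{"Value": target}],
        }
        health_check_id = cfg.get("health_check_id")
        if health_check_id:
            # boto3 parameter validation rejects HealthCheckId=None
            record["HealthCheckId"] = health_check_id
        changes.append({"Action": "UPSERT", "ResourceRecordSet": record})
    return changes
```

Keep in mind that if every member of a weighted set has weight 0, Route 53 distributes traffic across all of them equally, so at least one source should always carry a non-zero weight.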

Putting It Together: The Complete Failover Pipeline

The following script orchestrates all components into a single, deployable failover system. It initializes health checkers for all data sources, runs continuous monitoring, and executes automatic failover when thresholds are breached.

import os
import time
import signal
import sys
from datetime import datetime, timezone
from typing import Optional

# Import our components
from health_checker import MarketDataHealthChecker
from failover_controller import (FailoverController, FailoverConfig,
                                 FailoverState, DataSource)
from websocket_client import TickDBWebSocketClient
from dns_failover import Route53FailoverManager

class MarketDataFailoverSystem:
    """Complete market data failover orchestration system.
    
    Monitors multiple data sources, detects degradation, and automatically
    fails over to backup sources with DNS redirection support.
    
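    Expected environment variables:
    - TICKDB_API_KEY: TickDB API authentication key
    - PRIMARY_API_KEY: Primary data source API key
    - TERTIARY_API_KEY: Tertiary data source API key
    - ROUTE53_HOSTED_ZONE_ID: Route 53 hosted zone ID (required for DNS failover)
    - FAILOVER_ENABLED: 'true' to enable automatic DNS failover
    
    Optional overrides (defaults in parentheses):
    - PRIMARY_URL, SECONDARY_URL, TERTIARY_URL: REST endpoints for each source
    - TICKDB_WS_URL: TickDB WebSocket endpoint
    - LATENCY_THRESHOLD_MS (500), FRESHNESS_THRESHOLD_SECONDS (2.0)
    - HEALTH_CHECK_INTERVAL_SECONDS (5)
    """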
    
    def __init__(self):
        self.running = False
        self.failover_controller = FailoverController(FailoverConfig())
        self.primary_health: Optional[MarketDataHealthChecker] = None
        self.tickdb_health: Optional[MarketDataHealthChecker] = None
        self.tertiary_health: Optional[MarketDataHealthChecker] = None
        self.tickdb_ws: Optional[TickDBWebSocketClient] = None
        self.dns_manager: Optional[Route53FailoverManager] = None
        # Must match the source names registered with the failover controller
        self.primary_source = "primary"
        self.secondary_source = "secondary"
    
    def initialize(self):
        """Initialize all components and register data sources."""
        print(f"[SYSTEM] Initializing Market Data Failover System at "
              f"{datetime.now(timezone.utc).isoformat()}")
        
        # Initialize health checkers
        tickdb_api_key = os.environ.get("TICKDB_API_KEY", "")
        primary_api_key = os.environ.get("PRIMARY_API_KEY", "")
        tertiary_api_key = os.environ.get("TERTIARY_API_KEY", "")
        
        self.primary_health = MarketDataHealthChecker(
            source_name="primary",
            api_key=primary_api_key,
            latency_threshold_ms=float(os.environ.get("LATENCY_THRESHOLD_MS", "500")),
            freshness_threshold_seconds=float(os.environ.get("FRESHNESS_THRESHOLD_SECONDS", "2.0"))
        )
        
        self.tickdb_health = MarketDataHealthChecker(
            source_name="TickDB",
            api_key=tickdb_api_key,
            latency_threshold_ms=float(os.environ.get("LATENCY_THRESHOLD_MS", "500"))
        )
        
        self.tertiary_health = MarketDataHealthChecker(
            source_name="tertiary",
            api_key=tertiary_api_key
        )
        
        # Register data sources with failover controller
        self.failover_controller.register_source(DataSource(
            name="primary",
            url=os.environ.get("PRIMARY_URL", "https://api.polygon.io/v2"),
            api_key=primary_api_key,
            is_primary=True,
            health_checker=self.primary_health
        ))
        
        self.failover_controller.register_source(DataSource(
            name="secondary",
            url=os.environ.get("SECONDARY_URL", "https://api.tickdb.ai/v1"),
            api_key=tickdb_api_key,
            health_checker=self.tickdb_health
        ))
        
        self.failover_controller.register_source(DataSource(
            name="tertiary",
            url=os.environ.get("TERTIARY_URL", "https://data.alpaca.markets/v2"),
            api_key=tertiary_api_key,
            health_checker=self.tertiary_health
        ))
        
        # Register failover callbacks
        self.failover_controller.register_callback(
            FailoverState.FAILOVER_COMPLETE,
            self._on_failover_complete
        )
        
        # Initialize DNS manager if enabled
        if os.environ.get("FAILOVER_ENABLED", "false").lower() == "true":
            hosted_zone_id = os.environ.get("ROUTE53_HOSTED_ZONE_ID")
            if hosted_zone_id:
                self.dns_manager = Route53FailoverManager(
                    hosted_zone_id=hosted_zone_id,
                    record_name="api.market-data.example.com"
                )
                print("[SYSTEM] DNS failover enabled")
            else:
                print("[SYSTEM] FAILOVER_ENABLED is set but ROUTE53_HOSTED_ZONE_ID "
                      "is missing; DNS failover disabled")
        
        # Initialize TickDB WebSocket client
        self.tickdb_ws = TickDBWebSocketClient(
            api_key=tickdb_api_key,
            on_message=self._handle_tickdb_message,
            on_error=self._handle_tickdb_error,
            on_connect=self._on_tickdb_connected
        )
        
        print("[SYSTEM] Initialization complete")
    
    def start(self):
        """Start the failover monitoring loop."""
        self.running = True
        check_interval = float(os.environ.get("HEALTH_CHECK_INTERVAL_SECONDS", "5"))
        
        print(f"[SYSTEM] Starting health monitoring (interval: {check_interval}s)")
        
        try:
            while self.running:
                self._run_health_cycle()
                time.sleep(check_interval)
        except KeyboardInterrupt:
            print("\n[SYSTEM] Shutdown signal received")
            self.shutdown()
    
    def _run_health_cycle(self):
        """Execute one complete health check cycle across all sources."""
        timestamp = datetime.now(timezone.utc).isoformat()
        
        results = []
        
        # Check primary source
        if self.primary_health:
            result = self.primary_health.check()
            results.append(result)
            self.failover_controller.process_health_result(result)
        
        # Check secondary (TickDB) source
        if self.tickdb_health:
            result = self.tickdb_health.check()
            results.append(result)
            self.failover_controller.process_health_result(result)
        
        # Check tertiary source
        if self.tertiary_health:
            result = self.tertiary_health.check()
            results.append(result)
            self.failover_controller.process_health_result(result)
        
        # Log status
        status_line = f"[{timestamp}] "
        for r in results:
            status = "✓" if r.is_healthy else "✗"
            status_line += f"{r.source_name}({status},{r.latency_ms:.0f}ms) "
        
        print(status_line)
        
        # Log errors for unhealthy sources
        for r in results:
            if not r.is_healthy and r.error_message:
                print(f"  └─ [{r.source_name}] {r.error_message}")
    
    def _on_failover_complete(self, old_state, new_state, active_source):
        """Callback invoked when failover completes."""
        print(f"[SYSTEM] FAILED OVER to {active_source}")
        
        # Reconnect TickDB WebSocket if secondary becomes active
        if active_source == "secondary" and self.tickdb_ws:
            tickdb_endpoint = os.environ.get("TICKDB_WS_URL", "wss://api.tickdb.ai/v1/ws")
            self.tickdb_ws.connect(tickdb_endpoint)
            
            # Resubscribe to market depth
            self.tickdb_ws.subscribe("depth", {"symbol": "AAPL.US", "level": 5})
        
        # Execute DNS failover if the DNS manager was configured in initialize().
        # If the controller invokes this callback synchronously, execute_failover
        # blocks the health-check loop until the DNS change has propagated.
        if self.dns_manager:
            try:
                self.dns_manager.execute_failover(
                    from_source=self.primary_source,
                    to_source=active_source,
                    health_checks={
                        "primary": {"target": "api.polygon.io"},
                        "secondary": {"target": "api.tickdb.ai"},
                        "tertiary": {"target": "data.alpaca.markets"}
                    }
                )
            except Exception as e:
                print(f"[SYSTEM] DNS failover failed: {e}")
    
    def _handle_tickdb_message(self, data):
        """Handle incoming TickDB WebSocket messages."""
        # Process depth updates, ticker data, etc.
        pass
    
    def _handle_tickdb_error(self, error):
        """Handle TickDB WebSocket errors."""
        print(f"[TICKDB] Error: {error}")
    
    def _on_tickdb_connected(self):
        """Callback invoked on successful TickDB connection."""
        print("[TICKDB] Connected and resubscribed to depth channel")
    
    def shutdown(self):
        """Gracefully shut down all components."""
        print("[SYSTEM] Shutting down...")
        self.running = False
        
        if self.tickdb_ws:
            self.tickdb_ws.disconnect()
        
        print("[SYSTEM] Shutdown complete")


def main():
    """Entry point for the failover system."""
    system = MarketDataFailoverSystem()
    
    # Register signal handlers for graceful shutdown
    signal.signal(signal.SIGINT, lambda s, f: system.shutdown())
    signal.signal(signal.SIGTERM, lambda s, f: system.shutdown())
    
    system.initialize()
    system.start()


if __name__ == "__main__":
    main()
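Route53FailoverManager.execute_failover waits for DNS propagation before it returns, so calling it directly from _on_failover_complete can stall health monitoring for the whole wait. One option is to hand the slow action to a background thread and allow only one failover in flight at a time. The sketch below assumes a hypothetical BackgroundFailoverRunner class that is not part of the components imported above.

```python
import threading
from typing import Any, Callable, Optional


class BackgroundFailoverRunner:
    """Run a slow failover action off the monitoring thread, one at a time (hypothetical)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None

    def submit(self, action: Callable[..., Any], *args: Any, **kwargs: Any) -> bool:
        """Start `action` in a daemon thread unless one is already running.

        Returns True if the action was started, False if it was skipped.
        """
        if not self._lock.acquire(blocking=False):
            return False  # a failover is already in flight; do not stack another

        def _run() -> None:
            try:
                action(*args, **kwargs)
            except Exception as exc:  # log rather than let the thread die silently
                print(f"[FAILOVER] background action failed: {exc}")
            finally:
                self._lock.release()  # threading.Lock may be released by any thread

        self._thread = threading.Thread(target=_run, daemon=True)
        self._thread.start()
        return True

    def join(self, timeout: Optional[float] = None) -> None:
        """Wait for the current action (useful in tests and at shutdown)."""
        if self._thread is not None:
            self._thread.join(timeout)
```

Inside the callback, the DNS call would then become something like runner.submit(self.dns_manager.execute_failover, from_source=..., to_source=active_source, health_checks=...), so the monitoring loop keeps running while Route 53 converges.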

Deployment Recommendations by Scale

| Scale | Architecture | TickDB Tier | Notes |
|---|---|---|---|
| Individual quant | Single-instance failover agent + free API tier | Free | Health checks every 10s; manual DNS failover |
| Team (2–5 quants) | Standalone failover service + shared API keys | Professional | Health checks every 5s; automatic DNS failover |
| Institutional | Multi-region deployment + dedicated failover cluster + enterprise API SLA | Enterprise | Health checks every 1s; Route 53 health checks + API-driven failover; dedicated support |

Key Design Principles

The architecture described in this article rests on four non-negotiable principles.

Never trust a single health signal. Latency alone is insufficient. A source returning 400ms responses passes the default 500ms latency threshold, yet that is already too slow for latency-sensitive strategies; conversely, a source can answer quickly while serving quotes that are seconds old. Combine latency, freshness, and sequence integrity checks for robust detection.
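A minimal sketch of combining the three signals, assuming each probe yields a round-trip latency, the age of the latest quote, and the sequence numbers of two consecutive messages. HealthSample and failed_signals are hypothetical names; the default thresholds mirror the LATENCY_THRESHOLD_MS and FRESHNESS_THRESHOLD_SECONDS defaults in the orchestration script. The sequence check only applies to feeds that expose monotonically increasing sequence numbers, which not every provider does.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class HealthSample:
    latency_ms: float        # round-trip time of the probe request
    quote_age_s: float       # now minus the exchange timestamp of the latest quote
    prev_seq: Optional[int]  # sequence number of the previous message (None if first)
    seq: int                 # sequence number of this message


def failed_signals(s: HealthSample,
                   max_latency_ms: float = 500.0,
                   max_quote_age_s: float = 2.0) -> List[str]:
    """Return the name of every failed signal; an empty list means healthy."""
    failures = []
    if s.latency_ms > max_latency_ms:
        failures.append("latency")
    if s.quote_age_s > max_quote_age_s:
        failures.append("freshness")
    if s.prev_seq is not None and s.seq != s.prev_seq + 1:
        failures.append("sequence")  # gap or replay between consecutive messages
    return failures
```

A fast but stale source, for example HealthSample(80, 6.0, 41, 42), passes the latency check and is still flagged with ["freshness"], which is exactly the case a latency-only monitor misses.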

Design for state, not events. A naive failover system reacts to individual failures. A resilient system tracks consecutive failure counts, applies cooldown periods, and transitions through defined states. This prevents flapping between sources during transient network issues.
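The state-tracking idea can be sketched as a small state machine: it fails over only after N consecutive failures, fails back only after M consecutive successes, and ignores transitions during a cooldown after each switch. FailoverStateMachine and State are hypothetical names and may not match the article's FailoverController; the clock is injectable so the behavior can be tested deterministically.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    PRIMARY = "primary"
    FAILED_OVER = "failed_over"


class FailoverStateMachine:
    """Switch only after consecutive failures/successes, then hold for a cooldown (hypothetical)."""

    def __init__(self, failure_threshold: int = 3, recovery_threshold: int = 3,
                 cooldown_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_threshold = recovery_threshold
        self.cooldown_s = cooldown_s
        self.clock = clock
        self.state = State.PRIMARY
        self.consecutive_failures = 0
        self.consecutive_successes = 0
        self.last_transition = float("-inf")  # no cooldown before the first switch

    def observe(self, primary_healthy: bool) -> State:
        """Feed one primary health result and return the (possibly new) state."""
        if primary_healthy:
            self.consecutive_failures = 0
            self.consecutive_successes += 1
        else:
            self.consecutive_successes = 0
            self.consecutive_failures += 1

        if self.clock() - self.last_transition < self.cooldown_s:
            return self.state  # still cooling down: count results but do not switch

        if (self.state is State.PRIMARY
                and self.consecutive_failures >= self.failure_threshold):
            self._transition(State.FAILED_OVER)
        elif (self.state is State.FAILED_OVER
                and self.consecutive_successes >= self.recovery_threshold):
            self._transition(State.PRIMARY)
        return self.state

    def _transition(self, new_state: State) -> None:
        self.state = new_state
        self.last_transition = self.clock()
        self.consecutive_failures = 0
        self.consecutive_successes = 0
```

A single failed check followed by a success resets the failure counter, so a transient network blip never triggers a switch, and the cooldown prevents an immediate fail-back while the primary is still unstable.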

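Keep the DNS TTL short. Resolvers may keep serving a cached record until its TTL expires, so a 300-second TTL can leave clients pointed at the failed endpoint for up to five minutes after Route 53 has already switched the record. Keep TTLs at 60 seconds or below for production systems.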
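The TTL is only one term in the total time clients can keep hitting a dead endpoint. A rough upper bound for an API-driven weight update adds detection time, propagation to Route 53's name servers, and the resolver cache TTL. The sketch below is a back-of-the-envelope estimate, not a guarantee: it ignores per-request timeouts and resolvers that do not honor TTLs, and the example inputs (10-second check interval, failure threshold of 3, about 60 seconds of propagation) are assumptions.

```python
def worst_case_failover_s(check_interval_s: float, failure_threshold: int,
                          propagation_s: float, ttl_s: float) -> float:
    """Rough upper bound on how long clients may keep reaching a failed endpoint.

    detection:   `failure_threshold` consecutive failed checks, `check_interval_s` apart
    propagation: time for the record change to reach the authoritative name servers
    ttl:         how long a resolver may keep serving the cached, pre-failover answer
    """
    detection_s = check_interval_s * failure_threshold
    return detection_s + propagation_s + ttl_s


# 30s detection + 60s propagation + 300s TTL = 390s (6.5 minutes)
slow = worst_case_failover_s(10, 3, 60, 300)
# The same setup with a 60s TTL drops the bound to 150s
fast = worst_case_failover_s(10, 3, 60, 60)
```

With these inputs the TTL dominates: lowering it from 300 to 60 seconds removes four of the six and a half minutes.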

Test your failover path. A failover system that has never been tested is a liability, not a safeguard. Run monthly failover drills during off-peak hours. Document the recovery time and update your runbooks accordingly.
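Recording the recovery time is easier if the drill itself is scripted. The sketch below assumes two hypothetical hooks you would supply: trigger_failure (for example, blocking the primary endpoint with a firewall rule in staging) and get_active_source (for example, reading the controller's active source or resolving the failover hostname). It returns the measured recovery time in seconds, or None if the switch did not happen within the timeout.

```python
import time
from typing import Callable, Optional


def measure_recovery_s(trigger_failure: Callable[[], None],
                       get_active_source: Callable[[], str],
                       expected_source: str,
                       timeout_s: float = 300.0,
                       poll_s: float = 0.5) -> Optional[float]:
    """Time how long the system takes to switch to `expected_source` (hypothetical drill helper)."""
    start = time.monotonic()
    trigger_failure()  # inject the failure being drilled
    while time.monotonic() - start < timeout_s:
        if get_active_source() == expected_source:
            return time.monotonic() - start
        time.sleep(poll_s)
    return None  # failover did not complete in time: treat the drill as failed
```

Logging the returned value from each monthly drill gives the runbook a measured recovery time rather than an assumed one.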


Next Steps

If you are building a production market data infrastructure, integrate TickDB as a secondary data source following the WebSocket client pattern in this article. The free tier provides sufficient API access for development and testing.

If you need sub-100ms latency across multiple asset classes, explore TickDB's Professional tier, which includes WebSocket push access to depth data for US equities (L1), Hong Kong equities (L1–L10), and cryptocurrencies (L1–L10).

If your firm requires multi-region redundancy with enterprise SLA, contact enterprise@tickdb.ai to discuss dedicated infrastructure and custom failover configurations.

If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace for direct TickDB API integration within your development environment.


This article does not constitute investment advice. Market infrastructure decisions should be evaluated against your specific risk tolerance, regulatory requirements, and operational constraints.