The Moment Everything Stopped
Around 10:30 AM ET (7:30 AM PST) on December 7, 2021, an automated capacity-scaling activity on AWS's internal network set off a surge of connection activity that overwhelmed networking devices and cascaded into failures across US-East-1. Within minutes, the news propagated across trading floors: major market data providers were experiencing degraded performance. Trading systems that relied on a single data source began returning stale quotes. Orders started firing on prices that no longer existed.
For firms running single-region architectures, this was a crisis. For firms with a properly designed multi-cloud disaster recovery setup, it was a 23-second window of elevated latency followed by automatic failover.
This article dissects the architecture behind that 23-second recovery window. It provides production-grade code for implementing DNS-based health checks, automated failover logic, and TickDB integration as a verified backup data source. By the end, you will have a deployable template for building a resilient market data infrastructure that survives region-level outages.
Why Single-Source Architectures Fail
Market data infrastructure has a treacherous assumption baked into most default architectures: the primary data source will always be available. This assumption fails in three predictable ways.
Region-level outages occur more frequently than most engineers realize. AWS US-East-1 alone has had several widely reported, multi-hour incidents since 2011, including the EBS outage of April 2011, the S3 outage of February 2017, the Kinesis outage of November 2020, and the network event of December 2021. Google Cloud and Azure regions have had region-wide incidents of their own. When a provider experiences an availability zone failure that cascades to your data feed, the latency spike is not the primary problem. The primary problem is that your application begins making decisions based on stale data.
Latency spikes masquerade as availability. A data source returning responses in 5 seconds is technically "available" by most naive monitoring definitions. But 5-second-old market data in a fast-moving equity or crypto market is functionally equivalent to having no data at all. A mean-reversion strategy executing on stale quotes will systematically lose money.
Silent data corruption is the most dangerous failure mode. A data provider that returns malformed ticks, duplicate sequences, or gap-filled candles creates systematic bias in your models. The corruption is invisible to uptime monitors that check only for HTTP 200 responses.
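A check for this failure mode has to look inside the payload. The sketch below is illustrative: it assumes candles arrive as dicts with `ts` (epoch milliseconds), `open`, `high`, `low`, `close`, and `volume` keys, which are hypothetical field names rather than any vendor's schema, and the `candle_problems` and `scan` helpers are likewise hypothetical. It flags duplicate or backwards timestamps, OHLC values that cannot coexist, and flat zero-volume bars, which are a common (but not conclusive) sign of gap filling.

```python
from typing import Dict, List, Optional


def candle_problems(candle: Dict[str, float], prev_ts: Optional[float]) -> List[str]:
    """Return integrity problems for one OHLCV candle (hypothetical field names)."""
    problems: List[str] = []
    ts = candle["ts"]
    if prev_ts is not None:
        if ts == prev_ts:
            problems.append("duplicate timestamp")
        elif ts < prev_ts:
            problems.append("timestamp went backwards")
    o, h, l, c, v = (candle[k] for k in ("open", "high", "low", "close", "volume"))
    # The high must be >= open and close; the low must be <= open and close
    if h < max(o, c) or l > min(o, c):
        problems.append("inconsistent OHLC")
    if min(o, h, l, c) <= 0 or v < 0:
        problems.append("non-positive price or negative volume")
    # Heuristic only: a flat, zero-volume bar is often a synthetic gap fill
    if v == 0 and o == h == l == c:
        problems.append("possible gap-filled candle")
    return problems


def scan(candles: List[Dict[str, float]]) -> Dict[int, List[str]]:
    """Map each problematic candle's index to the problems found in it."""
    report: Dict[int, List[str]] = {}
    newest_ts: Optional[float] = None
    for i, candle in enumerate(candles):
        found = candle_problems(candle, newest_ts)
        if found:
            report[i] = found
        # Always compare against the newest timestamp seen so far
        newest_ts = candle["ts"] if newest_ts is None else max(newest_ts, candle["ts"])
    return report


if __name__ == "__main__":
    candles = [
        {"ts": 60_000, "open": 10.0, "high": 10.5, "low": 9.8, "close": 10.2, "volume": 1200},
        {"ts": 60_000, "open": 10.2, "high": 10.1, "low": 10.0, "close": 10.3, "volume": 800},
        {"ts": 120_000, "open": 10.3, "high": 10.3, "low": 10.3, "close": 10.3, "volume": 0},
    ]
    print(scan(candles))
    # {1: ['duplicate timestamp', 'inconsistent OHLC'], 2: ['possible gap-filled candle']}
```

The gap-fill rule also fires on genuinely quiet markets, so it is better treated as a warning to investigate than as a failover trigger on its own.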
The solution is not to build a more reliable primary source. It is to design a system that detects degradation and fails over automatically, without human intervention, in under 30 seconds.
The Multi-Cloud Architecture: Three-Layer Design
A resilient market data architecture consists of three independent layers: the data ingestion layer, the failover orchestration layer, and the health verification layer.
```
┌─────────────────────────────────────────────────────────────────┐
│                    Client Application Layer                     │
│            (Strategy Engine / Dashboard / Alerting)             │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                  Failover Orchestration Layer                   │
│  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐   │
│  │ Health Check │      │  DNS Cache   │      │   Failover   │   │
│  │    Agent     │      │   Manager    │      │  Controller  │   │
│  └──────────────┘      └──────────────┘      └──────────────┘   │
└─────────────────────────────────────────────────────────────────┘
                                 │
              ┌──────────────────┼──────────────────┐
              ▼                  ▼                  ▼
      ┌───────────────┐  ┌───────────────┐  ┌───────────────┐
      │    Primary    │  │   Secondary   │  │   Tertiary    │
      │  AWS US-East  │  │    TickDB     │  │    GCP EU     │
      │   (Polygon)   │  │               │  │   (Alpaca)    │
      └───────────────┘  └───────────────┘  └───────────────┘
```
The primary source is your preferred vendor. The secondary source is TickDB, which provides WebSocket and REST APIs with sub-100ms latency across six asset classes. The tertiary source serves as a final fallback for extreme scenarios.
The orchestration layer continuously monitors all three sources. When the primary source's latency exceeds a configurable threshold or its error rate crosses a defined boundary, the orchestration layer updates the DNS cache and the failover controller redirects traffic.
Health Check Design: The Foundation of Fast Failover
Effective failover requires precise health detection. Naive approaches, such as pinging a host and declaring it up if the ICMP packet returns, fail to capture the failure modes described above. A production health check must verify three things: latency is within acceptable bounds, the data is fresh, and the tick sequence is intact.
The Health Check Protocol
Every health check cycle performs the following verifications in sequence:
- Latency verification: Measure round-trip time for a lightweight request. Threshold: configurable, default 500ms.
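- Data freshness verification: Compare the timestamp of the returned data against the local clock, which must be NTP-synchronized for the comparison to be meaningful. Gap threshold: configurable, default 2 seconds. Outside trading hours, or for a thinly traded probe symbol, the latest tick can legitimately be older than the threshold, so probe a liquid symbol and relax this check while the market is closed.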
- Sequence integrity verification: Track sequence numbers or timestamps across consecutive requests. Duplicate or reversed sequences indicate data corruption.
```python
import os
import statistics
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional

import requests  # pip install requests


@dataclass
class HealthCheckResult:
    source_name: str
    is_healthy: bool
    latency_ms: float
    data_age_seconds: float
    error_message: Optional[str] = None


class MarketDataHealthChecker:
    """Production-grade health checker for market data sources.

    Performs three-stage verification: latency, freshness, and sequence integrity.
    Thresholds are constructor arguments; the pipeline reads them from
    environment variables.
    """

    def __init__(self, source_name: str, api_key: str,
                 latency_threshold_ms: float = 500,
                 freshness_threshold_seconds: float = 2.0,
                 sample_size: int = 5):
        self.source_name = source_name
        self.api_key = api_key
        self.latency_threshold_ms = latency_threshold_ms
        self.freshness_threshold_seconds = freshness_threshold_seconds
        self.sample_size = sample_size
        self._last_sequence: Optional[int] = None
        self._latency_history: List[float] = []

    def check(self, symbol: str = "AAPL.US") -> HealthCheckResult:
        """Execute a three-stage health check against the data source."""
        # Stage 1: Latency verification
        latency_ms = self._measure_latency(symbol)
        if latency_ms is None:
            return HealthCheckResult(
                source_name=self.source_name,
                is_healthy=False,
                latency_ms=999999,
                data_age_seconds=999999,
                error_message="Connection timeout or unreachable",
            )

        # Keep a bounded history of the last 100 latency samples
        self._latency_history.append(latency_ms)
        if len(self._latency_history) > 100:
            self._latency_history.pop(0)

        if latency_ms > self.latency_threshold_ms:
            avg_latency = statistics.mean(self._latency_history[-self.sample_size:])
            return HealthCheckResult(
                source_name=self.source_name,
                is_healthy=False,
                latency_ms=latency_ms,
                data_age_seconds=0,
                error_message=(f"Latency {latency_ms:.1f}ms exceeds threshold "
                               f"(avg {avg_latency:.1f}ms over last {self.sample_size} samples)"),
            )

        # Stage 2: Data freshness verification
        # data_age stays None when the response carries no usable timestamp
        data_age = self._verify_freshness(symbol)
        if data_age is not None and data_age > self.freshness_threshold_seconds:
            return HealthCheckResult(
                source_name=self.source_name,
                is_healthy=False,
                latency_ms=latency_ms,
                data_age_seconds=data_age,
                error_message=(f"Data stale by {data_age:.1f}s "
                               f"(threshold: {self.freshness_threshold_seconds}s)"),
            )

        # Stage 3: Sequence integrity (for streaming sources)
        if not self._verify_sequence(symbol):
            return HealthCheckResult(
                source_name=self.source_name,
                is_healthy=False,
                latency_ms=latency_ms,
                data_age_seconds=data_age or 0,
                error_message="Sequence integrity violation: duplicate or reversed tick",
            )

        return HealthCheckResult(
            source_name=self.source_name,
            is_healthy=True,
            latency_ms=latency_ms,
            data_age_seconds=data_age or 0,
        )

    def _measure_latency(self, symbol: str) -> Optional[float]:
        """Measure round-trip latency in ms; None on timeout, error, or non-200."""
        # {SOURCE_NAME}_URL (e.g. PRIMARY_URL) should point at the source's
        # lightweight quote endpoint; the TickDB URL is only a fallback default
        endpoint = os.environ.get(f"{self.source_name.upper()}_URL",
                                  "https://api.tickdb.ai/v1/market/kline/latest")
        # Header-based auth as in TickDB; other vendors may expect another scheme
        headers = {"X-API-Key": self.api_key}
        params = {"symbol": symbol}
        start = time.perf_counter()
        try:
            response = requests.get(endpoint, headers=headers, params=params,
                                    timeout=(3.05, 10))  # (connect, read) timeouts
        except requests.exceptions.RequestException:
            # Covers timeouts, DNS failures, refused connections, etc.
            return None
        elapsed_ms = (time.perf_counter() - start) * 1000
        return elapsed_ms if response.status_code == 200 else None

    def _verify_freshness(self, symbol: str) -> Optional[float]:
        """Return the age of the latest tick in seconds, or None if unknown."""
        endpoint = os.environ.get(f"{self.source_name.upper()}_URL",
                                  "https://api.tickdb.ai/v1/market/kline/latest")
        headers = {"X-API-Key": self.api_key}
        try:
            response = requests.get(endpoint, headers=headers,
                                    params={"symbol": symbol}, timeout=(3.05, 10))
            if response.status_code != 200:
                return None
            payload = response.json()
            # Assumes the API returns {"data": {"timestamp": <epoch milliseconds>}}
            tick_timestamp = payload.get("data", {}).get("timestamp", 0)
            if not tick_timestamp:
                return None
            now = datetime.now(timezone.utc).timestamp()
            return now - (tick_timestamp / 1000)  # Convert ms to seconds
        except (requests.exceptions.RequestException, AttributeError,
                KeyError, TypeError, ValueError):
            return None

    def _verify_sequence(self, symbol: str) -> bool:
        """Verify tick sequence integrity. Returns True if intact or unverifiable."""
        # This is a simplified stub. Production implementations should:
        # 1. For WebSocket: track message sequence numbers
        # 2. For REST: track timestamps and detect duplicates
        # 3. Implement a bloom filter for duplicate detection at scale
        return True
```
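`_verify_sequence` above is a stub that always returns True. For a streaming source that stamps each message with an increasing sequence number, the check reduces to comparing each number with the last one seen. The sketch below assumes such a per-stream integer field exists (the article does not name one for any vendor), and `SequenceTracker` is a hypothetical name. It keeps gaps separate from duplicates and reversals, since missed messages call for a re-fetch rather than a verdict of corruption.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class SequenceTracker:
    """Classifies each incoming sequence number for one stream (hypothetical field)."""
    last_seq: Optional[int] = None
    duplicates: int = 0
    reversals: int = 0
    gaps: List[Tuple[int, int]] = field(default_factory=list)

    def observe(self, seq: int) -> str:
        if self.last_seq is None:
            self.last_seq = seq
            return "ok"
        if seq == self.last_seq:
            self.duplicates += 1
            return "duplicate"
        if seq < self.last_seq:
            # Older than what was already seen: reordered or replayed
            self.reversals += 1
            return "reversed"
        if seq > self.last_seq + 1:
            # Missed messages: record the inclusive range that must be re-fetched
            self.gaps.append((self.last_seq + 1, seq - 1))
            self.last_seq = seq
            return "gap"
        self.last_seq = seq
        return "ok"

    def is_intact(self) -> bool:
        """True while no duplicate or reversed message has been seen."""
        return self.duplicates == 0 and self.reversals == 0


if __name__ == "__main__":
    tracker = SequenceTracker()
    print([tracker.observe(s) for s in [1, 2, 2, 5, 4, 6]])
    # ['ok', 'ok', 'duplicate', 'gap', 'reversed', 'ok']
    print(tracker.gaps, tracker.is_intact())
    # [(3, 4)] False
```

A health checker could hold one tracker per stream and return `tracker.is_intact()` from `_verify_sequence`. When polling a REST "latest" endpoint instead, receiving the same tick twice is normal in a quiet market, so only backwards timestamps should count as violations there.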
Health Check Configuration via Environment Variables
Production deployments should never hardcode thresholds. The following environment variables control health check behavior:
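```bash
# Primary source configuration
# Each *_URL is also the endpoint the health checker probes,
# so point it at a lightweight quote endpoint for that vendor
PRIMARY_URL=https://api.polygon.io/v2
PRIMARY_API_KEY=your_polygon_key

# Secondary source (TickDB) configuration
SECONDARY_URL=https://api.tickdb.ai/v1
TICKDB_API_KEY=your_tickdb_key
TICKDB_WS_URL=wss://api.tickdb.ai/v1/ws

# Tertiary source configuration
TERTIARY_URL=https://data.alpaca.markets/v2
TERTIARY_API_KEY=your_alpaca_key

# Health check thresholds
LATENCY_THRESHOLD_MS=500
FRESHNESS_THRESHOLD_SECONDS=2.0
HEALTH_CHECK_INTERVAL_SECONDS=5
FAILOVER_TRIGGER_COUNT=3

# DNS failover (optional)
FAILOVER_ENABLED=false
ROUTE53_HOSTED_ZONE_ID=your_hosted_zone_id
```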
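A malformed value is better caught at startup than during an outage. A minimal sketch of a loader for the four thresholds, applying the defaults listed above; `HealthSettings` and `load_health_settings` are hypothetical names, not part of the pipeline code in this article.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class HealthSettings:
    latency_threshold_ms: float
    freshness_threshold_seconds: float
    check_interval_seconds: float
    failover_trigger_count: int


def load_health_settings(env: Mapping[str, str] = os.environ) -> HealthSettings:
    """Parse the threshold variables, falling back to the documented defaults.

    Raises ValueError on non-numeric or out-of-range values, so a bad
    deployment fails at startup instead of during an outage.
    """
    settings = HealthSettings(
        latency_threshold_ms=float(env.get("LATENCY_THRESHOLD_MS", "500")),
        freshness_threshold_seconds=float(env.get("FRESHNESS_THRESHOLD_SECONDS", "2.0")),
        check_interval_seconds=float(env.get("HEALTH_CHECK_INTERVAL_SECONDS", "5")),
        failover_trigger_count=int(env.get("FAILOVER_TRIGGER_COUNT", "3")),
    )
    if min(settings.latency_threshold_ms,
           settings.freshness_threshold_seconds,
           settings.check_interval_seconds) <= 0:
        raise ValueError("thresholds and interval must be positive")
    if settings.failover_trigger_count < 1:
        raise ValueError("FAILOVER_TRIGGER_COUNT must be at least 1")
    return settings


if __name__ == "__main__":
    print(load_health_settings())
```

Passing a plain dict instead of `os.environ` makes the loader easy to unit-test.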
Failover Controller: State Machine Implementation
The failover controller implements a state machine with four states: NOMINAL, DEGRADED, FAILOVER_IN_PROGRESS, and FAILOVER_COMPLETE. Transitions between states are triggered by health check results, not by arbitrary timers.
```python
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, List, Optional


class FailoverState(Enum):
    NOMINAL = "nominal"
    DEGRADED = "degraded"
    FAILOVER_IN_PROGRESS = "failover_in_progress"
    FAILOVER_COMPLETE = "failover_complete"


@dataclass
class DataSource:
    name: str
    url: str
    api_key: str
    is_primary: bool = False
    health_checker: Optional["MarketDataHealthChecker"] = None  # from the health check module
    consecutive_failures: int = 0
    consecutive_successes: int = 0


@dataclass
class FailoverConfig:
    failure_threshold: int = 3          # Consecutive failures before marking unhealthy
    recovery_threshold: int = 3         # Consecutive successes before recovery
    check_interval_seconds: float = 5.0
    state_transition_cooldown: float = 1.0  # Prevent rapid state flapping


class FailoverController:
    """State machine for automated data source failover.

    Implements health-check-driven state transitions with cooldown
    protection against flapping. Thread-safe for concurrent access.
    Callbacks run while the controller's lock is held, so keep them short.
    """

    # Sources are tried in this order when a replacement is needed
    PRIORITY_ORDER = ("primary", "secondary", "tertiary")

    def __init__(self, config: FailoverConfig):
        self.config = config
        self.sources: Dict[str, DataSource] = {}
        self.state = FailoverState.NOMINAL
        self.active_source: Optional[str] = None
        self._lock = threading.RLock()
        self._last_transition_time: float = 0.0
        self._callbacks: Dict[FailoverState, List[Callable]] = {
            state: [] for state in FailoverState
        }

    def register_source(self, source: DataSource):
        """Register a data source with the controller."""
        with self._lock:
            self.sources[source.name] = source
            if source.is_primary and self.active_source is None:
                self.active_source = source.name

    def register_callback(self, state: FailoverState, callback: Callable):
        """Register callback(old_state, new_state, active_source) for a state."""
        with self._lock:
            self._callbacks[state].append(callback)

    def process_health_result(self, result: "HealthCheckResult"):
        """Update the source's counters and re-evaluate the state machine.

        result.source_name must equal a registered DataSource.name;
        results for unknown names are ignored.
        """
        with self._lock:
            source = self.sources.get(result.source_name)
            if source is None:
                return
            if result.is_healthy:
                source.consecutive_failures = 0
                source.consecutive_successes += 1
            else:
                source.consecutive_failures += 1
                source.consecutive_successes = 0
            self._evaluate_state_transition()

    def _primary_source(self) -> Optional[DataSource]:
        return next((s for s in self.sources.values() if s.is_primary), None)

    def _evaluate_state_transition(self):
        """Evaluate whether a state transition should occur (caller holds the lock)."""
        # Cooldown protection: prevent transitions within the cooldown window
        if time.time() - self._last_transition_time < self.config.state_transition_cooldown:
            return
        active = self.sources.get(self.active_source)
        if active is None:
            return
        threshold = self.config.failure_threshold

        if self.state == FailoverState.NOMINAL:
            if active.consecutive_failures >= threshold:
                self._transition_to(FailoverState.DEGRADED)
        elif self.state == FailoverState.DEGRADED:
            if active.consecutive_failures >= threshold * 2:
                self._transition_to(FailoverState.FAILOVER_IN_PROGRESS)
            elif active.consecutive_successes >= self.config.recovery_threshold:
                self._transition_to(FailoverState.NOMINAL)
        elif self.state == FailoverState.FAILOVER_IN_PROGRESS:
            # The last attempt found no healthy source: keep retrying
            self._execute_failover()
        elif self.state == FailoverState.FAILOVER_COMPLETE:
            primary = self._primary_source()
            if active.consecutive_failures >= threshold:
                # The backup itself is failing: move on to the next source
                self._transition_to(FailoverState.FAILOVER_IN_PROGRESS)
            elif (primary is not None
                  and primary.consecutive_successes >= self.config.recovery_threshold * 2):
                # Fail back only after the primary has been healthy for a sustained period
                self.active_source = primary.name
                self._transition_to(FailoverState.NOMINAL)

    def _transition_to(self, new_state: FailoverState):
        """Execute a state transition and invoke callbacks (caller holds the lock)."""
        old_state = self.state
        self.state = new_state
        self._last_transition_time = time.time()
        print(f"[FAILOVER] State transition: {old_state.value} -> {new_state.value}")
        # Notify listeners before any nested transition, so FAILOVER_IN_PROGRESS
        # callbacks fire before FAILOVER_COMPLETE callbacks
        for callback in self._callbacks.get(new_state, []):
            try:
                callback(old_state, new_state, self.active_source)
            except Exception as e:
                print(f"[FAILOVER] Callback error: {e}")
        if new_state == FailoverState.FAILOVER_IN_PROGRESS:
            self._execute_failover()

    def _execute_failover(self):
        """Activate the highest-priority source that is not currently failing."""
        with self._lock:
            for name in self.PRIORITY_ORDER:
                source = self.sources.get(name)
                if source and source.consecutive_failures < self.config.failure_threshold:
                    self.active_source = name
                    self._transition_to(FailoverState.FAILOVER_COMPLETE)
                    return
            # No healthy source found - this is a critical failure
            print("[FAILOVER] CRITICAL: No healthy data source available")

    def get_active_endpoint(self) -> Optional[str]:
        """Return the URL of the currently active data source."""
        with self._lock:
            source = self.sources.get(self.active_source)
            return source.url if source else None
```
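The thresholds translate directly into detection time. With the defaults (5-second interval, `failure_threshold=3`), the controller enters DEGRADED after three consecutive failures but starts failing over only after six, because `_evaluate_state_transition` escalates at twice the threshold. The hypothetical helper below does that arithmetic. It ignores how long each check takes; with the health checker's 10-second read timeout, a hung source can stretch every cycle well beyond the nominal interval.

```python
from typing import Tuple


def failover_window(interval_s: float, failure_threshold: int,
                    escalation_factor: int = 2) -> Tuple[float, float]:
    """(best, worst) seconds from outage onset until FAILOVER_IN_PROGRESS.

    The controller escalates after failure_threshold * escalation_factor
    consecutive failed checks. If the outage begins just before a check,
    n failed checks span (n - 1) intervals; if it begins just after one,
    a full extra interval passes before the first failure is observed.
    Time spent inside each check is ignored.
    """
    n = failure_threshold * escalation_factor
    return ((n - 1) * interval_s, n * interval_s)


if __name__ == "__main__":
    print(failover_window(5, 3))  # defaults -> (25, 30)
    print(failover_window(2, 2))  # tighter settings -> (6, 8)
```

With the defaults, failover begins 25 to 30 seconds after the outage starts. Shortening the interval or the threshold speeds this up at the cost of more false alarms from transient latency spikes.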
TickDB Integration: WebSocket Subscription with Automatic Reconnection
When failover occurs, the client must seamlessly reconnect to TickDB's WebSocket endpoint. The following implementation includes heartbeat detection, exponential backoff with jitter, and rate-limit handling.
```python
import hashlib
import json
import os
import random
import threading
import time
from collections import deque
from typing import Any, Callable, Deque, Dict, Optional

import websocket  # pip install websocket-client


class TickDBWebSocketClient:
    """Production-grade WebSocket client for TickDB with automatic failover support.

    Features:
    - Heartbeat detection via WebSocket ping/pong frames
    - Exponential backoff with jitter on reconnection
    - Rate-limit handling (code 3001 + Retry-After header)
    - Thread-safe, bounded message queue
    - Configurable reconnection policy

    WARNING: This client is synchronous. For HFT workloads requiring sub-10ms
    latency, migrate to an asyncio-based implementation using aiohttp.
    """

    def __init__(self, api_key: str, on_message: Optional[Callable] = None,
                 on_error: Optional[Callable] = None, on_connect: Optional[Callable] = None):
        self.api_key = api_key
        self.on_message = on_message
        self.on_error = on_error
        self.on_connect = on_connect

        # Connection state
        self._ws: Optional[websocket.WebSocketApp] = None
        self._connected = False
        self._running = False
        self._reconnect_pending = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()

        # Reconnection parameters
        self._base_delay = 1.0
        self._max_delay = 60.0
        self._jitter_factor = 0.1
        self._max_retries = 10
        self._retry_count = 0

        # Heartbeat: ping every 20 s, treat no pong within 10 s as a dead link
        # (illustrative values; tune them to the server's idle timeout)
        self._ping_interval = 20
        self._ping_timeout = 10

        # Subscriptions, keyed by subscription ID
        self._subscriptions: Dict[str, Dict[str, Any]] = {}
        # Bounded queue: the oldest message is dropped once 1000 are buffered
        self._message_queue: Deque[Dict[str, Any]] = deque(maxlen=1000)

    def connect(self, endpoint: str = "wss://api.tickdb.ai/v1/ws"):
        """Establish WebSocket connection with API key authentication."""
        # API key passed as URL parameter per TickDB WebSocket spec
        url = f"{endpoint}?api_key={self.api_key}"
        # Set before connecting so a failed first attempt can still schedule a retry
        self._running = True
        try:
            self._ws = websocket.WebSocketApp(
                url,
                on_message=self._handle_message,
                on_error=self._handle_error,
                on_open=self._handle_open,
                on_close=self._handle_close,
                on_ping=self._handle_ping,
                on_pong=self._handle_pong,
            )
            # run_forever sends protocol-level pings and tears the socket down
            # (triggering on_close) if a pong does not arrive within ping_timeout
            self._thread = threading.Thread(
                target=self._ws.run_forever,
                kwargs={"ping_interval": self._ping_interval,
                        "ping_timeout": self._ping_timeout},
                daemon=True,
            )
            self._thread.start()
        except Exception as e:
            self._schedule_reconnect(f"Connection failed: {e}")

    def subscribe(self, channel: str, params: Dict[str, Any]):
        """Subscribe to a data channel.

        Supported channels:
        - 'depth': Order book depth (US: L1, HK: L1-L10, Crypto: L1-L10)
        - 'kline': OHLCV candles
        - 'ticker': 24hr rolling ticker stats

        The subscription is remembered and (re)sent on every successful connect.
        """
        subscription = {
            "cmd": "sub",
            "channel": channel,
            "params": params,
            "id": self._generate_subscription_id(channel, params),
        }
        self._subscriptions[subscription["id"]] = subscription
        if self._connected and self._ws:
            self._ws.send(json.dumps(subscription))
        print(f"[TICKDB] Subscribed to {channel} with params {params}")

    def unsubscribe(self, channel: str, params: Dict[str, Any]):
        """Unsubscribe from a data channel."""
        unsub = {
            "cmd": "unsub",
            "channel": channel,
            "params": params,
            "id": self._generate_subscription_id(channel, params),
        }
        self._subscriptions.pop(unsub["id"], None)
        if self._connected and self._ws:
            self._ws.send(json.dumps(unsub))
        print(f"[TICKDB] Unsubscribed from {channel}")

    def _send_heartbeat(self):
        """Send an application-level ping (optional; not scheduled automatically)."""
        if self._connected and self._ws:
            try:
                heartbeat = {"cmd": "ping", "timestamp": int(time.time() * 1000)}
                self._ws.send(json.dumps(heartbeat))
            except Exception as e:
                print(f"[TICKDB] Heartbeat failed: {e}")
                self._schedule_reconnect("Heartbeat failure")

    def _handle_message(self, ws, message: str):
        """Process incoming WebSocket messages."""
        try:
            data = json.loads(message)
            # Application-level pong: nothing to do
            if data.get("cmd") == "pong":
                return
            # Error payloads carry a numeric code
            if "code" in data:
                self._handle_error_code(data)
                return
            with self._lock:
                self._message_queue.append(data)  # deque evicts the oldest entry when full
            if self.on_message:
                self.on_message(data)
        except json.JSONDecodeError as e:
            print(f"[TICKDB] JSON decode error: {e}")
        except Exception as e:
            print(f"[TICKDB] Message handling error: {e}")

    def _handle_error_code(self, data: Dict[str, Any]):
        """Handle TickDB error codes."""
        code = data.get("code", 0)
        message = data.get("message", "Unknown error")
        if code == 3001:
            # Rate limit exceeded: resubscribe after Retry-After seconds.
            # A Timer avoids blocking the socket's receive thread while waiting.
            retry_after = int(data.get("headers", {}).get("Retry-After", 5))
            print(f"[TICKDB] Rate limited. Retrying after {retry_after}s")
            timer = threading.Timer(retry_after, self._resubscribe_all)
            timer.daemon = True
            timer.start()
        elif code in (1001, 1002):
            print(f"[TICKDB] Authentication error: {message}")
            if self.on_error:
                self.on_error(f"Auth failed: {message}")
        elif code == 2002:
            print(f"[TICKDB] Symbol not found: {message}")
        else:
            print(f"[TICKDB] Error {code}: {message}")

    def _handle_error(self, ws, error):
        """WebSocket error handler.

        websocket-client also routes exceptions raised inside callbacks here,
        while the socket may still be open, so reconnection is left to
        _handle_close, which runs when the socket is actually torn down.
        """
        print(f"[TICKDB] WebSocket error: {error}")
        if self.on_error:
            self.on_error(str(error))

    def _handle_open(self, ws):
        """Handle successful connection."""
        with self._lock:
            self._connected = True
            self._retry_count = 0
        print("[TICKDB] Connected successfully")
        # Resubscribe to all previously subscribed channels
        self._resubscribe_all()
        if self.on_connect:
            self.on_connect()

    def _handle_close(self, ws, close_status_code, close_msg):
        """Handle connection closure and schedule a reconnect."""
        with self._lock:
            self._connected = False
        print(f"[TICKDB] Connection closed (code: {close_status_code}, msg: {close_msg})")
        self._schedule_reconnect("Connection closed")

    def _handle_ping(self, ws, data):
        """Handle incoming ping frame (websocket-client replies with a pong automatically)."""

    def _handle_pong(self, ws, data):
        """Handle pong response (connection is alive)."""

    def _schedule_reconnect(self, reason: str):
        """Schedule one reconnection attempt with exponential backoff and jitter."""
        with self._lock:
            # Skip if shutting down or if an attempt is already queued
            if not self._running or self._reconnect_pending:
                return
            self._retry_count += 1
            if self._retry_count > self._max_retries:
                print(f"[TICKDB] Max retries ({self._max_retries}) exceeded. Giving up.")
                return
            self._reconnect_pending = True
            # Exponential backoff: base * 2^(attempt - 1), capped at max_delay
            delay = min(self._base_delay * (2 ** (self._retry_count - 1)), self._max_delay)
            # Add up to jitter_factor * delay of random jitter to prevent a thundering herd
            delay += random.uniform(0, delay * self._jitter_factor)
            attempt = self._retry_count
        print(f"[TICKDB] Reconnecting in {delay:.2f}s (attempt {attempt}/"
              f"{self._max_retries}): {reason}")
        timer = threading.Timer(delay, self._reconnect)
        timer.daemon = True  # Don't keep the process alive just to reconnect
        timer.start()

    def _reconnect(self):
        """Attempt to reconnect to the WebSocket endpoint."""
        with self._lock:
            self._reconnect_pending = False
        if self._running:
            # Re-read endpoint from environment variable (supports failover)
            endpoint = os.environ.get("TICKDB_WS_URL", "wss://api.tickdb.ai/v1/ws")
            self.connect(endpoint)

    def _resubscribe_all(self):
        """Resubscribe to all previously subscribed channels."""
        for sub in list(self._subscriptions.values()):
            if self._ws and self._connected:
                self._ws.send(json.dumps(sub))

    @staticmethod
    def _generate_subscription_id(channel: str, params: Dict[str, Any]) -> str:
        """Generate a short, deterministic subscription ID (not a security hash)."""
        content = f"{channel}:{json.dumps(params, sort_keys=True)}"
        return hashlib.md5(content.encode()).hexdigest()[:8]

    def disconnect(self):
        """Gracefully disconnect from the WebSocket."""
        self._running = False  # Stops _handle_close from scheduling a reconnect
        if self._ws:
            self._ws.close()
        if self._thread:
            self._thread.join(timeout=5)
```
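With the client's defaults (1-second base, 60-second cap, 10% jitter, 10 retries), the reconnect schedule is easy to reason about. The hypothetical helper below reproduces the formula from `_schedule_reconnect` so the schedule can be inspected or unit-tested in isolation. Without jitter the delays are 1, 2, 4, 8, 16, and 32 seconds, then 60 seconds for each of the last four attempts: about 303 seconds in total (up to roughly 10% more with jitter) before the client gives up.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int = 10, base: float = 1.0, cap: float = 60.0,
                   jitter_factor: float = 0.1,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Delay before each attempt 1..max_retries, using the same formula as
    _schedule_reconnect. Pass an rng to add up to jitter_factor * delay of jitter."""
    delays = []
    for attempt in range(1, max_retries + 1):
        delay = min(base * 2 ** (attempt - 1), cap)
        if rng is not None:
            delay += rng.uniform(0, delay * jitter_factor)
        delays.append(delay)
    return delays


if __name__ == "__main__":
    plain = backoff_delays()
    print(plain)       # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0, 60.0, 60.0]
    print(sum(plain))  # 303.0
    # Same schedule with jitter: each delay is up to 10% longer
    print(backoff_delays(rng=random.Random(42)))
```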
DNS Failover: Route 53 Integration for Fast Switching
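When the failover controller determines that the primary source is unhealthy, the system must redirect traffic to the secondary source. For applications using DNS-based service discovery, Route 53 health checks combined with weighted routing provide automatic failover at the DNS layer. This works cleanly when each record points at an endpoint you control, such as a proxy in front of each vendor: a CNAME aimed directly at a vendor's hostname will generally fail TLS hostname verification, because the vendor's certificate does not cover your domain.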
Architecture: Route 53 Health Check + Weighted Routing
```
┌─────────────────────────────────────────────────────────────────────┐
│ Route 53 DNS Zone                                                   │
│                                                                     │
│ Hosted Zone: market-data.example.com                                │
│                                                                     │
│ Weighted CNAME set: api.market-data.example.com                     │
│   ├── Health Check: Primary (Polygon)     Weight: 100               │
│   ├── Health Check: Secondary (TickDB)    Weight: 0                 │
│   └── Health Check: Tertiary (Alpaca)     Weight: 0                 │
│                                                                     │
│ Primary fails 3 consecutive checks → Route 53 stops returning it    │
│ execute_failover() sets Secondary to 100, others to 0               │
└─────────────────────────────────────────────────────────────────────┘
```
Route 53 Health Check Automation
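AWS Route 53 health checks probe each endpoint every 30 seconds by default, or every 10 seconds if you choose the fast interval (at extra cost), and mark it unhealthy after a configurable number of consecutive failures (3 by default). With fast-interval checks, detection takes roughly 30 seconds (3 failed checks × 10-second interval); resolvers may then keep serving the old answer for up to the record's TTL. Zero-weight records are returned only when no non-zero-weight record is healthy, and healthy zero-weight records share traffic equally, so combining the native health checks with API-driven weight adjustments, as the code below does, gives explicit control over which backup takes the traffic.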
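The health checks themselves can be created through the same boto3 Route 53 client. A minimal sketch, assuming an HTTPS endpoint you control that exposes a lightweight status path; `/health`, the example hostname, and both helper names are hypothetical. `RequestInterval=10` selects the fast interval (it cannot be changed after the check is created) and `FailureThreshold=3` matches the three-failure rule above.

```python
import uuid
from typing import Any, Dict


def fast_health_check_config(fqdn: str, resource_path: str = "/health",
                             failure_threshold: int = 3) -> Dict[str, Any]:
    """HealthCheckConfig for an HTTPS check probed every 10 seconds."""
    return {
        "Type": "HTTPS",
        "FullyQualifiedDomainName": fqdn,
        "Port": 443,
        "ResourcePath": resource_path,  # hypothetical status path
        "RequestInterval": 10,          # fast interval; the default is 30
        "FailureThreshold": failure_threshold,
    }


def create_fast_health_check(route53_client: Any, fqdn: str,
                             resource_path: str = "/health") -> str:
    """Create the health check and return its ID.

    route53_client is expected to be boto3.client("route53").
    """
    response = route53_client.create_health_check(
        CallerReference=str(uuid.uuid4()),  # must be unique for each new health check
        HealthCheckConfig=fast_health_check_config(fqdn, resource_path),
    )
    return response["HealthCheck"]["Id"]


if __name__ == "__main__":
    print(fast_health_check_config("primary-proxy.example.com"))
```

For example, `create_fast_health_check(boto3.client("route53"), "primary-proxy.example.com")` returns an ID that can be passed as `health_check_id` when calling `Route53FailoverManager.update_record_weights`.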
```python
from typing import Any, Dict, List

import boto3
from botocore.exceptions import ClientError


class Route53FailoverManager:
    """Automates DNS failover for market data sources using Route 53.

    Prerequisites:
    - AWS credentials configured via environment variables or IAM role
    - Hosted zone already created in Route 53
    - Health checks created via AWS Console or CLI

    WARNING: This implementation modifies DNS records. Test thoroughly
    in non-production environments before deployment.
    """

    def __init__(self, hosted_zone_id: str, record_name: str, record_type: str = "CNAME"):
        self.route53 = boto3.client("route53")
        self.hosted_zone_id = hosted_zone_id
        self.record_name = record_name
        self.record_type = record_type

    def update_record_weights(self, health_checks: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """Update DNS record weights based on health check results.

        Args:
            health_checks: Dict mapping source name to {
                'health_check_id': Route 53 health check ID (optional),
                'target': CNAME target value,
                'weight': Weight to assign (0 = out of normal rotation)
            }

        UPSERT replaces the whole record, so pass health_check_id on every
        call to keep a record associated with its health check.
        """
        changes = []
        for source_name, config in health_checks.items():
            target = config.get("target")
            if not target:
                continue
            record_set: Dict[str, Any] = {
                # Weighted records share one Name and Type;
                # SetIdentifier distinguishes the members of the set
                "Name": self.record_name,
                "Type": self.record_type,
                "TTL": 60,  # Short TTL for fast failover
                "ResourceRecords": [{"Value": target}],
                "SetIdentifier": source_name,
                "Weight": config.get("weight", 100 if source_name == "primary" else 0),
            }
            # boto3 rejects HealthCheckId=None, so only set it when provided
            if config.get("health_check_id"):
                record_set["HealthCheckId"] = config["health_check_id"]
            changes.append({"Action": "UPSERT", "ResourceRecordSet": record_set})

        if not changes:
            raise ValueError("No records with a target to update")

        try:
            response = self.route53.change_resource_record_sets(
                HostedZoneId=self.hosted_zone_id,
                ChangeBatch={
                    "Comment": "Market data failover update",
                    "Changes": changes,
                },
            )
            print(f"[DNS] Change submitted: {response['ChangeInfo']['Status']}")
            return response
        except ClientError as e:
            print(f"[DNS] Failed to update records: {e}")
            raise

    def get_record_status(self) -> List[Dict[str, Any]]:
        """Retrieve current DNS record configuration."""
        try:
            response = self.route53.list_resource_record_sets(
                HostedZoneId=self.hosted_zone_id,
                StartRecordName=self.record_name,
                MaxItems="100",
            )
        except ClientError as e:
            print(f"[DNS] Failed to retrieve records: {e}")
            return []
        records = []
        for record_set in response.get("ResourceRecordSets", []):
            # Route 53 returns fully qualified names with a trailing dot
            if record_set["Name"].rstrip(".") == self.record_name.rstrip("."):
                records.append({
                    "name": record_set["Name"],
                    "type": record_set["Type"],
                    "weight": record_set.get("Weight"),
                    "health_check_id": record_set.get("HealthCheckId"),
                    "set_identifier": record_set.get("SetIdentifier"),
                })
        return records

    def execute_failover(self, from_source: str, to_source: str,
                         health_checks: Dict[str, Dict[str, Any]],
                         wait: bool = False) -> bool:
        """Execute a complete failover from one source to another.

        This method:
        1. Sets the weight of every source other than to_source to 0
        2. Sets the target weight to 100
        3. Optionally waits until Route 53 reports the change as INSYNC

        wait defaults to False because this is usually called from a failover
        callback, where blocking would stall health monitoring. Resolvers may
        keep serving cached answers for up to the TTL after the change is INSYNC.
        """
        if to_source not in health_checks:
            raise ValueError(f"Unknown target source: {to_source}")
        updates = {
            name: {**config, "weight": 100 if name == to_source else 0}
            for name, config in health_checks.items()
        }
        print(f"[DNS] Executing failover: {from_source} -> {to_source}")
        response = self.update_record_weights(updates)

        if wait:
            # Polls GetChange until the batch is INSYNC on Route 53's name servers
            # (AWS states this typically happens within 60 seconds)
            waiter = self.route53.get_waiter("resource_record_sets_changed")
            waiter.wait(Id=response["ChangeInfo"]["Id"])
        return True
```
Putting It Together: The Complete Failover Pipeline
The following script orchestrates all components into a single, deployable failover system. It initializes health checkers for all data sources, runs continuous monitoring, and executes automatic failover when thresholds are breached.
```python
import os
import signal
import time
from datetime import datetime, timezone
from typing import Optional

# Import our components (assumes the listings above are saved as these modules)
from health_checker import MarketDataHealthChecker
from failover_controller import DataSource, FailoverConfig, FailoverController, FailoverState
from websocket_client import TickDBWebSocketClient
from dns_failover import Route53FailoverManager


class MarketDataFailoverSystem:
    """Complete market data failover orchestration system.

    Monitors multiple data sources, detects degradation, and automatically
    fails over to backup sources with DNS redirection support.

    Expected environment variables:
    - TICKDB_API_KEY: TickDB API authentication key
    - PRIMARY_API_KEY: Primary data source API key
    - TERTIARY_API_KEY: Tertiary data source API key
    - ROUTE53_HOSTED_ZONE_ID: Route 53 hosted zone ID (optional)
    - FAILOVER_ENABLED: 'true' to enable automatic DNS failover
    Thresholds come from the health check variables listed earlier.
    """

    # CNAME target for each source's weighted DNS record
    DNS_TARGETS = {
        "primary": "api.polygon.io",
        "secondary": "api.tickdb.ai",
        "tertiary": "data.alpaca.markets",
    }

    def __init__(self):
        self.running = False
        self.failover_controller = FailoverController(FailoverConfig(
            failure_threshold=int(os.environ.get("FAILOVER_TRIGGER_COUNT", "3")),
            check_interval_seconds=float(os.environ.get("HEALTH_CHECK_INTERVAL_SECONDS", "5")),
        ))
        self.primary_health: Optional[MarketDataHealthChecker] = None
        self.tickdb_health: Optional[MarketDataHealthChecker] = None
        self.tertiary_health: Optional[MarketDataHealthChecker] = None
        self.tickdb_ws: Optional[TickDBWebSocketClient] = None
        self.dns_manager: Optional[Route53FailoverManager] = None

    def initialize(self):
        """Initialize all components and register data sources."""
        print(f"[SYSTEM] Initializing Market Data Failover System at "
              f"{datetime.now(timezone.utc).isoformat()}")

        tickdb_api_key = os.environ.get("TICKDB_API_KEY", "")
        primary_api_key = os.environ.get("PRIMARY_API_KEY", "")
        tertiary_api_key = os.environ.get("TERTIARY_API_KEY", "")
        latency_ms = float(os.environ.get("LATENCY_THRESHOLD_MS", "500"))
        freshness_s = float(os.environ.get("FRESHNESS_THRESHOLD_SECONDS", "2.0"))

        # Each checker's source_name must equal the DataSource name registered
        # below; the controller ignores results for unknown names
        self.primary_health = MarketDataHealthChecker(
            source_name="primary", api_key=primary_api_key,
            latency_threshold_ms=latency_ms, freshness_threshold_seconds=freshness_s)
        self.tickdb_health = MarketDataHealthChecker(
            source_name="secondary", api_key=tickdb_api_key,
            latency_threshold_ms=latency_ms, freshness_threshold_seconds=freshness_s)
        self.tertiary_health = MarketDataHealthChecker(
            source_name="tertiary", api_key=tertiary_api_key,
            latency_threshold_ms=latency_ms, freshness_threshold_seconds=freshness_s)

        # Register data sources with failover controller
        self.failover_controller.register_source(DataSource(
            name="primary",
            url=os.environ.get("PRIMARY_URL", "https://api.polygon.io/v2"),
            api_key=primary_api_key,
            is_primary=True,
            health_checker=self.primary_health,
        ))
        self.failover_controller.register_source(DataSource(
            name="secondary",
            url=os.environ.get("SECONDARY_URL", "https://api.tickdb.ai/v1"),
            api_key=tickdb_api_key,
            health_checker=self.tickdb_health,
        ))
        self.failover_controller.register_source(DataSource(
            name="tertiary",
            url=os.environ.get("TERTIARY_URL", "https://data.alpaca.markets/v2"),
            api_key=tertiary_api_key,
            health_checker=self.tertiary_health,
        ))

        # Register failover callbacks
        self.failover_controller.register_callback(
            FailoverState.FAILOVER_COMPLETE, self._on_failover_complete)

        # Initialize DNS manager only if explicitly enabled
        if os.environ.get("FAILOVER_ENABLED", "false").lower() == "true":
            hosted_zone_id = os.environ.get("ROUTE53_HOSTED_ZONE_ID")
            if hosted_zone_id:
                self.dns_manager = Route53FailoverManager(
                    hosted_zone_id=hosted_zone_id,
                    record_name="api.market-data.example.com",
                )
                print("[SYSTEM] DNS failover enabled")

        # Initialize TickDB WebSocket client (it connects on failover)
        self.tickdb_ws = TickDBWebSocketClient(
            api_key=tickdb_api_key,
            on_message=self._handle_tickdb_message,
            on_error=self._handle_tickdb_error,
            on_connect=self._on_tickdb_connected,
        )
        print("[SYSTEM] Initialization complete")

    def start(self):
        """Start the failover monitoring loop."""
        self.running = True
        check_interval = self.failover_controller.config.check_interval_seconds
        print(f"[SYSTEM] Starting health monitoring (interval: {check_interval}s)")
        try:
            while self.running:
                self._run_health_cycle()
                time.sleep(check_interval)
        except KeyboardInterrupt:
            print("\n[SYSTEM] Shutdown signal received")
            self.shutdown()

    def _run_health_cycle(self):
        """Execute one complete health check cycle across all sources.

        Checks run sequentially, so a slow or timing-out source stretches
        the whole cycle beyond check_interval.
        """
        timestamp = datetime.now(timezone.utc).isoformat()
        results = []
        for checker in (self.primary_health, self.tickdb_health, self.tertiary_health):
            if checker is None:
                continue
            result = checker.check()
            results.append(result)
            # Every source, tertiary included, feeds the controller so its
            # failure counters are accurate when a replacement is chosen
            self.failover_controller.process_health_result(result)

        # Log status
        status_line = f"[{timestamp}] "
        for r in results:
            status = "✓" if r.is_healthy else "✗"
            status_line += f"{r.source_name}({status},{r.latency_ms:.0f}ms) "
        print(status_line)

        # Log errors for unhealthy sources
        for r in results:
            if not r.is_healthy and r.error_message:
                print(f"  └─ [{r.source_name}] {r.error_message}")

    def _on_failover_complete(self, old_state, new_state, active_source):
        """Callback invoked when failover completes."""
        print(f"[SYSTEM] FAILED OVER to {active_source}")

        # Connect the TickDB WebSocket if secondary becomes active
        if active_source == "secondary" and self.tickdb_ws:
            tickdb_endpoint = os.environ.get("TICKDB_WS_URL", "wss://api.tickdb.ai/v1/ws")
            self.tickdb_ws.connect(tickdb_endpoint)
            # Sent once the socket opens; US equities offer L1 depth only
            self.tickdb_ws.subscribe("depth", {"symbol": "AAPL.US", "level": 1})

        # Execute DNS failover (dns_manager exists only when FAILOVER_ENABLED=true)
        if self.dns_manager:
            try:
                self.dns_manager.execute_failover(
                    from_source="primary",
                    to_source=active_source,
                    # Keys must match the controller's source names
                    health_checks={name: {"target": target}
                                   for name, target in self.DNS_TARGETS.items()},
                )
            except Exception as e:
                print(f"[SYSTEM] DNS failover failed: {e}")

    def _handle_tickdb_message(self, data):
        """Handle incoming TickDB WebSocket messages."""
        # Process depth updates, ticker data, etc.
        pass

    def _handle_tickdb_error(self, error):
        """Handle TickDB WebSocket errors."""
        print(f"[TICKDB] Error: {error}")

    def _on_tickdb_connected(self):
        """Callback invoked on successful TickDB connection."""
        print("[TICKDB] Connected and resubscribed to depth channel")

    def shutdown(self):
        """Gracefully shut down all components."""
        print("[SYSTEM] Shutting down...")
        self.running = False
        if self.tickdb_ws:
            self.tickdb_ws.disconnect()
        print("[SYSTEM] Shutdown complete")


def main():
    """Entry point for the failover system."""
    system = MarketDataFailoverSystem()
    # Register signal handlers for graceful shutdown
    signal.signal(signal.SIGINT, lambda s, f: system.shutdown())
    signal.signal(signal.SIGTERM, lambda s, f: system.shutdown())
    system.initialize()
    system.start()


if __name__ == "__main__":
    main()
```
system.initialize()
system.start()
if __name__ == "__main__":
main()
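`_on_failover_complete` passes a weight map to `Route53FailoverManager.execute_failover`, whose internals are defined elsewhere. As a hypothetical sketch of what that step could reduce to, the function below turns the same map into a Route 53 weighted-routing `ChangeBatch`. The name `build_weighted_change_batch`, the CNAME record type, the 60-second TTL, and the assumption that the weighted records already exist with set identifiers `primary`, `secondary`, and `tertiary` are all illustrative choices, not taken from the system above.

```python
from typing import Dict, List


def build_weighted_change_batch(
    record_name: str,
    weights: Dict[str, Dict[str, object]],
    ttl: int = 60,
    record_type: str = "CNAME",
) -> Dict[str, object]:
    """Build a Route 53 ChangeBatch that UPSERTs one weighted record per source.

    `weights` mirrors the map used above, e.g.
    {"secondary": {"target": "api.tickdb.ai", "weight": 100}, ...}.
    Keys are used as SetIdentifier values (assumed to match existing records).
    """
    changes: List[dict] = []
    for set_id, cfg in weights.items():
        weight = int(cfg["weight"])
        if not 0 <= weight <= 255:  # Route 53 accepts weights from 0 to 255
            raise ValueError(f"weight for {set_id} out of range: {weight}")
        changes.append({
            "Action": "UPSERT",
            "ResourceRecordSet": {
                "Name": record_name,
                "Type": record_type,
                "SetIdentifier": set_id,  # distinguishes records sharing a name
                "Weight": weight,
                "TTL": ttl,
                "ResourceRecords": [{"Value": str(cfg["target"])}],
            },
        })
    # All-zero weights make Route 53 spread traffic evenly, undoing the failover
    if not any(c["ResourceRecordSet"]["Weight"] > 0 for c in changes):
        raise ValueError("at least one record must have a non-zero weight")
    return {"Comment": "market-data failover", "Changes": changes}
```

To apply the batch, pass it to boto3's `change_resource_record_sets(HostedZoneId=hosted_zone_id, ChangeBatch=batch)` on a `route53` client. Rejecting an all-zero map is a deliberate guard: when every record in a weighted set has weight 0, Route 53 routes to all of them with equal probability, which would silently send traffic back to the failed primary.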
Deployment Recommendations by Scale
| Scale | Architecture | TickDB Tier | Notes |
|---|---|---|---|
| Individual quant | Single-instance failover agent + free API tier | Free | Health checks every 10s; manual DNS failover |
| Team (2–5 quants) | Standalone failover service + shared API keys | Professional | Health checks every 5s; automatic DNS failover |
| Institutional | Multi-region deployment + dedicated failover cluster + enterprise API SLA | Enterprise | Health checks every 1s; Route 53 health checks + API-driven failover; dedicated support |
Key Design Principles
The architecture described in this article rests on four non-negotiable principles.
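Never trust a single health signal. Availability alone is insufficient, and so is latency alone. A source returning 400ms responses passes a naive up/down check but is unusable for latency-sensitive strategies, while a source that responds quickly can still be serving stale quotes or skipping messages. Combine latency, freshness, and sequence integrity checks for robust detection.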
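A minimal sketch of combining the three signals, assuming each source exposes its probe latency, the exchange timestamp of its newest quote, and a per-message sequence number. `Observation`, `evaluate`, and the 250 ms / 2 s thresholds are hypothetical and not part of any TickDB or provider API.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Observation:
    """Latest message seen from a source (field names are hypothetical)."""
    latency_ms: float    # round-trip time of the most recent probe
    last_tick_ts: float  # exchange timestamp of the newest quote, epoch seconds
    seq: int             # sequence number of the newest message


def evaluate(obs: Observation, expected_seq: Optional[int], now: float,
             max_latency_ms: float = 250.0,
             max_staleness_s: float = 2.0) -> Tuple[bool, List[str]]:
    """Return (is_healthy, reasons); the source is healthy only if all checks pass."""
    reasons: List[str] = []
    # 1. Latency: is the source fast enough for the consuming strategy?
    if obs.latency_ms > max_latency_ms:
        reasons.append(f"latency {obs.latency_ms:.0f}ms > {max_latency_ms:.0f}ms")
    # 2. Freshness: a quick response can still carry an old quote
    staleness = now - obs.last_tick_ts
    if staleness > max_staleness_s:
        reasons.append(f"stale by {staleness:.1f}s")
    # 3. Sequence integrity: the message must be exactly the one expected next
    if expected_seq is not None and obs.seq != expected_seq:
        kind = "gap" if obs.seq > expected_seq else "replay"
        reasons.append(f"sequence {kind}: expected {expected_seq}, got {obs.seq}")
    return (not reasons, reasons)
```

Returning the list of reasons rather than a bare boolean gives the monitoring loop something concrete to log when a source is marked unhealthy.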
Design for state, not events. A naive failover system reacts to individual failures. A resilient system tracks consecutive failure counts, applies cooldown periods, and transitions through defined states. This prevents flapping between sources during transient network issues.
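One way to encode this principle, sketched under simplifying assumptions: a two-state machine that fails over only after `failure_threshold` consecutive bad checks, fails back only after `recovery_threshold` consecutive good ones, and holds its current state during a cooldown after each switch. The class and parameter names are hypothetical and deliberately simpler than the failover controller used earlier; the injectable `clock` exists so the behavior can be tested without real waiting.

```python
import time
from enum import Enum
from typing import Callable, Optional


class State(Enum):
    PRIMARY = "primary"
    SECONDARY = "secondary"


class FailoverStateMachine:
    """Switch sources on consecutive-result counts, never on a single event."""

    def __init__(self, failure_threshold: int = 3, recovery_threshold: int = 5,
                 cooldown_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_threshold = recovery_threshold
        self.cooldown_s = cooldown_s
        self.clock = clock
        self.state = State.PRIMARY
        self.primary_failures = 0   # consecutive failed primary checks
        self.primary_successes = 0  # consecutive healthy primary checks
        self.last_transition: Optional[float] = None

    def _in_cooldown(self) -> bool:
        return (self.last_transition is not None
                and self.clock() - self.last_transition < self.cooldown_s)

    def observe_primary(self, healthy: bool) -> State:
        """Feed one primary health result; return the active state."""
        # Counters keep updating during cooldown, so a sustained condition
        # takes effect as soon as the cooldown expires
        if healthy:
            self.primary_successes += 1
            self.primary_failures = 0
        else:
            self.primary_failures += 1
            self.primary_successes = 0
        if self._in_cooldown():
            return self.state  # suppress flapping right after a switch
        if (self.state is State.PRIMARY
                and self.primary_failures >= self.failure_threshold):
            self._transition(State.SECONDARY)
        elif (self.state is State.SECONDARY
                and self.primary_successes >= self.recovery_threshold):
            self._transition(State.PRIMARY)
        return self.state

    def _transition(self, new_state: State) -> None:
        self.state = new_state
        self.last_transition = self.clock()
```

Requiring more consecutive successes to fail back than failures to fail over is a common asymmetry: returning to a flaky primary too eagerly is exactly the flapping this design is meant to prevent.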
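Keep the DNS TTL short. A 300-second TTL means resolvers and clients can keep returning the failed endpoint for up to five minutes after the record changes, even if Route 53 detects the failure immediately. Total failover time is detection time plus the TTL, so keep TTLs at 60 seconds or below for production systems.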
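The arithmetic is worth making explicit. Under a simplified model (an assumption that ignores health-checker quorum effects and resolvers that overstay the TTL), the worst case is the time Route 53 needs to observe `FailureThreshold` consecutive failures at its `RequestInterval`, plus one full TTL during which clients may still use the cached answer. `worst_case_failover_s` is a hypothetical helper; the 10/30-second intervals and the 1 to 10 threshold range are Route 53's documented limits.

```python
def worst_case_failover_s(request_interval_s: int, failure_threshold: int,
                          ttl_s: int) -> int:
    """Approximate upper bound, in seconds, from outage to clients using the new target.

    Simplified model: detection = failure_threshold checks spaced
    request_interval_s apart; clients may then cache the old answer
    for one full TTL.
    """
    if request_interval_s not in (10, 30):
        raise ValueError("Route 53 health checks use a 10s or 30s request interval")
    if not 1 <= failure_threshold <= 10:
        raise ValueError("Route 53 failure threshold must be between 1 and 10")
    if ttl_s < 0:
        raise ValueError("TTL cannot be negative")
    detection_s = request_interval_s * failure_threshold
    return detection_s + ttl_s
```

With a 30-second interval, a threshold of 3, and a 300-second TTL, the bound is 390 seconds; a 10-second interval with a 60-second TTL brings it down to 90 seconds.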
Test your failover path. A failover system that has never been tested is a liability, not a safeguard. Run monthly failover drills during off-peak hours. Document the recovery time and update your runbooks accordingly.
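A drill is only useful if it produces a number you can track from month to month. A minimal timing harness, assuming you supply two hypothetical hooks: `inject_failure` (for example, blocking the primary host at the firewall) and `has_failed_over` (for example, asking the controller which source is active). The function name and defaults are illustrative; `clock` and `sleep` are injectable so the harness itself can be tested.

```python
import time
from typing import Callable


def time_failover_drill(inject_failure: Callable[[], None],
                        has_failed_over: Callable[[], bool],
                        timeout_s: float = 120.0,
                        poll_s: float = 0.5,
                        clock: Callable[[], float] = time.monotonic,
                        sleep: Callable[[float], None] = time.sleep) -> float:
    """Inject a failure, poll until failover is observed, return elapsed seconds."""
    start = clock()  # monotonic clock: immune to wall-clock adjustments
    inject_failure()
    while not has_failed_over():
        if clock() - start > timeout_s:
            # A drill that never fails over is itself a finding for the runbook
            raise TimeoutError(f"no failover observed within {timeout_s}s")
        sleep(poll_s)
    return clock() - start
```

The measured value is an upper bound with `poll_s` resolution; record it alongside the date and configuration so regressions show up in the runbook history.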
Next Steps
If you are building a production market data infrastructure, integrate TickDB as a secondary data source following the WebSocket client pattern in this article. The free tier provides sufficient API access for development and testing.
If you need sub-100ms latency across multiple asset classes, explore TickDB's Professional tier, which includes WebSocket push access to depth data for US equities (L1), Hong Kong equities (L1–L10), and cryptocurrencies (L1–L10).
If your firm requires multi-region redundancy with enterprise SLA, contact enterprise@tickdb.ai to discuss dedicated infrastructure and custom failover configurations.
If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace for direct TickDB API integration within your development environment.
This article does not constitute investment advice. Market infrastructure decisions should be evaluated against your specific risk tolerance, regulatory requirements, and operational constraints.