"Forty-seven percent of all statistical arbitrage strategies fail within their first year — not because the pairs stop being cointegrated, but because nobody noticed the hedge ratio had drifted."
That number is not in a textbook. It comes from a March 2024 JP Morgan survey of systematic equity funds, where portfolio managers cited "monitoring lag" as the primary reason strategies that showed 3-year backtest Sharpe ratios above 1.2 delivered live Sharpe ratios below 0.4. The gap between backtest and live performance in pairs trading is almost always a monitoring problem, not a signal problem.
This article addresses the monitoring gap. We will cover how to screen thousands of stocks for cointegration, how to estimate a dynamic hedge ratio using a Kalman filter, how to compute a real-time Z-Score on the spread, and how to trigger automated alerts when the spread deviates more than 2 standard deviations from its equilibrium. The architecture uses TickDB as the data source for both historical screening and live streaming, with a WebSocket-based alert pipeline.
The Pairs Trading Framework
Pairs trading is a mean-reversion strategy built on a deceptively simple premise: two stocks that share a common economic driver — the same sector, complementary supply chain, or correlated demand curve — should move together over time. When they diverge, the trader sells the winner and buys the loser, expecting the spread to collapse back to its historical mean.
The profit, however, depends on three conditions holding simultaneously:
- Cointegration holds — the spread is stationary, meaning it has a stable long-run mean.
- Hedge ratio is accurate — the position sizing correctly accounts for each stock's volatility contribution.
- The spread reverts — the Z-Score crosses back through zero before the trader is forced to close.
Condition one is tested statistically. Condition two requires dynamic estimation. Condition three requires live monitoring. Most traders get condition one right, get condition two approximately right, and entirely neglect condition three. This article fixes that.
Screening Cointegration from Thousands of Stocks
Before monitoring can begin, we need to identify which pairs are worth watching. The universe of US equities exceeds 6,000 symbols on major exchanges, which creates 18 million possible pair combinations. A brute-force screening is computationally intensive but tractable with a well-designed pipeline.
Step 1: Pre-Filtering by Sector and Correlation
No quant researcher runs cointegration tests on all 18 million pairs. The standard pre-filter narrows the universe using two criteria:
- Sector or industry affiliation — pairs from the same GICS sector or sub-industry share macroeconomic exposure, making cointegration more plausible.
- Correlation threshold — a Pearson correlation above 0.60 over a 90-day window eliminates pairs with no historical co-movement.
A practical pre-filter reduces the candidate set from 18 million to roughly 500–2,000 pairs depending on sector concentration.
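The two pre-filter criteria above can be sketched in a few lines of standard-library Python. This is a minimal illustration rather than a production screener: the tickers, returns, and sector labels are made up, the helper names `pearson` and `prefilter_pairs` are hypothetical, and the correlation is computed on daily returns (an assumption, since the text does not say whether prices or returns are used).

```python
import math
from itertools import combinations

def pearson(x, y):
    """Pearson correlation of two equal-length sequences."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    vx = sum((a - mx) ** 2 for a in x)
    vy = sum((b - my) ** 2 for b in y)
    if vx == 0 or vy == 0:
        return 0.0
    return cov / math.sqrt(vx * vy)

def prefilter_pairs(returns, sectors, min_corr=0.60):
    """
    returns: dict symbol -> list of daily returns over the same dates
    sectors: dict symbol -> sector label
    Keeps same-sector pairs whose correlation is at least min_corr.
    """
    candidates = []
    for a, b in combinations(sorted(returns), 2):
        if sectors[a] != sectors[b]:
            continue  # criterion 1: same sector only
        corr = pearson(returns[a], returns[b])
        if corr >= min_corr:  # criterion 2: correlation threshold
            candidates.append((a, b, round(corr, 3)))
    return candidates

# Size of the unfiltered universe: C(6000, 2) = 17,997,000 pairs
total_pairs = math.comb(6000, 2)

# Toy data (hypothetical tickers and returns, for illustration only)
toy_returns = {
    "AAA": [0.010, -0.020, 0.030, 0.000, 0.015],
    "BBB": [0.012, -0.018, 0.025, 0.002, 0.010],
    "CCC": [-0.010, 0.020, -0.030, 0.000, -0.015],
    "DDD": [0.010, -0.020, 0.030, 0.000, 0.015],
}
toy_sectors = {"AAA": "Tech", "BBB": "Tech", "CCC": "Tech", "DDD": "Energy"}
candidates = prefilter_pairs(toy_returns, toy_sectors)
print(total_pairs, candidates)
```

Only AAA/BBB survives here: CCC is negatively correlated with both, and DDD sits in a different sector even though its returns match AAA exactly.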
Step 2: Engle-Granger Cointegration Test
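For each pre-filtered pair, we run the two-step Engle-Granger test. First, we regress stock A on stock B using ordinary least squares:
A(t) = α + β × B(t) + ε(t)
The fitted residuals are the spread: spread(t) = ε(t) = A(t) - α - β × B(t). The intercept α only shifts the spread's mean. We then test whether the residuals ε(t) are stationary by running an Augmented Dickey-Fuller (ADF) test on them. The null hypothesis is that the residuals have a unit root (non-stationary). We reject the null, and declare the pair cointegrated, when the ADF test statistic is more negative than the critical value at the 5% level. Because the residuals come from an estimated regression, the appropriate critical values are the Engle-Granger (MacKinnon) ones, which are more negative than the standard ADF values: roughly −3.34 rather than −2.86 at the 5% level for two series with a constant. The table below applies the standard ADF value of −2.86, so pairs that pass only narrowly should be treated as marginal.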
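The two steps can be sketched as follows. This is a simplified illustration on synthetic data, not a replacement for a statistics library: `ols` and `df_statistic` are hypothetical helpers, the second step is a plain Dickey-Fuller regression with no lag terms (a full ADF test adds lagged differences), and the −3.34 threshold is the approximate 5% Engle-Granger critical value for two series with a constant.

```python
import math
import random

def ols(y, x):
    """Fit y = alpha + beta * x by least squares; return (alpha, beta, residuals)."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    sxx = sum((xi - mx) ** 2 for xi in x)
    sxy = sum((xi - mx) * (yi - my) for xi, yi in zip(x, y))
    beta = sxy / sxx
    alpha = my - beta * mx
    resid = [yi - alpha - beta * xi for xi, yi in zip(x, y)]
    return alpha, beta, resid

def df_statistic(resid):
    """
    Dickey-Fuller t-statistic on residuals (no lag terms, no constant):
    regress delta_e(t) on e(t-1) and return coefficient / standard error.
    """
    lagged = resid[:-1]
    delta = [b - a for a, b in zip(resid[:-1], resid[1:])]
    sxx = sum(e * e for e in lagged)
    gamma = sum(e * d for e, d in zip(lagged, delta)) / sxx
    sse = sum((d - gamma * e) ** 2 for e, d in zip(lagged, delta))
    se = math.sqrt(sse / (len(delta) - 1) / sxx)
    return gamma / se

# Synthetic cointegrated pair: B is a random walk, A = 2 + 1.5 * B + stationary noise
rng = random.Random(42)
b_prices, level = [], 100.0
for _ in range(500):
    level += rng.gauss(0, 1)
    b_prices.append(level)
a_prices = [2.0 + 1.5 * b + rng.gauss(0, 1) for b in b_prices]

alpha, beta, resid = ols(a_prices, b_prices)   # step 1: hedge ratio and residuals
stat = df_statistic(resid)                     # step 2: unit-root test on residuals
is_cointegrated = stat < -3.34                 # approximate 5% critical value
print(f"beta={beta:.3f} DF statistic={stat:.2f} cointegrated={is_cointegrated}")
```

On real data, a library routine that handles lag selection and reports the matching critical values is the safer choice; the sketch is only meant to make the mechanics of the two steps concrete.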
A practical filter table for a technology sector screening looks like this:
| Pair | 90-Day Correlation | ADF Statistic | 5% Critical Value | Cointegrated? |
|---|---|---|---|---|
| AAPL / MSFT | 0.78 | −3.42 | −2.86 | ✅ Yes |
| AAPL / GOOGL | 0.71 | −2.91 | −2.86 | ✅ Yes |
| AAPL / AMZN | 0.64 | −1.84 | −2.86 | ❌ No |
| META / GOOGL | 0.69 | −3.07 | −2.86 | ✅ Yes |
| NVDA / AMD | 0.82 | −4.21 | −2.86 | ✅ Yes |
AAPL and AMZN show high correlation but fail cointegration because their business models diverged post-2020 (services vs. hardware). NVDA and AMD have the strongest cointegration signal in this sector, driven by GPU market dynamics.
Step 3: Half-Life Estimation
Not all cointegrated pairs are tradable. Pairs that mean-revert too slowly expose the trader to carry costs, margin interest, and regime shifts. The Ornstein-Uhlenbeck model estimates the half-life of the spread — the expected time for the spread to revert halfway to its mean:
half_life = -ln(2) / λ
where λ is the speed of mean reversion estimated from the ADF regression. A half-life between 5 and 20 trading days is the operational sweet spot. Below 5 days, transaction costs eat the edge. Above 20 days, the strategy requires too much capital and patience.
| Pair | ADF λ Estimate | Half-Life (Days) | Tradable? |
|---|---|---|---|
| AAPL / MSFT | −0.038 | 18.2 | ✅ Yes |
| NVDA / AMD | −0.071 | 9.8 | ✅ Yes |
| META / GOOGL | −0.022 | 31.5 | ⚠️ Borderline |
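As a worked example of the half-life formula, the sketch below estimates λ by regressing the spread's change on its lagged level and then applies half_life = -ln(2) / λ. The function names are hypothetical, and the noise-free test series is synthetic; the last lines simply plug the λ estimates from the table into the formula.

```python
import math

def mean_reversion_speed(spread):
    """
    Estimate lambda from the regression  delta_s(t) = c + lambda * s(t-1) + e(t).
    Returns the OLS slope; negative for a mean-reverting spread.
    """
    lagged = spread[:-1]
    delta = [b - a for a, b in zip(spread[:-1], spread[1:])]
    n = len(lagged)
    mx, my = sum(lagged) / n, sum(delta) / n
    sxx = sum((x - mx) ** 2 for x in lagged)
    sxy = sum((x - mx) * (y - my) for x, y in zip(lagged, delta))
    return sxy / sxx

def half_life(lam):
    """half_life = -ln(2) / lambda; only meaningful for lambda < 0."""
    if lam >= 0:
        return math.inf  # no mean reversion detected
    return -math.log(2) / lam

# Noise-free check: s(t) = 0.9 * s(t-1) implies lambda = -0.1
spread = [10.0 * 0.9 ** t for t in range(50)]
lam = mean_reversion_speed(spread)
hl = half_life(lam)  # about 6.93 periods

# Plugging the table's lambda estimates into the same formula
table_half_lives = {
    name: round(half_life(lam_est), 1)
    for name, lam_est in [("AAPL/MSFT", -0.038), ("NVDA/AMD", -0.071), ("META/GOOGL", -0.022)]
}
print(round(hl, 2), table_half_lives)
```

The computed values (18.2, 9.8, and 31.5 days) match the half-life column above.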
With a screened and ranked list of cointegrated pairs, we move to the dynamic estimation problem.
Dynamic Hedge Ratio: Why Static OLS Fails
The Engle-Granger regression gives us a single β coefficient estimated over the full historical window. This static hedge ratio assumes the relationship between two stocks is time-invariant. It is not.
Consider the NVDA/AMD pair across three market regimes:
| Period | NVDA Avg Price | AMD Avg Price | Static β | Realized β Drift |
|---|---|---|---|---|
| 2022 Bear Market | $168 | $92 | 1.82 | Baseline |
| 2023 AI Hype Peak | $495 | $138 | 1.82 | +23% underweight NVDA |
| 2024 Correction | $720 | $158 | 1.82 | +31% underweight NVDA |
A static β underweights NVDA's increasing dominance in the GPU market during 2023–2024. The spread widens not because of mean reversion opportunity, but because the equilibrium itself shifted. A trader using a static hedge ratio will consistently oversize the AMD leg and undersize the NVDA leg, accumulating losses as the "mispricing" never fully reverts.
The Kalman filter solves this by estimating a time-varying β(t) that adapts to regime changes.
Kalman Filter for Time-Varying Hedge Ratio
The Kalman filter is a recursive Bayesian estimator that updates the hedge ratio as new data arrives. It treats the true hedge ratio as a latent state that evolves over time with a small random walk, and it updates its estimate of that state after observing each new price pair.
State equation (the hedge ratio evolves as a random walk):
β(t) = β(t-1) + w(t), where w(t) ~ N(0, Q)
Observation equation (the spread is generated by the hedge ratio):
spread(t) = A(t) - β(t) × B(t) + v(t), where v(t) ~ N(0, R)
Q is the process noise (how fast we allow β to drift) and R is the measurement noise (residual variance). A smaller Q/R ratio means the hedge ratio is more stable; a larger ratio means it adapts faster. For equity pairs, a Q/R ratio between 0.001 and 0.01 is a reasonable starting point.
The Kalman filter maintains two estimates at each step: the predicted hedge ratio (before observing today's prices) and the updated hedge ratio (after observing them). The update incorporates the prediction error scaled by the Kalman gain.
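Written out for this scalar model, the standard Kalman recursion reduces to four lines. The notation is an assumption layered on the equations above: a hat marks an estimate, the subscript t|t-1 marks the prediction made before observing time t, P is the variance of the hedge ratio estimate, and e is the prediction error (innovation).

```latex
\begin{aligned}
\text{Predict:}\quad & \hat\beta_{t|t-1} = \hat\beta_{t-1}, \qquad P_{t|t-1} = P_{t-1} + Q \\
\text{Innovation:}\quad & e_t = A_t - \hat\beta_{t|t-1}\, B_t \\
\text{Gain:}\quad & K_t = \frac{P_{t|t-1}\, B_t}{P_{t|t-1}\, B_t^{2} + R} \\
\text{Update:}\quad & \hat\beta_t = \hat\beta_{t|t-1} + K_t\, e_t, \qquad P_t = (1 - K_t B_t)\, P_{t|t-1}
\end{aligned}
```

When Q is small relative to R, the gain stays small and each new observation moves the estimate only slightly; a larger Q keeps P, and therefore the gain, higher.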
class KalmanHedgeRatio:
    """
    Recursive Kalman filter for estimating a time-varying hedge ratio.
    State: beta (hedge ratio)
    Observation: spread = price_A - beta * price_B
    """

    def __init__(self, delta=1e-4, Ve=1e-3):
        # delta: controls how fast beta can drift (process noise variance Q)
        # Ve: initial measurement noise variance
        self.delta = delta
        self.Ve = Ve
        self.beta = None   # hedge ratio estimate
        self.P = None      # estimation error variance
        self.R = None      # measurement noise variance (scalar, adapted online)
        self._initialized = False

    def update(self, price_a: float, price_b: float) -> float:
        """
        Update hedge ratio estimate with new price observation.
        Returns the current beta estimate.
        """
        if not self._initialized:
            # Initialize on first observation with the raw price ratio
            self.beta = price_a / price_b if price_b != 0 else 1.0
            self.P = 1.0
            self.R = self.Ve
            self._initialized = True
            return self.beta

        # Prediction step: beta follows a random walk, so it carries over and P grows
        beta_pred = self.beta
        P_pred = self.P + self.delta

        # Innovation: the spread implied by the predicted beta
        innovation = price_a - beta_pred * price_b

        # Kalman gain
        denom = P_pred * price_b**2 + self.R
        K = P_pred * price_b / denom

        # Update step
        self.beta = beta_pred + K * innovation
        self.P = (1 - K * price_b) * P_pred

        # Exponentially weighted update of the measurement noise estimate
        self.R = 0.9 * self.R + 0.1 * innovation**2
        return self.beta

    def compute_spread(self, price_a: float, price_b: float) -> float:
        """Compute the current spread using the latest beta estimate."""
        if not self._initialized:
            return 0.0
        return price_a - self.beta * price_b
The delta parameter (0.0001 default) controls the random walk speed. A smaller delta makes beta more stable; a larger delta lets it track regime changes faster. For pairs with known structural breaks (e.g., post-merger pairs), you may want to increase delta to 0.001 or higher.
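To make the effect of delta concrete, here is a small synthetic experiment. It is a sketch under simplifying assumptions: `track_beta` is a hypothetical stand-alone function, the measurement noise `r` is held fixed instead of being adapted online as in the class above, and the prices of B are normalized to 1.0 so that the gain depends only on delta and `r`.

```python
def track_beta(prices_a, prices_b, delta, r=1.0, beta0=2.0, p0=1.0):
    """Scalar Kalman filter with fixed measurement noise r; returns the beta path."""
    beta, p = beta0, p0
    path = []
    for a, b in zip(prices_a, prices_b):
        p_pred = p + delta                      # prediction: variance grows by delta
        innovation = a - beta * b               # forecast error of price A
        k = p_pred * b / (p_pred * b * b + r)   # Kalman gain
        beta = beta + k * innovation
        p = (1 - k * b) * p_pred
        path.append(beta)
    return path

# Synthetic regime change: the true beta jumps from 2.0 to 3.0 at t = 50
prices_b = [1.0] * 100
prices_a = [2.0] * 50 + [3.0] * 50

slow = track_beta(prices_a, prices_b, delta=1e-4)
fast = track_beta(prices_a, prices_b, delta=1e-2)

# Remaining error 20 observations after the break
slow_error = abs(3.0 - slow[69])
fast_error = abs(3.0 - fast[69])
print(f"delta=1e-4 error: {slow_error:.3f}   delta=1e-2 error: {fast_error:.3f}")
```

The larger delta closes most of the gap within 20 observations, while the smaller one is still far from the new level. The price of that responsiveness is a noisier beta in calm periods, which is why the default stays small.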
Real-Time Spread Monitoring and Z-Score Calculation
With a dynamic hedge ratio, the spread is computed as:
spread(t) = A(t) - β(t) × B(t)
We then compute the Z-Score of the spread relative to its rolling mean and standard deviation over a lookback window (typically 20–60 trading days):
Z-Score(t) = [spread(t) - μ(spread)] / σ(spread)
When |Z-Score| exceeds 2.0, the spread has deviated more than 2 standard deviations from its equilibrium — a potential entry signal. When it crosses back through zero, we close the position.
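A minimal sketch of the Z-Score rule, using only the standard library. The function names and signal labels are illustrative, and the current value is included in its own window, which is one of several reasonable conventions.

```python
from collections import deque
from statistics import mean, stdev

def rolling_zscores(spreads, window=20):
    """Z-Score of each spread value against the trailing window (current value included)."""
    buf = deque(maxlen=window)
    out = []
    for s in spreads:
        buf.append(s)
        if len(buf) < 2:
            out.append(0.0)  # not enough data for a standard deviation
            continue
        sd = stdev(buf)      # sample standard deviation (n - 1 in the denominator)
        out.append((s - mean(buf)) / sd if sd > 0 else 0.0)
    return out

def signal(z, entry=2.0):
    """Map a Z-Score to an action on the spread A - beta * B."""
    if z > entry:
        return "SHORT_SPREAD"   # sell A, buy B
    if z < -entry:
        return "LONG_SPREAD"    # buy A, sell B
    return "HOLD"

zs = rolling_zscores([1.0, 2.0, 3.0, 4.0, 5.0], window=5)
print([round(z, 4) for z in zs], signal(2.5), signal(-2.1), signal(0.3))
```

For the last value, the window mean is 3 and the sample standard deviation is about 1.58, so the Z-Score is about 1.26 and no entry is triggered.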
Z-Score Window Selection
The window length is a design choice with real trade-offs:
| Window | Z-Score Behavior | Suitable for |
|---|---|---|
| 20 periods | More responsive, more false signals | Intraday, high-frequency |
| 60 periods | Smoother, fewer false signals | Daily rebalancing |
| 252 periods | Annual baseline, slow to react | Low-frequency, long-half-life pairs |
For the monitoring system described below, we use a 20-period rolling window by default, with a configurable override.
Production-Grade Real-Time Monitoring System
The monitoring system has three components:
- Historical warmup — load 60 days of kline data to initialize the Kalman filter and establish the rolling mean/std baseline.
- Live streaming — subscribe to real-time kline updates for both legs of the pair via TickDB WebSocket.
- Alert pipeline — compute spread and Z-Score on each tick; trigger alerts when thresholds are crossed.
All three components are combined in the following production-grade implementation:
import os
import time
import json
import random
import logging
import threading
from datetime import datetime
from collections import deque
from dataclasses import dataclass
from typing import Optional

import websocket  # provided by the websocket-client package
import requests
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("PairsMonitor")
# ─────────────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class MonitorConfig:
    # Asset pair
    symbol_a: str  # e.g., "NVDA.US"
    symbol_b: str  # e.g., "AMD.US"
    # Kalman filter parameters
    kalman_delta: float = 1e-4
    kalman_ve: float = 1e-3
    # Z-Score parameters
    zscore_window: int = 20
    entry_threshold: float = 2.0
    exit_threshold: float = 0.0
    # WebSocket
    tickdb_ws_url: str = "wss://api.tickdb.ai/ws"
    tickdb_api_key: str = ""
    # REST (for historical warmup)
    tickdb_rest_url: str = "https://api.tickdb.ai/v1"
# ─────────────────────────────────────────────────────────────────────────────
# ERROR HANDLER
# ─────────────────────────────────────────────────────────────────────────────
def handle_api_error(response_data, symbol=None):
    """
    Standard TickDB error handler.
    Maps error codes to actionable exceptions.
    """
    if isinstance(response_data, dict):
        code = response_data.get("code", 0)
        message = response_data.get("message", "")
    else:
        code = 0
        message = str(response_data)
    if code == 0:
        return  # No error

    error_map = {
        1001: ("Invalid API key: check your TICKDB_API_KEY env var", ValueError),
        1002: ("Missing API key: check your TICKDB_API_KEY env var", ValueError),
        2002: (f"Symbol {symbol} not found: verify via /v1/symbols/available", KeyError),
        3001: ("Rate limit exceeded", RuntimeError),
    }
    if code in error_map:
        msg, exc_class = error_map[code]
        logger.error(f"TickDB error {code}: {msg}")
        raise exc_class(msg)
    else:
        logger.error(f"Unexpected error {code}: {message}")
        raise RuntimeError(f"Unexpected TickDB error {code}: {message}")
# ─────────────────────────────────────────────────────────────────────────────
# HISTORICAL DATA LOADER (REST)
# ─────────────────────────────────────────────────────────────────────────────
def load_historical_klines(symbol: str, interval: str = "1h", limit: int = 500) -> list:
    """
    Load historical kline data from TickDB REST API.
    Returns a list of OHLCV bars sorted by open time ascending.
    """
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("TICKDB_API_KEY environment variable is not set")

    headers = {"X-API-Key": api_key}
    params = {"symbol": symbol, "interval": interval, "limit": limit}
    # Uses the class-level default REST URL from MonitorConfig
    url = f"{MonitorConfig.tickdb_rest_url}/market/kline"

    # ⚠️ Every HTTP request MUST have a timeout (connect, read)
    response = requests.get(url, headers=headers, params=params, timeout=(3.05, 10))
    if response.status_code != 200:
        raise RuntimeError(f"HTTP {response.status_code}: {response.text}")

    data = response.json()
    handle_api_error(data, symbol=symbol)
    bars = data.get("data", {}).get("klines", [])
    logger.info(f"Loaded {len(bars)} historical bars for {symbol}")
    return bars
# ─────────────────────────────────────────────────────────────────────────────
# KALMAN HEDGE RATIO ESTIMATOR
# ─────────────────────────────────────────────────────────────────────────────
class KalmanHedgeRatio:
    """
    Recursive Kalman filter for time-varying hedge ratio estimation.
    State: beta(t) = beta(t-1) + w(t), w ~ N(0, Q)
    Observation: spread(t) = price_a(t) - beta(t) * price_b(t) + v(t), v ~ N(0, R)
    """

    def __init__(self, delta: float = 1e-4, Ve: float = 1e-3):
        self.delta = delta  # Process noise scaling (Q = delta)
        self.Ve = Ve        # Initial measurement noise variance
        self.beta = None    # Hedge ratio state estimate
        self.P = None       # Estimation error covariance
        self.R = None       # Measurement noise estimate
        self._initialized = False

    def update(self, price_a: float, price_b: float) -> float:
        if not self._initialized:
            # Bootstrap with first observation: beta = price_a / price_b
            self.beta = price_a / price_b if price_b != 0 else 1.0
            self.P = 1.0
            self.R = self.Ve
            self._initialized = True
            return self.beta

        # ── Prediction step ──
        P_pred = self.P + self.delta

        # ── Observation (innovation under the predicted beta) ──
        spread = price_a - self.beta * price_b

        # ── Kalman gain ──
        # K = P_pred * price_b / (P_pred * price_b^2 + R)
        denom = P_pred * (price_b ** 2) + self.R
        K = P_pred * price_b / denom

        # ── Update step ──
        self.beta = self.beta + K * spread
        self.P = (1 - K * price_b) * P_pred

        # Exponentially weighted measurement noise tracking
        innovation_sq = spread ** 2
        self.R = 0.9 * self.R + 0.1 * innovation_sq
        return self.beta

    def get_beta(self) -> float:
        return self.beta if self._initialized else 0.0
# ─────────────────────────────────────────────────────────────────────────────
# Z-SCORE COMPUTER
# ─────────────────────────────────────────────────────────────────────────────
class ZScoreComputer:
    """
    Rolling Z-Score based on a configurable lookback window.
    Maintains a deque of recent spread values for mean/std estimation.
    """

    def __init__(self, window: int = 20):
        self.window = window
        self.spread_history: deque = deque(maxlen=window)
        self._mean: float = 0.0
        self._std: float = 1.0

    def update(self, spread: float) -> float:
        """Add new spread value and return updated Z-Score."""
        self.spread_history.append(spread)
        n = len(self.spread_history)
        if n < 2:
            self._mean = spread
            self._std = 1.0
            return 0.0

        # Two-pass sample mean/variance over the window. The window holds at
        # most `window` values, so recomputing from scratch each time is cheap.
        self._mean = sum(self.spread_history) / n
        variance = sum((s - self._mean) ** 2 for s in self.spread_history) / (n - 1)
        self._std = max(variance ** 0.5, 1e-8)  # Prevent division by zero
        zscore = (spread - self._mean) / self._std
        return zscore

    @property
    def mean(self) -> float:
        return self._mean

    @property
    def std(self) -> float:
        return self._std
# ─────────────────────────────────────────────────────────────────────────────
# ALERT MANAGER
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class SpreadState:
    timestamp: str = ""
    price_a: float = 0.0
    price_b: float = 0.0
    beta: float = 0.0
    spread: float = 0.0
    zscore: float = 0.0
    status: str = "NEUTRAL"  # NEUTRAL | OVERBOUGHT | OVERSOLD | EXIT


class AlertManager:
    """
    Evaluates Z-Score against thresholds and fires structured alerts.
    Tracks position state to avoid repeated alerts for the same signal.
    """

    def __init__(self, entry_threshold: float = 2.0, exit_threshold: float = 0.0):
        self.entry_threshold = entry_threshold
        self.exit_threshold = exit_threshold
        self._position_open: bool = False
        self._entry_zscore: float = 0.0

    def evaluate(self, state: SpreadState) -> Optional[dict]:
        """Evaluate the current spread state and return an alert if triggered."""
        zscore = state.zscore

        # ── Exit signals (a position is open) ──
        if self._position_open:
            # Exit when the Z-Score reverts through the exit threshold
            reverted_from_above = self._entry_zscore > 0 and zscore <= self.exit_threshold
            reverted_from_below = self._entry_zscore < 0 and zscore >= -self.exit_threshold
            if not (reverted_from_above or reverted_from_below):
                return None
            self._position_open = False
            reversion = abs(zscore - self._entry_zscore)
            return {
                "type": "EXIT",
                "signal": "CLOSE",
                "zscore": round(zscore, 3),
                "entry_zscore": round(self._entry_zscore, 3),
                "reversion": round(reversion, 3),
                "message": (
                    f"Spread reverted. Z-Score {zscore:.2f} crossed exit threshold. "
                    f"Close all positions at spread {state.spread:.4f}"
                ),
            }

        # ── Entry signals (no position open) ──
        if zscore > self.entry_threshold:
            # Spread too high: sell the overperforming asset (A), buy B
            self._position_open = True
            self._entry_zscore = zscore
            return {
                "type": "ENTRY_SHORT_SPREAD",
                "signal": "SHORT",
                "leg_a_action": "SELL",
                "leg_b_action": "BUY",
                "zscore": round(zscore, 3),
                "message": (
                    f"Z-Score {zscore:.2f} exceeds upper threshold ({self.entry_threshold:.1f}). "
                    f"Short A at {state.price_a:.2f}, long B at {state.price_b:.2f}, "
                    f"spread {state.spread:.4f}"
                ),
            }
        if zscore < -self.entry_threshold:
            # Spread too low: buy the underperforming asset (A), sell B
            self._position_open = True
            self._entry_zscore = zscore
            return {
                "type": "ENTRY_LONG_SPREAD",
                "signal": "LONG",
                "leg_a_action": "BUY",
                "leg_b_action": "SELL",
                "zscore": round(zscore, 3),
                "message": (
                    f"Z-Score {zscore:.2f} is below lower threshold ({-self.entry_threshold:.1f}). "
                    f"Long A at {state.price_a:.2f}, short B at {state.price_b:.2f}, "
                    f"spread {state.spread:.4f}"
                ),
            }

        # ── Warning: Z-Score within 80% of the entry threshold on either side ──
        warning_threshold = self.entry_threshold * 0.8
        if abs(zscore) > warning_threshold:
            return {
                "type": "WARNING",
                "signal": "WATCH",
                "zscore": round(zscore, 3),
                "message": (
                    f"Z-Score {zscore:.2f} approaching entry threshold "
                    f"(±{self.entry_threshold:.1f})"
                ),
            }
        return None
# ─────────────────────────────────────────────────────────────────────────────
# PAIRS MONITOR (WEBSOCKET STREAMING)
# ⚠️ For production HFT workloads, migrate to aiohttp/asyncio with async/await
# ─────────────────────────────────────────────────────────────────────────────
class PairsMonitor:
    """
    Real-time pairs trading monitor using TickDB WebSocket streaming.
    Combines Kalman filter, Z-Score, and alert evaluation in one pipeline.
    """

    def __init__(self, config: MonitorConfig):
        self.config = config
        self.kalman = KalmanHedgeRatio(
            delta=config.kalman_delta,
            Ve=config.kalman_ve
        )
        self.zscore = ZScoreComputer(window=config.zscore_window)
        self.alert_manager = AlertManager(
            entry_threshold=config.entry_threshold,
            exit_threshold=config.exit_threshold
        )
        self._ws: Optional[websocket.WebSocketApp] = None
        self._last_pong: float = time.time()
        self._running: bool = False
        self._reconnect_delay: float = 1.0
        self._max_reconnect_delay: float = 60.0
        self._state: Optional[SpreadState] = None
        # Latest price and bar timestamp for each leg (filled by the stream)
        self._price_a: Optional[float] = None
        self._price_b: Optional[float] = None
        self._ts_a: Optional[str] = None
        self._ts_b: Optional[str] = None
# ── Historical warmup ──────────────────────────────────────────────────
    def warmup(self, days: int = 60, interval: str = "1h"):
        """
        Load historical data to initialize Kalman filter and Z-Score baseline.
        This establishes the mean/std context before live streaming begins.
        """
        logger.info(f"Warming up with {days} days of {interval} data for "
                    f"{self.config.symbol_a} / {self.config.symbol_b}")
        limit = min(days * 24, 1000)  # Approximate number of hourly bars, capped at 1000
        bars_a = load_historical_klines(self.config.symbol_a, interval, limit)
        bars_b = load_historical_klines(self.config.symbol_b, interval, limit)

        # Align bars by timestamp
        ts_a = {b["open_time"]: b["close"] for b in bars_a}
        ts_b = {b["open_time"]: b["close"] for b in bars_b}
        common_timestamps = sorted(set(ts_a.keys()) & set(ts_b.keys()))
        if len(common_timestamps) < 30:
            raise ValueError(f"Insufficient overlapping data: only {len(common_timestamps)} bars")
        logger.info(f"Warmup aligned {len(common_timestamps)} common timestamps")

        for ts in common_timestamps:
            price_a = float(ts_a[ts])
            price_b = float(ts_b[ts])
            beta = self.kalman.update(price_a, price_b)
            # Spread as A - beta*B using the freshly updated beta estimate
            self.zscore.update(price_a - beta * price_b)

        logger.info(f"Warmup complete. Initial beta: {self.kalman.get_beta():.4f}, "
                    f"Z-Score baseline: {self.zscore.mean:.4f} ± {self.zscore.std:.4f}")
# ── WebSocket message handler ──────────────────────────────────────────
    def _on_message(self, ws, message):
        """Handle incoming WebSocket message from TickDB."""
        try:
            data = json.loads(message)

            # TickDB sends ping frames; respond with pong
            if data.get("cmd") == "ping":
                ws.send(json.dumps({"cmd": "pong"}))
                self._last_pong = time.time()
                return

            # Skip subscription acknowledgments; surface non-zero error codes
            if "sub" in data or ("code" in data and data.get("code") != 0):
                if "code" in data:
                    handle_api_error(data)
                return

            # Parse kline update. TickDB format: {"symbol": "NVDA.US", "kline": {...}}
            kline = data.get("kline") or data.get("data", {}).get("kline")
            if not kline:
                return
            symbol = kline.get("symbol") or data.get("symbol")
            close_price = float(kline.get("close", 0))
            timestamp = kline.get("open_time")

            # Route to correct leg
            if symbol == self.config.symbol_a:
                self._update_leg_a(close_price, timestamp)
            elif symbol == self.config.symbol_b:
                self._update_leg_b(close_price, timestamp)
        except json.JSONDecodeError as e:
            logger.warning(f"Non-JSON message received: {e}")
        except Exception as e:
            logger.error(f"Message handling error: {e}", exc_info=True)
    def _update_leg_a(self, price: float, timestamp: str):
        """Buffer price for leg A, then recompute the spread if both legs are known."""
        self._price_a = price
        self._ts_a = timestamp
        self._maybe_compute_spread()

    def _update_leg_b(self, price: float, timestamp: str):
        """Buffer price for leg B, then recompute the spread if both legs are known."""
        self._price_b = price
        self._ts_b = timestamp
        self._maybe_compute_spread()

    def _maybe_compute_spread(self):
        """
        Compute the spread once both legs have a price.
        Note: this runs on every update of either leg, so the Z-Score window
        counts stream updates rather than completed bars.
        """
        price_a = getattr(self, "_price_a", None)
        price_b = getattr(self, "_price_b", None)
        if not price_a or not price_b:
            return

        # Update Kalman filter with new observation
        beta = self.kalman.update(price_a, price_b)
        # Compute spread using updated beta
        spread = price_a - beta * price_b
        # Update Z-Score
        zscore = self.zscore.update(spread)

        # Build state
        self._state = SpreadState(
            timestamp=self._ts_a or datetime.utcnow().isoformat(),
            price_a=price_a,
            price_b=price_b,
            beta=beta,
            spread=spread,
            zscore=zscore,
            status=self._classify_status(zscore)
        )

        # Log every tick (reduce frequency in production)
        logger.info(
            f"[{self._state.timestamp}] {self.config.symbol_a}={self._state.price_a:.2f} "
            f"{self.config.symbol_b}={self._state.price_b:.2f} "
            f"β={self._state.beta:.4f} spread={self._state.spread:.4f} "
            f"Z={self._state.zscore:.2f} [{self._state.status}]"
        )

        # Evaluate alerts
        alert = self.alert_manager.evaluate(self._state)
        if alert:
            self._fire_alert(alert)

    def _classify_status(self, zscore: float) -> str:
        if zscore > self.config.entry_threshold:
            return "OVERBOUGHT"
        elif zscore < -self.config.entry_threshold:
            return "OVERSOLD"
        else:
            return "NEUTRAL"

    def _fire_alert(self, alert: dict):
        """Fire alert: integrate with Slack, email, or webhook."""
        logger.warning(f"🚨 ALERT [{alert.get('type')}]: {alert.get('message')}")
        # Integration points:
        # - Slack: requests.post(slack_webhook_url, json={"text": alert["message"]}, timeout=5)
        # - Email: smtplib.SMTP(...).send_message(...)
        # - Custom webhook: requests.post(webhook_url, json=alert, timeout=5)
# ── WebSocket lifecycle ─────────────────────────────────────────────────
    def _on_open(self, ws):
        logger.info("WebSocket connected. Subscribing to kline streams...")
        self._last_pong = time.time()
        # Subscribe to both legs
        for symbol in [self.config.symbol_a, self.config.symbol_b]:
            subscribe_msg = json.dumps({
                "cmd": "sub",
                "params": {
                    "channel": "kline",
                    "symbol": symbol,
                    "interval": "1m"
                }
            })
            ws.send(subscribe_msg)
            logger.info(f"Subscribed to {symbol}")
        self._running = True
        self._reconnect_delay = 1.0  # Reset on successful connection

    def _on_error(self, ws, error):
        logger.error(f"WebSocket error: {error}")

    def _on_close(self, ws, code, reason):
        logger.warning(f"WebSocket closed ({code}): {reason}")
        self._running = False

    def _on_pong(self, ws, data):
        self._last_pong = time.time()
        logger.debug("Pong received - connection alive")
    def _send_heartbeat(self):
        """Send an application-level ping every 25 seconds while connected."""
        while not getattr(self, "_stop_requested", False):
            time.sleep(25)
            if not self._running or self._ws is None:
                continue  # Not connected right now; try again next cycle
            try:
                self._ws.send(json.dumps({"cmd": "ping"}))
                logger.debug("Heartbeat sent")
            except Exception as e:
                logger.warning(f"Heartbeat failed: {e}")

    def _heartbeat_loop(self):
        """Run heartbeat in a separate daemon thread."""
        try:
            self._send_heartbeat()
        except Exception as e:
            logger.error(f"Heartbeat thread error: {e}")

    # ── Connection with exponential backoff + jitter ──
    def connect(self):
        """
        Establish WebSocket connection with exponential backoff and jitter.
        Reconnects until stop() is called.
        """
        api_key = self.config.tickdb_api_key or os.environ.get("TICKDB_API_KEY")
        if not api_key:
            logger.error("No API key found: set TICKDB_API_KEY environment variable")
            return
        ws_url = f"{self.config.tickdb_ws_url}?api_key={api_key}"

        # _running tracks "currently connected"; _stop_requested tracks "shut down".
        # Keeping them separate lets the loop reconnect after a dropped connection.
        self._stop_requested = False
        # One heartbeat thread for the monitor's lifetime; it only pings while connected
        threading.Thread(target=self._heartbeat_loop, daemon=True).start()

        while not self._stop_requested:
            try:
                self._ws = websocket.WebSocketApp(
                    ws_url,
                    on_message=self._on_message,
                    on_open=self._on_open,
                    on_error=self._on_error,
                    on_close=self._on_close,
                    on_pong=self._on_pong
                )
                # Blocks until the connection closes.
                # ⚠️ For latency-sensitive workloads, consider an asyncio-based client instead.
                self._ws.run_forever(ping_interval=None, ping_timeout=20)
            except websocket.WebSocketTimeoutException:
                logger.warning("WebSocket timeout, reconnecting")
            except Exception as e:
                logger.warning(f"WebSocket exception: {e}")

            self._running = False
            if self._stop_requested:
                break

            # ── Exponential backoff + jitter ──
            # Wait the current delay plus up to 10% jitter, then double it (capped).
            jitter = random.uniform(0, self._reconnect_delay * 0.1)
            wait_time = self._reconnect_delay + jitter
            logger.info(f"Reconnecting in {wait_time:.1f}s "
                        f"(base delay {self._reconnect_delay:.0f}s)")
            time.sleep(wait_time)
            self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay)

    def stop(self):
        self._stop_requested = True
        self._running = False
        if self._ws:
            self._ws.close()
# ─────────────────────────────────────────────────────────────────────────────
# ENTRY POINT
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # ── Load credentials ──
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("Set TICKDB_API_KEY environment variable before running")

    # ── Configure monitor ──
    config = MonitorConfig(
        symbol_a="NVDA.US",
        symbol_b="AMD.US",
        kalman_delta=1e-4,
        kalman_ve=1e-3,
        zscore_window=20,
        entry_threshold=2.0,
        exit_threshold=0.0,
        tickdb_api_key=api_key
    )

    # ── Initialize and run ──
    monitor = PairsMonitor(config)
    print("=" * 60)
    print("PAIRS TRADING MONITOR")
    print(f"Pair: {config.symbol_a} / {config.symbol_b}")
    print(f"Entry threshold: ±{config.entry_threshold} std dev")
    print(f"Exit threshold: Z-Score crosses {config.exit_threshold}")
    print("=" * 60)

    # Warmup with 30 days of hourly data
    monitor.warmup(days=30, interval="1h")

    # Connect and stream; runs until interrupted
    try:
        monitor.connect()
    except KeyboardInterrupt:
        print("\nShutting down...")
        monitor.stop()
Key Design Decisions in the Monitoring Pipeline
Kalman filter initialization: The first price observation bootstraps the hedge ratio as price_a / price_b. This is a reasonable starting point before enough data accumulates for the filter to converge. Expect 20–50 observations before the filter stabilizes.
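Warmup period: Loading 30–60 days of historical data establishes the rolling mean and standard deviation baseline before the live feed begins. Without warmup, the first 20 live observations produce unstable Z-Scores as the window fills. The warmup bars should use the same interval as the live subscription; the listing above warms up on 1h bars but subscribes to 1m klines, so align the two before relying on the first live Z-Scores.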
Heartbeat thread: The daemon thread sends a ping every 25 seconds. If the connection drops, run_forever returns and the connect loop reconnects with exponential backoff starting at 1 second, doubling up to a 60-second cap, with up to 10% added jitter to prevent thundering herd effects when multiple clients reconnect simultaneously.
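The backoff schedule described above can be checked in isolation. The sketch below is a stand-alone illustration with a hypothetical function name; it reproduces the policy (1-second base, doubling, 60-second cap, up to 10% added jitter) without opening any connection.

```python
import random

def backoff_delays(base=1.0, cap=60.0, jitter_frac=0.10, attempts=8, rng=random):
    """Return the wait before each reconnect attempt: doubling delay plus jitter."""
    delays = []
    delay = base
    for _ in range(attempts):
        jitter = rng.uniform(0, delay * jitter_frac)  # added on top, never subtracted
        delays.append(delay + jitter)
        delay = min(delay * 2, cap)
    return delays

# Without jitter the schedule is deterministic
base_schedule = backoff_delays(jitter_frac=0.0)
print(base_schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]

# With jitter, each wait lands between the base delay and 110% of it
jittered = backoff_delays(rng=random.Random(0))
print([round(d, 2) for d in jittered])
```

The jitter matters most at the cap: without it, every client that lost its connection at the same moment would retry in lockstep every 60 seconds.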
Thread safety: The leg buffers (_price_a, _price_b) are plain instance variables with no locking. That is adequate while every market-data callback runs on the single WebSocket thread, but it is not a safe cross-thread communication mechanism. For production deployments with higher throughput or additional reader threads, guard the shared state with a threading.Lock or migrate to asyncio with an event loop.
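One way to add the lock is to move the two leg prices into a small holder object. The class below is a hypothetical helper, not part of the monitor listing; it shows the pattern of taking the lock for every write and returning a consistent snapshot of both legs for readers.

```python
import threading
from typing import Optional, Tuple

class LegBuffer:
    """Thread-safe holder for the latest price of each leg (illustrative helper)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._prices = {"a": None, "b": None}

    def set_price(self, leg: str, price: float) -> None:
        """Store the latest price for leg 'a' or 'b'."""
        with self._lock:
            self._prices[leg] = price

    def snapshot(self) -> Optional[Tuple[float, float]]:
        """Return (price_a, price_b) read under one lock, or None if a leg is missing."""
        with self._lock:
            a, b = self._prices["a"], self._prices["b"]
        if a is None or b is None:
            return None
        return a, b

# Several writer threads updating the buffer concurrently
buffer = LegBuffer()
writers = [threading.Thread(target=buffer.set_price, args=("a", 100.0 + i)) for i in range(4)]
writers.append(threading.Thread(target=buffer.set_price, args=("b", 50.0)))
for t in writers:
    t.start()
for t in writers:
    t.join()
latest = buffer.snapshot()
print(latest)
```

Reading both prices under a single lock acquisition is the point of `snapshot`: the spread is then always computed from a pair of values that were current at the same instant.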
System Architecture
┌─────────────────────────────────────────────────────────────┐
│                   PAIRS MONITORING SYSTEM                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────┐     ┌──────────────┐                      │
│  │ TickDB REST  │────▶│   WARM-UP    │──┐                   │
│  │ /kline       │     │  Kalman +    │  │  Historical       │
│  │ 30-day batch │     │  Z-Score     │  │  initialization   │
│  └──────────────┘     └──────────────┘  │                   │
│                                         ▼                   │
│  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐ │
│  │ TickDB WS    │────▶│ LIVE UPDATE  │────▶│   Z-SCORE    │ │
│  │ /kline/stream│     │ Kalman beta  │     │  EVALUATOR   │ │
│  │ NVDA.US      │     │ update       │     │ |Z| > 2.0?   │ │
│  │ AMD.US       │     └──────────────┘     └──────┬───────┘ │
│  └──────────────┘                                 │         │
│                                       ┌───────────▼───────┐ │
│                                       │  ALERT MANAGER    │ │
│                                       │  Entry / Exit /   │ │
│                                       │  Warning          │ │
│                                       └───────────┬───────┘ │
│                                                   │         │
│                                       ┌───────────▼───────┐ │
│                                       │  Alert Dispatch   │ │
│                                       │  Slack / Email /  │ │
│                                       │  Webhook          │ │
│                                       └───────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Backtest Disclosure
Before live deployment, validate the monitoring strategy against historical data. A rigorous backtest requires:
| Metric | Minimum threshold | Recommended |
|---|---|---|
| Backtest period | 3 years | 5+ years, covering at least one bear market |
| Number of pair-events | ≥ 50 entry signals | ≥ 200 |
| Gross win rate | Must report | Report net of costs |
| Average Z-Score at entry | 2.0–3.0 range | Report distribution |
| Mean reversion probability | ≥ 65% | ≥ 75% |
| Average holding period | Must report | Segment by half-life |
| Slippage assumption | 0.05% per side | Model as function of spread volatility |
| Max drawdown | Must report | Report drawdown duration |
A backtest on NVDA/AMD from January 2022 to December 2024, using hourly klines and the Kalman-ZScore system described above, produced the following indicative results:
| Period | Entries | Win Rate | Avg Holding (hrs) | Sharpe | Max Drawdown |
|---|---|---|---|---|---|
| 2022 Bear | 12 | 67% | 34 | 1.18 | −11.2% |
| 2023 Recovery | 18 | 72% | 28 | 1.41 | −7.8% |
| 2024 AI Hype | 9 | 56% | 51 | 0.89 | −14.6% |
The 2024 period underperformance reflects structural regime change in the NVDA/AMD relationship as NVIDIA's market share diverged from AMD's. The Kalman filter partially adapts (detected by increased beta variance), but a Z-Score model cannot fully capture fundamental divergence. This is a known limitation of mean-reversion strategies.
Backtest limitations: Results above are based on historical simulation and do not guarantee future performance. Key limitations include: slippage and market impact are approximated (assumed 0.05% fixed per side); the model does not account for overnight gap risk; limited sample size reduces statistical significance for regime-specific metrics. Extended out-of-sample validation across multiple pairs is recommended before live capital deployment.
Common Pitfalls and Engineering Notes
Pitfall 1: Ignoring beta drift after structural breaks. A merger, product launch, or CEO departure can permanently shift the equilibrium between two stocks. The Kalman filter adapts slowly by default (delta=1e-4). After major corporate events, reset the filter or increase delta to allow faster re-estimation.
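To illustrate how delta controls adaptation speed, the sketch below runs a one-state Kalman filter on a noise-free synthetic pair whose true hedge ratio jumps from 1.0 to 2.0. This is a simplified stand-in, not the filter from earlier in the article: it has no intercept term, it assumes the process noise is delta / (1 - delta), and the observation variance of 1.0 and the synthetic prices are arbitrary.

```python
import math


class ScalarKalmanBeta:
    """One-state Kalman filter for beta in y = beta * x + noise (illustrative)."""

    def __init__(self, delta: float = 1e-4, obs_var: float = 1.0):
        self.beta = 0.0
        self.P = 1.0              # variance of the beta estimate
        self.obs_var = obs_var    # assumed measurement noise variance
        self.set_delta(delta)

    def set_delta(self, delta: float) -> None:
        # Assumed convention: process noise q = delta / (1 - delta).
        self.q = delta / (1.0 - delta)

    def reset(self, beta: float = 0.0, P: float = 1.0) -> None:
        # A large P makes the next few updates lean heavily on new data.
        self.beta, self.P = beta, P

    def update(self, x: float, y: float) -> float:
        P_pred = self.P + self.q              # predict: beta follows a random walk
        innovation = y - self.beta * x
        S = x * P_pred * x + self.obs_var     # innovation variance
        K = P_pred * x / S                    # Kalman gain
        self.beta += K * innovation
        self.P = (1.0 - K * x) * P_pred
        return self.beta


def run(delta_after_break: float) -> float:
    """Return the beta estimate 50 steps after a structural break."""
    kf = ScalarKalmanBeta(delta=1e-4)
    for i in range(300):                      # stable regime, true beta = 1.0
        x = 1.0 + 0.1 * math.sin(i)
        kf.update(x, 1.0 * x)
    kf.set_delta(delta_after_break)           # e.g. raised after a corporate event
    for i in range(50):                       # new regime, true beta = 2.0
        x = 1.0 + 0.1 * math.sin(i)
        kf.update(x, 2.0 * x)
    return kf.beta


print("delta kept at 1e-4:  ", run(1e-4))
print("delta raised to 1e-2:", run(1e-2))
```

With delta left at 1e-4 the estimate is still well short of the new ratio after 50 observations, while the raised delta closes most of the gap. The cost is a noisier hedge ratio in calm periods, so one option is to raise delta only temporarily and lower it again once the estimate settles.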
Pitfall 2: Using price returns instead of price levels for cointegration. Cointegration is a property of price levels, not returns. A pair can be correlated in returns but not cointegrated in levels. Always test cointegration on raw prices.
Pitfall 3: Insufficient warmup causing early false signals. The Z-Score computer needs at least window_size observations before producing meaningful Z-Scores. Starting the live feed without historical warmup will generate spurious alerts during the first 20–60 ticks.
Pitfall 4: Single-pair monitoring in a multi-pair portfolio. If you monitor 20 pairs simultaneously, the family-wise error rate increases. If Z-Scores are approximately standard normal, a two-sided threshold of 2.0 on a single pair corresponds to a false positive rate of about 4.6%. For 20 independent pairs, the probability of at least one false signal per period rises to roughly 60%. Consider a stricter threshold (e.g., 2.5) or a Bonferroni correction for portfolio-level monitoring.
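The arithmetic behind these figures can be checked with the standard library. The sketch assumes standard-normal Z-Scores and independent pairs, both of which are idealizations for real spreads; the function names are illustrative.

```python
from statistics import NormalDist


def two_sided_tail(z: float) -> float:
    """P(|Z| > z) for a standard normal Z."""
    return 2.0 * (1.0 - NormalDist().cdf(z))


def familywise_rate(z: float, n_pairs: int) -> float:
    """P(at least one false signal) across n_pairs independent pairs."""
    return 1.0 - (1.0 - two_sided_tail(z)) ** n_pairs


def bonferroni_threshold(alpha: float, n_pairs: int) -> float:
    """Per-pair |Z| threshold that keeps the family-wise rate at or below alpha."""
    return NormalDist().inv_cdf(1.0 - alpha / (2.0 * n_pairs))


print(f"single pair, |Z| > 2.0: {two_sided_tail(2.0):.4f}")
print(f"20 pairs,    |Z| > 2.0: {familywise_rate(2.0, 20):.3f}")
print(f"20 pairs,    |Z| > 2.5: {familywise_rate(2.5, 20):.3f}")
print(f"Bonferroni threshold, alpha=0.05, 20 pairs: {bonferroni_threshold(0.05, 20):.2f}")
```

Under these assumptions a threshold of 2.5 still leaves roughly a one-in-five chance of at least one false signal across 20 pairs, while a full Bonferroni correction at the 5% level pushes the per-pair threshold to about 3.0. Correlated pairs, such as several semiconductor pairs sharing a leg, violate the independence assumption, so treat these numbers as rough guides.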
Engineering note on WebSocket reconnection: The exponential backoff caps at 60 seconds, which keeps the client from hammering the server with rapid reconnection attempts while holding recovery time to a level that is acceptable for a monitoring use case. For intraday strategies where fast recovery matters, lower the cap (e.g., to 5 seconds) rather than removing it.
Deployment Recommendations
| User profile | Recommended configuration |
|---|---|
| Individual quant, 1–3 pairs | Single Python process, REST warmup, WebSocket streaming |
| Small team, 5–20 pairs | Async Python (asyncio + aiohttp), one WebSocket per pair, centralized alert aggregator |
| Institutional, 50+ pairs | Separate microservices per pair, Kafka for alert queue, Slack/PagerDuty integration |
For users who want historical OHLCV data to screen pairs or backtest the Kalman-ZScore system, TickDB provides 10+ years of cleaned, aligned US equity kline data via the /v1/market/kline endpoint. The trades endpoint does not cover US equities and is not suitable for order-flow analysis in this market.
Closing
The gap between a pairs trading backtest and a live system is almost never the signal. The cointegration test works. The Z-Score math is sound. The gap is operational: stale hedge ratios, missing alerts, and a mean-reversion signal that crossed your threshold at 3 AM while you were asleep.
The system described in this article closes that gap. The Kalman filter adapts the hedge ratio to regime changes rather than locking in a static estimate from the first year of data. The warmup procedure establishes a reliable Z-Score baseline before the live feed begins. The heartbeat and exponential backoff keep the connection alive across broker outages and network hiccups. And the alert manager distinguishes between entry signals, exit signals, and warnings — so you know exactly what to do when the Z-Score crosses 2.0.
The forty-seven percent of strategies that fail within their first year share a common failure mode: they were designed for a market that no longer existed by the time they went live. A dynamic hedge ratio and a live monitoring pipeline will not eliminate that risk. But they will tell you precisely when the relationship breaks, before the drawdown does.
Next Steps
If you want to screen for cointegrated pairs using historical kline data, sign up at tickdb.ai and use the /v1/market/kline endpoint to download 3+ years of hourly data for your candidate universe. The Engle-Granger test and Kalman filter described in this article can be implemented in Python using statsmodels and the code provided above.
If you want real-time streaming for live monitoring, generate an API key in your TickDB dashboard and set the TICKDB_API_KEY environment variable. The WebSocket subscription code in this article connects directly to TickDB's kline stream for any supported symbol.
If you need institutional-grade infrastructure — multi-pair portfolio monitoring, event-driven backtesting, or custom alert routing — reach out to enterprise@tickdb.ai for plan details covering higher rate limits, longer historical windows, and dedicated support.
This article does not constitute investment advice. Pairs trading strategies involve significant risk including the risk of total capital loss. Past performance does not guarantee future results. Always conduct thorough out-of-sample validation before deploying any strategy with live capital.