The Moment the Machine Breaks
At 2:47 AM on March 15, 2025, a momentum-following algorithm was still buying Bitcoin at $62,400. The trend signal had triggered. The model said "up." What the model could not see was that the entire futures market was in a liquidity vacuum — the order book had thinned to almost nothing after the sudden rate hike announcement.
By the time a human operator intervened manually, because no automated circuit breaker existed, the strategy had accumulated 847 lots of long exposure in a market that dropped 12% over the next 18 minutes.
The loss was not the algorithm's fault. The algorithm did exactly what it was designed to do. The fault was in the architecture: there was no state machine watching over the state machine. No layer asking, "Should we still be running?"
This article builds that layer.
1. The Problem with "Just Add a Stop-Loss"
Traditional stop-loss logic operates at the position level. It asks: "Should this specific trade be closed?" It does not ask: "Should the strategy itself continue operating?"
This distinction matters because catastrophic failures in algorithmic trading rarely come from individual bad trades. They come from correlated sequences — a regime change where every signal in your model fires in the wrong direction simultaneously, and the strategy keeps faithfully executing against a market that has fundamentally shifted.
Consider the failure modes:
| Failure Mode | Description | Why a Simple Stop-Loss Fails |
|---|---|---|
| Consecutive loss spiral | Strategy hits N consecutive losing trades. Market regime has changed but strategy does not know. | Position-level stop does not know how many losses preceded this trade. |
| Intraday drawdown cascade | Multiple positions drift against you throughout the day. Individual stops do not trigger, but cumulative drawdown is extreme. | Stops are per-position, not per-strategy portfolio. |
| Latency blind spot | Network latency or data gaps cause the strategy to trade on stale signals during a fast market. | A stop-loss on position P&L does not capture signal staleness. |
| Correlation breakdown | Strategy assumes diversification across assets, but in tail events, correlation approaches 1. | No strategy-level awareness of cross-position correlation risk. |
The solution is a supervisory state machine — a layer above your trading logic that monitors strategy-level health metrics and enforces circuit breaker states independently of any individual trade's outcome.
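To make the "intraday drawdown cascade" row concrete, the sketch below uses made-up numbers: four positions each lose less than a 10% per-position stop, yet together they cost 7.9% of strategy equity. The function names, position sizes, and thresholds are illustrative assumptions, not part of any library.

```python
def positions_stopped(pnls, notionals, stop_pct):
    """Indices of positions whose own loss breaches the per-position stop."""
    return [
        i for i, (pnl, notional) in enumerate(zip(pnls, notionals))
        if -pnl / notional >= stop_pct
    ]


def strategy_drawdown(start_equity, pnls):
    """Combined loss across all positions as a fraction of starting equity."""
    return max(0.0, -sum(pnls) / start_equity)


start_equity = 100_000.0
notionals = [25_000.0] * 4                      # hypothetical position sizes
pnls = [-2_000.0, -1_900.0, -2_200.0, -1_800.0]  # 7.2% to 8.8% loss per position

stopped = positions_stopped(pnls, notionals, stop_pct=0.10)
drawdown = strategy_drawdown(start_equity, pnls)

print(f"positions stopped out: {stopped}")
print(f"strategy-level drawdown: {drawdown:.1%}")
```

No individual stop fires, but a strategy-level check against a 5% drawdown limit would.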
2. Designing the Circuit Breaker State Machine
2.1 State Diagram
The circuit breaker operates across four discrete states. Transitions are triggered by observable events such as trade outcomes, drawdown, and the elapsed recovery window, not by a fixed schedule.
| From | Trigger | To |
|---|---|---|
| HEALTHY | loss_threshold_met (consecutive losses reach the configured threshold) | DEGRADED |
| DEGRADED | A winning trade resets consecutive_losses to 0 | HEALTHY |
| DEGRADED | recovery_window_elapsed without recovery | SUSPENDED |
| DEGRADED or SUSPENDED | manual_reset_requested | HEALTHY |
| Any state | drawdown_threshold_met | HALTED |
| HALTED | Explicit human acknowledgment after root cause review | HEALTHY |
2.2 State Definitions
| State | Definition | What the Strategy Can Do |
|---|---|---|
| HEALTHY | All metrics within normal parameters. | Normal operation. |
| DEGRADED | Consecutive losses have reached the configured threshold, but drawdown is still within limits. | Reduce position size by 50%. Increase confirmation requirements. |
| SUSPENDED | Strategy has been in DEGRADED state beyond the recovery window without recovering, OR manual intervention was triggered. | No new trades. Existing positions managed by risk manager only. |
| HALTED | Intraday drawdown has exceeded the hard stop threshold. | Complete shutdown. No position management. Manual reset required. |
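One way to keep the state definitions above honest is to encode the permitted transitions as data and reject anything else. The following is a minimal sketch; the `State` enum mirrors the four states, while `ALLOWED_TRANSITIONS` and `is_allowed` are hypothetical names introduced here for illustration.

```python
from enum import Enum, auto


class State(Enum):
    HEALTHY = auto()
    DEGRADED = auto()
    SUSPENDED = auto()
    HALTED = auto()


# Which target states each state may move to. HALTED can only return to
# HEALTHY, and only through an explicit human acknowledgment.
ALLOWED_TRANSITIONS = {
    State.HEALTHY: {State.DEGRADED, State.HALTED},
    State.DEGRADED: {State.HEALTHY, State.SUSPENDED, State.HALTED},
    State.SUSPENDED: {State.HEALTHY, State.HALTED},
    State.HALTED: {State.HEALTHY},
}


def is_allowed(src: State, dst: State) -> bool:
    """Return True if moving from src to dst is a permitted transition."""
    return dst in ALLOWED_TRANSITIONS[src]


print(is_allowed(State.HEALTHY, State.DEGRADED))
print(is_allowed(State.HEALTHY, State.SUSPENDED))
```

A table like this is also convenient for unit tests: any transition the implementation performs can be asserted against it.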
2.3 Transition Trigger Logic
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional
import time


class CircuitState(Enum):
    HEALTHY = auto()
    DEGRADED = auto()
    SUSPENDED = auto()
    HALTED = auto()


@dataclass
class CircuitBreakerConfig:
    """Configuration for the circuit breaker state machine."""
    consecutive_loss_threshold: int = 3       # Trigger DEGRADED after N consecutive losses
    recovery_window_seconds: int = 900        # 15-minute window to recover from DEGRADED
    drawdown_threshold_pct: float = 0.05      # Hard stop at 5% intraday drawdown
    max_position_size_reduction: float = 0.5  # Reduce to 50% in DEGRADED state


@dataclass
class CircuitBreaker:
    """Supervisory state machine for trading strategy risk control."""
    config: CircuitBreakerConfig
    state: CircuitState = CircuitState.HEALTHY

    # Internal counters
    consecutive_losses: int = 0
    consecutive_wins: int = 0
    intraday_pnl: float = 0.0
    peak_equity: float = 0.0
    current_equity: float = 0.0
    current_drawdown: float = 0.0

    # Timestamps
    degraded_since: Optional[float] = None
    last_trade_time: Optional[float] = None

    def record_trade(self, pnl: float, trade_time: Optional[float] = None) -> CircuitState:
        """
        Record a trade outcome and evaluate state transitions.
        Returns the new state after evaluation.

        Args:
            pnl: Profit (+) or loss (-) from the completed trade
            trade_time: Unix timestamp of trade execution. Defaults to current time.
        """
        # Explicit None check so a legitimate timestamp of 0 is not overwritten
        self.last_trade_time = trade_time if trade_time is not None else time.time()
        if pnl >= 0:
            self.consecutive_losses = 0
            self.consecutive_wins += 1
        else:
            self.consecutive_losses += 1
            self.consecutive_wins = 0
        self.intraday_pnl += pnl
        # Drawdown is driven by equity updates: follow each trade with
        # record_equity_update() so the hard stop sees the new equity.
        return self._evaluate_transitions()

    def record_equity_update(self, current_equity: float) -> CircuitState:
        """
        Update current equity, peak equity, and drawdown. Call this at each bar
        close and after each trade. Call it once with the starting equity before
        the first trade so the peak is seeded.
        """
        self.current_equity = current_equity
        if current_equity > self.peak_equity:
            self.peak_equity = current_equity
        self._update_drawdown()
        return self._evaluate_transitions()

    def _update_drawdown(self):
        """Recalculate current drawdown as the fractional decline from peak equity."""
        if self.peak_equity > 0:
            self.current_drawdown = (
                (self.peak_equity - self.current_equity) / self.peak_equity
            )

    def _evaluate_transitions(self) -> CircuitState:
        """
        Evaluate all transition conditions and update state accordingly.
        State machine logic follows a strict priority order.
        """
        # Priority 1: the drawdown hard stop overrides every other state
        if self.current_drawdown >= self.config.drawdown_threshold_pct:
            self.state = CircuitState.HALTED
            return self.state

        # Priority 2: Check DEGRADED conditions
        if (self.state == CircuitState.HEALTHY and
                self.consecutive_losses >= self.config.consecutive_loss_threshold):
            self.state = CircuitState.DEGRADED
            self.degraded_since = time.time()
            return self.state

        # Priority 3: Check recovery from DEGRADED
        if self.state == CircuitState.DEGRADED:
            if self.consecutive_wins > 0:
                # Recovered via winning trades
                self.state = CircuitState.HEALTHY
                self.degraded_since = None
                return self.state
            if self.degraded_since is not None:
                elapsed = time.time() - self.degraded_since
                if elapsed >= self.config.recovery_window_seconds:
                    self.state = CircuitState.SUSPENDED
                    return self.state

        # SUSPENDED and HALTED persist until a manual reset
        return self.state

    def request_manual_reset(self) -> bool:
        """
        Request a manual reset from the DEGRADED or SUSPENDED state.
        Always fails in HALTED: a hard stop requires explicit human review.
        Returns True if the circuit was reset to HEALTHY.
        """
        if self.state == CircuitState.HALTED:
            # Hard stop requires explicit human acknowledgment
            return False
        if self.state in (CircuitState.DEGRADED, CircuitState.SUSPENDED):
            self._reset_counters()
            self.state = CircuitState.HEALTHY
            return True
        return False

    def _reset_counters(self):
        """Reset streak counters after manual intervention."""
        self.consecutive_losses = 0
        self.consecutive_wins = 0
        self.degraded_since = None
        # Note: intraday_pnl, peak_equity, and current_drawdown are NOT reset
        # on manual intervention because they represent actual portfolio state

    def can_trade(self) -> tuple[bool, str]:
        """
        Query whether new trades should be permitted in the current state.
        Returns (can_trade: bool, reason: str).
        """
        if self.state == CircuitState.HALTED:
            return False, (
                f"Circuit breaker HALTED: drawdown {self.current_drawdown:.2%} "
                f"exceeds threshold {self.config.drawdown_threshold_pct:.2%}"
            )
        if self.state == CircuitState.SUSPENDED:
            return False, (
                "Circuit breaker SUSPENDED: strategy requires manual reset "
                "after extended degradation"
            )
        if self.state == CircuitState.DEGRADED:
            return True, (
                "Circuit breaker DEGRADED: position size reduced to "
                f"{self.config.max_position_size_reduction:.0%}"
            )
        return True, "Circuit breaker HEALTHY: normal operation"
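Drawdown here means the decline from the running equity peak, which is not the same as the day's cumulative P&L. The worked example below is a small sketch with an invented equity curve: the day is still up overall, yet equity has fallen 5% from its high. The function name `max_drawdown` is an illustrative choice.

```python
def max_drawdown(equity_curve):
    """Largest fractional decline from a running peak over the series."""
    peak = equity_curve[0]
    worst = 0.0
    for equity in equity_curve:
        peak = max(peak, equity)
        if peak > 0:
            worst = max(worst, (peak - equity) / peak)
    return worst


curve = [10_000.0, 11_000.0, 10_500.0, 10_450.0, 10_800.0]  # made-up equity values
cumulative_pnl = curve[-1] - curve[0]
worst = max_drawdown(curve)

print(f"cumulative P&L: {cumulative_pnl:+.0f}")
print(f"max drawdown from peak: {worst:.2%}")
```

With a 5% hard stop, this curve would reach the threshold at the 10,450 mark even though cumulative P&L never went negative.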
3. Production-Grade Integration with TickDB Depth Data
The state machine above is the logical layer. The data layer is where it connects to real-time market conditions. This section shows how to wire the circuit breaker to TickDB's WebSocket depth channel, so the strategy can make position sizing decisions based on live order book health — not just historical trade P&L.
3.1 Why Order Book Depth Matters for Circuit Breaker Design
A circuit breaker that only reacts to P&L is inherently lagging. By the time you record three consecutive losses, the market may already be in a liquidity vacuum where every trade you take will be at terrible fills. A forward-looking circuit breaker incorporates order book health signals:
- Bid-ask spread widening → Market making is expensive; reduce frequency.
- Depth shrinking at best bid/best ask → Liquidity withdrawal; reduce position size.
- Order book imbalance escalating → One-sided pressure; pause entries.
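"Depth shrinking" only has meaning relative to a baseline. One possible approach, sketched below, compares each new total-depth reading against a rolling median of recent readings. The class name `DepthBaseline`, the window size, and the 30% cutoff are assumptions chosen for illustration and would need calibrating per market.

```python
from collections import deque
from statistics import median


class DepthBaseline:
    """Flags depth readings that fall far below a rolling median baseline."""

    def __init__(self, window: int = 100, thin_fraction: float = 0.3,
                 min_samples: int = 20):
        self._samples = deque(maxlen=window)
        self.thin_fraction = thin_fraction
        self.min_samples = min_samples

    def update(self, total_depth: float) -> str:
        """Classify the reading against the current baseline, then record it."""
        if len(self._samples) < self.min_samples:
            flag = "warming_up"
        else:
            baseline = median(self._samples)
            flag = "thin" if total_depth < self.thin_fraction * baseline else "normal"
        self._samples.append(total_depth)
        return flag


baseline = DepthBaseline(window=50, thin_fraction=0.3, min_samples=5)
warmup_flags = [baseline.update(100.0) for _ in range(5)]
normal_flag = baseline.update(95.0)
thin_flag = baseline.update(20.0)
print(warmup_flags, normal_flag, thin_flag)
```

Note that thin readings are still added to the window, so a prolonged liquidity drought will eventually drag the baseline down; using a median rather than a mean slows that effect but does not remove it.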
3.2 WebSocket Connection with Heartbeat, Reconnection, and Rate-Limit Handling
import os
import json
import time
import random
import threading
import logging
from typing import Callable, Optional
from dataclasses import dataclass

import websocket  # pip install websocket-client
import requests   # pip install requests

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)


@dataclass
class TickDBDepthConfig:
    """Configuration for TickDB WebSocket depth subscription."""
    api_key: str
    symbols: list[str]                 # e.g., ["BTC-USD"]
    reconnect_max_retries: int = 10
    reconnect_base_delay: float = 1.0
    reconnect_max_delay: float = 60.0
    heartbeat_interval: float = 20.0   # TickDB recommends ping every 20 seconds
    timeout_seconds: float = 30.0


class TickDBWebSocketClient:
    """
    Production-grade WebSocket client for TickDB market data.
    Features: heartbeat, exponential backoff with jitter, rate-limit handling,
    thread-safe message dispatch, and API-key authentication (load the key
    from an environment variable rather than hard-coding it).
    """
    BASE_WS_URL = "wss://api.tickdb.ai/ws/market"

    def __init__(self, config: TickDBDepthConfig, on_depth: Callable[[dict], None]):
        """
        Args:
            config: Connection and subscription configuration
            on_depth: Callback function invoked with depth snapshot on each update
        """
        self.config = config
        self.on_depth = on_depth
        self.ws: Optional[websocket.WebSocketApp] = None
        self._running = False
        self._stop_requested = False
        self._retry_count = 0
        self._last_pong_received: Optional[float] = None
        self._lock = threading.Lock()

    def connect(self):
        """
        Establish WebSocket connection with authentication.
        Authentication is via URL parameter, not header, for WebSocket connections.
        URL format: wss://api.tickdb.ai/ws/market?api_key=YOUR_KEY
        """
        symbols_param = ",".join(self.config.symbols)
        url = f"{self.BASE_WS_URL}?api_key={self.config.api_key}&symbols={symbols_param}&depth=10"
        self.ws = websocket.WebSocketApp(
            url,
            on_message=self._handle_message,
            on_error=self._handle_error,
            on_close=self._handle_close,
            on_open=self._handle_open
        )
        self._stop_requested = False
        self._running = True
        logger.info(f"Connecting to TickDB WebSocket for symbols: {self.config.symbols}")
        # Run in daemon thread so it does not block the main thread
        ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        ws_thread.start()

    def _handle_open(self, ws):
        logger.info("WebSocket connection opened. Starting heartbeat loop.")
        self._retry_count = 0  # A successful open resets the backoff schedule
        self._heartbeat_thread = threading.Thread(
            target=self._heartbeat_loop, args=(ws,), daemon=True
        )
        self._heartbeat_thread.start()

    def _heartbeat_loop(self, ws):
        """
        Send ping frames at configured intervals to keep the connection alive.
        Exits once this connection is closed or replaced by a reconnect, so
        only one heartbeat thread is active per live connection.
        """
        while self._running and self.ws is ws and ws.sock and ws.sock.connected:
            try:
                ws.send(json.dumps({"cmd": "ping"}))
                logger.debug("Heartbeat ping sent")
                time.sleep(self.config.heartbeat_interval)
            except Exception as e:
                logger.warning(f"Heartbeat loop interrupted: {e}")
                break

    def _handle_message(self, ws, message: str):
        """Parse incoming messages. Depth updates invoke the on_depth callback."""
        try:
            data = json.loads(message)
            # Handle pong response
            if data.get("cmd") == "pong":
                self._last_pong_received = time.time()
                return
            # Handle error codes
            code = data.get("code")
            if code == 3001:
                retry_after = int(data.get("headers", {}).get("Retry-After", 5))
                logger.warning(f"Rate limited. Retrying after {retry_after} seconds.")
                # Note: this blocks the receive thread, pausing all message handling
                time.sleep(retry_after)
                return
            if code and code not in (0, 200):
                logger.error(f"TickDB error code {code}: {data.get('message')}")
                return
            # Invoke the depth callback with validated data
            if "data" in data and "depth" in data["data"]:
                # Serialize dispatch in case two connections overlap during a reconnect
                with self._lock:
                    self.on_depth(data["data"])
        except json.JSONDecodeError as e:
            logger.warning(f"Non-JSON message received: {e}")
        except Exception as e:
            logger.error(f"Error processing depth data: {e}")

    def _handle_error(self, ws, error):
        logger.error(f"WebSocket error: {error}")
        self._schedule_reconnect(ws)

    def _handle_close(self, ws, close_status_code, close_msg):
        logger.warning(f"WebSocket closed. Status: {close_status_code}, Message: {close_msg}")
        self._schedule_reconnect(ws)

    def _schedule_reconnect(self, ws):
        """Implement exponential backoff with jitter to prevent thundering herd."""
        # Skip after an intentional stop(). Also skip if this connection has
        # already been replaced: an error and the close that follows it would
        # otherwise each trigger a reconnect.
        if self._stop_requested or ws is not self.ws:
            return
        if self._retry_count >= self.config.reconnect_max_retries:
            logger.error("Max reconnection retries reached. Giving up.")
            self._running = False
            return
        delay = min(
            self.config.reconnect_base_delay * (2 ** self._retry_count),
            self.config.reconnect_max_delay
        )
        # Add up to 10% jitter to prevent synchronized retries
        jitter = random.uniform(0, delay * 0.1)
        sleep_time = delay + jitter
        self._retry_count += 1
        logger.info(f"Scheduling reconnect #{self._retry_count} in {sleep_time:.2f}s")
        time.sleep(sleep_time)
        if not self._stop_requested:
            # connect() starts its own daemon thread for the new connection
            self.connect()

    def stop(self):
        """Gracefully shut down the WebSocket connection without reconnecting."""
        logger.info("Stopping TickDB WebSocket client.")
        self._stop_requested = True
        self._running = False
        if self.ws:
            self.ws.close()
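The reconnect schedule is easier to reason about when isolated as a pure function. The sketch below reproduces the same arithmetic (doubling delay, a cap, and up to 10% additive jitter) using the default values from `TickDBDepthConfig`. The function name `backoff_delay` and the `jitter_fraction` parameter are introduced here for illustration.

```python
import random


def backoff_delay(retry: int, base: float = 1.0, cap: float = 60.0,
                  jitter_fraction: float = 0.1) -> float:
    """Delay before reconnect attempt number `retry` (0-based), in seconds."""
    delay = min(base * (2 ** retry), cap)
    # Additive jitter spreads out clients that disconnected at the same moment
    return delay + random.uniform(0, delay * jitter_fraction)


# With jitter disabled the schedule is deterministic: it doubles, then caps.
schedule = [backoff_delay(n, jitter_fraction=0.0) for n in range(8)]
print(schedule)

# With jitter, attempt 3 lands somewhere between 8.0 and 8.8 seconds.
jittered = backoff_delay(3)
print(f"{jittered:.2f}")
```

With ten retries and a 60-second cap, the client gives up after roughly five and a half minutes of cumulative waiting, which is worth checking against how long your strategy can safely run without depth data.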
3.3 Order Book Health Metrics from Depth Data
@dataclass
class BookHealthMetrics:
    """Computed metrics from order book depth data."""
    spread_bps: float        # Bid-ask spread in basis points
    bid_depth: float         # Total bid volume (top 10 levels)
    ask_depth: float         # Total ask volume (top 10 levels)
    imbalance_ratio: float   # bid_depth / (bid_depth + ask_depth)
    depth_ratio: float       # bid_depth / ask_depth (inf if ask_depth == 0)
    liquidity_flag: str      # "normal" | "thin" | "vacuum"


def compute_book_health(depth_data: dict, mid_price: float) -> BookHealthMetrics:
    """
    Compute order book health metrics from TickDB depth snapshot.

    Args:
        depth_data: Parsed depth data from TickDB WebSocket (has 'bids' and 'asks' arrays)
        mid_price: Current mid price for spread normalization

    Returns:
        BookHealthMetrics with computed values
    """
    bids = depth_data.get("bids", [])
    asks = depth_data.get("asks", [])
    # bids and asks are arrays of [price, size] pairs from top N levels
    best_bid = float(bids[0][0]) if bids else 0.0
    best_ask = float(asks[0][0]) if asks else 0.0

    # Spread in basis points
    if mid_price > 0 and best_bid > 0 and best_ask > 0:
        spread_bps = ((best_ask - best_bid) / mid_price) * 10000
    else:
        spread_bps = 0.0

    # Aggregate depth
    bid_depth = sum(float(b[1]) for b in bids)
    ask_depth = sum(float(a[1]) for a in asks)

    # Imbalance ratio (0 = all ask, 1 = all bid)
    total_depth = bid_depth + ask_depth
    imbalance_ratio = bid_depth / total_depth if total_depth > 0 else 0.5

    # Depth ratio
    depth_ratio = bid_depth / ask_depth if ask_depth > 0 else float('inf')

    # Liquidity classification
    # These thresholds are market-specific; calibrate based on historical data
    if not bids or not asks or total_depth == 0:
        # A missing side means the spread cannot be measured, so the
        # spread_bps fallback of 0.0 must not be read as a tight market
        liquidity_flag = "vacuum"
    elif spread_bps > 50 or depth_ratio > 5 or depth_ratio < 0.2:
        liquidity_flag = "thin"
    else:
        liquidity_flag = "normal"

    return BookHealthMetrics(
        spread_bps=spread_bps,
        bid_depth=bid_depth,
        ask_depth=ask_depth,
        imbalance_ratio=imbalance_ratio,
        depth_ratio=depth_ratio,
        liquidity_flag=liquidity_flag
    )


def adjust_position_size_for_book_health(
    base_size: float,
    book_metrics: BookHealthMetrics,
    circuit_state: CircuitState,
    degraded_size_factor: float = 0.5
) -> float:
    """
    Adjust trade position size based on order book health and circuit breaker state.

    Logic:
    - If circuit is HALTED: return 0 (no trades)
    - If circuit is SUSPENDED: return 0 (no trades)
    - If circuit is DEGRADED: scale by degraded_size_factor (pass
      CircuitBreakerConfig.max_position_size_reduction to keep them in sync)
    - If book is "vacuum": no new entries regardless of other factors
    - If book is "thin": reduce size by 50%
    - Apply both reductions multiplicatively
    """
    if circuit_state in (CircuitState.HALTED, CircuitState.SUSPENDED):
        return 0.0
    if book_metrics.liquidity_flag == "vacuum":
        logger.warning("Order book in vacuum state. Rejecting new entries.")
        return 0.0

    adjusted_size = base_size
    # Apply circuit breaker reduction if in DEGRADED state
    if circuit_state == CircuitState.DEGRADED:
        adjusted_size *= degraded_size_factor
    # Apply book health reduction
    if book_metrics.liquidity_flag == "thin":
        adjusted_size *= 0.5
    return adjusted_size
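The 50 bps spread cutoff in the classifier is a placeholder that should be calibrated per market. One simple option, sketched here, is to take a high percentile of historically observed spreads. The function name `spread_threshold_bps` and the synthetic history are assumptions for illustration; real input would come from your own recorded depth snapshots.

```python
from statistics import quantiles


def spread_threshold_bps(spread_history, percentile: int = 95) -> float:
    """Spread level (in bps) exceeded by only (100 - percentile)% of observations."""
    # quantiles(n=100) returns the 99 cut points between percentiles 1..99
    cuts = quantiles(spread_history, n=100, method="inclusive")
    return cuts[percentile - 1]


# Synthetic history: spreads of 1.0, 2.0, ..., 101.0 bps
history = [float(x) for x in range(1, 102)]
threshold = spread_threshold_bps(history, percentile=95)
print(f"flag spreads wider than {threshold:.1f} bps as thin")
```

The same approach applies to the depth-ratio bounds. Recalibrate periodically, since typical spreads shift with volatility regimes and venue changes.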
4. Tying It Together: The Supervisory Loop
The following class wires the circuit breaker state machine to the WebSocket client and provides a clean trading integration interface:
from datetime import datetime


class SupervisedTradingStrategy:
    """
    A trading strategy wrapped by the circuit breaker state machine.
    This is the integration point between market data, strategy logic,
    and risk control.
    """

    def __init__(
        self,
        initial_capital: float,
        circuit_config: CircuitBreakerConfig,
        tickdb_config: TickDBDepthConfig,
        symbols: list[str],
        base_position_size: float = 1.0
    ):
        self.capital = initial_capital
        self.peak_equity = initial_capital
        self.symbols = symbols
        self.base_position_size = base_position_size

        # Initialize circuit breaker and seed its peak equity with starting
        # capital, so a loss on the very first trade counts toward drawdown
        self.circuit_breaker = CircuitBreaker(config=circuit_config)
        self.circuit_breaker.record_equity_update(initial_capital)

        # Initialize TickDB client
        self.ws_client = TickDBWebSocketClient(
            config=tickdb_config,
            on_depth=self._on_depth_update
        )

        # Current book health (updated on each depth update)
        self.current_book_health: Optional[BookHealthMetrics] = None

        # Trade log for audit
        self.trade_log: list[dict] = []
        self._start_of_day = datetime.now().date()

    def _on_depth_update(self, depth_data: dict):
        """Callback invoked on each TickDB depth snapshot."""
        # Compute mid price from depth data for metric calculation
        bids = depth_data.get("bids", [])
        asks = depth_data.get("asks", [])
        if bids and asks:
            mid_price = (float(bids[0][0]) + float(asks[0][0])) / 2
            self.current_book_health = compute_book_health(depth_data, mid_price)

    def record_trade(self, symbol: str, pnl: float, size: float, price: float):
        """
        Record a completed trade and evaluate circuit breaker.
        Call this after each trade closes.
        """
        self.capital += pnl
        self.peak_equity = max(self.peak_equity, self.capital)
        self.circuit_breaker.record_trade(pnl)
        # The equity update runs last, so its return value is the final state
        new_state = self.circuit_breaker.record_equity_update(self.capital)

        self.trade_log.append({
            "timestamp": datetime.now().isoformat(),
            "symbol": symbol,
            "pnl": pnl,
            "size": size,
            "price": price,
            "capital_after": self.capital,
            "circuit_state": new_state.name,
            "drawdown": self.circuit_breaker.current_drawdown
        })
        logger.info(
            f"Trade recorded: {symbol} | PnL: {pnl:+.2f} | "
            f"Capital: {self.capital:.2f} | Drawdown: {self.circuit_breaker.current_drawdown:.2%} | "
            f"Circuit: {new_state.name}"
        )

        # Check if manual intervention is required
        can_trade, reason = self.circuit_breaker.can_trade()
        if not can_trade:
            logger.warning(f"TRADING PAUSED: {reason}")
            self._trigger_alert(new_state, reason)

    def get_approved_position_size(self, symbol: str) -> tuple[float, str]:
        """
        Query the maximum safe position size before placing a new trade.

        Returns:
            (approved_size, reason): The size to trade and the controlling reason
        """
        # Check circuit breaker
        circuit_ok, circuit_reason = self.circuit_breaker.can_trade()
        if not circuit_ok:
            return 0.0, circuit_reason

        # Check order book health
        if self.current_book_health is None:
            # No depth data yet: still honor the DEGRADED size reduction
            size = self.base_position_size
            if self.circuit_breaker.state == CircuitState.DEGRADED:
                size *= self.circuit_breaker.config.max_position_size_reduction
            return size, "No depth data; sized from circuit state only"

        size = adjust_position_size_for_book_health(
            base_size=self.base_position_size,
            book_metrics=self.current_book_health,
            circuit_state=self.circuit_breaker.state
        )
        reason = (
            f"Circuit: {self.circuit_breaker.state.name} | "
            f"Book: {self.current_book_health.liquidity_flag} | "
            f"Spread: {self.current_book_health.spread_bps:.1f} bps | "
            f"Imbalance: {self.current_book_health.imbalance_ratio:.2f}"
        )
        return size, reason

    def _trigger_alert(self, state: CircuitState, reason: str):
        """
        Send alert to human supervisor. Integrate with your alerting system here.

        Integration options:
        - Slack: POST to incoming webhook
        - PagerDuty: Events API v2
        - Email: SMTP or SendGrid
        - Custom webhook: your-ops-endpoint.com/alerts
        """
        alert_payload = {
            "alert_type": "CIRCUIT_BREAKER_TRIGGERED",
            "state": state.name,
            "reason": reason,
            "capital": self.capital,
            "peak_equity": self.peak_equity,
            "drawdown": self.circuit_breaker.current_drawdown,
            "consecutive_losses": self.circuit_breaker.consecutive_losses,
            "trade_log_count": len(self.trade_log),
            "timestamp": datetime.now().isoformat()
        }
        logger.critical(f"CIRCUIT BREAKER ALERT: {json.dumps(alert_payload)}")
        # TODO: Replace with actual alerting integration
        # requests.post(os.environ["ALERT_WEBHOOK_URL"], json=alert_payload, timeout=5)

    def request_manual_intervention(self) -> bool:
        """
        Allow human operator to request a circuit reset.
        Only available when in DEGRADED or SUSPENDED states.
        HALTED state requires explicit human acknowledgment (not via this method).
        """
        success = self.circuit_breaker.request_manual_reset()
        if success:
            logger.info("Manual intervention accepted. Circuit reset to HEALTHY.")
        else:
            logger.warning(
                "Manual intervention rejected. Circuit state is "
                f"{self.circuit_breaker.state.name}; only DEGRADED or SUSPENDED can be reset here."
            )
        return success

    def start(self):
        """Start the WebSocket connection and begin receiving depth data."""
        self.ws_client.connect()
        logger.info("Supervised trading strategy started.")

    def stop(self):
        """Stop the WebSocket connection."""
        self.ws_client.stop()
        logger.info("Supervised trading strategy stopped.")
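The supervisory loop sizes trades from the most recent depth snapshot, but nothing above records how old that snapshot is, which leaves the "latency blind spot" from Section 1 open. A small freshness guard is one way to close it. The sketch below is an assumption-laden illustration: the class name `StalenessGuard`, the 2-second limit, and the injectable clock are not part of the article's code or of TickDB.

```python
import time
from typing import Callable, Optional


class StalenessGuard:
    """Tracks when data last arrived and reports whether it is still fresh."""

    def __init__(self, max_age_seconds: float = 2.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_age_seconds = max_age_seconds
        self._clock = clock          # injectable so tests can control time
        self._last_update: Optional[float] = None

    def mark_update(self) -> None:
        """Call whenever a new depth snapshot has been processed."""
        self._last_update = self._clock()

    def is_fresh(self) -> bool:
        """False if no data has arrived yet or the last update is too old."""
        if self._last_update is None:
            return False
        return (self._clock() - self._last_update) <= self.max_age_seconds


# Demonstration with a fake clock
now = [0.0]
guard = StalenessGuard(max_age_seconds=2.0, clock=lambda: now[0])
before_data = guard.is_fresh()
guard.mark_update()
now[0] = 1.5
still_fresh = guard.is_fresh()
now[0] = 2.5
gone_stale = guard.is_fresh()
print(before_data, still_fresh, gone_stale)
```

In the strategy class, `mark_update()` would be called from the depth callback, and the position-sizing method could return zero size whenever `is_fresh()` is false.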
5. Manual Intervention Interface
5.1 Why Manual Overrides Cannot Be Fully Automated
The state machine above has one deliberate asymmetry: the HALTED state cannot be cleared through the ordinary reset path. Leaving it requires a separate, explicitly acknowledged override. This is by design.
A hard stop (drawdown threshold exceeded) means something has gone wrong at a level the algorithm cannot self-diagnose. It could be:
- A data feed error causing phantom losses
- A regulatory event the strategy was not designed to handle
- A bug in the strategy logic itself
In each case, automated self-recovery risks compounding the damage. The HALTED state requires a human to review the trade log, identify the root cause, and make an informed decision about whether to resume.
5.2 REST API Endpoints for Human Control
from flask import Flask, jsonify, request  # pip install flask

app = Flask(__name__)

# The strategy instance, injected at application startup
strategy: Optional[SupervisedTradingStrategy] = None


@app.route("/api/v1/circuit/status", methods=["GET"])
def get_circuit_status():
    """
    Returns the current state of the circuit breaker.
    No authentication required for read-only status endpoint (assuming internal network).
    For production, add token-based auth.
    """
    if strategy is None:
        return jsonify({"error": "Strategy not initialized"}), 503
    cb = strategy.circuit_breaker
    can_trade, reason = cb.can_trade()
    return jsonify({
        "state": cb.state.name,
        "can_trade": can_trade,
        "reason": reason,
        "metrics": {
            "consecutive_losses": cb.consecutive_losses,
            "consecutive_wins": cb.consecutive_wins,
            "intraday_pnl": cb.intraday_pnl,
            "peak_equity": cb.peak_equity,
            "current_drawdown": cb.current_drawdown,
            "drawdown_threshold": cb.config.drawdown_threshold_pct,
            "degraded_since": cb.degraded_since
        },
        "trade_log_summary": {
            "total_trades": len(strategy.trade_log),
            "last_trade_time": strategy.trade_log[-1]["timestamp"] if strategy.trade_log else None
        }
    })


@app.route("/api/v1/circuit/reset", methods=["POST"])
def request_circuit_reset():
    """
    Request a manual reset of the circuit breaker.
    Only available for DEGRADED and SUSPENDED states.
    HALTED state requires explicit human acknowledgment endpoint.
    """
    if strategy is None:
        return jsonify({"error": "Strategy not initialized"}), 503
    # In production, authenticate this endpoint
    # auth_token = request.headers.get("X-Admin-Token")
    # if auth_token != os.environ.get("ADMIN_TOKEN"):
    #     return jsonify({"error": "Unauthorized"}), 401
    success = strategy.request_manual_intervention()
    if success:
        return jsonify({
            "status": "success",
            "new_state": "HEALTHY",
            "message": "Circuit breaker reset to HEALTHY. Monitor closely."
        })
    current = strategy.circuit_breaker.state
    if current == CircuitState.HALTED:
        message = ("Circuit is HALTED. Use /api/v1/circuit/hard-reset for "
                   "explicit override after root cause review.")
        status_code = 403
    else:
        # A reset was requested while the circuit was already HEALTHY
        message = f"Nothing to reset: circuit is {current.name}."
        status_code = 409
    return jsonify({
        "status": "rejected",
        "current_state": current.name,
        "message": message
    }), status_code


@app.route("/api/v1/circuit/hard-reset", methods=["POST"])
def hard_reset():
    """
    Explicit override for HALTED state. Requires JSON body with:
    - acknowledgment: "I have reviewed the trade log and confirmed the root cause"
    - reason: string describing the root cause and remediation taken

    This endpoint should only be accessible to senior operators or automated
    incident management systems after human review.
    """
    if strategy is None:
        return jsonify({"error": "Strategy not initialized"}), 503
    cb = strategy.circuit_breaker
    if cb.state != CircuitState.HALTED:
        return jsonify({
            "error": "Hard reset only available in HALTED state"
        }), 400
    # silent=True returns None instead of raising on a missing or non-JSON body
    body = request.get_json(silent=True)
    if not body:
        return jsonify({"error": "Request body required"}), 400
    acknowledgment = body.get("acknowledgment", "")
    reason = body.get("reason", "")
    if not acknowledgment or not reason:
        return jsonify({"error": "Both 'acknowledgment' and 'reason' fields are required"}), 400

    drawdown_at_reset = cb.current_drawdown
    # Reset circuit breaker
    cb.state = CircuitState.HEALTHY
    cb.consecutive_losses = 0
    cb.consecutive_wins = 0
    cb.degraded_since = None
    # Rebase the drawdown baseline to current capital. Without this, the next
    # evaluation would still see drawdown above the threshold and re-enter HALTED.
    cb.peak_equity = strategy.capital
    cb.intraday_pnl = 0.0
    cb.current_drawdown = 0.0
    strategy.peak_equity = strategy.capital

    logger.critical(
        f"HARD RESET executed. Reason: {reason}. "
        f"Acknowledgment: {acknowledgment}. Drawdown at reset: {drawdown_at_reset:.2%}. "
        f"Timestamp: {datetime.now().isoformat()}"
    )
    return jsonify({
        "status": "success",
        "new_state": "HEALTHY",
        "hard_reset_reason": reason,
        "timestamp": datetime.now().isoformat(),
        "warning": "Monitor strategy closely. A second HALTED trigger will require incident report."
    })


@app.route("/api/v1/trades", methods=["GET"])
def get_trade_log():
    """Return recent trade log entries."""
    if strategy is None:
        return jsonify({"error": "Strategy not initialized"}), 503
    limit = request.args.get("limit", default=20, type=int)
    # Clamp: a limit of 0 would otherwise slice [-0:] and return the whole log
    limit = max(1, min(limit, 500))
    return jsonify({
        "trades": strategy.trade_log[-limit:],
        "total_trades": len(strategy.trade_log)
    })
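The commented-out token check in the reset endpoint compares the header with `!=`, which has two weaknesses: if both the header and the `ADMIN_TOKEN` environment variable are missing, `None != None` is false and the request passes, and a plain string comparison is not constant-time. A hedged alternative using only the standard library is sketched below; the helper name `is_authorized` is an assumption.

```python
import hmac
from typing import Optional


def is_authorized(presented: Optional[str], expected: Optional[str]) -> bool:
    """Constant-time token comparison that fails closed on missing values."""
    if not presented or not expected:
        # Missing header or unset ADMIN_TOKEN must never authorize a request
        return False
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))


print(is_authorized("s3cret", "s3cret"))   # matching token
print(is_authorized("wrong", "s3cret"))    # mismatched token
print(is_authorized(None, None))           # nothing configured: rejected
```

Inside a Flask handler this would be called as `is_authorized(request.headers.get("X-Admin-Token"), os.environ.get("ADMIN_TOKEN"))`, returning a 401 response when it is false.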
6. Deployment Configuration by Scale
The circuit breaker architecture scales from a single Python process to a distributed risk management service. The following table shows recommended deployment configurations:
| User Type | Architecture | Circuit Breaker Scope | TickDB Usage |
|---|---|---|---|
| Individual trader | Single process, Flask API on localhost | Per-strategy instance within the process | depth channel for book health; kline for trend signals |
| Small fund (2–5 traders) | Shared risk service (Flask + Redis) | Shared circuit breaker across strategies; per-strategy override capability | REST /kline for backtesting validation; WebSocket for live depth |
| Institutional team | Microservices architecture; circuit breaker as sidecar or standalone service | Per-strategy, per-desk, and firm-level circuit breakers with escalation hierarchy | TickDB kline for strategy backtesting; depth for pre-trade risk checks |
6.1 Threshold Calibration Guide
Default thresholds are starting points, not recommendations. Calibrate using your strategy's historical performance:
| Threshold | Default | Calibration approach |
|---|---|---|
| consecutive_loss_threshold | 3 | Run backtest; set at the 95th percentile of historical consecutive loss streaks |
| recovery_window_seconds | 900 (15 min) | Should exceed your strategy's average time to close a trade |
| drawdown_threshold_pct | 5% | Calibrate to your risk appetite; institutional desks often use 2–3% |
| max_position_size_reduction | 0.5 | In DEGRADED state, half size is a conservative starting point |
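The first row of the table can be turned into a few lines of code. The sketch below extracts consecutive loss streaks from a list of per-trade P&L values and takes a nearest-rank percentile. The function names and the sample P&L series are illustrative assumptions; a break-even trade is treated as a non-loss, matching the `pnl >= 0` rule in `record_trade`.

```python
import math


def loss_streaks(pnls):
    """Lengths of each run of consecutive losing trades, in order."""
    streaks, run = [], 0
    for pnl in pnls:
        if pnl < 0:
            run += 1
        else:
            if run:
                streaks.append(run)
            run = 0
    if run:
        streaks.append(run)  # series ended while still in a losing run
    return streaks


def percentile_nearest_rank(values, pct):
    """Nearest-rank percentile: smallest value with at least pct% at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


backtest_pnls = [1, -1, -1, 2, -1, 3, -1, -1, -1, 4, -1]  # made-up trade results
streaks = loss_streaks(backtest_pnls)
threshold = percentile_nearest_rank(streaks, 95)
print(f"streaks: {streaks}, suggested consecutive_loss_threshold: {threshold}")
```

With only a handful of streaks the 95th percentile is simply the longest streak observed, so this calibration is only meaningful on a backtest with many trades.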
7. Closing: The Algorithm That Knows When to Stop
The most dangerous algorithmic trading system is not the one that loses money. It is the one that keeps trading after it has lost enough money to matter.
A circuit breaker built on a state machine is not just a risk control mechanism. It is an admission that your strategy cannot know everything — that regime changes are invisible to the model generating the signals, and that the only reliable observer of system health is a layer that sits above the model and watches it operate.
The architecture in this article gives you that layer:
- State machine logic that tracks consecutive losses, intraday drawdown, and recovery windows.
- TickDB integration that brings order book health signals into the circuit breaker's decision logic.
- Manual intervention APIs that respect the asymmetry between a self-correcting system and a system that has encountered a failure beyond its design horizon.
- Production-grade WebSocket code with heartbeats, rate-limit handling, and reconnection with exponential backoff, designed to ride out dropped connections.
The momentum algorithm that accumulated 847 lots of long exposure at 2:47 AM did not fail because it made bad decisions. It failed because no one had given it a mechanism to ask: "Should I still be running?"
Now you have the blueprint to build that mechanism.
Next Steps
If you're an individual quant trader, download the complete circuit breaker implementation from the TickDB GitHub repository. Start with a paper-trading period to calibrate your thresholds against your strategy's actual win/loss distribution.
If you want to validate this approach with historical data before deploying it live:
- Sign up at tickdb.ai (free tier available, no credit card required)
- Use the /v1/market/kline endpoint to pull 2+ years of historical data for your target symbols
- Run a backtest that simulates the state machine transitions against historical price series
- Identify the 95th percentile of consecutive loss streaks and calibrate your thresholds accordingly
If you need institutional-grade deployment with per-desk circuit breakers, shared state via Redis, and audit-log integration, reach out to enterprise@tickdb.ai for consulting and platform options.
If you're using AI coding assistants, search for the tickdb-market-data SKILL in your tool's marketplace to get started with the TickDB API directly from your AI workflow.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. Any trading strategy, including those protected by circuit breakers, can suffer significant losses. Backtest results are simulated and subject to limitations including slippage approximation and limited sample size.