The Moment the Machine Breaks

At 2:47 AM on March 15, 2025, a momentum-following algorithm was still buying Bitcoin at $62,400. The trend signal had triggered. The model said "up." What the model could not see was that the entire futures market was in a liquidity vacuum — the order book had thinned to almost nothing after the sudden rate hike announcement.

By the time a human stepped in to stop it manually (no automated circuit breaker existed), the strategy had accumulated 847 lots of long exposure in a market that dropped 12% in the next 18 minutes.

The loss was not the algorithm's fault. The algorithm did exactly what it was designed to do. The fault was in the architecture: there was no state machine watching over the state machine. No layer asking, "Should we still be running?"

This article builds that layer.


1. The Problem with "Just Add a Stop-Loss"

Traditional stop-loss logic operates at the position level. It asks: "Should this specific trade be closed?" It does not ask: "Should the strategy itself continue operating?"

This distinction matters because catastrophic failures in algorithmic trading rarely come from individual bad trades. They come from correlated sequences — a regime change where every signal in your model fires in the wrong direction simultaneously, and the strategy keeps faithfully executing against a market that has fundamentally shifted.

Consider the failure modes:

| Failure Mode | Description | Why a Simple Stop-Loss Fails |
|---|---|---|
| Consecutive loss spiral | Strategy hits N consecutive losing trades. The market regime has changed, but the strategy does not know it. | A position-level stop does not know how many losses preceded this trade. |
| Intraday drawdown cascade | Multiple positions drift against you throughout the day. No individual stop triggers, but cumulative drawdown becomes extreme. | Stops are per-position, not per-strategy or per-portfolio. |
| Latency blind spot | Network latency or data gaps cause the strategy to trade on stale signals during a fast market. | A stop-loss on position P&L does not capture signal staleness. |
| Correlation breakdown | Strategy assumes diversification across assets, but in tail events correlations approach 1. | There is no strategy-level awareness of cross-position correlation risk. |
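To make the "intraday drawdown cascade" row concrete, here is a small worked example with invented numbers. Five open positions each lose less than a hypothetical 2%-of-equity per-position stop, yet together they exceed the 5% strategy-level hard stop used later in this article.

```python
# Invented numbers: $100,000 account, per-position stop at 2% of equity,
# strategy-level hard stop at 5% (the default drawdown threshold used later).
equity = 100_000.0
per_position_stop_pct = 0.02
strategy_hard_stop_pct = 0.05

# Unrealized loss on each open position, in dollars
position_losses = [1_200.0, 1_500.0, 900.0, 1_100.0, 1_400.0]

# No single position breaches its own stop...
stops_triggered = [loss / equity >= per_position_stop_pct for loss in position_losses]

# ...but the cumulative loss breaches the strategy-level limit
total_drawdown_pct = sum(position_losses) / equity

print(f"Any position stop triggered: {any(stops_triggered)}")
print(f"Cumulative drawdown: {total_drawdown_pct:.1%}")
print(f"Strategy hard stop breached: {total_drawdown_pct >= strategy_hard_stop_pct}")
```

Only a check at the strategy level sees the 6.1% aggregate. Every per-position check sees at most 1.5%.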

The solution is a supervisory state machine — a layer above your trading logic that monitors strategy-level health metrics and enforces circuit breaker states independently of any individual trade's outcome.


2. Designing the Circuit Breaker State Machine

2.1 State Diagram

The circuit breaker operates across four discrete states. Transitions are triggered by observable conditions (trade outcomes, drawdown, time spent in DEGRADED without recovering, and operator actions), not by a fixed schedule.

                    consecutive_losses >= threshold
    ┌───────────┐ ────────────────────────────────────▶ ┌─────────────┐
    │  HEALTHY  │                                       │  DEGRADED   │
    └───────────┘ ◀──────────────────────────────────── └─────────────┘
       ▲     ▲        winning trade or manual reset            │
       │     │                                                 │ recovery window elapsed
       │     │                                                 │ without a winning trade
       │     │                                                 ▼
       │     │              manual reset                ┌─────────────┐
       │     └───────────────────────────────────────── │  SUSPENDED  │
       │                                                └─────────────┘
       │ hard reset (human review required)
       │
    ┌───────────┐
    │  HALTED   │ ◀── current_drawdown >= drawdown_threshold_pct (checked in every state)
    └───────────┘
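The same transitions can also be written down as data, which keeps the diagram, the state table and the code from drifting apart. The sketch below is a standalone, hypothetical encoding of the transitions that the CircuitBreaker class implements. The event strings are illustrative names, not identifiers used elsewhere in this article. Any (state, event) pair missing from the table leaves the state unchanged.

```python
from enum import Enum, auto


class CircuitState(Enum):
    HEALTHY = auto()
    DEGRADED = auto()
    SUSPENDED = auto()
    HALTED = auto()


# Hypothetical event names, one per arrow in the state diagram
TRANSITIONS = {
    (CircuitState.HEALTHY, "loss_threshold_met"): CircuitState.DEGRADED,
    (CircuitState.DEGRADED, "winning_trade"): CircuitState.HEALTHY,
    (CircuitState.DEGRADED, "recovery_window_elapsed"): CircuitState.SUSPENDED,
    (CircuitState.DEGRADED, "manual_reset"): CircuitState.HEALTHY,
    (CircuitState.SUSPENDED, "manual_reset"): CircuitState.HEALTHY,
    (CircuitState.HALTED, "hard_reset"): CircuitState.HEALTHY,
}


def next_state(state: CircuitState, event: str) -> CircuitState:
    """Return the state after `event`; unknown (state, event) pairs are no-ops."""
    # The drawdown hard stop applies from every state, so it bypasses the table
    if event == "drawdown_threshold_met":
        return CircuitState.HALTED
    return TRANSITIONS.get((state, event), state)
```

Because the table is plain data, you can assert properties directly. For example, you can check that nothing except a hard reset leaves HALTED.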

2.2 State Definitions

| State | Definition | What the Strategy Can Do |
|---|---|---|
| HEALTHY | All metrics within normal parameters. | Normal operation. |
| DEGRADED | Consecutive losses have reached the configured threshold, but drawdown is still within limits. | Reduce position size by 50%. Increase confirmation requirements. |
| SUSPENDED | Strategy has been in DEGRADED state beyond the recovery window without recovering, or manual intervention was triggered. | No new trades. Existing positions managed by the risk manager only. |
| HALTED | Intraday drawdown has exceeded the hard stop threshold. | Complete shutdown. No position management. Manual reset required. |
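The "What the Strategy Can Do" column can also be expressed as a lookup table that execution code consults before acting. This is a sketch. StatePolicy and its fields are hypothetical names, and the 0.5 multiplier mirrors the 50% reduction described for DEGRADED.

```python
from dataclasses import dataclass
from enum import Enum, auto


class CircuitState(Enum):
    HEALTHY = auto()
    DEGRADED = auto()
    SUSPENDED = auto()
    HALTED = auto()


@dataclass(frozen=True)
class StatePolicy:
    """Hypothetical per-state permissions derived from the table above."""
    can_open_positions: bool
    size_multiplier: float        # Applied to the base position size
    can_manage_positions: bool    # Whether the strategy itself may exit/adjust


POLICIES = {
    CircuitState.HEALTHY: StatePolicy(True, 1.0, True),
    CircuitState.DEGRADED: StatePolicy(True, 0.5, True),
    # SUSPENDED: existing positions are handled by the risk manager, not the strategy
    CircuitState.SUSPENDED: StatePolicy(False, 0.0, False),
    CircuitState.HALTED: StatePolicy(False, 0.0, False),
}


def allowed_entry_size(state: CircuitState, base_size: float) -> float:
    """Size a new entry may use in `state` (0.0 when entries are blocked)."""
    policy = POLICIES[state]
    return base_size * policy.size_multiplier if policy.can_open_positions else 0.0
```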

2.3 Transition Trigger Logic

from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
import time


class CircuitState(Enum):
    HEALTHY = auto()
    DEGRADED = auto()
    SUSPENDED = auto()
    HALTED = auto()


@dataclass
class CircuitBreakerConfig:
    """Configuration for the circuit breaker state machine."""
    consecutive_loss_threshold: int = 3      # Trigger DEGRADED after N consecutive losses
    recovery_window_seconds: int = 900       # 15-minute window to recover from DEGRADED
    drawdown_threshold_pct: float = 0.05     # Hard stop at 5% intraday drawdown
    max_position_size_reduction: float = 0.5 # Reduce to 50% in DEGRADED state


@dataclass
class CircuitBreaker:
    """Supervisory state machine for trading strategy risk control."""
    config: CircuitBreakerConfig
    state: CircuitState = field(default=CircuitState.HEALTHY)
    
    # Internal counters
    consecutive_losses: int = 0
    consecutive_wins: int = 0
    intraday_pnl: float = 0.0
    current_equity: float = 0.0
    peak_equity: float = 0.0
    current_drawdown: float = 0.0
    
    # Timestamps (Unix seconds). Pass trade/bar times consistently, or omit
    # them everywhere to use wall-clock time.
    degraded_since: Optional[float] = None
    last_trade_time: Optional[float] = None
    
    def record_trade(self, pnl: float, trade_time: Optional[float] = None) -> CircuitState:
        """
        Record a trade outcome and evaluate state transitions.
        Returns the new state after evaluation.
        
        Args:
            pnl: Profit (+) or loss (-) from the completed trade
            trade_time: Unix timestamp of trade execution. Defaults to current time.
        """
        now = trade_time if trade_time is not None else time.time()
        self.last_trade_time = now
        
        if pnl >= 0:
            self.consecutive_losses = 0
            self.consecutive_wins += 1
        else:
            self.consecutive_losses += 1
            self.consecutive_wins = 0
        
        self.intraday_pnl += pnl
        # Roll realized P&L into the equity estimate between mark-to-market updates.
        # Until record_equity_update() seeds current_equity, drawdown stays at 0.
        if self.current_equity > 0:
            self._update_drawdown(self.current_equity + pnl)
        
        return self._evaluate_transitions(now)
    
    def record_equity_update(self, current_equity: float, now: Optional[float] = None) -> CircuitState:
        """
        Update peak equity and drawdown from a mark-to-market equity value.
        Call this at each bar close; it also drives the DEGRADED recovery-window check.
        """
        self._update_drawdown(current_equity)
        return self._evaluate_transitions(now if now is not None else time.time())
    
    def _update_drawdown(self, equity: float):
        """Recalculate drawdown as the fractional decline of equity from its peak."""
        self.current_equity = equity
        self.peak_equity = max(self.peak_equity, equity)
        if self.peak_equity > 0:
            self.current_drawdown = (self.peak_equity - equity) / self.peak_equity
    
    def _evaluate_transitions(self, now: float) -> CircuitState:
        """
        Evaluate all transition conditions and update state accordingly.
        Conditions are checked in strict priority order.
        """
        # Priority 1: the drawdown hard stop overrides every other state
        if self.current_drawdown >= self.config.drawdown_threshold_pct:
            self.state = CircuitState.HALTED
            return self.state
        
        # Priority 2: HEALTHY -> DEGRADED after N consecutive losses
        if (self.state == CircuitState.HEALTHY and
                self.consecutive_losses >= self.config.consecutive_loss_threshold):
            self.state = CircuitState.DEGRADED
            self.degraded_since = now
            return self.state
        
        # Priority 3: DEGRADED -> HEALTHY on a winning trade,
        # or DEGRADED -> SUSPENDED once the recovery window expires
        if self.state == CircuitState.DEGRADED:
            if self.consecutive_wins > 0:
                self.state = CircuitState.HEALTHY
                self.degraded_since = None
            elif (self.degraded_since is not None and
                    now - self.degraded_since >= self.config.recovery_window_seconds):
                self.state = CircuitState.SUSPENDED
        
        return self.state
    
    def request_manual_reset(self) -> bool:
        """
        Request a manual reset from DEGRADED or SUSPENDED back to HEALTHY.
        Always returns False in HALTED: the hard stop requires an explicit,
        human-reviewed override rather than this routine reset path.
        """
        if self.state == CircuitState.HALTED:
            # Hard stop requires explicit human acknowledgment
            return False
        
        if self.state in (CircuitState.DEGRADED, CircuitState.SUSPENDED):
            self._reset_counters()
            self.state = CircuitState.HEALTHY
            return True
        
        return False
    
    def _reset_counters(self):
        """Reset all metrics after manual intervention."""
        self.consecutive_losses = 0
        self.consecutive_wins = 0
        self.degraded_since = None
        # Note: intraday_pnl, peak_equity, and current_drawdown are NOT reset
        # on manual intervention — these represent actual portfolio state
    
    def can_trade(self) -> tuple[bool, str]:
        """
        Query whether new trades should be permitted in the current state.
        Returns (can_trade: bool, reason: str).
        """
        if self.state == CircuitState.HALTED:
            return False, f"Circuit breaker HALTED: drawdown {self.current_drawdown:.2%} exceeds threshold {self.config.drawdown_threshold_pct:.2%}"
        
        if self.state == CircuitState.SUSPENDED:
            return False, "Circuit breaker SUSPENDED: strategy requires manual reset after extended degradation"
        
        if self.state == CircuitState.DEGRADED:
            return True, f"Circuit breaker DEGRADED: position size reduced to {self.config.max_position_size_reduction:.0%}"
        
        return True, "Circuit breaker HEALTHY: normal operation"
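The breaker treats intraday_pnl, peak_equity and current_drawdown as intraday metrics. However, nothing above decides when a trading day ends, so these values accumulate indefinitely. A minimal sketch of day-rollover detection follows. It assumes sessions roll at local midnight in a time zone you choose. Crypto trades around the clock, so the boundary is a policy decision. is_new_session and UTC_MINUS_5 are hypothetical names. On rollover, a caller could zero intraday_pnl and set peak_equity to the current equity before the next evaluation.

```python
from datetime import datetime, timedelta, timezone, tzinfo

# Example session zone: a fixed UTC-5 offset (ignores daylight saving time)
UTC_MINUS_5 = timezone(timedelta(hours=-5))


def is_new_session(prev_ts: float, now_ts: float, tz: tzinfo = timezone.utc) -> bool:
    """
    Return True when `now_ts` falls on a later calendar day than `prev_ts`
    in time zone `tz`. Both timestamps are Unix seconds.
    """
    prev_day = datetime.fromtimestamp(prev_ts, tz).date()
    now_day = datetime.fromtimestamp(now_ts, tz).date()
    return now_day > prev_day
```

For example, 1742083140 and 1742083260 straddle midnight UTC on 2025-03-16, so they count as separate sessions in UTC. In UTC_MINUS_5, both timestamps fall on the evening of March 15 and therefore in the same session.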

3. Production-Grade Integration with TickDB Depth Data

The state machine above is the logical layer. The data layer is where it connects to real-time market conditions. This section shows how to wire the circuit breaker to TickDB's WebSocket depth channel, so the strategy can make position sizing decisions based on live order book health — not just historical trade P&L.

3.1 Why Order Book Depth Matters for Circuit Breaker Design

A circuit breaker that only reacts to P&L is inherently lagging. By the time you record three consecutive losses, the market may already be in a liquidity vacuum where every trade you take will be at terrible fills. A forward-looking circuit breaker incorporates order book health signals:

  • Bid-ask spread widening → Crossing the spread costs more; reduce trading frequency.
  • Depth shrinking at the best bid/ask → Liquidity is being withdrawn; reduce position size.
  • Order book imbalance escalating → One-sided pressure; pause new entries.
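compute_book_health in Section 3.3 judges each snapshot in isolation. By contrast, "depth shrinking" and "imbalance escalating" describe change over time. The sketch below shows one way to track both. It assumes a rolling average as the depth baseline and a persistence count for imbalance. The class names, window sizes and thresholds are illustrative and would need calibration for each market.

```python
from collections import deque


class DepthShrinkDetector:
    """
    Hypothetical helper for the 'depth shrinking' signal: flags liquidity
    withdrawal when total depth falls below a fraction of its rolling average.
    """

    def __init__(self, window: int = 100, shrink_fraction: float = 0.3):
        self.history = deque(maxlen=window)   # Recent total-depth observations
        self.shrink_fraction = shrink_fraction

    def update(self, total_depth: float) -> bool:
        """Record one snapshot's depth; return True if depth has collapsed."""
        # Compare against the baseline *before* adding the new observation
        baseline = sum(self.history) / len(self.history) if self.history else 0.0
        self.history.append(total_depth)
        if baseline <= 0:
            return False  # No usable history yet
        return total_depth < self.shrink_fraction * baseline


class ImbalanceGate:
    """
    Hypothetical helper for the 'imbalance escalating' signal: pauses entries
    while the book stays one-sided for `persistence` consecutive snapshots.
    """

    def __init__(self, extreme: float = 0.8, persistence: int = 5):
        # imbalance_ratio = bid_depth / (bid_depth + ask_depth):
        # >= extreme is bid-heavy, <= 1 - extreme is ask-heavy
        self.extreme = extreme
        self.persistence = persistence
        self._streak = 0

    def update(self, imbalance_ratio: float) -> bool:
        """Feed one snapshot's imbalance ratio; return True if entries should pause."""
        # Either side counts as one-sided, so a flip from bid- to ask-heavy
        # does not reset the streak
        one_sided = (imbalance_ratio >= self.extreme
                     or imbalance_ratio <= 1 - self.extreme)
        self._streak = self._streak + 1 if one_sided else 0
        return self._streak >= self.persistence
```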

3.2 WebSocket Connection with Heartbeat, Reconnection, and Rate-Limit Handling

import os
import json
import time
import random
import threading
import logging
from typing import Callable, Optional
from dataclasses import dataclass

import websocket  # pip install websocket-client
import requests

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)


@dataclass
class TickDBDepthConfig:
    """Configuration for TickDB WebSocket depth subscription."""
    api_key: str
    symbols: list[str]  # e.g., ["BTC-USD"]
    reconnect_max_retries: int = 10
    reconnect_base_delay: float = 1.0
    reconnect_max_delay: float = 60.0
    heartbeat_interval: float = 20.0  # TickDB recommends ping every 20 seconds
    timeout_seconds: float = 30.0


class TickDBWebSocketClient:
    """
    WebSocket client for TickDB market data.
    Features: application-level heartbeat with a liveness timeout, exponential
    backoff with jitter, rate-limit handling, and a single supervisor thread
    that owns reconnection. Supply the API key through TickDBDepthConfig,
    ideally loaded from an environment variable rather than hard-coded.
    """
    
    BASE_WS_URL = "wss://api.tickdb.ai/ws/market"
    
    def __init__(self, config: TickDBDepthConfig, on_depth: Callable[[dict], None]):
        """
        Args:
            config: Connection and subscription configuration
            on_depth: Callback function invoked with depth snapshot on each update
        """
        self.config = config
        self.on_depth = on_depth
        self.ws: Optional[websocket.WebSocketApp] = None
        self._stopped = threading.Event()  # Set by stop(); suppresses reconnection
        self._retry_count = 0
        self._last_message_received: Optional[float] = None
    
    def connect(self):
        """
        Start the connection supervisor in a daemon thread (non-blocking).
        Authentication is via URL parameter, not header, for WebSocket connections.
        
        URL format: wss://api.tickdb.ai/ws/market?api_key=YOUR_KEY
        """
        self._stopped.clear()
        threading.Thread(target=self._run_with_reconnect, daemon=True).start()
    
    def _run_with_reconnect(self):
        """Connect, block until the socket closes, then back off and retry."""
        symbols_param = ",".join(self.config.symbols)
        url = f"{self.BASE_WS_URL}?api_key={self.config.api_key}&symbols={symbols_param}&depth=10"
        
        while not self._stopped.is_set():
            self.ws = websocket.WebSocketApp(
                url,
                on_open=self._handle_open,
                on_message=self._handle_message,
                on_error=self._handle_error,
                on_close=self._handle_close
            )
            logger.info(f"Connecting to TickDB WebSocket for symbols: {self.config.symbols}")
            self.ws.run_forever()  # Returns when the connection closes or fails
            
            if self._stopped.is_set():
                break  # stop() was called; do not reconnect
            if self._retry_count >= self.config.reconnect_max_retries:
                logger.error("Max reconnection retries reached. Giving up.")
                break
            
            # Exponential backoff capped at reconnect_max_delay, plus up to 10%
            # random jitter so many clients do not retry in lockstep
            delay = min(
                self.config.reconnect_base_delay * (2 ** self._retry_count),
                self.config.reconnect_max_delay
            )
            sleep_time = delay + random.uniform(0, delay * 0.1)
            self._retry_count += 1
            logger.info(f"Scheduling reconnect #{self._retry_count} in {sleep_time:.2f}s")
            self._stopped.wait(sleep_time)  # Wakes early if stop() is called
    
    def _handle_open(self, ws):
        logger.info("WebSocket connection opened. Starting heartbeat loop.")
        self._retry_count = 0  # A successful connection resets the backoff
        self._last_message_received = time.time()
        threading.Thread(target=self._heartbeat_loop, args=(ws,), daemon=True).start()
    
    def _heartbeat_loop(self, ws):
        """Ping at the configured interval; close the socket if the server goes silent."""
        while not self._stopped.is_set() and ws.sock and ws.sock.connected:
            silence = time.time() - self._last_message_received
            if silence > self.config.timeout_seconds:
                logger.warning(f"No message for {silence:.0f}s. Closing to force a reconnect.")
                ws.close()  # run_forever() returns and the supervisor reconnects
                break
            try:
                ws.send(json.dumps({"cmd": "ping"}))
                logger.debug("Heartbeat ping sent")
            except Exception as e:
                logger.warning(f"Heartbeat loop interrupted: {e}")
                break
            self._stopped.wait(self.config.heartbeat_interval)
    
    def _handle_message(self, ws, message: str):
        """Parse incoming messages. Depth updates invoke the on_depth callback."""
        # Any inbound message, pong or data, proves the connection is alive
        self._last_message_received = time.time()
        try:
            data = json.loads(message)
            
            # Handle pong response
            if data.get("cmd") == "pong":
                return
            
            # Handle error codes
            code = data.get("code")
            if code == 3001:
                retry_after = int(data.get("headers", {}).get("Retry-After", 5))
                logger.warning(f"Rate limited. Retrying after {retry_after} seconds.")
                # Note: sleeping here pauses this connection's receive thread
                time.sleep(retry_after)
                return
            
            if code and code not in (0, 200):
                logger.error(f"TickDB error code {code}: {data.get('message')}")
                return
            
            # Invoke the depth callback. Downstream code reads the 'bids' and 'asks'
            # arrays from this payload; confirm the schema against TickDB's docs.
            if "data" in data and "depth" in data["data"]:
                self.on_depth(data["data"])
                
        except json.JSONDecodeError as e:
            logger.warning(f"Non-JSON message received: {e}")
        except Exception as e:
            logger.error(f"Error processing depth data: {e}")
    
    def _handle_error(self, ws, error):
        # run_forever() returns after an error; _run_with_reconnect decides whether to retry
        logger.error(f"WebSocket error: {error}")
    
    def _handle_close(self, ws, close_status_code, close_msg):
        logger.warning(f"WebSocket closed. Status: {close_status_code}, Message: {close_msg}")
    
    def stop(self):
        """Gracefully shut down the WebSocket connection and disable reconnection."""
        logger.info("Stopping TickDB WebSocket client.")
        self._stopped.set()
        if self.ws:
            self.ws.close()
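The reconnect delay is easiest to test when it is pulled out of the client into a pure function with an injectable random source. The following sketch makes that assumption; backoff_delay is a hypothetical name. It reproduces the capped exponential delay plus up-to-10% jitter that the client uses.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    jitter_fraction: float = 0.1,
    rng: Optional[random.Random] = None,
) -> float:
    """
    Capped exponential backoff with additive jitter.
    attempt 0 -> ~base, attempt 1 -> ~2 * base, ...; never above cap * (1 + jitter_fraction).
    """
    if rng is None:
        rng = random.Random()
    delay = min(base * (2 ** attempt), cap)
    return delay + rng.uniform(0, delay * jitter_fraction)


if __name__ == "__main__":
    rng = random.Random(42)  # Seeded so the schedule is reproducible
    for attempt in range(8):
        print(f"attempt {attempt}: {backoff_delay(attempt, rng=rng):.2f}s")
```

With the defaults, the un-jittered delay doubles through 1, 2, 4, 8, 16 and 32 seconds, then caps at 60 seconds from attempt 6 onward.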

3.3 Order Book Health Metrics from Depth Data

@dataclass
class BookHealthMetrics:
    """Computed metrics from order book depth data."""
    spread_bps: float                    # Bid-ask spread in basis points
    bid_depth: float                    # Total bid volume (top 10 levels)
    ask_depth: float                    # Total ask volume (top 10 levels)
    imbalance_ratio: float             # bid_depth / (bid_depth + ask_depth)
    depth_ratio: float                  # bid_depth / ask_depth (if ask_depth > 0)
    liquidity_flag: str                 # "normal" | "thin" | "vacuum"


def compute_book_health(depth_data: dict, mid_price: float) -> BookHealthMetrics:
    """
    Compute order book health metrics from TickDB depth snapshot.
    
    Args:
        depth_data: Parsed depth data from TickDB WebSocket (has 'bids' and 'asks' arrays)
        mid_price: Current mid price for spread normalization
    
    Returns:
        BookHealthMetrics with computed values
    """
    bids = depth_data.get("bids", [])
    asks = depth_data.get("asks", [])
    
    # bids and asks are arrays of [price, size] pairs from top N levels
    best_bid = float(bids[0][0]) if bids else 0.0
    best_ask = float(asks[0][0]) if asks else 0.0
    
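    # Spread in basis points. A missing side means there is no executable spread,
    # so report it as infinite rather than 0 (which would look like a perfect book).
    if mid_price > 0 and best_bid > 0 and best_ask > 0:
        spread_bps = ((best_ask - best_bid) / mid_price) * 10000
    else:
        spread_bps = float("inf")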
    
    # Aggregate depth
    bid_depth = sum(float(b[1]) for b in bids)
    ask_depth = sum(float(a[1]) for a in asks)
    
    # Imbalance ratio (0 = all ask, 1 = all bid)
    total_depth = bid_depth + ask_depth
    imbalance_ratio = bid_depth / total_depth if total_depth > 0 else 0.5
    
    # Depth ratio
    depth_ratio = bid_depth / ask_depth if ask_depth > 0 else float('inf')
    
    # Liquidity classification
    # These thresholds are market-specific; calibrate based on historical data
    if total_depth == 0:
        liquidity_flag = "vacuum"
    elif spread_bps > 50 or depth_ratio > 5 or depth_ratio < 0.2:
        liquidity_flag = "thin"
    else:
        liquidity_flag = "normal"
    
    return BookHealthMetrics(
        spread_bps=spread_bps,
        bid_depth=bid_depth,
        ask_depth=ask_depth,
        imbalance_ratio=imbalance_ratio,
        depth_ratio=depth_ratio,
        liquidity_flag=liquidity_flag
    )


def adjust_position_size_for_book_health(
    base_size: float,
    book_metrics: BookHealthMetrics,
    circuit_state: CircuitState,
    degraded_multiplier: float = 0.5
) -> float:
    """
    Adjust trade position size based on order book health and circuit breaker state.
    
    Logic:
    - If circuit is HALTED or SUSPENDED: return 0 (no trades)
    - If book is "vacuum": no new entries regardless of other factors
    - If circuit is DEGRADED: multiply by degraded_multiplier (pass
      CircuitBreakerConfig.max_position_size_reduction to keep one source of truth)
    - If book is "thin": reduce size by 50%
    - Both reductions compound multiplicatively (DEGRADED + thin = 25% of base)
    """
    if circuit_state in (CircuitState.HALTED, CircuitState.SUSPENDED):
        return 0.0
    
    if book_metrics.liquidity_flag == "vacuum":
        logger.warning("Order book in vacuum state. Rejecting new entries.")
        return 0.0
    
    adjusted_size = base_size
    
    # Apply circuit breaker reduction if in DEGRADED state
    if circuit_state == CircuitState.DEGRADED:
        adjusted_size *= degraded_multiplier
    
    # Apply book health reduction
    if book_metrics.liquidity_flag == "thin":
        adjusted_size *= 0.5
    
    return adjusted_size
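A worked example with an invented two-level BTC book makes the thresholds concrete. The classify helper is hypothetical. It repeats the formulas from compute_book_health so the snippet runs on its own. The second snapshot removes half the ask liquidity while leaving the spread unchanged.

```python
def classify(bids, asks):
    """Repeat compute_book_health's spread, imbalance, and 'thin' arithmetic."""
    best_bid, best_ask = bids[0][0], asks[0][0]
    mid_price = (best_bid + best_ask) / 2
    spread_bps = (best_ask - best_bid) / mid_price * 10_000
    bid_depth = sum(size for _, size in bids)
    ask_depth = sum(size for _, size in asks)
    depth_ratio = bid_depth / ask_depth
    thin = spread_bps > 50 or depth_ratio > 5 or depth_ratio < 0.2
    return spread_bps, bid_depth / (bid_depth + ask_depth), depth_ratio, thin


# Invented snapshots: [price, size] pairs, best level first
bids = [[62400.0, 1.5], [62390.0, 2.0]]            # bid depth 3.5
normal_asks = [[62410.0, 0.5], [62420.0, 0.5]]     # ask depth 1.0
thinned_asks = [[62410.0, 0.25], [62420.0, 0.25]]  # ask depth 0.5

for label, asks in (("normal", normal_asks), ("thinned", thinned_asks)):
    spread, imbalance, ratio, thin = classify(bids, asks)
    # In DEGRADED, size is halved; a thin book halves it again
    degraded_size = 1.0 * 0.5 * (0.5 if thin else 1.0)
    print(f"{label}: spread={spread:.2f} bps imbalance={imbalance:.2f} "
          f"ratio={ratio:.1f} thin={thin} degraded_size={degraded_size}")
```

The spread stays near 1.6 bps in both cases, so a spread-only check would miss the second book. The depth ratio, which moves from 3.5 to 7, is what flags the second book as thin.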

4. Tying It Together: The Supervisory Loop

The following class wires the circuit breaker state machine to the WebSocket client and provides a clean trading integration interface:

from datetime import datetime


class SupervisedTradingStrategy:
    """
    A trading strategy wrapped by the circuit breaker state machine.
    This is the integration point between market data, strategy logic,
    and risk control.
    """
    
    def __init__(
        self,
        initial_capital: float,
        circuit_config: CircuitBreakerConfig,
        tickdb_config: TickDBDepthConfig,
        symbols: list[str],
        base_position_size: float = 1.0
    ):
        self.capital = initial_capital
        self.peak_equity = initial_capital
        self.base_position_size = base_position_size
        
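        # Initialize circuit breaker and seed it with starting equity so the
        # first drawdown calculation has a valid peak
        self.circuit_breaker = CircuitBreaker(config=circuit_config)
        self.circuit_breaker.record_equity_update(initial_capital)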
        
        # Initialize TickDB client
        self.ws_client = TickDBWebSocketClient(
            config=tickdb_config,
            on_depth=self._on_depth_update
        )
        
        # Current book health (updated on each depth update)
        self.current_book_health: Optional[BookHealthMetrics] = None
        
        # Trade log for audit
        self.trade_log: list[dict] = []
        self._start_of_day = datetime.now().date()
    
    def _on_depth_update(self, depth_data: dict):
        """Callback invoked on each TickDB depth snapshot."""
        # Compute mid price from depth data for metric calculation
        bids = depth_data.get("bids", [])
        asks = depth_data.get("asks", [])
        if bids and asks:
            mid_price = (float(bids[0][0]) + float(asks[0][0])) / 2
            self.current_book_health = compute_book_health(depth_data, mid_price)
    
    def record_trade(self, symbol: str, pnl: float, size: float, price: float):
        """
        Record a completed trade and evaluate circuit breaker.
        Call this after each trade closes.
        """
        self.capital += pnl
        self.peak_equity = max(self.peak_equity, self.capital)
        
        self.circuit_breaker.record_trade(pnl)
        # The equity update can itself trigger HALTED, so read the state after both calls
        new_state = self.circuit_breaker.record_equity_update(self.capital)
        
        self.trade_log.append({
            "timestamp": datetime.now().isoformat(),
            "symbol": symbol,
            "pnl": pnl,
            "size": size,
            "price": price,
            "capital_after": self.capital,
            "circuit_state": new_state.name,
            "drawdown": self.circuit_breaker.current_drawdown
        })
        
        logger.info(
            f"Trade recorded: {symbol} | PnL: {pnl:+.2f} | "
            f"Capital: {self.capital:.2f} | Drawdown: {self.circuit_breaker.current_drawdown:.2%} | "
            f"Circuit: {new_state.name}"
        )
        
        # Check if manual intervention is required
        can_trade, reason = self.circuit_breaker.can_trade()
        if not can_trade:
            logger.warning(f"TRADING PAUSED: {reason}")
            self._trigger_alert(new_state, reason)
    
    def get_approved_position_size(self, symbol: str) -> tuple[float, str]:
        """
        Query the maximum safe position size before placing a new trade.
        
        Returns:
            (approved_size, reason): The size to trade and the controlling reason
        """
        # Check circuit breaker
        circuit_ok, circuit_reason = self.circuit_breaker.can_trade()
        if not circuit_ok:
            return 0.0, circuit_reason
        
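        # Check order book health
        if self.current_book_health is None:
            # No depth data yet: still honor the DEGRADED size reduction
            size = self.base_position_size
            if self.circuit_breaker.state == CircuitState.DEGRADED:
                size *= self.circuit_breaker.config.max_position_size_reduction
            return size, f"No depth data; circuit {self.circuit_breaker.state.name}"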
        
        size = adjust_position_size_for_book_health(
            base_size=self.base_position_size,
            book_metrics=self.current_book_health,
            circuit_state=self.circuit_breaker.state
        )
        
        reason = (
            f"Circuit: {self.circuit_breaker.state.name} | "
            f"Book: {self.current_book_health.liquidity_flag} | "
            f"Spread: {self.current_book_health.spread_bps:.1f} bps | "
            f"Imbalance: {self.current_book_health.imbalance_ratio:.2f}"
        )
        
        return size, reason
    
    def _trigger_alert(self, state: CircuitState, reason: str):
        """
        Send alert to human supervisor. Integrate with your alerting system here.
        
        Integration options:
        - Slack: POST to incoming webhook
        - PagerDuty: Events API v2
        - Email: SMTP or SendGrid
        - Custom webhook: your-ops-endpoint.com/alerts
        """
        alert_payload = {
            "alert_type": "CIRCUIT_BREAKER_TRIGGERED",
            "state": state.name,
            "reason": reason,
            "capital": self.capital,
            "peak_equity": self.peak_equity,
            "drawdown": self.circuit_breaker.current_drawdown,
            "consecutive_losses": self.circuit_breaker.consecutive_losses,
            "trade_log_count": len(self.trade_log),
            "timestamp": datetime.now().isoformat()
        }
        
        logger.critical(f"CIRCUIT BREAKER ALERT: {json.dumps(alert_payload)}")
        # TODO: Replace with actual alerting integration
        # requests.post(os.environ["ALERT_WEBHOOK_URL"], json=alert_payload, timeout=5)
    
    def request_manual_intervention(self) -> bool:
        """
        Allow human operator to request a circuit reset.
        Only available when in DEGRADED or SUSPENDED states.
        HALTED state requires explicit human acknowledgment (not via this method).
        """
        success = self.circuit_breaker.request_manual_reset()
        if success:
            logger.info("Manual intervention accepted. Circuit reset to HEALTHY.")
        else:
            logger.warning(
                f"Manual intervention rejected in state {self.circuit_breaker.state.name}. "
                "HALTED requires an explicit hard reset; HEALTHY has nothing to reset."
            )
        return success
    
    def start(self):
        """Start the WebSocket connection and begin receiving depth data."""
        self.ws_client.connect()
        logger.info("Supervised trading strategy started.")
    
    def stop(self):
        """Stop the WebSocket connection."""
        self.ws_client.stop()
        logger.info("Supervised trading strategy stopped.")
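The supervisory loop still trusts current_book_health regardless of its age. This leaves the "latency blind spot" from Section 1 open: if the feed stalls, the last snapshot keeps approving trades. Below is a minimal sketch of a staleness guard. It assumes the depth callback stamps each update using a monotonic clock. The class name and the 2-second limit are illustrative.

```python
import time
from typing import Callable, Optional


class StalenessGuard:
    """Hypothetical guard that rejects decisions based on old market data."""

    def __init__(self, max_age_seconds: float = 2.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_age_seconds = max_age_seconds
        self._clock = clock  # Injectable so tests can control time
        self._last_update: Optional[float] = None

    def mark_update(self) -> None:
        """Call from the depth callback each time a snapshot arrives."""
        self._last_update = self._clock()

    def is_fresh(self) -> bool:
        """False if no data has arrived yet or the latest snapshot is too old."""
        if self._last_update is None:
            return False
        return self._clock() - self._last_update <= self.max_age_seconds
```

One option is to call mark_update() inside _on_depth_update. get_approved_position_size would then return a size of 0.0 whenever is_fresh() is False, rather than trusting an old snapshot.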

5. Manual Intervention Interface

5.1 Why Manual Overrides Cannot Be Fully Automated

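The state machine above has one deliberate asymmetry: the HALTED state cannot be cleared through the normal reset path (request_manual_reset returns False). To leave HALTED, an operator must issue an explicit override after human review. This is by design.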

A hard stop (drawdown threshold exceeded) means something has gone wrong at a level the algorithm cannot self-diagnose. It could be:

  • A data feed error causing phantom losses
  • A regulatory event the strategy was not designed to handle
  • A bug in the strategy logic itself

In each case, automated self-recovery risks compounding the damage. The HALTED state requires a human to review the trade log, identify the root cause, and make an informed decision about whether to resume.

5.2 REST API Endpoints for Human Control

from flask import Flask, jsonify, request
# pip install flask

app = Flask(__name__)

# The strategy instance — injected at application startup
strategy: Optional[SupervisedTradingStrategy] = None


@app.route("/api/v1/circuit/status", methods=["GET"])
def get_circuit_status():
    """
    Returns the current state of the circuit breaker.
    No authentication required for read-only status endpoint (assuming internal network).
    For production, add token-based auth.
    """
    if strategy is None:
        return jsonify({"error": "Strategy not initialized"}), 500
    
    cb = strategy.circuit_breaker
    can_trade, reason = cb.can_trade()
    
    return jsonify({
        "state": cb.state.name,
        "can_trade": can_trade,
        "reason": reason,
        "metrics": {
            "consecutive_losses": cb.consecutive_losses,
            "consecutive_wins": cb.consecutive_wins,
            "intraday_pnl": cb.intraday_pnl,
            "peak_equity": cb.peak_equity,
            "current_drawdown": cb.current_drawdown,
            "drawdown_threshold": cb.config.drawdown_threshold_pct,
            "degraded_since": cb.degraded_since
        },
        "trade_log_summary": {
            "total_trades": len(strategy.trade_log),
            "last_trade_time": strategy.trade_log[-1]["timestamp"] if strategy.trade_log else None
        }
    })


@app.route("/api/v1/circuit/reset", methods=["POST"])
def request_circuit_reset():
    """
    Request a manual reset of the circuit breaker.
    Only available for DEGRADED and SUSPENDED states.
    HALTED requires the explicit /api/v1/circuit/hard-reset endpoint.
    """
    if strategy is None:
        return jsonify({"error": "Strategy not initialized"}), 500
    
    # In production, authenticate this endpoint
    # auth_token = request.headers.get("X-Admin-Token")
    # if auth_token != os.environ.get("ADMIN_TOKEN"):
    #     return jsonify({"error": "Unauthorized"}), 401
    
    success = strategy.request_manual_intervention()
    
    if success:
        return jsonify({
            "status": "success",
            "new_state": "HEALTHY",
            "message": "Circuit breaker reset to HEALTHY. Monitor closely."
        })
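    else:
        current_state = strategy.circuit_breaker.state
        if current_state == CircuitState.HALTED:
            return jsonify({
                "status": "rejected",
                "current_state": current_state.name,
                "message": "Circuit is HALTED. Use /api/v1/circuit/hard-reset for explicit override after root cause review."
            }), 403
        return jsonify({
            "status": "rejected",
            "current_state": current_state.name,
            "message": "Circuit is already HEALTHY; nothing to reset."
        }), 409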


@app.route("/api/v1/circuit/hard-reset", methods=["POST"])
def hard_reset():
    """
    Explicit override for HALTED state. Requires JSON body with:
    - acknowledgment: "I have reviewed the trade log and confirmed the root cause"
    - reason: string describing the root cause and remediation taken
    
    This endpoint should only be accessible to senior operators or automated
    incident management systems after human review.
    """
    if strategy is None:
        return jsonify({"error": "Strategy not initialized"}), 500
    
    if strategy.circuit_breaker.state != CircuitState.HALTED:
        return jsonify({
            "error": "Hard reset only available in HALTED state"
        }), 400
    
    # silent=True returns None instead of raising on a missing or non-JSON body
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return jsonify({"error": "JSON object request body required"}), 400
    
    acknowledgment = body.get("acknowledgment", "")
    reason = body.get("reason", "")
    
    if not acknowledgment or not reason:
        return jsonify({"error": "Both 'acknowledgment' and 'reason' fields are required"}), 400
    
    # Reset the state machine. Drawdown metrics are deliberately left intact: if
    # current_drawdown is still at or above the threshold, the next evaluation
    # re-enters HALTED until equity recovers or the session is re-seeded.
    strategy.circuit_breaker.state = CircuitState.HEALTHY
    strategy.circuit_breaker.consecutive_losses = 0
    strategy.circuit_breaker.consecutive_wins = 0
    strategy.circuit_breaker.degraded_since = None
    
    logger.critical(
        f"HARD RESET executed. Reason: {reason}. "
        f"Acknowledgment: {acknowledgment}. Timestamp: {datetime.now().isoformat()}"
    )
    
    return jsonify({
        "status": "success",
        "new_state": "HEALTHY",
        "hard_reset_reason": reason,
        "timestamp": datetime.now().isoformat(),
        "warning": "Monitor strategy closely. A second HALTED trigger will require incident report."
    })


@app.route("/api/v1/trades", methods=["GET"])
def get_trade_log():
    """Return recent trade log entries."""
    if strategy is None:
        return jsonify({"error": "Strategy not initialized"}), 500
    
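    limit = request.args.get("limit", default=20, type=int)
    # Clamp: limit=0 would make trade_log[-0:] return the entire log, and a
    # negative limit would slice from the front. The 500 cap is an illustrative choice.
    limit = max(1, min(limit, 500))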
    return jsonify({
        "trades": strategy.trade_log[-limit:],
        "total_trades": len(strategy.trade_log)
    })
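The two reset endpoints encode an asymmetry: a routine reset is accepted only from DEGRADED or SUSPENDED, while HALTED requires the explicit, reasoned override. That policy can be pulled out of Flask and unit-tested on its own. The sketch below is a minimal, framework-free version under stated assumptions: the four `CircuitState` members are inferred from the states the endpoints mention, `ResetDecision` and `decide_reset` are hypothetical names, and returning 409 for an already-HEALTHY circuit is an illustrative choice rather than the endpoints' behavior.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional


class CircuitState(Enum):
    # Assumed to mirror the states referenced by the API endpoints.
    HEALTHY = auto()
    DEGRADED = auto()
    SUSPENDED = auto()
    HALTED = auto()


# Phrase taken from the hard-reset endpoint's docstring.
REQUIRED_ACK = "I have reviewed the trade log and confirmed the root cause"


@dataclass(frozen=True)
class ResetDecision:
    allowed: bool
    http_status: int
    message: str


def decide_reset(state: CircuitState, hard: bool,
                 acknowledgment: Optional[str] = None,
                 reason: Optional[str] = None) -> ResetDecision:
    """Hypothetical pure-function version of the reset policy."""
    if not hard:
        # Routine reset: only the self-correcting states qualify.
        if state in (CircuitState.DEGRADED, CircuitState.SUSPENDED):
            return ResetDecision(True, 200, "Reset to HEALTHY")
        if state is CircuitState.HALTED:
            return ResetDecision(False, 403, "HALTED requires a hard reset")
        return ResetDecision(False, 409, "Already HEALTHY")  # illustrative status
    # Hard reset: only from HALTED, and only with the exact phrase plus a reason.
    if state is not CircuitState.HALTED:
        return ResetDecision(False, 400, "Hard reset only available in HALTED state")
    if acknowledgment != REQUIRED_ACK or not (reason or "").strip():
        return ResetDecision(False, 400, "Acknowledgment phrase and reason required")
    return ResetDecision(True, 200, "Hard reset to HEALTHY")


print(decide_reset(CircuitState.SUSPENDED, hard=False).allowed)   # True
print(decide_reset(CircuitState.HALTED, hard=False).http_status)  # 403
```

Keeping the decision pure means the Flask handlers only translate a `ResetDecision` into a response, and the policy itself can be tested without a running server or a live strategy.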

6. Deployment Configuration by Scale

The circuit breaker architecture scales from a single Python process to a distributed risk management service. The following table shows recommended deployment configurations:

| User Type | Architecture | Circuit Breaker Scope | TickDB Usage |
|---|---|---|---|
| Individual trader | Single process, Flask API on localhost | Per-strategy instance within the process | depth channel for book health; kline for trend signals |
| Small fund (2–5 traders) | Shared risk service (Flask + Redis) | Shared circuit breaker across strategies; per-strategy override capability | REST /kline for backtesting validation; WebSocket for live depth |
| Institutional team | Microservices architecture; circuit breaker as sidecar or standalone service | Per-strategy, per-desk, and firm-level circuit breakers with escalation hierarchy | TickDB kline for strategy backtesting; depth for pre-trade risk checks |
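For the institutional row, one way to read "escalation hierarchy" is that a strategy may trade only as freely as the most restrictive breaker above it: a HALTED desk overrides a HEALTHY strategy. The sketch below assumes the same four states ordered by severity and a hypothetical `BreakerNode` tree; it is one possible design, not the article's prescribed one.

```python
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional


class CircuitState(IntEnum):
    # Ordered by severity so max() picks the most restrictive state.
    HEALTHY = 0
    DEGRADED = 1
    SUSPENDED = 2
    HALTED = 3


@dataclass
class BreakerNode:
    """Hypothetical node in a firm -> desk -> strategy breaker tree."""
    name: str
    state: CircuitState = CircuitState.HEALTHY
    parent: Optional["BreakerNode"] = None

    def effective_state(self) -> CircuitState:
        # Walk up to the root; the most severe state on the path wins.
        node, worst = self, self.state
        while node.parent is not None:
            node = node.parent
            worst = max(worst, node.state)
        return worst


firm = BreakerNode("firm")
desk = BreakerNode("crypto-desk", parent=firm)
strat = BreakerNode("momentum-btc", parent=desk)

desk.state = CircuitState.SUSPENDED
print(strat.effective_state().name)  # SUSPENDED
firm.state = CircuitState.HALTED
print(strat.effective_state().name)  # HALTED
```

Each level still owns its own state, so a desk-level reset does not silently clear a strategy-level halt; the strategy resumes only when every breaker on its path allows it.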

6.1 Threshold Calibration Guide

Default thresholds are starting points, not recommendations. Calibrate using your strategy's historical performance:

| Threshold | Default | Calibration approach |
|---|---|---|
| consecutive_loss_threshold | 3 | Run a backtest; set at the 95th percentile of historical consecutive loss streaks |
| recovery_window_seconds | 900 (15 min) | Should exceed your strategy's average time to close a trade |
| drawdown_threshold_pct | 5% | Calibrate to your risk appetite; institutional desks often use 2–3% |
| max_position_size_reduction | 0.5 | In DEGRADED state, trading at half size is a conservative starting point |
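The first calibration rule can be computed directly from a backtest's per-trade P&L. The sketch below assumes the backtest yields an ordered list of trade P&Ls (a hypothetical input format), extracts every losing streak, and takes the 95th percentile with the nearest-rank rule; other percentile definitions, such as linear interpolation, give slightly different values on small samples. The function names are illustrative.

```python
import math
from typing import List, Sequence


def losing_streaks(trade_pnls: Sequence[float]) -> List[int]:
    """Return the length of every run of consecutive losing trades (pnl < 0).

    A flat trade (pnl == 0) ends a streak, the same as a win.
    """
    streaks: List[int] = []
    current = 0
    for pnl in trade_pnls:
        if pnl < 0:
            current += 1
        elif current:
            streaks.append(current)
            current = 0
    if current:  # the series ended mid-streak
        streaks.append(current)
    return streaks


def nearest_rank_percentile(values: Sequence[int], pct: float) -> int:
    """Smallest value such that at least pct% of the data is <= it."""
    if not values:
        raise ValueError("need at least one value")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def calibrate_loss_threshold(trade_pnls: Sequence[float], pct: float = 95.0) -> int:
    """Suggested consecutive_loss_threshold from backtest trade P&L."""
    return nearest_rank_percentile(losing_streaks(trade_pnls), pct)


pnls = [-1, -2, 3, -1, 4, -1, -1, -1, 2, -5, 1]
print(losing_streaks(pnls))            # [2, 1, 3, 1]
print(calibrate_loss_threshold(pnls))  # 3
```

With only four streaks, the 95th percentile is simply the worst streak observed; a longer history with many more streaks is what makes the estimate meaningful.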

7. Closing: The Algorithm That Knows When to Stop

The most dangerous algorithmic trading system is not the one that loses money. It is the one that keeps trading after it has lost enough money to matter.

A circuit breaker built on a state machine is not just a risk control mechanism. It is an admission that your strategy cannot know everything — that regime changes are invisible to the model generating the signals, and that the only reliable observer of system health is a layer that sits above the model and watches it operate.

The architecture in this article gives you that layer:

  • State machine logic that tracks consecutive losses, intraday drawdown, and recovery windows.
  • TickDB integration that brings order book health signals into the circuit breaker's decision logic.
  • Manual intervention APIs that respect the asymmetry between a self-correcting system and a system that has encountered a failure beyond its design horizon.
  • Production-grade WebSocket code that survives network partitions, rate limits, and reconnection storms without accumulating state drift.

The momentum algorithm that accumulated 847 lots of long exposure at 2:47 AM did not fail because it made bad decisions. It failed because no one had given it a mechanism to ask: "Should I still be running?"

Now you have the blueprint to build that mechanism.


Next Steps

If you're an individual quant trader, download the complete circuit breaker implementation from the TickDB GitHub repository. Start with a paper-trading period to calibrate your thresholds against your strategy's actual win/loss distribution.

If you want to validate this approach with historical data before deploying it live:

  1. Sign up at tickdb.ai (free tier available, no credit card required)
  2. Use the /v1/market/kline endpoint to pull 2+ years of historical data for your target symbols
  3. Run a backtest that simulates the state machine transitions against historical price series
  4. Identify the 95th percentile of consecutive loss streaks and calibrate your thresholds accordingly
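Step 3 can start much simpler than a full event-driven backtest. The sketch below replays one day of per-trade P&L through a reduced state machine to show when it would have degraded, recovered, or halted. It is a minimal sketch under stated assumptions: the input is a list of `(seconds_since_open, full_size_pnl)` tuples from your own backtest (a hypothetical format); DEGRADED scales P&L by `1 - max_position_size_reduction`, as if P&L were linear in size; recovery happens once `recovery_window_seconds` have elapsed since the last loss while DEGRADED; a peak-to-trough drawdown breach is mapped to HALTED; and SUSPENDED (the book-health state) is not modeled. `replay_day`, `ReplayConfig`, and `ReplayResult` are illustrative names, not part of TickDB or the article's own implementation.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Sequence, Tuple

HEALTHY, DEGRADED, HALTED = "HEALTHY", "DEGRADED", "HALTED"


@dataclass
class ReplayConfig:
    # Defaults from the threshold calibration table.
    consecutive_loss_threshold: int = 3
    recovery_window_seconds: int = 900
    drawdown_threshold_pct: float = 5.0
    max_position_size_reduction: float = 0.5


@dataclass
class ReplayResult:
    final_state: str = HEALTHY
    transitions: List[Tuple[int, str, str]] = field(default_factory=list)
    realized_pnl: float = 0.0


def replay_day(trades: Sequence[Tuple[int, float]], start_equity: float,
               cfg: Optional[ReplayConfig] = None) -> ReplayResult:
    """Replay (seconds_since_open, full_size_pnl) trades through a simplified breaker."""
    cfg = cfg or ReplayConfig()
    result = ReplayResult()
    state, losses, degraded_since = HEALTHY, 0, 0
    peak = start_equity

    for t, full_size_pnl in trades:
        if state == HALTED:
            break  # a HALTED strategy takes no further trades
        # Assumption: P&L scales linearly with position size.
        size = 1.0 - cfg.max_position_size_reduction if state == DEGRADED else 1.0
        pnl = full_size_pnl * size
        result.realized_pnl += pnl
        losses = losses + 1 if pnl < 0 else 0

        equity = start_equity + result.realized_pnl
        peak = max(peak, equity)
        drawdown_pct = (peak - equity) / peak * 100

        new_state = state
        if drawdown_pct >= cfg.drawdown_threshold_pct:
            new_state = HALTED  # assumption: a drawdown breach maps to HALTED
        elif state == HEALTHY and losses >= cfg.consecutive_loss_threshold:
            new_state = DEGRADED
            degraded_since = t
        elif state == DEGRADED:
            if pnl < 0:
                degraded_since = t  # a new loss restarts the recovery clock
            elif t - degraded_since >= cfg.recovery_window_seconds:
                new_state = HEALTHY

        if new_state != state:
            result.transitions.append((t, state, new_state))
            state = new_state

    result.final_state = state
    return result


trades = [(0, -100), (60, -100), (120, -100), (300, 50), (1100, 80), (1200, -600)]
r = replay_day(trades, start_equity=10_000)
print(r.transitions)
# [(120, 'HEALTHY', 'DEGRADED'), (1100, 'DEGRADED', 'HEALTHY'), (1200, 'HEALTHY', 'HALTED')]
print(r.final_state, r.realized_pnl)  # HALTED -835.0
```

Running this over every day in the history and counting transitions per state gives a first read on whether the thresholds would trip too often (constant DEGRADED churn) or too late (drawdowns that reach HALTED without a prior DEGRADED warning).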

If you need institutional-grade deployment with per-desk circuit breakers, shared state via Redis, and audit-log integration, reach out to enterprise@tickdb.ai for consulting and platform options.

If you're using AI coding assistants, search for the tickdb-market-data SKILL in your tool's marketplace to get started with the TickDB API directly from your AI workflow.


This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. Any trading strategy, including those protected by circuit breakers, can suffer significant losses. Backtest results are simulated and subject to limitations including slippage approximation and limited sample size.