"Execution is the strategy. Everything else is just theory."

A backtest shows your trend-following system returning 18% annualized with a Sharpe of 1.4. You deploy it. Six months later, you are up 3.2%. The gap between backtest and reality is almost never a flawed alpha. It is slippage. Every entry delayed by 200 milliseconds, every fill priced 3 cents above your signal, every spread that widens at the exact moment you need liquidity most: these compound into a drag of roughly 15 percentage points that no factor model will catch.

For US equities, the National Best Bid and Offer (NBBO) is your primary tool for controlling this drag. NBBO represents the best available price across all US exchanges for a given security at any instant. Before you enter a position triggered by a breakout signal, you can interrogate the NBBO and its associated depth to estimate realistic execution cost. This article dissects the microstructure of NBBO-based entry, provides production-grade Python code for real-time quote monitoring, and shows how to integrate slippage estimation into your order execution decision.


The NBBO Microstructure: What the Spread Actually Tells You

NBBO consists of two prices: the best bid (highest price a buyer will pay) and the best ask (lowest price a seller will accept). The difference is the spread, and it is the first-order cost of any entry.

Consider the order book snapshot for a hypothetical large-cap stock at the moment a breakout triggers:

Timestamp  | Bid Price | Bid Size | Ask Price | Ask Size | Spread (bps) | Pressure Ratio
Signal −1s | $150.20   | 8,400    | $150.22   | 6,200    | 1.33         | 1.35
Signal +0s | $150.20   | 4,100    | $150.25   | 9,800    | 3.33         | 0.42
Signal +2s | $150.18   | 2,200    | $150.32   | 14,200   | 9.33         | 0.15
Signal +5s | $150.15   | 1,800    | $150.38   | 16,500   | 15.33        | 0.11

The spread widens from 1.33 basis points to 15.33 basis points within 5 seconds of the signal. The bid side thins from 8,400 shares to 1,800 shares. The pressure ratio collapses from 1.35 to 0.11. In this table the ratio is computed from the top-of-book sizes shown; in practice it is the ratio of cumulative bid size to cumulative ask size across the top five levels. This is a liquidity vacuum. Entering aggressively here means paying the full ask plus significant market impact.

The pressure ratio is the key derived metric:

Buy/Sell Pressure Ratio = Σ(bid_size, top N levels) / Σ(ask_size, top N levels)

A ratio above 1.0 indicates buying pressure; a ratio below 1.0 indicates selling pressure. In the context of trend-following entry, a ratio above 1.5 gives you favorable conditions: the book is thick on the bid, meaning market makers are confident enough to post large orders at the current price. A ratio below 0.7 is a warning signal: the book is thin and skewed against you.
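The ratio and its bands can be sketched in a few lines. This is a minimal illustration, assuming each side of the book is held as a list of (price, size) tuples sorted best-first; the names pressure_ratio and classify_pressure are hypothetical, and returning infinity for an empty ask side is one possible convention rather than a standard.

```python
from typing import List, Tuple

Level = Tuple[float, float]  # (price, size); hypothetical in-memory layout


def pressure_ratio(bids: List[Level], asks: List[Level], n: int = 5) -> float:
    """Sum of bid sizes divided by sum of ask sizes over the top n levels."""
    bid_depth = sum(size for _, size in bids[:n])
    ask_depth = sum(size for _, size in asks[:n])
    if ask_depth <= 0:
        return float("inf")  # no resting asks: the ratio is undefined
    return bid_depth / ask_depth


def classify_pressure(ratio: float) -> str:
    """Map a ratio onto the bands used in the text (1.5 and 0.7)."""
    if ratio > 1.5:
        return "favorable"
    if ratio < 0.7:
        return "warning"
    return "neutral"


# Top-of-book sizes from the snapshot table (n=1 because only L1 is shown)
before = pressure_ratio([(150.20, 8400)], [(150.22, 6200)], n=1)
after = pressure_ratio([(150.20, 4100)], [(150.25, 9800)], n=1)
print(round(before, 2), classify_pressure(before))
print(round(after, 2), classify_pressure(after))
```

One second before the signal the book is merely neutral; at the signal it has already crossed into the warning band.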

Why NBBO Data Matters More Than Midpoint Price

Many traders use the midpoint price ((bid + ask) / 2) as their entry estimate. This is a dangerous simplification. The midpoint assumes you can always execute at the middle of the spread, which is only true in slow, low-volatility markets. At the moment of a breakout signal, the spread is not stationary. It is widening. The midpoint is a moving target, and it will sit further from your actual fill price than the NBBO ask does.

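For an aggressive buy, the NBBO ask is the price to plan around: it is what you pay when your order fits within the displayed size, and a larger order pays more. Your slippage estimate should be: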

Estimated Slippage = (Fill Price − Signal Price) / Signal Price
                  = (Ask Price − Signal Price) / Signal Price   [for aggressive buy]

For a signal at $150.20 and an ask at $150.25, your slippage is 5 cents per share, or about 3.3 bps. On a $10 million position, that is roughly $3,300 in immediate cost before commission. Your strategy needs to generate more than 3.3 bps of alpha in the first few minutes after entry to break even. NBBO monitoring lets you screen entries in real time and defer when conditions are hostile.
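Working the formula through in code makes the unit conversion explicit: a 5-cent gap on a stock near $150 is about 3.3 bps, not 5 bps. The helper names below are illustrative only.

```python
def slippage_bps(fill_price: float, signal_price: float) -> float:
    """Slippage of a buy fill relative to the signal price, in basis points."""
    return (fill_price - signal_price) / signal_price * 10_000


def slippage_cost(notional: float, bps: float) -> float:
    """Cash cost of a given slippage (in bps) on a position of this notional."""
    return notional * bps / 10_000


bps = slippage_bps(fill_price=150.25, signal_price=150.20)
cost = slippage_cost(notional=10_000_000, bps=bps)
print(f"{bps:.2f} bps, ${cost:,.0f} on a $10M position")
```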


Trend-Following Entry Logic: Three-Phase Decision Framework

A robust NBBO-aware trend-following entry operates in three phases.

Phase 1: Pre-Signal Baseline (T − 60s to T − 5s)

Before the breakout signal fires, establish a baseline for the security's typical spread and pressure ratio. This is not optional — you cannot interpret a pressure ratio of 0.8 without knowing whether the stock normally trades at 0.5 or at 1.5. Pull a 20-minute rolling window of NBBO snapshots. Record the median spread in bps, the median pressure ratio, and the standard deviation of both. These are your context parameters.

If the current readings are within one standard deviation of the baseline, the book is in its normal state. If the spread is 2 standard deviations above the median, flag this as elevated volatility. If the pressure ratio is collapsing, flag this as a potential liquidity event.
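A sketch of the baseline comparison, using only the standard library. The text defines two bands (within one standard deviation is normal; two or more above the median is elevated); the labels "watch" and "tight" for the remaining cases are assumptions added here for completeness, and classify_reading is a hypothetical name.

```python
import statistics
from typing import List


def classify_reading(history: List[float], current: float) -> str:
    """Compare a reading with the rolling median, in units of standard deviation."""
    if len(history) < 2:
        return "insufficient_data"
    median = statistics.median(history)
    std = statistics.pstdev(history)  # population std, measured about the mean
    if std == 0:
        return "normal" if current == median else "elevated"
    z = (current - median) / std
    if abs(z) <= 1:
        return "normal"      # within one standard deviation of the baseline
    if z >= 2:
        return "elevated"    # two or more standard deviations above the median
    if z < -1:
        return "tight"       # assumed label: unusually low reading
    return "watch"           # assumed label: between one and two deviations above


spread_history_bps = [1.2, 1.3, 1.3, 1.4, 1.3, 1.2, 1.4, 1.3]
for reading in (1.33, 1.42, 3.33):
    print(reading, classify_reading(spread_history_bps, reading))
```

The same function can be applied to the pressure ratio history, where a strongly negative z-score is the case to flag rather than a positive one.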

Phase 2: Signal Trigger + NBBO Evaluation (T = 0 to T + 30s)

When the breakout signal fires, immediately capture the NBBO. Do not wait. Evaluate three conditions simultaneously:

  1. Spread threshold: If the current spread exceeds 1.5× the 20-minute median, the cost of entry is elevated. Set a flag.
  2. Depth threshold: If the top-of-book bid size is below 50% of the 20-minute median bid size, market makers are withdrawing. Set a flag.
  3. Pressure ratio threshold: If the pressure ratio falls below 0.7, the book is heavily skewed against buyers. Set a flag.

If zero flags are raised: proceed with a market order or an aggressive limit order at the ask. If one flag is raised: use a passive limit order at midpoint or better, accepting a delay of up to 15 seconds. If two or more flags are raised: defer entry by 30 seconds and re-evaluate.
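The flag-to-action mapping above could be expressed as a small pure function. This is a sketch for a buy entry only; choose_order and its action labels are hypothetical, and the midpoint is rounded down to a whole cent on the assumption that limit prices for stocks at or above $1.00 must be in penny increments.

```python
from decimal import Decimal, ROUND_FLOOR
from typing import Optional, Tuple


def choose_order(flags: int, bid: float, ask: float) -> Tuple[str, Optional[float], int]:
    """Return (action, limit_price, max_wait_seconds) for a buy entry."""
    if flags <= 0:
        # Nominal book: cross the spread with a limit at the ask
        return ("AGGRESSIVE", ask, 0)
    if flags == 1:
        # One warning: rest at the midpoint, rounded down to a whole cent
        mid = (Decimal(str(bid)) + Decimal(str(ask))) / 2
        limit = float(mid.quantize(Decimal("0.01"), rounding=ROUND_FLOOR))
        return ("PASSIVE_LIMIT", limit, 15)
    # Two or more warnings: stand aside and re-evaluate after 30 seconds
    return ("DEFER", None, 30)


for n_flags in (0, 1, 2):
    print(n_flags, choose_order(n_flags, bid=150.20, ask=150.25))
```

Keeping the decision free of I/O makes it easy to unit-test against recorded quotes before wiring it to an order router.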

Phase 3: Post-Entry Validation (T + 30s to T + 5min)

After entry, log the fill price against the signal price. Calculate realized slippage. This data feeds back into your baseline — over 20–30 entries, your slippage estimate will converge to a realistic number. Use this to calibrate your alpha threshold: if your system generates 4 bps of alpha per entry on average, but your measured slippage is 6 bps, your net edge is negative. Either reduce position size or tighten entry conditions.
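A minimal sketch of that feedback loop, assuming fills are logged as (signal price, fill price) pairs for buy entries. The Fill dataclass and the sample numbers are illustrative, not measured data.

```python
from dataclasses import dataclass
from statistics import mean
from typing import List


@dataclass
class Fill:
    signal_price: float
    fill_price: float


def realized_slippage_bps(fill: Fill) -> float:
    """Slippage of one buy fill against its signal price, in basis points."""
    return (fill.fill_price - fill.signal_price) / fill.signal_price * 10_000


def net_edge_bps(fills: List[Fill], alpha_bps: float) -> float:
    """Average alpha per entry minus average realized slippage."""
    return alpha_bps - mean(realized_slippage_bps(f) for f in fills)


# Hypothetical log: each of these fills slipped by 6 bps
log = [Fill(100.00, 100.06), Fill(50.00, 50.03), Fill(200.00, 200.12)]
edge = net_edge_bps(log, alpha_bps=4.0)
print(f"net edge: {edge:.2f} bps")  # negative: slippage exceeds alpha
```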


Production-Grade Code: Real-Time NBBO Monitor with TickDB

The following Python module connects to TickDB's WebSocket feed for real-time quote data, evaluates the three-phase conditions, and outputs an actionable entry recommendation. It is organized around the failure modes that matter in live trading: authentication errors, dropped connections, and missed heartbeats are handled explicitly rather than silently.

"""
NBBO Monitor for Trend-Following Entry Slippage Control
=========================================================
Uses TickDB WebSocket to capture real-time NBBO quotes for US equities.
Evaluates spread, depth, and pressure ratio against a rolling baseline.

Engineering warnings:
  - ⚠️ This module is synchronous. For high-frequency strategies (>10 signals/sec),
    migrate to asyncio with aiohttp. The current implementation is designed for
    medium-frequency trend following with signal intervals of 1 second or more.
  - ⚠️ Depth channel coverage varies by market. US equities provide L1 data.
    Verify coverage for your specific symbols via /v1/symbols/available.
"""

import os
import json
import time
import random
import threading
import collections
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Tuple

import websocket  # pip install websocket-client
import requests   # pip install requests


# ============================================================
# Configuration
# ============================================================
class Config:
    """Centralized configuration — no hardcoded secrets."""
    API_KEY: Optional[str] = None          # Loaded from environment
    SYMBOLS: List[str] = []                 # e.g., ["NVDA.US", "TSLA.US"]
    MAX_RECONNECT_ATTEMPTS: int = 5
    BASE_RECONNECT_DELAY: float = 1.0       # seconds
    MAX_RECONNECT_DELAY: float = 30.0       # seconds
    WEBSOCKET_URL: str = "wss://api.tickdb.ai/ws/v1/market/depth"

    @classmethod
    def load(cls):
        """Load API key from environment variable."""
        cls.API_KEY = os.environ.get("TICKDB_API_KEY")
        if not cls.API_KEY:
            raise EnvironmentError(
                "TICKDB_API_KEY is not set. "
                "Generate an API key at tickdb.ai and set it as an environment variable."
            )


# ============================================================
# Error Handling Utilities
# ============================================================
class TickDBError(Exception):
    """Base exception for TickDB API errors."""
    pass


def handle_api_error(response: Dict) -> None:
    """
    Standard TickDB error handler.
    Raises specific exceptions based on error code.
    """
    code = response.get("code", 0)
    message = response.get("message", "Unknown error")

    if code == 0:
        return  # Success — no error

    error_map = {
        1001: ("Invalid API key — check TICKDB_API_KEY env var", ValueError),
        1002: ("API key missing — check TICKDB_API_KEY env var", ValueError),
        2002: ("Symbol not found — verify via /v1/symbols/available", KeyError),
        3001: ("Rate limit exceeded", None),  # Handled separately in send_with_retry
    }

    if code == 3001:
        return  # Caller should handle rate limiting via headers

    if code in error_map:
        msg, exc_cls = error_map[code]
        raise exc_cls(f"[TickDB {code}] {msg}: {message}")

    raise TickDBError(f"[TickDB {code}] Unexpected error: {message}")


# ============================================================
# Rolling Baseline Calculator
# ============================================================
class BaselineCalculator:
    """
    Maintains a rolling 20-minute window of NBBO snapshots.
    Computes median spread, median pressure ratio, and standard deviations.
    """

    WINDOW_SECONDS: int = 1200  # 20 minutes

    def __init__(self):
        self.snapshots: collections.deque = collections.deque()
        self.lock = threading.Lock()

    def add_snapshot(self, bid_price: float, ask_price: float,
                     bid_size: float, ask_size: float, pressure_ratio: float):
        """Add a new NBBO snapshot with timestamp."""
        timestamp = time.time()
        # Hold the lock so a reader on another thread never iterates the
        # deque while it is being mutated
        with self.lock:
            self.snapshots.append({
                "timestamp": timestamp,
                "bid_price": bid_price,
                "ask_price": ask_price,
                "bid_size": bid_size,
                "ask_size": ask_size,
                "spread_bps": ((ask_price - bid_price) / bid_price) * 10000,
                "pressure_ratio": pressure_ratio,
            })
            self._prune_old()

    def _prune_old(self):
        """Remove snapshots outside the 20-minute window. Caller must hold self.lock."""
        cutoff = time.time() - self.WINDOW_SECONDS
        while self.snapshots and self.snapshots[0]["timestamp"] < cutoff:
            self.snapshots.popleft()

    def get_baseline(self) -> Dict[str, float]:
        """Return median and std of spread and pressure ratio."""
        # Copy under the lock, then compute on the copy
        with self.lock:
            snapshots = list(self.snapshots)

        if len(snapshots) < 10:
            # Not enough data yet: zeros tell the evaluator to skip the
            # baseline-relative checks
            return {
                "median_spread_bps": 0.0,
                "std_spread_bps": 0.0,
                "median_pressure_ratio": 1.0,
                "std_pressure_ratio": 0.0,
                "median_bid_size": 0.0,
                "median_ask_size": 0.0,
            }

        spreads = [s["spread_bps"] for s in snapshots]
        pressures = [s["pressure_ratio"] for s in snapshots]
        bid_sizes = [s["bid_size"] for s in snapshots]
        ask_sizes = [s["ask_size"] for s in snapshots]

        def median(lst: List[float]) -> float:
            sorted_lst = sorted(lst)
            n = len(sorted_lst)
            mid = n // 2
            return sorted_lst[mid] if n % 2 else (sorted_lst[mid - 1] + sorted_lst[mid]) / 2

        def std(lst: List[float]) -> float:
            m = sum(lst) / len(lst)
            variance = sum((x - m) ** 2 for x in lst) / len(lst)
            return variance ** 0.5

        return {
            "median_spread_bps": median(spreads),
            "std_spread_bps": std(spreads),
            "median_pressure_ratio": median(pressures),
            "std_pressure_ratio": std(pressures),
            "median_bid_size": median(bid_sizes),
            "median_ask_size": median(ask_sizes),
        }


# ============================================================
# Entry Condition Evaluator
# ============================================================
class EntryEvaluator:
    """
    Evaluates three conditions against the baseline to produce an entry recommendation.
    """

    SPREAD_THRESHOLD_MULTIPLIER: float = 1.5   # Flag if spread > 1.5x median
    DEPTH_THRESHOLD_RATIO: float = 0.5         # Flag if bid size < 50% of median
    PRESSURE_RATIO_THRESHOLD: float = 0.7       # Flag if ratio < 0.7

    def __init__(self, baseline_calc: BaselineCalculator):
        self.baseline = baseline_calc

    def evaluate(self, current: Dict) -> Tuple[str, int, str]:
        """
        Evaluate entry conditions.
        Returns: (recommendation, flag_count, reason)
        """
        base = self.baseline.get_baseline()

        flags = 0
        reasons = []

        # Condition 1: Spread threshold. Both tests describe the same problem
        # (a wide spread), so together they raise at most one flag.
        if base["median_spread_bps"] > 0:
            spread_ratio = current["spread_bps"] / base["median_spread_bps"]
            two_sigma_level = base["median_spread_bps"] + 2 * base["std_spread_bps"]
            if spread_ratio > self.SPREAD_THRESHOLD_MULTIPLIER:
                flags += 1
                reasons.append(f"Spread {current['spread_bps']:.1f} bps is {spread_ratio:.1f}x median ({base['median_spread_bps']:.1f} bps)")
            elif current["spread_bps"] > two_sigma_level:
                flags += 1
                reasons.append("Spread 2σ above median")

        # Condition 2: Depth threshold
        if base["median_bid_size"] > 0:
            depth_ratio = current["bid_size"] / base["median_bid_size"]
            if depth_ratio < self.DEPTH_THRESHOLD_RATIO:
                flags += 1
                reasons.append(f"Bid size {current['bid_size']:.0f} is {depth_ratio:.0%} of median ({base['median_bid_size']:.0f})")

        # Condition 3: Pressure ratio threshold
        if current["pressure_ratio"] < self.PRESSURE_RATIO_THRESHOLD:
            flags += 1
            reasons.append(f"Pressure ratio {current['pressure_ratio']:.2f} below threshold ({self.PRESSURE_RATIO_THRESHOLD})")

        # Decision logic
        if flags == 0:
            return ("PROCEED", 0, "All conditions nominal")
        elif flags == 1:
            return ("LIMIT", 1, "; ".join(reasons))
        else:
            return ("DEFER", flags, "; ".join(reasons))


# ============================================================
# WebSocket Connection Manager
# ============================================================
class TickDBWebSocket:
    """
    WebSocket client for TickDB depth data.
    Implements heartbeat, exponential backoff + jitter, and rate-limit handling.
    """

    def __init__(self, symbols: List[str]):
        self.symbols = symbols
        self.ws: Optional[websocket.WebSocketApp] = None
        self.baseline_calc = BaselineCalculator()
        self.evaluator = EntryEvaluator(self.baseline_calc)
        self.reconnect_attempts = 0
        self.running = False
        self._last_ping = 0
        self._ping_interval = 25  # seconds — TickDB recommends ping every 30s

    def connect(self):
        """Establish WebSocket connection with authentication."""
        url = f"{Config.WEBSOCKET_URL}?api_key={Config.API_KEY}"
        self.ws = websocket.WebSocketApp(
            url,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
        )

    def _on_open(self, ws):
        """Subscribe to depth channel for configured symbols."""
        print(f"[{datetime.now().isoformat()}] WebSocket connected. Subscribing to symbols: {self.symbols}")
        for symbol in self.symbols:
            subscribe_msg = json.dumps({
                "cmd": "sub",
                "params": {
                    "symbol": symbol,
                    "channel": "depth",
                    "depth": 5,  # Top 5 levels for pressure ratio calculation
                }
            })
            ws.send(subscribe_msg)
        self.running = True

    def _on_message(self, ws, message: str):
        """Process incoming depth data."""
        try:
            data = json.loads(message)

            # Handle heartbeat response
            if data.get("cmd") == "pong":
                self._last_ping = time.time()
                return

            # Handle depth snapshot
            if "data" in data:
                self._process_depth(data["data"])

        except json.JSONDecodeError:
            print(f"[WARN] Failed to parse message: {message[:100]}")

    def _process_depth(self, depth_data: Dict):
        """
        Extract NBBO from depth snapshot and update baseline.
        On breakout signal (triggered externally), evaluate entry conditions.
        """
        symbol = depth_data.get("symbol", "UNKNOWN")
        levels = depth_data.get("levels", [])

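        if len(levels) < 2:
            return

        # Assumed layout: levels alternate bid, ask, bid, ask, ... from the
        # touch outward, so index 0 is the best bid and index 1 the best ask.
        # Confirm this against the feed's actual message schema before use.
        bid_level = levels[0]
        ask_level = levels[1]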

        bid_price = float(bid_level.get("price", 0))
        ask_price = float(ask_level.get("price", 0))
        bid_size = float(bid_level.get("size", 0))
        ask_size = float(ask_level.get("size", 0))

        if bid_price <= 0 or ask_price <= 0:
            return

        spread_bps = ((ask_price - bid_price) / bid_price) * 10000

        # Calculate pressure ratio using top 5 levels
        top_bid = sum(float(l.get("size", 0)) for l in levels[::2][:5])  # Even indices = bids
        top_ask = sum(float(l.get("size", 0)) for l in levels[1::2][:5])  # Odd indices = asks
        pressure_ratio = top_bid / top_ask if top_ask > 0 else 0.0

        current = {
            "symbol": symbol,
            "bid_price": bid_price,
            "ask_price": ask_price,
            "bid_size": bid_size,
            "ask_size": ask_size,
            "spread_bps": spread_bps,
            "pressure_ratio": pressure_ratio,
            "top_bid_depth": top_bid,
            "top_ask_depth": top_ask,
        }

        # Update rolling baseline
        self.baseline_calc.add_snapshot(
            bid_price, ask_price, bid_size, ask_size, pressure_ratio
        )

        # Log current state
        timestamp = datetime.now().strftime("%H:%M:%S")
        print(f"[{timestamp}] {symbol} | Bid: {bid_price:.2f} ({bid_size:.0f}) | "
              f"Ask: {ask_price:.2f} ({ask_size:.0f}) | "
              f"Spread: {spread_bps:.2f} bps | PR: {pressure_ratio:.2f}")

        # Evaluate entry conditions (simplified — in production this would be
        # triggered by your breakout signal module)
        recommendation, flags, reason = self.evaluator.evaluate(current)

        if recommendation != "PROCEED":
            print(f"  ⚠️  ENTRY {recommendation} | Flags: {flags} | Reason: {reason}")

    def _on_error(self, ws, error):
        """Log the error. Reconnection is driven by the loop in run()."""
        print(f"[ERROR] WebSocket error: {error}")

    def _on_close(self, ws, close_status_code, close_msg):
        """Handle connection close."""
        print(f"[INFO] WebSocket closed: {close_status_code} - {close_msg}")
        if self.running:
            # The session had opened successfully, so restart the backoff
            self.reconnect_attempts = 0
        self.running = False

    def _next_reconnect_delay(self) -> Optional[float]:
        """Return the next delay (exponential backoff + jitter), or None to give up."""
        if self.reconnect_attempts >= Config.MAX_RECONNECT_ATTEMPTS:
            print("[FATAL] Max reconnect attempts reached. Exiting.")
            return None

        delay = min(
            Config.BASE_RECONNECT_DELAY * (2 ** self.reconnect_attempts),
            Config.MAX_RECONNECT_DELAY
        )
        # Add jitter: up to 10% randomization to prevent thundering herd
        delay += random.uniform(0, delay * 0.1)

        self.reconnect_attempts += 1
        print(f"[INFO] Reconnecting in {delay:.1f}s (attempt {self.reconnect_attempts}/{Config.MAX_RECONNECT_ATTEMPTS})")
        return delay

    def start_heartbeat(self):
        """Send periodic application-level pings while the connection is open."""
        def heartbeat_loop():
            # Daemon thread: lives for the whole process and survives
            # reconnects; it only sends while self.running is True
            while True:
                due = time.time() - self._last_ping > self._ping_interval
                if self.running and self.ws and due:
                    try:
                        self.ws.send(json.dumps({"cmd": "ping"}))
                        self._last_ping = time.time()
                    except Exception as e:
                        print(f"[WARN] Heartbeat failed: {e}")
                time.sleep(5)

        t = threading.Thread(target=heartbeat_loop, daemon=True)
        t.start()

    def run(self):
        """Start the WebSocket client and reconnect until attempts are exhausted."""
        Config.load()
        print(f"[INFO] Starting NBBO Monitor for {len(self.symbols)} symbols")
        self.start_heartbeat()
        try:
            while True:
                self.connect()
                # Blocks until the connection closes or fails
                self.ws.run_forever(ping_interval=30)
                self.running = False
                delay = self._next_reconnect_delay()
                if delay is None:
                    break
                time.sleep(delay)
        except KeyboardInterrupt:
            print("[INFO] Shutting down NBBO Monitor")
            self.running = False
            if self.ws:
                self.ws.close()

# ============================================================
# REST API: Historical Kline for Baseline Initialization
# ============================================================
def fetch_historical_kline(symbol: str, interval: str = "1m", limit: int = 100,
                           retries_left: int = 1) -> List[Dict]:
    """
    Fetch historical kline data via TickDB REST API.
    Used to pre-populate the baseline calculator before live monitoring begins.
    A rate-limit response (code 3001) is retried at most `retries_left` times.
    """
    url = "https://api.tickdb.ai/v1/market/kline"
    headers = {
        "X-API-Key": Config.API_KEY or os.environ.get("TICKDB_API_KEY", ""),
        "Content-Type": "application/json"
    }
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit,
    }

    try:
        response = requests.get(
            url,
            headers=headers,
            params=params,
            timeout=(3.05, 10)  # Connect timeout, read timeout
        )
        response.raise_for_status()
        data = response.json()

        # Handle rate limiting with a bounded retry
        if data.get("code") == 3001:
            if retries_left <= 0:
                raise TickDBError("[TickDB 3001] Rate limit exceeded; retries exhausted")
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"[WARN] Rate limited. Retrying after {retry_after}s")
            time.sleep(retry_after)
            return fetch_historical_kline(symbol, interval, limit, retries_left - 1)

        handle_api_error(data)

        return data.get("data", [])

    except requests.exceptions.Timeout:
        raise TimeoutError(f"Request to {url} timed out after 10 seconds")
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"REST request failed: {e}")


# ============================================================
# Signal Module: Breakout Detection Trigger
# ============================================================
class BreakoutSignal:
    """
    Simple breakout signal generator.
    In production, replace with your actual strategy signal logic.
    """

    def __init__(self, symbol: str, threshold_pct: float = 0.01):
        self.symbol = symbol
        self.threshold_pct = threshold_pct
        self.reference_price = None

    def on_bar(self, close_price: float, timestamp: datetime):
        """
        Called on each new bar.
        Returns True if breakout signal fires.
        """
        if self.reference_price is None:
            self.reference_price = close_price
            return False

        pct_change = (close_price - self.reference_price) / self.reference_price

        if abs(pct_change) >= self.threshold_pct:
            # Breakout triggered
            signal_price = close_price
            self.reference_price = close_price  # Reset for next signal
            return True

        return False


# ============================================================
# Main Entry Point
# ============================================================
def main():
    """
    Initialize NBBO monitor, pre-populate baseline from historical data,
    then start real-time WebSocket monitoring.
    """
    Config.load()

    symbols = ["NVDA.US"]  # Replace with your target symbols

    # Step 1: Pre-populate baseline with historical kline data
    print(f"[INFO] Fetching historical kline for baseline initialization...")
    for symbol in symbols:
        klines = fetch_historical_kline(symbol, interval="1m", limit=100)
        print(f"  Loaded {len(klines)} historical bars for {symbol}")

        # In production: you would convert kline OHLC to synthetic NBBO
        # For this example, we let the live WebSocket feed build the baseline

    # Step 2: Initialize and start WebSocket monitor
    monitor = TickDBWebSocket(symbols)
    monitor.run()


if __name__ == "__main__":
    main()

Engineering notes on the code:

  • The WebSocket implementation uses websocket-client. For strategies generating more than 10 signals per second, migrate to asyncio with aiohttp for non-blocking I/O.
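  • The depth channel provides L1 data for US equities. With only the best bid and best ask in each snapshot, the pressure ratio computed in _process_depth reduces to the top-of-book size ratio. If you need deeper book levels, verify availability for your specific symbols via /v1/symbols/available before relying on higher-level depth.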
  • The heartbeat loop runs in a background thread. In a production deployment, consider using schedule or APScheduler for more robust timing control.
  • The reconnect logic includes jitter to prevent thundering herd on shared connection drops.

Interpreting Order Book Depth: Beyond the Top of Book

The pressure ratio at the top of book is your first signal, but it is not sufficient for large orders. If you are entering a $5 million position in a stock with an average daily volume (ADV) of $50 million, the top five levels may not contain enough liquidity. You need to look deeper.

For institutional-size entries, calculate the cumulative depth ratio:

Cumulative Depth Ratio (CDR) = Σ(ask_size, top N levels) / Order Size   [for a buy; use bid_size for a sell]

If CDR < 1.0 for N = 10 levels, your order will walk the book: a buy consumes the top ask levels and keeps taking higher prices until it is filled. This is the scenario where a VWAP (Volume-Weighted Average Price) strategy outperforms a market order by 3–8 bps on average.
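As a quick sketch of the ratio on its own, assuming sizes holds the displayed share counts on the side the order will consume (asks for a buy, bids for a sell), best level first. The function name and the sample book are hypothetical.

```python
from typing import Sequence


def cumulative_depth_ratio(sizes: Sequence[float], order_size: float, n: int = 10) -> float:
    """Displayed depth over the top n levels divided by the order size."""
    if order_size <= 0:
        raise ValueError("order_size must be positive")
    return sum(sizes[:n]) / order_size


ask_sizes = [6200, 5000, 4000]  # hypothetical three-level ask book
cdr = cumulative_depth_ratio(ask_sizes, order_size=20_000)
print(f"CDR = {cdr:.2f}", "(order walks the book)" if cdr < 1.0 else "(fits in displayed depth)")
```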

The following extension computes the CDR and estimates market impact cost:

def estimate_market_impact(depth_levels: List[Dict], order_size: float,
                           side: str = "buy") -> Dict[str, float]:
    """
    Estimate market impact for a given order size based on depth levels.

    Args:
        depth_levels: List of {price, size, side} from TickDB depth channel
        order_size: Target order size in shares
        side: "buy" walks the ask side, "sell" walks the bid side

    Returns:
        Dictionary with estimated_cost (total notional of the simulated fill),
        avg_fill_price, pct_walked, fill_ratio, cost_bps
    """
    if side not in ("buy", "sell"):
        raise ValueError("side must be 'buy' or 'sell'")

    # Sort each side best-first so the walk starts at the touch
    bid_levels = sorted(
        (l for l in depth_levels if l.get("side") == "bid"),
        key=lambda l: float(l.get("price", 0)), reverse=True,
    )
    ask_levels = sorted(
        (l for l in depth_levels if l.get("side") == "ask"),
        key=lambda l: float(l.get("price", 0)),
    )

    # A buy consumes resting asks; a sell consumes resting bids
    book, opposite = (ask_levels, bid_levels) if side == "buy" else (bid_levels, ask_levels)

    filled_qty = 0.0
    total_cost = 0.0
    levels_used = 0

    for level in book:
        remaining = order_size - filled_qty
        if remaining <= 0:
            break

        take_qty = min(remaining, float(level.get("size", 0)))
        if take_qty <= 0:
            continue  # Skip empty levels

        filled_qty += take_qty
        total_cost += take_qty * float(level.get("price", 0))
        levels_used += 1

    if filled_qty == 0:
        return {
            "estimated_cost": 0.0,
            "avg_fill_price": 0.0,
            "pct_walked": 0.0,
            "fill_ratio": 0.0,
            "cost_bps": 0.0,
        }

    avg_fill = total_cost / filled_qty
    # Reference is the best quote on the opposite side, so cost_bps includes
    # the spread as well as the cost of walking the book
    reference_price = float(opposite[0].get("price", 0)) if opposite else 0.0
    sign = 1.0 if side == "buy" else -1.0

    estimated_cost_bps = (sign * (avg_fill - reference_price) / reference_price) * 10000 if reference_price > 0 else 0.0

    return {
        "estimated_cost": total_cost,
        "avg_fill_price": avg_fill,
        "pct_walked": levels_used / max(len(book), 1),
        "fill_ratio": filled_qty / order_size,
        "cost_bps": estimated_cost_bps,
    }

Use this function as a pre-trade calculator. Before sending a market order, run it against the current depth snapshot. If the estimated cost exceeds your system's break-even threshold (typically 2–4 bps for medium-frequency trend following), either split the order across time or switch to a passive limit order.
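Splitting the order across time can be sketched as equal-sized child orders, with the slice count chosen so that no child exceeds a fraction of the displayed depth. The participation parameter and both function names are assumptions for illustration; a real scheduler would also re-read the book before each slice.

```python
from typing import List


def slices_needed(total_shares: int, displayed_depth: float, participation: float = 0.5) -> int:
    """Smallest slice count keeping each child at or below participation * depth."""
    cap = max(1, int(displayed_depth * participation))
    return -(-total_shares // cap)  # ceiling division


def slice_order(total_shares: int, n_slices: int) -> List[int]:
    """Split total_shares into n_slices near-equal child orders."""
    if total_shares <= 0 or n_slices <= 0:
        raise ValueError("total_shares and n_slices must be positive")
    base, remainder = divmod(total_shares, n_slices)
    # Spread the remainder over the first few slices so the sizes sum exactly
    return [base + (1 if i < remainder else 0) for i in range(n_slices)]


n = slices_needed(total_shares=20_000, displayed_depth=15_200, participation=0.5)
children = slice_order(20_000, n)
print(n, children)
```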


Data Source Comparison: NBBO via TickDB vs. SIP Feed

Capability | SIP Consolidated Feed | TickDB WebSocket (depth)
Data source | SEC-regulated SIP (Securities Information Processor) | Aggregated from exchange feeds
NBBO accuracy | Latency: ~100–300 µs for SIP; slower for consolidated view | Real-time push via WebSocket; latency varies by symbol
Depth levels | L1 only (best bid / best ask) | L1 for US equities; L1–L10 for HK and crypto
Historical data | SIP historical requires separate subscription | TickDB /kline provides 10+ years of OHLCV for US equities
WebSocket support | Requires exchange direct feeds (e.g., CBOE, Nasdaq) | Native WebSocket with heartbeat, reconnection, rate-limit handling
Coverage | US equities only | 6 asset classes including US equities, HK equities, crypto, forex, commodities, indices
Code complexity | Requires parsing multiple exchange feeds and computing NBBO manually | TickDB handles NBBO aggregation and delivers via single WebSocket stream

For the majority of quant strategies — medium-frequency trend following, event-driven entry, mean-reversion signals — TickDB's consolidated depth channel provides sufficient fidelity without the engineering overhead of managing a SIP integration. If you are running HFT with sub-millisecond requirements, a direct exchange feed is necessary; for everyone else, the WebSocket simplicity and built-in reliability mechanisms outweigh the marginal latency advantage.


Deployment Recommendations by User Segment

Segment | Recommended approach | Configuration
Individual quant | Start with free tier; monitor NVDA.US as a learning case | Use the module above with NVDA.US; set breakout threshold to 1%
Trading team | Deploy on a cloud VM (AWS c5.large minimum); integrate with existing signal framework | Pass signal triggers from your existing system to EntryEvaluator.evaluate()
Institutional | Full depth analysis with CDR pre-trade calculator; consider order-splitting logic | Extend estimate_market_impact to support TWAP/VWAP splitting; connect to your OMS

Closing

The moment a breakout signal fires is the worst time to make a fast decision. Liquidity evaporates precisely when everyone wants it. The NBBO is not just a price — it is a real-time instrument for measuring the cost of your conviction.

Build the baseline before you need it. Monitor the pressure ratio at every tick. When the flags accumulate, wait. A deferred entry that costs 0.5 bps in slippage beats an immediate entry that costs 8 bps — and the missed move will not matter if the signal is still valid 30 seconds later.

The gap between a working backtest and a working strategy is almost always slippage. Close it systematically.


Next Steps

If you want to test this with real NBBO data, sign up at tickdb.ai (free tier available, no credit card required) and generate an API key. Set TICKDB_API_KEY as an environment variable, then run the code above with any US equity symbol.

If you need 10+ years of historical OHLCV data to backtest your trend-following strategy over multiple market cycles, explore the TickDB Professional plan at tickdb.ai or contact enterprise@tickdb.ai.

If you use AI coding assistants, search for the tickdb-market-data SKILL in your AI tool's marketplace to access pre-built TickDB utilities directly in your workflow.


This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. Slippage estimates are based on historical microstructure behavior and may not reflect future conditions.