The Problem Nobody Tells You About
You write a beautiful mean-reversion strategy. It backtests well. You deploy it. And then, on a high-volatility day, your WebSocket feed delivers 50,000 ticks per second and your carefully crafted strategy code falls over like a house of cards.
The strategy isn't slow. Your architecture is.
When market data arrives faster than your strategy can process it, you face two bad choices: drop messages (dangerous) or block the feed (also dangerous). This is the classic producer-consumer problem, and in this article we'll solve it properly using Python's asyncio.Queue and a carefully designed multi-worker architecture.
We'll build a production-grade market data distribution system that:
- Decouples data ingestion from strategy execution
- Applies backpressure so downstream consumers are protected from upstream spikes
- Scales out to multiple workers consuming concurrently
- Absorbs bursts up to the queue's capacity without message loss
Let's begin.
Why Market Data Is Different
Before diving into the solution, we need to understand why market data distribution is uniquely challenging.
In a typical async workload, messages arrive with some statistical regularity. A web server handles requests with a known distribution. A database handles queries with predictable peaks.
Market data is different. Consider what happens at a major earnings release (illustrative figures):
| Phase | Message Rate | Duration | Total Messages |
|---|---|---|---|
| Pre-announcement | ~500/sec | ~5 minutes | ~150,000 |
| Announcement +1s | ~15,000/sec | 1 second | ~15,000 |
| Volatility spike | ~8,000/sec | 30 seconds | ~240,000 |
| Mean reversion | ~2,000/sec | Hours | ~7,200,000 per hour |
That's a 30x spike in message rate lasting seconds, followed by sustained elevated throughput for hours. If your data pipeline can't absorb this burst, you'll either overflow memory (storing too many unprocessed messages) or you'll start dropping data (losing alpha).
The producer-consumer pattern solves this by inserting a buffer queue between the data source (producer) and the strategy logic (consumer). This buffer acts as a shock absorber, smoothing out rate disparities and giving downstream workers time to catch up during bursts.
But a naive queue implementation makes things worse. Unbounded queues grow until they consume all available memory, and when memory runs out, your process dies — losing everything. The solution is bounded queues with backpressure, which we'll implement next.
The Core Architecture: A Three-Layer System
Our system consists of three layers:
```
┌─────────────────────────────────────────────────────────┐
│                    Layer 1: Producer                    │
│            (WebSocket / REST data ingestion)            │
│                            │                            │
│                            ▼                            │
│  ┌───────────────────────────────────────────────────┐  │
│  │          Layer 2: asyncio.Queue (Buffer)          │  │
│  │     Bounded queue with backpressure signaling     │  │
│  └───────────────────────────────────────────────────┘  │
│                            │                            │
│            ┌───────────────┼───────────────┐            │
│            ▼               ▼               ▼            │
│        Worker 1        Worker 2        Worker N         │
│     ┌────────────┐  ┌────────────┐  ┌────────────┐      │
│     │ Strategy A │  │ Strategy B │  │ Strategy N │      │
│     └────────────┘  └────────────┘  └────────────┘      │
│                   Layer 3: Consumers                    │
└─────────────────────────────────────────────────────────┘
```
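The key insight is that the queue is bounded: it has a maximum size, so memory use is capped at roughly that many messages. When the queue approaches capacity, the producer applies backpressure. An asyncio.Queue supports two policies: `await queue.put()` makes the producer wait until a worker frees a slot, while `queue.put_nowait()` fails immediately and lets the producer shed the message. The producer below sheds load, because pausing a WebSocket reader doesn't slow the exchange down; it only moves the backlog into socket buffers. Once workers drain the queue, messages flow in again. The result is a self-regulating system that doesn't exhaust memory.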
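To compare the two ways a producer can react to a full asyncio.Queue (waiting with `put()` or shedding with `put_nowait()`), here is a minimal self-contained sketch. It is not the article's producer: `run_policy` and its parameters are hypothetical, a fast producer pushes integers, and a consumer drains them.

```python
import asyncio


async def run_policy(policy: str, n_messages: int = 100, maxsize: int = 10) -> tuple[int, int]:
    """Push n_messages through a bounded queue; return (delivered, dropped)."""
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=maxsize)
    delivered = 0
    dropped = 0

    async def consumer() -> None:
        nonlocal delivered
        while True:
            await queue.get()
            await asyncio.sleep(0)  # stand-in for strategy work; yields to the loop
            delivered += 1
            queue.task_done()

    consumer_task = asyncio.create_task(consumer())
    for i in range(n_messages):
        if policy == "block":
            await queue.put(i)  # backpressure: waits while the queue is full
        else:
            try:
                queue.put_nowait(i)  # load shedding: never waits
            except asyncio.QueueFull:
                dropped += 1
    await queue.join()  # wait until every enqueued item has been processed
    consumer_task.cancel()
    return delivered, dropped


if __name__ == "__main__":
    print(asyncio.run(run_policy("block")))  # → (100, 0)
    print(asyncio.run(run_policy("drop")))   # → (10, 90)
```

The "drop" run is the extreme case: the producer loop never awaits, so the consumer gets no chance to run until `join()`, and only the first 10 messages fit. In the real producer, every `ws.recv()` await gives the workers a chance to drain the queue.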
Implementation: The Producer
Let's implement this step by step. We'll use TickDB's WebSocket feed as our data source, but the pattern applies to any market data API.
```python
import asyncio
import json
import os
import random
from datetime import datetime
from typing import Any, Optional

import websockets

# Environment variable for API key
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY", "")


class MarketDataProducer:
    """
    Producer that connects to TickDB WebSocket and pushes market data
    into a bounded queue. Applies backpressure by monitoring queue
    fullness and shedding messages when the queue is nearly full.
    """

    def __init__(self, queue, max_queue_size: int = 10000):
        # Any queue exposing qsize() and put_nowait():
        # asyncio.Queue or BoundedMarketDataQueue
        self.queue = queue
        self.max_queue_size = max_queue_size
        self.ws: Optional[Any] = None  # open WebSocket connection, or None
        self.running = False
        self.messages_sent = 0
        self.messages_dropped = 0

    async def connect(self):
        """Establish WebSocket connection with exponential backoff and jitter."""
        uri = f"wss://api.tickdb.ai/ws?api_key={TICKDB_API_KEY}"
        retry_count = 0
        base_delay = 1.0
        while True:  # retry until a connection succeeds
            try:
                self.ws = await websockets.connect(uri)
                print(f"[{datetime.now():%H:%M:%S}] WebSocket connected")
                return
            except Exception as e:
                delay = min(base_delay * (2 ** retry_count), 60)
                jitter = random.uniform(0, delay * 0.1)
                print(f"[{datetime.now():%H:%M:%S}] Connection failed: {e}, "
                      f"retrying in {delay + jitter:.1f}s")
                await asyncio.sleep(delay + jitter)
                retry_count += 1

    async def heartbeat(self):
        """Application-level keepalive ping to prevent WebSocket timeout."""
        while self.running:
            try:
                if self.ws:
                    await self.ws.send(json.dumps({"cmd": "ping"}))
            except websockets.ConnectionClosed:
                pass  # run() reconnects; keep this task alive for the new connection
            await asyncio.sleep(30)

    async def subscribe(self, symbols: list[str], channels: list[str]):
        """Subscribe to market data channels for given symbols."""
        if not self.ws:
            raise RuntimeError("WebSocket not connected")
        subscribe_msg = {
            "method": "subscribe",
            "params": {
                "symbols": symbols,
                "channels": channels,  # e.g., ["trades", "depth"]
            },
        }
        await self.ws.send(json.dumps(subscribe_msg))
        print(f"[{datetime.now():%H:%M:%S}] Subscribed to {symbols} on {channels}")

    async def run(self, symbols: list[str], channels: list[str]):
        """
        Main production loop. Reconnects when the connection drops and
        sheds messages when the queue approaches capacity.
        """
        self.running = True
        await self.connect()
        await self.subscribe(symbols, channels)
        # Start heartbeat task
        heartbeat_task = asyncio.create_task(self.heartbeat())
        try:
            while self.running:
                try:
                    # Receive with a timeout so the loop re-checks self.running
                    message = await asyncio.wait_for(self.ws.recv(), timeout=1.0)
                except asyncio.TimeoutError:
                    continue  # no message within 1 s
                except websockets.ConnectionClosed as e:
                    print(f"[{datetime.now():%H:%M:%S}] Connection lost: {e}, reconnecting")
                    await self.connect()
                    await self.subscribe(symbols, channels)
                    continue

                try:
                    data = json.loads(message)
                except json.JSONDecodeError:
                    continue  # skip malformed frames rather than crash the producer

                # Backpressure check: shed this message if the queue is nearly full
                queue_size = self.queue.qsize()
                queue_full_ratio = queue_size / self.max_queue_size
                if queue_full_ratio >= 0.95:
                    # Critical backpressure: queue at 95% capacity
                    self.messages_dropped += 1
                    if self.messages_dropped % 1000 == 0:
                        print(f"[WARNING] Backpressure active: "
                              f"{queue_size}/{self.max_queue_size} "
                              f"({queue_full_ratio:.1%}), "
                              f"dropped {self.messages_dropped} messages")
                    continue

                # Put message in queue (non-blocking)
                try:
                    self.queue.put_nowait(data)
                    self.messages_sent += 1
                except asyncio.QueueFull:
                    # Defensive: should not happen given the check above
                    self.messages_dropped += 1
        finally:
            self.running = False
            heartbeat_task.cancel()
            if self.ws:
                await self.ws.close()
```
Key Design Decisions in the Producer
Heartbeat task: The websockets library already sends protocol-level ping frames (every 20 seconds by default), but some market data APIs also expect an application-level ping. We send one every 30 seconds from a separate task so it never blocks the main production loop. The {"cmd": "ping"} payload is provider-specific; check your API documentation for the exact format.
Backpressure monitoring: Before every put_nowait, we check queue.qsize(). At 95% capacity, we start dropping messages. This is a deliberate design choice: in high-throughput scenarios, losing some messages during extreme bursts is preferable to exhausting memory.
Drop tracking: We count dropped messages and log a warning every 1,000 drops so operators can assess pipeline health. A rising drop count indicates the system is under sustained load beyond its capacity.
Exponential backoff with jitter: Connection failures use exponential backoff capped at 60 seconds, with random jitter to prevent thundering herd effects.
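The retry schedule in connect() is easy to check in isolation. The sketch below pulls the same formula into a hypothetical `backoff_delay()` helper (the name and the `jitter_fraction`/`rng` parameters are ours, not the article's), with one addition: the exponent is clamped, since `2 ** retry_count` can no longer be converted to a float after roughly 1,000 retries.

```python
import random


def backoff_delay(retry_count: int, base_delay: float = 1.0, cap: float = 60.0,
                  jitter_fraction: float = 0.1, rng=random) -> float:
    """Seconds to wait before retry number `retry_count` (0-based).

    Exponential growth capped at `cap`, plus uniform jitter of up to
    `jitter_fraction` of the delay.
    """
    # Clamp the exponent so very long outages can't overflow float conversion;
    # the cap is reached long before 2 ** 32 anyway
    delay = min(base_delay * (2 ** min(retry_count, 32)), cap)
    return delay + rng.uniform(0, delay * jitter_fraction)


# Without jitter the schedule is 1, 2, 4, 8, 16, 32, 60, 60, ... seconds
print([backoff_delay(i, jitter_fraction=0) for i in range(8)])
# → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Passing a seeded `random.Random` as `rng` makes the jittered schedule reproducible in tests.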
Implementation: The Bounded Queue
The asyncio.Queue is the heart of our system. Its bounded nature is critical for memory management.
```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class QueueStats:
    """Statistics about queue health."""
    current_size: int
    max_size: int
    capacity_ratio: float
    producers_blocked: int  # Times a put timed out or hit QueueFull
    consumers_waiting: int  # Consumers currently waiting in get()


class BoundedMarketDataQueue:
    """
    A wrapper around asyncio.Queue that provides:
    - Bounded capacity to prevent memory exhaustion
    - Statistics reporting for monitoring
    - Backpressure signaling via queue fullness
    """

    def __init__(self, maxsize: int = 10000):
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._producers_blocked = 0
        self._consumers_waiting = 0

    async def put(self, item: Any, timeout: float = 5.0):
        """Put item, waiting up to `timeout` seconds for space (backpressure)."""
        try:
            await asyncio.wait_for(self._queue.put(item), timeout=timeout)
        except asyncio.TimeoutError:
            self._producers_blocked += 1
            raise

    def put_nowait(self, item: Any):
        """Non-blocking put. Raises asyncio.QueueFull if at capacity."""
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull:
            self._producers_blocked += 1
            raise

    async def get(self, timeout: Optional[float] = None) -> Any:
        """Get item; raises asyncio.TimeoutError if `timeout` seconds pass first."""
        self._consumers_waiting += 1
        try:
            if timeout is None:
                return await self._queue.get()
            return await asyncio.wait_for(self._queue.get(), timeout=timeout)
        finally:
            self._consumers_waiting -= 1

    def get_nowait(self) -> Any:
        """Non-blocking get. Raises asyncio.QueueEmpty if empty."""
        return self._queue.get_nowait()

    def task_done(self):
        """Signal that a previously fetched item has been processed."""
        self._queue.task_done()

    async def join(self):
        """Wait until every fetched item has been marked with task_done()."""
        await self._queue.join()

    def qsize(self) -> int:
        """Current number of queued items (used by the producer's backpressure check)."""
        return self._queue.qsize()

    @property
    def stats(self) -> QueueStats:
        """Return current queue statistics."""
        size = self._queue.qsize()
        return QueueStats(
            current_size=size,
            max_size=self._queue.maxsize,
            capacity_ratio=size / self._queue.maxsize,
            producers_blocked=self._producers_blocked,
            consumers_waiting=self._consumers_waiting,
        )

    @property
    def is_empty(self) -> bool:
        return self._queue.empty()

    @property
    def is_full(self) -> bool:
        return self._queue.full()
```
Queue Size Selection
Choosing the right queue size is a tradeoff:
| Queue Size | Memory Cost | Backpressure Risk | Burst Absorption |
|---|---|---|---|
| 1,000 | ~50 MB | High (triggers frequently) | Limited |
| 10,000 | ~500 MB | Moderate | Good |
| 100,000 | ~5 GB | Low (rarely triggers) | Excellent |
For most retail quant systems, 10,000–20,000 messages is a good starting point. For institutional systems with more memory headroom, 50,000–100,000 provides better resilience against sustained bursts.
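The memory figures above assume roughly 50 KB per queued message and scale linearly with message size, which varies a lot between a single trade and a multi-level depth snapshot, so measure your own. A bigger queue also holds older data: every queued message is a tick your strategy sees late. Monitor memory usage and queue depth during peak events and adjust accordingly.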
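One way to turn the earnings-release burst profile from the earlier table into a queue size is to simulate the backlog phase by phase. The sketch below assumes a constant drain rate for the whole worker pool (a number you would measure; the article doesn't give one). The backlog grows by (arrival rate minus drain rate) per second and never drops below zero, and dividing the peak backlog by the drain rate gives the worst-case time a tick waits in the queue.

```python
# (messages/sec, seconds) for each phase of the earnings-release example
EARNINGS_PHASES = [
    (500, 300),      # pre-announcement, ~5 minutes
    (15_000, 1),     # announcement +1s
    (8_000, 30),     # volatility spike
    (2_000, 3_600),  # one hour of elevated flow
]


def peak_backlog(phases: list[tuple[float, float]], drain_rate: float) -> float:
    """Largest queue depth reached if workers drain `drain_rate` messages/sec."""
    backlog = 0.0
    peak = 0.0
    for arrival_rate, seconds in phases:
        # Within a phase the backlog changes linearly (then clamps at zero),
        # so its maximum is always reached at a phase boundary
        backlog = max(0.0, backlog + (arrival_rate - drain_rate) * seconds)
        peak = max(peak, backlog)
    return peak


for drain in (10_000, 5_000):
    peak = peak_backlog(EARNINGS_PHASES, drain)
    print(f"drain {drain}/s: peak backlog {peak:,.0f} msgs, "
          f"worst-case wait {peak / drain:.1f}s")
# → drain 10000/s: peak backlog 5,000 msgs, worst-case wait 0.5s
# → drain 5000/s: peak backlog 100,000 msgs, worst-case wait 20.0s
```

At 10,000 messages/sec the burst fits comfortably in a 10,000-message queue. At 5,000 messages/sec even a 100,000-message queue would hold data about 20 seconds old at the end of the spike, which is a throughput problem that no queue size fixes.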
Implementation: The Consumer Workers
Now we implement the worker pool. Each worker runs as an independent asyncio task, pulling messages from the queue and processing them according to its strategy.
```python
import asyncio
from abc import ABC, abstractmethod
from typing import Callable


class StrategyBase(ABC):
    """
    Abstract base class for any strategy that consumes market data.
    Subclasses implement the process_tick() method.
    """

    @abstractmethod
    async def process_tick(self, data: dict) -> None:
        """Process a single market data tick. Implement in subclass."""
        pass

    async def initialize(self) -> None:
        """Called once at startup. Override for setup logic."""
        pass

    async def shutdown(self) -> None:
        """Called once at shutdown. Override for cleanup logic."""
        pass


class Worker:
    """
    A single consumer worker that processes messages from the queue.
    Implements graceful shutdown and health monitoring.
    """

    def __init__(
        self,
        worker_id: int,
        queue,  # asyncio.Queue or BoundedMarketDataQueue
        strategy: StrategyBase,
        max_consecutive_errors: int = 10,
    ):
        self.worker_id = worker_id
        self.queue = queue
        self.strategy = strategy
        self.max_consecutive_errors = max_consecutive_errors
        self.running = False
        self.ticks_processed = 0
        self.errors = 0
        self.consecutive_errors = 0

    async def run(self):
        """Main worker loop."""
        self.running = True
        await self.strategy.initialize()
        print(f"[Worker-{self.worker_id}] Started, processing from queue")
        try:
            while self.running:
                try:
                    # Get message with timeout so the loop re-checks self.running
                    data = await asyncio.wait_for(self.queue.get(), timeout=5.0)
                except asyncio.TimeoutError:
                    # No messages available; this is normal
                    continue
                try:
                    await self.strategy.process_tick(data)
                    self.ticks_processed += 1
                    self.consecutive_errors = 0
                except Exception as e:
                    self.consecutive_errors += 1
                    self.errors += 1
                    print(f"[Worker-{self.worker_id}] Error #{self.consecutive_errors}: {e}")
                    if self.consecutive_errors >= self.max_consecutive_errors:
                        print(f"[Worker-{self.worker_id}] Too many errors, stopping")
                        break
                    # Brief backoff on error
                    await asyncio.sleep(0.1)
                finally:
                    # Mark the item done even on failure so queue.join() can complete
                    self.queue.task_done()
        finally:
            await self.strategy.shutdown()
            print(f"[Worker-{self.worker_id}] Stopped. "
                  f"Processed {self.ticks_processed} ticks, {self.errors} errors")

    def stop(self):
        """Gracefully stop the worker."""
        self.running = False


class WorkerPool:
    """
    Manages a pool of workers consuming from the same queue.
    Provides aggregate statistics and coordinated lifecycle management.
    """

    def __init__(self, num_workers: int, queue, strategy_factory: Callable[[], StrategyBase]):
        self.num_workers = num_workers
        self.queue = queue
        self.strategy_factory = strategy_factory
        self.workers: list[Worker] = []
        self.tasks: list[asyncio.Task] = []
        self.running = False

    async def start(self):
        """Start all workers."""
        self.running = True
        for i in range(self.num_workers):
            # Each worker gets its own strategy instance
            strategy = self.strategy_factory()
            worker = Worker(worker_id=i, queue=self.queue, strategy=strategy)
            self.workers.append(worker)
            self.tasks.append(asyncio.create_task(worker.run()))
        print(f"[WorkerPool] Started {self.num_workers} workers")

    async def stop(self, timeout: float = 10.0):
        """Gracefully stop all workers with timeout."""
        self.running = False
        # Ask every worker to exit after its current tick
        for worker in self.workers:
            worker.stop()
        # Wait for tasks to complete
        try:
            await asyncio.wait_for(
                asyncio.gather(*self.tasks, return_exceptions=True),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            print("[WorkerPool] Timeout waiting for workers to stop, cancelling")
            for task in self.tasks:
                task.cancel()
            # Let cancelled workers run their cleanup (strategy.shutdown())
            await asyncio.gather(*self.tasks, return_exceptions=True)
        print(f"[WorkerPool] Stopped. Total ticks processed: "
              f"{sum(w.ticks_processed for w in self.workers)}")

    @property
    def stats(self) -> dict:
        """Aggregate statistics across all workers."""
        return {
            "num_workers": self.num_workers,
            "running": self.running,
            "total_ticks_processed": sum(w.ticks_processed for w in self.workers),
            "total_errors": sum(w.errors for w in self.workers),
            "workers": [
                {"id": w.worker_id, "ticks": w.ticks_processed, "errors": w.errors}
                for w in self.workers
            ],
        }
```
Worker Pool Design Patterns
1. Shared Queue, Independent Strategies
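Each worker receives its own strategy instance. This is critical: if all workers shared a single strategy object, you'd have concurrency problems (race conditions on shared state). Each strategy is isolated. Note that a shared queue load-balances rather than broadcasts: each message goes to exactly one worker, so a strategy that keeps per-symbol state sees only part of each symbol's updates.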
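Strategies with per-symbol state, like the pressure monitor later in this article, need every update for a symbol to reach the same worker, which a single shared queue doesn't guarantee. One common approach is to shard by symbol: give each worker its own queue and route each message by a hash of its symbol. The `SymbolRouter` below is a hypothetical sketch, not part of the article's code. It uses `zlib.crc32` rather than `hash()` because Python randomizes string hashes per process, and a stable mapping is easier to reason about across restarts.

```python
import asyncio
import zlib


class SymbolRouter:
    """Route each message to one of N per-worker queues, keyed by symbol."""

    def __init__(self, num_shards: int, maxsize: int = 10_000):
        self.queues: list[asyncio.Queue] = [
            asyncio.Queue(maxsize=maxsize) for _ in range(num_shards)
        ]
        self.dropped = 0

    def shard_for(self, symbol: str) -> int:
        # Stable across processes, unlike the built-in hash() of a str
        return zlib.crc32(symbol.encode()) % len(self.queues)

    def route(self, message: dict) -> bool:
        """Enqueue without blocking; return False (and count a drop) if that shard is full."""
        queue = self.queues[self.shard_for(message.get("symbol") or "")]
        try:
            queue.put_nowait(message)
            return True
        except asyncio.QueueFull:
            self.dropped += 1
            return False
```

Worker i then reads from `router.queues[i]` instead of the shared queue. The trade-off is load balance: one very active symbol can keep its shard busy while other shards sit idle.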
2. Error Isolation
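One worker's failure doesn't affect the others. If Worker 3's strategy raises an exception, Worker 3 logs it, backs off briefly, and moves on to the next tick, while Workers 1, 2, and 4 keep processing. A worker stops only after max_consecutive_errors (default 10) consecutive failures.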
3. Graceful Shutdown
Workers check a running flag on every loop iteration. When stop() is called, each worker finishes the tick it is currently processing (if any) and exits cleanly; an idle worker notices within 5 seconds, the timeout on its queue.get(). Messages still waiting in the queue at that point are not processed.
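A shutdown that processes everything still queued goes in three steps: stop the producer, wait for the queue to drain with `asyncio.Queue.join()`, then stop the workers. Here is a minimal sketch with a plain asyncio.Queue; `worker`, `drain_then_stop`, and `demo` are hypothetical names. In the article's code the equivalent steps would be setting `producer.running = False`, joining the underlying queue, and calling `worker_pool.stop()`.

```python
import asyncio


async def worker(queue: asyncio.Queue, processed: list) -> None:
    """Minimal consumer: process items until cancelled."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.001)  # stand-in for process_tick()
            processed.append(item)
        finally:
            queue.task_done()


async def drain_then_stop(queue: asyncio.Queue, workers: list[asyncio.Task], timeout: float) -> bool:
    """Wait up to `timeout` seconds for the queue to drain, then cancel workers.

    Returns True if every queued item was processed. Call this only after the
    producer has stopped adding items, or join() may never return.
    """
    try:
        await asyncio.wait_for(queue.join(), timeout=timeout)
        drained = True
    except asyncio.TimeoutError:
        drained = False
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return drained


async def demo() -> tuple[bool, int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    processed: list = []
    for i in range(50):  # the producer has already stopped; 50 items remain
        queue.put_nowait(i)
    tasks = [asyncio.create_task(worker(queue, processed)) for _ in range(4)]
    drained = await drain_then_stop(queue, tasks, timeout=5.0)
    return drained, len(processed)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # → (True, 50)
```

The timeout matters: if workers are stuck or the producer is still running, you want shutdown to proceed anyway and report that the drain was incomplete.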
4. Health Monitoring
Periodically log worker and queue statistics; this monitor does it every 30 seconds:
```python
# Inside the main orchestrator
import asyncio
from datetime import datetime


async def monitor_loop(pool: WorkerPool, queue: BoundedMarketDataQueue):
    """Print pool and queue health every 30 seconds."""
    while True:
        await asyncio.sleep(30)
        stats = pool.stats
        queue_stats = queue.stats
        print(f"""
[Health Check @ {datetime.now():%H:%M:%S}]
Workers: {stats['num_workers']} active, {stats['total_ticks_processed']} ticks processed, {stats['total_errors']} errors
Queue: {queue_stats.current_size}/{queue_stats.max_size} ({queue_stats.capacity_ratio:.1%}), {queue_stats.producers_blocked} backpressure events
""")
```
A Concrete Strategy: Order Book Pressure Monitor
Let's implement a real strategy that consumes market data from the queue. This strategy monitors the depth channel and tracks buy/sell pressure.
```python
from collections import deque
from dataclasses import dataclass
from datetime import datetime

# StrategyBase is defined in the consumer-worker code above


@dataclass
class PressureReading:
    """Snapshot of buy/sell pressure."""
    timestamp: datetime
    bid_total: float       # Total bid size across top N levels
    ask_total: float       # Total ask size across top N levels
    pressure_ratio: float  # bid_total / ask_total
    spread_bps: float      # Bid-ask spread in basis points


class OrderBookPressureStrategy(StrategyBase):
    """
    Monitors order book depth and tracks buy/sell pressure ratio.
    Emits alerts when pressure ratio crosses thresholds.
    """

    def __init__(
        self,
        symbols: list[str],
        lookback_window: int = 20,
        alert_threshold_high: float = 2.5,  # Strong buy pressure
        alert_threshold_low: float = 0.4,   # Strong sell pressure
    ):
        self.symbols = symbols
        self.lookback_window = lookback_window
        self.alert_threshold_high = alert_threshold_high
        self.alert_threshold_low = alert_threshold_low
        self._readings: dict[str, deque] = {s: deque(maxlen=lookback_window) for s in symbols}
        self._last_alert: dict[str, datetime] = {s: datetime.min for s in symbols}

    async def initialize(self):
        print(f"[{self.__class__.__name__}] Initialized for {self.symbols}")

    async def process_tick(self, data: dict) -> None:
        """Process depth update and compute pressure ratio."""
        if data.get("channel") != "depth":
            return
        symbol = data.get("symbol")
        if symbol not in self.symbols:
            return
        bids = data.get("bids", [])
        asks = data.get("asks", [])
        if not bids or not asks:
            return
        # Compute pressure metrics over the top 5 levels
        bid_total = sum(float(b.get("size", 0)) for b in bids[:5])
        ask_total = sum(float(a.get("size", 0)) for a in asks[:5])
        if ask_total <= 0:
            # Ratio undefined; reporting 0 would look like extreme sell pressure
            return
        pressure_ratio = bid_total / ask_total
        best_bid = float(bids[0].get("price", 0))
        best_ask = float(asks[0].get("price", 0))
        spread_bps = ((best_ask - best_bid) / best_bid * 10000) if best_bid > 0 else 0
        reading = PressureReading(
            timestamp=datetime.now(),
            bid_total=bid_total,
            ask_total=ask_total,
            pressure_ratio=pressure_ratio,
            spread_bps=spread_bps,
        )
        self._readings[symbol].append(reading)
        # Check for alerts
        await self._check_alerts(symbol, reading)

    async def _check_alerts(self, symbol: str, reading: PressureReading):
        """Check if pressure ratio crosses alert threshold."""
        alert_cooldown = 60  # Minimum seconds between alerts per symbol
        is_high_pressure = reading.pressure_ratio >= self.alert_threshold_high
        is_low_pressure = reading.pressure_ratio <= self.alert_threshold_low
        if is_high_pressure or is_low_pressure:
            last_alert = self._last_alert[symbol]
            if (datetime.now() - last_alert).total_seconds() > alert_cooldown:
                direction = "BUY" if is_high_pressure else "SELL"
                print(f"""
[ALERT] {direction} PRESSURE: {symbol}
Pressure ratio: {reading.pressure_ratio:.2f}
Spread: {reading.spread_bps:.1f} bps
Time: {reading.timestamp:%H:%M:%S}
""")
                self._last_alert[symbol] = datetime.now()
```
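This strategy demonstrates the pattern: it receives raw data from the queue, processes it, and emits actionable signals. The queue decoupling means the strategy's processing latency doesn't slow down data ingestion, with one caveat: the producer and all workers share a single event-loop thread, so process_tick() must return or await promptly. A handler that does heavy synchronous computation blocks the producer's recv() loop as well.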
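The pressure arithmetic is easier to test outside the strategy class. Below is a hypothetical pure function, `pressure_metrics()`, that repeats the calculation in process_tick() (top-5 size totals, bid/ask ratio, spread in basis points) on lists of `{"price", "size"}` dicts, the depth format the strategy assumes, followed by a worked example.

```python
from typing import Optional


def pressure_metrics(bids: list[dict], asks: list[dict],
                     levels: int = 5) -> Optional[tuple[float, float]]:
    """Return (pressure_ratio, spread_bps) for one depth update, or None if undefined."""
    if not bids or not asks:
        return None
    bid_total = sum(float(b.get("size", 0)) for b in bids[:levels])
    ask_total = sum(float(a.get("size", 0)) for a in asks[:levels])
    if ask_total <= 0:
        return None  # ratio undefined: treat as "no signal", not as sell pressure
    best_bid = float(bids[0].get("price", 0))
    best_ask = float(asks[0].get("price", 0))
    spread_bps = (best_ask - best_bid) / best_bid * 10_000 if best_bid > 0 else 0.0
    return bid_total / ask_total, spread_bps


# Worked example: 300 shares bid vs 100 offered, quotes 100.00 / 100.05
bids = [{"price": "100.00", "size": "100"}, {"price": "99.99", "size": "200"}]
asks = [{"price": "100.05", "size": "50"}, {"price": "100.06", "size": "50"}]
ratio, spread = pressure_metrics(bids, asks)
print(f"ratio={ratio:.2f}, spread={spread:.1f} bps")  # → ratio=3.00, spread=5.0 bps
```

A ratio of 3.0 is above the default alert_threshold_high of 2.5, so this update would trigger a BUY PRESSURE alert, subject to the 60-second cooldown.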
Orchestrating the System
Now let's wire everything together in the main orchestrator:
```python
import asyncio
from typing import Callable


async def run_market_data_pipeline(
    symbols: list[str],
    strategy_factory: Callable[[], StrategyBase],
    num_workers: int = 4,
    queue_size: int = 10000,
):
    """
    Main entry point. Orchestrates producer, queue, and worker pool.
    """
    # Create bounded queue
    queue = BoundedMarketDataQueue(maxsize=queue_size)
    # Create producer
    producer = MarketDataProducer(queue=queue, max_queue_size=queue_size)
    # Create worker pool
    worker_pool = WorkerPool(
        num_workers=num_workers,
        queue=queue,
        strategy_factory=strategy_factory,
    )
    # Start monitoring task
    monitor_task = asyncio.create_task(monitor_loop(worker_pool, queue))
    # Start all components
    await worker_pool.start()
    # Run producer (blocks until the producer stops or this task is cancelled)
    try:
        await producer.run(symbols=symbols, channels=["depth"])
    except asyncio.CancelledError:
        # On Python 3.11+, Ctrl+C under asyncio.run() cancels this task
        # rather than raising KeyboardInterrupt inside it
        print("\n[Main] Shutdown requested")
        raise
    finally:
        # Graceful shutdown
        await worker_pool.stop(timeout=15.0)
        monitor_task.cancel()
        print("[Main] Pipeline stopped")


# Example usage
if __name__ == "__main__":
    def create_pressure_strategy() -> StrategyBase:
        return OrderBookPressureStrategy(
            symbols=["NVDA.US", "TSLA.US", "AMD.US"],
            lookback_window=20,
            alert_threshold_high=2.5,
        )

    asyncio.run(run_market_data_pipeline(
        symbols=["NVDA.US", "TSLA.US", "AMD.US"],
        strategy_factory=create_pressure_strategy,
        num_workers=4,
        queue_size=10000,
    ))
```
Scaling Considerations
Vertical Scaling: Queue Size and Worker Count
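How many workers help depends on where each tick spends its time. If your workers are I/O-bound (each tick awaits a database or network call), more workers increase throughput, because their waits overlap on the event loop. If they are CPU-bound (complex calculations per tick), more asyncio workers won't help: they all run on one thread, so only one computes at a time. Keep a small pool and move the heavy computation into a process pool.
```python
import os

# I/O-bound strategy (database/network calls dominate): more workers let the
# waits overlap; illustrative starting point, tune to what your backends handle
NUM_WORKERS = 16

# CPU-bound strategy: extra asyncio workers add no compute, so keep a few
# and give the heavy math one process per core
NUM_WORKERS = 4
NUM_PROCESSES = os.cpu_count() or 1
```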
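Offloading can be done with `loop.run_in_executor()` and a `concurrent.futures.ProcessPoolExecutor`, which keeps the event loop (and with it the producer's `recv()` loop) responsive while heavy math runs in other processes. In this sketch, `heavy_signal()` is a hypothetical stand-in for your calculation; it must be a top-level function so the process pool can pickle it, and the `__main__` guard is required on platforms that start worker processes with spawn.

```python
import asyncio
import math
from concurrent.futures import Executor, ProcessPoolExecutor


def heavy_signal(prices: list[float]) -> float:
    """Hypothetical CPU-bound calculation: population std dev of log returns."""
    returns = [math.log(b / a) for a, b in zip(prices, prices[1:])]
    mean = sum(returns) / len(returns)
    return math.sqrt(sum((r - mean) ** 2 for r in returns) / len(returns))


async def process_tick_offloaded(prices: list[float], executor: Executor) -> float:
    """Run heavy_signal() in the executor; the event loop keeps serving other tasks."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, heavy_signal, prices)


async def main() -> None:
    # One process per core by default; reuse the pool for the pipeline's lifetime
    with ProcessPoolExecutor() as executor:
        windows = [[100.0, 101.0, 100.5, 102.0], [50.0, 50.0, 50.0, 50.0]]
        results = await asyncio.gather(
            *(process_tick_offloaded(w, executor) for w in windows)
        )
        print(results)


if __name__ == "__main__":
    asyncio.run(main())
```

Arguments and results are pickled between processes, so pass compact inputs (a price window, not the whole order book) to keep the overhead below the cost of the computation itself.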
Horizontal Scaling: Multiple Consumer Groups
For more complex architectures, consider multiple consumer groups with separate queues:
```
               TickDB WebSocket
                       │
           ┌───────────┴───────────┐
           ▼                       ▼
     Queue Group A           Queue Group B
(Strategy A instances)  (Strategy B instances)
```
Each group has its own producer and queue fed from the same WebSocket feed, so every group receives every message. This provides fault isolation: a crash or slowdown in Strategy B can't fill Strategy A's queue or stop its workers.
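When a single WebSocket reader feeds several groups, every message has to reach every group rather than one worker. A minimal way to do that is to copy each message into each group's queue with `put_nowait()`, so a full queue in one group drops only that group's copy and never blocks the others. The `FanOut` class and the group names below are hypothetical.

```python
import asyncio
from collections import Counter


class FanOut:
    """Broadcast each message to every consumer group's bounded queue."""

    def __init__(self, group_names: list[str], maxsize: int = 10_000):
        self.queues = {name: asyncio.Queue(maxsize=maxsize) for name in group_names}
        self.dropped: Counter = Counter()  # per-group drop counts

    def publish(self, message: dict) -> None:
        for name, queue in self.queues.items():
            try:
                queue.put_nowait(message)  # groups share the dict; treat it as read-only
            except asyncio.QueueFull:
                self.dropped[name] += 1  # only the slow group loses this message
```

Each group then runs its own WorkerPool on `fan.queues[name]`. Because the same dict object goes to every group, strategies should not mutate incoming messages; copy them first if they need to.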
Memory Management
Monitor these metrics in production:
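```python
import psutil  # third-party: pip install psutil


def log_memory_stats():
    """Print this process's resident (RSS) and virtual (VMS) memory in MB."""
    process = psutil.Process()
    mem_info = process.memory_info()
    print(f"[Memory] RSS: {mem_info.rss / 1024 / 1024:.1f} MB, "
          f"VMS: {mem_info.vms / 1024 / 1024:.1f} MB")
```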
Set up alerts if memory exceeds 80% of available RAM.
Comparison with Alternative Approaches
| Approach | Pros | Cons | Best for |
|---|---|---|---|
| asyncio.Queue (this article) | Native Python, simple, built-in backpressure | Single-process only | Retail systems, single-server deployments |
| Celery / RQ | Distributed, persistent queues | Overhead for simple cases | Multi-server deployments, task queues |
| Kafka / RabbitMQ | Persistent, replayable, distributed | Infrastructure complexity | Institutional systems, audit requirements |
| Threads + queue.Queue | Works with blocking (non-async) libraries | GIL limits CPU parallelism for pure-Python code; harder to debug | Strategies built on blocking client libraries |
For most quant developers working with TickDB's real-time WebSocket feed, the asyncio.Queue approach provides the right balance of simplicity and capability.
Production Deployment Checklist
Before deploying to production:
- Queue size calibrated: monitor peak queue usage, target <70% during normal bursts
- Backpressure monitoring: alert when the producer's messages_dropped or the queue's producers_blocked counter increases
- Worker error isolation: verify one crashed worker doesn't stop others
- Graceful shutdown tested: send SIGTERM and verify workers drain the queue (asyncio.run() does not handle SIGTERM for you; register a handler with loop.add_signal_handler on Unix)
- Memory limits set: configure container/VM memory limits with headroom
- Health check endpoint: expose /health that returns queue size and worker status
- Reconnection tested: kill the WebSocket connection and verify automatic reconnect
Closing
The producer-consumer pattern isn't just a theoretical solution — it's the architecture that separates production-grade quant systems from fragile prototypes. By using a bounded asyncio.Queue with backpressure control, you build a system that:
- Survives burst load without memory exhaustion
- Protects downstream workers from upstream spikes
- Scales to multiple independent workers
- Handles failures gracefully with isolated error domains
The code in this article is a production-oriented foundation rather than a drop-in system. It includes heartbeat, reconnection with backoff, backpressure monitoring, graceful shutdown, and console logging. Start with this foundation, adapt it to your strategy requirements and your provider's actual message formats, and monitor your queue statistics in production.
Next Steps
- If you're building event-driven strategies: Sign up for a TickDB API key and start integrating the WebSocket feed into your system architecture
- If you need distributed processing across multiple servers: Explore Celery or Kafka-based architectures
- If you're debugging a slow strategy: Add queue size monitoring — you'll often find the bottleneck is in your strategy code, not the data pipeline
Risk disclaimer: This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.