In a production trading system, the mismatch between data arrival speed and processing capacity is not an edge case; it is the default operating condition. A 10Gbps market feed can generate thousands of order book updates per second per symbol. A mean-reversion strategy running complex statistical calculations might consume 50ms per tick, which caps a single consumer at about 20 ticks per second. Without an intervention layer, the strategy falls behind in a matter of seconds, and the gap widens until the queue becomes a memory leak.
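A back-of-the-envelope model makes the imbalance concrete. The 50ms service time comes from the paragraph above. The 1,000 ticks/second arrival rate is an illustrative assumption, and the helper names are hypothetical.

```python
def backlog_after(seconds: float, arrival_rate_hz: float, service_time_s: float) -> float:
    """Ticks waiting after `seconds` when a single consumer is slower than the feed."""
    service_rate_hz = 1.0 / service_time_s  # 50ms per tick -> 20 ticks/s
    growth_per_s = max(arrival_rate_hz - service_rate_hz, 0.0)  # net backlog growth
    return growth_per_s * seconds


def wait_behind_backlog(backlog: float, service_time_s: float) -> float:
    """Seconds a newly arrived tick waits before a single consumer reaches it."""
    return backlog * service_time_s


backlog = backlog_after(10, arrival_rate_hz=1_000, service_time_s=0.05)
print(f"backlog after 10s: {backlog:.0f} ticks")
print(f"newest tick waits: {wait_behind_backlog(backlog, 0.05):.0f}s")
```

Under these assumptions, ten seconds of traffic leaves the consumer roughly 9,800 ticks (about eight minutes of work) behind, and the backlog keeps growing at 980 ticks per second.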
The producer-consumer pattern resolves this structural imbalance by introducing a bounded buffer between the data source and the processing logic. In Python, asyncio.Queue provides the canonical implementation: it decouples the ingestion task from the processing tasks, applies backpressure when the system approaches its limits, and lets several consumer workers process ticks concurrently. This article walks through the complete implementation, from the foundational queue setup to backpressure thresholds, multi-worker distribution, and the monitoring hooks that keep the system alive under stress.
The Problem: Speed Differential Destroys Predictability
Before designing the solution, it is worth formalizing the problem precisely. A market data pipeline has three moving parts:
- Producer: The WebSocket connection (or REST polling loop) that receives raw ticks and pushes them into the queue.
- Queue: A bounded buffer that holds items temporarily when production outpaces consumption.
- Consumer: The strategy or processing logic that retrieves items from the queue and executes the business logic.
The fundamental failure mode is queue overflow. If the queue has no size limit and the producer writes faster than the consumer drains, the queue grows without bound. In a memory-constrained environment, which every production server is, this ends in an OOM crash or the OS terminating the process. Even before memory runs out, the queue holds stale data: by the time the consumer processes a tick from 30 seconds ago, the order book has completely changed, and the signal is garbage.
The producer-consumer pattern addresses this through three mechanisms:
- Bounded buffering: The queue has a maximum size. When the queue is full, the producer blocks or drops data — a deliberate choice with measurable consequences.
- Backpressure propagation: The consumer can signal to the producer that it is overwhelmed, causing the producer to slow its ingestion rate.
- Concurrent consumption: Multiple workers pull from the same queue, raising effective processing throughput when per-tick work spends its time waiting on I/O.
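Bounded buffering is a one-argument change in asyncio. The sketch below, with hypothetical function names and tiny sizes chosen for illustration, shows both full-queue policies side by side: `put_nowait()` raising `asyncio.QueueFull` (drop) and `await put()` suspending until a consumer frees a slot (block).

```python
import asyncio


async def drop_when_full(n_items: int, maxsize: int) -> tuple[int, int]:
    """Drop policy: put_nowait() never waits and raises QueueFull instead."""
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=maxsize)
    accepted = dropped = 0
    for item in range(n_items):  # no consumer is running, so the buffer fills up
        try:
            queue.put_nowait(item)
            accepted += 1
        except asyncio.QueueFull:
            dropped += 1
    return accepted, dropped


async def block_when_full(consumer_delay_s: float) -> float:
    """Block policy: await put() suspends until a consumer frees a slot."""
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=1)
    await queue.put("first")  # the buffer is now full

    async def slow_consumer() -> None:
        await asyncio.sleep(consumer_delay_s)
        await queue.get()  # frees the only slot

    consumer = asyncio.create_task(slow_consumer())
    loop = asyncio.get_running_loop()
    started = loop.time()
    await queue.put("second")  # suspended here until slow_consumer() calls get()
    waited = loop.time() - started
    await consumer
    return waited


print(asyncio.run(drop_when_full(n_items=5, maxsize=3)))  # (3, 2)
print(f"blocked for {asyncio.run(block_when_full(0.1)):.2f}s")
```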
asyncio.Queue: The Right Tool for the Right Reason
Python's asyncio.Queue is the natural choice for an async market data pipeline for several reasons:
- Non-blocking semantics: await queue.put() and await queue.get() yield control to the event loop, allowing other tasks to run while a task waits.
- Bounded capacity: The maxsize parameter enforces the buffer limit directly.
- Task-safe: Multiple coroutines on the same event loop can put and get without external synchronization. asyncio.Queue is not thread-safe, so it should not be shared across OS threads.
- Integrated with the event loop: No threading complexity; the entire pipeline runs in a single-threaded async context.
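Task safety here means coroutines on one event loop can share a queue without a lock, because control only switches at await points. A minimal sketch with hypothetical names: three producers and two consumers share one bounded queue, and each consumer stops when it receives a None sentinel (one common shutdown convention, assumed here rather than taken from the article's code).

```python
import asyncio
from typing import Optional


async def run_shared_queue(
    n_producers: int = 3, n_consumers: int = 2, per_producer: int = 50
) -> list[int]:
    queue: asyncio.Queue[Optional[int]] = asyncio.Queue(maxsize=10)
    consumed: list[int] = []

    async def producer(pid: int) -> None:
        for i in range(per_producer):
            await queue.put(pid * 1_000 + i)  # yields to the loop whenever the buffer is full

    async def consumer() -> None:
        while True:
            item = await queue.get()  # yields to the loop whenever the buffer is empty
            if item is None:  # sentinel: no more data for this consumer
                return
            consumed.append(item)  # no lock needed: no await between get() and append()

    consumers = [asyncio.create_task(consumer()) for _ in range(n_consumers)]
    await asyncio.gather(*(producer(p) for p in range(n_producers)))
    for _ in range(n_consumers):
        await queue.put(None)  # one sentinel per consumer, queued after all data
    await asyncio.gather(*consumers)
    return consumed


items = asyncio.run(run_shared_queue())
print(len(items), len(set(items)))  # 150 150
```

Every item is consumed exactly once even though five coroutines touch the queue, which is the property the Task-safe bullet describes.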
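The tradeoff is that asyncio.Queue is not suited for CPU-bound consumers. A coroutine that performs heavy numerical computation never reaches an await while it computes, so it blocks the event loop and every other task with it. For CPU-bound workloads, route the computation to a ProcessPoolExecutor (via loop.run_in_executor), or to threads when the heavy lifting happens in libraries that release the GIL, such as NumPy. For strategies dominated by I/O or light per-tick work (incremental indicator updates, simple order book signals, writes to a database or signal engine), asyncio.Queue is the correct tool.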
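A minimal sketch of the ProcessPoolExecutor route, assuming the CPU-heavy part can be expressed as a plain, picklable module-level function; `rolling_variance` and the other names are hypothetical. The consumer still reads from an asyncio.Queue but hands each computation to `loop.run_in_executor`, so the event loop stays free while a worker process does the math.

```python
import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor


def rolling_variance(prices: list[float]) -> float:
    """CPU-bound stand-in for strategy math; must be module-level so it pickles."""
    mean = sum(prices) / len(prices)
    return sum((p - mean) ** 2 for p in prices) / len(prices)


async def offloading_consumer(
    queue: asyncio.Queue, executor: Executor, results: list[float]
) -> None:
    loop = asyncio.get_running_loop()
    while True:
        window = await queue.get()
        try:
            # Runs in the executor; the event loop keeps serving other tasks meanwhile
            results.append(await loop.run_in_executor(executor, rolling_variance, window))
        finally:
            queue.task_done()


async def run_with(executor: Executor, windows: list[list[float]]) -> list[float]:
    queue: asyncio.Queue[list[float]] = asyncio.Queue(maxsize=100)
    results: list[float] = []
    consumer = asyncio.create_task(offloading_consumer(queue, executor, results))
    for window in windows:
        await queue.put(window)
    await queue.join()  # every window has been computed
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return results


if __name__ == "__main__":  # the guard is required for process pools on spawn-based platforms
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(asyncio.run(run_with(pool, [[1.0, 2.0, 3.0], [10.0, 10.0, 10.0]])))
```

Because `run_with` accepts any `concurrent.futures.Executor`, the same consumer can be pointed at a thread pool when the heavy work lives in a GIL-releasing library.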
Architecture Overview
The architecture consists of four layers:
```text
WebSocket Feed (Producer)
        │
        ▼
asyncio.Queue (Buffer, maxsize configurable)
        │
        ├──▶ Worker 1 ──▶ Strategy Engine 1
        ├──▶ Worker 2 ──▶ Strategy Engine 2
        ├──▶ Worker 3 ──▶ Strategy Engine 3
        └──▶ Worker N ──▶ Strategy Engine N
```
Each worker pulls from the shared queue independently. The number of workers is tunable, but because all workers share one event loop, adding workers helps only to the extent that the strategy logic spends its time awaiting I/O. The producer does not know or care how many consumers exist: it only writes to the queue and handles backpressure when the queue is full.
Production-Grade Implementation
1. The Market Data Tick Model
First, define a clean data structure for incoming ticks. Avoid sending raw dictionaries through the queue — use a named tuple or a dataclass with type annotations. This makes the pipeline self-documenting and provides IDE autocompletion.
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import asyncio


@dataclass(frozen=True, slots=True)  # slots=True requires Python 3.10+
class TickData:
    """Immutable market data tick structure."""
    symbol: str
    timestamp: datetime
    bid: float
    ask: float
    bid_size: int
    ask_size: int
    volume: int

    @property
    def spread(self) -> float:
        return self.ask - self.bid

    @property
    def mid_price(self) -> float:
        return (self.ask + self.bid) / 2.0

    @property
    def pressure_ratio(self) -> float:
        """Buy/sell pressure ratio at L1."""
        if self.ask_size == 0:
            return float('inf')
        return self.bid_size / self.ask_size
```
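The slots=True option (Python 3.10+) removes the per-instance __dict__, which reduces memory overhead per instance; this matters when the queue holds thousands of pending items. frozen=True makes the "immutable" in the docstring true, since any attempt to reassign a field raises an error. The derived properties (spread, mid_price, pressure_ratio) are computed on access rather than at ingestion time, so the producer does no extra work; the tradeoff is that each access recomputes the value.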
2. The Producer: WebSocket Connection with Backpressure-Aware Write
The producer connects to a market data WebSocket feed, receives ticks, and pushes them to the queue. The critical design point is handling a full queue: the producer waits for a bounded time and then drops the tick with a logged warning, rather than blocking forever, dropping data silently, or crashing.
```python
import asyncio
import json
import logging
from datetime import datetime
from typing import Optional

import websockets

logger = logging.getLogger(__name__)


class MarketDataProducer:
    """
    WebSocket producer that pushes parsed ticks into an asyncio.Queue.

    Implements backpressure-aware writing: when the queue is full,
    the producer waits up to `write_timeout` seconds before dropping
    the tick (with a logged warning) to prevent indefinite blocking.
    """

    def __init__(
        self,
        queue: asyncio.Queue[TickData],
        websocket_url: str,
        symbols: list[str],
        max_queue_size: int = 1000,
        write_timeout: float = 1.0,
        api_key: Optional[str] = None,
    ):
        self._queue = queue
        self._url = websocket_url
        self._symbols = symbols
        self._max_queue_size = max_queue_size  # for log messages; queue.maxsize is authoritative
        self._write_timeout = write_timeout
        self._api_key = api_key
        self._running = False
        self._ws = None  # active connection, kept so stop() can close it

    async def start(self) -> None:
        """
        Establish WebSocket connection and begin streaming ticks.
        Implements exponential backoff on reconnection.
        """
        self._running = True
        retry_delay = 1.0
        max_delay = 60.0

        while self._running:
            try:
                headers = {}
                if self._api_key:
                    headers["X-API-Key"] = self._api_key

                # websockets >= 14 names this `additional_headers` (older releases: `extra_headers`)
                async with websockets.connect(
                    self._url,
                    additional_headers=headers,
                    ping_interval=20,
                    ping_timeout=10,
                ) as ws:
                    self._ws = ws
                    logger.info("WebSocket connected: %s", self._url)
                    retry_delay = 1.0  # reset on successful connection

                    # Subscribe to symbols (adapt to the feed's actual protocol)
                    subscribe_msg = {
                        "method": "subscribe",
                        "params": {"symbols": self._symbols},
                        "id": 1,
                    }
                    await ws.send(json.dumps(subscribe_msg))

                    # Receive loop; ends when the connection closes
                    async for raw_message in ws:
                        tick = self._parse_tick(raw_message)
                        if tick is None:
                            continue

                        # Backpressure-aware put with timeout
                        try:
                            await asyncio.wait_for(
                                self._queue.put(tick),
                                timeout=self._write_timeout,
                            )
                        except asyncio.TimeoutError:
                            # Queue still full after write_timeout: log and drop to prevent a stall
                            logger.warning(
                                "Queue overflow: dropped tick for %s "
                                "(queue size: %d, max: %d)",
                                tick.symbol,
                                self._queue.qsize(),
                                self._max_queue_size,
                            )
                            # Optional: emit a metric here for alerting

            except websockets.exceptions.ConnectionClosed as e:
                if not self._running:
                    break
                logger.warning("WebSocket disconnected: %s. Reconnecting in %.1fs.", e, retry_delay)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, max_delay)  # exponential backoff
            except Exception as e:
                if not self._running:
                    break
                logger.error("Unexpected producer error: %s", e, exc_info=True)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, max_delay)
            finally:
                self._ws = None

    async def stop(self) -> None:
        """Gracefully stop the producer and close the active connection, if any."""
        self._running = False
        if self._ws is not None:
            await self._ws.close()  # ends the `async for` loop in start()

    def _parse_tick(self, raw_message: str | bytes) -> Optional[TickData]:
        """Parse incoming WebSocket message into TickData. Override for custom formats."""
        try:
            msg = json.loads(raw_message)
            # Adapt to actual API payload structure
            return TickData(
                symbol=msg["symbol"],
                timestamp=datetime.fromisoformat(msg["timestamp"]),
                bid=float(msg["bid"]),
                ask=float(msg["ask"]),
                bid_size=int(msg["bid_size"]),
                ask_size=int(msg["ask_size"]),
                volume=int(msg["volume"]),
            )
        except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
            logger.debug("Failed to parse message: %s", e)
            return None
```
The write_timeout parameter is the backpressure control knob. When the queue is full, the producer waits up to write_timeout seconds for a slot to free up. If none does, the tick is dropped with a logged warning. This is preferable to blocking indefinitely: it preserves the liveness of the producer at the cost of occasional data loss. For tick data arriving at 100Hz, dropping one tick per minute (about one in 6,000) is usually an acceptable tradeoff for system stability.
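The drop-after-timeout policy can be exercised in isolation. In this sketch (hypothetical helper names, no consumer running, a 50ms timeout so it finishes quickly) the first three puts fill the buffer, and each of the remaining two waits out the timeout and is counted as a drop, mirroring the `except asyncio.TimeoutError` branch in `MarketDataProducer.start()`.

```python
import asyncio


async def put_with_timeout(queue: asyncio.Queue, item: object, timeout: float) -> bool:
    """Try to enqueue for up to `timeout` seconds; return False if the item was dropped."""
    try:
        await asyncio.wait_for(queue.put(item), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False  # the cancelled put() never inserted the item


async def simulate(n_items: int, maxsize: int, timeout: float) -> tuple[int, int]:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=maxsize)
    delivered = dropped = 0
    for i in range(n_items):  # no consumer: the queue fills, then every put times out
        if await put_with_timeout(queue, i, timeout):
            delivered += 1
        else:
            dropped += 1
    return delivered, dropped


print(asyncio.run(simulate(n_items=5, maxsize=3, timeout=0.05)))  # (3, 2)
```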
3. The Worker: Consumer with Graceful Shutdown
Each worker is an async task that continuously pulls from the queue and processes ticks. The worker must handle an empty queue (no data available), a growing backlog (consumers falling behind the producer), and graceful shutdown signals.
```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


class MarketDataWorker:
    """
    Async worker that consumes ticks from a queue and executes strategy logic.

    Tracks processing counts and logs a backlog warning when the queue
    grows beyond a threshold.
    """

    def __init__(
        self,
        worker_id: int,
        queue: asyncio.Queue[TickData],
        processor: Callable[[TickData], Awaitable[None]],
        lag_threshold: int = 500,
    ):
        self._worker_id = worker_id
        self._queue = queue
        self._processor = processor  # async strategy callback, e.g. strategy_processor
        self._lag_threshold = lag_threshold
        self._running = False
        self._processed_count = 0
        self._dropped_count = 0  # ticks whose processing raised an exception

    @property
    def metrics(self) -> dict:
        return {
            "worker_id": self._worker_id,
            "processed": self._processed_count,
            "dropped": self._dropped_count,
            "queue_size": self._queue.qsize(),
        }

    async def run(self) -> None:
        """Main consumer loop with graceful shutdown support."""
        self._running = True
        logger.info("Worker %d started", self._worker_id)

        try:
            while self._running:
                try:
                    # Get next tick with a timeout so the loop can re-check self._running
                    tick = await asyncio.wait_for(self._queue.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    continue  # no data available; loop back to the shutdown check

                try:
                    # Backlog signal: queue is dangerously large
                    if self._queue.qsize() > self._lag_threshold:
                        logger.warning(
                            "Worker %d: queue backlog detected "
                            "(size: %d, threshold: %d)",
                            self._worker_id,
                            self._queue.qsize(),
                            self._lag_threshold,
                        )

                    # Execute strategy logic
                    await self._process_tick(tick)
                    self._processed_count += 1
                except Exception as e:
                    logger.error(
                        "Worker %d: processing error for %s: %s",
                        self._worker_id,
                        tick.symbol,
                        e,
                        exc_info=True,
                    )
                    self._dropped_count += 1
                finally:
                    # Exactly one task_done() per successful get(), so queue.join() stays accurate
                    self._queue.task_done()
        finally:
            logger.info("Worker %d stopped (processed: %d, dropped: %d)",
                        self._worker_id, self._processed_count, self._dropped_count)

    async def _process_tick(self, tick: TickData) -> None:
        """Run the injected strategy coroutine on a single tick."""
        # Example diagnostic: flag strong L1 buy pressure before handing off
        pressure = tick.pressure_ratio
        if pressure > 2.5:
            logger.debug(
                "Strong buy pressure detected: %s @ %.4f (ratio: %.2f)",
                tick.symbol, tick.mid_price, pressure,
            )

        # Delegate to the strategy (e.g. persist to a database, send to a signal engine)
        await self._processor(tick)

    async def stop(self) -> None:
        """Initiate graceful shutdown; run() exits within about one get() timeout."""
        self._running = False
```
The lag_threshold parameter defines the queue size at which the worker emits a warning. This is not a hard limit — the queue continues accepting data — but it serves as an early warning system. In a monitored production environment, this warning would trigger an alert and a metric emission for dashboards.
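The flag-plus-timeout loop above lets a worker exit while ticks are still queued. When every queued tick must be processed before exit, the drain-then-cancel pattern used in the asyncio.Queue documentation is an alternative (a swapped-in technique, not the article's implementation): once the producer has stopped, wait on `queue.join()`, which returns when every `get()` has a matching `task_done()`, then cancel the now-idle workers. Names and timings below are illustrative.

```python
import asyncio


async def drain_worker(queue: asyncio.Queue, processed: list[int]) -> None:
    while True:
        tick = await queue.get()  # an idle worker is cancelled while waiting here
        try:
            await asyncio.sleep(0.001)  # stand-in for strategy work
            processed.append(tick)
        finally:
            queue.task_done()  # always balance the get()


async def drain_and_stop(n_workers: int = 3, n_ticks: int = 20) -> int:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=100)
    processed: list[int] = []
    tasks = [asyncio.create_task(drain_worker(queue, processed)) for _ in range(n_workers)]

    for tick in range(n_ticks):  # the producer finishes before shutdown begins
        await queue.put(tick)

    await queue.join()  # returns once every put() has a matching task_done()
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)  # wait for cancellation to finish
    return len(processed)


print(asyncio.run(drain_and_stop()))  # 20
```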
4. The Multi-Worker Supervisor
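The supervisor manages a pool of workers, runs them as concurrent tasks on the event loop, and provides a unified shutdown interface. All workers share one thread, so the pool adds concurrency for I/O-bound work rather than spreading work across CPU cores.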
```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


class WorkerPool:
    """
    Manages a pool of MarketDataWorker instances.

    Runs a fixed number of workers (num_workers) as tasks on the current event loop.
    Provides aggregated metrics and coordinated shutdown.
    """

    def __init__(
        self,
        queue: asyncio.Queue[TickData],
        processor: Callable[[TickData], Awaitable[None]],
        num_workers: int = 4,
        lag_threshold: int = 500,
    ):
        self._queue = queue
        self._processor = processor
        self._num_workers = num_workers
        self._lag_threshold = lag_threshold
        self._workers: list[MarketDataWorker] = []
        self._tasks: list[asyncio.Task] = []

    async def start(self) -> None:
        """Spawn all workers and begin processing."""
        logger.info("Starting worker pool with %d workers", self._num_workers)

        for worker_id in range(self._num_workers):
            worker = MarketDataWorker(
                worker_id=worker_id,
                queue=self._queue,
                processor=self._processor,
                lag_threshold=self._lag_threshold,
            )
            self._workers.append(worker)
            task = asyncio.create_task(worker.run())
            self._tasks.append(task)

        # Give the workers a moment to start, then report how many are alive
        await asyncio.sleep(0.1)
        running = sum(1 for t in self._tasks if not t.done())
        logger.info("Worker pool active: %d/%d workers running", running, self._num_workers)

    async def stop(self, timeout: float = 5.0) -> None:
        """
        Gracefully shutdown all workers within the timeout window.
        """
        logger.info("Initiating worker pool shutdown (timeout: %.1fs)", timeout)

        # Signal all workers to stop
        for worker in self._workers:
            await worker.stop()

        if not self._tasks:
            return  # asyncio.wait() rejects an empty collection

        # Wait for tasks to complete
        done, pending = await asyncio.wait(
            self._tasks,
            timeout=timeout,
        )

        # Cancel any stragglers and wait for their cleanup to finish
        for task in pending:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        logger.info("Worker pool shutdown complete")

    def get_metrics(self) -> dict:
        """Aggregate metrics from all workers."""
        individual = [w.metrics for w in self._workers]
        total_processed = sum(w["processed"] for w in individual)
        total_dropped = sum(w["dropped"] for w in individual)

        return {
            "num_workers": self._num_workers,
            "total_processed": total_processed,
            "total_dropped": total_dropped,
            "queue_size": self._queue.qsize(),
            "workers": individual,
        }
```
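Because the pool's workers are tasks on one event loop, the speedup from num_workers comes entirely from overlapping waits. This timing sketch (hypothetical names; a 10ms `asyncio.sleep` stands in for a network or database round trip) drains 40 items with one worker and then with four. If the per-item work were CPU-bound with no await, the four-worker run would be no faster.

```python
import asyncio
import time


async def io_bound_step(item: int) -> None:
    await asyncio.sleep(0.01)  # stand-in for a ~10ms network or database round trip


async def run_pool(num_workers: int, n_items: int = 40) -> float:
    """Drain n_items with num_workers tasks on one event loop; return elapsed seconds."""
    queue: asyncio.Queue[int] = asyncio.Queue()
    for i in range(n_items):
        queue.put_nowait(i)

    async def worker() -> None:
        while True:
            try:
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return  # nothing left: this worker is done
            await io_bound_step(item)  # the await is where other workers get to run

    start = time.perf_counter()
    await asyncio.gather(*(worker() for _ in range(num_workers)))
    return time.perf_counter() - start


one = asyncio.run(run_pool(num_workers=1))
four = asyncio.run(run_pool(num_workers=4))
print(f"1 worker: {one:.2f}s, 4 workers: {four:.2f}s")
```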
5. Putting It Together: The Main Application
```python
import asyncio
import logging
import os

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


async def strategy_processor(tick: TickData) -> None:
    """User-defined strategy logic. Replace with actual implementation."""
    pass  # your strategy here


async def main() -> None:
    # Load API key from environment variable (never hardcode)
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise RuntimeError("TICKDB_API_KEY environment variable is not set")

    # Shared queue with bounded capacity (1000 items max)
    # ⚠️ Tuning: maxsize too small → backpressure triggers prematurely
    #            maxsize too large → memory pressure during queue buildup
    tick_queue: asyncio.Queue[TickData] = asyncio.Queue(maxsize=1000)

    # Initialize components
    producer = MarketDataProducer(
        queue=tick_queue,
        websocket_url="wss://stream.tickdb.ai/v1/market/ws",
        symbols=["NVDA.US", "AAPL.US", "TSLA.US"],
        max_queue_size=1000,
        write_timeout=1.0,
        api_key=api_key,
    )

    # Worker count: 4 is a reasonable default for I/O-bound strategies.
    # For CPU-bound logic, extra workers on one event loop do not help:
    # profile first and offload the heavy computation to a process pool.
    num_workers = int(os.environ.get("WORKER_COUNT", "4"))
    worker_pool = WorkerPool(
        queue=tick_queue,
        processor=strategy_processor,
        num_workers=num_workers,
        lag_threshold=500,
    )

    try:
        # Start workers first so they are ready when the first tick arrives,
        # then run the producer until it is stopped or main() is cancelled
        await worker_pool.start()
        await producer.start()
    finally:
        # Runs on normal exit, on errors, and when asyncio.run() cancels main() after Ctrl+C
        await producer.stop()
        await worker_pool.stop(timeout=5.0)
        logger.info("Final metrics: %s", worker_pool.get_metrics())


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
```
Tuning the Queue: The Critical Parameters
The producer-consumer pattern introduces three new tuning knobs that do not exist in a naive single-threaded pipeline.
maxsize: Queue Buffer Capacity
The queue's maxsize is the most consequential parameter. It directly determines how much burst capacity the system has before backpressure kicks in.
| maxsize | Behavior | Tradeoff |
|---|---|---|
| Too small (< 100) | Backpressure triggers frequently even during normal bursts. Throughput suffers. | Low memory usage, but system cannot absorb natural variability |
| Too large (> 10,000) | System absorbs large bursts but queues stale data. Memory pressure increases. | High memory consumption; risk of processing stale ticks |
| Calibrated (500–2000 for 100Hz feed) | Absorbs natural variability; backpressure triggers only under genuine overload | Balanced memory and liveness |
A good starting point for a 100Hz feed: maxsize = expected_burst_seconds * feed_rate * safety_factor. For a 5-second burst at 100Hz with a 2x safety factor: 5 * 100 * 2 = 1000.
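The sizing rule translates directly into code. The second helper (hypothetical, like the first) adds the flip side from the table above: a full queue is also a delay, and its worst-case staleness is maxsize divided by the pool's drain rate. The 80 ticks/second drain rate used below is an assumption: four workers at the introduction's 50ms per tick, with I/O fully overlapped.

```python
import math


def recommended_maxsize(burst_seconds: float, feed_rate_hz: float, safety_factor: float = 2.0) -> int:
    """maxsize = expected_burst_seconds * feed_rate * safety_factor, rounded up."""
    return math.ceil(burst_seconds * feed_rate_hz * safety_factor)


def worst_case_staleness_s(maxsize: int, drain_rate_hz: float) -> float:
    """Seconds the last tick in a full queue waits before a worker reaches it."""
    return maxsize / drain_rate_hz


size = recommended_maxsize(burst_seconds=5, feed_rate_hz=100, safety_factor=2)
print(size)                                # 1000
print(worst_case_staleness_s(size, 80.0))  # 12.5
```

If 12.5 seconds is too stale for the strategy, the remedy is more drain capacity or a smaller buffer, not a larger one.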
write_timeout: Backpressure Patience
The write_timeout in the producer determines how long the producer waits for the queue to accept a new item. A timeout of 1–2 seconds is reasonable. Shorter timeouts cause more aggressive data dropping during temporary spikes. Longer timeouts stall the WebSocket reading loop for longer, so messages pile up on the connection and the server may disconnect a client that has stopped reading.
lag_threshold: Early Warning Trigger
The worker's lag_threshold triggers a warning when the queue size exceeds it. This is not a hard limit — the queue continues accepting data. It is an observability trigger. Set it to approximately 50–75% of maxsize to give operators enough warning before the queue fills completely.
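A small helper (hypothetical) keeps lag_threshold tied to maxsize rather than hard-coded. For the maxsize=1000 used in main(), the 50 to 75% band is 500 to 750, so the lag_threshold=500 passed to WorkerPool sits at the low end.

```python
def lag_threshold_range(maxsize: int, low: float = 0.50, high: float = 0.75) -> tuple[int, int]:
    """Suggested lag_threshold bounds as a fraction of the queue's maxsize."""
    if maxsize <= 0:
        # asyncio.Queue treats maxsize <= 0 as unbounded, so there is nothing to scale
        raise ValueError("lag_threshold needs a bounded queue (maxsize > 0)")
    return int(maxsize * low), int(maxsize * high)


print(lag_threshold_range(1000))  # (500, 750)
```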
Observability: Knowing When the System Is Stressed
A producer-consumer pipeline without monitoring is a time bomb. The following metrics are the minimum viable observability stack:
```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class PipelineMetrics:
    """Aggregated metrics for the producer-consumer pipeline."""
    timestamp: datetime
    queue_size: int
    queue_utilization_pct: float
    items_produced: int
    items_consumed: int
    items_dropped: int
    avg_processing_latency_ms: float
    producer_connected: bool


async def collect_metrics(
    queue: asyncio.Queue,
    producer: MarketDataProducer,
    worker_pool: WorkerPool,
) -> PipelineMetrics:
    """Periodic metrics collection for monitoring dashboards."""
    maxsize = queue.maxsize
    qsize = queue.qsize()
    pool = worker_pool.get_metrics()  # aggregated worker counters

    return PipelineMetrics(
        timestamp=datetime.now(timezone.utc),  # datetime.utcnow() is deprecated since 3.12
        queue_size=qsize,
        queue_utilization_pct=round(qsize / maxsize * 100, 2) if maxsize > 0 else 0.0,
        # items_produced and is_connected are not defined on MarketDataProducer yet:
        # add a counter and a connection flag there; getattr() keeps this runnable until then
        items_produced=getattr(producer, "items_produced", 0),
        items_consumed=pool["total_processed"],
        # Worker-side processing failures; producer overflow drops need their own counter
        items_dropped=pool["total_dropped"],
        avg_processing_latency_ms=0.0,  # implement with moving window
        producer_connected=getattr(producer, "is_connected", False),
    )
```
Key metrics to visualize:
- Queue utilization: qsize / maxsize * 100%. Sustained values above 80% mean the buffer is close to full and backpressure is imminent.
- Drop rate: Items dropped per minute. A non-zero producer-side drop rate confirms backpressure is active.
- Processing lag: Time difference between tick timestamp and processing timestamp.
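collect_metrics() above leaves avg_processing_latency_ms as a placeholder to implement with a moving window. One minimal way to fill it in, with hypothetical names, is a fixed-size collections.deque of recent samples, plus a helper for the processing-lag metric. The helper assumes tick timestamps are timezone-aware UTC datetimes.

```python
from collections import deque
from datetime import datetime, timezone
from typing import Optional


class LatencyWindow:
    """Rolling average of the most recent `size` latency samples, in milliseconds."""

    def __init__(self, size: int = 1000) -> None:
        self._samples: deque[float] = deque(maxlen=size)  # oldest samples fall off automatically

    def record_ms(self, value_ms: float) -> None:
        self._samples.append(value_ms)

    def average_ms(self) -> float:
        return sum(self._samples) / len(self._samples) if self._samples else 0.0


def processing_lag_ms(tick_timestamp: datetime, now: Optional[datetime] = None) -> float:
    """Tick-to-processing lag in ms; expects a timezone-aware tick_timestamp."""
    now = now or datetime.now(timezone.utc)
    return (now - tick_timestamp).total_seconds() * 1000.0


window = LatencyWindow(size=3)
for sample in (10.0, 20.0, 30.0, 40.0):
    window.record_ms(sample)
print(window.average_ms())  # 30.0 (only the last three samples are kept)
```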
Graceful Degradation: What Happens When the Queue Fills
The system has three failure modes when the queue fills, each with different consequences:
- Producer blocks (no timeout): The WebSocket reading loop stalls, and the server may disconnect a client that has stopped reading. This is the worst outcome: a cascading failure.
- Producer drops with warning (write_timeout behavior): The tick is discarded, a warning is logged, and the producer continues. The strategy sees gaps in the stream and works on ticks that have waited in a full queue, but the pipeline remains alive.
- Consumer throttles upstream (rate-limit propagation): Not implemented in the basic pattern. Requires the producer to also consume a signal from the consumer indicating "slow down." This adds complexity but provides the smoothest degradation.
The implementation above uses strategy 2, which is the correct default: the producer stays alive, operators are warned, and the system degrades gracefully under load.
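The third strategy fits best with a polling producer (the REST polling loop mentioned at the start), which can simply poll less often. A minimal sketch under that assumption, with hypothetical names: a high/low watermark flag derived from the consumers' backlog, with hysteresis so it does not flap, that the producer checks before choosing its next poll interval. `fetch` stands in for whatever call retrieves the next tick.

```python
import asyncio
from typing import Any, Awaitable, Callable


class Watermarks:
    """'Slow down' once the backlog reaches `high`; 'resume' once it falls to `low`."""

    def __init__(self, queue: asyncio.Queue, high: int, low: int) -> None:
        if not 0 <= low < high:
            raise ValueError("need 0 <= low < high")
        self._queue, self._high, self._low = queue, high, low
        self.throttled = False

    def update(self) -> bool:
        size = self._queue.qsize()
        if not self.throttled and size >= self._high:
            self.throttled = True
        elif self.throttled and size <= self._low:
            self.throttled = False
        return self.throttled


async def polling_producer(
    queue: asyncio.Queue,
    marks: Watermarks,
    fetch: Callable[[], Awaitable[Any]],
    n_polls: int,
    base_interval_s: float = 0.01,
    slow_interval_s: float = 0.05,
) -> int:
    """Poll `n_polls` times; return how many polls ran at the slowed interval."""
    slowed = 0
    for _ in range(n_polls):
        await queue.put(await fetch())
        if marks.update():
            slowed += 1
            await asyncio.sleep(slow_interval_s)  # back off instead of dropping data
        else:
            await asyncio.sleep(base_interval_s)
    return slowed
```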
Scaling Beyond One Process
The architecture described runs in a single process. For institutional-scale systems handling dozens of symbols at tick rates exceeding 10,000/second, horizontal scaling becomes necessary. The asyncio.Queue does not cross process boundaries — you would replace it with a message broker (Redis Streams, Kafka, or ZeroMQ) as the buffer layer, and keep the producer-consumer logic for each worker process.
The transition is additive, not disruptive: each worker process runs the same logic, reads from the shared broker, and writes results to a centralized sink. The strategy code, worker logic, and monitoring hooks remain identical — only the queue implementation changes.
Closing
The producer-consumer pattern is not a performance optimization — it is a structural solution to a fundamental mismatch in data pipeline design. By introducing a bounded, backpressure-aware buffer between the market data feed and the strategy logic, you create a system that remains predictable under stress, degrades gracefully under overload, and can be horizontally scaled without rewriting the core logic.
The three parameters that determine success are maxsize (buffer capacity), write_timeout (backpressure patience), and lag_threshold (early warning trigger). Calibrate these against your actual feed rate and strategy latency profile, measure them in production, and tune until the drop rate is non-zero only during genuine overload events — not during normal burst traffic.
For TickDB users integrating this pattern: the WebSocket feed from TickDB's depth channel delivers order book updates at sub-100ms latency. The producer code above connects directly to wss://stream.tickdb.ai/v1/market/ws with your TICKDB_API_KEY. The queue absorbs variability, the multi-worker pool adds concurrent consumers, and the metrics hooks integrate with your existing monitoring stack.
Next Steps
If you're building a real-time strategy: Start with a single worker and a queue size of 1000. Run a 24-hour backtest against historical TickDB kline data to establish your strategy's per-tick latency budget. Then calibrate maxsize to absorb that budget's natural variability.
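One way to turn a replay into a per-tick latency budget, sketched with hypothetical names: await the strategy coroutine on each replayed tick, record wall-clock time with `time.perf_counter`, and summarize mean, p50 and p99. Loading the historical ticks is left out; any iterable works. The sustainable-rate helper assumes I/O-bound work that overlaps perfectly across workers, so treat its result as an upper bound.

```python
import asyncio
import statistics
import time
from typing import Any, Awaitable, Callable, Iterable


async def latency_profile_ms(
    process: Callable[[Any], Awaitable[None]], ticks: Iterable[Any]
) -> dict[str, float]:
    """Await `process` on each replayed tick and summarize wall-clock time in ms."""
    samples: list[float] = []
    for tick in ticks:
        start = time.perf_counter()
        await process(tick)
        samples.append((time.perf_counter() - start) * 1000.0)
    cuts = statistics.quantiles(samples, n=100)  # 99 cut points; needs at least 2 samples
    return {"mean": statistics.fmean(samples), "p50": cuts[49], "p99": cuts[98]}


def sustainable_rate_hz(mean_latency_ms: float, num_workers: int = 1) -> float:
    """Upper bound on ticks/s the pool can drain, assuming fully overlapped I/O."""
    return num_workers * 1000.0 / mean_latency_ms


async def example_strategy(tick: Any) -> None:
    await asyncio.sleep(0.001)  # stand-in for real per-tick work


profile = asyncio.run(latency_profile_ms(example_strategy, range(200)))
print({name: round(value, 2) for name, value in profile.items()})
print(sustainable_rate_hz(50.0))  # 20.0
```

Comparing the p99 figure, rather than the mean, against the feed rate gives a more conservative starting point for maxsize.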
If you need historical backtest data: Visit tickdb.ai to access 10+ years of cleaned, aligned US equity OHLCV data via the /v1/market/kline endpoint. Align your backtest period with at least one full market cycle to validate strategy stability across regimes.
If you need enterprise-scale distribution: Reach out to enterprise@tickdb.ai for multi-symbol WebSocket subscriptions and dedicated infrastructure configurations that support institutional-level throughput.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.