The Problem Nobody Tells You About

You write a beautiful mean-reversion strategy. It backtests well. You deploy it. And then, on a high-volatility day, your WebSocket feed delivers 50,000 ticks per second and your carefully crafted strategy code falls over like a house of cards.

The strategy isn't slow. Your architecture is.

When market data arrives faster than your strategy can process it, you face two bad choices: drop messages (dangerous) or block the feed (also dangerous). This is the classic producer-consumer problem, and in this article we'll solve it properly using Python's asyncio.Queue and a carefully designed multi-worker architecture.

We'll build a production-grade market data distribution system that:

  • Decouples data ingestion from strategy execution
  • Applies backpressure so slow consumers cannot exhaust memory
  • Scales with multiple workers consuming concurrently
  • Survives bursts without exhausting memory, shedding load only when the buffer is nearly full

Let's begin.


Why Market Data Is Different

Before diving into the solution, we need to understand why market data distribution is uniquely challenging.

In a typical async workload, messages arrive with some statistical regularity. A web server handles requests with a known distribution. A database handles queries with predictable peaks.

Market data is different. Consider what happens at a major earnings release:

Phase              Message Rate   Duration     Total Messages
Pre-announcement   ~500/sec       Minutes      ~150,000
Announcement +1s   ~15,000/sec    1 second     ~15,000
Volatility spike   ~8,000/sec     30 seconds   ~240,000
Mean reversion     ~2,000/sec     Hours        ~7,200,000

That's a 30x spike in message rate lasting seconds, followed by sustained elevated throughput for hours. If your data pipeline can't absorb this burst, you'll either overflow memory (storing too many unprocessed messages) or you'll start dropping data (losing alpha).

The producer-consumer pattern solves this by inserting a buffer queue between the data source (producer) and the strategy logic (consumer). This buffer acts as a shock absorber, smoothing out rate disparities and giving downstream workers time to catch up during bursts.

But a naive queue implementation makes things worse. Unbounded queues grow until they consume all available memory, and when memory runs out, your process dies — losing everything. The solution is bounded queues with backpressure, which we'll implement next.
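
A minimal sketch of what "bounded" means in practice, using only the standard library. The function name and the tiny maxsize are illustrative and are not part of the pipeline built later.

```python
import asyncio


async def fill_until_full(maxsize: int, n_items: int) -> tuple[int, int]:
    """Offer n_items to a bounded queue; return (accepted, rejected)."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    accepted = rejected = 0
    for tick in range(n_items):
        try:
            queue.put_nowait(tick)  # never waits; raises once maxsize is reached
            accepted += 1
        except asyncio.QueueFull:
            rejected += 1  # caller decides what to do: drop, wait, or alert
    return accepted, rejected


print(asyncio.run(fill_until_full(maxsize=3, n_items=5)))  # (3, 2)
```

With maxsize=0 (the default) the same loop would accept every item, which is exactly the unbounded growth described above.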


The Core Architecture: A Three-Layer System

Our system consists of three layers:

┌─────────────────────────────────────────────────────────────────┐
│                        Layer 1: Producer                        │
│                   (WebSocket / REST data ingestion)             │
│                              │                                  │
│                              ▼                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │              Layer 2: asyncio.Queue (Buffer)              │  │
│  │         Bounded queue with backpressure signaling         │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                  │
│              ┌───────────────┼───────────────┐                  │
│              ▼               ▼               ▼                  │
│         Worker 1         Worker 2         Worker N              │
│    ┌────────────┐   ┌────────────┐   ┌────────────┐            │
│    │ Strategy A │   │ Strategy B │   │ Strategy N │            │
│    └────────────┘   └────────────┘   └────────────┘            │
│                        Layer 3: Consumers                       │
└─────────────────────────────────────────────────────────────────┘

The key insight is that the queue is bounded: it has a maximum size. When the queue fills up, the producer sees it (backpressure) and has to react, either by waiting for space or by shedding load. The producer in this article sheds load, skipping new messages once the queue is 95% full, and resumes normal ingestion as soon as workers free up space. Either way, memory use stays capped.
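
A minimal sketch of a producer that slows down when the queue is full: awaiting queue.put() suspends the producer until a consumer frees a slot. The names run_demo, producer and consumer are illustrative, and the sleep stands in for a slow strategy.

```python
import asyncio


async def run_demo(maxsize: int = 2, n_items: int = 6) -> tuple[list[int], int]:
    """Return the ticks consumed (in order) and the largest queue size observed."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    consumed: list[int] = []
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for tick in range(n_items):
            await queue.put(tick)  # suspends here while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)  # sentinel: nothing more to send

    async def consumer() -> None:
        while (tick := await queue.get()) is not None:
            await asyncio.sleep(0.001)  # simulate a slow strategy
            consumed.append(tick)

    await asyncio.gather(producer(), consumer())
    return consumed, peak


consumed, peak = asyncio.run(run_demo())
print(consumed, "peak queue size:", peak)
```

Nothing is dropped and the queue never exceeds maxsize. With a live WebSocket source, though, a producer that stops reading typically just moves the backlog into the library's and the operating system's receive buffers, so waiting is not automatically safer than dropping.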


Implementation: The Producer

Let's implement this step by step. We'll use TickDB's WebSocket feed as our data source, but the pattern applies to any market data API.

import asyncio
import os
import json
import random
from datetime import datetime
from typing import Optional

import websockets

# Environment variable for API key
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY", "")


class MarketDataProducer:
    """
    Producer that connects to TickDB WebSocket and pushes market data
    into an asyncio.Queue. Implements backpressure awareness by monitoring
    queue fullness.
    """
    
    def __init__(self, queue: asyncio.Queue, max_queue_size: int = 10000):
        self.queue = queue
        self.max_queue_size = max_queue_size
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.running = False
        self.messages_sent = 0
        self.messages_dropped = 0
        
    async def connect(self):
        """Establish WebSocket connection with exponential backoff."""
        uri = f"wss://api.tickdb.ai/ws?api_key={TICKDB_API_KEY}"
        retry_count = 0
        base_delay = 1.0
        
        # run() sets self.running = True before calling connect(), so retry while running
        while self.running:
            try:
                self.ws = await websockets.connect(uri)
                print(f"[{datetime.now():%H:%M:%S}] WebSocket connected")
                return
            except Exception as e:
                delay = min(base_delay * (2 ** retry_count), 60)
                jitter = random.uniform(0, delay * 0.1)
                print(f"[{datetime.now():%H:%M:%S}] Connection failed: {e}, "
                      f"retrying in {delay + jitter:.1f}s")
                await asyncio.sleep(delay + jitter)
                retry_count += 1
                
    async def heartbeat(self):
        """Keepalive ping to prevent WebSocket timeout."""
        while self.running:
            try:
                if self.ws:
                    await self.ws.send(json.dumps({"cmd": "ping"}))
                await asyncio.sleep(30)
            except Exception:
                break
                
    async def subscribe(self, symbols: list[str], channels: list[str]):
        """Subscribe to market data channels for given symbols."""
        if not self.ws:
            raise RuntimeError("WebSocket not connected")
            
        subscribe_msg = {
            "method": "subscribe",
            "params": {
                "symbols": symbols,
                "channels": channels  # e.g., ["trades", "depth"]
            }
        }
        await self.ws.send(json.dumps(subscribe_msg))
        print(f"[{datetime.now():%H:%M:%S}] Subscribed to {symbols} on {channels}")
        
    async def run(self, symbols: list[str], channels: list[str]):
        """
        Main production loop. Monitors queue fullness and applies
        backpressure when the queue approaches capacity.
        """
        self.running = True
        await self.connect()
        
        # Start heartbeat task
        heartbeat_task = asyncio.create_task(self.heartbeat())
        
        try:
            await self.subscribe(symbols, channels)
            
            while self.running:
                try:
                    # Wait up to 1 s for a message so the loop can re-check self.running
                    message = await asyncio.wait_for(
                        self.ws.recv(),
                        timeout=1.0
                    )
                    data = json.loads(message)
                    
                    # Backpressure check: skip message if queue is nearly full
                    queue_size = self.queue.qsize()
                    queue_full_ratio = queue_size / self.max_queue_size
                    
                    if queue_full_ratio >= 0.95:
                        # Critical backpressure: queue at 95% capacity
                        # Skip this message to prevent overflow
                        self.messages_dropped += 1
                        if self.messages_dropped % 1000 == 0:
                            print(f"[WARNING] Backpressure active: "
                                  f"{queue_size}/{self.max_queue_size} "
                                  f"({queue_full_ratio:.1%}), "
                                  f"dropped {self.messages_dropped} messages")
                        continue
                        
                    # Put message in queue (non-blocking)
                    try:
                        self.queue.put_nowait(data)
                        self.messages_sent += 1
                    except asyncio.QueueFull:
                        # Queue is full — this shouldn't happen given the check above
                        # but we handle it defensively
                        self.messages_dropped += 1
                        
                except asyncio.TimeoutError:
                    # No message received in timeout — continue loop
                    continue
                    
        finally:
            self.running = False
            heartbeat_task.cancel()
            if self.ws:
                await self.ws.close()

Key Design Decisions in the Producer

  1. Heartbeat task: The WebSocket connection requires periodic pings to prevent timeout. We run this as a separate task that doesn't block the main production loop.

  2. Backpressure monitoring: Before every put_nowait, we check queue.qsize(). At 95% capacity, we start dropping messages. This is a deliberate design choice — in high-throughput scenarios, losing some messages during extreme bursts is preferable to exhausting memory.

  3. Drop tracking: We log the number of dropped messages so operators can assess pipeline health. A rising drop count indicates the system is under sustained load beyond its capacity.

  4. Exponential backoff with jitter: Connection failures use exponential backoff capped at 60 seconds, with random jitter to prevent thundering herd effects.
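
The backoff schedule from item 4 can be isolated into a small pure function, which makes it easy to inspect and test. This is a sketch: the function name, the jitter_fraction parameter and the exponent cap are assumptions added for illustration, while the defaults mirror the producer above (1 s base, 60 s cap, up to 10% jitter).

```python
import random
from typing import Optional


def backoff_delay(
    retry_count: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter_fraction: float = 0.1,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before reconnect attempt number `retry_count` (0-based)."""
    rng = rng or random.Random()
    # Cap the exponent so a very long outage cannot overflow the float conversion
    exponent = min(retry_count, 16)
    delay = min(base_delay * (2 ** exponent), max_delay)
    # Jitter spreads out clients that all lost the connection at the same moment
    return delay + rng.uniform(0, delay * jitter_fraction)


# Without jitter the schedule is deterministic: 1, 2, 4, ... capped at 60
print([backoff_delay(n, jitter_fraction=0.0) for n in range(8)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```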


Implementation: The Bounded Queue

The asyncio.Queue is the heart of our system. Its bounded nature is critical for memory management.

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class QueueStats:
    """Statistics about queue health."""
    current_size: int
    max_size: int
    capacity_ratio: float
    producers_blocked: int  # How many times producers waited
    consumers_waiting: int  # Current number of waiting consumers


class BoundedMarketDataQueue:
    """
    A wrapper around asyncio.Queue that provides:
    - Bounded capacity to prevent memory exhaustion
    - Statistics reporting for monitoring
    - Backpressure signaling via queue fullness
    """
    
    def __init__(self, maxsize: int = 10000):
        self._queue = asyncio.Queue(maxsize=maxsize)
        self._producers_blocked = 0
        self._total_waits = 0
        
    async def put(self, item: Any, timeout: float = 5.0):
        """Put item with timeout. Logs when backpressure delays insertion."""
        try:
            await asyncio.wait_for(self._queue.put(item), timeout=timeout)
        except asyncio.TimeoutError:
            self._producers_blocked += 1
            raise
            
    def put_nowait(self, item: Any):
        """Non-blocking put. Raises QueueFull if at capacity."""
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull:
            self._producers_blocked += 1
            raise
            
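    async def get(self, timeout: float = None):
        """Get item, waiting at most `timeout` seconds (None waits indefinitely)."""
        # asyncio.Queue.get() takes no timeout argument, so wrap it in wait_for
        return await asyncio.wait_for(self._queue.get(), timeout=timeout)
        
    def get_nowait(self) -> Any:
        """Non-blocking get."""
        return self._queue.get_nowait()

    def qsize(self) -> int:
        """Current number of queued items (the producer's backpressure check calls this)."""
        return self._queue.qsize()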
        
    def task_done(self):
        """Signal that a task is complete."""
        self._queue.task_done()
        
    @property
    def stats(self) -> QueueStats:
        """Return current queue statistics."""
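        size = self._queue.qsize()
        max_size = self._queue.maxsize
        return QueueStats(
            current_size=size,
            max_size=max_size,
            # maxsize=0 means "unbounded" in asyncio.Queue, so avoid dividing by zero
            capacity_ratio=size / max_size if max_size > 0 else 0.0,
            producers_blocked=self._producers_blocked,
            # _getters is a private CPython attribute holding consumers blocked in get();
            # _unfinished_tasks counts items not yet marked task_done(), a different quantity
            consumers_waiting=len(self._queue._getters)
        )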
        
    @property
    def is_empty(self) -> bool:
        return self._queue.empty()
        
    @property
    def is_full(self) -> bool:
        return self._queue.full()

Queue Size Selection

Choosing the right queue size is a tradeoff:

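Queue Size   Memory Cost   Backpressure Risk           Burst Handling
1,000        ~50MB         High (triggers frequently)  Low buffering
10,000       ~500MB        Moderate                    Good burst absorption
100,000      ~5GB          Low (rarely triggers)       Excellent burst handling

The memory figures imply roughly 50 KB per queued message; measure your own payloads before relying on them. Keep in mind that a deeper queue also means older data when workers fall behind.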

For most retail quant systems, 10,000–20,000 messages is a good starting point. For institutional systems with more memory headroom, 50,000–100,000 provides better resilience against sustained bursts.

Monitor your memory usage during peak events and adjust accordingly.
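
A rough way to sanity-check a queue size before going live is to estimate the backlog a burst would build up. The sketch below uses a simple fluid approximation and the burst phases from the earnings-release table earlier in the article. The function name and the two service rates (ticks per second the workers can process) are hypothetical values chosen for illustration.

```python
def peak_backlog(phases: list[tuple[float, float]], service_rate: float) -> int:
    """Largest backlog reached, given (arrival_rate_per_sec, duration_sec) phases.

    The backlog grows by (arrival - service) per second and never goes below zero.
    """
    backlog = 0.0
    peak = 0.0
    for arrival_rate, seconds in phases:
        backlog = max(0.0, backlog + (arrival_rate - service_rate) * seconds)
        peak = max(peak, backlog)
    return int(peak)


# Announcement second followed by the 30-second volatility spike
BURST = [(15_000, 1), (8_000, 30)]

print(peak_backlog(BURST, service_rate=7_000))   # 38000
print(peak_backlog(BURST, service_rate=10_000))  # 5000
```

Under these assumptions, workers that sustain 10,000 ticks/sec fit comfortably in a 10,000-slot queue, while workers limited to 7,000 ticks/sec would overflow it and start dropping messages.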


Implementation: The Consumer Workers

Now we implement the worker pool. Each worker runs as an independent asyncio task, pulling messages from the queue and processing them according to its strategy.

import asyncio
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any


class StrategyBase(ABC):
    """
    Abstract base class for any strategy that consumes market data.
    Subclasses implement the process_tick() method.
    """
    
    @abstractmethod
    async def process_tick(self, data: dict) -> None:
        """Process a single market data tick. Implement in subclass."""
        pass
    
    async def initialize(self) -> None:
        """Called once at startup. Override for setup logic."""
        pass
        
    async def shutdown(self) -> None:
        """Called once at shutdown. Override for cleanup logic."""
        pass


class Worker:
    """
    A single consumer worker that processes messages from the queue.
    Implements graceful shutdown and health monitoring.
    """
    
    def __init__(
        self,
        worker_id: int,
        queue: asyncio.Queue,
        strategy: StrategyBase,
        max_consecutive_errors: int = 10
    ):
        self.worker_id = worker_id
        self.queue = queue
        self.strategy = strategy
        self.max_consecutive_errors = max_consecutive_errors
        
        self.running = False
        self.ticks_processed = 0
        self.errors = 0
        self.consecutive_errors = 0
        
    async def run(self):
        """Main worker loop."""
        self.running = True
        await self.strategy.initialize()
        
        print(f"[Worker-{self.worker_id}] Started, processing from queue")
        
        try:
            while self.running:
                try:
                    # Get message with timeout to allow periodic health checks
                    data = await asyncio.wait_for(
                        self.queue.get(),
                        timeout=5.0
                    )
                except asyncio.TimeoutError:
                    # No messages available; this is normal
                    continue

                try:
                    await self.strategy.process_tick(data)
                    self.ticks_processed += 1
                    self.consecutive_errors = 0
                except Exception as e:
                    self.consecutive_errors += 1
                    self.errors += 1
                    print(f"[Worker-{self.worker_id}] Error #{self.consecutive_errors}: {e}")

                    if self.consecutive_errors >= self.max_consecutive_errors:
                        print(f"[Worker-{self.worker_id}] Too many errors, stopping")
                        break

                    # Brief backoff on error
                    await asyncio.sleep(0.1)
                finally:
                    # Mark the item done even when processing fails,
                    # otherwise queue.join() would wait forever
                    self.queue.task_done()
                    
        finally:
            await self.strategy.shutdown()
            print(f"[Worker-{self.worker_id}] Stopped. "
                  f"Processed {self.ticks_processed} ticks, {self.errors} errors")
            
    def stop(self):
        """Gracefully stop the worker."""
        self.running = False


class WorkerPool:
    """
    Manages a pool of workers consuming from the same queue.
    Provides aggregate statistics and coordinated lifecycle management.
    """
    
    def __init__(self, num_workers: int, queue: asyncio.Queue, strategy_factory):
        self.num_workers = num_workers
        self.queue = queue
        self.strategy_factory = strategy_factory
        
        self.workers: list[Worker] = []
        self.tasks: list[asyncio.Task] = []
        self.running = False
        
    async def start(self):
        """Start all workers."""
        self.running = True
        
        for i in range(self.num_workers):
            # Each worker gets its own strategy instance
            strategy = self.strategy_factory()
            worker = Worker(
                worker_id=i,
                queue=self.queue,
                strategy=strategy
            )
            self.workers.append(worker)
            
            task = asyncio.create_task(worker.run())
            self.tasks.append(task)
            
        print(f"[WorkerPool] Started {self.num_workers} workers")
        
    async def stop(self, timeout: float = 10.0):
        """Gracefully stop all workers with timeout."""
        self.running = False
        
        # Stop all workers
        for worker in self.workers:
            worker.stop()
            
        # Wait for tasks to complete
        try:
            await asyncio.wait_for(
                asyncio.gather(*self.tasks, return_exceptions=True),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            print(f"[WorkerPool] Timeout waiting for workers to stop, cancelling")
            for task in self.tasks:
                task.cancel()
                
        print(f"[WorkerPool] Stopped. Total ticks processed: "
              f"{sum(w.ticks_processed for w in self.workers)}")
        
    @property
    def stats(self) -> dict:
        """Aggregate statistics across all workers."""
        return {
            "num_workers": self.num_workers,
            "running": self.running,
            "total_ticks_processed": sum(w.ticks_processed for w in self.workers),
            "total_errors": sum(w.errors for w in self.workers),
            "workers": [
                {
                    "id": w.worker_id,
                    "ticks": w.ticks_processed,
                    "errors": w.errors
                }
                for w in self.workers
            ]
        }

Worker Pool Design Patterns

1. Shared Queue, Independent Strategies

Each worker receives its own strategy instance. This is critical — if all workers shared a single strategy object, you'd have concurrency problems (race conditions on shared state). Each strategy is isolated.

2. Error Isolation

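One worker's failure doesn't take down the others. If Worker 3 hits an exception in its strategy, it logs the error and moves on to the next tick while Workers 1, 2, and 4 continue processing. Only after max_consecutive_errors failures in a row does a worker stop, and it still runs its strategy's shutdown() on the way out.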

3. Graceful Shutdown

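Workers implement a running flag that they check on every loop iteration. When stop() is called, each worker finishes the tick it is currently processing (if any) and then exits cleanly, so no tick is abandoned halfway. Ticks still waiting in the queue are not processed; if you need them, stop the producer first and await the underlying asyncio.Queue's join() before stopping the workers.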
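
To also process ticks that are still sitting in the queue at shutdown, one option is to drain before stopping. A minimal sketch, assuming the producer has already stopped: the worker and drain_and_stop names are illustrative, and asyncio.sleep(0) stands in for strategy.process_tick().

```python
import asyncio


async def worker(queue: asyncio.Queue, processed: list[int]) -> None:
    while True:
        tick = await queue.get()
        try:
            await asyncio.sleep(0)  # stand-in for strategy.process_tick(tick)
            processed.append(tick)
        finally:
            queue.task_done()  # join() only returns once every item is marked done


async def drain_and_stop(
    n_ticks: int = 100, n_workers: int = 4, drain_timeout: float = 5.0
) -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list[int] = []
    tasks = [asyncio.create_task(worker(queue, processed)) for _ in range(n_workers)]

    for tick in range(n_ticks):
        queue.put_nowait(tick)

    # Producer is done: wait (with a deadline) until the backlog is fully processed
    try:
        await asyncio.wait_for(queue.join(), timeout=drain_timeout)
    except asyncio.TimeoutError:
        print("drain timed out, abandoning", queue.qsize(), "ticks")

    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return processed


print(len(asyncio.run(drain_and_stop())))  # 100
```

The deadline matters: during a burst the backlog may be large, and a shutdown that waits indefinitely is usually worse than one that gives up on stale ticks.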

4. Health Monitoring

Every 30 seconds, log worker and queue statistics:

# Inside the main orchestrator
async def monitor_loop(pool: WorkerPool, queue: BoundedMarketDataQueue):
    while True:
        await asyncio.sleep(30)
        stats = pool.stats
        queue_stats = queue.stats
        
        print(f"""
[Health Check @ {datetime.now():%H:%M:%S}]
Workers: {stats['num_workers']} active, {stats['total_ticks_processed']} ticks processed, {stats['total_errors']} errors
Queue: {queue_stats.current_size}/{queue_stats.max_size} ({queue_stats.capacity_ratio:.1%}), {queue_stats.producers_blocked} backpressure events
""")

A Concrete Strategy: Order Book Pressure Monitor

Let's implement a real strategy that consumes market data from the queue. This strategy monitors the depth channel and tracks buy/sell pressure.

from collections import deque
from dataclasses import dataclass
from datetime import datetime


@dataclass
class PressureReading:
    """Snapshot of buy/sell pressure."""
    timestamp: datetime
    bid_total: float  # Total bid size across top N levels
    ask_total: float  # Total ask size across top N levels
    pressure_ratio: float  # bid_total / ask_total
    spread_bps: float  # Bid-ask spread in basis points


class OrderBookPressureStrategy(StrategyBase):
    """
    Monitors order book depth and tracks buy/sell pressure ratio.
    Emits alerts when pressure ratio crosses thresholds.
    """
    
    def __init__(
        self,
        symbols: list[str],
        lookback_window: int = 20,
        alert_threshold_high: float = 2.5,  # Strong buy pressure
        alert_threshold_low: float = 0.4    # Strong sell pressure
    ):
        self.symbols = symbols
        self.lookback_window = lookback_window
        self.alert_threshold_high = alert_threshold_high
        self.alert_threshold_low = alert_threshold_low
        
        self._readings: dict[str, deque] = {s: deque(maxlen=lookback_window) for s in symbols}
        self._last_alert: dict[str, datetime] = {s: datetime.min for s in symbols}
        
    async def initialize(self):
        print(f"[{self.__class__.__name__}] Initialized for {self.symbols}")
        
    async def process_tick(self, data: dict) -> None:
        """Process depth update and compute pressure ratio."""
        if data.get("channel") != "depth":
            return
            
        symbol = data.get("symbol")
        if symbol not in self.symbols:
            return
            
        bids = data.get("bids", [])
        asks = data.get("asks", [])
        
        if not bids or not asks:
            return
            
        # Compute pressure metrics
        bid_total = sum(float(b.get("size", 0)) for b in bids[:5])
        ask_total = sum(float(a.get("size", 0)) for a in asks[:5])
        
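        if ask_total <= 0:
            # No ask size in the top levels: the ratio is undefined. Defaulting it
            # to 0 would fire a false SELL alert, so skip this update instead.
            return

        pressure_ratio = bid_total / ask_total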
        
        best_bid = float(bids[0].get("price", 0))
        best_ask = float(asks[0].get("price", 0))
        spread_bps = ((best_ask - best_bid) / best_bid * 10000) if best_bid > 0 else 0
        
        reading = PressureReading(
            timestamp=datetime.now(),
            bid_total=bid_total,
            ask_total=ask_total,
            pressure_ratio=pressure_ratio,
            spread_bps=spread_bps
        )
        
        self._readings[symbol].append(reading)
        
        # Check for alerts
        await self._check_alerts(symbol, reading)
        
    async def _check_alerts(self, symbol: str, reading: PressureReading):
        """Check if pressure ratio crosses alert threshold."""
        alert_cooldown = 60  # Minimum seconds between alerts
        
        is_high_pressure = reading.pressure_ratio >= self.alert_threshold_high
        is_low_pressure = reading.pressure_ratio <= self.alert_threshold_low
        
        if is_high_pressure or is_low_pressure:
            last_alert = self._last_alert[symbol]
            if (datetime.now() - last_alert).total_seconds() > alert_cooldown:
                direction = "BUY" if is_high_pressure else "SELL"
                print(f"""
[ALERT] {direction} PRESSURE: {symbol}
  Pressure ratio: {reading.pressure_ratio:.2f}
  Spread: {reading.spread_bps:.1f} bps
  Time: {reading.timestamp:%H:%M:%S}
""")
                self._last_alert[symbol] = datetime.now()

This strategy demonstrates the pattern: it receives raw data from the queue, processes it, and emits actionable signals. The queue decoupling means the strategy's processing latency doesn't affect the data ingestion rate.
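
The metric calculation inside process_tick() can be tried on its own with a hand-built message. This is a sketch: the field names (bids, asks, price, size) follow the strategy code above, the sample values are invented, and the real feed's schema should be checked against the TickDB documentation.

```python
from typing import Optional


def pressure_metrics(depth: dict, levels: int = 5) -> Optional[tuple[float, float]]:
    """Return (pressure_ratio, spread_bps), or None if the book is unusable."""
    bids = depth.get("bids", [])
    asks = depth.get("asks", [])
    if not bids or not asks:
        return None

    bid_total = sum(float(level.get("size", 0)) for level in bids[:levels])
    ask_total = sum(float(level.get("size", 0)) for level in asks[:levels])
    best_bid = float(bids[0].get("price", 0))
    best_ask = float(asks[0].get("price", 0))
    if ask_total <= 0 or best_bid <= 0:
        return None  # ratio or spread would be undefined

    pressure_ratio = bid_total / ask_total
    spread_bps = (best_ask - best_bid) / best_bid * 10_000
    return pressure_ratio, spread_bps


# Invented sample: 500 shares bid vs 200 offered, 2 cents wide at $100
sample = {
    "channel": "depth",
    "symbol": "NVDA.US",
    "bids": [{"price": "100.00", "size": "300"}, {"price": "99.99", "size": "200"}],
    "asks": [{"price": "100.02", "size": "100"}, {"price": "100.03", "size": "100"}],
}
ratio, spread = pressure_metrics(sample)
print(f"ratio={ratio:.2f} spread={spread:.1f} bps")  # ratio=2.50 spread=2.0 bps
```

A ratio of 2.5 sits exactly on the default alert_threshold_high, so this sample would trigger a BUY pressure alert.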


Orchestrating the System

Now let's wire everything together in the main orchestrator:

import asyncio
from typing import Callable


async def run_market_data_pipeline(
    symbols: list[str],
    strategy_factory: Callable[[], StrategyBase],
    num_workers: int = 4,
    queue_size: int = 10000
):
    """
    Main entry point. Orchestrates producer, queue, and worker pool.
    """
    # Create bounded queue
    queue = BoundedMarketDataQueue(maxsize=queue_size)
    
    # Create producer
    producer = MarketDataProducer(queue=queue, max_queue_size=queue_size)
    
    # Create worker pool
    worker_pool = WorkerPool(
        num_workers=num_workers,
        queue=queue,
        strategy_factory=strategy_factory
    )
    
    # Start monitoring task
    monitor_task = asyncio.create_task(monitor_loop(worker_pool, queue))
    
    # Start all components
    await worker_pool.start()
    
    # Run producer (this blocks until connection lost or shutdown)
    try:
        await producer.run(symbols=symbols, channels=["depth"])
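    except asyncio.CancelledError:
        # Under asyncio.run(), Ctrl+C usually reaches this coroutine as a task
        # cancellation rather than as KeyboardInterrupt
        print("\n[Main] Shutdown requested")
        raise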
    finally:
        # Graceful shutdown
        await worker_pool.stop(timeout=15.0)
        monitor_task.cancel()
        print("[Main] Pipeline stopped")


# Example usage
if __name__ == "__main__":
    def create_pressure_strategy() -> StrategyBase:
        return OrderBookPressureStrategy(
            symbols=["NVDA.US", "TSLA.US", "AMD.US"],
            lookback_window=20,
            alert_threshold_high=2.5
        )
    
    asyncio.run(run_market_data_pipeline(
        symbols=["NVDA.US", "TSLA.US", "AMD.US"],
        strategy_factory=create_pressure_strategy,
        num_workers=4,
        queue_size=10000
    ))

Scaling Considerations

Vertical Scaling: Queue Size and Worker Count

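Every asyncio worker runs on the same event loop thread, so adding workers increases concurrency, not CPU parallelism. Extra workers help when ticks spend most of their time awaiting I/O; they do not speed up CPU-bound calculations, which have to move to separate processes:

# I/O-bound strategy (database/network calls dominate): workers overlap their waits
NUM_WORKERS = 4  # starting point; raise it if the queue backs up while the CPU is idle

# CPU-bound strategy: all asyncio workers share one thread, so more of them
# adds no throughput; size a process pool to the core count instead
NUM_PROCESSES = os.cpu_count() or 1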
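
Because asyncio workers share a single thread, CPU-heavy per-tick work is usually handed to a process pool so the event loop stays free to receive data. A minimal sketch using loop.run_in_executor: zscore_of_last is a hypothetical stand-in for an expensive calculation, and the other names are illustrative as well.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor
from statistics import fmean, pstdev


def zscore_of_last(prices: list[float]) -> float:
    """Stand-in for heavy math: z-score of the latest price against its window."""
    mean = fmean(prices)
    std = pstdev(prices)
    return 0.0 if std == 0 else (prices[-1] - mean) / std


async def process_tick_offloaded(pool: ProcessPoolExecutor, prices: list[float]) -> float:
    """Run the calculation in another process and await the result."""
    loop = asyncio.get_running_loop()
    # The function and its arguments are pickled and sent to a worker process
    return await loop.run_in_executor(pool, zscore_of_last, prices)


async def main() -> None:
    windows = [[100.0, 101.0, 102.0, 110.0], [50.0, 50.0, 50.0, 50.0]]
    with ProcessPoolExecutor(max_workers=2) as pool:
        results = await asyncio.gather(
            *(process_tick_offloaded(pool, window) for window in windows)
        )
    print(results)
```

To try it, call asyncio.run(main()) under an if __name__ == "__main__": guard, which process pools need on platforms that start workers by spawning. Each call pays a pickling and inter-process round trip, so this only pays off when the calculation is substantially more expensive than that overhead.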

Horizontal Scaling: Multiple Consumer Groups

For more complex architectures, consider multiple consumer groups with separate queues:

              TickDB WebSocket
                     │
          ┌──────────┴──────────┐
          ▼                     ▼
    Queue Group A         Queue Group B
(Strategy A instances) (Strategy B instances)

Each group has its own producer from the same WebSocket feed. This provides fault isolation — a crash in Strategy B doesn't affect Strategy A.
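
One way to realize separate queues per group is a single reader that copies each tick into every group's queue. This is an assumption about the design (running one connection per group is another reading of the text), and the group names and queue sizes below are invented for illustration.

```python
import asyncio


def fan_out(tick: dict, group_queues: dict[str, asyncio.Queue], dropped: dict[str, int]) -> None:
    """Offer one tick to every group; a full group drops it without blocking the rest."""
    for name, queue in group_queues.items():
        try:
            queue.put_nowait(tick)
        except asyncio.QueueFull:
            dropped[name] += 1  # only the slow group loses data


async def demo() -> tuple[dict[str, int], dict[str, int]]:
    group_queues = {
        "strategy_a": asyncio.Queue(maxsize=10),
        "strategy_b": asyncio.Queue(maxsize=2),  # deliberately small, nobody consuming
    }
    dropped = {name: 0 for name in group_queues}
    for seq in range(5):
        fan_out({"seq": seq}, group_queues, dropped)
    sizes = {name: queue.qsize() for name, queue in group_queues.items()}
    return sizes, dropped


print(asyncio.run(demo()))
# ({'strategy_a': 5, 'strategy_b': 2}, {'strategy_a': 0, 'strategy_b': 3})
```

Every group receives the same dict object, so consumers should treat ticks as read-only or copy them before mutating.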

Memory Management

Monitor these metrics in production:

import psutil

def log_memory_stats():
    process = psutil.Process()
    mem_info = process.memory_info()
    print(f"[Memory] RSS: {mem_info.rss / 1024 / 1024:.1f} MB, "
          f"VMS: {mem_info.vms / 1024 / 1024:.1f} MB")

Set up alerts if memory exceeds 80% of available RAM.


Comparison with Alternative Approaches

Approach                       Pros                                           Cons                                        Best for
asyncio.Queue (this article)   Native Python, simple, built-in backpressure   Single-process only                         Retail systems, single-server deployments
Celery / RQ                    Distributed, persistent queues                 Overhead for simple cases                   Multi-server deployments, task queues
Kafka / RabbitMQ               Persistent, replayable, distributed            Infrastructure complexity                   Institutional systems, audit requirements
Threading queue                Works with blocking client libraries           GIL limits CPU parallelism, harder to debug Strategies built on blocking I/O libraries

For most quant developers working with TickDB's real-time WebSocket feed, the asyncio.Queue approach provides the right balance of simplicity and capability.


Production Deployment Checklist

Before deploying to production:

  • Queue size calibrated — monitor peak queue usage, target <70% during normal bursts
  • Backpressure monitoring — alert when the producer's messages_dropped counter or the queue's producers_blocked counter increases
  • Worker error isolation — verify one crashed worker doesn't stop others
  • Graceful shutdown tested — send SIGTERM, verify workers drain queue
  • Memory limits set — configure container/VM memory limits with headroom
  • Health check endpoint — expose /health that returns queue size, worker status
  • Reconnection tested — kill WebSocket connection, verify automatic reconnect

Closing

The producer-consumer pattern isn't just a theoretical solution — it's the architecture that separates production-grade quant systems from fragile prototypes. By using a bounded asyncio.Queue with backpressure control, you build a system that:

  1. Survives burst load without memory exhaustion
  2. Protects downstream workers from upstream spikes
  3. Scales horizontally with multiple independent workers
  4. Handles failures gracefully with isolated error domains

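The code in this article is a foundation to build on rather than a finished product. It covers heartbeat, connection retry with backoff, backpressure monitoring, graceful shutdown, and basic logging; reconnecting after a mid-session disconnect and structured logging are left for you to add. Start with this foundation, adapt it to your strategy requirements, and monitor your queue statistics in production.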


Next Steps

  • If you're building event-driven strategies: Sign up for a TickDB API key and start integrating the WebSocket feed into your system architecture
  • If you need distributed processing across multiple servers: Explore Celery or Kafka-based architectures
  • If you're debugging a slow strategy: Add queue size monitoring — you'll often find the bottleneck is in your strategy code, not the data pipeline

Risk disclaimer: This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.