In a production trading system, the mismatch between data arrival speed and processing capacity is not an edge case: it is the default operating condition. A 10Gbps market feed can carry thousands of order book updates per second per symbol. A mean-reversion strategy running complex statistical calculations might spend 50ms per tick, which caps it at about 20 ticks per second. Without an intervention layer, the strategy falls behind in a matter of seconds, and the backlog keeps growing until an unbounded queue behaves like a memory leak.

The producer-consumer pattern resolves this structural imbalance by placing a bounded buffer between the data source and the processing logic. In Python, asyncio.Queue provides the canonical implementation. It decouples the ingestion task from the processing tasks and applies backpressure when the buffer fills. It also lets several consumer workers run concurrently on one event loop. This article walks through the complete implementation: the foundational queue setup, backpressure thresholds, multi-worker distribution, and the monitoring hooks that keep the system alive under stress.

The Problem: Speed Differential Destroys Predictability

Before designing the solution, it is worth formalizing the problem precisely. A market data pipeline has three moving parts:

  • Producer: The WebSocket connection (or REST polling loop) that receives raw ticks and pushes them into the queue.
  • Queue: A bounded buffer that holds items temporarily when production outpaces consumption.
  • Consumer: The strategy or processing logic that retrieves items from the queue and executes the business logic.

The fundamental failure mode is queue overflow. When the producer writes faster than the consumer drains, the queue grows unbounded. In a memory-constrained environment — which every production server is — this leads to OOM crashes or OS-level process termination. In the worst case, the queue holds stale data: by the time the consumer processes a tick from 30 seconds ago, the order book has completely changed, and the signal is garbage.

The producer-consumer pattern addresses this through three mechanisms:

  1. Bounded buffering: The queue has a maximum size. When the queue is full, the producer blocks or drops data — a deliberate choice with measurable consequences.
  2. Backpressure propagation: The consumer can signal to the producer that it is overwhelmed, causing the producer to slow its ingestion rate.
  3. Concurrent consumption: Multiple workers pull from the same queue. This increases throughput when each tick's processing time is dominated by I/O waits that can overlap.
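The first two mechanisms can be observed directly with the standard library. This minimal sketch uses placeholder integers instead of ticks. With maxsize set, put_nowait() raises asyncio.QueueFull, which is the drop path. By contrast, await put() suspends the producer until a consumer frees a slot, which is the block path.

```python
import asyncio


async def demo_bounded_queue() -> tuple[bool, list[int]]:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    queue.put_nowait(1)
    queue.put_nowait(2)  # the queue is now full

    # Drop path: a non-blocking put on a full queue raises immediately.
    try:
        queue.put_nowait(3)
        dropped = False
    except asyncio.QueueFull:
        dropped = True

    # Block path: put() waits until a consumer frees a slot.
    async def slow_consumer() -> None:
        await asyncio.sleep(0.01)
        queue.get_nowait()  # frees one slot and wakes the waiting producer

    consumer = asyncio.create_task(slow_consumer())
    await queue.put(4)  # suspends here until slow_consumer has run
    await consumer

    remaining = [queue.get_nowait() for _ in range(queue.qsize())]
    return dropped, remaining


if __name__ == "__main__":
    print(asyncio.run(demo_bounded_queue()))  # (True, [2, 4])
```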

asyncio.Queue: The Right Tool for the Right Reason

Python's asyncio.Queue is the natural choice for an async market data pipeline for several reasons:

  • Non-blocking semantics: await queue.put() and await queue.get() yield control to the event loop, allowing other tasks to run while a task waits.
  • Bounded capacity: The maxsize parameter enforces the buffer limit directly.
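  • Task-safe: Multiple coroutines on the same event loop can safely put and get without external synchronization. However, the queue is not thread-safe. Items arriving from another thread must be handed over with loop.call_soon_threadsafe().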
  • Integrated with the event loop: No threading complexity; the entire pipeline runs in a single-threaded async context.
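These properties fit together in a few lines. The following sketch runs one producer and three consumer tasks on a single event loop, with placeholder integer items standing in for ticks. Each get() is paired with a task_done() call, so queue.join() returns once every item has been processed. The consumers are then cancelled.

```python
import asyncio


async def run_pipeline(items: list[int], num_consumers: int = 3) -> list[int]:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=4)
    results: list[int] = []

    async def consumer() -> None:
        while True:
            item = await queue.get()  # yields to the event loop while empty
            try:
                await asyncio.sleep(0)  # stand-in for async I/O
                results.append(item * 10)
            finally:
                queue.task_done()  # exactly one task_done() per get()

    workers = [asyncio.create_task(consumer()) for _ in range(num_consumers)]
    for item in items:
        await queue.put(item)  # suspends while the queue is full
    await queue.join()  # returns once every item has been processed

    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(run_pipeline(list(range(10)))))
```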

The tradeoff is that the asyncio model is not suited to CPU-bound consumers. A coroutine that performs heavy numerical computation never yields. It therefore blocks the event loop, and with it the producer and every other worker. For CPU-bound workloads, offload the computation to a ProcessPoolExecutor via loop.run_in_executor(). Threads help only when the numerical library releases the GIL. For strategies whose per-tick work is lightweight or dominated by I/O, asyncio.Queue is the right tool. Examples include simple order book checks, signal forwarding, and incremental indicator updates.
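The following minimal sketch shows that offloading. It uses a hypothetical zscore() function as the CPU-heavy step. loop.run_in_executor() runs the function in an executor while the event loop keeps serving the producer and the other workers. compute_signal() accepts any concurrent.futures.Executor. The demo uses a ThreadPoolExecutor for portability. In production you would pass a ProcessPoolExecutor, created once and reused, and define the function at module level so it can be pickled.

```python
import asyncio
import math
from concurrent.futures import Executor, ThreadPoolExecutor


def zscore(prices: list[float]) -> float:
    """Hypothetical CPU-heavy step: z-score of the latest price in a window."""
    n = len(prices)
    mean = sum(prices) / n
    variance = sum((p - mean) ** 2 for p in prices) / n
    if variance == 0:
        return 0.0
    return (prices[-1] - mean) / math.sqrt(variance)


async def compute_signal(executor: Executor, prices: list[float]) -> float:
    """Run zscore() off the event loop and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, zscore, prices)


async def demo() -> float:
    # Swap in ProcessPoolExecutor for pure-Python math so the work escapes the GIL.
    with ThreadPoolExecutor(max_workers=2) as executor:
        return await compute_signal(executor, [1.0, 2.0, 3.0, 4.0, 5.0])


if __name__ == "__main__":
    print(asyncio.run(demo()))  # about 1.414
```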

Architecture Overview

The architecture consists of four layers:

WebSocket Feed (Producer)
    │
    ▼
asyncio.Queue (Buffer, maxsize configurable)
    │
    ├──▶ Worker 1 ──▶ Strategy Engine 1
    ├──▶ Worker 2 ──▶ Strategy Engine 2
    ├──▶ Worker 3 ──▶ Strategy Engine 3
    └──▶ Worker N ──▶ Strategy Engine N

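Each worker pulls from the shared queue independently. The right number of workers depends on how much of each tick's processing time is spent awaiting I/O, because extra workers help only if their awaits can overlap. The producer does not know or care how many consumers exist. It only writes to the queue and handles backpressure when the queue is full. Because any worker can take any tick, consecutive ticks for the same symbol may be handled by different workers and finish out of order. Per-symbol strategy state must therefore tolerate that interleaving.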

Production-Grade Implementation

1. The Market Data Tick Model

First, define a clean data structure for incoming ticks. Avoid sending raw dictionaries through the queue — use a named tuple or a dataclass with type annotations. This makes the pipeline self-documenting and provides IDE autocompletion.

from dataclasses import dataclass
from datetime import datetime


@dataclass(slots=True, frozen=True)  # slots requires Python 3.10+; frozen makes instances immutable
class TickData:
    """Immutable market data tick structure."""
    symbol: str
    timestamp: datetime
    bid: float
    ask: float
    bid_size: int
    ask_size: int
    volume: int

    @property
    def spread(self) -> float:
        return self.ask - self.bid

    @property
    def mid_price(self) -> float:
        return (self.ask + self.bid) / 2.0

    @property
    def pressure_ratio(self) -> float:
        """Buy/sell pressure ratio at L1."""
        if self.ask_size == 0:
            return float('inf')
        return self.bid_size / self.ask_size

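The slots=True option (Python 3.10+) removes the per-instance __dict__ and reduces memory overhead per instance. This matters when the queue holds thousands of pending items. The derived properties (spread, mid_price, pressure_ratio) are computed on access rather than at ingestion time, so the producer's hot path only parses and constructs ticks. They are recomputed on every access, so a worker that needs one repeatedly should store it in a local variable.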

2. The Producer: WebSocket Connection with Backpressure-Aware Write

The producer connects to a market data WebSocket feed, receives ticks, and pushes them to the queue. The critical design point is the full-queue case. The producer should wait with a bounded timeout and then drop the tick with a logged warning. It should not block indefinitely, drop data silently, or crash.

import asyncio
import json
import logging
from datetime import datetime
from typing import Optional

import websockets

logger = logging.getLogger(__name__)


class MarketDataProducer:
    """
    WebSocket producer that pushes parsed ticks into an asyncio.Queue.

    Implements backpressure-aware writing: when the queue is full,
    the producer waits up to `write_timeout` seconds before dropping
    the tick (with a logged warning) to prevent indefinite blocking.
    """

    def __init__(
        self,
        queue: asyncio.Queue[TickData],
        websocket_url: str,
        symbols: list[str],
        max_queue_size: int = 1000,
        write_timeout: float = 1.0,
        api_key: Optional[str] = None,
    ):
        self._queue = queue
        self._url = websocket_url
        self._symbols = symbols
        self._max_queue_size = max_queue_size
        self._write_timeout = write_timeout
        self._api_key = api_key
        self._running = False
        self._ws = None  # current connection, kept so stop() can close it
        # Counters and state exposed for monitoring
        self.items_produced = 0
        self.items_dropped = 0
        self.is_connected = False

    async def start(self) -> None:
        """
        Establish WebSocket connection and begin streaming ticks.
        Reconnects with exponential backoff until stop() is called.
        """
        self._running = True
        retry_delay = 1.0
        max_delay = 60.0

        while self._running:
            try:
                headers = {}
                if self._api_key:
                    headers["X-API-Key"] = self._api_key

                # websockets >= 14 names this argument `additional_headers`;
                # older releases used `extra_headers`.
                async with websockets.connect(
                    self._url,
                    additional_headers=headers,
                    ping_interval=20,
                    ping_timeout=10,
                ) as ws:
                    self._ws = ws
                    self.is_connected = True
                    logger.info("WebSocket connected: %s", self._url)
                    retry_delay = 1.0  # reset on successful connection
                    try:
                        # Subscribe to symbols
                        subscribe_msg = {
                            "method": "subscribe",
                            "params": {"symbols": self._symbols},
                            "id": 1,
                        }
                        await ws.send(json.dumps(subscribe_msg))

                        # Receive loop: ends when the connection is closed
                        async for raw_message in ws:
                            tick = self._parse_tick(raw_message)
                            if tick is None:
                                continue

                            # Backpressure-aware put with timeout
                            try:
                                await asyncio.wait_for(
                                    self._queue.put(tick),
                                    timeout=self._write_timeout,
                                )
                                self.items_produced += 1
                            except asyncio.TimeoutError:
                                # Queue stayed full: drop this tick to keep reading
                                self.items_dropped += 1
                                logger.warning(
                                    "Queue overflow: dropped tick for %s "
                                    "(queue size: %d, max: %d)",
                                    tick.symbol,
                                    self._queue.qsize(),
                                    self._max_queue_size,
                                )
                    finally:
                        self.is_connected = False
                        self._ws = None

            except websockets.exceptions.ConnectionClosed as e:
                logger.warning("WebSocket disconnected: %s. Reconnecting in %.1fs.", e, retry_delay)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, max_delay)  # exponential backoff

            except Exception as e:
                logger.error("Unexpected producer error: %s", e, exc_info=True)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, max_delay)

    async def stop(self) -> None:
        """Gracefully stop the producer: end the loop and close the connection."""
        self._running = False
        if self._ws is not None:
            await self._ws.close()  # makes the receive loop in start() exit

    def _parse_tick(self, raw_message: str) -> Optional[TickData]:
        """Parse incoming WebSocket message into TickData. Override for custom formats."""
        try:
            msg = json.loads(raw_message)
            # Adapt to actual API payload structure. Note: fromisoformat()
            # accepts a trailing "Z" only on Python 3.11+.
            return TickData(
                symbol=msg["symbol"],
                timestamp=datetime.fromisoformat(msg["timestamp"]),
                bid=float(msg["bid"]),
                ask=float(msg["ask"]),
                bid_size=int(msg["bid_size"]),
                ask_size=int(msg["ask_size"]),
                volume=int(msg["volume"]),
            )
        except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
            logger.debug("Failed to parse message: %s", e)
            return None

The write_timeout parameter is the backpressure control knob. When the queue is full, the producer waits up to write_timeout seconds for a free slot. If no worker frees one in that time, the tick is dropped and a warning is logged. This is preferable to blocking indefinitely, because it preserves the liveness of the producer at the cost of occasional data loss. Whether that loss is acceptable depends on the strategy. A strategy that only needs the latest top-of-book quote can tolerate an occasional missing tick. A strategy that rebuilds an order book from incremental depth updates cannot, and would need to resynchronize after any drop.
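Dropping the incoming tick leaves the older items in the queue. For strategies that care most about the freshest quote, a common alternative is to evict the oldest queued item instead. The following minimal sketch assumes a hypothetical helper, put_drop_oldest(), that the producer would call in place of the timed put(). The helper never awaits, so no other coroutine can run between the eviction and the insert.

```python
import asyncio


def put_drop_oldest(queue: asyncio.Queue, item) -> bool:
    """Enqueue item without waiting; if the queue is full, evict the oldest item.

    Returns True if an item was evicted. Safe only within one event loop
    (asyncio.Queue is not thread-safe).
    """
    try:
        queue.put_nowait(item)
        return False
    except asyncio.QueueFull:
        queue.get_nowait()   # discard the stalest queued item
        queue.task_done()    # it will never be processed; keep join() accounting right
        queue.put_nowait(item)
        return True
```

On each eviction, the producer would increment its drop counter, exactly as in the timeout path.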

3. The Worker: Consumer with Graceful Shutdown

Each worker is an async task that continuously pulls ticks from the queue and processes them. The worker must handle three conditions. The first is an empty queue, when no data is available. The second is a growing backlog, when the consumers are falling behind the producer. The third is a graceful shutdown signal.

import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


class MarketDataWorker:
    """
    Async worker that consumes ticks from a queue and executes strategy logic.

    Tracks processed/failed counts and logs a warning when the queue
    backlog grows beyond a threshold.
    """

    def __init__(
        self,
        worker_id: int,
        queue: asyncio.Queue[TickData],
        processor: Callable[[TickData], Awaitable[None]],
        lag_threshold: int = 500,
    ):
        self._worker_id = worker_id
        self._queue = queue
        self._processor = processor
        self._lag_threshold = lag_threshold
        self._running = False
        self._processed_count = 0
        self._dropped_count = 0  # ticks whose processing raised an exception

    @property
    def metrics(self) -> dict:
        return {
            "worker_id": self._worker_id,
            "processed": self._processed_count,
            "dropped": self._dropped_count,
            "queue_size": self._queue.qsize(),
        }

    async def run(self) -> None:
        """Main consumer loop with graceful shutdown support."""
        self._running = True
        logger.info("Worker %d started", self._worker_id)

        try:
            while self._running:
                try:
                    # Time out once per second so the loop can re-check self._running
                    tick = await asyncio.wait_for(self._queue.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    continue  # no data available

                try:
                    # Backlog signal: consumers are falling behind the producer
                    backlog = self._queue.qsize()
                    if backlog > self._lag_threshold:
                        logger.warning(
                            "Worker %d: queue backlog detected "
                            "(size: %d, threshold: %d)",
                            self._worker_id,
                            backlog,
                            self._lag_threshold,
                        )

                    # Execute strategy logic
                    await self._process_tick(tick)
                    self._processed_count += 1

                except Exception as e:
                    logger.error(
                        "Worker %d: processing error for %s: %s",
                        self._worker_id,
                        tick.symbol,
                        e,
                        exc_info=True,
                    )
                    self._dropped_count += 1

                finally:
                    # Exactly one task_done() per successful get(), keeping queue.join() accurate
                    self._queue.task_done()

        finally:
            logger.info("Worker %d stopped (processed: %d, dropped: %d)",
                        self._worker_id, self._processed_count, self._dropped_count)

    async def _process_tick(self, tick: TickData) -> None:
        """Run the injected strategy coroutine on a single tick."""
        await self._processor(tick)

    async def stop(self) -> None:
        """Initiate graceful shutdown (takes effect within about one second)."""
        self._running = False


async def example_processor(tick: TickData) -> None:
    """Example strategy logic; pass this (or your own coroutine) as `processor`."""
    # Example: compute mid-price and pressure ratio
    mid = tick.mid_price
    pressure = tick.pressure_ratio

    # Example: trigger strategy signal if pressure exceeds threshold
    if pressure > 2.5:
        logger.debug(
            "Strong buy pressure detected: %s @ %.4f (ratio: %.2f)",
            tick.symbol, mid, pressure,
        )

    # Simulate async I/O (e.g., persisting to database, sending to signal engine)
    await asyncio.sleep(0.001)  # remove in production

The lag_threshold parameter defines the queue size at which the worker emits a warning. This is not a hard limit — the queue continues accepting data — but it serves as an early warning system. In a monitored production environment, this warning would trigger an alert and a metric emission for dashboards.
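As written, the backlog check runs on every tick. A sustained backlog therefore produces one warning per tick, which can flood the log. One way to keep the signal without the noise is to log at most once per interval. The following sketch uses a hypothetical BacklogWarner class. Its clock is injectable and defaults to time.monotonic, which makes the behavior testable. The 5-second interval is an assumption you would tune.

```python
import logging
import time
from typing import Callable, Optional

logger = logging.getLogger(__name__)


class BacklogWarner:
    """Hypothetical helper: emit a backlog warning at most once per interval."""

    def __init__(
        self,
        threshold: int,
        interval_s: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._threshold = threshold
        self._interval_s = interval_s
        self._clock = clock
        self._last_warned: Optional[float] = None
        self.suppressed = 0  # warnings skipped since the last one emitted

    def check(self, queue_size: int) -> bool:
        """Return True if a warning was logged for this queue size."""
        if queue_size <= self._threshold:
            return False
        now = self._clock()
        if self._last_warned is not None and now - self._last_warned < self._interval_s:
            self.suppressed += 1
            return False
        logger.warning(
            "Queue backlog: size %d > threshold %d (%d similar warnings suppressed)",
            queue_size, self._threshold, self.suppressed,
        )
        self._last_warned = now
        self.suppressed = 0
        return True
```

Inside a worker, the inline qsize() comparison could then become a single call such as warner.check(self._queue.qsize()).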

4. The Multi-Worker Supervisor

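The supervisor starts a fixed number of worker tasks on the current event loop, aggregates their metrics, and provides a unified shutdown interface. All workers share one thread, so they overlap their I/O waits rather than running on separate CPU cores.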

import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


class WorkerPool:
    """
    Manages a fixed-size pool of MarketDataWorker tasks.

    All workers run as tasks on the current event loop (a single thread).
    Provides aggregated metrics and coordinated shutdown.
    """

    def __init__(
        self,
        queue: asyncio.Queue[TickData],
        processor: Callable[[TickData], Awaitable[None]],
        num_workers: int = 4,
        lag_threshold: int = 500,
    ):
        self._queue = queue
        self._processor = processor
        self._num_workers = num_workers
        self._lag_threshold = lag_threshold
        self._workers: list[MarketDataWorker] = []
        self._tasks: list[asyncio.Task] = []

    async def start(self) -> None:
        """Spawn all workers and begin processing."""
        logger.info("Starting worker pool with %d workers", self._num_workers)

        for worker_id in range(self._num_workers):
            worker = MarketDataWorker(
                worker_id=worker_id,
                queue=self._queue,
                processor=self._processor,
                lag_threshold=self._lag_threshold,
            )
            self._workers.append(worker)
            task = asyncio.create_task(worker.run())
            self._tasks.append(task)

        # Yield briefly so the new tasks start, then report any that exited early
        await asyncio.sleep(0.1)
        running = sum(1 for t in self._tasks if not t.done())
        logger.info("Worker pool active: %d/%d workers running", running, self._num_workers)

    async def stop(self, timeout: float = 5.0) -> None:
        """
        Gracefully shutdown all workers within the timeout window.
        """
        logger.info("Initiating worker pool shutdown (timeout: %.1fs)", timeout)

        # Signal all workers to stop
        for worker in self._workers:
            await worker.stop()

        # Wait for tasks to complete (asyncio.wait raises ValueError on an empty set)
        if not self._tasks:
            return
        done, pending = await asyncio.wait(
            self._tasks,
            timeout=timeout,
        )

        # Cancel any stragglers
        for task in pending:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        logger.info("Worker pool shutdown complete")

    def get_metrics(self) -> dict:
        """Aggregate metrics from all workers."""
        individual = [w.metrics for w in self._workers]
        total_processed = sum(w["processed"] for w in individual)
        total_dropped = sum(w["dropped"] for w in individual)

        return {
            "num_workers": self._num_workers,
            "total_processed": total_processed,
            "total_dropped": total_dropped,
            "queue_size": self._queue.qsize(),
            "workers": individual,
        }

5. Putting It Together: The Main Application

import asyncio
import logging
import os
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


async def strategy_processor(tick: TickData) -> None:
    """User-defined strategy logic. Replace with actual implementation."""
    pass  # your strategy here


async def main() -> None:
    # Load API key from environment variable (never hardcode)
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise RuntimeError("TICKDB_API_KEY environment variable is not set")

    # Shared queue with bounded capacity (1000 items max)
    # ⚠️ Tuning: maxsize too small → backpressure triggers prematurely
    #            maxsize too large → memory pressure during queue buildup
    tick_queue: asyncio.Queue[TickData] = asyncio.Queue(maxsize=1000)

    # Initialize components
    producer = MarketDataProducer(
        queue=tick_queue,
        websocket_url="wss://stream.tickdb.ai/v1/market/ws",
        symbols=["NVDA.US", "AAPL.US", "TSLA.US"],
        max_queue_size=1000,
        write_timeout=1.0,
        api_key=api_key,
    )

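    # Determine worker count: 4 is a reasonable default for I/O-bound strategies.
    # For CPU-bound logic, extra asyncio workers add no throughput (they share
    # one thread); profile first and offload heavy computation to a process pool.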
    num_workers = int(os.environ.get("WORKER_COUNT", "4"))

    worker_pool = WorkerPool(
        queue=tick_queue,
        processor=strategy_processor,
        num_workers=num_workers,
        lag_threshold=500,
    )

    try:
        # Start the consumers first, then run the producer until it is stopped
        await worker_pool.start()
        await producer.start()
    except asyncio.CancelledError:
        # On Ctrl+C, asyncio.run() (Python 3.11+) cancels this task instead of
        # raising KeyboardInterrupt here; log, clean up in finally, then propagate
        logger.info("Received interrupt signal")
        raise
    finally:
        await producer.stop()
        await worker_pool.stop(timeout=5.0)

        metrics = worker_pool.get_metrics()
        logger.info("Final metrics: %s", metrics)


if __name__ == "__main__":
    asyncio.run(main())

Tuning the Queue: The Critical Parameters

The producer-consumer pattern introduces three new tuning knobs that do not exist in a naive single-threaded pipeline.

maxsize: Queue Buffer Capacity

The queue's maxsize is the most consequential parameter. It directly determines how much burst capacity the system has before backpressure kicks in.

| maxsize | Behavior | Tradeoff |
|---|---|---|
| Too small (< 100) | Backpressure triggers frequently, even during normal bursts; throughput suffers. | Low memory usage, but the system cannot absorb natural variability. |
| Too large (> 10,000) | Absorbs large bursts but queues stale data; memory pressure increases. | High memory consumption; risk of processing stale ticks. |
| Calibrated (500–2,000 for a 100Hz feed) | Absorbs natural variability; backpressure triggers only under genuine overload. | Balanced memory use and liveness. |

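A good starting point for a 100Hz feed is maxsize = expected_burst_seconds * feed_rate * safety_factor. For a 5-second burst at 100Hz with a 2x safety factor, this gives 5 * 100 * 2 = 1000. The formula is deliberately conservative because it assumes the workers drain nothing during the burst.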
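The rule of thumb can be written out in code along with two refinements. The first credits the workers' drain rate during the burst. The second estimates how stale the newest tick in a full queue is by the time a worker reaches it: roughly maxsize divided by the drain rate. The drain rates below are illustrative assumptions. The 20 ticks/s figure assumes one worker at 50ms per tick. The 80 ticks/s figure assumes four such workers whose waits overlap fully.

```python
import math


def burst_maxsize(
    burst_seconds: float,
    feed_rate_hz: float,
    safety_factor: float,
    drain_rate_hz: float = 0.0,
) -> int:
    """Queue slots needed to absorb a burst without dropping.

    drain_rate_hz=0 reproduces the conservative rule of thumb; a positive
    value credits the items the workers consume during the burst.
    """
    backlog_growth_hz = max(feed_rate_hz - drain_rate_hz, 0.0)
    return math.ceil(burst_seconds * backlog_growth_hz * safety_factor)


def worst_case_staleness_s(maxsize: int, drain_rate_hz: float) -> float:
    """Approximate seconds the newest item in a full queue waits for a worker."""
    return maxsize / drain_rate_hz


print(burst_maxsize(5, 100, 2))                    # 1000, the rule of thumb
print(burst_maxsize(5, 100, 2, drain_rate_hz=60))  # 400
print(worst_case_staleness_s(1000, 20))            # 50.0 (one worker at 50ms/tick)
print(worst_case_staleness_s(1000, 80))            # 12.5 (four fully overlapping workers)
```

A deep queue can absorb long bursts but may hand the strategy ticks that are tens of seconds old. The staleness figure is therefore worth checking alongside memory.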

write_timeout: Backpressure Patience

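The write_timeout in the producer determines how long the producer waits for a free queue slot before dropping a tick. The timeout fires only when the workers make no progress for that long. While they keep draining, even slowly, each put() succeeds as soon as a slot frees up. The producer is then paced at the consumers' rate instead of dropping. A timeout of 1–2 seconds is a reasonable starting point. Shorter timeouts drop data more aggressively during temporary stalls. Longer timeouts hold up the WebSocket read loop for longer, so unread messages back up in the client library and the TCP connection. The exchange may then disconnect the client for reading too slowly.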
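The difference between a stalled consumer and a slow but live one can be demonstrated with a one-slot queue. This sketch includes a hypothetical put_with_timeout() helper that mirrors the producer's timed write. The put is dropped when no one frees the slot. It succeeds when a consumer frees the slot within the timeout.

```python
import asyncio


async def put_with_timeout(queue: asyncio.Queue, item, timeout: float) -> bool:
    """Mirror of the producer's write path: True if enqueued, False if dropped."""
    try:
        await asyncio.wait_for(queue.put(item), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> tuple[bool, bool]:
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=1)
    queue.put_nowait("a")  # the queue is now full

    # Stalled consumer: nobody frees the slot, so the put times out (drop).
    stalled = await put_with_timeout(queue, "b", timeout=0.05)

    # Slow but live consumer: a slot frees after 10 ms, well inside the timeout.
    async def slow_get() -> None:
        await asyncio.sleep(0.01)
        queue.get_nowait()

    getter = asyncio.create_task(slow_get())
    draining = await put_with_timeout(queue, "c", timeout=0.5)
    await getter
    return stalled, draining


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (False, True)
```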

lag_threshold: Early Warning Trigger

The worker's lag_threshold triggers a warning when the queue size exceeds it. This is not a hard limit — the queue continues accepting data. It is an observability trigger. Set it to approximately 50–75% of maxsize to give operators enough warning before the queue fills completely.

Observability: Knowing When the System Is Stressed

A producer-consumer pipeline without monitoring is a time bomb. The following metrics are the minimum viable observability stack:

import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class PipelineMetrics:
    """Aggregated metrics for the producer-consumer pipeline."""
    timestamp: datetime
    queue_size: int
    queue_utilization_pct: float
    items_produced: int
    items_consumed: int
    items_dropped: int
    avg_processing_latency_ms: float
    producer_connected: bool


async def collect_metrics(
    queue: asyncio.Queue,
    producer: MarketDataProducer,
    worker_pool: WorkerPool,
) -> PipelineMetrics:
    """Periodic metrics collection for monitoring dashboards."""
    maxsize = queue.maxsize
    qsize = queue.qsize()
    pool = worker_pool.get_metrics()  # per-worker counters, already aggregated

    return PipelineMetrics(
        timestamp=datetime.now(timezone.utc),  # datetime.utcnow() is deprecated since 3.12
        queue_size=qsize,
        queue_utilization_pct=round(qsize / maxsize * 100, 2) if maxsize > 0 else 0.0,
        items_produced=producer.items_produced,  # requires a counter on the producer
        items_consumed=pool["total_processed"],
        items_dropped=pool["total_dropped"],
        avg_processing_latency_ms=0.0,  # implement with a moving window
        producer_connected=producer.is_connected,  # requires a flag on the producer
    )

Key metrics to visualize:

  • Queue utilization: qsize / maxsize * 100%. Spikes above 80% indicate backpressure events.
  • Drop rate: Items dropped per minute. A non-zero drop rate confirms backpressure is active.
  • Processing lag: Time difference between tick timestamp and processing timestamp.
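Processing lag and the avg_processing_latency_ms field above both need a moving window. The following minimal sketch uses a hypothetical LagWindow class backed by collections.deque with maxlen, so the oldest samples fall off automatically. It assumes tick timestamps are timezone-aware UTC datetimes, because subtracting a naive datetime from an aware one raises TypeError.

```python
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import Optional


class LagWindow:
    """Hypothetical helper: rolling processing lag over the last N ticks."""

    def __init__(self, size: int = 1000):
        self._samples: deque = deque(maxlen=size)  # oldest samples are discarded

    def record(self, tick_ts: datetime, processed_at: Optional[datetime] = None) -> float:
        """Store and return the lag in milliseconds for one tick."""
        if processed_at is None:
            processed_at = datetime.now(timezone.utc)
        lag_ms = (processed_at - tick_ts) / timedelta(milliseconds=1)
        self._samples.append(lag_ms)
        return lag_ms

    @property
    def average_ms(self) -> float:
        return sum(self._samples) / len(self._samples) if self._samples else 0.0

    @property
    def max_ms(self) -> float:
        return max(self._samples, default=0.0)
```

A worker could call record(tick.timestamp) after processing each tick, and collect_metrics() could then read average_ms.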

Graceful Degradation: What Happens When the Queue Fills

The system has three failure modes when the queue fills, each with different consequences:

  1. Producer blocks (no timeout): The WebSocket read loop stalls for as long as the workers do. Unread messages back up, and the exchange may disconnect the client for reading too slowly. This is the worst outcome, because it cascades into a disconnect-and-reconnect cycle.
  2. Producer drops with warning (write_timeout behavior): The tick is discarded, a warning is logged, and the producer keeps reading. The strategy sees gaps in the tick stream, but the pipeline remains alive.
  3. Consumer throttles upstream (rate-limit propagation): This mode is not implemented in the basic pattern. It requires the producer to consume a "slow down" signal from the consumers. This adds complexity but provides the smoothest degradation.

The implementation above uses strategy 2, which is a sensible default: the producer stays alive, operators are warned, and the system degrades gracefully under load.
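The signalling half of strategy 3 can be sketched with an asyncio.Event and two watermarks. This sketch uses a hypothetical BackpressureGate class. The gate closes when the queue reaches the high watermark and reopens only once it falls to the low watermark. This hysteresis keeps it from flapping around a single threshold. A REST polling producer could await the gate before each request. A WebSocket producer cannot slow the exchange down, but it could use the closed state to switch to a cheaper mode, such as keeping only the latest tick per symbol.

```python
import asyncio


class BackpressureGate:
    """Hypothetical helper: closes at a high watermark, reopens at a low one."""

    def __init__(self, queue: asyncio.Queue, high: int, low: int):
        if not 0 <= low < high:
            raise ValueError("watermarks must satisfy 0 <= low < high")
        self._queue = queue
        self._high = high
        self._low = low
        self._open = asyncio.Event()
        self._open.set()  # start open

    def update(self) -> bool:
        """Re-evaluate after each put()/get(); returns True while the gate is open."""
        size = self._queue.qsize()
        if size >= self._high:
            self._open.clear()
        elif size <= self._low:
            self._open.set()
        # Between the watermarks, the previous state is kept (hysteresis)
        return self._open.is_set()

    async def wait_open(self) -> None:
        """A polling producer awaits this before each fetch."""
        await self._open.wait()
```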

Scaling Beyond One Process

The architecture described runs in a single process. For institutional-scale systems handling dozens of symbols at tick rates exceeding 10,000/second, horizontal scaling becomes necessary. The asyncio.Queue does not cross process boundaries — you would replace it with a message broker (Redis Streams, Kafka, or ZeroMQ) as the buffer layer, and keep the producer-consumer logic for each worker process.

The transition is largely additive: each worker process runs the same logic, reads from the shared broker, and writes results to a centralized sink. The strategy code and most of the worker and monitoring logic can stay the same. What changes are the queue calls (put, get, task_done) and the queue-size metrics, because each broker client exposes its own API and consumer-group semantics.
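One way to contain that swap is to make workers depend on a small interface rather than on asyncio.Queue directly. This sketch defines a hypothetical TickBuffer protocol that covers only the queue calls the workers above make. asyncio.Queue satisfies the protocol structurally. A broker-backed adapter for Redis Streams or Kafka would implement the same four methods, written against that client's own API. Note that an isinstance() check against a runtime_checkable protocol only verifies that the methods exist, not their signatures.

```python
import asyncio
from typing import Protocol, TypeVar, runtime_checkable

T = TypeVar("T")


@runtime_checkable
class TickBuffer(Protocol[T]):
    """Hypothetical minimal interface the workers depend on."""

    async def put(self, item: T) -> None: ...
    async def get(self) -> T: ...
    def task_done(self) -> None: ...
    def qsize(self) -> int: ...


async def drain(buffer: TickBuffer[int], count: int) -> list[int]:
    """Worker-side code written only against TickBuffer, not asyncio.Queue."""
    out = []
    for _ in range(count):
        item = await buffer.get()
        out.append(item)
        buffer.task_done()
    return out


async def demo() -> list[int]:
    queue: asyncio.Queue[int] = asyncio.Queue()
    for i in range(3):
        await queue.put(i)
    return await drain(queue, 3)  # asyncio.Queue satisfies TickBuffer


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [0, 1, 2]
```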

Closing

The producer-consumer pattern is not a performance optimization — it is a structural solution to a fundamental mismatch in data pipeline design. By introducing a bounded, backpressure-aware buffer between the market data feed and the strategy logic, you create a system that remains predictable under stress, degrades gracefully under overload, and can be horizontally scaled without rewriting the core logic.

The three parameters that determine success are maxsize (buffer capacity), write_timeout (backpressure patience), and lag_threshold (early warning trigger). Calibrate these against your actual feed rate and strategy latency profile, measure them in production, and tune until the drop rate is non-zero only during genuine overload events — not during normal burst traffic.

For TickDB users integrating this pattern: the WebSocket feed from TickDB's depth channel delivers order book updates at sub-100ms latency. The producer code above connects directly to wss://stream.tickdb.ai/v1/market/ws with your TICKDB_API_KEY. The queue absorbs variability, the multi-worker pool adds processing concurrency, and the metrics hooks integrate with your existing monitoring stack.

Next Steps

If you're building a real-time strategy: start with a single worker and a queue size of 1000. Replay 24 hours of historical TickDB kline data through the strategy to measure its per-tick processing time, which sets your latency budget. Then calibrate maxsize to absorb that budget's natural variability.

If you need historical backtest data: Visit tickdb.ai to access 10+ years of cleaned, aligned US equity OHLCV data via the /v1/market/kline endpoint. Align your backtest period with at least one full market cycle to validate strategy stability across regimes.

If you need enterprise-scale distribution: Reach out to enterprise@tickdb.ai for multi-symbol WebSocket subscriptions and dedicated infrastructure configurations that support institutional-level throughput.


This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.