The Problem You Will Hit at 47 Symbols

You deploy a streaming pipeline for your quant strategy. Everything works beautifully in testing with 12 symbols. You add more. At 47 simultaneous subscriptions, your single WebSocket connection chokes. Messages arrive in fragments. Your decoding buffer overflows. The connection drops entirely—and when it reconnects, you lose 3.4 seconds of order flow data that your mean-reversion signal cannot recover.

This is not a theoretical failure mode. It is the reproducible result of exceeding the connection limits imposed by most market data providers, including TickDB, which caps active subscriptions per connection at a level designed to protect server stability. The solution is not a single bigger pipe. It is a connection pool that distributes load, survives failures, and rebuilds itself without human intervention.

This article walks through the architecture of a production-grade WebSocket connection pool built for market data ingestion at scale. Every design decision—pool sizing, health checks, reconnect strategies, subscription routing—has been tested against the failure modes that appear when you push a system past its comfortable limits.


Why Single Connections Fail at Scale

The Subscription-per-Connection Limit

Market data WebSocket servers enforce per-connection limits for a reason. A single TCP connection has finite bandwidth and CPU allocation on the server side. When a client multiplexes 100+ symbol subscriptions over one connection, the server must:

  • Distribute incoming market data across the multiplexed stream
  • Maintain subscription state for every symbol
  • Handle backpressure when the client's read buffer lags

The server protects itself by dropping connections that exceed its resource budget. TickDB enforces a subscription density limit per connection to ensure fair resource allocation across all clients. When you exceed this threshold, the server rejects new subscription requests with an error rather than degrading service for existing subscribers.

The Latency Multiplication Effect

A single connection servicing 100 symbols creates a second problem: message interleaving. WebSocket frames arrive sequentially. If you subscribe to AAPL, NVDA, and TSLA simultaneously, and AAPL updates at 50 quotes per second while TSLA updates at 200, the server multiplexes these streams onto one ordered channel. Your client must demultiplex them, which means an AAPL quote can sit in the kernel receive buffer behind a burst of TSLA quotes that arrived ahead of it.

The effect compounds under load. During high-volatility events—earnings releases, macro announcements—the quote frequency for popular symbols triples. Your single connection introduces inter-symbol latency that corrupts the temporal alignment your strategy requires. A cross-symbol arbitrage strategy that assumes simultaneous order book snapshots will produce phantom signals when the snapshots are actually 80 milliseconds apart.
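One way to guard against this is to measure the skew between snapshots before acting on them. The sketch below assumes each snapshot carries a millisecond timestamp; the function names and the 20 ms tolerance are illustrative assumptions, not part of TickDB's documented protocol.

```python
from typing import Dict


def max_snapshot_skew_ms(snapshot_ts_ms: Dict[str, int]) -> int:
    """Return the spread between the oldest and newest snapshot timestamps."""
    if not snapshot_ts_ms:
        return 0
    return max(snapshot_ts_ms.values()) - min(snapshot_ts_ms.values())


def is_aligned(snapshot_ts_ms: Dict[str, int], tolerance_ms: int = 20) -> bool:
    """True when all snapshots fall within tolerance_ms of each other."""
    return max_snapshot_skew_ms(snapshot_ts_ms) <= tolerance_ms


# Hypothetical timestamps (milliseconds) for three symbols.
snapshots = {
    "AAPL": 1_700_000_000_000,
    "NVDA": 1_700_000_000_012,
    "TSLA": 1_700_000_000_080,
}
skew = max_snapshot_skew_ms(snapshots)   # 80 ms between oldest and newest
aligned = is_aligned(snapshots)          # False at the 20 ms tolerance
```

A strategy that requires simultaneous snapshots can skip signal evaluation whenever `is_aligned` returns False, rather than trading on a phantom spread.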

The Single Point of Failure

Perhaps the most critical issue: a single connection is a single point of failure. When it drops—and it will drop—your entire pipeline halts. The reconnection sequence takes time. During that window, you accumulate a data gap. For a mean-reversion strategy that depends on detecting order flow imbalances in real time, a 5-second gap is not noise. It is a signal loss that requires downstream imputation logic.
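A gap like this is easy to detect after the fact, which at least lets downstream logic know where imputation is needed. A minimal sketch, assuming you record a local arrival time in seconds for each message (the function name and the 1-second threshold are arbitrary illustrations):

```python
from typing import List, Tuple


def find_gaps(arrival_times: List[float], max_gap_s: float = 1.0) -> List[Tuple[float, float]]:
    """Return (start, end) pairs where consecutive messages are more than max_gap_s apart."""
    gaps = []
    for prev, curr in zip(arrival_times, arrival_times[1:]):
        if curr - prev > max_gap_s:
            gaps.append((prev, curr))
    return gaps


# Messages every 200 ms, then a 5-second outage, then recovery.
arrivals = [0.0, 0.2, 0.4, 5.4, 5.6]
gaps = find_gaps(arrivals)
```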

The connection pool architecture solves all three problems: it distributes subscription density across multiple connections, reduces inter-symbol latency through better load distribution, and eliminates single points of failure through redundant connections with automatic recovery.


Connection Pool Architecture

High-Level Design

The connection pool operates as a managed resource with three layers:

┌─────────────────────────────────────────────────────┐
│                  Subscription Manager               │
│    (Routes symbols to connections by pool key)      │
└────────────────────────┬────────────────────────────┘
                         │
┌────────────────────────▼────────────────────────────┐
│                  Connection Pool                    │
│    (N connections, health tracking, rebuild logic)  │
├──────────────┬──────────────┬──────────────┬────────┤
│  Connection  │  Connection  │  Connection  │  ...   │
│    Pool-1    │    Pool-2    │    Pool-3    │        │
│ (AAPL, NVDA) │ (TSLA, MSFT) │  (BTC, ETH)  │        │
└──────────────┴──────────────┴──────────────┴────────┘
                         │
┌────────────────────────▼────────────────────────────┐
│            WebSocket Server (TickDB)                │
│              wss://api.tickdb.ai/stream             │
└─────────────────────────────────────────────────────┘

Layer 1: Subscription Manager
Maps symbols to connection pools based on configurable routing keys. Ensures even distribution across available connections. Handles subscription requests from the strategy layer.

Layer 2: Connection Pool
Manages N persistent WebSocket connections. Each connection maintains its own subscription set. Monitors health via heartbeat. Triggers rebuild when a connection fails.

Layer 3: TickDB WebSocket Server
The remote endpoint. The pool is designed to be server-agnostic above the protocol layer, but this article implements TickDB's specific WebSocket protocol.
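To make Layer 1 concrete, here is a minimal routing sketch that assigns each symbol to the least-loaded connection. The function name `route_symbols` and the plain-dict return type are illustrative assumptions; the pool implementation later in this article applies the same idea against live connection state.

```python
from typing import Dict, List


def route_symbols(symbols: List[str], n_connections: int, limit: int) -> Dict[int, List[str]]:
    """Assign each symbol to the currently least-loaded connection."""
    if len(symbols) > n_connections * limit:
        raise ValueError("symbol universe exceeds total pool capacity")

    # Connection ids start at 1, matching the diagram above.
    assignment: Dict[int, List[str]] = {cid: [] for cid in range(1, n_connections + 1)}
    for symbol in symbols:
        # min() returns the first minimum, so ties go to the lowest connection id.
        target = min(assignment, key=lambda cid: len(assignment[cid]))
        assignment[target].append(symbol)
    return assignment


plan = route_symbols(["AAPL", "NVDA", "TSLA", "MSFT", "BTC"], n_connections=3, limit=50)
```

Because every symbol goes to the emptiest connection, no connection exceeds the limit as long as the total fits in the pool, which is what the capacity check at the top enforces.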

Pool Sizing Strategy

The correct pool size depends on three variables:

Variable                          | Calculation                                | Example
----------------------------------+--------------------------------------------+--------
Total symbols                     | S = number of symbols to subscribe         | 120
Subscription limit per connection | L = server limit per connection            | 50
Redundancy factor                 | R = connections per pool slot for failover | 2

Minimum connections = ceil(S / L) × R

For TickDB with a subscription limit of 50 per connection, subscribing to 120 symbols requires ceil(120 / 50) = 3 connections for base capacity. With a redundancy factor of 2, the formula gives 3 × 2 = 6, so a healthy pool would use 6–8 connections.

Do not size the pool to the exact minimum. During reconnection sequences, you need spare capacity to absorb the load of a recovering connection. A pool with exactly ceil(S / L) connections will cascade failures if two connections drop simultaneously.
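The sizing formula can be checked numerically. The helper below is a small sketch (the function name is hypothetical); it rounds up after applying the redundancy factor so that fractional factors such as 1.5 still yield a whole number of connections.

```python
import math


def min_connections(total_symbols: int, limit_per_connection: int, redundancy: float = 1.0) -> int:
    """Minimum connections = ceil(S / L) x R, rounded up to a whole connection."""
    if limit_per_connection <= 0:
        raise ValueError("limit_per_connection must be positive")
    base = math.ceil(total_symbols / limit_per_connection)  # connections for raw capacity
    return math.ceil(base * redundancy)                     # add failover headroom


base_only = min_connections(120, 50)           # 3: capacity with no spare room
with_failover = min_connections(120, 50, 2)    # 6: every slot has a backup
```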


Production-Grade Implementation

Core Connection Pool Manager

The following implementation provides a complete, production-ready connection pool with health monitoring, automatic reconnection, and subscription routing.

import asyncio
import json
import logging
import math
import random
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set

import websockets
# Legacy import path, used here only for type annotations. In websockets 14+
# the default client connection type is websockets.asyncio.client.ClientConnection.
from websockets.client import WebSocketClientProtocol

# ⚠️ For production HFT workloads with sub-millisecond latency requirements,
# consider asyncio with uvloop or aiohttp. This implementation prioritizes
# readability and maintainability over raw throughput.

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
logger = logging.getLogger("tickdb.pool")


@dataclass
class ConnectionConfig:
    """Configuration for a single WebSocket connection."""
    max_subscriptions: int = 50          # TickDB limit per connection
    heartbeat_interval: float = 15.0    # seconds between ping frames
    reconnect_base_delay: float = 1.0   # seconds
    reconnect_max_delay: float = 30.0    # seconds
    reconnect_jitter: float = 0.1        # fraction of delay as jitter
    message_timeout: float = 5.0         # seconds to wait for a message


@dataclass
class PoolConfig:
    """Configuration for the entire connection pool."""
    max_connections: int = 8
    redundancy_factor: float = 1.5      # extra capacity beyond ceil(S/L)
    health_check_interval: float = 30.0  # seconds between health checks
    rebuild_batch_delay: float = 0.5     # seconds between rebuilding connections


@dataclass
class ConnectionState:
    """Tracks the state of a single WebSocket connection."""
    connection_id: int
    websocket: Optional[WebSocketClientProtocol] = None
    subscriptions: Set[str] = field(default_factory=set)
    is_healthy: bool = False
    last_heartbeat: float = 0.0
    reconnect_attempts: int = 0
    is_reconnecting: bool = False


class TickDBConnectionPool:
    """Manages a pool of WebSocket connections for TickDB real-time subscriptions."""

    def __init__(
        self,
        api_key: str,
        config: Optional[ConnectionConfig] = None,
        pool_config: Optional[PoolConfig] = None
    ):
        self.api_key = api_key
        self.config = config or ConnectionConfig()
        self.pool_config = pool_config or PoolConfig()
        self.connections: List[ConnectionState] = []
        self._subscription_map: Dict[str, int] = {}  # symbol -> connection_id
        self._message_handlers: List[Callable[[dict], None]] = []
        self._running = False
        self._lock = asyncio.Lock()
        self._connection_counter = 0

    async def initialize(self) -> None:
        """Initialize the connection pool with the configured number of connections."""
        target_connections = self._calculate_pool_size()

        logger.info(f"Initializing connection pool with {target_connections} connections")

        for i in range(target_connections):
            conn_id = self._next_connection_id()
            state = ConnectionState(connection_id=conn_id)
            self.connections.append(state)
            asyncio.create_task(self._connect_and_subscribe(state))

        self._running = True
        asyncio.create_task(self._health_check_loop())

    def _calculate_pool_size(self) -> int:
        """Calculate the number of connections to open at startup."""
        # The symbol count is unknown until subscribe() is called, so the pool
        # opens max_connections scaled by redundancy_factor up front. With the
        # defaults (8 and 1.5) that is 12 connections, more than max_connections.
        return min(
            math.ceil(self.pool_config.max_connections * self.pool_config.redundancy_factor),
            16  # Cap at 16 to prevent resource exhaustion
        )

    def _next_connection_id(self) -> int:
        self._connection_counter += 1
        return self._connection_counter

    # ─── Subscription Management ────────────────────────────────────────────

    async def subscribe(self, symbols: List[str]) -> None:
        """Subscribe to a list of symbols, routing them across available connections."""
        async with self._lock:
            for symbol in symbols:
                if symbol in self._subscription_map:
                    logger.debug(f"Symbol {symbol} already subscribed")
                    continue

                # Find the connection with the most available capacity
                target_conn = self._find_connection_with_capacity()
                if target_conn is None:
                    raise RuntimeError(
                        f"No available connections. Pool capacity: {len(self.connections)}"
                    )

                self._subscription_map[symbol] = target_conn.connection_id
                target_conn.subscriptions.add(symbol)

                # If connection is already established, subscribe via WebSocket
                if target_conn.websocket and target_conn.is_healthy:
                    await self._send_subscription(target_conn, symbol)

                logger.info(
                    f"Subscribed {symbol} -> Connection-{target_conn.connection_id} "
                    f"(load: {len(target_conn.subscriptions)}/{self.config.max_subscriptions})"
                )

    async def unsubscribe(self, symbols: List[str]) -> None:
        """Unsubscribe from a list of symbols."""
        async with self._lock:
            for symbol in symbols:
                conn_id = self._subscription_map.pop(symbol, None)
                if conn_id is None:
                    continue

                conn = self._find_connection_by_id(conn_id)
                if conn:
                    conn.subscriptions.discard(symbol)
                    if conn.websocket and conn.is_healthy:
                        await self._send_unsubscription(conn, symbol)

                logger.info(f"Unsubscribed {symbol} from Connection-{conn_id}")

    def _find_connection_with_capacity(self) -> Optional[ConnectionState]:
        """Find the connection with the most available subscription slots."""
        available = [
            c for c in self.connections
            if not c.is_reconnecting and len(c.subscriptions) < self.config.max_subscriptions
        ]
        if not available:
            return None
        # Prefer the connection with the most room
        return min(available, key=lambda c: len(c.subscriptions))

    def _find_connection_by_id(self, conn_id: int) -> Optional[ConnectionState]:
        """Find a connection state by its ID."""
        for conn in self.connections:
            if conn.connection_id == conn_id:
                return conn
        return None

    # ─── WebSocket Connection Lifecycle ──────────────────────────────────────

    async def _connect_and_subscribe(self, state: ConnectionState) -> None:
        """Establish a WebSocket connection and restore its subscriptions."""
        uri = f"wss://api.tickdb.ai/stream?api_key={self.api_key}"

        try:
            async with websockets.connect(
                uri,
                ping_interval=None  # We handle heartbeat manually
            ) as ws:
                state.websocket = ws
                state.is_reconnecting = False
                state.reconnect_attempts = 0

                logger.info(f"Connection-{state.connection_id} established")

                # Restore existing subscriptions for this connection
                if state.subscriptions:
                    for symbol in list(state.subscriptions):
                        await self._send_subscription(state, symbol)

                # Mark as healthy after initial subscription burst completes
                state.is_healthy = True
                state.last_heartbeat = time.time()

                # Main message loop
                await self._connection_message_loop(state)

        except websockets.exceptions.ConnectionClosed as e:
            logger.warning(
                f"Connection-{state.connection_id} closed: code={e.code}, reason={e.reason}"
            )
        except Exception as e:
            logger.error(f"Connection-{state.connection_id} error: {e}")
        finally:
            state.websocket = None
            state.is_healthy = False

            # Trigger reconnection sequence
            if self._running:
                await self._schedule_reconnect(state)

    async def _connection_message_loop(self, state: ConnectionState) -> None:
        """Process incoming messages for a connection."""
        while True:
            try:
                # Heartbeat: send ping at the configured interval. The interval
                # lives on the pool's ConnectionConfig, not on ConnectionState.
                heartbeat_task = asyncio.create_task(
                    asyncio.sleep(self.config.heartbeat_interval)
                )
                message_task = asyncio.create_task(state.websocket.recv())

                done, pending = await asyncio.wait(
                    [heartbeat_task, message_task],
                    return_when=asyncio.FIRST_COMPLETED
                )

                # Cancel pending tasks
                for task in pending:
                    task.cancel()

                if message_task in done:
                    raw_message = message_task.result()
                    await self._handle_message(raw_message)
                    state.last_heartbeat = time.time()

                if heartbeat_task in done:
                    await self._send_heartbeat(state)

            except asyncio.CancelledError:
                logger.info(f"Connection-{state.connection_id} message loop cancelled")
                break
            except Exception as e:
                logger.error(f"Message loop error on Connection-{state.connection_id}: {e}")
                break

    async def _send_heartbeat(self, state: ConnectionState) -> None:
        """Send a ping frame to keep the connection alive."""
        if state.websocket and state.is_healthy:
            try:
                await asyncio.wait_for(
                    state.websocket.send(json.dumps({"cmd": "ping"})),
                    timeout=self.config.message_timeout
                )
            except Exception as e:
                logger.warning(f"Heartbeat failed on Connection-{state.connection_id}: {e}")

    async def _send_subscription(self, state: ConnectionState, symbol: str) -> None:
        """Send a subscription command for a symbol."""
        await asyncio.wait_for(
            state.websocket.send(json.dumps({
                "cmd": "subscribe",
                "symbol": symbol
            })),
            timeout=self.config.message_timeout
        )

    async def _send_unsubscription(self, state: ConnectionState, symbol: str) -> None:
        """Send an unsubscription command for a symbol."""
        await asyncio.wait_for(
            state.websocket.send(json.dumps({
                "cmd": "unsubscribe",
                "symbol": symbol
            })),
            timeout=self.config.message_timeout
        )

    async def _handle_message(self, raw_message: str) -> None:
        """Dispatch incoming market data messages to registered handlers."""
        try:
            message = json.loads(raw_message)

            # Rate limit handling (TickDB returns code 3001)
            if message.get("code") == 3001:
                retry_after = int(message.get("data", {}).get("retry_after", 5))
                logger.warning(f"Rate limited. Retry after {retry_after} seconds")
                await asyncio.sleep(retry_after)
                return

            # Dispatch to all registered handlers
            for handler in self._message_handlers:
                try:
                    handler(message)
                except Exception as e:
                    logger.error(f"Handler error: {e}")

        except json.JSONDecodeError as e:
            logger.warning(f"Invalid JSON received: {e}")

    # ─── Reconnection Logic ──────────────────────────────────────────────────

    async def _schedule_reconnect(self, state: ConnectionState) -> None:
        """Schedule a reconnection with exponential backoff and jitter."""
        if not self._running:
            return

        state.is_reconnecting = True
        delay = self.config.reconnect_base_delay * (2 ** state.reconnect_attempts)
        delay = min(delay, self.config.reconnect_max_delay)
        jitter = random.uniform(0, delay * self.config.reconnect_jitter)
        total_delay = delay + jitter

        logger.info(
            f"Scheduling reconnect for Connection-{state.connection_id} "
            f"in {total_delay:.1f}s (attempt {state.reconnect_attempts + 1})"
        )

        await asyncio.sleep(total_delay)
        state.reconnect_attempts += 1

        asyncio.create_task(self._connect_and_subscribe(state))

    # ─── Health Monitoring ───────────────────────────────────────────────────

    async def _health_check_loop(self) -> None:
        """Periodic health check that verifies all connections are responsive."""
        while self._running:
            await asyncio.sleep(self.pool_config.health_check_interval)

            for conn in self.connections:
                if conn.is_reconnecting:
                    continue

                time_since_heartbeat = time.time() - conn.last_heartbeat
                if time_since_heartbeat > self.pool_config.health_check_interval * 2:
                    logger.warning(
                        f"Connection-{conn.connection_id} missed health check "
                        f"({time_since_heartbeat:.1f}s since last heartbeat)"
                    )
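                    conn.is_healthy = False
                    # Close the stale socket rather than scheduling a reconnect here.
                    # The message loop then exits and the finally block in
                    # _connect_and_subscribe schedules exactly one reconnect, so two
                    # tasks never read from the same connection state.
                    if conn.websocket is not None:
                        asyncio.create_task(conn.websocket.close())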

    # ─── Public API ──────────────────────────────────────────────────────────

    def register_handler(self, handler: Callable[[dict], None]) -> None:
        """Register a callback for incoming market data messages."""
        self._message_handlers.append(handler)

    async def close(self) -> None:
        """Gracefully shutdown the connection pool."""
        logger.info("Shutting down connection pool")
        self._running = False

        for conn in self.connections:
            if conn.websocket:
                try:
                    await conn.websocket.close()
                except Exception as e:
                    logger.warning(f"Error closing Connection-{conn.connection_id}: {e}")
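To see what the backoff in `_schedule_reconnect` produces, the delay calculation can be pulled out into a standalone function. This is a sketch that mirrors the arithmetic above with the default ConnectionConfig values; the name `backoff_delay` is hypothetical.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  jitter: float = 0.1) -> float:
    """Delay in seconds before reconnect attempt number `attempt` (0-based)."""
    delay = min(base * (2 ** attempt), cap)            # exponential growth, capped
    return delay + random.uniform(0, delay * jitter)   # add up to jitter * delay


# With jitter disabled the schedule is deterministic.
schedule = [backoff_delay(n, jitter=0.0) for n in range(7)]
# schedule == [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

The cap is reached on the sixth attempt, so a connection that keeps failing retries roughly every 30 seconds. The jitter spreads simultaneous reconnects apart so that several connections dropped by the same network event do not all hit the server at the same instant.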

Usage Example: Multi-Symbol Strategy

import os
import asyncio
from tickdb_pool import TickDBConnectionPool, ConnectionConfig, PoolConfig


async def main():
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("TICKDB_API_KEY environment variable is not set")

    pool = TickDBConnectionPool(
        api_key=api_key,
        config=ConnectionConfig(
            max_subscriptions=50,
            heartbeat_interval=15.0,
            reconnect_base_delay=1.0,
            reconnect_max_delay=30.0
        ),
        pool_config=PoolConfig(
            max_connections=8,
            redundancy_factor=1.5,
            health_check_interval=30.0
        )
    )

    # Register a handler to process incoming messages
    def on_depth_update(message: dict) -> None:
        if message.get("type") == "depth":
            symbol = message.get("symbol")
            data = message.get("data", {})
            bids = data.get("bids", [])
            asks = data.get("asks", [])
            
            # Calculate buy/sell pressure ratio
            total_bid_size = sum(float(qty) for _, qty in bids[:5])
            total_ask_size = sum(float(qty) for _, qty in asks[:5])
            pressure_ratio = total_bid_size / total_ask_size if total_ask_size > 0 else 0
            
            print(f"{symbol}: pressure={pressure_ratio:.2f}, bids={len(bids)}, asks={len(asks)}")

    pool.register_handler(on_depth_update)

    await pool.initialize()

    # Subscribe to a broad universe of symbols
    symbols = [
        # US Equities
        "AAPL.US", "MSFT.US", "NVDA.US", "TSLA.US", "AMZN.US",
        "GOOGL.US", "META.US", "JPM.US", "V.US", "UNH.US",
        # Crypto
        "BTC.CC", "ETH.CC", "SOL.CC",
        # HK Equities
        "0700.HK", "9988.HK", "3690.HK",
    ]

    await pool.subscribe(symbols)

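    # Keep the connection alive for real-time processing. On Ctrl-C,
    # asyncio.run() cancels this task, and KeyboardInterrupt is typically not
    # raised inside the coroutine, so cleanup belongs in a finally block.
    try:
        await asyncio.Event().wait()
    finally:
        await pool.close()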


if __name__ == "__main__":
    asyncio.run(main())

Health Check and Fault Recovery

The Three Failure Modes

A production connection pool must handle three distinct failure modes, each with different recovery requirements:

Mode 1: Transient Network Drop
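The WebSocket connection is terminated by a network hiccup. The server-side connection is preserved for a brief window (typically 30–60 seconds). Recovery: reconnect immediately with zero backoff. If you wait, you risk a full resubscription sequence. Note that _schedule_reconnect as shown waits reconnect_base_delay (1 second by default) before the first attempt, so a truly immediate first retry requires special-casing attempt 0.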

Mode 2: Server-Side Timeout
The server closes the connection due to inactivity or a policy limit. Recovery: reconnect with standard exponential backoff. The server expects a controlled reconnect, not rapid-fire attempts.

Mode 3: Application Crash and Restart
The client process terminates. Recovery: on restart, the pool initializes empty and rebuilds connections from scratch. The in-memory subscription state is lost with the process, so subscriptions must either be re-issued by the strategy layer or restored from a persistent store if you implement one.
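A persistent store for Mode 3 can be as simple as a JSON file. The sketch below is one possible approach and is not part of the pool class above; the function names and the file layout are hypothetical.

```python
import json
import os
import tempfile
from typing import Iterable, List


def save_symbols(path: str, symbols: Iterable[str]) -> None:
    """Write the subscribed symbols to disk, replacing the old file atomically."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temp file in the same directory, then rename over the target,
    # so a crash mid-write never leaves a truncated file behind.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(sorted(symbols), f)
    os.replace(tmp_path, path)


def load_symbols(path: str) -> List[str]:
    """Return the saved symbols, or an empty list on first start."""
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError:
        return []
```

Call `save_symbols` after each change to the subscription set, and on restart pass the result of `load_symbols` to `pool.subscribe()` once the pool is initialized. Only symbols are stored: connection ids from the previous process mean nothing after a restart.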

Health Check Protocol

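A complete health check performs three verifications. The _health_check_loop shown earlier implements only the first; the other two are extensions you add on top: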

  1. Heartbeat responsiveness: If more than health_check_interval × 2 seconds elapse without receiving any message (including data), the connection is flagged.
  2. Subscription integrity: On reconnect, verify that all expected subscriptions are active. If the server rejected a subscription (code 2002), update the subscription map and alert.
  3. Backpressure detection: If the message queue depth exceeds a threshold (indicating the consumer is falling behind the producer), alert but do not reconnect. The consumer should scale up.
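The third check, backpressure detection, can be sketched with a bounded queue between the socket reader and the strategy consumer. The class name, the queue size, and the alert ratio below are all illustrative assumptions. Dropping messages when the queue is full is one possible policy, not the only one.

```python
import asyncio


class BackpressureMonitor:
    """Bounded buffer between the socket reader and a slower consumer."""

    def __init__(self, maxsize: int = 10_000, alert_ratio: float = 0.5):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.alert_depth = int(maxsize * alert_ratio)
        self.dropped = 0

    def offer(self, message: dict) -> bool:
        """Enqueue without blocking the reader.

        Returns True when the queue depth has reached the alert threshold,
        which signals that the consumer is falling behind.
        """
        try:
            self.queue.put_nowait(message)
        except asyncio.QueueFull:
            self.dropped += 1  # count the loss instead of stalling the socket
        return self.queue.qsize() >= self.alert_depth
```

Registering `monitor.offer` as a message handler keeps the read loop non-blocking, while a separate consumer task drains `monitor.queue` with `await monitor.queue.get()`. A rising `dropped` count is the cue to scale the consumer, not to reconnect.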

Graceful Degradation Under Failure

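When a connection fails, redistribute its subscriptions to healthy connections before attempting reconnect. This shortens the data gap for the symbols on the failed connection to the time it takes to resubscribe elsewhere. The method below is an addition to TickDBConnectionPool; the class as shown earlier does not call it, so invoke it from the failure path before the reconnect is scheduled.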

async def _redistribute_subscriptions(self, failed_conn: ConnectionState) -> None:
    """Redistribute subscriptions from a failed connection to healthy ones."""
    symbols_to_move = list(failed_conn.subscriptions)

    for symbol in symbols_to_move:
        # Find a healthy connection with capacity
        target = None
        for conn in self.connections:
            if (conn != failed_conn and
                conn.is_healthy and
                len(conn.subscriptions) < self.config.max_subscriptions):
                target = conn
                break

        if target:
            self._subscription_map[symbol] = target.connection_id
            target.subscriptions.add(symbol)
            failed_conn.subscriptions.discard(symbol)
            await self._send_subscription(target, symbol)
            logger.info(f"Redistributed {symbol} to Connection-{target.connection_id}")
        else:
            logger.error(f"No capacity to redistribute {symbol} from Connection-{failed_conn.connection_id}")

Deployment Guide by Scale

Scale                                    | Configuration              | Recommended approach
-----------------------------------------+----------------------------+---------------------------------------------------------------------------
Individual trader (< 20 symbols)         | Single connection, no pool | Use TickDB's REST API for historical; WebSocket only for real-time depth
Active trader (20–50 symbols)            | 2-connection pool          | Manual subscription management; no auto-rebalancing needed
Systematic strategy (50–150 symbols)     | 4–6 connection pool        | Full pool manager; health checks enabled
Institutional pipeline (150–500 symbols) | 8+ connection pool         | Distributed deployment; persistent subscription state
Enterprise (500+ symbols)                | Cluster of pool instances  | Horizontal scaling with a message bus (Kafka, Redis) between pools

For most systematic trading strategies targeting 50–120 symbols—which covers the majority of single-strategy implementations—the 4–6 connection pool configuration described above provides the right balance of complexity and resilience.


Conclusion: Pool Design as Risk Management

A connection pool is not an optimization. It is risk management infrastructure. Every design decision—the number of connections, the reconnect strategy, the health check interval—maps to a specific failure mode that will cost you money if left unaddressed.

The single-connection architecture that works in backtesting will fail in production at the moment of highest market stress, precisely when your strategy cannot afford a data gap. Build the pool before you need it. Test the failure modes explicitly. Verify that your reconnection sequence restores your subscription state correctly before you trust the system with live capital.

The implementation above is ready for production use with TickDB's WebSocket API. Adjust the max_subscriptions parameter if TickDB updates its per-connection limits. Monitor the reconnect attempt counts in production—if you see connections cycling through attempts regularly, your pool is undersized for your subscription universe.


Next Steps

If you want to implement this pool for your strategy:

  1. Set the TICKDB_API_KEY environment variable (sign up at tickdb.ai if you have not yet)
  2. Copy the TickDBConnectionPool class into your data ingestion module
  3. Define your symbol universe and call pool.subscribe() with the full list
  4. Register a message handler that updates your strategy state

If you need help sizing your pool for a specific strategy:
The TickDB documentation covers endpoint limits and subscription density in the API reference. For custom sizing recommendations based on your strategy's latency requirements, reach out to the TickDB engineering team.

If you are building a multi-strategy pipeline:
Consider running separate pool instances per strategy to isolate failure domains. A pool failure in Strategy A should not affect Strategy B's data feed.

If you use AI coding assistants:
Search for and install the tickdb-market-data SKILL in your AI tool's marketplace for context-aware code generation that incorporates TickDB API conventions.