"Price is the effect. The order book is the cause." — This principle guides every quant trader watching real-time market data. But what happens when the cause survives a server reboot, a deployment rollback, or an unexpected power failure, while the monitoring system watching it does not?

At 3:47 AM on a Tuesday, a deployment script runs successfully. The new version of the market data aggregator is live. For exactly eleven seconds. Then a bug in the initialization logic triggers a hard exit. The process dies before it can flush its in-memory buffer. Forty-seven order book snapshots vanish into the void. The trader wakes up to find gaps in the data feed — gaps that cannot be reconstructed from any public source.

This article addresses the engineering problem that creates those gaps: how to design a WebSocket-based data pipeline that survives abrupt termination, recovers gracefully from disconnections, and never loses the last few seconds of data that matter most.

The solution involves four interconnected components: Unix signal handling for graceful shutdown, a checkpoint protocol for state capture, SQLite-based buffering for durability, and a resumable subscription mechanism for gap-free recovery.


The Problem with In-Memory State

WebSocket connections stream continuous data. Every tick, every order book update, every depth snapshot arrives in a fire hose that exists only in transit. The application reads it, processes it, and — in a naive implementation — discards it once consumed.

This architecture works fine until it does not. The failure modes are predictable:

| Failure Mode | Cause | Data Loss |
|---|---|---|
| SIGTERM during deployment | Container orchestrator sends termination signal | Everything in the receive buffer |
| OOM kill | Memory pressure triggers Linux OOM killer | Everything in the receive buffer |
| Network partition | TCP connection drops without FIN handshake | Last 1–4 TCP segments (depending on window) |
| Process crash | Unhandled exception or segfault | Everything in memory |
| Sudden power loss | Hardware failure | Everything not on disk |

In each case, the data that existed in the socket buffer or the application layer's in-memory queue is gone. The TCP stack may have acknowledged it; the application never processed it.

The fix is not to prevent failures. It is to ensure that state is checkpointed before it is needed, buffered durably while in transit, and recoverable after any failure mode.


Signal Handling: Catching the Shutdown Sequence

Unix signals are the operating system's way of notifying a process of an external event, such as a termination request. For an application that needs to shut down gracefully, three signals matter most:

| Signal | Origin | Default Behavior | Required Action |
|---|---|---|---|
| SIGTERM | kill, container orchestrator, systemd | Terminate immediately | Begin graceful shutdown sequence |
| SIGINT | Ctrl+C in terminal | Terminate immediately | Begin graceful shutdown sequence |
| SIGHUP | Terminal closed, kill -HUP | Terminate immediately | Begin graceful shutdown sequence (optional) |

The goal of handling these signals is to create a shutdown window — a brief period where the process stops accepting new work, flushes buffers, saves checkpoints, and then exits cleanly.

Here is a production-grade signal handler implementation in Python:

import signal
import sys
import threading
import logging
from typing import Optional, Callable

logger = logging.getLogger(__name__)


class GracefulShutdownManager:
    """
    Coordinates graceful shutdown across threads.
    
    This manager catches SIGTERM and SIGINT, sets an atomic shutdown flag,
    and waits for all critical operations to complete before exiting.
    """
    
    def __init__(self, shutdown_timeout: float = 30.0):
        self._shutdown_timeout = shutdown_timeout
        self._shutdown_flag = threading.Event()
        self._shutdown_initiated = threading.Event()
        self._cleanup_handlers: list[Callable[[], None]] = []
    
    def request_shutdown(self, signum: int, frame) -> None:
        """Signal handler — must be signal-safe (no logging, no locks)."""
        signal_name = signal.Signals(signum).name
        # Write to stderr because logging may be unavailable during shutdown
        sys.stderr.write(f"\n[ShutdownManager] Received {signal_name}, initiating graceful shutdown...\n")
        sys.stderr.flush()
        
        if self._shutdown_initiated.is_set():
            # Second signal — force exit
            sys.stderr.write("[ShutdownManager] Second signal received, forcing exit.\n")
            sys.stderr.flush()
            sys.exit(1)
        
        self._shutdown_initiated.set()
        self._shutdown_flag.set()
    
    def register_cleanup(self, handler: Callable[[], None]) -> None:
        """Register a cleanup handler to be called during shutdown."""
        self._cleanup_handlers.append(handler)
    
    @property
    def is_shutting_down(self) -> bool:
        """Atomic check for shutdown state."""
        return self._shutdown_flag.is_set()
    
    def wait_for_shutdown(self) -> None:
        """Block until shutdown signal is received."""
        self._shutdown_flag.wait()
    
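    def execute_shutdown_sequence(self) -> None:
        """
        Execute the full shutdown sequence.
        
        Call this from the main thread after a shutdown signal is received.
        Runs cleanup handlers in reverse registration order and stops
        starting new handlers once shutdown_timeout has elapsed.
        """
        import time  # local import: only needed for the timeout check
        
        logger.info("Beginning shutdown sequence...")
        
        deadline = time.monotonic() + self._shutdown_timeout
        for handler in reversed(self._cleanup_handlers):
            # functools.partial objects have no __name__, so fall back to repr
            name = getattr(handler, "__name__", repr(handler))
            if time.monotonic() > deadline:
                logger.warning(f"Shutdown timeout reached, skipping handler: {name}")
                continue
            try:
                logger.debug(f"Running cleanup handler: {name}")
                handler()
            except Exception as e:
                logger.error(f"Cleanup handler {name} failed: {e}", exc_info=True)
        
        logger.info("Shutdown sequence complete.")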
    
    def setup_signal_handlers(self) -> None:
        """Install signal handlers. Call once during application initialization."""
        signal.signal(signal.SIGTERM, self.request_shutdown)
        signal.signal(signal.SIGINT, self.request_shutdown)
        logger.info("Signal handlers installed (SIGTERM, SIGINT)")


# Global instance
shutdown_manager = GracefulShutdownManager(shutdown_timeout=30.0)

Engineering notes:

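  • The signal handler does as little as possible: it sets an event and writes to stderr. Python runs handlers in the main thread between bytecode instructions, so a handler that calls into logging (which takes locks) can deadlock if it interrupts code that already holds the same lock.
  • The is_shutting_down property reads a threading.Event, which is safe to check from any thread.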
  • Cleanup handlers run in reverse order (like a stack unwind), which is the correct semantics for teardown sequences.
  • The second-signal detection allows forced exit if graceful shutdown hangs.

Checkpoint Protocol: Capturing State at the Right Moment

A checkpoint is a point-in-time snapshot of the application's state that is sufficient to resume operations after a restart. For a WebSocket market data application, the minimal checkpoint includes:

  1. Subscription state: Which symbols and channels are subscribed
  2. Sequence numbers: The last sequence ID received for each subscription
  3. Last-tick timestamp: Wall-clock time of the most recent data
  4. In-flight request IDs: Any pending requests awaiting response

The checkpoint protocol has three components: when to checkpoint, what to include, and where to store it.

When to Checkpoint

Checkpoint frequency is a tradeoff between data loss window and I/O overhead:

| Strategy | Checkpoint Frequency | Worst-Case Data Loss | I/O Cost |
|---|---|---|---|
| Time-based | Every N seconds | N seconds | Low-Medium |
| Count-based | Every N messages | Variable (depends on tick rate) | Low |
| Event-based | On subscription changes, periodic | 5–30 seconds | Low |
| Hybrid | Event-based + periodic backup | 5 seconds | Medium |

For market data applications, the recommended approach is hybrid: checkpoint on every subscription change (subscribe/unsubscribe), and also every 5 seconds as a baseline. Even if the process crashes mid-stream, the saved stream position is then at most about 5 seconds stale.
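One way to express the hybrid rule is a small policy object that the receive loop consults after each message. The sketch below is illustrative only: HybridCheckpointPolicy and its parameters are hypothetical names, and the clock is injectable purely so the behavior can be tested without sleeping.

```python
import time
from typing import Callable


class HybridCheckpointPolicy:
    """Checkpoint on subscription changes, or when the interval has elapsed."""

    def __init__(self, interval: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self._interval = interval
        self._clock = clock
        self._last = clock()  # time of the most recent checkpoint

    def should_checkpoint(self, subscription_changed: bool = False) -> bool:
        now = self._clock()
        if subscription_changed or now - self._last >= self._interval:
            # Either trigger restarts the periodic timer
            self._last = now
            return True
        return False
```

Resetting the timer on an event-triggered checkpoint avoids writing two checkpoints back to back when a subscription change lands just before the periodic deadline.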

What to Include in a Checkpoint

from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
import json


@dataclass
class Checkpoint:
    """
    Represents a point-in-time snapshot of the application's state.
    
    This is the minimal information needed to resume a WebSocket session
    after a restart without re-subscribing from scratch.
    """
    version: int = 1
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    
    # Session identification
    session_id: str = ""
    last_sequence: int = 0
    
    # Active subscriptions
    subscribed_symbols: list[str] = field(default_factory=list)
    subscribed_channels: list[str] = field(default_factory=list)
    
    # Stream state
    last_message_timestamp: str = ""
    last_message_seq: int = 0
    
    # Connection metadata
    endpoint: str = ""
    reconnect_count: int = 0
    
    def to_json(self) -> bytes:
        """Serialize to JSON for storage."""
        return json.dumps(asdict(self), indent=2).encode('utf-8')
    
    @classmethod
    def from_json(cls, data: bytes) -> "Checkpoint":
        """Deserialize from JSON."""
        return cls(**json.loads(data.decode('utf-8')))
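Because from_json passes every stored key straight to the constructor, a checkpoint written by a newer version of the application (with extra fields) would raise a TypeError on load. A possible hardening, sketched here on a reduced stand-in class (MiniCheckpoint is hypothetical and carries only three of the fields above), is to ignore keys the dataclass does not declare:

```python
import json
from dataclasses import dataclass, field, fields


@dataclass
class MiniCheckpoint:
    """Reduced stand-in for Checkpoint, used only to illustrate loading."""
    version: int = 1
    last_sequence: int = 0
    subscribed_symbols: list[str] = field(default_factory=list)

    @classmethod
    def from_json(cls, data: bytes) -> "MiniCheckpoint":
        raw = json.loads(data.decode("utf-8"))
        # Keep only keys that match declared fields; unknown keys are dropped
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in raw.items() if k in known})
```

Missing keys fall back to the dataclass defaults, so older checkpoints load as well. The version field remains available if a loader needs to branch on the schema.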

Where to Store: SQLite as the Checkpoint Database

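SQLite is a good fit for checkpoint storage. It is embedded (no separate server process), ACID-compliant (committed transactions survive a process crash), and usable from multiple threads as long as each thread has its own connection. Writes are serialized, which is acceptable at checkpoint frequency.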

The checkpoint database schema is intentionally minimal:

import sqlite3
import threading
from contextlib import contextmanager
from pathlib import Path
from typing import Optional

import structlog
logger = structlog.get_logger()


class CheckpointStore:
    """
    Persistent checkpoint storage using SQLite.
    
    Stores the last known good state for each subscription,
    enabling resumable WebSocket sessions after restart.
    """
    
    def __init__(self, db_path: str | Path):
        self._db_path = Path(db_path)
        self._local = threading.local()
        self._init_db()
    
    def _get_conn(self) -> sqlite3.Connection:
        """Get a thread-local database connection."""
        if not hasattr(self._local, 'conn'):
            self._local.conn = sqlite3.connect(
                str(self._db_path),
                timeout=10.0,
                isolation_level='DEFERRED'
            )
            self._local.conn.row_factory = sqlite3.Row
        return self._local.conn
    
    @contextmanager
    def _transaction(self):
        """Context manager for transactional writes."""
        conn = self._get_conn()
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
    
    def _init_db(self) -> None:
        """Initialize the database schema."""
        with self._transaction() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS checkpoints (
                    subscription_key TEXT PRIMARY KEY,
                    checkpoint_data BLOB NOT NULL,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_updated_at 
                ON checkpoints(updated_at)
            """)
        logger.info("checkpoint_db_initialized", path=str(self._db_path))
    
    def save_checkpoint(self, key: str, checkpoint: Checkpoint) -> None:
        """
        Persist a checkpoint to the database.
        
        Uses UPSERT semantics — inserts or replaces based on subscription_key.
        """
        with self._transaction() as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO checkpoints (subscription_key, checkpoint_data, updated_at)
                VALUES (?, ?, CURRENT_TIMESTAMP)
                """,
                (key, checkpoint.to_json())
            )
        logger.debug("checkpoint_saved", key=key, seq=checkpoint.last_sequence)
    
    def load_checkpoint(self, key: str) -> Optional[Checkpoint]:
        """
        Retrieve the last checkpoint for a subscription.
        
        Returns None if no checkpoint exists (cold start scenario).
        """
        conn = self._get_conn()
        row = conn.execute(
            "SELECT checkpoint_data FROM checkpoints WHERE subscription_key = ?",
            (key,)
        ).fetchone()
        
        if row is None:
            logger.debug("checkpoint_not_found", key=key)
            return None
        
        checkpoint = Checkpoint.from_json(row['checkpoint_data'])
        logger.info("checkpoint_loaded", key=key, seq=checkpoint.last_sequence)
        return checkpoint
    
    def delete_checkpoint(self, key: str) -> None:
        """Remove a checkpoint (e.g., after successful resubscription)."""
        with self._transaction() as conn:
            conn.execute("DELETE FROM checkpoints WHERE subscription_key = ?", (key,))
        logger.debug("checkpoint_deleted", key=key)

Engineering notes:

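  • The database uses isolation_level='DEFERRED', so a transaction takes no lock until its first read or write and idle connections do not block others.
  • timeout=10.0 makes a connection wait up to 10 seconds for a lock before raising "database is locked", which absorbs short bursts of write contention.
  • Thread-local connections give each thread its own connection, which is what the sqlite3 module expects by default. SQLite still serializes writers at the database level.
  • INSERT OR REPLACE deletes any existing row and inserts a new one, which simplifies the save logic at the cost of losing history. If history is needed, append one row per checkpoint with a version column and read back the highest version.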
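If a stale writer must never overwrite a newer checkpoint, SQLite's ON CONFLICT ... DO UPDATE clause (available since SQLite 3.24) can carry a guard condition. The sketch below assumes a variant of the schema with a dedicated last_sequence column, which the table above does not have; save_if_newer and make_demo_db are hypothetical helpers.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """In-memory database with an assumed last_sequence column."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE checkpoints (
            subscription_key TEXT PRIMARY KEY,
            last_sequence INTEGER NOT NULL,
            checkpoint_data BLOB NOT NULL
        )
        """
    )
    return conn


def save_if_newer(conn: sqlite3.Connection, key: str, seq: int, data: bytes) -> None:
    """Insert the checkpoint, or update it only if seq moves forward."""
    conn.execute(
        """
        INSERT INTO checkpoints (subscription_key, last_sequence, checkpoint_data)
        VALUES (?, ?, ?)
        ON CONFLICT(subscription_key) DO UPDATE SET
            last_sequence = excluded.last_sequence,
            checkpoint_data = excluded.checkpoint_data
        WHERE excluded.last_sequence > checkpoints.last_sequence
        """,
        (key, seq, data),
    )
    conn.commit()
```

Unlike INSERT OR REPLACE, this updates the existing row in place, and a write carrying an older sequence number becomes a no-op.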

The Message Buffer: Surviving the Gap Between Checkpoints

Even with 5-second checkpoint intervals, a crash can still lose up to 5 seconds of data. For high-frequency trading applications, this is unacceptable. The message buffer addresses this gap.

The message buffer is a write-ahead log (WAL) that stores every incoming message before processing. It trades a small amount of write latency for durability:

import sqlite3
import threading
import uuid
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


@dataclass
class BufferedMessage:
    """A single message with durability metadata."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    channel: str = ""
    symbol: str = ""
    sequence: int = 0
    payload: bytes = b""
    persisted: bool = False


class DurableMessageBuffer:
    """
    Write-ahead log for incoming WebSocket messages.
    
    Messages are written to SQLite before processing, so that committed
    messages survive a process crash and can be replayed on restart.
    
    ⚠️ Performance note: each write is a synchronous SQLite insert, and rows
    become durable only when the transaction commits (every batch_size
    messages, or on flush()). For ultra-low-latency applications, consider
    a memory-mapped append-only file with periodic SQLite dumps.
    """
    
    def __init__(self, db_path: str | Path, batch_size: int = 100, flush_interval: float = 0.5):
        self._db_path = Path(db_path)
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._pending: deque[BufferedMessage] = deque()
        self._lock = threading.Lock()
        self._local = threading.local()
        self._pending_count = 0
        self._init_db()
    
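    def _get_conn(self) -> sqlite3.Connection:
        """Get a thread-local connection configured for WAL mode."""
        if not hasattr(self._local, 'conn'):
            self._local.conn = sqlite3.connect(
                str(self._db_path),
                timeout=30.0,
                isolation_level='DEFERRED'
            )
            # get_unprocessed() reads columns by name, which requires sqlite3.Row
            self._local.conn.row_factory = sqlite3.Row
            self._local.conn.execute("PRAGMA synchronous = NORMAL")
            self._local.conn.execute("PRAGMA journal_mode = WAL")
        return self._local.conn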
    
    def _init_db(self) -> None:
        """Initialize the WAL schema."""
        conn = self._get_conn()
        conn.execute("""
            CREATE TABLE IF NOT EXISTS message_log (
                id TEXT PRIMARY KEY,
                timestamp TEXT NOT NULL,
                channel TEXT NOT NULL,
                symbol TEXT NOT NULL,
                sequence INTEGER NOT NULL,
                payload BLOB NOT NULL,
                processed INTEGER DEFAULT 0
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_unprocessed 
            ON message_log(processed, timestamp)
        """)
        conn.commit()
    
    def write(self, channel: str, symbol: str, sequence: int, payload: bytes) -> BufferedMessage:
        """
        Write a message to the durable buffer.
        
        Returns immediately after the SQLite write completes.
        The message is marked as unprocessed until mark_processed() is called.
        """
        message = BufferedMessage(
            channel=channel,
            symbol=symbol,
            sequence=sequence,
            payload=payload
        )
        
        with self._lock:
            conn = self._get_conn()
            conn.execute(
                """
                INSERT INTO message_log (id, timestamp, channel, symbol, sequence, payload, processed)
                VALUES (?, ?, ?, ?, ?, ?, 0)
                """,
                (message.id, message.timestamp, channel, symbol, sequence, payload)
            )
            # The row is not durable until the transaction commits; flush()
            # commits once batch_size writes have accumulated, so a crash can
            # lose up to batch_size - 1 uncommitted messages (use batch_size=1
            # to commit every message)
            self._pending.append(message)
            self._pending_count += 1
        
        # Check if we should flush
        if self._pending_count >= self._batch_size:
            self.flush()
        
        return message
    
    def mark_processed(self, message_id: str) -> None:
        """Mark a message as successfully processed to enable cleanup."""
        with self._lock:
            conn = self._get_conn()
            conn.execute(
                "UPDATE message_log SET processed = 1 WHERE id = ?",
                (message_id,)
            )
            # Remove from pending deque
            self._pending = deque(m for m in self._pending if m.id != message_id)
    
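    def flush(self) -> None:
        """
        Commit pending writes on the calling thread's connection.
        
        Call this during shutdown to ensure all messages are durable.
        """
        with self._lock:
            # threading.local raises AttributeError for unset attributes,
            # so look the connection up defensively
            conn = getattr(self._local, 'conn', None)
            if conn is not None:
                conn.commit()
                self._pending_count = 0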
    
    def purge_processed(self, keep_last_n: int = 1000) -> int:
        """
        Remove processed messages, keeping the last N for safety.
        
        Returns the number of rows deleted.
        """
        conn = self._get_conn()
        cursor = conn.execute(
            """
            DELETE FROM message_log 
            WHERE processed = 1 
            AND id NOT IN (
                SELECT id FROM message_log 
                ORDER BY timestamp DESC 
                LIMIT ?
            )
            """,
            (keep_last_n,)
        )
        conn.commit()
        deleted = cursor.rowcount
        if deleted > 0:
            conn.execute("VACUUM")  # Reclaim disk space
        return deleted
    
    def get_unprocessed(self, channel: Optional[str] = None, symbol: Optional[str] = None, 
                        limit: int = 1000) -> list[BufferedMessage]:
        """
        Retrieve unprocessed messages for recovery.
        
        Ordered by timestamp to enable sequential replay.
        """
        conn = self._get_conn()
        query = "SELECT * FROM message_log WHERE processed = 0"
        params = []
        
        if channel:
            query += " AND channel = ?"
            params.append(channel)
        if symbol:
            query += " AND symbol = ?"
            params.append(symbol)
        
        query += " ORDER BY timestamp ASC LIMIT ?"
        params.append(limit)
        
        rows = conn.execute(query, params).fetchall()
        return [
            BufferedMessage(
                id=row['id'],
                timestamp=row['timestamp'],
                channel=row['channel'],
                symbol=row['symbol'],
                sequence=row['sequence'],
                payload=row['payload'],
                persisted=True
            )
            for row in rows
        ]

Engineering notes:

  • WAL mode (PRAGMA journal_mode = WAL) provides better concurrency than the default rollback journal, allowing reads to proceed while writes are in flight.
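  • PRAGMA synchronous = NORMAL is a deliberate tradeoff: in WAL mode, a commit returns once the data has been handed to the OS, and the WAL file is synced to disk only at SQLite checkpoints. This reduces latency, but recently committed transactions can be lost in a power failure or OS crash. Committed transactions are not lost in a process crash; uncommitted ones always are.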
  • The purge_processed() method with VACUUM reclaims disk space. Run this periodically or during low-traffic windows.
  • For truly latency-sensitive applications, consider aiosqlite with batched writes every N milliseconds rather than per-message writes.
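The batching idea in the last note can be illustrated with the standard sqlite3 module alone. This is a synchronous sketch under simplifying assumptions (a two-column table, a single thread, a hypothetical BatchedLogWriter class); an asynchronous version would follow the same shape.

```python
import sqlite3
import time


class BatchedLogWriter:
    """Buffers rows in memory and commits them in one transaction per batch."""

    def __init__(self, conn: sqlite3.Connection, batch_size: int = 100,
                 flush_interval: float = 0.5):
        self._conn = conn
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._rows: list[tuple[int, bytes]] = []
        self._last_flush = time.monotonic()
        conn.execute(
            "CREATE TABLE IF NOT EXISTS message_log "
            "(sequence INTEGER PRIMARY KEY, payload BLOB NOT NULL)"
        )
        conn.commit()

    def write(self, sequence: int, payload: bytes) -> None:
        """Queue a row; flush when the batch is full or the interval elapsed."""
        self._rows.append((sequence, payload))
        due = time.monotonic() - self._last_flush >= self._flush_interval
        if len(self._rows) >= self._batch_size or due:
            self.flush()

    def flush(self) -> int:
        """Commit all queued rows in a single transaction; return the count."""
        count = len(self._rows)
        if count:
            with self._conn:  # commits on success, rolls back on error
                self._conn.executemany(
                    "INSERT INTO message_log (sequence, payload) VALUES (?, ?)",
                    self._rows,
                )
            self._rows.clear()
        self._last_flush = time.monotonic()
        return count
```

The cost is explicit: rows still queued in memory are lost if the process dies, so batch_size and flush_interval together set the worst-case loss window. The interval is only checked when write() is called, so a quiet stream needs a separate timer or a flush() at shutdown.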

Resumable Subscriptions: Closing the Loop After Restart

The final piece is the resumption logic — what happens when the application starts up and finds a checkpoint and a buffer full of unprocessed messages.

The resumption sequence follows these steps:

  1. Load the last checkpoint from SQLite
  2. If a checkpoint exists, determine the gap between the last processed sequence and the current sequence
  3. Replay unprocessed messages from the buffer in sequence order
  4. Subscribe to the WebSocket stream starting from the checkpoint's sequence number
  5. Handle any gap in the stream (missed messages that were never received)
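Steps 2 and 5 both come down to comparing the checkpointed sequence number with the sequence numbers actually seen. A minimal sketch of that comparison follows, assuming the feed numbers messages contiguously (each message increments the sequence by one); find_gaps is a hypothetical helper, and a real feed's numbering scheme should be checked against its documentation.

```python
def find_gaps(last_checkpoint_seq: int, received: list[int]) -> list[tuple[int, int]]:
    """Return inclusive (start, end) ranges of missing sequence numbers."""
    gaps = []
    expected = last_checkpoint_seq + 1
    for seq in sorted(set(received)):
        if seq < expected:
            continue  # duplicate, or already covered by the checkpoint
        if seq > expected:
            gaps.append((expected, seq - 1))
        expected = seq + 1
    return gaps
```

For example, with a checkpoint at sequence 100 and received messages 101, 102, 105, the function reports the single gap (103, 104). Each reported range then has to be backfilled from whatever replay or snapshot facility the data provider offers.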
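
import asyncio
import json
import os
from dataclasses import dataclass
from typing import Optional
import websockets
import websockets.client
from websockets.client import WebSocketClientProtocol
import structlog

# Checkpoint, CheckpointStore, DurableMessageBuffer, and GracefulShutdownManager
# are the classes defined in the previous sections; import them from wherever
# they live in your project.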

logger = structlog.get_logger()


@dataclass
class ResumptionResult:
    """Result of a resumption attempt."""
    checkpoint_loaded: bool
    buffered_messages_replayed: int
    gap_detected: bool
    gap_start: Optional[int] = None
    gap_end: Optional[int] = None


class ResumableWebSocketClient:
    """
    WebSocket client with graceful shutdown, checkpoint persistence,
    and resumable subscriptions.
    
    This is the orchestration layer that ties together signal handling,
    checkpoint storage, the message buffer, and WebSocket reconnection.
    """
    
    def __init__(
        self,
        api_key: str,
        endpoint: str,
        checkpoint_store: "CheckpointStore",
        message_buffer: "DurableMessageBuffer",
        shutdown_manager: "GracefulShutdownManager",
    ):
        self._api_key = api_key or os.environ.get("TICKDB_API_KEY", "")
        if not self._api_key:
            raise ValueError("API key must be provided or set in TICKDB_API_KEY")
        
        self._endpoint = endpoint
        self._checkpoint_store = checkpoint_store
        self._message_buffer = message_buffer
        self._shutdown_manager = shutdown_manager
        
        self._ws: Optional[WebSocketClientProtocol] = None
        self._subscriptions: dict[str, dict] = {}
        self._running = False
        self._reconnect_delay = 1.0
        self._max_reconnect_delay = 60.0
    
    async def connect(self) -> None:
        """Establish WebSocket connection with authentication."""
        headers = {"X-API-Key": self._api_key}
        self._ws = await websockets.client.connect(
            f"{self._endpoint}?api_key={self._api_key}",
            extra_headers=headers,
            ping_interval=20,
            ping_timeout=10,
            close_timeout=5,
        )
        logger.info("websocket_connected", endpoint=self._endpoint)
    
    async def subscribe(self, channel: str, symbols: list[str]) -> str:
        """
        Subscribe to a channel for the given symbols.
        
        Returns a subscription ID. Saves a checkpoint immediately
        after subscription is confirmed.
        """
        if self._ws is None or not self._ws.open:
            raise RuntimeError("WebSocket not connected")
        
        subscribe_msg = {
            "cmd": "subscribe",
            "channel": channel,
            "symbols": symbols
        }
        await self._ws.send(json.dumps(subscribe_msg))
        
        # Wait for subscription confirmation
        response = await asyncio.wait_for(self._ws.recv(), timeout=5.0)
        resp_data = json.loads(response)
        
        if resp_data.get("code") != 0:
            raise RuntimeError(f"Subscription failed: {resp_data.get('message')}")
        
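        subscription_id = f"{channel}:{','.join(symbols)}"
        
        # Carry over the last known sequence so that re-subscribing after a
        # restart or reconnect does not reset the checkpoint to zero
        previous = self._subscriptions.get(subscription_id, {})
        saved = self._checkpoint_store.load_checkpoint(subscription_id)
        last_sequence = max(
            previous.get("last_sequence", 0),
            saved.last_sequence if saved else 0,
        )
        self._subscriptions[subscription_id] = {
            "channel": channel,
            "symbols": symbols,
            "last_sequence": last_sequence,
        }
        
        # Save checkpoint immediately after successful subscription
        checkpoint = Checkpoint(
            session_id=subscription_id,
            last_sequence=last_sequence,
            endpoint=self._endpoint,
            subscribed_channels=[channel],
            subscribed_symbols=symbols,
        )
        self._checkpoint_store.save_checkpoint(subscription_id, checkpoint)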
        
        logger.info("subscription_confirmed", subscription_id=subscription_id)
        return subscription_id
    
    async def resume(self) -> ResumptionResult:
        """
        Attempt to resume from the last checkpoint and replay buffered messages.
        
        This is the core recovery logic. Call it during startup before
        establishing a new connection.
        """
        result = ResumptionResult(
            checkpoint_loaded=False,
            buffered_messages_replayed=0,
            gap_detected=False
        )
        
        # Step 1: Load checkpoint for each subscription
        for sub_id, sub_info in self._subscriptions.items():
            checkpoint = self._checkpoint_store.load_checkpoint(sub_id)
            
            if checkpoint:
                result.checkpoint_loaded = True
                sub_info["last_sequence"] = checkpoint.last_sequence
                sub_info["checkpoint"] = checkpoint
                logger.info("resuming_from_checkpoint", 
                           subscription=sub_id, 
                           last_seq=checkpoint.last_sequence)
        
        # Step 2: Replay unprocessed messages from the buffer
        buffered = self._message_buffer.get_unprocessed(limit=10000)
        
        if buffered:
            logger.info("replaying_buffered_messages", count=len(buffered))
            
            for msg in buffered:
                # Process the message through the application logic
                await self._process_message(msg.channel, msg.symbol, msg.payload)
                self._message_buffer.mark_processed(msg.id)
                result.buffered_messages_replayed += 1
            
            logger.info("buffer_replay_complete", 
                       messages=result.buffered_messages_replayed)
        
        # Step 3: Determine if there is a gap in the stream
        # (This requires knowing the current server-side sequence,
        # which is typically available in the subscription confirmation)
        # In practice, you would query the server for the last sequence
        # and compare against checkpoint.last_sequence
        
        return result
    
    async def _process_message(self, channel: str, symbol: str, payload: bytes) -> None:
        """
        Application-specific message processing.
        
        Override this method to implement your data handling logic.
        """
        # Default implementation: parse and log
        data = json.loads(payload)
        logger.debug("message_processed", channel=channel, symbol=symbol, data=data)
    
    async def receive_loop(self) -> None:
        """
        Main receive loop — processes incoming messages and manages checkpoints.
        
        This loop runs until shutdown is requested or connection is lost.
        """
        self._running = True
        checkpoint_counter = 0
        
        while self._running and not self._shutdown_manager.is_shutting_down:
            try:
                if self._ws is None or not self._ws.open:
                    await self.connect()
                    # Re-subscribe to all channels after reconnect
                    for sub_id, sub_info in self._subscriptions.items():
                        await self.subscribe(sub_info["channel"], sub_info["symbols"])
                
                # Wait for message with timeout to allow shutdown checks
                message = await asyncio.wait_for(
                    self._ws.recv(),
                    timeout=1.0
                )
                
                data = json.loads(message)
                
                # Extract sequence number and channel from message
                sequence = data.get("seq", 0)
                channel = data.get("channel", "unknown")
                symbol = data.get("symbol", "unknown")
                
                # Write to durable buffer first, keeping the returned handle so
                # the same row can be marked processed afterwards
                payload = message.encode() if isinstance(message, str) else message
                buffered_msg = self._message_buffer.write(channel, symbol, sequence, payload)
                
                # Process the message
                await self._process_message(channel, symbol, payload)
                
                # Mark exactly this message as processed in the buffer
                self._message_buffer.mark_processed(buffered_msg.id)
                
                # Update sequence tracking first so the checkpoint below
                # reflects the message that was just processed
                for sub_id, sub_info in self._subscriptions.items():
                    if sub_info["channel"] == channel:
                        sub_info["last_sequence"] = max(
                            sub_info.get("last_sequence", 0), 
                            sequence
                        )
                
                # Update checkpoint periodically (every 100 messages here; a
                # time-based trigger would match the 5-second baseline)
                checkpoint_counter += 1
                if checkpoint_counter >= 100:
                    for sub_id, sub_info in self._subscriptions.items():
                        checkpoint = Checkpoint(
                            session_id=sub_id,
                            last_sequence=sub_info.get("last_sequence", 0),
                            subscribed_channels=[sub_info["channel"]],
                            subscribed_symbols=sub_info["symbols"],
                            endpoint=self._endpoint,
                        )
                        self._checkpoint_store.save_checkpoint(sub_id, checkpoint)
                    checkpoint_counter = 0
            
            except asyncio.TimeoutError:
                # Timeout is expected — check shutdown flag and continue
                continue
            
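            except websockets.exceptions.ConnectionClosed as e:
                logger.warning("websocket_disconnected", code=e.code, reason=e.reason)
                # Drop the dead socket and pause briefly; the next iteration
                # reconnects and re-subscribes at the top of the loop
                self._ws = None
                await asyncio.sleep(self._reconnect_delay)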
            
            except Exception as e:
                logger.error("receive_loop_error", error=str(e), exc_info=True)
                await asyncio.sleep(1)
        
        logger.info("receive_loop_exiting", running=self._running)
    
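    async def run(self, channels: list[dict]) -> None:
        """
        Main entry point: connect, resume from saved state, subscribe,
        and run the receive loop.
        """
        try:
            await self.connect()
            
            # Register the configured subscriptions first so that resume()
            # can look up their checkpoints by subscription ID
            for channel_config in channels:
                sub_id = f"{channel_config['channel']}:{','.join(channel_config['symbols'])}"
                self._subscriptions.setdefault(sub_id, {
                    "channel": channel_config["channel"],
                    "symbols": channel_config["symbols"],
                    "last_sequence": 0,
                })
            
            # Attempt resumption from checkpoint and buffer
            resumption = await self.resume()
            
            # Subscribe to all configured channels
            for channel_config in channels:
                await self.subscribe(
                    channel_config["channel"],
                    channel_config["symbols"]
                )
            
            # Run the receive loop
            await self.receive_loop()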
        
        except Exception as e:
            logger.error("run_error", error=str(e), exc_info=True)
            raise
        
        finally:
            # Graceful shutdown: flush buffers and save final checkpoint
            logger.info("executing_graceful_shutdown")
            self._message_buffer.flush()
            
            for sub_id, sub_info in self._subscriptions.items():
                final_checkpoint = Checkpoint(
                    session_id=sub_id,
                    last_sequence=sub_info.get("last_sequence", 0),
                    subscribed_channels=[sub_info["channel"]],
                    subscribed_symbols=sub_info["symbols"],
                    endpoint=self._endpoint,
                )
                self._checkpoint_store.save_checkpoint(sub_id, final_checkpoint)
            
            if self._ws and self._ws.open:
                await self._ws.close()
            
            logger.info("shutdown_complete")
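
The run() method as shown makes a single connection attempt, so retrying after a disconnect falls to whatever calls it. One way to add retries is a small wrapper that applies exponential backoff with full jitter. The following is a minimal sketch, not part of the client above: run_with_reconnect and backoff_delays are hypothetical names, and ConnectionError stands in for whichever exception your client raises when the connection drops.

```python
import asyncio
import random
from typing import Iterator, Optional


def backoff_delays(base: float, cap: float, attempts: int,
                   rng: Optional[random.Random] = None) -> Iterator[float]:
    """Yield full-jitter delays: uniform in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    for attempt in range(attempts):
        ceiling = min(cap, base * (2 ** attempt))
        yield rng.uniform(0.0, ceiling)


async def run_with_reconnect(run_once, *, base: float = 1.0, cap: float = 60.0,
                             max_attempts: int = 8, sleep=asyncio.sleep):
    """Call run_once() until it returns, sleeping a jittered delay between failures.

    run_once is a zero-argument coroutine function (hypothetical: for the
    client above it could wrap client.run(channels)). The final failure is
    re-raised instead of being followed by another sleep.
    """
    delays = list(backoff_delays(base, cap, max_attempts - 1))
    for attempt in range(max_attempts):
        try:
            return await run_once()
        except ConnectionError:  # assumption: stand-in for your client's disconnect error
            if attempt == max_attempts - 1:
                raise
            await sleep(delays[attempt])
```

Jitter matters when many clients lose the same upstream connection at once: randomizing each delay spreads their reconnect attempts out instead of having them all retry at the same instant.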

Putting It All Together: The Application Bootstrap

The final piece is wiring everything together with proper initialization and the main event loop:

import asyncio
import logging
import sys
import os
from pathlib import Path

import structlog
from dotenv import load_dotenv

from signal_handler import shutdown_manager
from checkpoint_store import CheckpointStore
from message_buffer import DurableMessageBuffer
from resumable_client import ResumableWebSocketClient


def configure_logging(log_level: str = "INFO") -> None:
    """Configure structured logging for production."""
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer()
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, log_level.upper())
    )


async def main() -> None:
    """Application entry point."""
    # Configuration from environment
    api_key = os.environ.get("TICKDB_API_KEY", "")
    endpoint = os.environ.get("TICKDB_WS_ENDPOINT", "wss://api.tickdb.ai/v1/market/ws")
    # NOTE: /tmp is often cleared on reboot; point DATA_DIR at persistent storage in production
    data_dir = Path(os.environ.get("DATA_DIR", "/tmp/market_data"))
    
    # Ensure data directory exists
    data_dir.mkdir(parents=True, exist_ok=True)
    
    # Initialize components
    checkpoint_store = CheckpointStore(data_dir / "checkpoints.db")
    message_buffer = DurableMessageBuffer(
        data_dir / "message_buffer.db",
        batch_size=100,
        flush_interval=0.5
    )
    
    # Register cleanup handlers with shutdown manager
    shutdown_manager.register_cleanup(checkpoint_store._get_conn().close)
    shutdown_manager.register_cleanup(message_buffer.flush)
    shutdown_manager.setup_signal_handlers()
    
    # Initialize the client
    client = ResumableWebSocketClient(
        api_key=api_key,
        endpoint=endpoint,
        checkpoint_store=checkpoint_store,
        message_buffer=message_buffer,
        shutdown_manager=shutdown_manager,
    )
    
    # Define channels to subscribe
    channels = [
        {"channel": "depth", "symbols": ["AAPL.US", "TSLA.US"]},
        {"channel": "kline_1m", "symbols": ["BTC.CC"]},
    ]
    
    try:
        await client.run(channels)
    except KeyboardInterrupt:
        # SIGINT was received — shutdown manager handles this
        pass


if __name__ == "__main__":
    load_dotenv()  # Load .env file if present
    configure_logging()
    asyncio.run(main())

Summary: The Data Durability Stack

The architecture described in this article creates a multi-layer defense against data loss:

Layer | Mechanism | Protects Against
L1: Signal handling | SIGTERM/SIGINT handlers with cleanup registration | Planned shutdowns (deployments, restarts) that would otherwise escalate to SIGKILL
L2: Message buffer | SQLite in WAL mode (PRAGMA journal_mode=WAL) | Process crash, OOM kill, unexpected termination
L3: Checkpoints | Periodic state snapshots to SQLite | Connection drops, network partitions
L4: Resumption | Replay of unprocessed messages, then re-subscription from the checkpoint | Restart after any of the failures above
L5: Gap detection | Sequence number tracking and server-side validation | Missed messages during reconnection

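Together, these layers limit data loss to whatever sits in the in-memory batch at the moment of failure (at most 0.5 seconds or 100 messages with the buffer settings shown in the bootstrap), provided the server can replay from the checkpointed sequence. Recovery follows the same path whether the failure was planned (deployment) or unplanned (crash, network failure, power loss).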
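
The gap detection layer can be illustrated with a small tracker. This is a sketch under stated assumptions rather than the article's implementation: it assumes each channel carries an integer sequence number that increases by exactly one per message, and SequenceTracker is a hypothetical name. On resumption, last_sequence would be seeded from the saved checkpoint.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class SequenceTracker:
    """Classify incoming sequence numbers and record missing ranges."""
    last_sequence: int = 0
    gaps: List[Tuple[int, int]] = field(default_factory=list)

    def observe(self, sequence: int) -> str:
        expected = self.last_sequence + 1
        if sequence <= self.last_sequence:
            # Already seen (for example, replayed after a reconnect): safe to skip.
            return "duplicate"
        if sequence > expected:
            # Record the inclusive range of sequence numbers that never arrived.
            self.gaps.append((expected, sequence - 1))
            self.last_sequence = sequence
            return "gap"
        self.last_sequence = sequence
        return "ok"


tracker = SequenceTracker(last_sequence=0)
statuses = [tracker.observe(s) for s in (1, 2, 5, 5, 6)]
# statuses is ["ok", "ok", "gap", "duplicate", "ok"]; tracker.gaps is [(3, 4)]
```

A recorded gap is the signal to request a replay or a fresh snapshot from the server, if the feed offers one, before trusting the local order book again.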


Next Steps

If you are building a market data pipeline, apply this architecture to your WebSocket client. Start with the GracefulShutdownManager and CheckpointStore — they require no external dependencies and provide immediate resilience improvements.

If you need production-ready connectors, TickDB provides native WebSocket support with built-in heartbeat, reconnection, and authentication. The /v1/market/ws endpoint delivers real-time depth, trades, and kline data across US equities, Hong Kong equities, and cryptocurrency markets. Sign up at tickdb.ai for a free API key with no credit card required.

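If you are handling high-frequency data, consider a batched write strategy instead of per-message SQLite commits: accumulate messages in memory and flush every N milliseconds or every N messages, whichever comes first. This reduces I/O overhead, but it is a trade-off: anything still in the in-memory batch is lost if the process is killed before the next flush, so size the batch to the loss window you can tolerate.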
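
The batching strategy can be sketched with the standard library sqlite3 module. This is an illustration only, not the DurableMessageBuffer from this article: BatchedWriter, the messages table, and its columns are hypothetical names chosen for the example.

```python
import sqlite3
import time


class BatchedWriter:
    """Accumulate rows in memory and commit them to SQLite in one transaction."""

    def __init__(self, db_path: str, batch_size: int = 100, flush_interval: float = 0.5):
        self._conn = sqlite3.connect(db_path)
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "sequence INTEGER PRIMARY KEY, payload TEXT NOT NULL)"
        )
        self._conn.commit()
        self._pending = []
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._last_flush = time.monotonic()

    def add(self, sequence: int, payload: str) -> None:
        self._pending.append((sequence, payload))
        overdue = time.monotonic() - self._last_flush >= self._flush_interval
        if len(self._pending) >= self._batch_size or overdue:
            self.flush()

    def flush(self) -> None:
        if self._pending:
            # One transaction per batch; INSERT OR IGNORE makes replays idempotent.
            with self._conn:
                self._conn.executemany(
                    "INSERT OR IGNORE INTO messages (sequence, payload) VALUES (?, ?)",
                    self._pending,
                )
            self._pending.clear()
        self._last_flush = time.monotonic()

    def count(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
```

Note that the time-based trigger in this sketch is only checked when a message arrives. A quiet feed would hold its last partial batch in memory until the next message, so a real implementation would also want a periodic timer, and a final flush() in the shutdown path.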


This article does not constitute investment advice. Market data systems involve engineering complexity; ensure thorough testing in a non-production environment before deployment.