"Price is the effect. The order book is the cause." — This principle guides every quant trader watching real-time market data. But what happens when the cause survives a server reboot, a deployment rollback, or an unexpected power failure, while the monitoring system watching it does not?
At 3:47 AM on a Tuesday, a deployment script runs successfully. The new version of the market data aggregator is live. For exactly eleven seconds. Then a bug in the initialization logic triggers a hard exit. The process dies before it can flush its in-memory buffer. Forty-seven order book snapshots vanish into the void. The trader wakes up to find gaps in the data feed — gaps that cannot be reconstructed from any public source.
This article addresses the engineering problem that creates those gaps: how to design a WebSocket-based data pipeline that survives abrupt termination, recovers gracefully from disconnections, and never loses the last few seconds of data that matter most.
The solution involves four interconnected components: Unix signal handling for graceful shutdown, a checkpoint protocol for state capture, SQLite-based buffering for durability, and a resumable subscription mechanism for gap-free recovery.
The Problem with In-Memory State
WebSocket connections stream continuous data. Every tick, every order book update, every depth snapshot arrives in a fire hose that exists only in transit. The application reads it, processes it, and — in a naive implementation — discards it once consumed.
This architecture works fine until it does not. The failure modes are predictable:
| Failure Mode | Cause | Data Lost |
|---|---|---|
| SIGTERM during deployment | Container orchestrator sends a termination signal; if unhandled, the process exits at once | Everything in application memory and the socket receive buffer |
| OOM kill | Memory pressure triggers the Linux OOM killer (SIGKILL, which cannot be caught) | Everything in application memory and the socket receive buffer |
| Network partition | TCP connection drops without a FIN handshake | Messages in flight, plus everything published until the client reconnects |
| Process crash | Unhandled exception or segfault | Everything in memory |
| Sudden power loss | Hardware failure | Everything not yet flushed to disk |
In each case, the data that existed in the socket buffer or the application layer's in-memory queue is gone. The TCP stack may have acknowledged it; the application never processed it.
The fix is not to prevent failures. It is to ensure that state is checkpointed before it is needed, buffered durably while in transit, and recoverable after any failure mode.
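To make the difference concrete, here is a minimal, standard-library-only sketch that simulates an abrupt termination. A child process keeps one message in a Python list, commits a second to SQLite, starts a third without committing, and then dies via `os._exit`, which skips all cleanup, much like a SIGKILL. The table name and file layout are illustrative only.

```python
import os
import sqlite3
import subprocess
import sys
import tempfile

# Child program: one message in RAM, one committed to SQLite, one uncommitted.
CHILD = r"""
import os, sqlite3, sys
in_memory = ["tick-1"]                              # lives only in RAM
conn = sqlite3.connect(sys.argv[1])
conn.execute("CREATE TABLE IF NOT EXISTS log (msg TEXT)")
conn.execute("INSERT INTO log VALUES ('tick-2')")
conn.commit()                                       # durable from here on
conn.execute("INSERT INTO log VALUES ('tick-3')")  # never committed
os._exit(1)                                         # abrupt exit: no flush, no cleanup
"""


def survivors_after_crash() -> list[str]:
    """Run the child, let it die, then list what actually reached the database."""
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "buffer.db")
        subprocess.run([sys.executable, "-c", CHILD, db_path], check=False)
        conn = sqlite3.connect(db_path)
        rows = [row[0] for row in conn.execute("SELECT msg FROM log")]
        conn.close()
        return rows


print(survivors_after_crash())  # ['tick-2']
```

Only the committed row survives: the in-memory message vanishes with the process, and SQLite rolls back the unfinished transaction the next time the file is opened.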
Signal Handling: Catching the Shutdown Sequence
Unix signals are the operating system's way of notifying a process that something important is about to happen. For an application that must shut down gracefully, three signals matter most:
| Signal | Origin | Default Behavior | Required Action |
|---|---|---|---|
| SIGTERM | kill, container orchestrator, systemd | Terminate immediately | Begin graceful shutdown sequence |
| SIGINT | Ctrl+C in terminal | Terminate immediately | Begin graceful shutdown sequence |
| SIGHUP | Terminal closed, kill -HUP | Terminate immediately | Begin graceful shutdown sequence (optional) |
The goal of handling these signals is to create a shutdown window — a brief period where the process stops accepting new work, flushes buffers, saves checkpoints, and then exits cleanly.
Here is a signal-handling coordinator in Python (the bootstrap later in this article imports it from `signal_handler.py`):

```python
import logging
import os
import signal
import sys
import threading
import time
from typing import Callable

logger = logging.getLogger(__name__)


class GracefulShutdownManager:
    """
    Coordinates graceful shutdown across threads.

    This manager catches SIGTERM and SIGINT, sets a thread-safe shutdown flag,
    and runs registered cleanup handlers when the main thread asks it to.
    """

    def __init__(self, shutdown_timeout: float = 30.0):
        self._shutdown_timeout = shutdown_timeout
        self._shutdown_flag = threading.Event()
        self._shutdown_initiated = threading.Event()
        self._cleanup_handlers: list[Callable[[], None]] = []

    def request_shutdown(self, signum: int, frame) -> None:
        """Signal handler: keep it minimal (set flags, write to stderr, no logging)."""
        signal_name = signal.Signals(signum).name
        if self._shutdown_initiated.is_set():
            # Second signal: the graceful path is hanging, so exit immediately,
            # skipping finally blocks and atexit hooks that may be what is stuck
            sys.stderr.write(f"[ShutdownManager] Second {signal_name} received, forcing exit.\n")
            sys.stderr.flush()
            os._exit(1)
        # Write to stderr because logging handlers take locks that the
        # interrupted code may already hold
        sys.stderr.write(f"\n[ShutdownManager] Received {signal_name}, initiating graceful shutdown...\n")
        sys.stderr.flush()
        self._shutdown_initiated.set()
        self._shutdown_flag.set()

    def register_cleanup(self, handler: Callable[[], None]) -> None:
        """Register a cleanup handler to be called during shutdown."""
        self._cleanup_handlers.append(handler)

    @property
    def is_shutting_down(self) -> bool:
        """Thread-safe check for shutdown state."""
        return self._shutdown_flag.is_set()

    def wait_for_shutdown(self) -> None:
        """Block until a shutdown signal is received."""
        self._shutdown_flag.wait()

    def execute_shutdown_sequence(self) -> None:
        """
        Execute the full shutdown sequence.

        Call this from the main thread after a shutdown signal is received.
        Runs cleanup handlers in reverse registration order and warns if the
        sequence overruns shutdown_timeout.
        """
        logger.info("Beginning shutdown sequence...")
        start_time = time.monotonic()  # monotonic clock for the shutdown budget
        for handler in reversed(self._cleanup_handlers):
            name = getattr(handler, "__name__", repr(handler))
            try:
                logger.debug("Running cleanup handler: %s", name)
                handler()
            except Exception as e:
                logger.error("Cleanup handler %s failed: %s", name, e, exc_info=True)
        elapsed = time.monotonic() - start_time
        if elapsed > self._shutdown_timeout:
            logger.warning("Shutdown took %.1fs, over the %.1fs budget", elapsed, self._shutdown_timeout)
        logger.info("Shutdown sequence complete.")

    def setup_signal_handlers(self) -> None:
        """Install signal handlers. Call once, from the main thread, during initialization."""
        signal.signal(signal.SIGTERM, self.request_shutdown)
        signal.signal(signal.SIGINT, self.request_shutdown)
        logger.info("Signal handlers installed (SIGTERM, SIGINT)")


# Global instance
shutdown_manager = GracefulShutdownManager(shutdown_timeout=30.0)
```
Engineering notes:
- The signal handler should do as little as possible. It only sets `threading.Event` flags and writes to stderr; it avoids the `logging` module, whose handlers take locks that the interrupted code may already hold. (CPython always runs Python-level signal handlers in the main thread, between bytecode instructions.)
- The `is_shutting_down` property reads a `threading.Event`, which is thread-safe.
- Cleanup handlers run in reverse registration order (like a stack unwind), which is the correct semantics for teardown sequences.
- A second signal forces an exit if the graceful shutdown hangs.
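For asyncio applications, the standard library also offers `loop.add_signal_handler()` (Unix only). Its callback runs as an ordinary event-loop callback rather than inside whatever code the signal interrupted, so it may safely interact with loop objects such as `asyncio.Event`. A minimal sketch of the same flag-setting idea, assuming a Unix host; the self-sent SIGTERM merely stands in for an orchestrator:

```python
import asyncio
import os
import signal


async def run_until_signalled() -> str:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    received: list[str] = []

    def on_signal(sig: signal.Signals) -> None:
        # Runs as a normal event-loop callback, not inside interrupted code
        received.append(sig.name)
        stop.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, on_signal, sig)

    # Simulate an orchestrator sending SIGTERM shortly after startup
    loop.call_later(0.1, os.kill, os.getpid(), signal.SIGTERM)

    await stop.wait()  # a real client would run its receive loop here
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.remove_signal_handler(sig)
    return received[0]


print(asyncio.run(run_until_signalled()))  # SIGTERM
```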
Checkpoint Protocol: Capturing State at the Right Moment
A checkpoint is a point-in-time snapshot of the application's state that is sufficient to resume operations after a restart. For a WebSocket market data application, the minimal checkpoint includes:
- Subscription state: Which symbols and channels are subscribed
- Sequence numbers: The last sequence ID received for each subscription
- Last-tick timestamp: Wall-clock time of the most recent data
- In-flight request IDs: Any pending requests awaiting response
The checkpoint protocol has three components: when to checkpoint, what to include, and where to store it.
When to Checkpoint
Checkpoint frequency is a tradeoff between data loss window and I/O overhead:
| Strategy | Checkpoint Frequency | Worst-Case Loss Window | I/O Cost |
|---|---|---|---|
| Time-based | Every N seconds | N seconds | Low–Medium |
| Count-based | Every N messages | Variable (depends on tick rate) | Low |
| Event-based | On subscription changes only | Unbounded between changes | Low |
| Hybrid | Event-based + periodic backup | The periodic interval (e.g. 5 seconds) | Medium |

For market data applications, the recommended approach is hybrid: checkpoint on every subscription change (subscribe/unsubscribe), and also every 5 seconds as a baseline. This bounds the loss window: even if the process crashes mid-stream, the last checkpoint is at most 5 seconds old.
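The hybrid trigger logic is small enough to isolate. The sketch below uses a hypothetical `HybridCheckpointScheduler` (not part of any library) that takes a `save` callback and an injectable clock so it can be tested without waiting:

```python
import time
from typing import Callable


class HybridCheckpointScheduler:
    """Checkpoint on every subscription change, plus every `interval` seconds.

    Hypothetical helper for illustration; `save` persists whatever state you need.
    """

    def __init__(self, save: Callable[[], None], interval: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self._save = save
        self._interval = interval
        self._clock = clock
        self._last_save = clock()

    def _checkpoint(self) -> None:
        self._save()
        self._last_save = self._clock()

    def on_subscription_change(self) -> None:
        # Event-based trigger: subscribe/unsubscribe always checkpoints immediately
        self._checkpoint()

    def tick(self) -> bool:
        # Periodic trigger: checkpoint if the interval has elapsed since the last save
        if self._clock() - self._last_save >= self._interval:
            self._checkpoint()
            return True
        return False


# Usage with a fake clock
fake_now = [0.0]
saves: list[float] = []
sched = HybridCheckpointScheduler(lambda: saves.append(fake_now[0]), interval=5.0,
                                  clock=lambda: fake_now[0])
sched.on_subscription_change()  # saves at t=0
fake_now[0] = 3.0
sched.tick()                    # too soon, no save
fake_now[0] = 5.5
sched.tick()                    # interval elapsed, saves
print(saves)  # [0.0, 5.5]
```

Because `tick()` only fires when called, call it from both the message path and the receive loop's idle timeout so that a quiet stream still gets its periodic checkpoint.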
What to Include in a Checkpoint
```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass
class Checkpoint:
    """
    Represents a point-in-time snapshot of the application's state.

    This is the minimal information needed to resume a WebSocket session
    after a restart without re-subscribing from scratch.
    """
    version: int = 1
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    # Session identification
    session_id: str = ""
    last_sequence: int = 0

    # Active subscriptions
    subscribed_symbols: list[str] = field(default_factory=list)
    subscribed_channels: list[str] = field(default_factory=list)

    # Stream state
    last_message_timestamp: str = ""
    last_message_seq: int = 0

    # Connection metadata
    endpoint: str = ""
    reconnect_count: int = 0

    def to_json(self) -> bytes:
        """Serialize to JSON for storage."""
        return json.dumps(asdict(self), indent=2).encode("utf-8")

    @classmethod
    def from_json(cls, data: bytes) -> "Checkpoint":
        """Deserialize from JSON."""
        return cls(**json.loads(data.decode("utf-8")))
```
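Because `from_json` passes every stored key straight into the constructor, a checkpoint written by a newer build with an extra field raises `TypeError` on startup, which is exactly when recovery matters. One option, sketched here as a hypothetical helper, is to drop unknown keys and let missing ones fall back to their defaults. `MiniCheckpoint` is a cut-down stand-in for the dataclass above:

```python
import json
from dataclasses import dataclass, field, fields
from typing import Any, TypeVar

T = TypeVar("T")


def from_json_tolerant(cls: type[T], data: bytes) -> T:
    """Build a dataclass from JSON, ignoring keys the class does not declare.

    Missing keys fall back to the dataclass defaults, so both older and newer
    checkpoint layouts load instead of raising TypeError.
    """
    raw: dict[str, Any] = json.loads(data.decode("utf-8"))
    known = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in raw.items() if k in known})


@dataclass
class MiniCheckpoint:
    # Cut-down stand-in for the Checkpoint dataclass
    version: int = 1
    session_id: str = ""
    last_sequence: int = 0
    subscribed_symbols: list[str] = field(default_factory=list)


# A record written by a hypothetical newer build with an extra "region" field
newer = b'{"version": 2, "session_id": "depth:AAPL.US", "last_sequence": 42, "region": "us-east"}'
cp = from_json_tolerant(MiniCheckpoint, newer)
print(cp.last_sequence, cp.subscribed_symbols)  # 42 []
```

In practice you would also compare `version` against the highest layout your build understands before trusting the loaded values.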
Where to Store: SQLite as the Checkpoint Database
SQLite is a natural fit for checkpoint storage. It is embedded (no separate server process), ACID-compliant (committed transactions survive a process crash), and handles concurrent access from multiple threads, provided each thread uses its own connection.
The checkpoint database schema is intentionally minimal:
```python
import sqlite3
import threading
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, Optional

import structlog

# This sketch assumes the Checkpoint dataclass above is defined in this same
# module (checkpoint_store.py); the client and bootstrap import from here.

logger = structlog.get_logger()


class CheckpointStore:
    """
    Persistent checkpoint storage using SQLite.

    Stores the last known good state for each subscription,
    enabling resumable WebSocket sessions after restart.
    """

    def __init__(self, db_path: str | Path):
        self._db_path = Path(db_path)
        self._local = threading.local()
        self._init_db()

    def _get_conn(self) -> sqlite3.Connection:
        """Get a thread-local database connection."""
        if not hasattr(self._local, "conn"):
            self._local.conn = sqlite3.connect(
                str(self._db_path),
                timeout=10.0,
                isolation_level="DEFERRED",
            )
            self._local.conn.row_factory = sqlite3.Row
        return self._local.conn

    @contextmanager
    def _transaction(self) -> Iterator[sqlite3.Connection]:
        """Context manager for transactional writes: commit on success, roll back on error."""
        conn = self._get_conn()
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise

    def _init_db(self) -> None:
        """Initialize the database schema."""
        with self._transaction() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS checkpoints (
                    subscription_key TEXT PRIMARY KEY,
                    checkpoint_data BLOB NOT NULL,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_updated_at
                ON checkpoints(updated_at)
            """)
        logger.info("checkpoint_db_initialized", path=str(self._db_path))

    def save_checkpoint(self, key: str, checkpoint: Checkpoint) -> None:
        """
        Persist a checkpoint to the database.

        INSERT OR REPLACE overwrites any existing row with the same subscription_key.
        """
        with self._transaction() as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO checkpoints (subscription_key, checkpoint_data, updated_at)
                VALUES (?, ?, CURRENT_TIMESTAMP)
                """,
                (key, checkpoint.to_json()),
            )
        logger.debug("checkpoint_saved", key=key, seq=checkpoint.last_sequence)

    def load_checkpoint(self, key: str) -> Optional[Checkpoint]:
        """
        Retrieve the last checkpoint for a subscription.

        Returns None if no checkpoint exists (cold start scenario).
        """
        conn = self._get_conn()
        row = conn.execute(
            "SELECT checkpoint_data FROM checkpoints WHERE subscription_key = ?",
            (key,),
        ).fetchone()
        if row is None:
            logger.debug("checkpoint_not_found", key=key)
            return None
        checkpoint = Checkpoint.from_json(row["checkpoint_data"])
        logger.info("checkpoint_loaded", key=key, seq=checkpoint.last_sequence)
        return checkpoint

    def delete_checkpoint(self, key: str) -> None:
        """Remove a checkpoint (e.g., after successful resubscription)."""
        with self._transaction() as conn:
            conn.execute("DELETE FROM checkpoints WHERE subscription_key = ?", (key,))
        logger.debug("checkpoint_deleted", key=key)
```
Engineering notes:
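- `isolation_level='DEFERRED'` makes Python's `sqlite3` module issue `BEGIN DEFERRED` before writes, so a write lock is only taken when a transaction actually writes; plain reads do not open a transaction. `timeout=10.0` makes a connection wait up to 10 seconds for a competing writer's lock instead of failing immediately with "database is locked".
- Thread-local connections avoid sharing one `sqlite3.Connection` across threads, which the module rejects by default (`check_same_thread=True`).
- `INSERT OR REPLACE` gives upsert semantics with very little code, at the cost of losing history (on a conflict SQLite deletes the old row and inserts a new one). If history is needed, insert one row per version, for example keyed on `(subscription_key, version)`, instead of replacing.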
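If you want a genuine in-place update instead (keeping the row's `rowid` and avoiding the delete-and-reinsert that `REPLACE` performs), SQLite 3.24 and later supports `INSERT ... ON CONFLICT ... DO UPDATE`. A minimal sketch against the same `checkpoints` schema, using an in-memory database for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE checkpoints (
        subscription_key TEXT PRIMARY KEY,
        checkpoint_data BLOB NOT NULL,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# `excluded` refers to the row that failed to insert because of the conflict
UPSERT_SQL = """
    INSERT INTO checkpoints (subscription_key, checkpoint_data, updated_at)
    VALUES (?, ?, CURRENT_TIMESTAMP)
    ON CONFLICT(subscription_key) DO UPDATE SET
        checkpoint_data = excluded.checkpoint_data,
        updated_at = excluded.updated_at
"""


def save(key: str, data: bytes) -> None:
    # `with conn` commits on success and rolls back on an exception
    with conn:
        conn.execute(UPSERT_SQL, (key, data))


save("depth:AAPL.US", b'{"last_sequence": 10}')
save("depth:AAPL.US", b'{"last_sequence": 11}')
rows = conn.execute("SELECT subscription_key, checkpoint_data FROM checkpoints").fetchall()
print(rows)  # [('depth:AAPL.US', b'{"last_sequence": 11}')]
```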
The Message Buffer: Surviving the Gap Between Checkpoints
Even with 5-second checkpoint intervals, a crash can still lose up to 5 seconds of data. For high-frequency trading applications, this is unacceptable. The message buffer addresses this gap.
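The message buffer is a write-ahead log (WAL) in the application sense: every incoming message is recorded in SQLite before it is processed. Its durability is bounded by how often pending rows are committed, so the batch size sets the trade-off between per-message latency and the worst-case loss window: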
```python
import sqlite3
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


@dataclass
class BufferedMessage:
    """A single message with durability metadata."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    channel: str = ""
    symbol: str = ""
    sequence: int = 0
    payload: bytes = b""
    persisted: bool = False


class DurableMessageBuffer:
    """
    Write-ahead log for incoming WebSocket messages.

    Messages are inserted into SQLite before processing. Rows become durable
    when the pending transaction is committed: every `batch_size` writes, when
    a write arrives after `flush_interval` seconds, or on an explicit flush().
    A process crash can therefore lose at most the uncommitted tail of a batch.
    Intended for a single writer thread (e.g. the asyncio event loop), since
    each thread commits only its own connection.

    ⚠️ Performance note: batch_size=1 commits every message and pays the
    commit cost each time; larger batches amortize it at the cost of a larger
    loss window. For ultra-low-latency applications, consider a memory-mapped
    append-only file with periodic SQLite dumps.
    """

    def __init__(self, db_path: str | Path, batch_size: int = 100, flush_interval: float = 0.5):
        self._db_path = Path(db_path)
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        # Messages written but not yet marked processed, keyed by ID
        self._pending: dict[str, BufferedMessage] = {}
        # Re-entrant: write() calls flush() while already holding the lock
        self._lock = threading.RLock()
        self._local = threading.local()
        self._uncommitted = 0
        self._last_commit = time.monotonic()
        self._init_db()

    def _get_conn(self) -> sqlite3.Connection:
        """Get a thread-local connection configured for WAL mode."""
        if not hasattr(self._local, "conn"):
            conn = sqlite3.connect(
                str(self._db_path),
                timeout=30.0,
                isolation_level="DEFERRED",
            )
            conn.row_factory = sqlite3.Row  # get_unprocessed() reads columns by name
            conn.execute("PRAGMA journal_mode = WAL")
            conn.execute("PRAGMA synchronous = NORMAL")
            self._local.conn = conn
        return self._local.conn

    def _init_db(self) -> None:
        """Initialize the WAL schema."""
        conn = self._get_conn()
        conn.execute("""
            CREATE TABLE IF NOT EXISTS message_log (
                id TEXT PRIMARY KEY,
                timestamp TEXT NOT NULL,
                channel TEXT NOT NULL,
                symbol TEXT NOT NULL,
                sequence INTEGER NOT NULL,
                payload BLOB NOT NULL,
                processed INTEGER DEFAULT 0
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_unprocessed
            ON message_log(processed, timestamp)
        """)
        conn.commit()

    def write(self, channel: str, symbol: str, sequence: int, payload: bytes) -> BufferedMessage:
        """
        Write a message to the durable buffer.

        The row joins the current transaction and is committed once the batch
        size or flush interval is reached. The message stays marked as
        unprocessed until mark_processed() is called.
        """
        message = BufferedMessage(channel=channel, symbol=symbol, sequence=sequence, payload=payload)
        with self._lock:
            conn = self._get_conn()
            conn.execute(
                """
                INSERT INTO message_log (id, timestamp, channel, symbol, sequence, payload, processed)
                VALUES (?, ?, ?, ?, ?, ?, 0)
                """,
                (message.id, message.timestamp, channel, symbol, sequence, payload),
            )
            # Not durable yet: the row sits in an open transaction until commit.
            # With synchronous = NORMAL in WAL mode, a committed row survives a
            # process crash but may be rolled back after power loss or an OS crash.
            self._pending[message.id] = message
            self._uncommitted += 1
            if (self._uncommitted >= self._batch_size
                    or time.monotonic() - self._last_commit >= self._flush_interval):
                self.flush()
        return message

    def mark_processed(self, message_id: str) -> None:
        """Mark a message as successfully processed to enable cleanup."""
        with self._lock:
            conn = self._get_conn()
            conn.execute("UPDATE message_log SET processed = 1 WHERE id = ?", (message_id,))
            self._pending.pop(message_id, None)

    def flush(self) -> None:
        """
        Commit pending writes so they are durable.

        Call this during shutdown to ensure all messages are persisted.
        """
        with self._lock:
            self._get_conn().commit()
            self._uncommitted = 0
            self._last_commit = time.monotonic()

    def purge_processed(self, keep_last_n: int = 1000) -> int:
        """
        Remove processed messages, keeping the last N for safety.

        Returns the number of rows deleted.
        """
        with self._lock:
            conn = self._get_conn()
            cursor = conn.execute(
                """
                DELETE FROM message_log
                WHERE processed = 1
                AND id NOT IN (
                    SELECT id FROM message_log
                    ORDER BY timestamp DESC
                    LIMIT ?
                )
                """,
                (keep_last_n,),
            )
            self.flush()  # VACUUM cannot run inside an open transaction
            deleted = cursor.rowcount
            if deleted > 0:
                conn.execute("VACUUM")  # Reclaim disk space (rewrites the whole file)
            return deleted

    def get_unprocessed(self, channel: Optional[str] = None, symbol: Optional[str] = None,
                        limit: int = 1000) -> list[BufferedMessage]:
        """
        Retrieve unprocessed messages for recovery.

        Ordered by timestamp, then insertion order, to enable sequential replay.
        """
        query = "SELECT * FROM message_log WHERE processed = 0"
        params: list = []
        if channel:
            query += " AND channel = ?"
            params.append(channel)
        if symbol:
            query += " AND symbol = ?"
            params.append(symbol)
        query += " ORDER BY timestamp ASC, rowid ASC LIMIT ?"
        params.append(limit)

        with self._lock:
            rows = self._get_conn().execute(query, params).fetchall()
        return [
            BufferedMessage(
                id=row["id"],
                timestamp=row["timestamp"],
                channel=row["channel"],
                symbol=row["symbol"],
                sequence=row["sequence"],
                payload=row["payload"],
                persisted=True,
            )
            for row in rows
        ]
```
Engineering notes:
- WAL mode (`PRAGMA journal_mode = WAL`) provides better concurrency than the default rollback journal: readers proceed while a writer is active.
- `PRAGMA synchronous = NORMAL` is a deliberate trade-off. In WAL mode it skips the fsync on each commit and syncs only at WAL checkpoints, so a committed transaction survives a process crash but may be rolled back after a power loss or OS crash.
- `purge_processed()` runs `VACUUM` to reclaim disk space. `VACUUM` rewrites the entire database file, so run it periodically or during low-traffic windows.
- For truly latency-sensitive applications, consider `aiosqlite` with batched writes every N milliseconds rather than per-message writes, so the event loop never blocks on SQLite I/O.
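Commits in the buffer are triggered by incoming writes, so on a quiet stream the tail of a batch can sit uncommitted until the next message arrives. One way to bound that window is a small timer task that calls `flush()` periodically. The sketch below assumes the client runs on a single asyncio event loop, so the task commits the same thread-local connection that `write()` uses; `periodic_flush` is a hypothetical helper, not part of any library:

```python
import asyncio
from typing import Callable


async def periodic_flush(flush: Callable[[], None], interval: float, stop: asyncio.Event) -> None:
    """Call flush() every `interval` seconds, and once more when `stop` is set."""
    while not stop.is_set():
        try:
            # Wake up early if shutdown is requested; otherwise time out after `interval`
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
        flush()  # commit whatever accumulated during the last interval


async def demo() -> int:
    calls: list[int] = []
    stop = asyncio.Event()
    # In the real client, flush would be message_buffer.flush
    task = asyncio.create_task(periodic_flush(lambda: calls.append(1), 0.05, stop))
    await asyncio.sleep(0.18)
    stop.set()
    await task
    return len(calls)


print(asyncio.run(demo()))
```

The final `flush()` after `stop` is set means a shutdown never skips the last partial batch.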
Resumable Subscriptions: Closing the Loop After Restart
The final piece is the resumption logic — what happens when the application starts up and finds a checkpoint and a buffer full of unprocessed messages.
The resumption sequence follows these steps:
- Load the last checkpoint from SQLite
- If a checkpoint exists, determine the gap between the last processed sequence and the current sequence
- Replay unprocessed messages from the buffer in sequence order
- Subscribe to the WebSocket stream starting from the checkpoint's sequence number
- Handle any gap in the stream (missed messages that were never received)
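Steps 2, 3 and 5 hinge on comparing sequence numbers. A minimal sketch of that bookkeeping, assuming the feed stamps each `(channel, symbol)` stream with strictly increasing integer sequence numbers (check your provider's documentation); `SequenceTracker` is a hypothetical helper. Seeded from the checkpoint, it skips messages already applied during buffer replay and reports any missing range so it can be backfilled:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SequenceTracker:
    """Tracks the last sequence seen per stream and classifies each new message.

    Hypothetical helper; assumes strictly increasing integer sequence numbers
    per (channel, symbol) stream.
    """
    last_seen: dict[tuple[str, str], int] = field(default_factory=dict)

    def observe(self, channel: str, symbol: str, seq: int) -> tuple[str, Optional[tuple[int, int]]]:
        key = (channel, symbol)
        last = self.last_seen.get(key)
        if last is not None and seq <= last:
            # Already applied (e.g. replayed from the buffer after restart): skip it
            return "duplicate", None
        self.last_seen[key] = seq
        if last is not None and seq > last + 1:
            # Sequences last+1 .. seq-1 never arrived: backfill them if the
            # provider offers a history or snapshot endpoint
            return "gap", (last + 1, seq - 1)
        return "ok", None


tracker = SequenceTracker({("depth", "AAPL.US"): 100})  # seeded from the checkpoint
print(tracker.observe("depth", "AAPL.US", 101))  # ('ok', None)
print(tracker.observe("depth", "AAPL.US", 101))  # ('duplicate', None)
print(tracker.observe("depth", "AAPL.US", 105))  # ('gap', (102, 104))
```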
```python
import asyncio
import json
import os
import random
import time
from dataclasses import dataclass
from typing import Optional

import structlog
import websockets.client
from websockets.client import WebSocketClientProtocol
from websockets.exceptions import ConnectionClosed

# Module names match the bootstrap below; Checkpoint is assumed to live in
# checkpoint_store.py next to CheckpointStore.
from checkpoint_store import Checkpoint, CheckpointStore
from message_buffer import DurableMessageBuffer
from signal_handler import GracefulShutdownManager

# This sketch uses the legacy websockets client API (websockets.client.connect
# with extra_headers, and the .open attribute). Recent websockets releases
# deprecate it in favour of websockets.asyncio.client; pin or port accordingly.

logger = structlog.get_logger()

CHECKPOINT_INTERVAL_S = 5.0  # periodic baseline of the hybrid checkpoint strategy


@dataclass
class ResumptionResult:
    """Result of a resumption attempt."""
    checkpoint_loaded: bool
    buffered_messages_replayed: int
    gap_detected: bool
    gap_start: Optional[int] = None
    gap_end: Optional[int] = None


class ResumableWebSocketClient:
    """
    WebSocket client with graceful shutdown, checkpoint persistence,
    and resumable subscriptions.

    This is the orchestration layer that ties together signal handling,
    checkpoint storage, the message buffer, and WebSocket reconnection.
    """

    def __init__(
        self,
        api_key: str,
        endpoint: str,
        checkpoint_store: CheckpointStore,
        message_buffer: DurableMessageBuffer,
        shutdown_manager: GracefulShutdownManager,
    ):
        self._api_key = api_key or os.environ.get("TICKDB_API_KEY", "")
        if not self._api_key:
            raise ValueError("API key must be provided or set in TICKDB_API_KEY")
        self._endpoint = endpoint
        self._checkpoint_store = checkpoint_store
        self._message_buffer = message_buffer
        self._shutdown_manager = shutdown_manager
        self._ws: Optional[WebSocketClientProtocol] = None
        self._subscriptions: dict[str, dict] = {}
        self._running = False
        self._reconnect_delay = 1.0
        self._max_reconnect_delay = 60.0
        self._last_checkpoint_at = time.monotonic()

    async def connect(self) -> None:
        """Establish WebSocket connection with authentication."""
        headers = {"X-API-Key": self._api_key}
        self._ws = await websockets.client.connect(
            f"{self._endpoint}?api_key={self._api_key}",
            extra_headers=headers,
            ping_interval=20,
            ping_timeout=10,
            close_timeout=5,
        )
        logger.info("websocket_connected", endpoint=self._endpoint)

    @staticmethod
    def _subscription_id(channel: str, symbols: list[str]) -> str:
        return f"{channel}:{','.join(symbols)}"

    def _save_checkpoint(self, sub_id: str) -> None:
        """Persist the current state of one subscription."""
        sub_info = self._subscriptions[sub_id]
        checkpoint = Checkpoint(
            session_id=sub_id,
            last_sequence=sub_info.get("last_sequence", 0),
            subscribed_channels=[sub_info["channel"]],
            subscribed_symbols=sub_info["symbols"],
            endpoint=self._endpoint,
        )
        self._checkpoint_store.save_checkpoint(sub_id, checkpoint)

    def _update_sequence(self, channel: str, symbol: str, sequence: int) -> None:
        """Advance last_sequence for every subscription covering this channel/symbol."""
        for sub_info in self._subscriptions.values():
            if sub_info["channel"] == channel and symbol in sub_info["symbols"]:
                sub_info["last_sequence"] = max(sub_info.get("last_sequence", 0), sequence)

    async def subscribe(self, channel: str, symbols: list[str]) -> str:
        """
        Subscribe to a channel for the given symbols.

        Returns a subscription ID and saves a checkpoint once the server
        confirms. Existing sequence progress is preserved, so re-subscribing
        after a reconnect never rewinds it to zero.
        """
        if self._ws is None or not self._ws.open:
            raise RuntimeError("WebSocket not connected")

        subscribe_msg = {"cmd": "subscribe", "channel": channel, "symbols": symbols}
        await self._ws.send(json.dumps(subscribe_msg))

        # Assumes the next frame is the subscription ack; on a busy connection a
        # data frame could arrive first, so match on a request ID if supported.
        response = await asyncio.wait_for(self._ws.recv(), timeout=5.0)
        resp_data = json.loads(response)
        if resp_data.get("code") != 0:
            raise RuntimeError(f"Subscription failed: {resp_data.get('message')}")

        subscription_id = self._subscription_id(channel, symbols)
        previous = self._subscriptions.get(subscription_id, {})
        self._subscriptions[subscription_id] = {
            "channel": channel,
            "symbols": symbols,
            "last_sequence": previous.get("last_sequence", 0),
        }
        # Event-based checkpoint right after a confirmed subscription
        self._save_checkpoint(subscription_id)
        logger.info("subscription_confirmed", subscription_id=subscription_id)
        return subscription_id

    async def resume(self) -> ResumptionResult:
        """
        Resume from the last checkpoint and replay buffered messages.

        This is the core recovery logic. Call it during startup, after the
        expected subscriptions are registered and before connecting.
        """
        result = ResumptionResult(checkpoint_loaded=False, buffered_messages_replayed=0, gap_detected=False)

        # Step 1: Load the checkpoint for each known subscription
        for sub_id, sub_info in self._subscriptions.items():
            checkpoint = self._checkpoint_store.load_checkpoint(sub_id)
            if checkpoint:
                result.checkpoint_loaded = True
                sub_info["last_sequence"] = checkpoint.last_sequence
                sub_info["checkpoint"] = checkpoint
                logger.info("resuming_from_checkpoint", subscription=sub_id,
                            last_seq=checkpoint.last_sequence)

        # Step 2: Replay unprocessed messages from the buffer, oldest first
        while True:
            buffered = self._message_buffer.get_unprocessed(limit=10000)
            if not buffered:
                break
            logger.info("replaying_buffered_messages", count=len(buffered))
            for msg in buffered:
                await self._process_message(msg.channel, msg.symbol, msg.payload)
                self._message_buffer.mark_processed(msg.id)
                self._update_sequence(msg.channel, msg.symbol, msg.sequence)
                result.buffered_messages_replayed += 1
        if result.buffered_messages_replayed:
            self._message_buffer.flush()  # persist the processed flags
            logger.info("buffer_replay_complete", messages=result.buffered_messages_replayed)

        # Step 3: Gap detection. Compare each checkpoint's last_sequence with the
        # server's current sequence (often reported in the subscription ack) and
        # backfill anything in between; how to fetch it is provider-specific.
        return result

    async def _process_message(self, channel: str, symbol: str, payload: bytes) -> None:
        """
        Application-specific message processing.

        Override this method to implement your data handling logic.
        """
        data = json.loads(payload)  # default implementation: parse and log
        logger.debug("message_processed", channel=channel, symbol=symbol, data=data)

    async def _backoff(self) -> None:
        """Sleep with exponential backoff plus full jitter, waking early on shutdown."""
        delay = random.uniform(0, self._reconnect_delay)
        logger.info("reconnect_backoff", delay=round(delay, 2))
        deadline = time.monotonic() + delay
        while time.monotonic() < deadline and not self._shutdown_manager.is_shutting_down:
            await asyncio.sleep(min(1.0, deadline - time.monotonic()))
        self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay)
        self._ws = None  # force a reconnect on the next loop iteration

    async def receive_loop(self) -> None:
        """
        Main receive loop: buffer, process, and checkpoint incoming messages.

        Runs until shutdown is requested; dropped connections are retried
        with exponential backoff and jitter.
        """
        self._running = True
        while self._running and not self._shutdown_manager.is_shutting_down:
            try:
                if self._ws is None or not self._ws.open:
                    await self.connect()
                    # Re-subscribe after reconnect; subscribe() keeps last_sequence
                    for sub_info in list(self._subscriptions.values()):
                        await self.subscribe(sub_info["channel"], sub_info["symbols"])
                    self._reconnect_delay = 1.0  # reset backoff after a good reconnect

                # Short timeout so the shutdown flag is checked at least once a second
                message = await asyncio.wait_for(self._ws.recv(), timeout=1.0)
                raw = message.encode() if isinstance(message, str) else message
                data = json.loads(raw)
                sequence = data.get("seq", 0)
                channel = data.get("channel", "unknown")
                symbol = data.get("symbol", "unknown")

                # 1. Write to the durable buffer first
                buffered_msg = self._message_buffer.write(channel, symbol, sequence, raw)
                # 2. Process the message
                await self._process_message(channel, symbol, raw)
                # 3. Mark exactly this message as processed and advance the sequence
                self._message_buffer.mark_processed(buffered_msg.id)
                self._update_sequence(channel, symbol, sequence)

            except asyncio.TimeoutError:
                pass  # expected when idle; fall through to the checkpoint check
            except ConnectionClosed as e:
                logger.warning("websocket_disconnected", code=e.code, reason=e.reason)
                await self._backoff()
                continue
            except OSError as e:
                logger.warning("websocket_connect_failed", error=str(e))
                await self._backoff()
                continue
            except Exception as e:
                logger.error("receive_loop_error", error=str(e), exc_info=True)
                await asyncio.sleep(1)

            # Periodic checkpoint, also reached on idle timeouts
            if time.monotonic() - self._last_checkpoint_at >= CHECKPOINT_INTERVAL_S:
                for sub_id in self._subscriptions:
                    self._save_checkpoint(sub_id)
                self._last_checkpoint_at = time.monotonic()

        logger.info("receive_loop_exiting", running=self._running)

    async def run(self, channels: list[dict]) -> None:
        """
        Main entry point: resume, connect, subscribe, and run the receive loop.

        Reconnection with exponential backoff and jitter happens in receive_loop().
        """
        try:
            # Register expected subscriptions first so resume() can find their checkpoints
            for cfg in channels:
                sub_id = self._subscription_id(cfg["channel"], cfg["symbols"])
                self._subscriptions.setdefault(
                    sub_id, {"channel": cfg["channel"], "symbols": cfg["symbols"], "last_sequence": 0}
                )
            resumption = await self.resume()
            logger.info("resumption_result", checkpoint_loaded=resumption.checkpoint_loaded,
                        replayed=resumption.buffered_messages_replayed)

            await self.connect()
            for sub_info in list(self._subscriptions.values()):
                await self.subscribe(sub_info["channel"], sub_info["symbols"])

            await self.receive_loop()
        except Exception as e:
            logger.error("run_error", error=str(e), exc_info=True)
            raise
        finally:
            # Graceful shutdown: flush buffers and save final checkpoints
            logger.info("executing_graceful_shutdown")
            self._message_buffer.flush()
            for sub_id in self._subscriptions:
                self._save_checkpoint(sub_id)
            if self._ws and self._ws.open:
                await self._ws.close()
            logger.info("shutdown_complete")
```
Putting It All Together: The Application Bootstrap
The final piece is wiring everything together with proper initialization and the main event loop:
```python
import asyncio
import logging
import os
import sys
from pathlib import Path

import structlog
from dotenv import load_dotenv

from signal_handler import shutdown_manager
from checkpoint_store import CheckpointStore
from message_buffer import DurableMessageBuffer
from resumable_client import ResumableWebSocketClient


def configure_logging(log_level: str = "INFO") -> None:
    """Configure structured logging for production."""
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, log_level.upper()),
    )


async def main() -> None:
    """Application entry point."""
    # Configuration from environment
    api_key = os.environ.get("TICKDB_API_KEY", "")
    endpoint = os.environ.get("TICKDB_WS_ENDPOINT", "wss://api.tickdb.ai/v1/market/ws")
    # /tmp is often cleared on reboot; point DATA_DIR at persistent storage in production
    data_dir = Path(os.environ.get("DATA_DIR", "/tmp/market_data"))
    data_dir.mkdir(parents=True, exist_ok=True)

    # Initialize components
    checkpoint_store = CheckpointStore(data_dir / "checkpoints.db")
    message_buffer = DurableMessageBuffer(
        data_dir / "message_buffer.db",
        batch_size=100,
        flush_interval=0.5,
    )

    # Cleanup handlers run in reverse order: flush the buffer, then close the checkpoint DB
    shutdown_manager.register_cleanup(checkpoint_store._get_conn().close)
    shutdown_manager.register_cleanup(message_buffer.flush)
    shutdown_manager.setup_signal_handlers()

    client = ResumableWebSocketClient(
        api_key=api_key,
        endpoint=endpoint,
        checkpoint_store=checkpoint_store,
        message_buffer=message_buffer,
        shutdown_manager=shutdown_manager,
    )

    # Channels to subscribe
    channels = [
        {"channel": "depth", "symbols": ["AAPL.US", "TSLA.US"]},
        {"channel": "kline_1m", "symbols": ["BTC.CC"]},
    ]

    try:
        await client.run(channels)
    finally:
        # The custom SIGINT/SIGTERM handlers only set the shutdown flag (no
        # KeyboardInterrupt is raised), so run() returns normally; now run
        # the registered cleanup handlers.
        shutdown_manager.execute_shutdown_sequence()


if __name__ == "__main__":
    load_dotenv()  # Load .env file if present
    configure_logging()
    asyncio.run(main())
```
Summary: The Data Durability Stack
The architecture described in this article creates a multi-layer defense against data loss:
| Layer | Mechanism | Protects Against |
|---|---|---|
| L1: Signal handling | SIGTERM/SIGINT handlers with cleanup registration | Data loss on planned shutdowns (deployments, Ctrl+C); cannot intercept SIGKILL |
| L2: Message buffer | SQLite table in WAL journal mode (`PRAGMA journal_mode=WAL`), committed in batches | Process crash, OOM kill, unexpected termination (up to the uncommitted batch) |
| L3: Checkpoints | Periodic state snapshots to SQLite | Connection drops, network partitions |
| L4: Resumption | Replay unprocessed messages + re-subscribe from checkpoint | Restarts after any of the above |
| L5: Gap detection | Sequence number tracking and server-side validation | Missed messages during reconnection |
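Together, these layers shrink the data-loss window to the uncommitted tail of the message buffer plus a few seconds of checkpoint staleness, and they let the application recover after planned failures (deployments) and unplanned ones (crashes, network failures). Power loss is covered only as far as the storage settings allow: with `synchronous = NORMAL`, the most recent commits can roll back. Messages the server published while the client was disconnected can be recovered only if the provider offers a way to backfill them.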
Next Steps
If you are building a market data pipeline, apply this architecture to your WebSocket client. Start with the GracefulShutdownManager and CheckpointStore: the former needs only the Python standard library, the latter only `sqlite3` plus `structlog` for logging, and together they provide immediate resilience improvements.
If you need production-ready connectors, TickDB provides native WebSocket support with built-in heartbeat, reconnection, and authentication. The /v1/market/ws endpoint delivers real-time depth, trades, and kline data across US equities, Hong Kong equities, and cryptocurrency markets. Sign up at tickdb.ai for a free API key with no credit card required.
If you are handling high-frequency data, consider replacing the per-message SQLite writes with a batched write strategy: accumulate messages in memory and flush every N milliseconds or every N messages. This reduces I/O overhead, at the price that anything not yet flushed is lost on a crash, so size N to the loss window you can accept.
This article does not constitute investment advice. Market data systems involve engineering complexity; ensure thorough testing in a non-production environment before deployment.