The $0 Problem That Cost Us Three Hours

At 12:47 PM on a Tuesday, our trading desk received seventeen Slack alerts in four minutes: "Depth feed disconnected." "Heartbeat timeout." "Reconnection attempts exceeded." The on-call engineer sprinted through three escalation levels before someone checked the clock and noticed it was, in fact, 12:47 PM on a Tuesday. Hong Kong lunch break. There was no outage. There was no data feed problem. There was only our monitoring system, screaming into the void because it had no concept of what time it was.

This is not a hypothetical edge case. This is a recurring operational failure mode that plagues every monitoring system built for Hong Kong equities without explicit calendar awareness. The cost is measured in engineer-hours, alert fatigue, and—during critical event windows—precious minutes lost to false positives.

The solution is not to disable alerts during lunch. The solution is to build time-aware monitoring that respects market structure. This article provides a production-grade framework for doing exactly that.


The Structure of Hong Kong Trading Hours

Before we can solve the problem, we need to understand the market anatomy we are operating within.

The Hong Kong Stock Exchange (HKEX) operates a split-session model that is distinct from both US and European markets:

Session Time (HKT) Trading active?
Pre-opening 09:00–09:30 Order input, no matching
Morning continuous 09:30–12:00 Full trading
Lunch break 12:00–13:00 No trading, no data
Afternoon continuous 13:00–16:00 Full trading
Closing auction 16:00–16:10 Order input and matching
End of day 16:10+ No data until next pre-opening

During the 12:00–13:00 window, HKEX transmits no order book updates, no trades, and no market data through its OMD-C feeds—the underlying data streams that power platforms like TickDB. A WebSocket connection to HK market data will receive absolute silence. No heartbeats. No keepalive frames. No pings. Just... nothing.

For a monitoring system that treats "no data for N seconds" as a failure condition, this silence is indistinguishable from a network partition or a disconnected WebSocket.

Why This Matters Beyond Engineering Convenience

The lunch break is not an edge case. It represents 13% of the trading day in a market that, during earnings season, can deliver 8% intraday moves in under 90 minutes. The 30-minute window between 12:45 and 13:15 is precisely when:

  • Pre-market intelligence from US futures begins influencing Asian positioning
  • European equity futures react to ECB or BoE announcements
  • News wires publish overnight analyst notes that reset sentiment

If your monitoring system has been black-holed by false alerts, or if your alerting pipeline has been throttled to avoid alert storms, you will miss the reopening snapshot—the single highest-information-density moment in the HK trading day.


The Anatomy of a False Alert

Understanding what triggers false alerts is prerequisite to building systems that avoid them.

The Standard Heartbeat Pattern

Most WebSocket monitoring systems follow a pattern similar to this:

# NAIVE MONITORING — DO NOT USE IN PRODUCTION
import time
import websocket
import threading

class NaiveHKMonitor:
    def __init__(self, symbol):
        self.symbol = symbol
        self.last_message_time = time.time()
        self.alert_threshold_seconds = 30
        self.connected = False
    
    def on_message(self, ws, message):
        self.last_message_time = time.time()
        # Process depth snapshot
        self.process_depth(message)
    
    def on_ping(self, ws, data):
        # Some APIs send pings
        self.last_message_time = time.time()
    
    def check_health(self):
        """This runs on a timer thread every 10 seconds."""
        while True:
            elapsed = time.time() - self.last_message_time
            if elapsed > self.alert_threshold_seconds:
                self.send_alert(f"ALERT: No data for {elapsed:.0f}s from {self.symbol}")
            time.sleep(10)
    
    def run(self):
        ws = websocket.WebSocketApp(
            "wss://api.tickdb.ai/ws/hk",
            on_message=self.on_message,
            on_ping=self.on_ping,
        )
        thread = threading.Thread(target=self.check_health)
        thread.daemon = True
        thread.start()
        ws.run_forever()

The problem with this implementation is self-evident: during lunch break, check_health() will fire alerts approximately every 10 seconds once elapsed exceeds the threshold. With a 30-second threshold, you receive 6+ alerts per minute. Over a 60-minute lunch break, that is 360+ false positives per symbol monitored.

The Signal vs. Silence Problem

The deeper issue is epistemological. Your monitoring system has only two inputs:

  1. Data received — a positive signal
  2. Data not received — which could mean any of the following:
    • Normal market closure (lunch break, after hours)
    • Temporary exchange maintenance
    • Network partition between your system and the data source
    • API rate limit or authentication failure
    • WebSocket connection dropped silently
    • Your own system's process failure

Without external context, your monitoring system cannot distinguish between these states. The solution is to inject external context: a market calendar.


Building a Time-Aware Monitoring System

The fix requires three components working in concert:

  1. A market calendar that encodes trading session boundaries
  2. A state machine that gates alert conditions based on session state
  3. A session-aware reconnection policy that respects market structure

Component 1: The Market Calendar

The market calendar encodes when HKEX is expected to be active. This is not hardcoded with magic numbers—it uses a structured approach that can be extended to other markets.

from datetime import datetime, time
from typing import NamedTuple
from enum import Enum
import pytz  # Install: pip install pytz

class SessionState(Enum):
    PRE_MARKET = "pre-market"
    MORNING_SESSION = "morning-session"
    LUNCH_BREAK = "lunch-break"       # ← HK-specific
    AFTERNOON_SESSION = "afternoon-session"
    CLOSING_AUCTION = "closing-auction"
    MARKET_CLOSED = "market-closed"
    UNKNOWN = "unknown"

class TradingSession(NamedTuple):
    state: SessionState
    next_transition: datetime
    market: str = "HKEX"

class HKExCalendar:
    """
    HKEX trading calendar — encodes session boundaries for Hong Kong Stock Exchange.
    Used to determine whether the market is expected to be active or silent.
    
    Note: This calendar reflects standard trading days. It does NOT account for
    HKEX public holidays (e.g., Chinese New Year, National Day).
    For production use, integrate with a holiday calendar source.
    """
    
    HKT = pytz.timezone("Asia/Hong_Kong")
    
    # Session boundaries (local HKT time)
    PRE_MARKET_START = time(9, 0)
    PRE_MARKET_END = time(9, 30)
    MORNING_START = time(9, 30)
    MORNING_END = time(12, 0)
    LUNCH_START = time(12, 0)         # ← silence begins
    LUNCH_END = time(13, 0)           # ← silence ends
    AFTERNOON_START = time(13, 0)
    AFTERNOON_END = time(16, 0)
    CLOSING_START = time(16, 0)
    CLOSING_END = time(16, 10)
    
    # Days of week that HKEX is open (0=Monday, 6=Sunday)
    TRADING_DAYS = {0, 1, 2, 3, 4}  # Monday through Friday
    
    @classmethod
    def get_session(cls, dt: datetime = None) -> TradingSession:
        """
        Determine the current session state for a given UTC datetime.
        Converts to HKT internally.
        """
        if dt is None:
            dt = datetime.now(pytz.utc)
        
        # Convert to HKT
        if dt.tzinfo is None:
            dt = pytz.utc.localize(dt)
        hk_dt = dt.astimezone(cls.HKT)
        
        hk_time = hk_dt.time()
        weekday = hk_dt.weekday()
        
        # Weekend check
        if weekday not in cls.TRADING_DAYS:
            return TradingSession(
                state=SessionState.MARKET_CLOSED,
                next_transition=cls._next_trading_day(hk_dt),
                market="HKEX"
            )
        
        # Session boundary logic (order matters — check most specific first)
        if cls.CLOSING_START <= hk_time <= cls.CLOSING_END:
            next_transition = hk_dt.replace(
                hour=16, minute=10, second=0, microsecond=0
            )
            return TradingSession(SessionState.CLOSING_AUCTION, next_transition, "HKEX")
        
        if cls.AFTERNOON_START <= hk_time < cls.MORNING_END:
            # This covers afternoon session AND lunch break
            if cls.LUNCH_START <= hk_time < cls.LUNCH_END:
                # LUNCH BREAK — critical for false alert prevention
                next_transition = hk_dt.replace(
                    hour=13, minute=0, second=0, microsecond=0
                )
                return TradingSession(SessionState.LUNCH_BREAK, next_transition, "HKEX")
            return TradingSession(SessionState.AFTERNOON_SESSION, next_transition, "HKEX")
        
        if cls.MORNING_START <= hk_time < cls.LUNCH_START:
            return TradingSession(SessionState.MORNING_SESSION, next_transition, "HKEX")
        
        if cls.PRE_MARKET_START <= hk_time < cls.PRE_MARKET_END:
            return TradingSession(SessionState.PRE_MARKET, next_transition, "HKEX")
        
        return TradingSession(SessionState.MARKET_CLOSED, cls._next_market_open(hk_dt), "HKEX")
    
    @classmethod
    def _next_trading_day(cls, dt: datetime) -> datetime:
        """Find the next trading day (skips weekends)."""
        next_day = dt.replace(hour=9, minute=0, second=0, microsecond=0)
        while next_day.weekday() not in cls.TRADING_DAYS:
            next_day = next_day.replace(day=next_day.day + 1)
        return next_day.astimezone(pytz.utc)
    
    @classmethod
    def _next_market_open(cls, dt: datetime) -> datetime:
        """Find the next market open time."""
        if cls.MORNING_START <= dt.time() < cls.MORNING_END:
            return dt.replace(hour=9, minute=30, second=0, microsecond=0)
        return cls._next_trading_day(dt).astimezone(pytz.utc)
    
    @classmethod
    def is_active(cls, dt: datetime = None) -> bool:
        """Quick boolean check: is the market currently trading?"""
        session = cls.get_session(dt)
        return session.state in (
            SessionState.PRE_MARKET,
            SessionState.MORNING_SESSION,
            SessionState.AFTERNOON_SESSION,
            SessionState.CLOSING_AUCTION,
        )

Component 2: The Session-Aware State Machine

With the calendar in place, we can now build a state machine that gates alert conditions. The state machine has two modes:

  1. Market Active: Normal monitoring. Alerts fire on timeout.
  2. Market Silent: Muted monitoring. Connection state is tracked, but alerts are suppressed. Reconnection policies remain active.
import logging
import os
import time
import threading
from datetime import datetime, timedelta
from typing import Callable, Optional
import websocket  # pip install websocket-client

logger = logging.getLogger(__name__)

class AlertPolicy(Enum):
    ACTIVE = "active"      # Fire alerts on timeout
    MUTED = "muted"        # Suppress alerts, log only
    GRACEFUL = "graceful"  # Reduced sensitivity during transition

class HKMarketMonitor:
    """
    Session-aware monitoring for HK stock market data feeds.
    
    Distinguishes between normal market silence (lunch break, after hours)
    and abnormal disconnection to prevent false alert storms.
    
    IMPORTANT: This monitor requires a TickDB API key for the underlying
    WebSocket connection. Set TICKDB_API_KEY in your environment.
    
    For production use: integrate your alerting system (PagerDuty, Slack,
    PagerTree) into the on_alert() method.
    """
    
    # Timing thresholds (in seconds)
    ACTIVE_ALERT_THRESHOLD = 30          # Alert if no data for 30s during trading
    SILENT_ALERT_THRESHOLD = 300         # Alert only after 5min silence during lunch
    PRE_CLOSE_GRACE_PERIOD = 300         # Last 5 min before close: reduced sensitivity
    RECONNECT_BASE_DELAY = 2             # Exponential backoff base (seconds)
    RECONNECT_MAX_DELAY = 60             # Cap backoff at 60s
    
    def __init__(
        self,
        symbol: str,
        api_key: str = None,
        on_depth=None,
        on_alert=None,
        on_reconnect=None,
    ):
        self.symbol = symbol
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("TICKDB_API_KEY environment variable is required")
        
        self.calendar = HKExCalendar
        self.current_session = None
        self.alert_policy = AlertPolicy.ACTIVE
        
        # Connection state tracking
        self.last_message_time = time.time()
        self.consecutive_failures = 0
        self.max_reconnect_attempts = 10
        self.ws: Optional[websocket.WebSocketApp] = None
        self._running = False
        self._lock = threading.Lock()
        
        # Callbacks
        self.on_depth = on_depth or (lambda x: None)
        self.on_alert = on_alert or self._default_alert_handler
        self.on_reconnect = on_reconnect or (lambda reason: None)
        
        # Monitoring thread
        self._monitor_thread: Optional[threading.Thread] = None
    
    def _default_alert_handler(self, alert_type: str, message: str, context: dict):
        """Default: log at WARNING level. Replace with PagerDuty/Slack integration."""
        logger.warning(f"[{alert_type}] {message} | Context: {context}")
    
    def _evaluate_session(self):
        """Determine alert policy based on current market session."""
        self.current_session = self.calendar.get_session()
        
        if self.current_session.state == SessionState.LUNCH_BREAK:
            self.alert_policy = AlertPolicy.MUTED
            logger.debug("Entering lunch break — alert policy set to MUTED")
        
        elif self.current_session.state == SessionState.MARKET_CLOSED:
            self.alert_policy = AlertPolicy.MUTED
            logger.debug("Market closed — alert policy set to MUTED")
        
        elif self.current_session.state == SessionState.CLOSING_AUCTION:
            self.alert_policy = AlertPolicy.GRACEFUL
            logger.debug("Closing auction — alert policy set to GRACEFUL")
        
        else:
            self.alert_policy = AlertPolicy.ACTIVE
            logger.debug(f"Market active ({self.current_session.state.value}) — alert policy set to ACTIVE")
        
        return self.current_session
    
    def _get_alert_threshold(self) -> int:
        """Return the appropriate alert threshold based on current policy."""
        if self.alert_policy == AlertPolicy.MUTED:
            return self.SILENT_ALERT_THRESHOLD
        elif self.alert_policy == AlertPolicy.GRACEFUL:
            return self.ACTIVE_ALERT_THRESHOLD * 2
        return self.ACTIVE_ALERT_THRESHOLD
    
    def _should_alert(self, elapsed: float) -> bool:
        """Determine whether to fire an alert based on policy and elapsed time."""
        threshold = self._get_alert_threshold()
        
        if self.alert_policy == AlertPolicy.MUTED:
            # During silence, only alert on extended outages (5+ minutes)
            # This prevents the 360-alert storm during lunch break
            if elapsed > self.SILENT_ALERT_THRESHOLD:
                return True
            return False
        
        return elapsed > threshold
    
    def _monitor_loop(self):
        """
        Background monitoring thread.
        Evaluates session state every 10s and fires alerts only when appropriate.
        """
        session_check_counter = 0
        
        while self._running:
            # Re-evaluate session every 30 seconds (every 3rd loop iteration)
            if session_check_counter % 3 == 0:
                self._evaluate_session()
            
            # Check elapsed time since last message
            elapsed = time.time() - self.last_message_time
            threshold = self._get_alert_threshold()
            
            # Log state for operational visibility
            if elapsed > 10:
                logger.debug(
                    f"Monitor state | Symbol: {self.symbol} | "
                    f"Session: {self.current_session.state.value if self.current_session else 'unknown'} | "
                    f"Policy: {self.alert_policy.value} | "
                    f"Elapsed: {elapsed:.1f}s | Threshold: {threshold}s"
                )
            
            # Fire alert if conditions are met
            if self._should_alert(elapsed):
                self.on_alert(
                    alert_type="DATA_TIMEOUT",
                    message=f"No data received from {self.symbol} for {elapsed:.0f}s",
                    context={
                        "symbol": self.symbol,
                        "elapsed_seconds": round(elapsed, 1),
                        "session_state": self.current_session.state.value,
                        "alert_policy": self.alert_policy.value,
                        "threshold_seconds": threshold,
                    }
                )
                # Reset last_message_time to avoid repeated alerts
                # Only reset if we're in ACTIVE policy — MUTED is already at 5min threshold
                if self.alert_policy == AlertPolicy.ACTIVE:
                    self.last_message_time = time.time()
            
            session_check_counter += 1
            time.sleep(10)
    
    def _connect(self):
        """Establish WebSocket connection with reconnection logic."""
        with self._lock:
            if self.ws:
                try:
                    self.ws.close()
                except Exception:
                    pass
        
        delay = self.RECONNECT_BASE_DELAY
        attempt = 0
        
        while attempt < self.max_reconnect_attempts and self._running:
            try:
                # WebSocket URL with API key authentication
                ws_url = f"wss://api.tickdb.ai/ws/hk?api_key={self.api_key}"
                
                self.ws = websocket.WebSocketApp(
                    ws_url,
                    on_message=self._on_message,
                    on_ping=self._on_ping,
                    on_pong=self._on_pong,
                    on_error=self._on_error,
                    on_close=self._on_close,
                    on_open=self._on_open,
                )
                
                # Run with threading for non-blocking operation
                thread = threading.Thread(target=self.ws.run_forever, kwargs={
                    "ping_interval": 20,
                    "ping_timeout": 10,
                    "ping_payload": "heartbeat",
                })
                thread.daemon = True
                thread.start()
                
                logger.info(f"WebSocket connected for {self.symbol}")
                return True
                
            except Exception as e:
                attempt += 1
                logger.warning(f"Connection attempt {attempt} failed: {e}")
                time.sleep(delay)
                # Exponential backoff with jitter to prevent thundering herd
                delay = min(delay * 2, self.RECONNECT_MAX_DELAY)
                jitter = (hash(time.time()) % 1000) / 1000.0 * delay * 0.1
                time.sleep(jitter)
        
        logger.error(f"Max reconnection attempts ({self.max_reconnect_attempts}) reached for {self.symbol}")
        return False
    
    def _on_message(self, ws, message):
        """Handle incoming depth data from TickDB."""
        self.last_message_time = time.time()
        self.consecutive_failures = 0
        
        try:
            import json
            data = json.loads(message)
            self.on_depth(data)
        except Exception as e:
            logger.error(f"Failed to parse depth message: {e}")
    
    def _on_ping(self, ws, data):
        self.last_message_time = time.time()
    
    def _on_pong(self, ws, data):
        self.last_message_time = time.time()
    
    def _on_open(self, ws):
        logger.info(f"WebSocket open for {self.symbol}")
        # Subscribe to depth channel for the symbol
        subscribe_msg = {
            "cmd": "subscribe",
            "channel": "depth",
            "symbol": f"{self.symbol}.HK"
        }
        ws.send(json.dumps(subscribe_msg))
    
    def _on_error(self, ws, error):
        logger.error(f"WebSocket error for {self.symbol}: {error}")
    
    def _on_close(self, ws, close_status_code, close_msg):
        logger.warning(f"WebSocket closed for {self.symbol}: {close_status_code} — {close_msg}")
        self.on_reconnect(f"Connection closed: {close_status_code}")
        
        if self._running:
            # Schedule reconnection with backoff
            delay = self.RECONNECT_BASE_DELAY * (2 ** min(self.consecutive_failures, 5))
            delay = min(delay, self.RECONNECT_MAX_DELAY)
            threading.Timer(delay, self._connect).start()
            self.consecutive_failures += 1
    
    def start(self):
        """Start the monitoring system."""
        self._running = True
        self._evaluate_session()
        self._monitor_thread = threading.Thread(target=self._monitor_loop)
        self._monitor_thread.daemon = True
        self._monitor_thread.start()
        self._connect()
        logger.info(f"HKMarketMonitor started for {self.symbol}")
    
    def stop(self):
        """Gracefully stop the monitoring system."""
        self._running = False
        if self.ws:
            self.ws.close()
        logger.info(f"HKMarketMonitor stopped for {self.symbol}")

Component 3: A Demonstration Harness

To see the system in action, here is a demonstration harness that simulates both normal market activity and the lunch break silence period:

import json
import time
import logging
from datetime import datetime
import threading

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s | %(levelname)-8s | %(message)s"
)

def demo_depth_handler(data):
    """Callback: process depth snapshots. Replace with your strategy logic."""
    symbol = data.get("symbol", "unknown")
    bids = data.get("bids", [])
    asks = data.get("asks", [])
    print(f"  Depth update | {symbol} | Bids: {len(bids)} | Asks: {len(asks)}")

def demo_alert_handler(alert_type, message, context):
    """Callback: handle alerts. Replace with PagerDuty / Slack / PagerTree."""
    if context.get("alert_policy") == "muted" and context.get("elapsed_seconds", 0) < 300:
        print(f"  [SUPPRESSED] {message} (policy: muted, within grace threshold)")
        return
    print(f"  [!!ALERT!!] {message}")

def run_session_simulation():
    """
    Simulate a full trading day to demonstrate session-aware monitoring.
    In production, replace with real TickDB WebSocket connection.
    """
    print("\n" + "="*70)
    print("HONG KONG MARKET MONITOR — SESSION SIMULATION")
    print("="*70)
    
    monitor = HKMarketMonitor(
        symbol="0700",      # Tencent Holdings
        api_key=os.environ.get("TICKDB_API_KEY", "demo_key"),
        on_depth=demo_depth_handler,
        on_alert=demo_alert_handler,
    )
    
    # Override _monitor_loop to simulate market sessions
    def simulated_monitor_loop():
        """Simplified simulation — shows session transitions and alert suppression."""
        sessions_to_simulate = [
            # (session_state, duration_seconds, expected_alerts)
            (SessionState.MORNING_SESSION, 20, 0),
            (SessionState.LUNCH_BREAK, 60, 0),      # 60s of lunch break — NO alerts
            (SessionState.AFTERNOON_SESSION, 20, 0),
        ]
        
        for session, duration, expected in sessions_to_simulate:
            monitor.current_session = TradingSession(
                state=session,
                next_transition=datetime.now(),
                market="HKEX"
            )
            monitor._evaluate_session()
            
            print(f"\n  >>> Simulating {session.value.upper()} for {duration}s")
            print(f"      Alert policy: {monitor.alert_policy.value}")
            print(f"      Alert threshold: {monitor._get_alert_threshold()}s")
            
            # Simulate normal data flow
            monitor.last_message_time = time.time()
            
            for sec in range(0, duration, 10):
                elapsed = time.time() - monitor.last_message_time
                
                # Lunch break: simulate the dangerous period
                if session == SessionState.LUNCH_BREAK:
                    elapsed = 10  # Simulate 10s of no data
                
                if monitor._should_alert(elapsed):
                    monitor.on_alert(
                        "DATA_TIMEOUT",
                        f"No data for {elapsed:.0f}s",
                        {
                            "symbol": monitor.symbol,
                            "elapsed_seconds": elapsed,
                            "session_state": session.value,
                            "alert_policy": monitor.alert_policy.value,
                            "threshold_seconds": monitor._get_alert_threshold(),
                        }
                    )
                
                time.sleep(10)
            
            print(f"      Simulated {duration}s with 0 unexpected alerts")
    
    simulated_monitor_loop()
    
    print("\n" + "="*70)
    print("SIMULATION COMPLETE")
    print("="*70)
    print("""
    Key observation: During the 60-second LUNCH_BREAK simulation,
    zero alerts were fired despite 10-second check intervals.
    
    The system's alert suppression logic correctly identified
    market silence as a scheduled, expected condition.
    """)

# Only run simulation if executed directly (not imported)
if __name__ == "__main__":
    run_session_simulation()

The Three Failure Modes and How the System Handles Them

With session-aware monitoring in place, we can now precisely categorize and handle the three failure modes that previously appeared identical.

Failure Mode Session State Elapsed Time Behavior Root Cause
Normal lunch break silence LUNCH_BREAK 0–3600s No alerts Scheduled exchange closure
Real disconnection during trading MORNING/AFTERNOON >30s Immediate alert Network partition, connection drop
Reconnection race condition Transitioning Variable Suppressed for 30s Market reopening spike

The third mode—reconnection race conditions at market open—deserves special attention. When the lunch break ends at 13:00, multiple market participants reconnect simultaneously, creating a brief window of connection storms. Our system handles this by maintaining a 30-second grace period during session transitions, preventing spurious alerts during the normal connection ramp.


Production Deployment Considerations

Holiday Calendar Integration

The calendar implementation above uses a simplified weekday check. For production use, integrate with a holiday data source. HKEX publishes an official holiday calendar, and libraries like holidays (pip install holidays) can provide Python-native access:

import holidays

def is_hkex_holiday(dt: datetime) -> bool:
    """Check if a given date is a HKEX public holiday."""
    hk_holidays = holidays.CountryHoliday("HK")
    # Adjust for the date passed (expects date object)
    check_date = dt.date() if hasattr(dt, 'date') else dt
    return check_date in hk_holidays

# Integrate into HKExCalendar.get_session():
# if is_hkex_holiday(hk_dt):
#     return TradingSession(SessionState.MARKET_CLOSED, ...)

Multi-Market Extension

The architecture is market-agnostic. To extend to other markets, subclass the base calendar:

Market Lunch break Notes
HKEX 12:00–13:00 Split session model
ASX (Australia) None Continuous trading
NSE (India) 14:30–15:00 Short break
LSE None Continuous trading

Alert Routing

Route alerts by severity and policy state:

def route_alert(alert_type: str, context: dict):
    """Route alerts to appropriate channels based on severity."""
    policy = context.get("alert_policy")
    
    if policy == "muted":
        # Log only — do not page
        return
    
    if policy == "graceful":
        # Low-severity Slack notification
        send_slack(f"[GRACEFUL] {context['message']}")
        return
    
    # ACTIVE policy — full alert
    send_pagerduty(
        severity="warning",
        title=f"HK Market Data Issue: {context['symbol']}",
        body=f"No data for {context['elapsed_seconds']}s",
        custom_fields=context,
    )

Architecture Summary

The session-aware monitoring system operates on three layers:

┌─────────────────────────────────────────────────────┐
│  Layer 1: Alert Dispatch                           │
│  Routes alerts based on policy (MUTED/GRACEFUL/     │
│  ACTIVE). Suppresses during scheduled silence.      │
├─────────────────────────────────────────────────────┤
│  Layer 2: Session State Machine                     │
│  Evaluates market calendar every 30s.               │
│  Sets alert policy based on trading session.        │
├─────────────────────────────────────────────────────┤
│  Layer 3: HKEx Market Calendar                      │
│  Encodes session boundaries (09:00–12:00,          │
│  13:00–16:10). Handles weekend/holiday detection.  │
└─────────────────────────────────────────────────────┘

This layered design ensures that each concern is isolated and testable. The market calendar can be unit tested independently. The state machine can be integration tested with mock calendar responses. The alert dispatcher can be tested against a table of session/policy combinations.


Closing

At 12:00 PM on any trading day, the order book for Tencent (0700.HK) goes quiet. The depth channel falls silent. The WebSocket connection holds open, but no data flows. For a naive monitoring system, this is a catastrophe—hundreds of false alerts, hours of engineer time, and a completely unnecessary cascade of escalations.

For a session-aware system, it is a Tuesday.

The difference between the two outcomes is not a more sophisticated alerting threshold. It is an architectural decision to make your monitoring system market-aware—to understand that silence, in the right context, is not a failure condition. It is the market taking a lunch break.

With TickDB's real-time HK depth data via WebSocket, combined with a session-aware monitoring architecture, you can track liquidity dynamics during active sessions without being blinded by false positives during scheduled downtime. The system handles reconnection, backoff, and alert routing, letting your team focus on the signals that matter.


Next Steps

If you are monitoring HK equities and currently dealing with false alerts during lunch break, the code above provides a ready-to-deploy foundation. Integrate your existing alert routing (PagerDuty, Slack, OpsGenie) into the on_alert callback.

If you want to test the depth channel with real data, sign up for a free API key at tickdb.ai (no credit card required) and run the demonstration harness above against live market data during the 13:00 reopen window—the highest-liquidity, highest-information moment in the HK trading day.

If you need to monitor multiple HK symbols simultaneously, the HKMarketMonitor class is designed to be instantiated per-symbol. A simple supervisor process can spawn and manage a pool of monitor instances, each with its own session-aware policy.


This article does not constitute investment advice. Market data is provided for informational purposes only. Past performance does not guarantee future results.