The Audit That Changed Everything

The quant researcher ran the backtest three times. The strategy kept failing: not dramatically, but enough to destroy the Sharpe ratio. The code was correct. The signal was sound. The problem was buried in the data itself, invisible until a routine audit exposed it: across a window running from Q1 2020 to Q3 2022, the vendor had quietly blanked out 14 trading days, leaving NaN rows in place of real bars rather than clean discontinuities.

This is what "silent data gaps" look like. They do not announce themselves. They do not throw errors. They sit in your dataset like a cracked tooth — tolerable until you bite down wrong.

This article provides a production-ready detection framework: three layers of verification (trading calendar alignment, row count validation, timestamp continuity) implemented in Python, designed to run as a scheduled pipeline against any market data source — including TickDB.


1. Why Silent Gaps Are Dangerous

Not every gap in a price series is a defect. Markets close on weekends and holidays. Exchanges halt trading when circuit breakers trip. Corporate actions create ex-dividend price discontinuities. All of these are legitimate: they have an explanation and a pattern.

A silent gap is different. It is a gap without a known cause. The data vendor did not tell you about it. Your ingestion pipeline did not flag it. The dataset looks continuous, but the price series contains holes.
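Placeholder rows are the hardest variant to spot, because every timestamp is still present. A minimal sketch of a check for them, assuming a pandas DataFrame whose price columns are named `open`, `high`, `low`, and `close` (these names and the `find_placeholder_rows` helper are illustrative assumptions, not a TickDB schema):

```python
import numpy as np
import pandas as pd

PRICE_COLS = ["open", "high", "low", "close"]  # assumed column names

def find_placeholder_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Return rows whose price fields are all NaN (timestamp present, data absent)."""
    mask = df[PRICE_COLS].isna().all(axis=1)
    return df.loc[mask]

# Tiny illustrative frame: the middle day exists as a row but carries no prices.
bars = pd.DataFrame(
    {
        "date": pd.to_datetime(["2024-03-04", "2024-03-05", "2024-03-06"]),
        "open": [100.0, np.nan, 102.0],
        "high": [101.0, np.nan, 103.0],
        "low": [99.0, np.nan, 101.0],
        "close": [100.5, np.nan, 102.5],
    }
)
placeholders = find_placeholder_rows(bars)
print(placeholders["date"].dt.date.tolist())
```

Rows flagged this way can be dropped before running date-based or timestamp-based checks, so that a blank row is counted as missing rather than present.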

The consequences are measurable:

| Impact | Severity |
|---|---|
| Indicator calculation errors (moving averages, Bollinger bands) | High: errors propagate downstream |
| Backtest overfitting (phantom "consecutive up days" inflate win rates) | Critical: a strategy appears profitable when it is not |
| Signal alignment failures in multi-asset strategies | High: misalignment corrupts correlation matrices |
| Regulatory compliance issues (audit trails with gaps) | Medium: depends on jurisdiction and use case |
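To make the first two rows concrete, here is a small sketch on synthetic prices (the series and the dropped dates are invented for illustration). It shows how a row-based return calculation silently turns a multi-day move into an apparent one-day move once rows vanish:

```python
import pandas as pd

days = pd.bdate_range("2024-03-04", "2024-03-15")  # 10 weekdays
close = pd.Series([100.0 + i for i in range(len(days))], index=days)

# Simulate a silent gap: three trading days vanish with no marker.
gapped = close.drop(days[3:6])

full_returns = close.pct_change()
gapped_returns = gapped.pct_change()

first_day_after_gap = days[6]
true_ret = full_returns[first_day_after_gap]        # one-day move: 105 -> 106
spurious_ret = gapped_returns[first_day_after_gap]  # four-day move: 102 -> 106
print(f"true={true_ret:.4%} spurious={spurious_ret:.4%}")
```

Any indicator built on consecutive rows (rolling means, streak counters) inherits the same distortion, because the row before the gap is treated as "yesterday".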

A common root cause: vendors backfill historical data as markets evolve, and backfill operations occasionally skip records. The vendor is not malicious; the backfill is simply incomplete.


2. The Three-Layer Detection Framework

Effective gap detection requires three independent checks. No single check is sufficient, because each one is blind to a class of gaps that another one catches.

Layer 1: Trading Calendar Alignment

The principle is simple: compare your data against a ground-truth trading calendar. Any date that exists in the calendar but not in your dataset is a candidate gap.

from datetime import date, timedelta
import pandas as pd
from pathlib import Path

class TradingCalendar:
    """Ground-truth trading calendar for US equity markets.
    
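    The built-in default covers NYSE/NASDAQ full-day market holidays for
    2024 through 2026 only. Dates outside that range are treated as trading
    days unless you pass calendar_path (a text file with one ISO date per
    line). Holidays use NYSE observed dates (for example, July 3, 2026,
    because July 4 falls on a Saturday). Early-close sessions are not modeled.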
    """

    def __init__(self, calendar_path: str = None):
        if calendar_path is None:
            self.holidays = self._build_default_calendar()
        else:
            self.holidays = set(pd.to_datetime(Path(calendar_path).read_text().splitlines()).date)
    
    def _build_default_calendar(self) -> set:
        """Default US equity calendar. Expand this list as needed."""
        return {
            # 2024 holidays
            date(2024, 1, 1), date(2024, 1, 15), date(2024, 2, 19),
            date(2024, 3, 29), date(2024, 5, 27), date(2024, 6, 19),
            date(2024, 7, 4), date(2024, 9, 2), date(2024, 11, 28),
            date(2024, 12, 25),
            # 2025 holidays (Jan 9 was an unscheduled closure: National Day of Mourning)
            date(2025, 1, 1), date(2025, 1, 9), date(2025, 1, 20),
            date(2025, 2, 17), date(2025, 4, 18), date(2025, 5, 26),
            date(2025, 6, 19), date(2025, 7, 4), date(2025, 9, 1),
            date(2025, 11, 27), date(2025, 12, 25),
            # 2026 holidays
            date(2026, 1, 1), date(2026, 1, 19), date(2026, 2, 16),
            date(2026, 4, 3), date(2026, 5, 25), date(2026, 6, 19),
            date(2026, 7, 3), date(2026, 9, 7), date(2026, 11, 26),
            date(2026, 12, 25),
        }

    def is_trading_day(self, dt: date) -> bool:
        """Check if a given date is a valid trading day."""
        # Exclude weekends
        if dt.weekday() >= 5:
            return False
        # Exclude holidays
        if dt in self.holidays:
            return False
        return True

    def get_expected_trading_days(self, start_date: date, end_date: date) -> list[date]:
        """Return all expected trading days in the range [start, end], inclusive."""
        days = []
        current = start_date
        while current <= end_date:
            if self.is_trading_day(current):
                days.append(current)
            current += timedelta(days=1)
        return days

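    def find_gaps(self, actual_dates: set[date], start_date: date = None, end_date: date = None) -> list[date]:
        """Identify missing dates compared to the trading calendar.
        
        Args:
            actual_dates: Set of dates present in your dataset
            start_date, end_date: Optional bounds of the range you requested.
                When omitted, the range is inferred from the data itself,
                which cannot reveal days missing at either edge.
            
        Returns:
            List of dates that should be in the dataset but are missing.
            Excludes weekends and holidays automatically.
        """
        if start_date is None or end_date is None:
            if not actual_dates:
                return []  # no data and no requested range: nothing to compare
            start_date = start_date or min(actual_dates)
            end_date = end_date or max(actual_dates)
        
        expected = set(self.get_expected_trading_days(start_date, end_date))
        
        # Gap = expected but not present
        missing = sorted(expected - set(actual_dates))
        return missing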

Why this works: Weekend and holiday gaps are structurally different from silent gaps. The calendar method eliminates them as false positives, leaving only genuinely anomalous absences.
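As a cross-check on a hand-maintained calendar, pandas can build the same expected-day set with a custom business-day frequency. This is a sketch of an alternative construction, not a replacement for the class above; the `expected_days_pandas` helper is hypothetical, and the dataset that lacks March 27 is invented for the example:

```python
from datetime import date
import pandas as pd

def expected_days_pandas(start: str, end: str, holidays: list[str]) -> set[date]:
    """Weekdays in [start, end] minus the supplied holidays (ISO date strings)."""
    idx = pd.bdate_range(start=start, end=end, freq="C", holidays=holidays)
    return set(idx.date)

holidays = ["2024-03-29"]  # Good Friday 2024, taken from the holiday list above
expected = expected_days_pandas("2024-03-25", "2024-04-01", holidays)

# Hypothetical dataset that silently lacks March 27.
actual = expected - {date(2024, 3, 27)}
missing = sorted(expected - actual)
print(missing)
```

If the two constructions ever disagree on the expected days for a range, the holiday list is the first place to look.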

Limitation: If your dataset covers multiple exchanges (e.g., NYSE + HKEX), you need separate calendars. A single calendar will flag holidays as gaps for markets that were open.
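One way to handle this is to key the holiday sets by exchange and always audit a symbol against its own venue. The sketch below assumes a hypothetical `HOLIDAYS_BY_EXCHANGE` mapping; the HKEX dates are illustrative placeholders for the 2024 Lunar New Year closures and should be verified against the exchange's official calendar before use:

```python
from datetime import date, timedelta

# Illustrative holiday sets only; verify against each exchange's official calendar.
HOLIDAYS_BY_EXCHANGE = {
    "NYSE": {date(2024, 2, 19)},                     # Presidents' Day (from the list above)
    "HKEX": {date(2024, 2, 12), date(2024, 2, 13)},  # placeholder Lunar New Year closures
}

def expected_days(exchange: str, start: date, end: date) -> list[date]:
    """Weekdays in [start, end] that are not holidays on the given exchange."""
    holidays = HOLIDAYS_BY_EXCHANGE[exchange]
    days, current = [], start
    while current <= end:
        if current.weekday() < 5 and current not in holidays:
            days.append(current)
        current += timedelta(days=1)
    return days

start, end = date(2024, 2, 12), date(2024, 2, 20)
hkex_dates = set(expected_days("HKEX", start, end))  # pretend the HKEX data is complete

correct = sorted(set(expected_days("HKEX", start, end)) - hkex_dates)
wrong_calendar = sorted(set(expected_days("NYSE", start, end)) - hkex_dates)
print("HKEX calendar:", correct)
print("NYSE calendar applied to HKEX data:", wrong_calendar)
```

With the right calendar the complete dataset reports no gaps; with the NYSE calendar the two Hong Kong closure days show up as false positives.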


Layer 2: Row Count Validation

The principle: for a given date range and interval, the expected number of rows is deterministic. A mismatch indicates a gap.

For example, a 1-minute (1m) dataset covering 10 trading days should contain approximately 3,900 rows per symbol (390 bars per trading day × 10 trading days, where 390 = 6.5 trading hours × 60 minutes). If you count only 3,840, you are missing 60 rows, a gap worth investigating. Scheduled early-close sessions contain fewer bars, so expect small shortfalls on those days.
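The arithmetic behind the expected count can be sketched as follows. The `expected_rows` helper is hypothetical, and it assumes the vendor emits a final partial bar when the session length is not a multiple of the interval (hence the ceiling); check that assumption against your own data before relying on it:

```python
import math

REGULAR_SESSION_MINUTES = 390  # 9:30-16:00 ET = 6.5 hours x 60 minutes
EARLY_CLOSE_MINUTES = 210      # 9:30-13:00 ET on scheduled half days

def expected_rows(full_days: int, interval_minutes: int, early_close_days: int = 0) -> int:
    """Expected bar count per symbol for regular-hours US equity data."""
    # Assumption: a trailing partial bar is emitted, so round up.
    per_full_day = math.ceil(REGULAR_SESSION_MINUTES / interval_minutes)
    per_half_day = math.ceil(EARLY_CLOSE_MINUTES / interval_minutes)
    return full_days * per_full_day + early_close_days * per_half_day

print(expected_rows(10, 1))     # ten full days of 1m bars
print(expected_rows(10, 5))     # ten full days of 5m bars
print(expected_rows(9, 1, 1))   # nine full days plus one early close, 1m bars
```

Accounting for early closes up front keeps half days from showing up as 180-row deficits in the completeness figure.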

from dataclasses import dataclass
from typing import Optional
import random
import requests
import time

@dataclass
class IntervalConfig:
    """Expected row count per trading day for common intervals."""
    interval: str
    bars_per_day: int  # approximate for US equity regular hours (9:30–16:00 ET)
    
INTERVAL_ROWS = {
    "1m": 390,   # 390 one-minute bars (9:30–16:00)
    "5m": 78,    # 78 five-minute bars
    "15m": 26,   # 26 fifteen-minute bars
    "30m": 13,   # 13 thirty-minute bars
    "1h": 7,     # 6.5 hours: six full bars plus one partial bar (confirm your vendor's alignment)
    "1d": 1,     # 1 daily bar
}

class RowCountValidator:
    """Validate expected row counts against a live TickDB kline query.
    
    ⚠️ This validator sends real API requests. Rate limits apply.
    Schedule accordingly or use batch queries.
    """
    
    def __init__(self, api_key: str, base_url: str = "https://api.tickdb.ai"):
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": api_key})
    
    def _request_with_retry(self, url: str, params: dict, retries: int = 3) -> dict:
        """Make API request with exponential backoff and jitter.
        
        ⚠️ For production workloads, use aiohttp/asyncio for non-blocking I/O.
        """
        base_delay = 1.0
        max_delay = 32.0
        
        for attempt in range(retries):
            try:
                response = self.session.get(
                    url,
                    params=params,
                    timeout=(3.05, 10)
                )
                
                # Rate limit handling (a 429 consumes one attempt)
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"Rate limited. Waiting {retry_after}s before retry.")
                    time.sleep(retry_after)
                    continue
                
                response.raise_for_status()
                return response.json()
                
            except requests.exceptions.RequestException as e:
                if attempt == retries - 1:
                    raise RuntimeError(f"API request failed after {retries} attempts: {e}") from e
                
                # Exponential backoff with jitter
                delay = min(base_delay * (2 ** attempt), max_delay)
                jitter = random.uniform(0, delay * 0.1)
                sleep_time = delay + jitter
                print(f"Request failed (attempt {attempt + 1}/{retries}). Retrying in {sleep_time:.2f}s.")
                time.sleep(sleep_time)
        
        # Every attempt was rate limited: fail loudly instead of returning None
        raise RuntimeError(f"API request still rate limited after {retries} attempts")
    
    def count_actual_rows(self, symbol: str, interval: str, start_ts: int, end_ts: int) -> int:
        """Query TickDB and count the actual number of kline bars returned."""
        url = f"{self.base_url}/v1/market/kline"
        params = {
            "symbol": symbol,
            "interval": interval,
            "start": start_ts,
            "end": end_ts,
            "limit": 1000  # Will paginate if needed
        }
        
        total_rows = 0
        while True:
            result = self._request_with_retry(url, params)
            data = result.get("data", [])
            if not data:
                break
            total_rows += len(data)
            
            # Pagination: fetch next page if limit reached
            if len(data) < params["limit"]:
                break
            # Move start time forward to last received timestamp + 1
            params["start"] = data[-1]["timestamp"] + 1
        
        return total_rows
    
    def validate_interval(
        self,
        symbol: str,
        interval: str,
        start_date: str,
        end_date: str
    ) -> dict:
        """Run full row count validation for a symbol over a date range.
        
        Args:
            symbol: e.g., "AAPL.US"
            interval: e.g., "1m", "5m", "1d"
            start_date: "YYYY-MM-DD"
            end_date: "YYYY-MM-DD"
            
        Returns:
            Dictionary with expected count, actual count, and gap analysis.
        """
        import tradingcalendar  # local module
        import pandas as pd
        
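        # Epoch milliseconds, matching the bar timestamps used for pagination.
        # The end bound is the start of the day after end_date, so the final day is included.
        start_ts = int(pd.Timestamp(start_date).timestamp() * 1000)
        end_ts = int((pd.Timestamp(end_date) + pd.Timedelta(days=1)).timestamp() * 1000)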
        
        # Calculate expected rows from trading calendar
        tc = tradingcalendar.TradingCalendar()
        start_dt = pd.Timestamp(start_date).date()
        end_dt = pd.Timestamp(end_date).date()
        expected_days = tc.get_expected_trading_days(start_dt, end_dt)
        trading_day_count = len(expected_days)
        
        bars_per_day = INTERVAL_ROWS.get(interval, 0)
        if bars_per_day == 0:
            return {"error": f"Unknown interval: {interval}"}
        
        expected_rows = trading_day_count * bars_per_day
        actual_rows = self.count_actual_rows(symbol, interval, start_ts, end_ts)
        missing_rows = expected_rows - actual_rows
        
        return {
            "symbol": symbol,
            "interval": interval,
            "start_date": start_date,
            "end_date": end_date,
            "trading_days": trading_day_count,
            "expected_rows": expected_rows,
            "actual_rows": actual_rows,
            "missing_rows": missing_rows,
            "completeness_pct": round(actual_rows / expected_rows * 100, 2) if expected_rows > 0 else 0
        }

Why this works: Row count is a stateless check — it does not care about the content of the data, only its volume. This makes it fast and resistant to content-level anomalies.

Limitation: Row count alone cannot tell you where the gap is. Sixty missing rows could be one long gap or six short ones. You need Layer 3 for precise localization.
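A cheap intermediate step is to break the total down by day, which at least narrows the search to specific sessions. The sketch below uses synthetic 1m timestamps with 60 bars removed from the second day; the `short_days` helper is hypothetical:

```python
import pandas as pd

BARS_PER_DAY_1M = 390

def short_days(df: pd.DataFrame, expected_per_day: int) -> pd.DataFrame:
    """Return the days whose bar count falls below the expected count."""
    counts = df.groupby(df["timestamp"].dt.date).size()
    report = counts.to_frame("actual_rows")
    report["missing_rows"] = expected_per_day - report["actual_rows"]
    return report[report["missing_rows"] > 0]

# Synthetic data: day one is complete, day two is missing 60 consecutive bars.
day1 = pd.date_range("2024-03-04 09:30", periods=390, freq="min")
day2 = pd.date_range("2024-03-05 09:30", periods=390, freq="min")
day2_gapped = day2[:100].append(day2[160:])
df = pd.DataFrame({"timestamp": day1.append(day2_gapped)})

report = short_days(df, BARS_PER_DAY_1M)
print(report)
```

Days that are missing entirely never appear in the grouped counts, so this complements the calendar check rather than replacing it.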


Layer 3: Timestamp Continuity Detection

The principle: within a sorted dataset, every consecutive row should have a timestamp difference consistent with the interval. A step larger than the interval (plus a small tolerance) indicates a gap.

import pandas as pd
from typing import Generator

class TimestampContinuityChecker:
    """Detect gaps by analyzing timestamp deltas between consecutive rows.
    
    Works on any sorted time-series dataframe with a 'timestamp' column.
    Does not require external calendar data — purely syntactic.
    """
    
    def __init__(self, tolerance_ratio: float = 1.5):
        """Initialize with configurable gap tolerance.
        
        Args:
            tolerance_ratio: Multiplier on expected interval to define a gap.
                            1.5 means a gap is detected when delta > interval × 1.5
        """
        self.tolerance_ratio = tolerance_ratio
    
    def _generate_intervals(self, df: pd.DataFrame, interval_ms: int) -> Generator[dict, None, None]:
        """Yield gap information for each consecutive pair of rows."""
        ts = df["timestamp"]
        # Accept datetime64 columns as well as integer epoch milliseconds
        if pd.api.types.is_datetime64_any_dtype(ts):
            if ts.dt.tz is not None:
                ts = ts.dt.tz_convert("UTC").dt.tz_localize(None)
            ts = (ts - pd.Timestamp("1970-01-01")) // pd.Timedelta(milliseconds=1)
        timestamps = ts.to_numpy()
        
        for i in range(1, len(timestamps)):
            delta = int(timestamps[i] - timestamps[i - 1])  # in milliseconds
            expected = interval_ms
            
            if delta > expected * self.tolerance_ratio:
                yield {
                    "row_index": i,
                    "prev_timestamp": int(timestamps[i - 1]),
                    "curr_timestamp": int(timestamps[i]),
                    "delta_ms": delta,
                    "expected_ms": expected,
                    "gap_bars": round(delta / expected) - 1  # bars missing between these two
                }
    
    def detect_gaps(self, df: pd.DataFrame, interval: str) -> pd.DataFrame:
        """Run continuity check and return gap report as a DataFrame.
        
        Args:
            df: DataFrame with a 'timestamp' column (ms since epoch)
            interval: e.g., "1m", "5m", "1h" — used to determine expected interval in ms
            
        Returns:
            DataFrame of detected gaps with start/end timestamps and bar counts.
        """
        interval_ms_map = {
            "1m": 60_000,
            "5m": 300_000,
            "15m": 900_000,
            "30m": 1_800_000,
            "1h": 3_600_000,
            "4h": 14_400_000,
            "1d": 86_400_000
        }
        
        interval_ms = interval_ms_map.get(interval)
        if interval_ms is None:
            raise ValueError(f"Unsupported interval: {interval}")
        
        if df.empty or len(df) < 2:
            return pd.DataFrame()
        
        gaps = list(self._generate_intervals(df, interval_ms))
        
        if not gaps:
            return pd.DataFrame()
        
        # Convert to human-readable timestamps
        result = pd.DataFrame(gaps)
        result["gap_start"] = pd.to_datetime(result["prev_timestamp"], unit="ms")
        result["gap_end"] = pd.to_datetime(result["curr_timestamp"], unit="ms")
        
        return result[["gap_start", "gap_end", "gap_bars", "delta_ms"]]
    
    def merge_adjacent_gaps(self, gaps_df: pd.DataFrame, max_gap_ms: int = 3_600_000) -> pd.DataFrame:
        """Merge gaps that are separated by less than max_gap_ms.
        
        Useful for coalescing clusters of micro-gaps (e.g., around trading
        halts or early-close days) into a single reported gap.
        """
        if gaps_df.empty:
            return gaps_df
        
        gaps_df = gaps_df.sort_values("gap_start").reset_index(drop=True)
        merged = []
        current_start = gaps_df.iloc[0]["gap_start"]
        current_end = gaps_df.iloc[0]["gap_end"]
        current_bars = gaps_df.iloc[0]["gap_bars"]
        
        for i in range(1, len(gaps_df)):
            row = gaps_df.iloc[i]
            # If this gap starts within max_gap_ms of the current end, merge
            if (row["gap_start"] - current_end).total_seconds() * 1000 <= max_gap_ms:
                current_end = max(current_end, row["gap_end"])
                current_bars += row["gap_bars"]
            else:
                merged.append({
                    "gap_start": current_start,
                    "gap_end": current_end,
                    "total_bars_missing": current_bars
                })
                current_start = row["gap_start"]
                current_end = row["gap_end"]
                current_bars = row["gap_bars"]
        
        # Append the last group
        merged.append({
            "gap_start": current_start,
            "gap_end": current_end,
            "total_bars_missing": current_bars
        })
        
        return pd.DataFrame(merged)

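Sample output (1m bars):

gap_start              gap_end                total_bars_missing
2024-03-15 16:00:00    2024-03-18 09:30:00    3929
2024-07-03 13:00:00    2024-07-05 09:30:00    2669

Both rows are scheduled closures (a weekend, and the July 4 holiday after an early close on July 3), so the bar counts measure elapsed minutes rather than lost data.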

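Why this works: Timestamp continuity needs no calendar; it only looks at local deltas. The flip side is that, for intraday data, every overnight close, weekend, and holiday also exceeds the threshold, so the raw report mixes scheduled closures with real gaps. Filter it against the Layer 1 calendar before treating an entry as an anomaly.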
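A simple first pass at that filtering is to separate gaps that start and end on the same calendar day from gaps that cross a session boundary. The sketch below assumes a gap report with `gap_start` and `gap_end` datetime columns, as produced by the checker; the `split_gaps` helper and the intraday row on March 19 are hypothetical:

```python
import pandas as pd

def split_gaps(gaps: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split a gap report into intraday gaps and gaps that cross a session boundary."""
    same_day = gaps["gap_start"].dt.normalize() == gaps["gap_end"].dt.normalize()
    return gaps[same_day], gaps[~same_day]

gaps = pd.DataFrame({
    "gap_start": pd.to_datetime(["2024-03-15 16:00", "2024-03-19 11:00", "2024-07-03 13:00"]),
    "gap_end": pd.to_datetime(["2024-03-18 09:30", "2024-03-19 11:45", "2024-07-05 09:30"]),
})
intraday, cross_session = split_gaps(gaps)
print(len(intraday), "intraday;", len(cross_session), "cross-session")
```

Intraday gaps are the ones this layer is uniquely positioned to find. Cross-session gaps are better judged by the calendar check, which knows whether the days in between were supposed to trade.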

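Important: The tolerance ratio (default 1.5) absorbs minor timestamp irregularities. If you need stricter detection, lower it to 1.1. If you get false positives from market micro-structure pauses, raise it, but be aware that the comparison is strict: at 2.0, a single missing bar (a delta of exactly twice the interval) is no longer flagged.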
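The effect of the ratio is easy to see on a handful of synthetic timestamps. This sketch uses a vectorized `diff()` instead of the row-by-row loop; the `count_flagged` helper and the timestamps are invented for the illustration:

```python
import pandas as pd

def count_flagged(timestamps_ms: list[int], interval_ms: int, tolerance_ratio: float) -> int:
    """Count consecutive deltas that exceed interval_ms * tolerance_ratio."""
    deltas = pd.Series(timestamps_ms).diff().dropna()
    return int((deltas > interval_ms * tolerance_ratio).sum())

# Deltas between rows: 60_000 (normal), 70_000 (late bar),
# 120_000 (one bar missing), 60_000 (normal).
ts = [0, 60_000, 130_000, 250_000, 310_000]

for ratio in (1.1, 1.5, 2.0):
    print(ratio, count_flagged(ts, 60_000, ratio))
```

At 1.1 the late bar is flagged along with the missing one; at 1.5 only the missing bar is flagged; at 2.0 nothing is.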


3. Running the Full Pipeline

Tie the three layers together into a single scheduled pipeline:

import pandas as pd
import requests
import time
import os
from datetime import datetime
from dotenv import load_dotenv

# Load API key from environment variable
# ⚠️ Do not hardcode API keys in production scripts
load_dotenv()
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
    raise EnvironmentError("TICKDB_API_KEY environment variable is not set")

BASE_URL = "https://api.tickdb.ai"
HEADERS = {"X-API-Key": API_KEY}

def fetch_klines(symbol: str, interval: str, start_ts: int, end_ts: int) -> pd.DataFrame:
    """Fetch kline data from TickDB with retry and timeout."""
    all_data = []
    current_start = start_ts
    
    while current_start < end_ts:
        params = {
            "symbol": symbol,
            "interval": interval,
            "start": current_start,
            "end": end_ts,
            "limit": 1000
        }
        
        try:
            response = requests.get(
                f"{BASE_URL}/v1/market/kline",
                headers=HEADERS,
                params=params,
                timeout=(3.05, 10)
            )
            
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 5))
                print(f"Rate limited. Waiting {retry_after}s.")
                time.sleep(retry_after)
                continue
            
            response.raise_for_status()
            result = response.json()
            data = result.get("data", [])
            
            if not data:
                break
            
            all_data.extend(data)
            current_start = data[-1]["timestamp"] + 1
            
            if len(data) < 1000:
                break
            
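        except requests.exceptions.RequestException as e:
            # Fail loudly: returning a partial frame here would create the very
            # kind of silent gap this audit is meant to detect.
            raise RuntimeError(f"Kline fetch failed at start={current_start}: {e}") from e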
    
    if not all_data:
        return pd.DataFrame()
    
    df = pd.DataFrame(all_data)
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    return df.sort_values("timestamp").reset_index(drop=True)


def run_full_audit(symbol: str, interval: str, start_date: str, end_date: str) -> dict:
    """Run all three detection layers and return a consolidated report."""
    from tradingcalendar import TradingCalendar
    from rowcountvalidator import RowCountValidator
    from timestampchecker import TimestampContinuityChecker
    
    print(f"\n{'='*60}")
    print(f"AUDIT: {symbol} | {interval} | {start_date} → {end_date}")
    print(f"{'='*60}")
    
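    start_ts = int(pd.Timestamp(start_date).timestamp() * 1000)
    # Use the start of the day after end_date so the final day's bars are included
    end_ts = int((pd.Timestamp(end_date) + pd.Timedelta(days=1)).timestamp() * 1000)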
    
    # Step 1: Calendar alignment
    tc = TradingCalendar()
    start_dt = pd.Timestamp(start_date).date()
    end_dt = pd.Timestamp(end_date).date()
    expected_days = tc.get_expected_trading_days(start_dt, end_dt)
    actual_dates = set()
    
    df = fetch_klines(symbol, interval, start_ts, end_ts)
    
    if not df.empty:
        df_dates = df["timestamp"].dt.date
        actual_dates = set(df_dates)
    
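    # Compare against the requested range rather than the range present in the data,
    # so days missing at either edge (or an entirely empty response) are still reported
    calendar_gaps = sorted(set(expected_days) - actual_dates)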
    print(f"\n[Layer 1] Calendar Alignment:")
    print(f"  Expected trading days: {len(expected_days)}")
    print(f"  Actual days in data:   {len(actual_dates)}")
    print(f"  Calendar gaps found:   {len(calendar_gaps)}")
    if calendar_gaps:
        print(f"  Missing dates: {calendar_gaps[:5]}{'...' if len(calendar_gaps) > 5 else ''}")
    
    # Step 2: Row count validation
    validator = RowCountValidator(API_KEY)
    row_result = validator.validate_interval(symbol, interval, start_date, end_date)
    print(f"\n[Layer 2] Row Count Validation:")
    print(f"  Expected rows: {row_result.get('expected_rows', 'N/A')}")
    print(f"  Actual rows:   {row_result.get('actual_rows', 'N/A')}")
    print(f"  Missing rows:  {row_result.get('missing_rows', 'N/A')}")
    print(f"  Completeness:  {row_result.get('completeness_pct', 'N/A')}%")
    
    # Step 3: Timestamp continuity
    checker = TimestampContinuityChecker(tolerance_ratio=1.5)
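    if not df.empty:
        # fetch_klines returns datetimes; the checker compares integer millisecond deltas
        epoch = pd.Timestamp("1970-01-01")
        df_ms = df.assign(timestamp=(df["timestamp"] - epoch) // pd.Timedelta(milliseconds=1))
        gaps_df = checker.detect_gaps(df_ms, interval)
        merged_gaps = checker.merge_adjacent_gaps(gaps_df)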
        print(f"\n[Layer 3] Timestamp Continuity:")
        print(f"  Gaps detected (raw):     {len(gaps_df)}")
        print(f"  Gaps detected (merged):  {len(merged_gaps)}")
        if not merged_gaps.empty:
            print(f"  Largest gap: {merged_gaps['total_bars_missing'].max()} bars")
    
    return {
        "symbol": symbol,
        "interval": interval,
        "calendar_gaps": calendar_gaps,
        "missing_rows": row_result.get("missing_rows", 0),
        "completeness_pct": row_result.get("completeness_pct", 0),
        "continuity_gaps_raw": len(gaps_df) if not df.empty else 0,
        "continuity_gaps_merged": len(merged_gaps) if not df.empty else 0,
        "df": df
    }


# Example: Audit AAPL 1-minute data for 2024
if __name__ == "__main__":
    report = run_full_audit("AAPL.US", "1m", "2024-01-01", "2024-12-31")
    completeness = report["completeness_pct"]
    if completeness < 99.5:
        print(f"\n⚠️ WARNING: Completeness is {completeness}%. Investigate gaps before backtesting.")

4. Interpreting Results: What Each Layer Catches

| Detection method | Catches | Misses |
|---|---|---|
| Calendar alignment | Missing entire trading days | Intraday gaps within a trading day |
| Row count validation | Total volume deficits | Location of gaps, order of data |
| Timestamp continuity | Intraday gaps, micro-holes | Calendar-wide date absences (it cannot tell a holiday from a dropped trading day) |

The three methods are complementary. Use all three.


5. Recommended Scheduling

| Environment | Frequency | Scope |
|---|---|---|
| Development / backtesting | Every pull from TickDB | The specific symbol + date range being tested |
| Daily pipeline | Daily (market open check) | Previous trading day's data |
| Weekly audit | Weekly | Full rolling 90-day window |
| Monthly audit | Monthly | Full dataset history |

For backtesting scenarios: run the full audit before every backtest, not after. Catching a gap after a failed backtest is repair work. Catching it before is prevention.
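One way to enforce "audit first" is a small guard that refuses to proceed when the report looks incomplete. This is a sketch, assuming a report dictionary with the `completeness_pct` and `calendar_gaps` keys returned by the audit function; `IncompleteDataError` and `require_complete` are hypothetical names:

```python
class IncompleteDataError(RuntimeError):
    """Raised when an audit report does not meet the completeness bar."""

def require_complete(report: dict, min_completeness_pct: float = 99.5) -> None:
    """Raise if the audit report shows missing days or low completeness."""
    pct = report.get("completeness_pct", 0)
    gaps = report.get("calendar_gaps", [])
    if pct < min_completeness_pct or gaps:
        raise IncompleteDataError(
            f"completeness={pct}%, missing trading days={len(gaps)}; "
            "resolve gaps before backtesting"
        )

# Hand-written reports for illustration only.
clean_report = {"completeness_pct": 100.0, "calendar_gaps": []}
gappy_report = {"completeness_pct": 98.4, "calendar_gaps": []}

require_complete(clean_report)  # passes silently
try:
    require_complete(gappy_report)
except IncompleteDataError as err:
    print(f"Backtest blocked: {err}")
```

Calling the guard at the top of a backtest entry point turns a data-quality problem into an immediate, explicit failure instead of a degraded Sharpe ratio.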


6. What to Do When You Find a Gap

Gaps are not always vendor errors. Before filing a support ticket:

  1. Verify it is not a known market event: trading halts, circuit breakers, early closes. Check SEC filings or exchange notices.
  2. Cross-check with a second data source: if Polygon or Databento has the data for the same dates, the gap is likely TickDB-side.
  3. Check your own ingestion pipeline: a timeout during backfill can produce gaps that originate in your code, not the vendor.
  4. Contact TickDB support with the specific symbol, date range, and gap report from this audit script.
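The cross-check in step 2 can be sketched as a set comparison. The `classify_missing_days` helper is hypothetical, and the date sets below are invented; in practice each set would come from the dates present in the respective source:

```python
from datetime import date

def classify_missing_days(
    expected: set[date], primary: set[date], secondary: set[date]
) -> dict[str, list[date]]:
    """Split the primary source's missing days by whether the second source has them."""
    missing_primary = expected - primary
    return {
        # Present elsewhere: points at the primary vendor or your ingestion.
        "primary_only": sorted(d for d in missing_primary if d in secondary),
        # Absent everywhere: more likely a market event or a calendar error.
        "both_sources": sorted(d for d in missing_primary if d not in secondary),
    }

expected = {date(2024, 3, 25), date(2024, 3, 26), date(2024, 3, 27), date(2024, 3, 28)}
primary = {date(2024, 3, 25), date(2024, 3, 28)}
secondary = {date(2024, 3, 25), date(2024, 3, 26), date(2024, 3, 28)}

result = classify_missing_days(expected, primary, secondary)
print(result)
```

Attaching the `primary_only` list to a support ticket gives the vendor a precise set of dates to investigate.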

Next Steps

If you're debugging a specific backtest failure, run the audit script against the symbol and date range in question. Paste the completeness percentage into your issue report — it gives the support team a concrete starting point.

If you want to schedule automated audits, copy the run_full_audit function into a scheduled task (Airflow, cron, or GitHub Actions) and route the output to a Slack channel or email alert when completeness drops below 99.5%.

If you need historical OHLCV data with verified completeness, TickDB provides 10+ years of cleaned, calendar-aligned US equity kline data via the /v1/market/kline endpoint. Free tier available at tickdb.ai — no credit card required.

If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace for direct API integration in your workflow.


This article does not constitute investment advice. Data quality verification is an engineering practice — its results do not predict market behavior or guarantee trading profitability.