Every quant researcher has a story like this.

A researcher spends three weeks building a mean-reversion strategy. The backtest looks exceptional — 2.4 Sharpe, max drawdown under 12%. They allocate capital. The strategy starts live trading and collapses within a month, losing 18%. After the damage is done, forensic analysis reveals the cause: the historical data feed had a 12-day gap during Q2 2020 — a period that included a major market dislocation. The backtest never saw it. The strategy was trained on a sanitized, gap-filled, or simply incomplete dataset.

This is not a hypothetical edge case. Silent data gaps — continuous segments of missing records that produce no error, generate no exception, and leave no trace in the data schema — are one of the most common and most catastrophic sources of false confidence in quantitative research.

The gap problem is particularly dangerous because it combines two failure modes: the absence is invisible to standard checks (the code ran without errors), and the consequences compound over time (a 12-day gap in 2020 might represent 15% of the high-volatility trading days in your sample). This article builds a production-grade data integrity verification system that detects silent gaps before they contaminate your research.


1. The Three Failure Modes of Missing Data

Before building the detection system, we need to understand why data gaps happen and where they hide.

1.1 Exchange and Venue Gaps

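Exchanges shut down. Markets halt. Sessions end early on scheduled half-days, and market-wide circuit breakers pause trading mid-session. A US equity data source that assumes 390-minute trading days may silently mishandle any session that deviates from that template: scheduled early closes (such as the 1:00 PM ET close on the day after Thanksgiving), circuit-breaker halts (such as the 15-minute market-wide halts in March 2020), and unscheduled closures (the NYSE did not open on September 11, 2001 and stayed closed through September 14; it also closed on October 29 and 30, 2012 for Hurricane Sandy). If your backtest assumes continuous 390-minute sessions and the source silently drops a truncated session, your signal library never learns to handle early closes.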

1.2 API Pagination and Rate Limit Gaps

When you paginate through a large historical dataset (say, 10 years of 1-minute bars), you are making sequential requests for sequential time windows. If a rate-limit error occurs and your code retries but the retry window overlaps with the previous window's boundary, you may silently duplicate data — or, worse, skip a window if the error is caught but the loop continues. A well-known example: Polygon.io's historical data endpoint had a documented behavior where requesting bars with a start time that fell inside an existing bar caused that bar to be skipped in certain pagination configurations.
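
One way to guard against both outcomes is to derive each request from the last bar actually received and to retry the same window a bounded number of times before failing loudly. The sketch below assumes a hypothetical `fetch_page(start_ts, end_ts, limit)` callable that returns bars sorted by a `t` field in Unix seconds, and a hypothetical `TransientError` standing in for rate-limit or network failures; neither is any specific vendor's API.

```python
import time
from typing import Callable, Dict, List

Bar = Dict[str, int]

class TransientError(Exception):
    """Hypothetical error type standing in for rate-limit or network failures."""

def fetch_all_bars(
    fetch_page: Callable[[int, int, int], List[Bar]],
    start_ts: int,
    end_ts: int,
    limit: int = 500,
    max_retries: int = 3,
    backoff_seconds: float = 0.0,
) -> List[Bar]:
    """Page through [start_ts, end_ts] without skipping or duplicating bars."""
    bars: List[Bar] = []
    cursor = start_ts
    while cursor <= end_ts:
        for attempt in range(max_retries):
            try:
                page = fetch_page(cursor, end_ts, limit)
                break
            except TransientError:
                time.sleep(backoff_seconds * (attempt + 1))
        else:
            # Fail loudly instead of moving on to the next window
            raise RuntimeError(f"Giving up on window starting at {cursor}")
        if not page:
            break
        # Drop anything at or before the last bar already collected
        last_seen = bars[-1]["t"] if bars else cursor - 1
        bars.extend(b for b in page if b["t"] > last_seen)
        if len(page) < limit:
            break
        # Advance from the data actually received, not from a precomputed window
        cursor = page[-1]["t"] + 1
    return bars
```

The key design choice is that a failed request never advances `cursor`, so a retry can only re-request the same window, and duplicates are filtered by timestamp rather than trusted away.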

1.3 Provider-Side Schema Gaps

Some data providers quietly exclude weekends, holidays, or pre-market sessions from their primary feed. If you are downloading "daily" bars and the provider excludes weekends, your row count for a 3-year period will be approximately 71% of calendar days (5 out of 7), and closer to 69% once market holidays are also removed (roughly 252 trading days per 365 calendar days). Most researchers notice this discrepancy only when they try to align their data with an external benchmark that includes non-trading days.
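
The expected share of rows is easy to sanity-check before comparing it against a provider's row count. A minimal sketch that counts weekdays over a three-year window; holidays are ignored here, so the true trading-day share is slightly lower than what this prints:

```python
from datetime import date, timedelta

def weekday_fraction(start: date, end: date) -> float:
    """Share of calendar days in [start, end] that fall on Monday to Friday."""
    total = (end - start).days + 1
    weekdays = sum(
        1 for offset in range(total)
        if (start + timedelta(days=offset)).weekday() < 5
    )
    return weekdays / total

ratio = weekday_fraction(date(2019, 1, 1), date(2021, 12, 31))
print(f"Weekdays as share of calendar days: {ratio:.1%}")
```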

1.4 Summary Table

| Failure mode | Detectability | Common cause | Risk level |
|---|---|---|---|
| Exchange venue gaps | Low (no error thrown) | Market halts, early closes | High |
| API pagination gaps | Low (silent overlap) | Rate limits, retry logic errors | High |
| Provider schema gaps | Medium (requires baseline) | Weekend/holiday exclusion | Medium |
| Symbol delisting gaps | High (usually flagged) | Corporate actions, index rebalances | Medium |

2. Architecture of a Data Integrity Verification System

The verification system operates in three layers, each checking a different dimension of completeness.

| Layer | Check type | What it catches |
|---|---|---|
| Layer 1 | Trading calendar comparison | Missing trading days relative to expected schedule |
| Layer 2 | Row count verification | Statistical anomalies in record count across time windows |
| Layer 3 | Timestamp continuity detection | Discontinuities, overlaps, and out-of-order timestamps within individual sessions |

Layer 1 operates at the macro level — are all expected trading sessions present? Layer 2 operates at the meso level — does each session have roughly the expected number of records? Layer 3 operates at the micro level — are individual records timestamped correctly and free from internal gaps?

Together, these three layers catch the overwhelming majority of silent data integrity failures.


3. Layer 1: Trading Calendar Comparison

3.1 Building the Expected Calendar

The trading calendar is the foundation of the verification system. For US equities, the calendar is mostly predictable: the NYSE publishes its holiday schedule in advance, although unscheduled closures (such as national days of mourning) do occur. We build a complete expected calendar for the date range of our dataset and compare it against the actual dates present in the data.

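from datetime import datetime, timedelta
from typing import Set, List

from dateutil.easter import easter  # python-dateutil is installed alongside pandas

class TradingCalendar:
    """NYSE holiday-adjusted trading calendar for US equities (regular holidays only)."""

    # Fixed holidays (month, day)
    FIXED_HOLIDAYS = [
        (1, 1),    # New Year's Day
        (6, 19),   # Juneteenth (NYSE holiday from 2022 onward)
        (7, 4),    # Independence Day
        (12, 25),  # Christmas Day
    ]

    # Floating holidays: (month, week_in_month, day_of_week)
    # week_in_month: 1=first, 2=second, 3=third, 4=fourth, -1=last
    # day_of_week: ISO numbering (1=Monday ... 7=Sunday)
    FLOATING_HOLIDAYS = [
        (1, 3, 1),  # MLK Day (third Monday of January)
        (2, 3, 1),  # Presidents Day (third Monday of February)
        (5, -1, 1), # Memorial Day (last Monday of May)
        (9, 1, 1),  # Labor Day (first Monday of September)
        (11, 4, 4), # Thanksgiving (fourth Thursday of November)
    ]

    @staticmethod
    def _is_nth_weekday(date: datetime, week: int, iso_dow: int) -> bool:
        """True if date is the nth (or last, when week == -1) given weekday of its month."""
        if date.isoweekday() != iso_dow:
            return False
        if week == -1:
            return (date + timedelta(days=7)).month != date.month
        return (date.day - 1) // 7 + 1 == week

    @staticmethod
    def is_nyse_holiday(date: datetime) -> bool:
        """Check if a date is a regular NYSE market holiday (one-off closures are not covered)."""
        # Fixed holidays, including the observed date when the holiday falls on a weekend
        for offset in (-1, 0, 1):
            actual = date + timedelta(days=offset)
            if (actual.month, actual.day) not in TradingCalendar.FIXED_HOLIDAYS:
                continue
            if (actual.month, actual.day) == (6, 19) and actual.year < 2022:
                continue
            if offset == 0:
                return True
            # Sunday holiday: observed on the following Monday
            if offset == -1 and actual.weekday() == 6:
                return True
            # Saturday holiday: observed on the preceding Friday, except
            # New Year's Day, for which the NYSE stays open on December 31
            if offset == 1 and actual.weekday() == 5 and actual.month != 1:
                return True

        # Floating holidays
        for h_month, h_week, h_dow in TradingCalendar.FLOATING_HOLIDAYS:
            if date.month == h_month and TradingCalendar._is_nth_weekday(date, h_week, h_dow):
                return True

        # Good Friday: two days before Easter Sunday
        return date.date() == easter(date.year) - timedelta(days=2)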

    @staticmethod
    def generate_expected_dates(
        start_date: datetime,
        end_date: datetime
    ) -> Set[str]:
        """Generate set of expected trading dates in YYYY-MM-DD format."""
        expected_dates = set()
        current = start_date

        while current <= end_date:
            # Skip weekends (Saturday=5, Sunday=6)
            if current.weekday() < 5 and not TradingCalendar.is_nyse_holiday(current):
                expected_dates.add(current.strftime("%Y-%m-%d"))
            current += timedelta(days=1)

        return expected_dates

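Note: The holiday logic above is simplified. It does not model one-off closures (such as national days of mourning or weather-related closures) or early closes, and hand-maintained rules drift as the exchange changes its schedule (Juneteenth, for example, became an NYSE holiday in 2022). For production use, take the NYSE schedule from the pandas_market_calendars or exchange_calendars library rather than maintaining your own rules.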
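
As a rough illustration of the library route, the sketch below pulls expected session dates from pandas_market_calendars using its `get_calendar("NYSE")` and `schedule(start_date=..., end_date=...)` calls. The helper names `expected_sessions_from_library` and `missing_sessions` are made up for this example, and the import is deferred so the comparison helper works even where the package is not installed.

```python
from typing import Iterable, List, Set

def expected_sessions_from_library(start: str, end: str) -> Set[str]:
    """Expected NYSE session dates (YYYY-MM-DD) for the inclusive range.

    Requires `pip install pandas_market_calendars`; imported lazily so the
    rest of this sketch runs without it.
    """
    import pandas_market_calendars as mcal

    nyse = mcal.get_calendar("NYSE")
    schedule = nyse.schedule(start_date=start, end_date=end)
    # The schedule is indexed by session date, one row per trading day
    return set(schedule.index.strftime("%Y-%m-%d"))

def missing_sessions(expected: Iterable[str], actual: Iterable[str]) -> List[str]:
    """Sorted expected session dates that are absent from the dataset."""
    return sorted(set(expected) - set(actual))
```

The returned set can stand in for the hand-built expected calendar, so the comparison logic stays the same while the holiday rules come from a maintained source.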

3.2 Detecting Missing Trading Days

Once we have the expected calendar, we extract the actual trading dates from our data source and compute the difference.

def detect_missing_trading_days(
    actual_dates: List[str],
    start_date: datetime,
    end_date: datetime
) -> dict:
    """
    Compare actual data dates against the expected trading calendar.

    Args:
        actual_dates: List of dates present in the dataset (YYYY-MM-DD format)
        start_date: Expected data range start
        end_date: Expected data range end

    Returns:
        Dictionary containing gap analysis results
    """
    expected = TradingCalendar.generate_expected_dates(start_date, end_date)
    actual_set = set(actual_dates)

    missing_dates = expected - actual_set
    extra_dates = actual_set - expected  # Rows on weekends/holidays or outside the range

    gap_analysis = {
        "expected_total": len(expected),
        "actual_total": len(actual_set),
        "missing_dates": sorted(missing_dates),
        "missing_count": len(missing_dates),
        "extra_dates": sorted(extra_dates),
        # Based on expected days actually present, so extra dates cannot mask gaps
        "completeness_pct": round((len(expected) - len(missing_dates)) / len(expected) * 100, 2) if expected else 0,
        "gap_ranges": _collapse_to_ranges(sorted(missing_dates), sorted(expected))
    }

    return gap_analysis

def _collapse_to_ranges(dates: List[str], expected: List[str]) -> List[dict]:
    """Collapse missing dates into ranges of consecutive trading days.

    Two missing dates belong to the same range when no expected trading day
    lies between them, so a gap that spans a weekend stays in one range.
    "count" is the number of missing trading days in the range.
    """
    if not dates:
        return []

    position = {d: i for i, d in enumerate(expected)}
    ranges = []
    start = end = dates[0]
    count = 1

    for prev, current in zip(dates, dates[1:]):
        if position[current] - position[prev] == 1:
            end = current
            count += 1
        else:
            ranges.append({"start": start, "end": end, "count": count})
            start = end = current
            count = 1

    ranges.append({"start": start, "end": end, "count": count})
    return ranges

3.3 Severity Classification for Calendar Gaps

Not all gaps are equal. A gap during a low-volatility August is materially different from a gap during the March 2020 volatility spike.

def classify_gap_severity(gap_ranges: List[dict], market_data: dict) -> List[dict]:
    """Classify each gap range by severity.

    market_data is reserved for richer context (for example realized
    volatility per date) and is not consulted yet; severity currently relies
    on the hard-coded reference periods below.
    """
    classified = []

    # Known high-volatility periods (hard-coded reference periods)
    HIGH_VOL_PERIODS = [
        ("2008-09-15", "2008-10-31"),  # Lehman collapse
        ("2020-02-19", "2020-03-23"),  # COVID crash
        ("2020-03-24", "2020-08-18"),  # V-shaped recovery
        ("2022-01-03", "2022-10-12"),  # Rate hike cycle
    ]

    for gap in gap_ranges:
        gap_start = datetime.strptime(gap["start"], "%Y-%m-%d")
        gap_end = datetime.strptime(gap["end"], "%Y-%m-%d")

        # Long gaps get elevated regardless of when they occur
        severity = "MEDIUM" if gap["count"] >= 5 else "LOW"

        # Any overlap with a high-volatility period raises the gap to HIGH
        for period_start, period_end in HIGH_VOL_PERIODS:
            ps = datetime.strptime(period_start, "%Y-%m-%d")
            pe = datetime.strptime(period_end, "%Y-%m-%d")

            if gap_start <= pe and gap_end >= ps:
                severity = "HIGH"
                break

        # Return a copy so the caller's gap_ranges are not mutated
        classified.append({**gap, "severity": severity})

    return classified

Example output (illustrative):

| Gap range | Trading days missing | Severity |
|---|---|---|
| 2020-03-16 to 2020-03-20 | 5 | HIGH |
| 2019-08-12 | 1 | LOW |
| 2021-06-07 to 2021-06-11 | 5 | MEDIUM |

4. Layer 2: Row Count Verification

4.1 Expected Row Count Baseline

Even if all expected trading days are present, individual days may be missing bars. For a 1-minute bar dataset, each full trading day should contain approximately 390 bars (for US equities: 9:30–16:00 ET). However, the precise expected count depends on your data source's configuration.

EXPECTED_ROWS_PER_DAY = {
    "1min": 390,    # Standard US equity session
    "5min": 78,
    "15min": 26,
    "1hour": 7,
    "1day": 1,
}

# Adjust for partial sessions
PARTIAL_SESSION_THRESHOLD = 0.7  # Accept 70% of expected as valid

class RowCountAnalyzer:
    def __init__(self, interval: str):
        self.expected_rows = EXPECTED_ROWS_PER_DAY.get(interval, 1)
        self.threshold = self.expected_rows * PARTIAL_SESSION_THRESHOLD

    def analyze_day(self, date: str, row_count: int) -> dict:
        """Analyze a single day's row count against expected."""
        completeness = row_count / self.expected_rows
        status = "VALID"

        if row_count == 0:
            status = "EMPTY"
        elif row_count < self.threshold:
            status = "INCOMPLETE"
        elif completeness > 1.05:
            # Over-completeness could indicate duplicate bars
            status = "OVERCOMPLETE"

        return {
            "date": date,
            "expected": self.expected_rows,
            "actual": row_count,
            "completeness_pct": round(completeness * 100, 1),
            "status": status,
            "gap_from_expected": self.expected_rows - row_count
        }

    def batch_analyze(self, daily_counts: dict) -> dict:
        """
        Analyze row counts across a date range.

        Args:
            daily_counts: Dict mapping date (YYYY-MM-DD) to row count
        """
        results = [self.analyze_day(d, c) for d, c in sorted(daily_counts.items())]

        summary = {
            "total_days": len(results),
            "valid_days": sum(1 for r in results if r["status"] == "VALID"),
            "incomplete_days": sum(1 for r in results if r["status"] == "INCOMPLETE"),
            "empty_days": sum(1 for r in results if r["status"] == "EMPTY"),
            "overcomplete_days": sum(1 for r in results if r["status"] == "OVERCOMPLETE"),
            "average_completeness": round(
                sum(r["completeness_pct"] for r in results) / len(results), 1
            ) if results else 0,
            "worst_days": sorted(
                [r for r in results if r["status"] in ("INCOMPLETE", "EMPTY")],
                key=lambda x: x["completeness_pct"]
            )[:10]
        }

        return {"summary": summary, "daily_results": results}
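
The per-interval baselines follow directly from the session length, which is worth deriving rather than memorizing when a source uses different hours. A small sketch, assuming bars are stamped at their start time and a final partial bar is still emitted (the hourly case); `expected_bars` is a name chosen for this example:

```python
import math
from datetime import datetime

def expected_bars(open_time: str, close_time: str, interval_minutes: int) -> int:
    """Bars expected between open and close (HH:MM, same day), counting a final partial bar."""
    fmt = "%H:%M"
    session_minutes = (
        datetime.strptime(close_time, fmt) - datetime.strptime(open_time, fmt)
    ).seconds // 60
    return math.ceil(session_minutes / interval_minutes)

print(expected_bars("09:30", "16:00", 1))   # 390: regular session, 1-minute bars
print(expected_bars("09:30", "16:00", 60))  # 7: the last hourly bar covers 30 minutes
print(expected_bars("09:30", "13:00", 1))   # 210: 1:00 PM early close
```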

4.2 Statistical Anomaly Detection

For long backtest periods, computing the per-day expected count is insufficient. We also need to detect statistical anomalies — days where the row count is within the acceptable range but statistically inconsistent with the surrounding period.

import statistics

class StatisticalAnomalyDetector:
    """Detect row count anomalies using rolling z-score analysis."""

    WINDOW_SIZE = 20  # Rolling 20-day window

    def __init__(self, daily_counts: dict):
        self.daily_counts = dict(sorted(daily_counts.items()))

    def compute_rolling_z_scores(self, threshold: float = 3.0) -> dict:
        """Compute z-scores for row counts against the trailing window."""
        dates = list(self.daily_counts.keys())
        counts = list(self.daily_counts.values())
        z_scores = {}

        for i in range(self.WINDOW_SIZE, len(dates)):
            window = counts[i - self.WINDOW_SIZE: i]
            mean = statistics.mean(window)
            stdev = statistics.stdev(window) if len(window) > 1 else 0

            if stdev > 0:
                z_score = (counts[i] - mean) / stdev
            elif counts[i] != mean:
                # A perfectly stable window (e.g. 390 bars every day) has zero
                # variance, so any deviation at all is anomalous
                z_score = float("inf") if counts[i] > mean else float("-inf")
            else:
                z_score = 0.0

            z_scores[dates[i]] = {
                "row_count": counts[i],
                "z_score": round(z_score, 2),
                "is_anomaly": abs(z_score) > threshold
            }

        return z_scores

    def flag_anomalies(self, threshold: float = 3.0) -> List[dict]:
        """Return all dates with absolute z-score exceeding threshold."""
        z_scores = self.compute_rolling_z_scores(threshold)
        return [
            {
                "date": date,
                "row_count": info["row_count"],
                "z_score": info["z_score"],
                "interpretation": "Unusually high" if info["z_score"] > 0 else "Unusually low"
            }
            for date, info in z_scores.items()
            if info["is_anomaly"]
        ]
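
Row counts for bar data are often nearly constant, which makes mean and standard deviation fragile: one bad day inflates the window's variance and hides the next one. A possible alternative, not part of the detector above, is the median/MAD "modified z-score". The sketch below assumes the same date-to-count mapping; the function name and the 3.5 cutoff are choices made for this example.

```python
import statistics
from typing import Dict, List

def mad_anomalies(daily_counts: Dict[str, int], window: int = 20,
                  threshold: float = 3.5) -> List[dict]:
    """Flag days whose row count deviates from the trailing-window median.

    Uses the modified z-score 0.6745 * (x - median) / MAD. When MAD is zero
    (a perfectly stable window), any deviation from the median is flagged.
    """
    dates = sorted(daily_counts)
    counts = [daily_counts[d] for d in dates]
    flagged = []
    for i in range(window, len(dates)):
        trailing = counts[i - window:i]
        median = statistics.median(trailing)
        mad = statistics.median(abs(c - median) for c in trailing)
        deviation = counts[i] - median
        if mad > 0:
            score = 0.6745 * deviation / mad
            is_anomaly = abs(score) > threshold
        else:
            score = None  # undefined when the window has no spread
            is_anomaly = deviation != 0
        if is_anomaly:
            flagged.append({"date": dates[i], "row_count": counts[i],
                            "median": median, "modified_z": score})
    return flagged
```

Because the median ignores a single outlier inside the window, the day after an anomalous session is still judged against the normal level rather than a distorted average.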

5. Layer 3: Timestamp Continuity Detection

5.1 Why Timestamp Integrity Matters

A dataset can have the correct number of rows and the correct dates, but still contain timestamp-level corruption. Common manifestations:

  • Overlapping intervals: Two bars covering the same timestamp range (e.g., two 5-minute bars both claiming to cover 10:00–10:05).
  • Out-of-order timestamps: Bars arriving in non-sequential order (e.g., 10:05 appearing before 10:00).
  • Intra-day discontinuities: Gaps within a trading session (e.g., missing the 10:00–10:05 bar but present for 10:05–10:10).
  • Boundary bleed: Bars extending past the official session close.
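
Boundary bleed is the simplest of these to test in isolation: convert each bar's start time to exchange-local time and compare it against the session window. A minimal sketch, assuming timestamps are Unix seconds marking the bar start and a regular 9:30 to 16:00 ET session; `find_boundary_bleed` is a name chosen for this example.

```python
from datetime import datetime, time, timedelta, timezone, tzinfo
from typing import List

SESSION_OPEN = time(9, 30)
SESSION_CLOSE = time(16, 0)

def find_boundary_bleed(
    timestamps: List[int],
    market_tz: tzinfo,
    session_open: time = SESSION_OPEN,
    session_close: time = SESSION_CLOSE,
) -> List[int]:
    """Return bar-start timestamps (Unix seconds) that fall outside [open, close)."""
    outside = []
    for ts in timestamps:
        local_time = datetime.fromtimestamp(ts, tz=market_tz).time()
        if not (session_open <= local_time < session_close):
            outside.append(ts)
    return outside

# A fixed UTC-4 offset (EDT) keeps the example dependency-free; in practice pass
# zoneinfo.ZoneInfo("America/New_York") so daylight saving time is handled.
EDT = timezone(timedelta(hours=-4))

def et(hour: int, minute: int) -> int:
    """Unix timestamp for a wall-clock time on 2020-03-16 in the example offset."""
    return int(datetime(2020, 3, 16, hour, minute, tzinfo=EDT).timestamp())

bars = [et(9, 25), et(9, 30), et(15, 59), et(16, 0)]
print(find_boundary_bleed(bars, EDT))  # the 09:25 and 16:00 bars
```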

5.2 Continuity Detection Code

from typing import List, Tuple

class TimestampContinuityChecker:
    """
    Validates timestamp sequences for financial bar data.

    Checks for: overlaps, gaps, out-of-order, boundary violations.
    """

    def __init__(self, interval_seconds: int):
        """
        Args:
            interval_seconds: Bar interval in seconds
                              (60 for 1min, 300 for 5min, etc.)
        """
        self.interval_seconds = interval_seconds

    def validate_sequence(self, timestamps: List[int]) -> dict:
        """
        Validate a sequence of Unix timestamps.

        Args:
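            timestamps: List of Unix timestamps (seconds) in the order they
                        were received; do not pre-sort, or out-of-order bars
                        cannot be detected

        Returns:
            Validation report with all detected anomalies
        """
        if len(timestamps) < 2:
            # Return the same keys as a full report so callers can aggregate safely
            return {
                "status": "INSUFFICIENT_DATA",
                "total_bars": len(timestamps),
                "overlaps": 0, "gaps": 0, "out_of_order": 0,
                "anomaly_count": 0, "anomaly_rate_pct": 0.0,
                "session_drift_pct": 0.0, "anomalies": [], "worst_gap": 0,
            }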

        anomalies = []
        overlaps = 0
        gaps = 0
        out_of_order = 0

        for i in range(1, len(timestamps)):
            diff = timestamps[i] - timestamps[i - 1]

            if diff < 0:
                out_of_order += 1
                anomalies.append({
                    "type": "OUT_OF_ORDER",
                    "index": i,
                    "at": timestamps[i],
                    "previous": timestamps[i - 1],
                    "message": f"Timestamp {timestamps[i]} precedes previous {timestamps[i - 1]}"
                })
            elif diff == 0:
                overlaps += 1
                anomalies.append({
                    "type": "OVERLAP",
                    "index": i,
                    "at": timestamps[i],
                    "message": f"Duplicate timestamp {timestamps[i]}"
                })
            elif diff < self.interval_seconds * 0.95:  # Allow 5% tolerance
                overlaps += 1
                anomalies.append({
                    "type": "SHORT_INTERVAL",
                    "index": i,
                    "at": timestamps[i],
                    "expected_gap": self.interval_seconds,
                    "actual_gap": diff,
                    "message": f"Interval {diff}s is shorter than expected {self.interval_seconds}s"
                })
            elif diff > self.interval_seconds * 1.05:  # Allow 5% tolerance
                gaps += 1
                missing_bars = round(diff / self.interval_seconds) - 1
                anomalies.append({
                    "type": "GAP",
                    "index": i,
                    "at": timestamps[i],
                    "expected_gap": self.interval_seconds,
                    "actual_gap": diff,
                    "missing_bars": missing_bars,
                    "message": f"Gap of {diff}s detected, approximately {missing_bars} bars missing"
                })

        # Session drift: how far the observed span deviates from what the
        # bar count alone would imply
        first_ts = timestamps[0]
        last_ts = timestamps[-1]
        session_duration = last_ts - first_ts
        expected_duration = ((len(timestamps) - 1) * self.interval_seconds)
        drift_pct = abs(session_duration - expected_duration) / expected_duration * 100 if expected_duration > 0 else 0

        return {
            "status": "VALID" if not anomalies else "ANOMALIES_FOUND",
            "total_bars": len(timestamps),
            "overlaps": overlaps,
            "gaps": gaps,
            "out_of_order": out_of_order,
            "anomaly_count": len(anomalies),
            "anomaly_rate_pct": round(len(anomalies) / len(timestamps) * 100, 2),
            "session_drift_pct": round(drift_pct, 2),
            "anomalies": anomalies,
            "worst_gap": max(
                [(a["actual_gap"] - a["expected_gap"]) for a in anomalies if a["type"] == "GAP"],
                default=0
            )
        }

    def validate_multiple_sessions(self, sessions: dict) -> dict:
        """
        Validate timestamp continuity across multiple trading sessions.

        Args:
            sessions: Dict mapping date (YYYY-MM-DD) to list of timestamps
        """
        session_reports = {}
        global_anomalies = []

        for date, timestamps in sorted(sessions.items()):
            # Validate in arrival order: sorting first would hide OUT_OF_ORDER anomalies
            report = self.validate_sequence(timestamps)
            session_reports[date] = report

            if report.get("anomaly_count", 0) > 0:
                global_anomalies.append({
                    "date": date,
                    "anomalies": report["anomalies"]
                })

        total_anomalies = sum(r.get("anomaly_count", 0) for r in session_reports.values())
        total_bars = sum(r.get("total_bars", 0) for r in session_reports.values())

        return {
            "total_sessions": len(sessions),
            "sessions_with_anomalies": sum(1 for r in session_reports.values() if r.get("anomaly_count", 0) > 0),
            "total_anomalies": total_anomalies,
            # Guard against division by zero when no session contains any bars
            "overall_anomaly_rate": round(total_anomalies / total_bars * 100, 4) if total_bars else 0,
            "session_details": session_reports,
            "anomalous_sessions": global_anomalies
        }

5.3 Example Output: Timestamp Continuity Report

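| Session date | Total bars | Gaps | Overlaps | Out-of-order | Anomaly rate |
|---|---|---|---|---|---|
| 2020-03-16 | 387 | 3 | 0 | 0 | 0.78% |
| 2020-03-17 | 390 | 0 | 0 | 0 | 0.00% |
| 2020-03-18 | 391 | 0 | 1 | 0 | 0.26% |
| 2020-03-19 | 388 | 2 | 0 | 0 | 0.52% |
| 2020-03-20 | 0 | N/A | N/A | N/A | N/A (EMPTY SESSION) |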

The row count analyzer flagged 2020-03-20 as empty. The timestamp continuity checker confirms the session has no data — which is itself an anomaly worth flagging, as March 20, 2020 was a Friday with a full trading session.


6. Integrating with TickDB: Automated Daily Verification

6.1 Full Pipeline Architecture

import os
import requests
import time
from datetime import datetime, timedelta

# ⚠️ Production advisory: For high-frequency verification workloads,
# replace requests with aiohttp + asyncio for concurrent API polling.
# The synchronous implementation below is suitable for daily batch checks.

TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
TICKDB_BASE_URL = "https://api.tickdb.ai/v1"

def fetch_kline_history(symbol: str, interval: str, start_ts: int, end_ts: int,
                        max_retries: int = 5) -> list:
    """
    Fetch historical kline data from TickDB with full error handling.

    Note: this issues a single request capped at `limit` bars. Longer ranges
    need pagination, whose mechanics depend on the API and are not shown here.
    """
    url = f"{TICKDB_BASE_URL}/market/kline"
    headers = {"X-API-Key": TICKDB_API_KEY}

    params = {
        "symbol": symbol,
        "interval": interval,
        "start_time": start_ts,
        "end_time": end_ts,
        "limit": 500
    }

    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(
                url,
                headers=headers,
                params=params,
                timeout=(3.05, 10)  # (connect_timeout, read_timeout)
            )
            data = response.json()
        except requests.exceptions.Timeout as exc:
            raise RuntimeError(f"Request timeout fetching {symbol} kline data") from exc
        except requests.exceptions.ConnectionError as exc:
            raise RuntimeError(f"Connection error fetching {symbol}: check network and API endpoint") from exc

        # Handle rate limiting with a bounded number of retries
        if data.get("code") == 3001:
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"Rate limited (attempt {attempt}/{max_retries}). Waiting {retry_after} seconds.")
            time.sleep(retry_after)
            continue

        if data.get("code") in (1001, 1002):
            raise ValueError("Invalid API key: check TICKDB_API_KEY environment variable")

        return data.get("data", [])

    # Fail loudly: returning an empty list here would look like a silent gap
    raise RuntimeError(f"Rate limit retries exhausted fetching {symbol} kline data")


from zoneinfo import ZoneInfo  # Python 3.9+

# Bucket bars by exchange-local date, not by the machine's local timezone
MARKET_TZ = ZoneInfo("America/New_York")


def run_data_integrity_check(symbol: str, interval: str, start_date: str, end_date: str) -> dict:
    """
    Run the full three-layer data integrity check on a TickDB dataset.

    Args:
        symbol: TickDB symbol (e.g., "AAPL.US")
        interval: Kline interval (e.g., "1m", "5m", "1d")
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format

    Returns:
        Comprehensive integrity report
    """
    start_dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=MARKET_TZ)
    # Extend to the end of end_date so the final session is included
    end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=MARKET_TZ) + timedelta(days=1)
    start_ts = int(start_dt.timestamp())
    end_ts = int(end_dt.timestamp()) - 1

    # Interval configuration
    interval_seconds_map = {"1m": 60, "5m": 300, "15m": 900, "1h": 3600, "1d": 86400}
    interval_seconds = interval_seconds_map.get(interval, 60)

    # Fetch data
    print(f"Fetching {symbol} {interval} data from {start_date} to {end_date}...")
    raw_data = fetch_kline_history(symbol, interval, start_ts, end_ts)

    if not raw_data:
        return {"status": "NO_DATA", "symbol": symbol, "interval": interval}

    # Group bar timestamps by exchange-local session date, preserving arrival
    # order (one pass feeds all three layers)
    session_data = {}
    for item in raw_data:
        ts = int(item["t"])
        date = datetime.fromtimestamp(ts, tz=MARKET_TZ).strftime("%Y-%m-%d")
        session_data.setdefault(date, []).append(ts)

    # Layer 1: Calendar check
    calendar_result = detect_missing_trading_days(
        sorted(session_data),
        datetime.strptime(start_date, "%Y-%m-%d"),
        datetime.strptime(end_date, "%Y-%m-%d")
    )

    # Layer 2: Row count check
    interval_map = {"1m": "1min", "5m": "5min", "15m": "15min", "1h": "1hour", "1d": "1day"}
    row_analyzer = RowCountAnalyzer(interval_map.get(interval, "1min"))
    daily_counts = {date: len(ts_list) for date, ts_list in session_data.items()}
    row_count_result = row_analyzer.batch_analyze(daily_counts)

    # Layer 3: Timestamp continuity check
    continuity_checker = TimestampContinuityChecker(interval_seconds)
    continuity_result = continuity_checker.validate_multiple_sessions(session_data)

    # Aggregate verdict
    issues = []
    if calendar_result["missing_count"] > 0:
        issues.append(f"Calendar: {calendar_result['missing_count']} missing trading days")
    if row_count_result["summary"]["empty_days"] > 0:
        issues.append(f"Row count: {row_count_result['summary']['empty_days']} empty sessions")
    if row_count_result["summary"]["incomplete_days"] > 0:
        issues.append(f"Row count: {row_count_result['summary']['incomplete_days']} incomplete sessions")
    if continuity_result["sessions_with_anomalies"] > 0:
        issues.append(f"Continuity: anomalies in {continuity_result['sessions_with_anomalies']} sessions")

    return {
        "status": "ISSUES_FOUND" if issues else "CLEAN",
        "issues": issues,
        "symbol": symbol,
        "interval": interval,
        "date_range": f"{start_date} to {end_date}",
        "total_records": len(raw_data),
        "layer1_calendar": calendar_result,
        "layer2_row_count": row_count_result["summary"],
        "layer3_continuity": {
            "total_sessions": continuity_result["total_sessions"],
            "sessions_with_anomalies": continuity_result["sessions_with_anomalies"],
            "overall_anomaly_rate_pct": continuity_result["overall_anomaly_rate"]
        },
        "worst_incomplete_days": row_count_result["summary"].get("worst_days", [])
    }


if __name__ == "__main__":
    # Run verification on AAPL 5-minute bars for 2020 (COVID crash period)
    report = run_data_integrity_check(
        symbol="AAPL.US",
        interval="5m",
        start_date="2020-01-01",
        end_date="2020-12-31"
    )

    print("\n" + "=" * 60)
    print("DATA INTEGRITY REPORT")
    print("=" * 60)
    print(f"Symbol: {report['symbol']}")
    print(f"Interval: {report['interval']}")
    print(f"Status: {report['status']}")
    print(f"Total records: {report['total_records']}")

    if report['issues']:
        print("\nISSUES DETECTED:")
        for issue in report['issues']:
            print(f"  - {issue}")
    else:
        print("\n✅ No data integrity issues detected.")

6.2 Scheduling Integrity Checks

For production deployments, schedule the integrity check to run automatically:

# Cron job example: Run daily at 06:00 ET before market open
# 0 6 * * * /usr/bin/python3 /opt/verify_data_integrity.py >> /var/log/data-integrity.log 2>&1

# CI/CD integration: Fail pipeline if status != "CLEAN"
if report["status"] != "CLEAN":
    raise SystemExit(f"Data integrity check failed: {report['issues']}")

7. Handling Edge Cases

7.1 Symbol Changes and Delistings

When a company undergoes a ticker change (e.g., Facebook becoming Meta), the historical data for the old symbol may stop abruptly. Our calendar check will flag this as missing dates — but we need to distinguish a true gap from a symbol termination.

def check_symbol_delistings(known_delistings: dict) -> dict:
    """
    Cross-reference missing dates against known corporate action events.

    Args:
        known_delistings: Dict mapping symbol to {"end_date": "YYYY-MM-DD", "reason": str}
    """
    warnings = {}
    for symbol, info in known_delistings.items():
        end_date = info["end_date"]
        warnings[symbol] = {
            "end_date": end_date,
            "reason": info["reason"],
            "note": f"Symbol {symbol} ended trading on {end_date}. "
                    f"Missing dates after this are expected, not data gaps."
        }
    return warnings
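
To act on that distinction, the missing dates from the calendar check can be split at the symbol's last trading day. A minimal sketch; `split_missing_dates` is a name chosen for this example, and the dates in the usage line are hypothetical.

```python
from typing import Dict, List, Optional

def split_missing_dates(missing_dates: List[str],
                        end_date: Optional[str]) -> Dict[str, List[str]]:
    """Separate true gaps from dates after a symbol's known last trading day.

    Dates are YYYY-MM-DD strings, which order correctly under plain string
    comparison. Pass end_date=None for symbols that are still trading.
    """
    if end_date is None:
        return {"true_gaps": sorted(missing_dates), "expected_after_delisting": []}
    return {
        "true_gaps": sorted(d for d in missing_dates if d <= end_date),
        "expected_after_delisting": sorted(d for d in missing_dates if d > end_date),
    }

# Hypothetical symbol whose last trading day was 2022-06-08
result = split_missing_dates(["2022-06-01", "2022-06-09", "2022-06-10"], "2022-06-08")
print(result["true_gaps"])  # only dates on or before the last trading day
```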

7.2 Partial Session Tolerance

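Not all trading sessions are full 390-minute sessions. The NYSE closes early at 1:00 PM ET on the day after Thanksgiving and on July 3 when it falls on a Monday through Thursday; Christmas Eve is usually an early close as well when it falls on a weekday. The system must accept these as valid, shorter sessions.

SHORT_SESSION_CONFIG = {
    "day_after_thanksgiving": {"close_hour": 13, "expected_bars_1m": 210},
    "july_3_early_close": {"close_hour": 13, "expected_bars_1m": 210},
}

def is_expected_short_session(date: datetime, close_hour: int) -> bool:
    """Detect if a session is expected to close early at close_hour (ET)."""
    session = None
    # Day after Thanksgiving: the Friday following the fourth Thursday of
    # November, which always lands on November 23-29
    if date.month == 11 and date.weekday() == 4 and 23 <= date.day <= 29:
        session = "day_after_thanksgiving"
    # July 3 when it falls on a Monday through Thursday
    elif (date.month, date.day) == (7, 3) and date.weekday() <= 3:
        session = "july_3_early_close"
    # Other early closes (such as Christmas Eve) are better sourced from a calendar library
    return session is not None and SHORT_SESSION_CONFIG[session]["close_hour"] == close_hour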
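
Once short sessions are identified, row-count completeness should be measured against each session's own expected length rather than a flat 390. A sketch under the assumption that the set of early-close dates is supplied by the caller (for example from a calendar library); the names below are chosen for this example.

```python
from typing import Dict, Set

FULL_SESSION_BARS_1M = 390
EARLY_CLOSE_BARS_1M = 210  # 9:30 to 13:00 ET

def completeness_by_day(daily_counts: Dict[str, int],
                        early_close_dates: Set[str]) -> Dict[str, float]:
    """Row-count completeness per day, measured against that day's expected bars."""
    result = {}
    for date, count in daily_counts.items():
        expected = EARLY_CLOSE_BARS_1M if date in early_close_dates else FULL_SESSION_BARS_1M
        result[date] = round(count / expected, 3)
    return result

# 2020-11-27 was the day after Thanksgiving (1:00 PM ET close)
counts = {"2020-11-25": 390, "2020-11-27": 210}
print(completeness_by_day(counts, {"2020-11-27"}))
```

Without the per-day baseline, the 210-bar session would score about 54% against a 390-bar expectation and fall below a 70% validity threshold.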

8. Building the Verification Workflow into Your Research Pipeline

8.1 Integration Points

| Research stage | Verification action |
|---|---|
| Initial data pull | Run full 3-layer check before processing |
| Incremental update | Run Layer 1 (calendar) + Layer 2 (row count) only |
| Pre-backtest | Run full check; halt if HIGH severity gaps found |
| Post-backtest | Run retrospective check on the exact date range used |
| Production monitoring | Schedule daily check; alert on status change |
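
The pre-backtest gate can be expressed as a small guard that raises when any classified gap is HIGH severity. This is a sketch only: it assumes gap dictionaries with `start`, `end`, and `severity` keys, and `DataIntegrityError` and `assert_backtest_ready` are names invented for this example.

```python
from typing import List

class DataIntegrityError(Exception):
    """Raised when gaps make a dataset unfit for backtesting."""

def assert_backtest_ready(classified_gaps: List[dict]) -> None:
    """Halt if any gap range carries HIGH severity; otherwise return quietly."""
    high = [g for g in classified_gaps if g.get("severity") == "HIGH"]
    if high:
        ranges = ", ".join(f"{g['start']} to {g['end']}" for g in high)
        raise DataIntegrityError(f"HIGH severity gaps block the backtest: {ranges}")
```

Raising an exception, rather than returning a flag, means a backtest script cannot continue by accident when the check result is ignored.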

8.2 Alerting Configuration

import json

def generate_integrity_alert(report: dict, channel: str = "slack") -> dict:
    """Generate a formatted alert for data integrity issues."""

    severity_emoji = {
        "CLEAN": "✅",
        "ISSUES_FOUND": "⚠️",
        "NO_DATA": "🚨"
    }

    # NO_DATA reports carry only status, symbol, and interval, so read the
    # remaining sections defensively
    calendar = report.get("layer1_calendar", {})
    row_count = report.get("layer2_row_count", {})
    continuity = report.get("layer3_continuity", {})

    message = {
        "channel": channel,
        "text": f"{severity_emoji.get(report['status'], '❓')} "
                f"Data Integrity Report: {report['symbol']} {report['interval']}",
        "attachments": [{
            "color": "#36a64f" if report["status"] == "CLEAN" else "#ff0000",
            "fields": [
                {"title": "Status", "value": report["status"], "short": True},
                {"title": "Records", "value": str(report.get("total_records", 0)), "short": True},
                {"title": "Missing Trading Days", "value": str(calendar.get("missing_count", "n/a")), "short": True},
                {"title": "Empty Sessions", "value": str(row_count.get("empty_days", "n/a")), "short": True},
                {"title": "Continuity Anomalies", "value": str(continuity.get("sessions_with_anomalies", "n/a")), "short": True},
            ]
        }]
    }

    if report.get("issues"):
        message["attachments"][0]["text"] = "Issues:\n" + "\n".join(f"• {i}" for i in report["issues"])

    return message

9. Limitations and Recommended Out-of-Scope Checks

This system detects structural data gaps but does not address the following:

  • Data correctness (not just completeness): A dataset may have all expected rows but contain incorrect prices due to a provider error. Detecting this requires cross-referencing with an independent source (e.g., the exchange's official closing prices or a second vendor).
  • Survivorship bias: If the dataset includes only currently-traded symbols, delisted securities are absent. This is a structural data gap that the calendar check cannot detect, because those dates are not "missing" — they never existed in the dataset by design.
  • Split and dividend adjustments: If prices are not adjusted for splits and dividends, the row count will be correct but the price series will be discontinuous around split dates and ex-dividend dates. This requires a corporate actions cross-check.

For these cases, we recommend periodic cross-validation against a second data source (e.g., running the integrity check in parallel on a backup vendor feed and comparing results).
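
A cross-validation pass can be as simple as comparing closing prices by date and reporting both coverage differences and price disagreements. The sketch below assumes each source has already been reduced to a date-to-close mapping; the function name and the 0.1% relative tolerance are illustrative choices, not recommendations.

```python
from typing import Dict, List

def compare_sources(primary: Dict[str, float], secondary: Dict[str, float],
                    rel_tolerance: float = 0.001) -> Dict[str, List]:
    """Compare closing prices keyed by date (YYYY-MM-DD) from two vendors."""
    mismatches = []
    for date in sorted(set(primary) & set(secondary)):
        a, b = primary[date], secondary[date]
        # Relative difference against the larger magnitude of the two prices
        if abs(a - b) > rel_tolerance * max(abs(a), abs(b)):
            mismatches.append({"date": date, "primary": a, "secondary": b})
    return {
        "only_in_primary": sorted(set(primary) - set(secondary)),
        "only_in_secondary": sorted(set(secondary) - set(primary)),
        "price_mismatches": mismatches,
    }

# Hypothetical closes from two feeds
feed_a = {"2020-03-16": 100.00, "2020-03-17": 104.00}
feed_b = {"2020-03-16": 100.05, "2020-03-17": 110.00, "2020-03-18": 101.00}
print(compare_sources(feed_a, feed_b))
```

Dates present in only one feed point to completeness problems, while mismatches on shared dates point to correctness or adjustment differences, so the two lists are worth triaging separately.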


10. Verdict: Integrity Before Insight

The most sophisticated alpha signal is worthless if the underlying data is contaminated. Silent data gaps are the most dangerous form of contamination precisely because they are invisible to the systems that consume the data.

A backtest run on gap-contaminated data produces results that are not merely optimistic — they are non-replicable. The strategy worked on a fictional version of the market, one where certain days never happened or certain sessions were shorter than reality.

The three-layer verification system — calendar comparison, row count analysis, and timestamp continuity detection — catches the vast majority of silent gaps before they reach the research environment. Integrating this check into your data pipeline as a standard, automated step costs approximately 30 seconds of compute time per symbol per year of history. The cost of not running it is three weeks of research on a false signal.

Run the check. Flag the gaps. Fill or exclude them. Then, and only then, trust the backtest.



This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.