The Audit That Changed Everything
The quant team had been running the same mean-reversion strategy for eighteen months. Sharpe ratio: 1.42. Max drawdown: −6.3%. Everything looked clean on paper.
Then a junior engineer ran a data integrity audit on the historical OHLCV feed. What he found stopped the entire backtesting pipeline: 23 trading days were missing from the dataset between March and June 2023. Not gaps of a few hours. Full days. Consecutive. The kind of silent, systematic absence that distorts every metric downstream.
The strategy had never actually been tested on a complete market cycle. It had been optimized against an incomplete one.
This is the article the junior engineer wished he had read before that audit. We will build a complete, production-grade data integrity validation framework from scratch — one that catches the three most insidious forms of silent data loss: calendar gaps (missing trading days), count anomalies (wrong number of records per period), and timestamp discontinuities (jumps, overlaps, or malformed timestamps).
Every code example in this article is production-ready. We will use Python, ground our examples in real market conventions, and show you exactly how to integrate these checks into a TickDB-based data pipeline.
Why Silent Data Loss Is Catastrophic for Quant Strategies
Before we write a single line of validation code, we need to be precise about what silent data loss actually costs.
The Three Failure Modes
1. Biased Performance Metrics
When a backtest engine processes data with missing days, it typically interpolates or skips. Skipping is the more common behavior in high-frequency frameworks. This means your strategy never "experiences" certain market regimes — macro dumps, earnings surprises, Fed announcements. The Sharpe ratio you compute is optimistic because the test never included the worst days.
2. Feature Corruption
Most quantitative features are derived from sequences: moving averages, rolling volatility, momentum indicators. A missing bar breaks the continuity assumption. A 20-period moving average computed over 19 valid bars and 1 missing bar is not a 20-period moving average. It is a corrupted signal that will propagate errors through every downstream model.
3. Look-Ahead Bias Masquerading as Data Quality
Here is the subtle one: if your data source returns fewer bars than expected, and you do not notice, you may inadvertently create a situation where your strategy "knows" something it should not. For example, if the market opened at $150 on Monday and your dataset shows no data for Monday but resumes at $150 on Tuesday, the strategy might appear to have entered at Monday's close — which is actually Tuesday's open. This is look-ahead bias wearing a data quality costume.
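The feature-corruption failure mode is easy to reproduce on toy data. The sketch below uses made-up closing prices and a 3-bar window (shorter than the 20-period example, purely for readability) to show how a rolling mean computed on a series with two missing sessions differs from the same feature computed on the complete series. The helper names `rolling_mean` and `ma_by_date` are illustrative, not part of any library.

```python
from datetime import date, timedelta
from statistics import mean

def rolling_mean(closes, window):
    """Mean of the last `window` closes; None until enough bars exist."""
    return [
        mean(closes[i - window + 1 : i + 1]) if i + 1 >= window else None
        for i in range(len(closes))
    ]

def ma_by_date(series, window=3):
    """Compute the rolling mean over whatever bars are present, keyed by date."""
    dates = sorted(series)
    values = rolling_mean([series[d] for d in dates], window)
    return dict(zip(dates, values))

# Ten consecutive weekdays starting Monday 2023-03-06 (hypothetical prices)
all_days = [date(2023, 3, 6) + timedelta(days=i) for i in range(12)]
days = [d for d in all_days if d.weekday() < 5]
closes = [100, 101, 102, 90, 88, 103, 104, 105, 106, 107]

complete = dict(zip(days, closes))
# Simulate a silent gap: the two sell-off sessions never reach the dataset
dropped = {date(2023, 3, 9), date(2023, 3, 10)}
gapped = {d: c for d, c in complete.items() if d not in dropped}

probe = date(2023, 3, 13)
complete_ma = ma_by_date(complete)
gapped_ma = ma_by_date(gapped)
print(f"3-bar MA on {probe}: complete={complete_ma[probe]:.2f}, gapped={gapped_ma[probe]:.2f}")
```

On the gapped series the window silently reaches back to earlier sessions, so the feature is computed without error but describes a market that never dipped.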
The Scope of the Problem
| Data type | Common gap sources | Detection difficulty |
|---|---|---|
| Daily OHLCV | Exchange maintenance windows, early closes | Moderate — calendar comparison |
| Minute-level bars | Network packet loss, feed handler restarts | High — requires timestamp continuity |
| Tick (trade) data | Venue data drop, normalized feed gaps | Very high — volume-based heuristics |
| Order book snapshots | Snapshot frequency limits, API throttling | High — requires depth channel fidelity |
The validation framework we build in this article handles all three detection layers. Let us begin.
Module 1: Trading Calendar Comparison
The Core Principle
Every asset class has a defined trading calendar. US equities trade Monday through Friday, excluding public holidays. HK equities follow HKEX's published schedule. Crypto trades 24/7 but may have exchange maintenance windows. The first line of defense is a simple question: does the dataset contain exactly the number of bars that the trading calendar requires?
Implementation
We need three components:
- A trading calendar generator (or a reliable external reference)
- A function to enumerate the expected bars for a given symbol and date range
- A comparison engine that reports the delta
from dataclasses import dataclass
from datetime import date, timedelta
from typing import List, Optional, Tuple


@dataclass
class MarketCalendar:
    """Market calendar definition for a given asset class."""
    market: str    # e.g., "US", "HK", "CRYPTO"
    timezone: str  # e.g., "America/New_York"
    holidays: Optional[List[date]] = None  # Pre-defined non-trading days


def generate_trading_days(
    start_date: date,
    end_date: date,
    calendar: MarketCalendar,
    trading_hours: Optional[Tuple[str, str]] = None  # e.g., ("09:30", "16:00")
) -> List[date]:
    """
    Generate the expected list of trading days for a given market calendar.
    Args:
        start_date: Inclusive start date
        end_date: Inclusive end date
        calendar: MarketCalendar object defining the market rules
        trading_hours: Optional trading hours (not currently used for daily bars,
            but useful for intraday validation)
    Returns:
        List of dates that should have market data
    """
    holidays = set(calendar.holidays or [])  # Set lookup instead of scanning a list
    # Assumption: only the "CRYPTO" market trades on weekends (24/7)
    skip_weekends = calendar.market != "CRYPTO"
    expected_days = []
    current = start_date
    while current <= end_date:
        is_weekend = current.weekday() >= 5  # Monday = 0, Friday = 4
        # Skip weekends (where applicable) and market holidays
        if not (skip_weekends and is_weekend) and current not in holidays:
            expected_days.append(current)
        current += timedelta(days=1)
    return expected_days


def find_missing_days(
    expected_days: List[date],
    actual_trading_days: List[date]
) -> List[date]:
    """
    Compare expected trading days against actual data dates.
    Args:
        expected_days: Dates from the trading calendar
        actual_trading_days: Dates extracted from the actual dataset
    Returns:
        Sorted list of dates that are missing from the dataset
    """
    expected_set = set(expected_days)
    actual_set = set(actual_trading_days)
    missing = sorted(expected_set - actual_set)
    return missing
Usage Example
# Define US equity calendar with 2023 holidays
us_holidays_2023 = [
    date(2023, 1, 2),    # New Year's Day (observed)
    date(2023, 1, 16),   # MLK Day
    date(2023, 2, 20),   # Presidents Day
    date(2023, 4, 7),    # Good Friday
    date(2023, 5, 29),   # Memorial Day
    date(2023, 6, 19),   # Juneteenth
    date(2023, 7, 4),    # Independence Day
    date(2023, 9, 4),    # Labor Day
    date(2023, 11, 23),  # Thanksgiving
    date(2023, 12, 25),  # Christmas
]

us_calendar = MarketCalendar(
    market="US",
    timezone="America/New_York",
    holidays=us_holidays_2023
)

# Expected trading days for Q1 2023
expected = generate_trading_days(
    start_date=date(2023, 1, 3),  # First trading day
    end_date=date(2023, 3, 31),
    calendar=us_calendar
)

# Simulate actual dates from a fetched dataset
# In production, this comes from your data pipeline
actual_from_api = [d for d in expected if d != date(2023, 3, 10)]  # Simulating a gap

missing_days = find_missing_days(expected, actual_from_api)
print(f"Expected trading days: {len(expected)}")
print(f"Actual trading days in dataset: {len(actual_from_api)}")
print(f"Missing trading days: {missing_days}")
Output:
Expected trading days: 62
Actual trading days in dataset: 61
Missing trading days: [datetime.date(2023, 3, 10)]
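Missing days often arrive in runs rather than in isolation, as in the audit described at the start of this article. A flat list of dates hides that pattern, so it can help to collapse the result into runs that are consecutive in trading-day terms (a Friday and the following Monday count as adjacent). The helper below, `group_missing_runs`, is a hypothetical addition sketched for illustration; it only assumes the two lists of dates used above.

```python
from datetime import date, timedelta
from typing import List, Tuple

def group_missing_runs(
    expected_days: List[date],
    missing_days: List[date],
) -> List[Tuple[date, date, int]]:
    """Collapse missing days into (first_day, last_day, length) runs.

    Two missing days belong to the same run when they are neighbors
    in the expected trading calendar, not in the civil calendar.
    """
    position = {d: i for i, d in enumerate(sorted(set(expected_days)))}
    runs: List[Tuple[date, date, int]] = []
    for d in sorted(set(missing_days)):
        if d not in position:
            continue  # Not an expected trading day; ignore it
        if runs and position[d] == position[runs[-1][1]] + 1:
            first, _, length = runs[-1]
            runs[-1] = (first, d, length + 1)
        else:
            runs.append((d, d, 1))
    return runs

# Weekdays from Monday 2023-03-06 through Friday 2023-03-17
span = [date(2023, 3, 6) + timedelta(days=i) for i in range(12)]
expected_days = [d for d in span if d.weekday() < 5]
missing_days = [date(2023, 3, 10), date(2023, 3, 13), date(2023, 3, 16)]

for first, last, length in group_missing_runs(expected_days, missing_days):
    print(f"{first} to {last}: {length} trading day(s) missing")
```

A long run points to an outage or a botched backfill, while scattered single days more often indicate a calendar mismatch such as an unlisted holiday.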
Limitations
Calendar comparison alone is insufficient. A dataset can have the correct number of trading days but still contain corrupted intraday bars. We need two additional validation layers.
Module 2: Row Count Validation per Trading Day
The Core Principle
For a given time frame (1-minute, 5-minute, 1-hour, daily), each trading day should produce a predictable number of bars. US equity regular trading hours run 9:30 AM to 4:00 PM ET, which is 390 minutes. In a 1-minute bar dataset, every regular trading day should produce 390 bars — plus optional pre-market and after-hours bars.
Row count validation catches:
- Days where the data handler restarted mid-session
- Days where only partial data was collected
- API pagination bugs that silently dropped the last N records of a day
Implementation
from dataclasses import dataclass
from datetime import date, datetime
from typing import Dict, List
import statistics


@dataclass
class BarCountProfile:
    """Expected bar count profile for a given interval."""
    interval: str                    # e.g., "1m", "5m", "1h", "1d"
    expected_per_day: int
    tolerance_percent: float = 0.05  # 5% tolerance for edge cases (a fraction, not 5.0)


def validate_bar_counts(
    fetched_bars: List[Dict],
    profile: BarCountProfile,
    date_field: str = "timestamp"
) -> Dict[date, Dict]:
    """
    Validate that each trading day has the expected number of bars.
    Args:
        fetched_bars: List of OHLCV bars (each as a dict with a timestamp field)
        profile: BarCountProfile defining expected counts
        date_field: Name of the timestamp field in each bar
    Returns:
        Dictionary mapping each date to a validation result
    """
    # Group bars by date
    bars_by_date: Dict[date, List[Dict]] = {}
    for bar in fetched_bars:
        bar_timestamp = bar[date_field]
        if hasattr(bar_timestamp, "date"):
            bar_date = bar_timestamp.date()
        else:
            # ISO 8601 string: the first 10 characters are YYYY-MM-DD
            bar_date = datetime.strptime(bar_timestamp[:10], "%Y-%m-%d").date()
        bars_by_date.setdefault(bar_date, []).append(bar)

    # Analyze each date
    results = {}
    expected_count = profile.expected_per_day
    tolerance = int(expected_count * profile.tolerance_percent)
    lower_bound = expected_count - tolerance
    upper_bound = expected_count + tolerance
    for d, bars in bars_by_date.items():
        actual_count = len(bars)
        status = "OK" if lower_bound <= actual_count <= upper_bound else "ANOMALY"
        results[d] = {
            "expected": expected_count,
            "actual": actual_count,
            "delta": actual_count - expected_count,
            "status": status,
            "bars": bars
        }
    return results


def summarize_anomalies(validation_results: Dict) -> Dict:
    """Generate a summary report from validation results."""
    total_days = len(validation_results)
    anomaly_days = [d for d, r in validation_results.items() if r["status"] == "ANOMALY"]
    if not anomaly_days:
        return {
            "total_days_checked": total_days,
            "anomaly_count": 0,
            "anomaly_dates": [],
            "verdict": "PASS"
        }
    deltas = [validation_results[d]["delta"] for d in anomaly_days]
    return {
        "total_days_checked": total_days,
        "anomaly_count": len(anomaly_days),
        "anomaly_dates": anomaly_days,
        "delta_stats": {
            "mean": statistics.mean(deltas),
            "median": statistics.median(deltas),
            "min": min(deltas),
            "max": max(deltas)
        },
        "verdict": "FAIL"
    }
Expected Bar Counts by Interval
| Interval | Bars per regular trading day (US equity) | Notes |
|---|---|---|
| 1 minute | 390 | 9:30–16:00 ET |
| 5 minute | 78 | 390 / 5 |
| 15 minute | 26 | 390 / 15 |
| 1 hour | 7 | RTH only; 390 / 60 = 6.5, so six full bars plus one 30-minute partial bar (which end is partial depends on vendor alignment) |
| 1 day | 1 | Standard daily OHLCV |
For 24/7 markets (crypto), the expected count depends on the exchange's maintenance windows. Always define the profile per market, not globally.
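Rather than hard-coding the counts in the table, the expected number of bars can be derived from the session boundaries, which also covers early-close days. The sketch below assumes that a trailing partial interval still produces one bar (vendors differ on this) and that the session does not cross midnight; `expected_bars` is an illustrative helper name, not part of the framework above.

```python
import math
from datetime import time

def expected_bars(session_open: time, session_close: time, interval_minutes: int) -> int:
    """Number of bars a session should produce at a given interval.

    Assumes a partial trailing interval still yields one bar, so the
    result is rounded up to a whole number.
    """
    if interval_minutes <= 0:
        raise ValueError("interval_minutes must be positive")
    open_minutes = session_open.hour * 60 + session_open.minute
    close_minutes = session_close.hour * 60 + session_close.minute
    session_minutes = close_minutes - open_minutes
    if session_minutes <= 0:
        raise ValueError("session_close must be after session_open")
    return math.ceil(session_minutes / interval_minutes)

regular_open, regular_close = time(9, 30), time(16, 0)
for minutes in (1, 5, 15, 60):
    count = expected_bars(regular_open, regular_close, minutes)
    print(f"{minutes:>2}-minute bars, regular session: {count}")

# Early close at 13:00 ET (for example, the day after Thanksgiving)
print("1-minute bars, early close:", expected_bars(regular_open, time(13, 0), 1))
```

Feeding the result into a per-day profile avoids flagging every early-close session as a partial-data anomaly.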
Module 3: Timestamp Continuity Detection
The Core Principle
The most insidious form of data corruption is timestamp overlap or jump — bars that exist but have timestamps that violate the expected sequence. This can happen when:
- Two data sources are merged incorrectly, creating duplicate timestamps
- A time zone conversion bug shifts timestamps by an offset (often 1 or 8 hours)
- A daylight saving time transition causes a gap or overlap
- A feed handler restarts and resumes from a cached position, producing duplicate bars
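Timestamp continuity detection catches all of these by verifying that every consecutive pair of bars satisfies the expected interval. Note that session boundaries (overnight closes, weekends, holidays) also exceed the bar interval, so the check is most informative when it is run within a single session or when its gap list is filtered against the trading calendar.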
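A constant time zone shift leaves the spacing between bars intact, so an interval check alone will not see it. One inexpensive complement is to flag bars whose time of day falls outside the session. The sketch below assumes naive datetimes expressed in exchange local time and bars stamped at their open; `find_out_of_session` is a hypothetical helper written for this illustration.

```python
from datetime import datetime, time, timedelta
from typing import Iterable, List

def find_out_of_session(
    timestamps: Iterable[datetime],
    session_open: time = time(9, 30),
    session_close: time = time(16, 0),
) -> List[datetime]:
    """Return timestamps whose time of day is outside [session_open, session_close)."""
    return [
        ts for ts in timestamps
        if not (session_open <= ts.time() < session_close)
    ]

# One regular session of 1-minute bars: 09:30 through 15:59
good = [datetime(2023, 3, 15, 9, 30) + timedelta(minutes=i) for i in range(390)]
# The same bars after a one-hour and an eight-hour conversion bug
shifted_1h = [ts + timedelta(hours=1) for ts in good]
shifted_8h = [ts + timedelta(hours=8) for ts in good]

print("clean feed:", len(find_out_of_session(good)))
print("1-hour shift:", len(find_out_of_session(shifted_1h)))
print("8-hour shift:", len(find_out_of_session(shifted_8h)))
```

A one-hour shift pushes only the last hour of bars past the close, which is why small offsets are the ones most likely to go unnoticed.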
Implementation
from dataclasses import dataclass
from datetime import date, datetime
from typing import Dict, List, Optional


@dataclass
class TimestampValidationResult:
    """Result of timestamp continuity analysis."""
    total_gaps: int
    total_overlaps: int
    total_duplicates: int
    gap_dates: List[date]
    overlap_dates: List[date]
    duplicate_timestamps: List[datetime]
    malformed_timestamps: List[str]
    is_continuous: bool


def detect_timestamp_anomalies(
    bars: List[Dict],
    timestamp_field: str = "timestamp",
    expected_interval_seconds: Optional[int] = None
) -> TimestampValidationResult:
    """
    Detect gaps, overlaps, duplicates, and malformed timestamps in a bar dataset.
    Args:
        bars: List of bars in arrival order (normally already sorted by timestamp).
            Timestamps must be uniformly naive or uniformly timezone-aware.
        timestamp_field: Field name containing the timestamp
        expected_interval_seconds: Expected interval in seconds (e.g., 60 for 1m)
            If None, inferred as the smallest positive spacing between bars
    Returns:
        TimestampValidationResult with all detected anomalies
    """
    # Parse timestamps in arrival order; set malformed values aside
    parsed: List[datetime] = []
    malformed: List[str] = []
    for bar in bars:
        ts = bar.get(timestamp_field)
        if isinstance(ts, datetime):
            parsed.append(ts)
        elif isinstance(ts, str):
            try:
                parsed.append(datetime.fromisoformat(ts.replace("Z", "+00:00")))
            except ValueError:
                malformed.append(ts)
        else:
            malformed.append(str(ts))

    # Overlaps: a bar that arrives with an earlier timestamp than its predecessor.
    # This must be checked before sorting, because sorting hides regressions.
    overlaps = [
        (cur, prev, (prev - cur).total_seconds())
        for prev, cur in zip(parsed, parsed[1:])
        if cur < prev
    ]

    # Duplicates and gaps are evaluated on the sorted sequence
    ordered = sorted(parsed)
    pairs = list(zip(ordered, ordered[1:]))
    duplicates = [cur for prev, cur in pairs if cur == prev]

    # Infer interval if not provided
    if expected_interval_seconds is None:
        positive = [(cur - prev).total_seconds() for prev, cur in pairs if cur > prev]
        expected_interval_seconds = int(min(positive)) if positive else 0
    gap_seconds = expected_interval_seconds * 1.5  # Allow 50% tolerance
    gaps = [
        (prev, cur, (cur - prev).total_seconds())
        for prev, cur in pairs
        if gap_seconds > 0 and (cur - prev).total_seconds() > gap_seconds
    ]

    return TimestampValidationResult(
        total_gaps=len(gaps),
        total_overlaps=len(overlaps),
        total_duplicates=len(duplicates),
        gap_dates=[g[1].date() for g in gaps],
        overlap_dates=[o[0].date() for o in overlaps],
        duplicate_timestamps=duplicates,
        malformed_timestamps=malformed,
        is_continuous=(len(gaps) + len(overlaps) + len(duplicates) + len(malformed) == 0)
    )


def generate_timestamp_report(result: TimestampValidationResult) -> str:
    """Generate a human-readable report from validation results."""
    lines = [
        "=== Timestamp Continuity Report ===",
        f"Continuous: {result.is_continuous}",
        f"Total gaps: {result.total_gaps}",
        f"Total overlaps: {result.total_overlaps}",
        f"Total duplicates: {result.total_duplicates}",
        f"Malformed timestamps: {len(result.malformed_timestamps)}",
    ]
    if result.gap_dates:
        lines.append(f"\nGap dates: {result.gap_dates}")
    if result.overlap_dates:
        lines.append(f"Overlap dates: {result.overlap_dates}")
    if result.duplicate_timestamps:
        lines.append(f"Duplicate timestamps: {result.duplicate_timestamps[:10]}")  # First 10
    return "\n".join(lines)
Example Output
=== Timestamp Continuity Report ===
Continuous: False
Total gaps: 1
Total overlaps: 0
Total duplicates: 3
Malformed timestamps: 0

Gap dates: [datetime.date(2023, 3, 10)]
Duplicate timestamps: [datetime.datetime(2023, 3, 15, 9, 30), datetime.datetime(2023, 3, 15, 9, 31), datetime.datetime(2023, 3, 15, 9, 32)]
The duplicate timestamps on March 15 are a classic sign of a data handler restart that replayed the first three minutes of the session from a cached state.
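When a dataset spans several sessions, comparing every consecutive pair of bars also flags the jump from one day's close to the next day's open. One way to separate real intraday gaps from session boundaries is to group bars by date before comparing neighbors. The sketch below is a standalone illustration with a hypothetical `intraday_gaps` helper; it assumes timestamps are in exchange local time so that a session never straddles midnight.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Iterable, List, Tuple

def intraday_gaps(
    timestamps: Iterable[datetime],
    interval: timedelta = timedelta(minutes=1),
) -> List[Tuple[datetime, datetime]]:
    """Find gaps inside each session, ignoring the jump between sessions."""
    by_day = defaultdict(list)
    for ts in timestamps:
        by_day[ts.date()].append(ts)

    gaps = []
    for day in sorted(by_day):
        session = sorted(set(by_day[day]))  # Drop duplicates before comparing
        for prev, cur in zip(session, session[1:]):
            if cur - prev > interval:
                gaps.append((prev, cur))
    return gaps

# Two short sessions; the second is missing its 09:32 bar
day1 = [datetime(2023, 3, 14, 9, 30) + timedelta(minutes=i) for i in range(5)]
day2 = [datetime(2023, 3, 15, 9, 30) + timedelta(minutes=i) for i in range(5) if i != 2]

for prev, cur in intraday_gaps(day1 + day2):
    print(f"Gap between {prev} and {cur}")
```

This variant cannot see bars missing at the very start or end of a session, because there is no neighbor to compare against; the per-day bar count check covers that case.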
Module 4: End-to-End Data Integrity Validator
Now we combine all three layers into a single validator class that can be dropped into any data pipeline.
import os
import time
from dataclasses import dataclass, field
from datetime import date, datetime
from typing import Dict, List, Optional

import requests


@dataclass
class IntegrityReport:
    """Comprehensive data integrity report."""
    symbol: str
    start_date: date
    end_date: date
    calendar_gaps: List[date] = field(default_factory=list)
    bar_count_anomalies: Dict[date, Dict] = field(default_factory=dict)
    timestamp_anomalies: Optional[TimestampValidationResult] = None
    is_valid: bool = True
    warnings: List[str] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
class DataIntegrityValidator:
    """
    End-to-end data integrity validator for market data feeds.
    Integrates with TickDB API to fetch and validate OHLCV data.
    """
    BASE_URL = "https://api.tickdb.ai/v1"

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("API key required. Set TICKDB_API_KEY environment variable.")

    def _fetch_kline(self, symbol: str, start_date: date, end_date: date, interval: str = "1d") -> List[Dict]:
        """Fetch OHLCV data from TickDB with production-grade error handling."""
        url = f"{self.BASE_URL}/market/kline"
        headers = {"X-API-Key": self.api_key}
        # Naive datetimes: .timestamp() interprets them in the machine's local time zone
        start_ms = int(datetime.combine(start_date, datetime.min.time()).timestamp() * 1000)
        end_ms = int(datetime.combine(end_date, datetime.max.time()).timestamp() * 1000)
        params = {
            "symbol": symbol,
            "interval": interval,
            "start": start_ms,
            "end": end_ms,
            "limit": 50000  # ⚠️ Increase if range is large; paginate for production use
        }
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                response = requests.get(url, headers=headers, params=params, timeout=(3.05, 30))
                data = response.json()
                code = data.get("code", 0)
                if code == 0:
                    return data.get("data", [])
                elif code == 3001:
                    # Treated as retryable: wait for Retry-After, then try again
                    retry_after = int(response.headers.get("Retry-After", 5))
                    time.sleep(retry_after)
                    retry_count += 1
                    continue
                elif code in (1001, 1002):
                    raise ValueError("Invalid API key: check TICKDB_API_KEY")
                elif code == 2002:
                    raise KeyError(f"Symbol {symbol} not found")
                else:
                    raise RuntimeError(f"API error {code}: {data.get('message')}")
            except requests.exceptions.Timeout:
                retry_count += 1
                if retry_count >= max_retries:
                    raise RuntimeError(f"Timeout after {max_retries} retries for {symbol}")
                time.sleep(2 ** retry_count)  # Exponential backoff
                continue
        # Reached only when the code-3001 retries are exhausted.
        # Fail loudly instead of implicitly returning None.
        raise RuntimeError(f"Retries exhausted after {max_retries} attempts for {symbol}")
    def validate(
        self,
        symbol: str,
        start_date: date,
        end_date: date,
        calendar: MarketCalendar,
        interval: str = "1d",
        expected_bars_per_day: Optional[int] = None
    ) -> IntegrityReport:
        """
        Run complete data integrity validation.
        Args:
            symbol: TickDB symbol (e.g., "AAPL.US")
            start_date: Start of validation range
            end_date: End of validation range
            calendar: MarketCalendar for the asset class
            interval: Bar interval ("1d", "1m", "5m", "15m", "1h")
            expected_bars_per_day: Override for expected bars per day
                (default: US equity regular-hours count for the interval)
        Returns:
            IntegrityReport with all validation findings
        """
        report = IntegrityReport(
            symbol=symbol,
            start_date=start_date,
            end_date=end_date
        )

        # Step 1: Fetch data
        try:
            bars = self._fetch_kline(symbol, start_date, end_date, interval)
        except Exception as e:
            report.errors.append(f"Fetch failed: {str(e)}")
            report.is_valid = False
            return report
        if not bars:
            report.errors.append("No data returned from API")
            report.is_valid = False
            return report

        # Step 2: Calendar comparison
        expected_days = generate_trading_days(start_date, end_date, calendar)
        actual_dates = [datetime.fromisoformat(b["timestamp"]).date() for b in bars]
        report.calendar_gaps = find_missing_days(expected_days, actual_dates)
        if report.calendar_gaps:
            report.warnings.append(
                f"Calendar gaps detected: {len(report.calendar_gaps)} missing trading days"
            )

        # Step 3: Bar count validation
        if expected_bars_per_day is None:
            # Defaults assume US equity regular trading hours (390 minutes).
            # "1h" is six full bars plus one partial; unknown intervals fall back to 1.
            default_bars = {"1d": 1, "1h": 7, "15m": 26, "5m": 78, "1m": 390}
            expected_bars_per_day = default_bars.get(interval, 1)
        profile = BarCountProfile(
            interval=interval,
            expected_per_day=expected_bars_per_day
        )
        # validate_bar_counts names this parameter date_field, not timestamp_field
        report.bar_count_anomalies = validate_bar_counts(
            bars, profile, date_field="timestamp"
        )
        anomaly_dates = [d for d, r in report.bar_count_anomalies.items() if r["status"] == "ANOMALY"]
        if anomaly_dates:
            report.warnings.append(
                f"Bar count anomalies on {len(anomaly_dates)} trading days"
            )

        # Step 4: Timestamp continuity
        report.timestamp_anomalies = detect_timestamp_anomalies(
            bars, timestamp_field="timestamp"
        )
        if not report.timestamp_anomalies.is_continuous:
            report.warnings.append(
                f"Timestamp anomalies: {report.timestamp_anomalies.total_gaps} gaps, "
                f"{report.timestamp_anomalies.total_duplicates} duplicates"
            )

        # Step 5: Overall verdict
        report.is_valid = (
            len(report.calendar_gaps) == 0 and
            len(anomaly_dates) == 0 and
            report.timestamp_anomalies.is_continuous
        )
        return report
    def generate_report_text(self, report: IntegrityReport) -> str:
        """Generate a formatted text report from an IntegrityReport."""
        # Count only the days flagged as anomalies, not every day that was checked
        anomaly_dates = [d for d, r in report.bar_count_anomalies.items() if r["status"] == "ANOMALY"]
        # timestamp_anomalies is None when the fetch failed before any checks ran
        timestamp_status = (
            report.timestamp_anomalies.is_continuous
            if report.timestamp_anomalies is not None
            else "not checked"
        )
        lines = [
            f"=== Data Integrity Report: {report.symbol} ===",
            f"Period: {report.start_date} to {report.end_date}",
            f"Status: {'✅ VALID' if report.is_valid else '❌ INVALID'}",
            "",
            f"Calendar gaps: {len(report.calendar_gaps)}",
            f"Bar count anomalies: {len(anomaly_dates)}",
            f"Timestamp continuous: {timestamp_status}",
        ]
        if report.calendar_gaps:
            lines.append(f"\n Missing trading days: {report.calendar_gaps}")
        if anomaly_dates:
            lines.append(f"\n Anomalous bar counts on: {anomaly_dates}")
        if report.warnings:
            lines.append("\n--- Warnings ---")
            for w in report.warnings:
                lines.append(f" ⚠️ {w}")
        if report.errors:
            lines.append("\n--- Errors ---")
            for e in report.errors:
                lines.append(f" ❌ {e}")
        return "\n".join(lines)
Usage Example
# Initialize validator
validator = DataIntegrityValidator()

# Define US equity calendar
us_calendar = MarketCalendar(
    market="US",
    timezone="America/New_York",
    holidays=[
        date(2024, 1, 1), date(2024, 1, 15), date(2024, 2, 19),
        date(2024, 3, 29), date(2024, 5, 27), date(2024, 6, 19),
        date(2024, 7, 4), date(2024, 9, 2), date(2024, 11, 28),
        date(2024, 12, 25)
    ]
)

# Run validation
report = validator.validate(
    symbol="AAPL.US",
    start_date=date(2024, 1, 1),
    end_date=date(2024, 3, 31),
    calendar=us_calendar,
    interval="1d"
)
print(validator.generate_report_text(report))
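The fetch method above notes that large ranges should be paginated in production. This article does not document TickDB's pagination parameters, so the sketch below stays API-agnostic: it splits the date range into contiguous windows and delegates each window to a caller-supplied `fetch` function. Both `chunk_date_range` and `fetch_in_chunks` are hypothetical helpers, and the stand-in `fake_fetch` exists only to make the example runnable.

```python
from datetime import date, timedelta
from typing import Callable, Dict, Iterator, List, Tuple

def chunk_date_range(start: date, end: date, max_days: int) -> Iterator[Tuple[date, date]]:
    """Yield contiguous, non-overlapping (start, end) windows of at most max_days days."""
    if max_days < 1:
        raise ValueError("max_days must be at least 1")
    cursor = start
    while cursor <= end:
        window_end = min(cursor + timedelta(days=max_days - 1), end)
        yield cursor, window_end
        cursor = window_end + timedelta(days=1)

def fetch_in_chunks(
    fetch: Callable[[date, date], List[Dict]],
    start: date,
    end: date,
    max_days: int,
) -> List[Dict]:
    """Call fetch once per window and concatenate the results in order."""
    bars: List[Dict] = []
    for window_start, window_end in chunk_date_range(start, end, max_days):
        bars.extend(fetch(window_start, window_end))
    return bars

# Stand-in for a real fetch function: returns one record per window
def fake_fetch(window_start: date, window_end: date) -> List[Dict]:
    return [{"start": window_start, "end": window_end}]

windows = fetch_in_chunks(fake_fetch, date(2024, 1, 1), date(2024, 3, 31), max_days=31)
for w in windows:
    print(w["start"], "to", w["end"])
```

Chunking by date also makes a silently truncated response easier to spot, since each window can be checked against the trading calendar on its own.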
Module 5: Integrating Validation into the Data Pipeline
A validation framework that runs once and produces a report is useful. A validation framework that runs automatically, alerts on failures, and blocks downstream consumption of bad data is essential.
Pipeline Architecture
        [TickDB API]
             ↓
       [Fetch Module]        ← (with retry, backoff, timeout)
             ↓
    [Integrity Validator]    ← (runs before data enters storage)
             ↓
         ┌───┴───┐
         ↓       ↓
      [PASS]   [FAIL]
         ↓       ↓
     [Store]  [Alert + Block]
                 ↓
        [Slack / PagerDuty / Email]
Blocking Integration
def fetch_and_validate(
    symbol: str,
    start_date: date,
    end_date: date,
    calendar: MarketCalendar,
    interval: str = "1d"
) -> List[Dict]:
    """
    Fetch data from TickDB and validate before storage.
    Raises ValueError if validation fails.
    """
    validator = DataIntegrityValidator()
    report = validator.validate(
        symbol=symbol,
        start_date=start_date,
        end_date=end_date,
        calendar=calendar,
        interval=interval
    )
    if not report.is_valid:
        # Generate detailed alert
        alert_message = validator.generate_report_text(report)
        # In production, send to your alerting system:
        # send_slack_alert(f"Data integrity failure for {symbol}:\n{alert_message}")
        # send_pagerduty_alert(..., severity="warning")
        raise ValueError(
            f"Data integrity validation failed for {symbol}. "
            f"Blocking storage. Report:\n{alert_message}"
        )
    # Validation passed: return the bars that were actually validated.
    # A second fetch could return different data than the set that was checked.
    return [
        bar
        for d in sorted(report.bar_count_anomalies)
        for bar in report.bar_count_anomalies[d]["bars"]
    ]
Alerting Integration (Production Template)
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def setup_validation_alerts(
    on_failure: Callable[[IntegrityReport], None]
) -> Callable[..., IntegrityReport]:
    """
    Build a validation function that invokes a callback when validation fails.
    Args:
        on_failure: Function that receives the IntegrityReport on failure
    Returns:
        A function taking (symbol, start_date, end_date, calendar, interval)
        that runs validation, alerts on failure, and returns the IntegrityReport
    """
    # In production, this would integrate with your monitoring system
    # Example: Prometheus gauge, Datadog event, PagerDuty incident
    def wrapped_validator(symbol, start_date, end_date, calendar, interval):
        validator = DataIntegrityValidator()
        report = validator.validate(symbol, start_date, end_date, calendar, interval)
        if not report.is_valid:
            logger.error(
                "Data integrity validation failed for %s",
                symbol,
                extra={
                    "symbol": symbol,
                    "calendar_gaps": report.calendar_gaps,
                    "anomaly_dates": [
                        d for d, r in report.bar_count_anomalies.items()
                        if r["status"] == "ANOMALY"
                    ],
                    "report": validator.generate_report_text(report)
                }
            )
            on_failure(report)
        return report
    return wrapped_validator
Module 6: Comparison — Validation Approaches
| Approach | Pros | Cons | Best for |
|---|---|---|---|
| Manual inspection | Simple, no setup | Not scalable, human error | One-time audits |
| Post-fetch checks (what we built) | Full automation, blocking | Requires engineering time upfront | Production pipelines |
| Database-level triggers | Catches issues at storage layer | Late detection, complex SQL | Enterprise data warehouses |
| Third-party monitoring (e.g., Great Expectations) | Battle-tested, declarative | Additional dependency, less market-data-aware | General data teams |
| TickDB built-in validation (where available) | Zero engineering overhead | Limited to what the API validates | Initial data qualification |
Our framework occupies the "post-fetch, market-data-aware" sweet spot: it runs in the application layer, understands trading calendars and bar structures, and blocks bad data before it reaches storage.
Module 7: Deployment Guide by User Segment
Individual Quant Developer
- Use case: Validating personal historical backtests before strategy deployment
- Recommended approach: Run the validator on each new dataset before backtesting
- Integration: Add fetch_and_validate() wrapper to your existing data fetching script
- Alerting: Log to file, review weekly
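# Quick validation one-liner
# The range must stay within the years covered by the calendar's holiday list;
# us_calendar as defined earlier lists 2024 holidays only.
report = DataIntegrityValidator().validate(
    symbol="AAPL.US",
    start_date=date(2024, 1, 1),
    end_date=date(2024, 12, 31),
    calendar=us_calendar,
    interval="1d"
)
print(f"Valid: {report.is_valid}")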
Quant Team / Small Fund
- Use case: Shared data pipeline across multiple strategies
- Recommended approach: Deploy validator as a shared library; integrate with CI/CD pipeline
- Integration: Git pre-commit hook or CI pipeline step that runs validation on new data
- Alerting: Slack channel for the data team
Institutional Data Infrastructure
- Use case: Multi-market, multi-asset data ingestion at scale
- Recommended approach: Deploy as a microservice with Prometheus metrics
- Integration: Kafka or RabbitMQ queue for async validation; block on failure
- Alerting: PagerDuty integration with severity tiers based on gap severity
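Severity tiers can be as simple as a function of how many trading days are missing relative to how many were expected. The thresholds below are placeholders chosen for illustration, not recommendations, and `classify_gap_severity` is a hypothetical helper; each team would tune the cutoffs to its own tolerance.

```python
def classify_gap_severity(missing_days: int, expected_days: int) -> str:
    """Map a gap count to a severity tier.

    Illustrative thresholds (assumptions, tune for your own pipeline):
      none     - no missing days
      warning  - a single missing day
      error    - a few missing days
      critical - 5 or more missing days, or at least 5% of the range
    """
    if expected_days <= 0:
        raise ValueError("expected_days must be positive")
    if missing_days <= 0:
        return "none"
    ratio = missing_days / expected_days
    if missing_days >= 5 or ratio >= 0.05:
        return "critical"
    if missing_days >= 2:
        return "error"
    return "warning"

# Hypothetical scenarios: (missing days, expected days in range)
for missing, expected in [(0, 62), (1, 62), (3, 62), (23, 80)]:
    print(f"{missing}/{expected} missing -> {classify_gap_severity(missing, expected)}")
```

The tier can then select the alert route, for example a log line for a warning and a page for a critical gap.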
Closing
The silent data gap does not announce itself. It sits in your dataset, distorting your Sharpe ratios, corrupting your features, and creating look-ahead bias that looks like skill. The junior engineer's audit was a lucky catch — a human noticing what an automated system should have flagged months earlier.
The framework we built in this article turns that audit into an automated gate: calendar comparison catches missing trading days, bar count validation catches partial sessions, and timestamp continuity detection catches the overlaps and duplicates that break sequence-dependent features.
Data quality is not a one-time checkbox. It is a pipeline concern that must be baked into every data fetch, every storage write, and every backtest run.
If you are an individual quant developer, run the validation once on your current dataset. The 15 minutes it takes might reveal that your strategy's performance metrics need recalculation.
If you want to build this into a shared data infrastructure, the DataIntegrityValidator class is ready for integration into your existing pipeline. Pair it with alerting (Slack, PagerDuty) to ensure the right team sees failures before bad data propagates.
If you need long-horizon historical OHLCV data for cross-cycle strategy validation, TickDB provides 10+ years of cleaned, aligned US equity daily bars via its /v1/market/kline endpoint — validated data that reduces the surface area for exactly the gaps we discussed in this article.
Next Steps
If you want to run this validation yourself:
- Sign up at tickdb.ai (free, no credit card required)
- Generate an API key in the dashboard
- Set the TICKDB_API_KEY environment variable
- Copy the DataIntegrityValidator class from this article into your data pipeline
If you are building a shared data infrastructure for a team:
Reach out to enterprise@tickdb.ai for institutional plans that include dedicated support, SLA-backed data delivery, and direct integration with data governance tooling.
If you use AI coding assistants:
Search for and install the tickdb-market-data SKILL in your AI tool's marketplace to get TickDB API integration scaffolding auto-generated in your conversations.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.