The backtest looked perfect. Sharpe ratio of 1.82. Maximum drawdown of −6.3%. Win rate at 61%. You spent three weeks building the mean-reversion strategy, another week fine-tuning the entry thresholds, and two more nights running Monte Carlo simulations to validate robustness. Then you go live—and the strategy bleeds money for six weeks straight.
The culprit is almost always the same: data.
Not the strategy logic. Not the execution slippage model. The data. You trained your entire hypothesis on five years of daily bars when capturing the relevant regime shifts requires a decade of minute-level data. Or you have the data, but it contains gaps: weekend holes in a supposedly "24/7" crypto feed, duplicate timestamps, misaligned OHLCV fields during the chaos of the market open.
This article dissects the engineering challenge of building a production-grade historical data acquisition pipeline. It covers pagination design, rate limit resilience, checkpoint-based resume, and local caching strategies. All code examples use production patterns: exponential backoff with jitter, proper timeout handling, environment-variable authentication, and error codes that match real-world API behavior.
The Scale of the Problem
Ten years of minute-level US equity data is not a trivial dataset. Consider what "10 years of minute bars" actually means:
| Metric | Calculation | Result |
|---|---|---|
| Trading days per year | 252 | — |
| Minutes per trading day | 390 (9:30–16:00 ET) | — |
| Total minutes per year | 252 × 390 | 98,280 |
| Total minutes for 10 years | 98,280 × 10 | 982,800 |
| Bytes per OHLCV record (JSON) | ~150 bytes | — |
| Raw data size (single symbol) | 982,800 × 150 bytes | ~147 MB |
| Symbols in S&P 500 | 500 | — |
| Full S&P 500 dataset | 147 MB × 500 | ~74 GB |
| With compression (Parquet) | ~74 GB × 0.2 | ~15 GB |
The raw dataset for a single large-cap stock is manageable. The full S&P 500 universe across a decade is roughly 74 GB uncompressed, or about 15 GB as Parquet. That is before you account for corporate-action adjustments, gap-filling for halts, and the deduplication required when multiple data sources contribute to the same symbol.
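The table's arithmetic as a function you can re-run with your own universe size. The 150-byte JSON record and the 0.2 compression ratio are the table's rough assumptions, not measured values, so treat the result as an order-of-magnitude estimate.

```python
def estimate_dataset_size(symbols: int = 500, years: int = 10,
                          trading_days: int = 252, minutes_per_day: int = 390,
                          bytes_per_record: int = 150,
                          compression_ratio: float = 0.2) -> dict:
    """Back-of-envelope storage estimate for regular-session minute bars.

    Sizes use decimal units (1 MB = 10**6 bytes, 1 GB = 10**9 bytes).
    """
    bars_per_symbol = trading_days * minutes_per_day * years
    raw_bytes_per_symbol = bars_per_symbol * bytes_per_record
    raw_bytes_total = raw_bytes_per_symbol * symbols
    return {
        "bars_per_symbol": bars_per_symbol,
        "raw_mb_per_symbol": raw_bytes_per_symbol / 1e6,
        "raw_gb_total": raw_bytes_total / 1e9,
        "compressed_gb_total": raw_bytes_total * compression_ratio / 1e9,
    }


estimate = estimate_dataset_size()
# Prints: 147.4 MB per symbol, 73.7 GB raw, 14.7 GB compressed
print(f"{estimate['raw_mb_per_symbol']:.1f} MB per symbol, "
      f"{estimate['raw_gb_total']:.1f} GB raw, "
      f"{estimate['compressed_gb_total']:.1f} GB compressed")
```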
Memory constraints alone eliminate the possibility of fetching everything in a single API call. A well-designed pipeline must chunk the data, persist progress, and resume from failure points.
Architecture: The Three-Layer Fetching Pipeline
A robust data acquisition system operates across three distinct layers:
Layer 1: Request orchestration. This layer manages the overall job—symbol list, time range, date slicing. It decides what to fetch next based on checkpoint state.
Layer 2: Chunk fetching. This layer handles individual API calls. It implements pagination, rate limit backoff, timeout management, and error classification.
Layer 3: Persistence. This layer writes fetched data to local storage (SQLite, Parquet, or a time-series database), updates checkpoint state, and validates data integrity.
The critical insight is that Layer 2 (fetching) and Layer 3 (persistence) must be completely decoupled. If you couple them by embedding persistence logic inside the fetch loop, you create a failure mode where a corrupted write destroys the fetch state and forces a full restart.
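A minimal sketch of that decoupling, assuming each layer is just a callable; the `Pipeline` class and its field names are hypothetical, not part of any library. It shows the one rule that matters: the fetch layer never touches storage, and the checkpoint advances only after the persistence layer has returned successfully.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple

Chunk = Tuple[str, int, int]  # (symbol, start_ms, end_ms), end exclusive


@dataclass
class Pipeline:
    """Hypothetical wiring of the three layers as plain callables."""
    fetch: Callable[[str, int, int], List[dict]]   # Layer 2: network only
    persist: Callable[[str, List[dict]], None]     # Layer 3: storage only
    checkpoints: Dict[str, int] = field(default_factory=dict)  # symbol -> committed end_ms

    def run(self, chunks: List[Chunk]) -> None:
        """Layer 1: decide what to fetch from checkpoint state, then hand off."""
        for symbol, start_ms, end_ms in chunks:
            if self.checkpoints.get(symbol, 0) >= end_ms:
                continue  # committed on an earlier run
            records = self.fetch(symbol, start_ms, end_ms)
            self.persist(symbol, records)
            # Only reached if the write succeeded, so a failed write leaves the
            # checkpoint pointing at this chunk and the next run retries it.
            self.checkpoints[symbol] = end_ms
```

Because the checkpoint is only touched after `persist` returns, a storage failure costs one chunk of re-fetching rather than the whole job.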
Pagination: The Fundamental Challenge
Minute-level data APIs rarely return unlimited records in a single response. TickDB's /v1/market/kline endpoint, for example, supports configurable pagination via the limit and start_time / end_time parameters. A typical design pattern involves fetching in daily or weekly chunks to balance memory usage against API call volume.
The Cursor-Based Approach
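For time-series data, cursor-based pagination using timestamps is more reliable than offset-based pagination. Offsets describe positions, not times: if you derive them from an expected bar count, a trading halt that produces missing minutes shifts every subsequent offset, and any backfill between requests shifts them again. A timestamp cursor ("bars at or after t") is unaffected by either.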
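```python
import os

import requests

TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market/kline"


class TickDBError(RuntimeError):
    """API or HTTP failure carrying the fields a retry policy needs."""

    def __init__(self, message: str, status_code: int, api_code=None, headers=None):
        super().__init__(message)
        self.status_code = status_code    # HTTP status, e.g. 429
        self.api_code = api_code          # TickDB "code" field, if the body was JSON
        self.headers = headers if headers is not None else {}  # keeps Retry-After available


def fetch_kline_chunk(symbol: str, start_ts: int, end_ts: int, limit: int = 1000) -> list:
    """
    Fetch a single page of kline data for a symbol.
    Uses timestamp-based pagination to handle gaps correctly.

    Args:
        symbol: Exchange symbol, e.g., "AAPL.US"
        start_ts: Start timestamp in milliseconds (inclusive)
        end_ts: End timestamp in milliseconds (exclusive)
        limit: Number of records per page (max 1000 for kline endpoint)

    Returns:
        List of OHLCV records
    """
    headers = {"X-API-Key": TICKDB_API_KEY}
    params = {
        "symbol": symbol,
        "interval": "1m",
        "start_time": start_ts,
        "end_time": end_ts,
        "limit": limit,
    }

    # ⚠️ Always use timeouts. Unbounded requests in production will hang indefinitely.
    response = requests.get(
        BASE_URL,
        headers=headers,
        params=params,
        timeout=(3.05, 10),  # (connect_timeout, read_timeout)
    )

    try:
        data = response.json()
    except ValueError:
        data = {}  # non-JSON body, e.g. an HTML error page from a proxy

    if response.status_code == 200 and data.get("code") == 0:
        return data.get("data", [])

    # Keep the HTTP status, API code, and headers so callers can decide whether to retry
    api_code = data.get("code")
    message = data.get("message") or response.text[:200]
    raise TickDBError(
        f"HTTP {response.status_code}, API code {api_code}: {message}",
        status_code=response.status_code,
        api_code=api_code,
        headers=response.headers,
    )
```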
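`fetch_kline_chunk` returns one page; the cursor-based approach described above needs a loop around it. A sketch that takes the page fetcher as a parameter, so it can wrap `fetch_kline_chunk` or a test fake. It assumes the API returns bars in ascending order with `t` as the bar time in milliseconds, the same field name the Parquet writer later in this article reads; that response shape is an assumption, not confirmed API documentation.

```python
from typing import Callable, List


def fetch_chunk_paginated(fetch_page: Callable[[int, int, int], List[dict]],
                          start_ts: int, end_ts: int, limit: int = 1000) -> List[dict]:
    """Drain a [start_ts, end_ts) window page by page using a timestamp cursor.

    fetch_page(start_ts, end_ts, limit) must return at most `limit` records
    sorted by ascending "t" (milliseconds).
    """
    records: List[dict] = []
    cursor = start_ts
    while cursor < end_ts:
        page = fetch_page(cursor, end_ts, limit)
        if not page:
            break  # nothing left in the window
        records.extend(page)
        # Resume just past the last bar received; gaps need no special casing
        next_cursor = page[-1]["t"] + 1
        if next_cursor <= cursor:
            raise RuntimeError("cursor did not advance; results may be unsorted")
        cursor = next_cursor
        if len(page) < limit:
            break  # a short page means the window is exhausted
    return records
```

Usage against the real endpoint would look like `fetch_chunk_paginated(lambda s, e, n: fetch_kline_chunk("AAPL.US", s, e, n), start_ts, end_ts)`. A 100-minute halt inside the window simply means the next page starts after the halt; no offset bookkeeping is involved.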
Chunk Sizing Strategy
The optimal chunk size depends on three factors:
- API limit per request. If the API caps at 1,000 records per call, any chunk larger than 1,000 bars needs several paginated calls; a loop that issues one call per chunk silently truncates it.
- Rate limit tolerance. Smaller chunks mean more API calls. More API calls mean higher rate limit pressure and slower overall throughput.
- Timestamp granularity. For minute-level data, daily chunks (390 regular-session bars) fit in a single page, which means one call per trading day, or roughly 252 calls per symbol per year. Weekly chunks (1,950 bars) need two pages each, about 100 calls per symbol per year, but they increase the blast radius of a failed chunk.
A practical starting point: daily chunks for single-symbol backtests; weekly chunks for universe-wide batch fetches where rate limit efficiency matters more than per-symbol granularity.
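The call-count arithmetic behind these trade-offs, as a small function. It assumes 390 regular-session bars per trading day and the 1,000-record page limit from the docstring above; extended-hours data would change both numbers. It also counts trading days, whereas a loop that steps through calendar days also spends calls on weekends and holidays.

```python
import math


def calls_per_symbol_year(chunk_trading_days: int, bars_per_day: int = 390,
                          page_limit: int = 1000, trading_days: int = 252) -> int:
    """API calls for one symbol-year when every chunk is paged at page_limit records.

    Slight upper bound: the last, possibly partial chunk is counted as full.
    """
    chunks = math.ceil(trading_days / chunk_trading_days)
    pages_per_chunk = math.ceil(chunk_trading_days * bars_per_day / page_limit)
    return chunks * pages_per_chunk


for days in (1, 2, 5):
    print(days, calls_per_symbol_year(days))  # 252, 126, 102
```

Under these assumptions a two-trading-day chunk (780 bars) is the largest that still fits in one page, so it halves the daily call count without any pagination logic.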
Rate Limit Resilience: Beyond Simple Retry
Rate limit handling is not merely "retry on failure." A naive retry loop with fixed delays creates two failure modes:
- Thundering herd: When a rate limit resets, dozens of concurrent clients retry simultaneously, overwhelming the API and triggering another rate limit.
- Progress starvation: Fixed delays are too conservative during low-traffic periods and too aggressive during high-traffic windows.
The production-grade solution combines exponential backoff with jitter—and crucially, it respects the Retry-After header when the API explicitly communicates a reset time.
Exponential Backoff with Jitter
```python
import random
import time

import requests


def parse_retry_after(headers) -> float:
    """Seconds from a numeric Retry-After header; 0.0 if absent or in HTTP-date form."""
    try:
        return max(0.0, float(headers.get("Retry-After", 0)))
    except (TypeError, ValueError):
        return 0.0


def fetch_with_backoff(symbol: str, start_ts: int, end_ts: int, max_retries: int = 5) -> list:
    """
    Fetch kline data with exponential backoff and jitter.
    Respects Retry-After headers from the API.
    """
    base_delay = 1.0
    max_delay = 60.0
    last_error = None

    for attempt in range(max_retries):
        rate_limited = False
        retry_after = 0.0
        try:
            return fetch_kline_chunk(symbol, start_ts, end_ts)
        except RuntimeError as e:
            # Use structured fields when the exception carries them;
            # otherwise fall back to matching the error message text.
            api_code = getattr(e, "api_code", None)
            status = getattr(e, "status_code", None)
            if api_code in (1001, 1002) or (isinstance(api_code, int) and 2000 <= api_code < 3000):
                raise  # bad credentials or unknown symbol: retrying cannot help
            # Handle rate limiting (code 3001 from TickDB, or HTTP 429)
            rate_limited = api_code == 3001 or status == 429 or "3001" in str(e)
            if rate_limited:
                retry_after = parse_retry_after(getattr(e, "headers", None) or {})
            last_error = e
        except requests.RequestException as e:
            last_error = e  # timeouts and dropped connections are transient

        if attempt == max_retries - 1:
            break  # no point sleeping after the final attempt

        if retry_after > 0:
            # The API told us when to retry: respect it, plus a small jitter
            delay = retry_after + random.uniform(0, 0.5)
        else:
            delay = min(base_delay * (2 ** attempt), max_delay)
            if not rate_limited:
                delay *= 0.5  # other transient errors get a shorter backoff
            delay += random.uniform(0, delay * 0.1)  # 10% jitter

        reason = "Rate limited" if rate_limited else f"Transient error ({last_error})"
        print(f"{reason}. Attempt {attempt + 1}/{max_retries}. "
              f"Waiting {delay:.1f}s before retry...")
        time.sleep(delay)

    raise RuntimeError(f"Failed after {max_retries} attempts") from last_error
```
The jitter calculation deserves special attention. Uniform jitter in the range [0, delay × 0.1] de-synchronizes retries without introducing excessive randomness: each client waits between 100% and 110% of the intended delay, so behavior stays predictable while simultaneous retries are spread across time. It mitigates the thundering herd rather than eliminating it, because a 10% window is narrow when many clients share the same reset time.
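The delay schedule on its own, so the bounds described above can be checked directly. `backoff_delay` is a hypothetical helper that reproduces the formula in `fetch_with_backoff` (capped doubling plus up to 10% jitter); the seeded `random.Random` only makes the demo repeatable.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter_frac: float = 0.1,
                  rng: Optional[random.Random] = None) -> float:
    """Capped exponential delay plus uniform jitter in [0, jitter_frac * delay]."""
    rng = rng or random.Random()
    delay = min(base * (2 ** attempt), cap)
    return delay + rng.uniform(0, delay * jitter_frac)


rng = random.Random(7)
for attempt in range(8):
    floor = min(2 ** attempt, 60)
    print(f"attempt {attempt}: {backoff_delay(attempt, rng=rng):5.2f}s "
          f"(window {floor}-{floor * 1.1:.1f}s)")
```

At attempt 3, every client lands somewhere in an 8.0 to 8.8 second window, only 0.8 seconds wide. Raising `jitter_frac` widens that window at the cost of less predictable retry timing.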
Handling Specific Error Codes
TickDB's API returns structured error codes that enable precise handling:
| Code | Meaning | Handling strategy |
|---|---|---|
| 1001 / 1002 | Invalid or missing API key | Fail immediately; do not retry |
| 2002 | Symbol not found | Skip this symbol; log for manual review |
| 3001 | Rate limit exceeded | Retry with backoff; read Retry-After header |
| 5000–5999 | Server-side error | Retry with exponential backoff |
For error code handling that integrates directly into your data pipeline, use a structured error classifier:
```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ApiError:
    code: int
    message: str
    retryable: bool
    retry_after: Optional[int] = None


def classify_error(response_json: dict, headers: dict) -> ApiError:
    """
    Classify a TickDB API error into retryable and non-retryable categories.
    """
    code = response_json.get("code", 0)
    message = response_json.get("message", "")

    if code == 0:
        return ApiError(code=0, message="Success", retryable=False)

    if code in (1001, 1002):
        # Invalid or missing API key: fail immediately
        return ApiError(code=code, message=message, retryable=False)

    if code == 3001:
        # Retry-After may be missing or an HTTP date; fall back to 5 seconds
        try:
            retry_after = int(headers.get("Retry-After", 5))
        except (TypeError, ValueError):
            retry_after = 5
        return ApiError(code=code, message=message, retryable=True, retry_after=retry_after)

    if 2000 <= code < 3000:
        # Symbol or parameter errors: not retryable
        return ApiError(code=code, message=message, retryable=False)

    # Server errors (5000-5999) and unrecognized codes are treated as retryable
    return ApiError(code=code, message=message, retryable=True)
```
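`classify_error` answers "retry or not", while the table's handling-strategy column makes a three-way split: fail the job, skip the symbol, or retry. A sketch of that split as a lookup the orchestration layer could consult. `Action` and `action_for` are hypothetical names, and treating 2xxx codes other than 2002 as job-fatal parameter errors is an assumption, since the table only lists 2002.

```python
from enum import Enum


class Action(Enum):
    FAIL_JOB = "fail"      # credentials are wrong; every later call would fail too
    SKIP_SYMBOL = "skip"   # log the symbol for manual review, continue with the rest
    RETRY = "retry"        # back off, then retry the same chunk


def action_for(code: int) -> Action:
    """Map a non-zero TickDB error code to the error table's handling strategy."""
    if code in (1001, 1002):
        return Action.FAIL_JOB
    if code == 2002:
        return Action.SKIP_SYMBOL
    if 2000 <= code < 3000:
        # Assumption: other 2xxx codes indicate a malformed request, which would
        # affect every symbol, so stopping is safer than skipping
        return Action.FAIL_JOB
    # 3001 (rate limit), 5000-5999 (server) and unrecognized codes
    return Action.RETRY


for code in (1001, 2002, 3001, 5003):
    print(code, action_for(code).value)
```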
Checkpoint System: The Resume Capability
Without a checkpoint system, a failure during a 74 GB universe fetch forces a full restart. For a decade of data across 500 symbols, a restart costs hours of redundant API calls and bandwidth.
The checkpoint system persists three pieces of state:
- Symbol completion state. Which symbols have been fully fetched?
- Chunk cursor per symbol. The last successfully fetched chunk's end timestamp.
- Verification metadata. Record count, byte count, or checksum to detect corruption.
SQLite Checkpoint Schema
```python
import sqlite3
from datetime import datetime, timezone


def init_checkpoint_db(db_path: str = "fetch_checkpoints.db") -> sqlite3.Connection:
    """
    Initialize the checkpoint database for tracking fetch progress.
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS symbol_checkpoints (
            symbol TEXT PRIMARY KEY,
            last_fetched_end_ts INTEGER,  -- milliseconds, exclusive end of last saved chunk
            total_records INTEGER DEFAULT 0,
            last_updated TEXT,
            status TEXT DEFAULT 'in_progress'
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS fetch_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT,
            start_ts INTEGER,
            end_ts INTEGER,
            record_count INTEGER,
            fetched_at TEXT,
            status TEXT
        )
    """)
    conn.commit()
    return conn


def save_checkpoint(conn: sqlite3.Connection, symbol: str, end_ts: int,
                    record_count: int, status: str = "in_progress"):
    """
    Save a checkpoint after successfully fetching a chunk.
    """
    cursor = conn.cursor()
    cursor.execute("""
        INSERT INTO symbol_checkpoints (symbol, last_fetched_end_ts, total_records,
                                        last_updated, status)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(symbol) DO UPDATE SET
            last_fetched_end_ts = excluded.last_fetched_end_ts,
            total_records = total_records + excluded.total_records,
            last_updated = excluded.last_updated,
            status = excluded.status
    """, (symbol, end_ts, record_count, datetime.now(timezone.utc).isoformat(), status))
    conn.commit()


def load_checkpoint(conn: sqlite3.Connection, symbol: str) -> tuple[int, str]:
    """
    Load the last checkpoint for a symbol.
    Returns (last_fetched_end_ts, status).
    If no checkpoint exists, returns (0, 'not_started').
    """
    cursor = conn.cursor()
    cursor.execute("""
        SELECT last_fetched_end_ts, status
        FROM symbol_checkpoints
        WHERE symbol = ?
    """, (symbol,))
    result = cursor.fetchone()
    if result:
        return (result[0], result[1])
    return (0, "not_started")
```
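The upsert in `save_checkpoint` does two different things per column: the cursor is overwritten, while the record count accumulates. Inside `DO UPDATE`, a bare column name refers to the value already stored and `excluded.*` refers to the row being inserted. A self-contained demonstration against an in-memory database, using a trimmed-down version of the table above:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE symbol_checkpoints (
        symbol TEXT PRIMARY KEY,
        last_fetched_end_ts INTEGER,
        total_records INTEGER DEFAULT 0
    )
""")
UPSERT = """
    INSERT INTO symbol_checkpoints (symbol, last_fetched_end_ts, total_records)
    VALUES (?, ?, ?)
    ON CONFLICT(symbol) DO UPDATE SET
        last_fetched_end_ts = excluded.last_fetched_end_ts,   -- overwrite the cursor
        total_records = total_records + excluded.total_records  -- stored + incoming
"""
# Two daily chunks of 390 bars, then a completion marker with 0 records
for end_ts, n in [(86_400_000, 390), (172_800_000, 390), (259_200_000, 0)]:
    conn.execute(UPSERT, ("AAPL.US", end_ts, n))

row = conn.execute(
    "SELECT last_fetched_end_ts, total_records FROM symbol_checkpoints"
).fetchone()
print(row)  # (259200000, 780)
```

This is why the completion save, which passes 0 records, leaves the running total intact.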
Fetch Loop with Checkpoint Integration
```python
import sqlite3
from datetime import datetime, timedelta, timezone

# load_checkpoint, save_checkpoint, fetch_with_backoff and write_chunk_to_parquet
# are defined in the other sections of this article.


def fetch_symbol_full_history(symbol: str, start_date: datetime, end_date: datetime,
                              conn: sqlite3.Connection, chunk_days: int = 1) -> dict:
    """
    Fetch full historical minute-level data for a symbol with checkpoint resume.

    Args:
        symbol: Exchange symbol (e.g., "AAPL.US")
        start_date: Start of the historical range (naive values are treated as UTC)
        end_date: End of the historical range, exclusive (naive values are treated as UTC)
        conn: SQLite checkpoint database connection
        chunk_days: Number of days per chunk (1 = daily chunks). Each chunk is one
            API call, so a chunk must fit within the 1,000-record page limit.

    Returns:
        Dict with fetch statistics
    """
    # Work in UTC-aware datetimes: the resume point below is aware, and comparing
    # aware with naive datetimes raises TypeError
    if start_date.tzinfo is None:
        start_date = start_date.replace(tzinfo=timezone.utc)
    if end_date.tzinfo is None:
        end_date = end_date.replace(tzinfo=timezone.utc)

    # Load checkpoint to determine resume point
    last_ts, status = load_checkpoint(conn, symbol)
    if status == "completed":
        print(f"Skipping {symbol}: already fully fetched")
        return {"status": "skipped", "symbol": symbol}

    # The checkpoint holds the exclusive end of the last saved chunk,
    # which is exactly where the next chunk begins
    if last_ts > 0:
        current_start = datetime.fromtimestamp(last_ts / 1000, tz=timezone.utc)
    else:
        current_start = start_date

    total_records = 0
    chunk_count = 0

    # Iterate through chunks, starting from checkpoint
    while current_start < end_date:
        chunk_end = min(current_start + timedelta(days=chunk_days), end_date)
        start_ts = int(current_start.timestamp() * 1000)
        end_ts = int(chunk_end.timestamp() * 1000)

        try:
            records = fetch_with_backoff(symbol, start_ts, end_ts)
            # Write first, then checkpoint. A crash between the two re-fetches this
            # chunk on the next run, and the per-chunk file name makes that an overwrite.
            write_chunk_to_parquet(symbol, records, current_start, chunk_end)
            save_checkpoint(conn, symbol, end_ts, len(records))
        except Exception as e:
            print(f"Failed to fetch {symbol} chunk {current_start.date()} to {chunk_end.date()}: {e}")
            # Checkpoint is preserved; next run resumes from this point
            raise

        total_records += len(records)
        chunk_count += 1
        if chunk_count % 100 == 0:
            print(f"Progress: {symbol}: {chunk_count} chunks, "
                  f"{total_records} records, last chunk end: {chunk_end.date()}")

        # Advance to the next chunk; without this the loop never terminates
        current_start = chunk_end

    # Mark as completed; use end_date so this also works when no chunk remained
    save_checkpoint(conn, symbol, int(end_date.timestamp() * 1000), 0, status="completed")
    return {
        "status": "completed",
        "symbol": symbol,
        "total_chunks": chunk_count,
        "total_records": total_records,
    }
```
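The loop's resume arithmetic, pulled out as a pure generator so it can be tested without the network. `iter_chunks` is a hypothetical helper, not part of the pipeline above; it requires timezone-aware datetimes for the same reason the loop normalizes them. The windows are half-open, so consecutive chunks share a boundary timestamp without overlapping.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def iter_chunks(start: datetime, end: datetime, chunk_days: int = 1,
                resume_ms: int = 0) -> Iterator[Tuple[int, int]]:
    """Yield half-open [start_ms, end_ms) windows, skipping anything before resume_ms."""
    current = start
    if resume_ms > 0:
        # A checkpoint stores the exclusive end of the last saved chunk
        current = max(current, datetime.fromtimestamp(resume_ms / 1000, tz=timezone.utc))
    step = timedelta(days=chunk_days)
    while current < end:
        chunk_end = min(current + step, end)  # final chunk may be shorter
        yield int(current.timestamp() * 1000), int(chunk_end.timestamp() * 1000)
        current = chunk_end


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 4, 12, tzinfo=timezone.utc)
all_chunks = list(iter_chunks(start, end))
# Resuming after the second chunk yields exactly the remaining chunks
print(list(iter_chunks(start, end, resume_ms=all_chunks[1][1])) == all_chunks[2:])
```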
Local Caching: From Raw API to Analysis-Ready Format
Fetching raw JSON from an API is only the first step. For backtesting workloads, data must be stored in a format optimized for reading patterns.
Three storage formats dominate:
| Format | Read speed | Compression | Schema enforcement | Use case |
|---|---|---|---|---|
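| CSV | Adequate for full scans; no column pruning | Poor | None | Simple pipelines, small datasets |
| Parquet | Fast for columnar reads | Excellent (70–80%) | Schema embedded in each file; enforced when written with an explicit schema | Production backtesting |
| SQLite | Good for range queries (with an index) | None built in | Type affinity by default; STRICT tables (SQLite 3.37+) opt in | Multi-symbol queries, checkpointing |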
For a 10-year minute-level US stock dataset, Parquet is the recommended format. The columnar layout matches the backtesting access pattern (load all close values for a date range): column projection reads only the columns you request, and predicate pushdown uses partition keys and row-group statistics to skip files and row groups that fall outside the filter.
Parquet Writer with Schema Validation
```python
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
from datetime import datetime

# Define the schema once; enforce it on every write
KLINE_SCHEMA = pa.schema([
    ("symbol", pa.string()),
    ("timestamp", pa.int64()),  # milliseconds since epoch
    ("open", pa.float64()),
    ("high", pa.float64()),
    ("low", pa.float64()),
    ("close", pa.float64()),
    ("volume", pa.float64()),
    ("turnover", pa.float64()),
])


def write_chunk_to_parquet(symbol: str, records: list, start_date: datetime,
                           end_date: datetime, output_dir: str = "data/kline"):
    """
    Write a chunk of kline data to a Parquet file partitioned by symbol and year-month.
    Records are expected to use the short keys t/o/h/l/c/v (and optionally q for turnover).
    """
    if not records:
        return

    # Build PyArrow table from records; from_pydict converts to the schema
    # types and raises if a value cannot be converted
    table_data = {
        "symbol": [symbol] * len(records),
        "timestamp": [r["t"] for r in records],
        "open": [r["o"] for r in records],
        "high": [r["h"] for r in records],
        "low": [r["l"] for r in records],
        "close": [r["c"] for r in records],
        "volume": [r["v"] for r in records],
        "turnover": [r.get("q", 0.0) for r in records],
    }
    table = pa.Table.from_pydict(table_data, schema=KLINE_SCHEMA)

    # Hive-style key=value directories, so the dataset reader can treat both
    # symbol and year_month as partition columns
    year_month = start_date.strftime("%Y-%m")
    partition_dir = Path(output_dir) / f"symbol={symbol}" / f"year_month={year_month}"
    partition_dir.mkdir(parents=True, exist_ok=True)
    # One file per chunk, named by chunk start: re-fetching a chunk overwrites it
    output_path = partition_dir / f"{symbol}_{start_date.strftime('%Y%m%d')}.parquet"

    # Write with compression
    pq.write_table(
        table,
        output_path,
        compression="zstd",   # zstd typically compresses better than snappy at similar speed
        use_dictionary=True,  # Dictionary-encodes repeated values such as the symbol column
    )
    print(f"Wrote {len(records)} records to {output_path} ({output_path.stat().st_size / 1024:.1f} KB)")
```
Reading Data for Backtesting
```python
import pyarrow as pa
import pyarrow.dataset as ds
from datetime import datetime


def load_backtest_data(symbol: str, start_date: datetime,
                       end_date: datetime, data_dir: str = "data/kline") -> pa.Table:
    """
    Load minute-level data for a single symbol over the range [start_date, end_date).
    Uses the PyArrow dataset API so filters are pushed down to partitions and
    Parquet row-group statistics. Pass timezone-aware datetimes: naive values
    are interpreted in local time by datetime.timestamp().
    """
    dataset = ds.dataset(
        data_dir,
        format="parquet",
        partitioning="hive",  # Reads key=value directory names as columns
    )

    start_ts = int(start_date.timestamp() * 1000)
    end_ts = int(end_date.timestamp() * 1000)

    # symbol is a hive partition key, so this predicate prunes whole directories;
    # the timestamp range is then checked against row-group min/max statistics
    table = dataset.to_table(
        filter=(
            (ds.field("symbol") == symbol)
            & (ds.field("timestamp") >= start_ts)
            & (ds.field("timestamp") < end_ts)
        )
    )
    return table
```
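If the partition directories carry a `year_month=YYYY-MM` key, a query can prune on month as well as symbol. A small pure-Python helper that lists the months a half-open range touches; `months_in_range` is hypothetical, and it assumes the `%Y-%m` naming used by the writer above. Its output could be passed to a filter such as `ds.field("year_month").isin(months)`.

```python
from datetime import datetime, timedelta
from typing import List


def months_in_range(start: datetime, end: datetime) -> List[str]:
    """'YYYY-MM' partition values overlapping the half-open range [start, end)."""
    if end <= start:
        return []
    last = end - timedelta(microseconds=1)  # end is exclusive
    months = []
    y, m = start.year, start.month
    while (y, m) <= (last.year, last.month):
        months.append(f"{y:04d}-{m:02d}")
        y, m = (y + 1, 1) if m == 12 else (y, m + 1)
    return months


print(months_in_range(datetime(2023, 11, 15), datetime(2024, 2, 1)))
# ['2023-11', '2023-12', '2024-01']
```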
Parallel Fetching: Throughput Optimization
Sequential fetching for 500 symbols over 10 years can take weeks, depending on the sustained request rate. Production pipelines parallelize the work, but parallelism introduces shared state conflicts and rate limit amplification.
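The arithmetic behind that estimate, with the sustained request rate as an explicit assumption because it depends on your plan's limits. With daily chunks, 500 symbols × 10 years × 252 calls is 1,260,000 calls.

```python
def fetch_days(symbols: int, years: int, calls_per_symbol_year: int,
               requests_per_second: float) -> float:
    """Wall-clock days for a sequential universe fetch at a sustained request rate."""
    total_calls = symbols * years * calls_per_symbol_year
    return total_calls / requests_per_second / 86_400


for rps in (0.5, 1.0, 5.0):
    print(f"{rps} req/s -> {fetch_days(500, 10, 252, rps):.1f} days")
# 0.5 req/s -> 29.2 days
# 1.0 req/s -> 14.6 days
# 5.0 req/s -> 2.9 days
```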
Three approaches to parallel fetching:
Approach 1: Per-symbol workers. Each worker handles one symbol end-to-end. Simple, but underutilizes capacity for symbols with different data volumes.
Approach 2: Shared worker pool. A pool of N workers pulls from a shared queue of chunks. Better load balancing, but requires a distributed checkpoint store (Redis, PostgreSQL) for multi-process coordination.
Approach 3: Hierarchical scheduling. A scheduler dispatches symbol-level jobs. Each job internally uses sequential chunk fetching. Worker processes pull symbol-level jobs from a queue. Best for multi-machine deployments.
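For single-machine pipelines, a concurrent.futures.ThreadPoolExecutor is the pragmatic choice. The version below submits symbol-level jobs rather than individual chunks: the pool's internal queue still balances load across workers as in Approach 2, while the checkpoint store can remain a single local SQLite file: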
```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# init_checkpoint_db and fetch_symbol_full_history are defined earlier in this article.


def fetch_universe_parallel(symbols: list, start_date: datetime, end_date: datetime,
                            max_workers: int = 4, max_retries: int = 3,
                            db_path: str = "fetch_checkpoints.db") -> dict:
    """
    Fetch historical data for multiple symbols in parallel.

    Args:
        symbols: List of exchange symbols (e.g., ["AAPL.US", "MSFT.US"])
        start_date: Start of historical range
        end_date: End of historical range
        max_workers: Number of parallel workers (keep low to respect rate limits)
        max_retries: Max retries per symbol
        db_path: Checkpoint database path; each worker opens its own connection
    """
    init_checkpoint_db(db_path).close()  # create the tables once, up front
    results = {"completed": [], "failed": [], "skipped": []}

    # ⚠️ max_workers=4 is deliberately conservative. TickDB's free tier
    # rate limits apply per API key, not per connection.
    # Benchmark your specific rate limit and adjust accordingly.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                fetch_symbol_with_retry,
                symbol, start_date, end_date, db_path, max_retries
            ): symbol
            for symbol in symbols
        }

        for future in as_completed(futures):
            symbol = futures[future]
            try:
                result = future.result()
                results[result["status"]].append(symbol)
                print(f"Completed {symbol}: {result}")
            except Exception as e:
                results["failed"].append({"symbol": symbol, "error": str(e)})
                print(f"Failed {symbol}: {e}")

    return results


def fetch_symbol_with_retry(symbol: str, start_date: datetime, end_date: datetime,
                            db_path: str, max_retries: int) -> dict:
    """
    Wrapper that retries symbol fetch with exponential backoff on transient failures.
    Opens its own SQLite connection: by default, a sqlite3 connection raises
    ProgrammingError when used from a thread other than the one that created it.
    """
    conn = init_checkpoint_db(db_path)
    try:
        for attempt in range(max_retries):
            try:
                return fetch_symbol_full_history(symbol, start_date, end_date, conn)
            except Exception as e:
                if attempt < max_retries - 1:
                    # Each retry resumes from the checkpoint, not from scratch
                    wait = (2 ** attempt) + random.uniform(0, 0.5)
                    print(f"Retry {attempt + 1}/{max_retries} for {symbol} after {wait:.1f}s: {e}")
                    time.sleep(wait)
                else:
                    raise
    finally:
        conn.close()
```
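Each worker above backs off independently, yet the comment in `fetch_universe_parallel` notes that the limit is enforced per API key. One way to coordinate the threads is a shared token bucket that every worker acquires before each request. This is a swapped-in technique, not something the article's code or TickDB prescribes, and the `rate` and `capacity` values are assumptions you would set from your plan's documented limit.

```python
import threading
import time
from typing import Callable


class TokenBucket:
    """Thread-safe token bucket shared by all workers using one API key.

    rate: tokens added per second; capacity: maximum burst size.
    """

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.rate = rate
        self.capacity = capacity
        self._tokens = capacity
        self._clock = clock
        self._sleep = sleep
        self._last = clock()
        self._lock = threading.Lock()

    def acquire(self) -> None:
        """Block until one token is available, then consume it."""
        while True:
            with self._lock:
                now = self._clock()
                # Refill in proportion to elapsed time, never beyond capacity
                self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
                self._last = now
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                wait = (1 - self._tokens) / self.rate
            self._sleep(wait)  # sleep outside the lock so other threads can proceed
```

Share one instance across all workers and call `bucket.acquire()` immediately before each HTTP request, for example at the top of each attempt inside the retry loop. Backoff still handles the 3001 responses that slip through; the bucket just makes them rarer.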
Data Quality Verification
Fetching data is only half the battle. A corrupted fetch—partial chunk, duplicate records, misaligned timestamps—corrupts the backtest and produces false results. Implement a verification layer that runs after each chunk fetch.
```python
def verify_chunk_integrity(symbol: str, records: list, expected_interval_ms: int = 60000) -> bool:
    """
    Verify a chunk of kline data for common integrity issues.

    Checks:
    1. No duplicate timestamps
    2. Timestamps strictly increase in whole multiples of the bar interval
       (one interval between adjacent bars; more across halts and overnight gaps)
    3. OHLCV values are non-negative
    4. High >= Open, Close, Low; Low <= Open, Close, High
    """
    if not records:
        return True

    timestamps = [r["t"] for r in records]

    # Check for duplicates
    if len(timestamps) != len(set(timestamps)):
        print(f"WARNING: Duplicate timestamps detected for {symbol}")
        return False

    # Check for negative values
    for r in records:
        if min(r["o"], r["h"], r["l"], r["c"], r["v"]) < 0:
            print(f"WARNING: Negative value detected for {symbol} at {r['t']}")
            return False

    # Check OHLC relationship
    for r in records:
        if not (r["h"] >= max(r["o"], r["c"], r["l"]) and r["l"] <= min(r["o"], r["c"])):
            print(f"WARNING: Invalid OHLC relationship for {symbol} at {r['t']}")
            return False

    # Check ordering and spacing: gaps are allowed, but only in whole intervals
    for i in range(1, len(timestamps)):
        diff = timestamps[i] - timestamps[i - 1]
        if diff <= 0:
            print(f"WARNING: Non-monotonic timestamp for {symbol} at index {i}")
            return False
        if diff % expected_interval_ms != 0:
            print(f"WARNING: Misaligned timestamp for {symbol} at index {i} ({diff} ms step)")
            return False

    return True
```
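`verify_chunk_integrity` accepts gaps by design, so a halt or a feed outage passes silently. A complementary report that lists intra-session gaps, so they can be reviewed or gap-filled instead of ignored. `find_intraday_gaps` is a hypothetical helper, and its one-hour threshold for what counts as a session break (overnight, weekend, holiday) is an assumption; an exchange trading calendar would be more precise.

```python
from typing import List, Tuple


def find_intraday_gaps(timestamps: List[int], interval_ms: int = 60_000,
                       max_gap_ms: int = 60 * 60_000) -> List[Tuple[int, int]]:
    """Return (last_seen_ms, next_seen_ms) pairs around missing bars.

    Jumps longer than max_gap_ms are treated as session breaks and ignored;
    shorter jumps larger than one interval are reported as gaps.
    """
    gaps = []
    for prev, cur in zip(timestamps, timestamps[1:]):
        if interval_ms < cur - prev <= max_gap_ms:
            gaps.append((prev, cur))
    return gaps


m = 60_000
ts = [i * m for i in range(10)] + [i * m for i in range(15, 20)]  # minutes 10-14 missing
ts.append(19 * m + 17 * 3_600_000)  # next session, 17 hours later
print(find_intraday_gaps(ts))  # [(540000, 900000)]
```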
Summary: Production Pipeline Checklist
A production-grade minute-level data acquisition pipeline requires:
| Component | Requirement | Implementation |
|---|---|---|
| Pagination | Timestamp-based, not offset-based | start_time / end_time parameters |
| Rate limiting | Exponential backoff with jitter + Retry-After header | fetch_with_backoff() with 3001 / HTTP 429 detection |
| Checkpoints | Per-symbol resume state in SQLite | symbol_checkpoints table with last_fetched_end_ts |
| Storage | Columnar format for fast reading | Parquet with year-month partitioning |
| Verification | Post-fetch integrity checks | Duplicate detection, OHLC validation, timestamp monotonicity |
| Error classification | Structured error handling | classify_error() with retryable / non-retryable split |
| Timeout | Every HTTP request bounded | timeout=(3.05, 10) on all requests.get() calls |
| Authentication | Environment variable, never hardcoded | os.environ.get("TICKDB_API_KEY") |
The full source code for this pipeline—including the batch orchestration layer for universe-wide fetches—is available as a reference implementation in the TickDB documentation repository.
Next Steps
If you're building your first backtesting pipeline, start with a single symbol and daily chunks. Validate data quality before scaling to the full universe. A single corrupt month of data can invalidate months of strategy development.
If you're migrating from a polling-based data source, the WebSocket push model offers real-time alternatives for live trading. Your backtesting pipeline can continue using the REST API for historical data while the live execution layer switches to WebSocket subscriptions.
If you need 10+ years of cleaned, aligned OHLCV data for cross-cycle backtesting, TickDB provides historical minute-level data for US equities via the /v1/market/kline endpoint. The free tier supports development and small-scale backtesting. Institutional plans offer expanded rate limits and multi-symbol batch endpoints.
If you're using AI-assisted coding tools, search for and install the tickdb-market-data SKILL on ClawHub to access TickDB API integration directly within your preferred development environment.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. Data acquisition strategies described here are engineering techniques—their application to trading decisions requires independent validation.