"Five years ago, I spent three weeks downloading S&P 500 minute-bar data. The process crashed four times. Each time, I started over from day one."
That experience—or something like it—is familiar to anyone who has built a serious quantitative strategy from scratch. The mathematics of mean reversion, momentum, and volatility arbitrage are elegant on paper. The unglamorous reality of data acquisition is where most backtesting projects stall or die.
Minute-level US stock data presents a particular challenge. Ten years of 1-minute bars for a single ticker comes to approximately 2.3 million data points once pre-market and after-hours sessions are included (roughly 1 million for the regular session alone). For a portfolio of 500 stocks, the minimum reasonable universe for an equity factor model, you are looking at over one billion rows before you write a single line of strategy logic. The naive approach of sequential HTTP requests will time out, hit rate limits, or take weeks to complete.
This article dissects the engineering architecture required to acquire large-scale historical OHLCV data reliably. We will cover concurrent shard pulling to maximize throughput, breakpoint recovery to survive infrastructure failures, and local caching strategies that make repeated backtests fast rather than expensive. All code examples use the TickDB REST API and include production-grade error handling, authentication, and rate-limit management.
The Data Volume Problem: Why Sequential Fetching Fails
Before writing code, we need to quantify the problem. Ten years of 1-minute US equity data is not a dataset you fetch in a single API call.
Quantifying the Challenge
| Metric (10 years of data) | 1 Ticker | 100 Tickers | 500 Tickers |
|---|---|---|---|
| 1-min bars (incl. extended hours) | ~2,340,000 | 234,000,000 | 1,170,000,000 |
| Raw storage (CSV) | ~180 MB | ~18 GB | ~90 GB |
| Fetch time, 1 bar per request at 1 req/sec | 27 days | 7.4 years | 37 years |
| Fetch time, 1 bar per request at 100 req/sec | 6.5 hours | 27 days | 135 days |
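The numbers are stark. The per-bar rows are a worst case, but pagination alone does not rescue a sequential fetcher: at 10,000 bars per request, the 500-ticker universe still needs at least 117,000 requests, about 32.5 hours at 1 request per second, and a single crash in that window sends you back to the start. The solution requires three interlocking systems: concurrency to maximize throughput, checkpointing to survive failure, and caching to avoid redundant work.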
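The table's arithmetic can be reproduced in a few lines. This is a back-of-the-envelope sketch: it takes the table's ~2.34 million bars per ticker as given, assumes 252 trading days a year for the regular-session comparison, and assumes every paginated request returns a full page.

```python
import math

TRADING_DAYS_PER_YEAR = 252
YEARS = 10
REGULAR_SESSION_MINUTES = 390   # 09:30 to 16:00 ET
BARS_PER_TICKER = 2_340_000     # the table's figure, extended hours included


def regular_session_bars(years: int = YEARS) -> int:
    """1-minute bars in the regular session only (half-days ignored)."""
    return TRADING_DAYS_PER_YEAR * years * REGULAR_SESSION_MINUTES


def fetch_hours(total_bars: int, bars_per_request: int, requests_per_sec: float) -> float:
    """Wall-clock hours for a strictly sequential fetch at a fixed request rate."""
    requests_needed = math.ceil(total_bars / bars_per_request)
    return requests_needed / requests_per_sec / 3600


if __name__ == "__main__":
    universe = 500 * BARS_PER_TICKER
    print(f"Regular-session bars per ticker: {regular_session_bars():,}")                 # 982,800
    print(f"One bar per request:  {fetch_hours(universe, 1, 1) / 24 / 365.25:.1f} years")  # 37.1
    print(f"10k bars per request: {fetch_hours(universe, 10_000, 1):.1f} hours")           # 32.5
```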
What Can Go Wrong
The failure modes are predictable but brutal:
- Timeout: A single request returning years of minute bars exceeds typical HTTP server timeouts.
- Rate limiting: Most professional market data APIs enforce request-per-second limits. Exceeding them returns a 429 Too Many Requests or a 3001 error code.
- Partial response: A network interruption mid-transfer leaves you with an incomplete dataset and no way to know where it ended.
- Timezone and aggregation inconsistency: Different API endpoints return data in different timezones or with different bar-aggregation rules. Two years in, you discover your "clean" dataset is shifted by one hour around every daylight-saving transition.
TickDB addresses the aggregation problem at the server level—the /v1/market/kline endpoint returns cleaned, timezone-aligned bars. But the client-side architecture for handling billions of rows remains your responsibility.
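A cheap per-batch sanity check catches many malformed responses before they reach the cache, and recording each batch's last timestamp tells you exactly where a response ended. The sketch below assumes the bar fields are named t/o/h/l/c (as the storage code later in this article assumes for TickDB) and that t is the bar's open time in Unix milliseconds; validate_batch is a hypothetical helper, not part of any TickDB SDK.

```python
from typing import Dict, List

Bar = Dict[str, float]


def validate_batch(bars: List[Bar], start_ms: int, end_ms: int, bar_ms: int = 60_000) -> List[str]:
    """Return the problems found in one kline batch; an empty list means it looks sane."""
    problems = []
    ts = [int(b["t"]) for b in bars]
    if any(later <= earlier for earlier, later in zip(ts, ts[1:])):
        problems.append("timestamps not strictly increasing (duplicates or disorder)")
    if ts and (ts[0] < start_ms or ts[-1] > end_ms):
        problems.append("bar outside the requested window")
    if any(t % bar_ms for t in ts):
        problems.append("timestamp not aligned to a bar boundary")
    for b in bars:
        # A valid bar's low and high must bracket both its open and its close
        if not (b["l"] <= min(b["o"], b["c"]) and max(b["o"], b["c"]) <= b["h"]):
            problems.append(f"inconsistent OHLC at t={int(b['t'])}")
            break
    return problems


if __name__ == "__main__":
    good = [{"t": 0, "o": 10, "h": 11, "l": 9, "c": 10.5},
            {"t": 60_000, "o": 10.5, "h": 10.8, "l": 10.1, "c": 10.2}]
    print(validate_batch(good, 0, 120_000))        # []
    print(validate_batch(good[::-1], 0, 120_000))  # ['timestamps not strictly increasing (duplicates or disorder)']
```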
System Architecture: Three-Layer Data Acquisition Pipeline
A robust historical data fetcher consists of three independent layers:
```text
┌───────────────────────────────────────────────────────────┐
│ Layer 1: Coordinator                                      │
│ - Symbol list management                                  │
│ - Time range partitioning                                 │
│ - Shard assignment (which worker fetches which range)     │
└───────────────────────────────────────────────────────────┘
                              │
                              ▼
┌───────────────────────────────────────────────────────────┐
│ Layer 2: Workers                                          │
│ - Concurrent HTTP requests with exponential backoff       │
│ - Response validation and parsing                         │
│ - Checkpoint writing after each successful batch          │
└───────────────────────────────────────────────────────────┘
                              │
                              ▼
┌───────────────────────────────────────────────────────────┐
│ Layer 3: Storage                                          │
│ - Local SQLite/Parquet cache                              │
│ - Checkpoint state (last successfully fetched timestamp)  │
│ - Integrity verification                                  │
└───────────────────────────────────────────────────────────┘
```
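The three layers can be wired together with two queues, so that each layer only talks to the next one. The sketch below is a minimal, standard-library illustration of that separation, not the implementation used later in this article (which relies on ThreadPoolExecutor); fetch is a stand-in for the real API call, and run_pipeline is a hypothetical name.

```python
import queue
import threading
from typing import Callable, Dict, List


def coordinator(symbols: List[str], windows_per_symbol: int, task_q: queue.Queue) -> None:
    """Layer 1: partition the work into independent (symbol, window) tasks."""
    for symbol in symbols:
        for window in range(windows_per_symbol):
            task_q.put((symbol, window))


def worker(task_q: queue.Queue, result_q: queue.Queue, fetch: Callable) -> None:
    """Layer 2: pull tasks until a None sentinel arrives; hand results to storage."""
    while (task := task_q.get()) is not None:
        result_q.put((task, fetch(*task)))


def storage(result_q: queue.Queue, store: Dict) -> None:
    """Layer 3: a single writer, so the cache never sees concurrent writes."""
    while (item := result_q.get()) is not None:
        (symbol, window), bars = item
        store.setdefault(symbol, {})[window] = bars


def run_pipeline(symbols, windows_per_symbol, fetch, n_workers: int = 4) -> Dict:
    task_q, result_q, store = queue.Queue(), queue.Queue(), {}
    workers = [threading.Thread(target=worker, args=(task_q, result_q, fetch)) for _ in range(n_workers)]
    writer = threading.Thread(target=storage, args=(result_q, store))
    for t in workers + [writer]:
        t.start()
    coordinator(symbols, windows_per_symbol, task_q)
    for _ in workers:
        task_q.put(None)   # one stop sentinel per worker
    for t in workers:
        t.join()
    result_q.put(None)     # all workers are done: stop the writer
    writer.join()
    return store


if __name__ == "__main__":
    print(run_pipeline(["AAPL.US", "MSFT.US"], 2, lambda s, w: [f"{s} window {w}"]))
```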
Partitioning Strategy
The key insight is that time ranges are independent. Fetching 2015–2018 for ticker A does not depend on fetching 2018–2021 for ticker B. This independence is what allows horizontal scaling.
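We partition by:
- Symbol: Each symbol gets its own worker queue.
- Time window: Each window is an independent fetch task, sized so it cannot exceed the API's 10,000-bar per-request limit. Since there is at most one 1-minute bar per minute, a window of about 10,000 minutes (roughly a week) is always safe. The API supports limit and start_time/end_time filtering, so each window maps onto one request.

Year-long chunks do not work for minute data: the regular session alone produces about 98,000 1-minute bars a year, roughly ten times the per-request limit. For a 10-year backtest across 100 tickers, week-sized windows produce about 526 tasks per symbol, or roughly 52,600 independent fetch tasks, each small enough to complete in a single API call.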
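Splitting by calendar year would leave each task far above the 10,000-bar limit, so windows need to be sized in bars rather than in years. This short sketch (make_windows and to_ms are illustrative names) counts the windows a 10-year range produces when each window spans 10,000 one-minute bar slots; if the API treats both start_time and end_time as inclusive, shrink the span by one bar.

```python
from datetime import datetime, timezone
from typing import List, Tuple

BARS_PER_REQUEST = 10_000
BAR_MS = 60_000  # one 1-minute bar


def to_ms(date_str: str) -> int:
    """Midnight UTC of a YYYY-MM-DD date, in Unix milliseconds."""
    return int(datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp() * 1000)


def make_windows(start_ms: int, end_ms: int, bars_per_request: int = BARS_PER_REQUEST,
                 bar_ms: int = BAR_MS) -> List[Tuple[int, int]]:
    """Split [start_ms, end_ms) into back-to-back windows of bars_per_request bar slots."""
    span = bars_per_request * bar_ms
    return [(s, min(s + span, end_ms)) for s in range(start_ms, end_ms, span)]


if __name__ == "__main__":
    # One regular-session year alone is 252 * 390 = 98,280 bars, so a year cannot fit in one request
    windows = make_windows(to_ms("2014-01-01"), to_ms("2024-01-01"))
    print(len(windows))        # 526 windows per symbol
    print(len(windows) * 100)  # 52600 tasks for 100 tickers
```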
Production-Grade Code: Concurrent Fetcher with Checkpoint Recovery
The following implementation includes exponential backoff with jitter, rate-limit response parsing, local checkpoint state, and environment-variable authentication.
"""
TickDB Historical Data Fetcher
Fetches minute-level US equity OHLCV data with concurrent workers,
exponential backoff, rate-limit handling, and checkpoint recovery.
"""
import os
import time
import json
import sqlite3
import logging
from datetime import datetime, timedelta
from typing import Optional
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
# ─── Configuration ────────────────────────────────────────────────────────────
API_BASE_URL = "https://api.tickdb.ai/v1"
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
if not TICKDB_API_KEY:
raise EnvironmentError(
"TICKDB_API_KEY environment variable is not set. "
"Generate an API key at https://tickdb.ai/dashboard"
)
HEADERS = {
"X-API-Key": TICKDB_API_KEY,
"Content-Type": "application/json"
}
# Rate-limit configuration
REQUESTS_PER_SECOND = 10 # Conservative; adjust based on your plan
MAX_RETRIES = 5
BASE_DELAY = 1.0
MAX_DELAY = 60.0
REQUEST_TIMEOUT = (3.05, 30) # (connect_timeout, read_timeout)
# Fetch configuration
BARS_PER_REQUEST = 10000 # Max bars per API call
CHUNK_SIZE_YEARS = 1 # Fetch 1 year per request for minute data
MAX_WORKERS = 20 # Concurrent workers
# ─── Logging Setup ─────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
# ─── Data Classes ──────────────────────────────────────────────────────────────
@dataclass
class FetchTask:
symbol: str
start_time: int # Unix timestamp (ms)
end_time: int # Unix timestamp (ms)
interval: str = "1m"
@dataclass
class CheckpointState:
symbol: str
last_fetched_end_time: int # Unix timestamp (ms)
bars_fetched: int
updated_at: str
# ─── Database: Checkpoint State ───────────────────────────────────────────────
class CheckpointDB:
"""SQLite-backed checkpoint store for resume-from-failure support."""
def __init__(self, db_path: str = "fetch_checkpoints.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS checkpoints (
symbol TEXT PRIMARY KEY,
last_fetched_end_time INTEGER,
bars_fetched INTEGER,
updated_at TEXT
)
""")
def get_checkpoint(self, symbol: str) -> Optional[CheckpointState]:
with sqlite3.connect(self.db_path) as conn:
row = conn.execute(
"SELECT * FROM checkpoints WHERE symbol = ?",
(symbol,)
).fetchone()
if row:
return CheckpointState(
symbol=row[0],
last_fetched_end_time=row[1],
bars_fetched=row[2],
updated_at=row[3]
)
return None
def save_checkpoint(self, state: CheckpointState):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO checkpoints
(symbol, last_fetched_end_time, bars_fetched, updated_at)
VALUES (?, ?, ?, ?)
""", (
state.symbol,
state.last_fetched_end_time,
state.bars_fetched,
state.updated_at
))
# ─── API Client ───────────────────────────────────────────────────────────────
class TickDBClient:
"""
Production-grade TickDB API client.
Handles rate limits, exponential backoff, and error codes 1001/1002/2002/3001.
"""
def __init__(self):
self.session = requests.Session()
self.session.headers.update(HEADERS)
def _handle_api_error(self, response_data: dict, retry_count: int):
"""Parse error code and decide retry strategy."""
code = response_data.get("code", 0)
message = response_data.get("message", "Unknown error")
if code == 0:
return True # Success
if code in (1001, 1002):
raise ValueError(
f"Authentication error ({code}): {message}. "
"Verify your TICKDB_API_KEY environment variable."
)
if code == 2002:
raise KeyError(f"Symbol not found: {message}")
if code == 3001:
# Rate limited — extract Retry-After header if available
retry_after = int(response_data.get("retry_after", 5))
logger.warning(f"Rate limit hit (code 3001). Waiting {retry_after}s.")
time.sleep(retry_after)
return False # Caller should retry
# Unexpected error
raise RuntimeError(f"Unexpected API error {code}: {message}")
def _apply_backoff(self, retry_count: int) -> float:
"""Exponential backoff with full jitter."""
delay = min(BASE_DELAY * (2 ** retry_count), MAX_DELAY)
jitter = delay * 0.1 * (0.5 - (retry_count % 2)) # Alternating jitter
return max(0, delay + jitter)
def fetch_kline(
self,
symbol: str,
interval: str = "1m",
start_time: Optional[int] = None,
end_time: Optional[int] = None,
limit: int = BARS_PER_REQUEST
) -> list:
"""
Fetch OHLCV klines from TickDB.
Args:
symbol: Ticker symbol (e.g., "AAPL.US")
interval: Candle interval ("1m", "5m", "1h", "1d")
start_time: Start timestamp in Unix ms
end_time: End timestamp in Unix ms
limit: Maximum bars to return (max 10000)
Returns:
List of OHLCV dictionaries
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
if start_time:
params["start_time"] = start_time
if end_time:
params["end_time"] = end_time
for retry in range(MAX_RETRIES):
try:
response = self.session.get(
f"{API_BASE_URL}/market/kline",
params=params,
timeout=REQUEST_TIMEOUT
)
response.raise_for_status()
data = response.json()
if data.get("code") == 3001:
# Rate limited — wait and retry
time.sleep(int(data.get("retry_after", 5)))
continue
if data.get("code") != 0:
self._handle_api_error(data, retry)
klines = data.get("data", [])
logger.debug(f"Fetched {len(klines)} bars for {symbol}")
return klines
except requests.exceptions.Timeout:
logger.warning(f"Timeout fetching {symbol} (attempt {retry + 1})")
time.sleep(self._apply_backoff(retry))
except requests.exceptions.RequestException as e:
logger.warning(f"Request error for {symbol}: {e}")
time.sleep(self._apply_backoff(retry))
raise RuntimeError(f"Failed to fetch {symbol} after {MAX_RETRIES} retries")
# ─── Storage: Local Parquet Cache ─────────────────────────────────────────────
import pandas as pd
class DataCache:
"""Local Parquet-based storage with append support."""
def __init__(self, cache_dir: str = "./data_cache"):
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
def _cache_path(self, symbol: str, interval: str) -> str:
safe_symbol = symbol.replace("/", "_").replace(".", "_")
return os.path.join(self.cache_dir, f"{safe_symbol}_{interval}.parquet")
def append_bars(self, symbol: str, interval: str, bars: list):
"""Append new bars to local Parquet cache."""
if not bars:
return
cache_path = self._cache_path(symbol, interval)
# Convert to DataFrame
df_new = pd.DataFrame(bars)
# Normalize column names (TickDB uses 't' for timestamp, 'o'/'h'/'l'/'c'/'v')
column_map = {
"t": "timestamp",
"o": "open",
"h": "high",
"l": "low",
"c": "close",
"v": "volume"
}
df_new = df_new.rename(columns=column_map)
# Load existing data and deduplicate
if os.path.exists(cache_path):
df_existing = pd.read_parquet(cache_path)
df_combined = pd.concat([df_existing, df_new], ignore_index=True)
df_combined = df_combined.drop_duplicates(subset=["timestamp"])
df_combined = df_combined.sort_values("timestamp").reset_index(drop=True)
else:
df_combined = df_new.sort_values("timestamp").reset_index(drop=True)
# Save
df_combined.to_parquet(cache_path, index=False)
logger.info(f"Cached {len(bars)} bars for {symbol} → {cache_path}")
def load_cached(self, symbol: str, interval: str) -> Optional[pd.DataFrame]:
"""Load cached data for a symbol if available."""
cache_path = self._cache_path(symbol, interval)
if os.path.exists(cache_path):
return pd.read_parquet(cache_path)
return None
# ─── Core Fetcher ──────────────────────────────────────────────────────────────
class HistoricalDataFetcher:
"""
Orchestrates concurrent historical data fetching with checkpoint recovery.
Usage:
fetcher = HistoricalDataFetcher(symbols=["AAPL.US", "MSFT.US"])
fetcher.fetch_all(start_date="2014-01-01", end_date="2024-01-01")
"""
def __init__(
self,
symbols: list,
checkpoint_db: Optional[CheckpointDB] = None,
data_cache: Optional[DataCache] = None
):
self.symbols = symbols
self.client = TickDBClient()
self.checkpoint_db = checkpoint_db or CheckpointDB()
self.data_cache = data_cache or DataCache()
self.total_bars_fetched = 0
def _generate_tasks(
self,
symbol: str,
start_time_ms: int,
end_time_ms: int
) -> list[FetchTask]:
"""Generate fetch tasks for a symbol's full time range."""
tasks = []
current_start = start_time_ms
while current_start < end_time_ms:
# Each task fetches up to BARS_PER_REQUEST bars
task_end = min(
current_start + (BARS_PER_REQUEST * 60 * 1000), # Approximate time span
end_time_ms
)
tasks.append(FetchTask(
symbol=symbol,
start_time=current_start,
end_time=task_end
))
current_start = task_end
return tasks
def _fetch_task(self, task: FetchTask) -> tuple[str, int, list]:
"""
Execute a single fetch task.
Returns: (symbol, total_bars_fetched, bars)
"""
symbol = task.symbol
# Check for cached progress
checkpoint = self.checkpoint_db.get_checkpoint(symbol)
if checkpoint and task.start_time <= checkpoint.last_fetched_end_time:
# This task's data is already fetched — skip
logger.debug(f"Skipping {symbol} {task.start_time}-{task.end_time} (cached)")
return symbol, 0, []
bars = self.client.fetch_kline(
symbol=symbol,
interval=task.interval,
start_time=task.start_time,
end_time=task.end_time,
limit=BARS_PER_REQUEST
)
# Save checkpoint after successful fetch
if bars:
end_time = bars[-1].get("t", task.end_time)
self.checkpoint_db.save_checkpoint(CheckpointState(
symbol=symbol,
last_fetched_end_time=end_time,
bars_fetched=len(bars),
updated_at=datetime.now().isoformat()
))
return symbol, len(bars), bars
def fetch_all(
self,
start_date: str,
end_date: str,
interval: str = "1m"
):
"""
Fetch historical data for all symbols with concurrent workers.
Args:
start_date: Start date in "YYYY-MM-DD" format
end_date: End date in "YYYY-MM-DD" format
interval: Candle interval (default "1m")
"""
start_time_ms = int(datetime.strptime(start_date, "%Y-%m-%d").timestamp() * 1000)
end_time_ms = int(datetime.strptime(end_date, "%Y-%m-%d").timestamp() * 1000)
# Build all tasks
all_tasks = []
for symbol in self.symbols:
tasks = self._generate_tasks(symbol, start_time_ms, end_time_ms)
all_tasks.extend(tasks)
logger.info(f"Total tasks: {len(all_tasks)} | Symbols: {len(self.symbols)}")
# Execute with thread pool
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {
executor.submit(self._fetch_task, task): task
for task in all_tasks
}
for future in as_completed(futures):
task = futures[future]
try:
symbol, bar_count, bars = future.result()
self.total_bars_fetched += bar_count
if bars:
self.data_cache.append_bars(symbol, interval, bars)
# Progress logging every 100 tasks
if self.total_bars_fetched % 1000 == 0:
logger.info(
f"Progress: {self.total_bars_fetched} bars fetched "
f"({len(all_tasks) - len(futures)}/{len(all_tasks)} tasks)"
)
except Exception as e:
logger.error(f"Task failed for {task.symbol}: {e}")
logger.info(
f"Completed. Total bars fetched: {self.total_bars_fetched} "
f"| Data cached at: {self.data_cache.cache_dir}"
)
# ─── Usage Example ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
# ⚠️ For production HFT workloads with >50 concurrent workers,
# consider migrating to aiohttp/asyncio for non-blocking I/O.
symbols = [
"AAPL.US", "MSFT.US", "GOOGL.US", "AMZN.US", "NVDA.US",
"META.US", "TSLA.US", "BRK.B.US", "JPM.US", "V.US"
]
fetcher = HistoricalDataFetcher(symbols=symbols)
fetcher.fetch_all(
start_date="2014-01-01",
end_date="2024-01-01",
interval="1m"
)
print("\n✅ Fetch complete. Loading cached data for backtesting...")
cache = DataCache()
sample_data = cache.load_cached("AAPL.US", "1m")
print(f"AAPL.US cached bars: {len(sample_data):,}")
print(sample_data.head())
Key Engineering Decisions Explained
Rate limiting with 3001 handling: The TickDB API returns error code 3001 when you exceed the rate limit. Our client reads the retry_after field and waits accordingly. This is more precise than naive exponential backoff alone.
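The decision the client makes on each failed attempt can be isolated into one small function, which also makes it easy to unit-test. This sketch mirrors the rules described here (honour the server's wait hint for 3001, treat HTTP 429 the same way, back off with full jitter on 5xx, give up on authentication or unknown-symbol errors); retry_wait and DEFAULT_RETRY_AFTER are hypothetical names, and the 5-second default is an assumption.

```python
import random
from typing import Optional

BASE_DELAY = 1.0
MAX_DELAY = 60.0
DEFAULT_RETRY_AFTER = 5.0  # assumed fallback when the server gives no hint


def retry_wait(attempt: int, http_status: int, body_code: Optional[int] = None,
               retry_after: Optional[float] = None) -> Optional[float]:
    """Seconds to sleep before the next attempt, or None if the error is permanent."""
    if http_status == 429 or body_code == 3001:
        # The server said how long to wait: trust it over our own backoff curve
        return float(retry_after) if retry_after is not None else DEFAULT_RETRY_AFTER
    if http_status >= 500:
        # Transient server error: exponential backoff with full jitter
        return random.uniform(0, min(BASE_DELAY * 2 ** attempt, MAX_DELAY))
    # Other 4xx, or body codes such as 1001/1002 (auth) and 2002 (unknown symbol)
    return None


if __name__ == "__main__":
    print(retry_wait(0, 200, body_code=3001, retry_after=7))  # 7.0
    print(retry_wait(0, 200, body_code=1001))                 # None
```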
Checkpoint granularity: We checkpoint after each successful batch (up to 10,000 bars). This means that if the process crashes after fetching 700,000 bars, we resume from the last successful batch—not from zero. For a 10-year dataset, this could save hours of redundant API calls.
Parquet over CSV: Parquet is a columnar storage format. A query that needs only the close and volume columns reads just those columns from disk, and a filter such as "volume > 1,000,000" can skip entire row groups whose min/max statistics rule them out (predicate pushdown). For repeated backtests on 500-ticker universes, this can be the difference between a 30-minute load and a 3-minute load.
Thread pool over asyncio for this use case: The ThreadPoolExecutor approach is simpler to debug and sufficient for I/O-bound API fetching. If you are building a live trading system with sub-100ms latency requirements, migrate to asyncio with aiohttp.
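If you do take the asyncio route, concurrency is usually bounded with a semaphore rather than a thread count. The sketch below uses only the standard library: fetch_window is a hypothetical stand-in that sleeps instead of issuing an aiohttp request, so treat it as the shape of the solution rather than a drop-in client. Each symbol's windows still run in order, which keeps per-symbol checkpoints monotonic.

```python
import asyncio
import random
from typing import Dict


async def fetch_window(symbol: str, window: int, sem: asyncio.Semaphore, stats: Dict[str, int]) -> int:
    """Stand-in for one GET to /market/kline; returns a bar count."""
    async with sem:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        try:
            await asyncio.sleep(random.uniform(0.001, 0.005))  # simulated network round trip
            return 100
        finally:
            stats["in_flight"] -= 1


async def fetch_symbol(symbol: str, windows: int, sem: asyncio.Semaphore, stats: Dict[str, int]) -> int:
    """Windows for one symbol run sequentially; symbols run concurrently."""
    total = 0
    for w in range(windows):
        total += await fetch_window(symbol, w, sem, stats)
    return total


async def fetch_all(symbols, windows_per_symbol: int, max_concurrency: int = 50):
    sem = asyncio.Semaphore(max_concurrency)  # caps requests in flight across all symbols
    stats = {"in_flight": 0, "peak": 0}
    totals = await asyncio.gather(*(fetch_symbol(s, windows_per_symbol, sem, stats) for s in symbols))
    return dict(zip(symbols, totals)), stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(fetch_all(["AAPL.US", "MSFT.US", "NVDA.US"], windows_per_symbol=4, max_concurrency=2))
    print(results, "peak in flight:", peak)
```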
Resumability: Surviving Infrastructure Failures
A data fetch job running for 72 hours across 500 tickers will encounter a transient failure—network blip, EC2 instance restart, laptop lid closing. Resumability is not optional; it is the difference between a finished backtest and an abandoned project.
Checkpoint State Machine
```text
┌──────────────────────────────┐
│ Start symbol                 │
└──────────────┬───────────────┘
               ▼
┌──────────────────────────────┐
│ Checkpoint for symbol?       │
│  Yes: resume from last_time  │
│  No:  start from start_date  │
└──────────────┬───────────────┘
               ▼
┌──────────────────────────────┐
│ Fetch batch (up to 10k bars) │◀──┐
└──────────────┬───────────────┘   │
               ▼ Success           │
┌──────────────────────────────┐   │
│ Store in Parquet             │   │ next
└──────────────┬───────────────┘   │ window
               ▼                   │
┌──────────────────────────────┐   │
│ Save checkpoint (last_time)  │───┘
└──────────────────────────────┘
```
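The safe ordering is to persist a batch before recording it in the checkpoint; the reverse order can mark data as fetched that never reached disk. Here is a minimal, self-contained simulation of that ordering, using an in-memory SQLite table and a Python list as a stand-in for the Parquet cache; init_db and run_symbol are illustrative names, and the crash is simulated with an exception.

```python
import sqlite3
from typing import List, Optional, Tuple

Window = Tuple[int, int]


def init_db(conn: sqlite3.Connection) -> None:
    conn.execute("CREATE TABLE IF NOT EXISTS checkpoints (symbol TEXT PRIMARY KEY, last_end INTEGER)")


def run_symbol(conn: sqlite3.Connection, symbol: str, windows: List[Window], store: list,
               crash_after: Optional[int] = None) -> int:
    """Process windows in order: persist data, then advance the checkpoint."""
    row = conn.execute("SELECT last_end FROM checkpoints WHERE symbol = ?", (symbol,)).fetchone()
    last_end = row[0] if row else None
    processed = 0
    for start, end in windows:
        if last_end is not None and end <= last_end:
            continue  # persisted by an earlier run: skip
        store.append((symbol, start, end))  # 1. write data (stand-in for the Parquet append)
        conn.execute("INSERT OR REPLACE INTO checkpoints VALUES (?, ?)", (symbol, end))
        conn.commit()                       # 2. only now record progress
        processed += 1
        if crash_after is not None and processed == crash_after:
            raise RuntimeError("simulated crash")
    return processed


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    init_db(conn)
    windows = [(i * 10, (i + 1) * 10) for i in range(5)]
    store: list = []
    try:
        run_symbol(conn, "AAPL.US", windows, store, crash_after=2)
    except RuntimeError:
        pass
    print(run_symbol(conn, "AAPL.US", windows, store))  # 3: only the remaining windows
```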
Verify Data Integrity
Checkpoint recovery saves time, but a silent data gap is worse than no data. After each fetch session, run an integrity check:
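```python
import os

import pandas as pd

REGULAR_SESSION_MINUTES = 390  # 09:30-16:00 ET; half-days will show up as incomplete days


def verify_data_integrity(symbol: str, interval: str = "1m", cache_dir: str = "./data_cache"):
    """
    Check for missing regular-session bars in the cached dataset.

    Expected counts are computed only for trading days present in the data, so nights,
    weekends, and holidays are not counted as gaps. Days missing entirely are NOT
    detected; that requires an exchange calendar. Assumes timestamps are bar open
    times in Unix ms (UTC).

    Args:
        symbol: Ticker symbol (e.g., "AAPL.US")
        interval: Candle interval; only "1m" and "5m" are checked
        cache_dir: Directory used by DataCache
    """
    # Same file-name sanitization as DataCache._cache_path
    safe_symbol = symbol.replace("/", "_").replace(".", "_")
    df = pd.read_parquet(os.path.join(cache_dir, f"{safe_symbol}_{interval}.parquet"))

    minutes_per_bar = {"1m": 1, "5m": 5}.get(interval)
    if minutes_per_bar is None:
        print(f"ℹ️ {symbol}: {len(df):,} bars (expected count not calculated for {interval})")
        return

    # Convert UTC timestamps to US Eastern and keep regular-session bars only
    ts = pd.to_datetime(df["timestamp"], unit="ms", utc=True).dt.tz_convert("America/New_York")
    minute_of_day = ts.dt.hour * 60 + ts.dt.minute
    regular = ts[(minute_of_day >= 9 * 60 + 30) & (minute_of_day < 16 * 60)]

    bars_per_day = regular.dt.date.value_counts()
    expected_per_day = REGULAR_SESSION_MINUTES // minutes_per_bar
    expected_bars = expected_per_day * len(bars_per_day)
    actual_bars = int(bars_per_day.sum())
    gap_ratio = (expected_bars - actual_bars) / expected_bars if expected_bars else 0.0
    short_days = int((bars_per_day < expected_per_day).sum())

    if gap_ratio > 0.01:  # >1% gap triggers warning
        print(f"⚠️ {symbol}: Expected ~{expected_bars:,} regular-session bars, got {actual_bars:,} "
              f"({gap_ratio*100:.2f}% gap across {short_days} incomplete days)")
    else:
        print(f"✅ {symbol}: {actual_bars:,} regular-session bars (gap ratio: {gap_ratio*100:.3f}%)")
```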
Deployment Guide: Matching Strategy to Scale
The architecture above is sufficient for individual quants fetching data for a personal backtesting project. The table below maps deployment configurations to scale requirements.
| Scale | Configuration | TickDB Plan | Use Case |
|---|---|---|---|
| Individual (< 10 tickers, 5 years) | Single-threaded, BARS_PER_REQUEST=10000, no checkpointing | TickDB Free tier | Strategy prototyping |
| Individual (< 50 tickers, 10 years) | Concurrent workers (10), checkpointing, Parquet cache | TickDB Free/Standard tier | Factor model backtesting |
| Team (50–200 tickers, 10 years) | Concurrent workers (20), distributed checkpoint DB, scheduled runs | Standard tier | Shared data infrastructure |
| Institutional (200+ tickers, 10+ years) | Async I/O, distributed workers (50+), data pipeline to S3/GCS, automated integrity checks | Professional/Enterprise tier | Production backtesting framework |
For teams and institutions, consider:
- Distributed checkpoint DB: Replace SQLite with PostgreSQL so multiple fetch workers share state.
- Message queue: Use Celery or RabbitMQ to manage a persistent task queue that survives process restarts.
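- Data versioning: Tag each Parquet snapshot with a run ID and timestamp. When you update your factor model, rerun it against the same tagged snapshot, so results stay reproducible and nothing has to be re-fetched; write fresh data into a new snapshot with a new tag only when you actually need it.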
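One lightweight way to tag snapshots is a directory per run plus a small JSON manifest describing what the run fetched. The layout below (run=<id>/manifest.json) and the helper names are assumptions for illustration, not a TickDB convention; the snapshot directory can then be passed to the fetcher's cache as its cache_dir.

```python
import json
import os
import tempfile
from datetime import datetime, timezone
from typing import List


def snapshot_dir(root: str, run_id: str) -> str:
    """Each fetch run gets its own directory; nothing in it is modified after the run."""
    return os.path.join(root, f"run={run_id}")


def write_manifest(root: str, run_id: str, symbols: List[str], start_date: str,
                   end_date: str, interval: str) -> str:
    path = snapshot_dir(root, run_id)
    os.makedirs(path, exist_ok=True)
    manifest = {
        "run_id": run_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "symbols": sorted(symbols),
        "start_date": start_date,
        "end_date": end_date,
        "interval": interval,
    }
    with open(os.path.join(path, "manifest.json"), "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)
    return path


def load_manifest(root: str, run_id: str) -> dict:
    """A backtest pins a run_id and reads exactly the data that run produced."""
    with open(os.path.join(snapshot_dir(root, run_id), "manifest.json"), encoding="utf-8") as f:
        return json.load(f)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        write_manifest(root, "20240115T0000Z", ["MSFT.US", "AAPL.US"], "2014-01-01", "2024-01-01", "1m")
        print(load_manifest(root, "20240115T0000Z")["symbols"])  # ['AAPL.US', 'MSFT.US']
```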
Common Pitfalls and How to Avoid Them
Pitfall 1: Assuming API Returns All Bars in One Call
The TickDB /v1/market/kline endpoint has a maximum limit of 10,000 bars per request. Attempting to fetch 10 years of minute bars in a single call returns 10,000 bars and silently truncates the rest. Always paginate. Our implementation handles this by generating sequential tasks based on start_time/end_time filtering.
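If you prefer to paginate with a cursor instead of fixed windows, the loop looks like the sketch below. It assumes, without confirmation from the TickDB documentation, that when a range holds more than limit bars the endpoint returns the earliest limit bars in ascending order; if it returns the latest ones instead, the cursor has to walk backwards from end_time. fetch_page is a hypothetical wrapper around the kline call.

```python
from typing import Callable, Dict, List

Bar = Dict[str, int]
FetchPage = Callable[[int, int, int], List[Bar]]  # (start_ms, end_ms, limit) -> bars


def fetch_all_pages(fetch_page: FetchPage, start_ms: int, end_ms: int, limit: int = 10_000) -> List[Bar]:
    """Cursor pagination: keep requesting from just after the last bar received."""
    bars: List[Bar] = []
    cursor = start_ms
    while cursor < end_ms:
        page = fetch_page(cursor, end_ms, limit)
        bars.extend(page)
        if len(page) < limit:
            break  # short or empty page: nothing left in the range
        cursor = page[-1]["t"] + 1  # 1 ms past the last bar, so it is not fetched twice
    return bars


if __name__ == "__main__":
    all_bars = [{"t": i * 60_000} for i in range(25_000)]

    def fake_page(start: int, end: int, limit: int) -> List[Bar]:
        # Simulated endpoint: earliest `limit` bars in [start, end), ascending
        return [b for b in all_bars if start <= b["t"] < end][:limit]

    print(len(fetch_all_pages(fake_page, 0, 25_000 * 60_000)))  # 25000
```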
Pitfall 2: Ignoring Timezone Alignment
Minute bars are meaningless without precise timezone handling. TickDB returns timestamps in Unix milliseconds (UTC). If your trading strategy operates in US/Eastern time, convert timestamps before computing features:
```python
from zoneinfo import ZoneInfo

import pandas as pd


def align_to_us_eastern(df: pd.DataFrame, timestamp_col: str = "timestamp"):
    """Convert UTC timestamps to US Eastern time."""
    df = df.copy()
    eastern = ZoneInfo("America/New_York")
    df["datetime_et"] = pd.to_datetime(df[timestamp_col], unit="ms", utc=True)
    df["datetime_et"] = df["datetime_et"].dt.tz_convert(eastern)
    return df
```
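The reason this matters is daylight saving: the same UTC clock time lands on different Eastern times in winter and summer, so a fixed UTC offset puts the market open in the wrong bar for half the year. A small standard-library check (to_eastern and is_regular_session are illustrative helpers; exchange holidays and half-days are deliberately ignored, and timestamps are assumed to be bar open times):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")


def to_eastern(ts_ms: int) -> datetime:
    """Unix milliseconds (UTC) to a timezone-aware US Eastern datetime."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).astimezone(EASTERN)


def is_regular_session(ts_ms: int) -> bool:
    """True if a bar opening at ts_ms falls in 09:30-15:59 ET on a weekday (holidays ignored)."""
    et = to_eastern(ts_ms)
    minute = et.hour * 60 + et.minute
    return et.weekday() < 5 and 9 * 60 + 30 <= minute < 16 * 60


if __name__ == "__main__":
    winter = int(datetime(2023, 1, 17, 14, 30, tzinfo=timezone.utc).timestamp() * 1000)
    summer = int(datetime(2023, 7, 17, 14, 30, tzinfo=timezone.utc).timestamp() * 1000)
    print(to_eastern(winter).strftime("%H:%M"))  # 09:30 (EST, UTC-5)
    print(to_eastern(summer).strftime("%H:%M"))  # 10:30 (EDT, UTC-4)
```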
Pitfall 3: Storing Data as JSON
JSON is human-readable but slow to parse and impossible to query efficiently. For 500 tickers × 10 years of minute bars, a single JSON file is 50 to 100 GB and has to be parsed in full before you can use any part of it. Use Parquet. Our implementation uses pandas.DataFrame.to_parquet() (which requires pyarrow or fastparquet); Parquet's compression typically shrinks minute-bar data several-fold and enables column-selective queries.
Pitfall 4: Not Handling API Key Rotation
If you deploy a long-running fetch job on a cloud instance, your API key will eventually expire or need rotation. Store the key in AWS Secrets Manager, HashiCorp Vault, or environment variables—never hardcode it in the script.
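Rotation is easier if the fetcher re-reads the key periodically instead of capturing it once at start-up (the script above reads TICKDB_API_KEY once, at import time). A minimal sketch, assuming the key is mounted as a file by your secrets tooling; the file path and ApiKeyProvider are hypothetical, and the injectable clock exists only to make the behaviour testable.

```python
import os
import tempfile
import time
from typing import Callable, Optional


class ApiKeyProvider:
    """Re-reads the API key from a secret file at most once every `ttl` seconds."""

    def __init__(self, path: str, ttl: float = 300.0, clock: Callable[[], float] = time.monotonic):
        self.path, self.ttl, self.clock = path, ttl, clock
        self._key: Optional[str] = None
        self._loaded_at = 0.0

    def get(self) -> str:
        now = self.clock()
        if self._key is None or now - self._loaded_at >= self.ttl:
            with open(self.path, encoding="utf-8") as f:
                self._key = f.read().strip()
            self._loaded_at = now
        return self._key

    def invalidate(self) -> None:
        """Force a re-read on the next get(), e.g. after an auth error (1001/1002)."""
        self._key = None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        key_file = os.path.join(d, "tickdb_api_key")
        with open(key_file, "w", encoding="utf-8") as f:
            f.write("old-key\n")
        provider = ApiKeyProvider(key_file)
        print(provider.get())  # old-key
        with open(key_file, "w", encoding="utf-8") as f:
            f.write("new-key\n")  # rotated outside the process
        provider.invalidate()
        print(provider.get())  # new-key
```

In the fetcher, you would set session.headers["X-API-Key"] = provider.get() before each request instead of building HEADERS once.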
Next Steps
With the data pipeline in place, you now have the raw material for backtesting. The next articles in this series will cover:
- Data validation and cleaning — Detecting and correcting survivorship bias, corporate action adjustments, and ex-dividend price gaps.
- Feature engineering for minute-bar strategies — Computing realized volatility, order flow imbalance, and microstructure signals at sub-hourly frequencies.
- Vectorized backtesting with Pandas — Running a factor backtest across 500 tickers in under 60 seconds.
- Walk-forward validation — Preventing overfitting through rolling train-test splits with proper out-of-sample isolation.
If you want to fetch this data yourself, sign up at tickdb.ai to obtain a free API key (no credit card required). Set the TICKDB_API_KEY environment variable and run the script above with your target symbol list.
If you need institutional-grade historical OHLCV data covering 10+ years of US equities for production backtesting infrastructure, contact enterprise@tickdb.ai for Professional and Enterprise plans with higher rate limits and dedicated support.
If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace for integrated data fetching within your existing workflow.
This article does not constitute investment advice. Backtesting involves the risk of overfitting; historical performance does not guarantee future results. Verify data integrity before making any trading decisions.