The backtest looked perfect. Sharpe ratio of 1.82. Maximum drawdown of −6.3%. Win rate at 61%. You spent three weeks building the mean-reversion strategy, another week fine-tuning the entry thresholds, and two more nights running Monte Carlo simulations to validate robustness. Then you go live—and the strategy bleeds money for six weeks straight.

The culprit is almost always the same: data.

Not the strategy logic. Not the execution slippage model. The data. You trained your entire hypothesis on five years of daily bars when the actual market regime shifts require a decade of minute-level granularity to capture. Or you have the data, but it contains gaps—weekend fills in a supposed "24/7 crypto feed," duplicate timestamps, misaligned OHLCV fields during market open chaos.

This article dissects the engineering challenge of building a production-grade historical data acquisition pipeline. It covers pagination design, rate limit resilience, checkpoint-based resume, and local caching strategies. All code examples use production patterns: exponential backoff with jitter, proper timeout handling, environment-variable authentication, and error codes that match real-world API behavior.


The Scale of the Problem

Ten years of minute-level US equity data is not a trivial dataset. Consider what "10 years of minute bars" actually means:

Metric Calculation Result
Trading days per year 252
Minutes per trading day 390 (9:30–16:00 ET)
Total minutes per year 252 × 390 98,280
Total minutes for 10 years 98,280 × 10 982,800
Bytes per OHLCV record (JSON) ~150 bytes
Raw data size (single symbol) 982,800 × 150 ~148 MB
Symbols in S&P 500 500
Full S&P 500 dataset 148 MB × 500 ~74 GB
With compression (Parquet) ~74 GB × 0.2 ~15 GB

The raw dataset for a single large-cap stock is manageable. The full S&P 500 universe across a decade is 15 GB of compressed data—roughly 74 GB uncompressed. That is before you account for corporate actions adjustments, gap-filling for halts, and the deduplication required when multiple data sources contribute to the same symbol.

Memory constraints alone eliminate the possibility of fetching everything in a single API call. A well-designed pipeline must chunk the data, persist progress, and resume from failure points.


Architecture: The Three-Layer Fetching Pipeline

A robust data acquisition system operates across three distinct layers:

Layer 1: Request orchestration. This layer manages the overall job—symbol list, time range, date slicing. It decides what to fetch next based on checkpoint state.

Layer 2: Chunk fetching. This layer handles individual API calls. It implements pagination, rate limit backoff, timeout management, and error classification.

Layer 3: Persistence. This layer writes fetched data to local storage (SQLite, Parquet, or a time-series database), updates checkpoint state, and validates data integrity.

The critical insight is that Layer 1 and Layer 2 must be completely decoupled. If you couple them—embedding persistence logic inside the fetch loop—you create a failure mode where a corrupted write destroys the fetch state and forces a full restart.


Pagination: The Fundamental Challenge

Minute-level data APIs rarely return unlimited records in a single response. TickDB's /v1/market/kline endpoint, for example, supports configurable pagination via the limit and start_time / end_time parameters. A typical design pattern involves fetching in daily or weekly chunks to balance memory usage against API call volume.

The Cursor-Based Approach

For time-series data, cursor-based pagination using timestamps is more reliable than offset-based pagination. Offset pagination breaks when data gaps exist (a trading halt produces missing minutes, shifting all subsequent offsets).

import os
import time
import requests
from datetime import datetime, timedelta

TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market/kline"

def fetch_kline_chunk(symbol: str, start_ts: int, end_ts: int, limit: int = 1000) -> list:
    """
    Fetch a single page of kline data for a symbol.
    Uses timestamp-based pagination to handle gaps correctly.
    
    Args:
        symbol: Exchange symbol, e.g., "AAPL.US"
        start_ts: Start timestamp in milliseconds (inclusive)
        end_ts: End timestamp in milliseconds (exclusive)
        limit: Number of records per page (max 1000 for kline endpoint)
    
    Returns:
        List of OHLCV records
    """
    headers = {"X-API-Key": TICKDB_API_KEY}
    params = {
        "symbol": symbol,
        "interval": "1m",
        "start_time": start_ts,
        "end_time": end_ts,
        "limit": limit
    }
    
    # ⚠️ Always use timeouts. Unbounded requests in production will hang indefinitely.
    response = requests.get(
        BASE_URL,
        headers=headers,
        params=params,
        timeout=(3.05, 10)  # (connect_timeout, read_timeout)
    )
    
    if response.status_code == 200:
        data = response.json()
        if data.get("code") == 0:
            return data.get("data", [])
        else:
            raise RuntimeError(f"API error {data.get('code')}: {data.get('message')}")
    
    raise RuntimeError(f"HTTP error {response.status_code}: {response.text}")

Chunk Sizing Strategy

The optimal chunk size depends on three factors:

  1. API limit per request. If the API caps at 1,000 records per call, chunk sizes above 1,000 generate extra calls that return partial data.
  2. Rate limit tolerance. Smaller chunks mean more API calls. More API calls mean higher rate limit pressure and slower overall throughput.
  3. Timestamp granularity. For minute-level data, daily chunks (390 records per chunk) are manageable—roughly 3–5 calls per symbol per year. Weekly chunks (1,950 records) reduce call count but increase the blast radius of a failed chunk.

A practical starting point: daily chunks for single-symbol backtests; weekly chunks for universe-wide batch fetches where rate limit efficiency matters more than per-symbol granularity.


Rate Limit Resilience: Beyond Simple Retry

Rate limit handling is not merely "retry on failure." A naive retry loop with fixed delays creates two failure modes:

  1. Thundering herd: When a rate limit resets, dozens of concurrent clients retry simultaneously, overwhelming the API and triggering another rate limit.
  2. Progress starvation: Fixed delays are too conservative during low-traffic periods and too aggressive during high-traffic windows.

The production-grade solution combines exponential backoff with jitter—and crucially, it respects the Retry-After header when the API explicitly communicates a reset time.

Exponential Backoff with Jitter

import random
import time

def fetch_with_backoff(symbol: str, start_ts: int, end_ts: int, max_retries: int = 5) -> list:
    """
    Fetch kline data with exponential backoff and jitter.
    Respects Retry-After headers from the API.
    """
    base_delay = 1.0
    max_delay = 60.0
    
    for attempt in range(max_retries):
        try:
            response = fetch_kline_chunk(symbol, start_ts, end_ts)
            return response
        
        except RuntimeError as e:
            error_msg = str(e)
            
            # Handle rate limiting (code 3001 from TickDB)
            if "3001" in error_msg or response.status_code == 429:
                # Extract Retry-After from headers if available
                retry_after = int(response.headers.get("Retry-After", 0))
                
                if retry_after > 0:
                    # API explicitly tells us when to retry—respect it
                    delay = retry_after + random.uniform(0, 0.5)  # Small jitter to avoid collision
                else:
                    # Fall back to exponential backoff with jitter
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    delay += random.uniform(0, delay * 0.1)  # 10% jitter
                
                print(f"Rate limited. Attempt {attempt + 1}/{max_retries}. "
                      f"Waiting {delay:.1f}s before retry...")
                time.sleep(delay)
                continue
            
            # For other errors, retry but with shorter backoff
            if attempt < max_retries - 1:
                delay = min(base_delay * (2 ** attempt), max_delay) * 0.5
                delay += random.uniform(0, delay * 0.1)
                time.sleep(delay)
                continue
            
            # Final attempt failed—raise the error
            raise
    
    raise RuntimeError(f"Failed after {max_retries} retries")

The jitter calculation deserves special attention. A uniformly distributed jitter in the range [0, delay * 0.1] prevents the thundering herd problem without introducing excessive randomness. The client is always within 10% of the intended delay, ensuring predictable behavior while still spreading retries across time.

Handling Specific Error Codes

TickDB's API returns structured error codes that enable precise handling:

Code Meaning Handling strategy
1001 / 1002 Invalid or missing API key Fail immediately; do not retry
2002 Symbol not found Skip this symbol; log for manual review
3001 Rate limit exceeded Retry with backoff; read Retry-After header
5000–5999 Server-side error Retry with exponential backoff

For error code handling that integrates directly into your data pipeline, use a structured error classifier:

from dataclasses import dataclass
from typing import Optional

@dataclass
class ApiError:
    code: int
    message: str
    retryable: bool
    retry_after: Optional[int] = None

def classify_error(response_json: dict, headers: dict) -> ApiError:
    """
    Classify a TickDB API error into retryable and non-retryable categories.
    """
    code = response_json.get("code", 0)
    message = response_json.get("message", "")
    
    if code == 0:
        return ApiError(code=0, message="Success", retryable=False)
    
    if code in (1001, 1002):
        return ApiError(code=code, message=message, retryable=False)
    
    if code == 3001:
        retry_after = int(headers.get("Retry-After", 5))
        return ApiError(code=code, message=message, retryable=True, retry_after=retry_after)
    
    if 2000 <= code < 3000:
        # Symbol or parameter errors—not retryable
        return ApiError(code=code, message=message, retryable=False)
    
    # Server errors (5000-5999) are retryable
    return ApiError(code=code, message=message, retryable=True)

Checkpoint System: The Resume Capability

Without a checkpoint system, a failure during a 74 GB universe fetch forces a full restart. For a decade of data across 500 symbols, a restart costs hours of redundant API calls and bandwidth.

The checkpoint system persists three pieces of state:

  1. Symbol completion state. Which symbols have been fully fetched?
  2. Chunk cursor per symbol. The last successfully fetched chunk's end timestamp.
  3. Verification metadata. Record count, byte count, or checksum to detect corruption.

SQLite Checkpoint Schema

import sqlite3
from datetime import datetime
from pathlib import Path

def init_checkpoint_db(db_path: str = "fetch_checkpoints.db") -> sqlite3.Connection:
    """
    Initialize the checkpoint database for tracking fetch progress.
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS symbol_checkpoints (
            symbol TEXT PRIMARY KEY,
            last_fetched_end_ts INTEGER,  -- milliseconds
            total_records INTEGER DEFAULT 0,
            last_updated TEXT,
            status TEXT DEFAULT 'in_progress'
        )
    """)
    
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS fetch_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT,
            start_ts INTEGER,
            end_ts INTEGER,
            record_count INTEGER,
            fetched_at TEXT,
            status TEXT
        )
    """)
    
    conn.commit()
    return conn

def save_checkpoint(conn: sqlite3.Connection, symbol: str, end_ts: int, 
                    record_count: int, status: str = "in_progress"):
    """
    Save a checkpoint after successfully fetching a chunk.
    """
    cursor = conn.cursor()
    cursor.execute("""
        INSERT INTO symbol_checkpoints (symbol, last_fetched_end_ts, total_records, 
                                        last_updated, status)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(symbol) DO UPDATE SET
            last_fetched_end_ts = excluded.last_fetched_end_ts,
            total_records = total_records + excluded.total_records,
            last_updated = excluded.last_updated,
            status = excluded.status
    """, (symbol, end_ts, record_count, datetime.utcnow().isoformat(), status))
    conn.commit()

def load_checkpoint(conn: sqlite3.Connection, symbol: str) -> tuple[int, str]:
    """
    Load the last checkpoint for a symbol.
    Returns (last_fetched_end_ts, status).
    If no checkpoint exists, returns (0, 'not_started').
    """
    cursor = conn.cursor()
    cursor.execute("""
        SELECT last_fetched_end_ts, status 
        FROM symbol_checkpoints 
        WHERE symbol = ?
    """, (symbol,))
    
    result = cursor.fetchone()
    if result:
        return (result[0], result[1])
    return (0, "not_started")

Fetch Loop with Checkpoint Integration

def fetch_symbol_full_history(symbol: str, start_date: datetime, end_date: datetime,
                              conn: sqlite3.Connection, chunk_days: int = 1) -> dict:
    """
    Fetch full historical minute-level data for a symbol with checkpoint resume.
    
    Args:
        symbol: Exchange symbol (e.g., "AAPL.US")
        start_date: Start of the historical range
        end_date: End of the historical range
        conn: SQLite checkpoint database connection
        chunk_days: Number of days per chunk (1 = daily chunks)
    
    Returns:
        Dict with fetch statistics
    """
    # Load checkpoint to determine resume point
    last_ts, status = load_checkpoint(conn, symbol)
    
    if status == "completed":
        print(f"Skipping {symbol}: already fully fetched")
        return {"status": "skipped", "symbol": symbol}
    
    # Convert checkpoint timestamp to start of next chunk
    if last_ts > 0:
        current_start = datetime.fromtimestamp(last_ts / 1000, tz=datetime.timezone.utc)
    else:
        current_start = start_date
    
    total_records = 0
    chunk_count = 0
    
    # Iterate through chunks, starting from checkpoint
    while current_start < end_date:
        chunk_end = current_start + timedelta(days=chunk_days)
        if chunk_end > end_date:
            chunk_end = end_date
        
        start_ts = int(current_start.timestamp() * 1000)
        end_ts = int(chunk_end.timestamp() * 1000)
        
        try:
            records = fetch_with_backoff(symbol, start_ts, end_ts)
            
            # Write to local storage
            write_chunk_to_parquet(symbol, records, current_start, chunk_end)
            
            # Save checkpoint
            save_checkpoint(conn, symbol, end_ts, len(records))
            
            total_records += len(records)
            chunk_count += 1
            
            if chunk_count % 100 == 0:
                print(f"Progress: {symbol} — {chunk_count} chunks, "
                      f"{total_records} records, last chunk end: {chunk_end.date()}")
        
        except Exception as e:
            print(f"Failed to fetch {symbol} chunk {current_start.date()} to {chunk_end.date()}: {e}")
            # Checkpoint is preserved; next run resumes from this point
            raise
    
    # Mark as completed
    save_checkpoint(conn, symbol, end_ts, 0, status="completed")
    
    return {
        "status": "completed",
        "symbol": symbol,
        "total_chunks": chunk_count,
        "total_records": total_records
    }

Local Caching: From Raw API to Analysis-Ready Format

Fetching raw JSON from an API is only the first step. For backtesting workloads, data must be stored in a format optimized for reading patterns.

Three storage formats dominate:

Format Read speed Compression Schema enforcement Use case
CSV Fast for full scans Poor None Simple pipelines, small datasets
Parquet Fast for columnar reads Excellent (70–80%) Optional (via schema) Production backtesting
SQLite Good for range queries Moderate Strict Multi-symbol queries, checkpointing

For a 10-year minute-level US stock dataset, Parquet is the recommended format. The columnar layout matches the backtesting access pattern (load all close values for a date range), and Parquet's predicate pushdown eliminates the need to decompress irrelevant columns.

Parquet Writer with Schema Validation

import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
from datetime import datetime

# Define the schema once—enforce it on every write
KLINE_SCHEMA = pa.schema([
    ("symbol", pa.string()),
    ("timestamp", pa.int64()),      # milliseconds since epoch
    ("open", pa.float64()),
    ("high", pa.float64()),
    ("low", pa.float64()),
    ("close", pa.float64()),
    ("volume", pa.float64()),
    ("turnover", pa.float64()),
])

def write_chunk_to_parquet(symbol: str, records: list, start_date: datetime, 
                           end_date: datetime, output_dir: str = "data/kline"):
    """
    Write a chunk of kline data to a Parquet file partitioned by year-month.
    """
    if not records:
        return
    
    # Build PyArrow table from records
    table_data = {
        "symbol": [symbol] * len(records),
        "timestamp": [r["t"] for r in records],
        "open": [r["o"] for r in records],
        "high": [r["h"] for r in records],
        "low": [r["l"] for r in records],
        "close": [r["c"] for r in records],
        "volume": [r["v"] for r in records],
        "turnover": [r.get("q", 0.0) for r in records],
    }
    
    table = pa.Table.from_pydict(table_data, schema=KLINE_SCHEMA)
    
    # Partition by year-month for efficient range queries
    year_month = start_date.strftime("%Y-%m")
    partition_dir = Path(output_dir) / f"symbol={symbol}" / year_month
    partition_dir.mkdir(parents=True, exist_ok=True)
    
    output_path = partition_dir / f"{symbol}_{start_date.strftime('%Y%m%d')}.parquet"
    
    # Write with compression
    pq.write_table(
        table,
        output_path,
        compression="zstd",  # zstd offers better compression than snappy at similar speed
        use_dictionary=True,  # Reduces storage for repeated symbols
    )
    
    print(f"Wrote {len(records)} records to {output_path} ({output_path.stat().st_size / 1024:.1f} KB)")

Reading Data for Backtesting

import pyarrow.parquet as pq
import pyarrow.dataset as ds
from datetime import datetime

def load_backtest_data(symbol: str, start_date: datetime, 
                       end_date: datetime, data_dir: str = "data/kline") -> pa.Table:
    """
    Load minute-level data for a single symbol over a date range.
    Uses PyArrow dataset API for predicate pushdown on partitions.
    """
    dataset = ds.dataset(
        data_dir,
        format="parquet",
        partitioning="hive"  # Expects partition columns as directory names
    )
    
    # Filter on partition columns (year-month) first—predicate pushdown eliminates irrelevant files
    # Then filter on timestamp range
    start_ts = int(start_date.timestamp() * 1000)
    end_ts = int(end_date.timestamp() * 1000)
    
    table = dataset.to_table(
        filter=(
            (ds.field("symbol") == symbol) &
            (ds.field("timestamp") >= start_ts) &
            (ds.field("timestamp") < end_ts)
        )
    )
    
    return table

Parallel Fetching: Throughput Optimization

Sequential fetching for 500 symbols over 10 years takes weeks. Production pipelines parallelize the work—but parallelism introduces shared state conflicts and rate limit amplification.

Three approaches to parallel fetching:

Approach 1: Per-symbol workers. Each worker handles one symbol end-to-end. Simple, but underutilizes capacity for symbols with different data volumes.

Approach 2: Shared worker pool. A pool of N workers pulls from a shared queue of chunks. Better load balancing, but requires a distributed checkpoint store (Redis, PostgreSQL) for multi-process coordination.

Approach 3: Hierarchical scheduling. A scheduler dispatches symbol-level jobs. Each job internally uses sequential chunk fetching. Worker processes pull symbol-level jobs from a queue. Best for multi-machine deployments.

For single-machine pipelines, Approach 2 with a concurrent.futures.ThreadPoolExecutor is the pragmatic choice:

from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

def fetch_universe_parallel(symbols: list, start_date: datetime, end_date: datetime,
                            max_workers: int = 4, max_retries: int = 3) -> dict:
    """
    Fetch historical data for multiple symbols in parallel.
    
    Args:
        symbols: List of exchange symbols (e.g., ["AAPL.US", "MSFT.US"])
        start_date: Start of historical range
        end_date: End of historical range
        max_workers: Number of parallel workers (keep low to respect rate limits)
        max_retries: Max retries per symbol
    """
    conn = init_checkpoint_db()
    results = {"completed": [], "failed": [], "skipped": []}
    
    # ⚠️ max_workers=4 is deliberately conservative. TickDB's free tier 
    # rate limits apply per API key, not per connection.
    # Benchmark your specific rate limit and adjust accordingly.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                fetch_symbol_with_retry, 
                symbol, start_date, end_date, conn, max_retries
            ): symbol 
            for symbol in symbols
        }
        
        for future in as_completed(futures):
            symbol = futures[future]
            try:
                result = future.result()
                results[result["status"]].append(symbol)
                print(f"Completed {symbol}: {result}")
            except Exception as e:
                results["failed"].append({"symbol": symbol, "error": str(e)})
                print(f"Failed {symbol}: {e}")
    
    conn.close()
    return results

def fetch_symbol_with_retry(symbol: str, start_date: datetime, end_date: datetime,
                             conn: sqlite3.Connection, max_retries: int) -> dict:
    """
    Wrapper that retries symbol fetch with exponential backoff on transient failures.
    """
    for attempt in range(max_retries):
        try:
            return fetch_symbol_full_history(symbol, start_date, end_date, conn)
        except Exception as e:
            if attempt < max_retries - 1:
                wait = (2 ** attempt) + random.uniform(0, 0.5)
                print(f"Retry {attempt + 1}/{max_retries} for {symbol} after {wait:.1f}s: {e}")
                time.sleep(wait)
            else:
                raise

Data Quality Verification

Fetching data is only half the battle. A corrupted fetch—partial chunk, duplicate records, misaligned timestamps—corrupts the backtest and produces false results. Implement a verification layer that runs after each chunk fetch.

def verify_chunk_integrity(symbol: str, records: list, expected_interval_ms: int = 60000) -> bool:
    """
    Verify a chunk of kline data for common integrity issues.
    
    Checks:
    1. No duplicate timestamps
    2. Consecutive timestamps differ by exactly 1 minute (accounting for gaps)
    3. OHLCV values are non-negative
    4. High >= Open, Close, Low; Low <= Open, Close, High
    """
    if not records:
        return True
    
    timestamps = [r["t"] for r in records]
    
    # Check for duplicates
    if len(timestamps) != len(set(timestamps)):
        print(f"WARNING: Duplicate timestamps detected for {symbol}")
        return False
    
    # Check for negative values
    for r in records:
        if r["o"] < 0 or r["h"] < 0 or r["l"] < 0 or r["c"] < 0 or r["v"] < 0:
            print(f"WARNING: Negative value detected for {symbol} at {r['t']}")
            return False
    
    # Check OHLC relationship
    for r in records:
        if not (r["h"] >= r["o"] and r["h"] >= r["c"] and r["l"] <= r["o"] and r["l"] <= r["c"]):
            print(f"WARNING: Invalid OHLC relationship for {symbol} at {r['t']}")
            return False
    
    # Check timestamp monotonicity (with allowance for non-trading gaps)
    for i in range(1, len(timestamps)):
        diff = timestamps[i] - timestamps[i - 1]
        # Accept 1-minute intervals and standard trading gaps (overnight, weekends)
        if diff <= 0:
            print(f"WARNING: Non-monotonic timestamp for {symbol} at index {i}")
            return False
    
    return True

Summary: Production Pipeline Checklist

A production-grade minute-level data acquisition pipeline requires:

Component Requirement Implementation
Pagination Timestamp-based, not offset-based start_time / end_time parameters
Rate limiting Exponential backoff with jitter + Retry-After header handle_rate_limit() with 3001 detection
Checkpoints Per-symbol resume state in SQLite symbol_checkpoints table with last_fetched_end_ts
Storage Columnar format for fast reading Parquet with year-month partitioning
Verification Post-fetch integrity checks Duplicate detection, OHLC validation, timestamp monotonicity
Error classification Structured error handling classify_error() with retryable / non-retryable split
Timeout Every HTTP request bounded timeout=(3.05, 10) on all requests.get() calls
Authentication Environment variable, never hardcoded os.environ.get("TICKDB_API_KEY")

The full source code for this pipeline—including the batch orchestration layer for universe-wide fetches—is available as a reference implementation in the TickDB documentation repository.


Next Steps

If you're building your first backtesting pipeline, start with a single symbol and daily chunks. Validate data quality before scaling to the full universe. A single corrupt month of data can invalidate months of strategy development.

If you're migrating from a polling-based data source, the WebSocket push model offers real-time alternatives for live trading. Your backtesting pipeline can continue using the REST API for historical data while the live execution layer switches to WebSocket subscriptions.

If you need 10+ years of cleaned, aligned OHLCV data for cross-cycle backtesting, TickDB provides historical minute-level data for US equities via the /v1/market/kline endpoint. The free tier supports development and small-scale backtesting. Institutional plans offer expanded rate limits and multi-symbol batch endpoints.

If you're using AI-assisted coding tools, search for and install the tickdb-market-data SKILL on ClawHub to access TickDB API integration directly within your preferred development environment.


This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. Data acquisition strategies described here are engineering techniques—their application to trading decisions requires independent validation.