"Price is the effect. The order book is the cause."

Every quantitative researcher who has attempted a 10-year backtest has hit the same wall: the data exists, but getting it is the real engineering problem. A decade of 1-minute OHLCV candles for a single US equity comes to roughly 1 million rows for regular sessions alone (390 minutes × about 252 trading days × 10 years), and more once pre-market and after-hours bars are included. For a portfolio of 50 stocks, you are processing on the order of 50 million rows, and that is before you account for holidays, partial sessions, and data anomalies that require re-fetching.

The naive approach, sequential REST calls in a loop, will either time out, get rate-limited, or take 72+ hours. This article walks through a production-grade solution: a concurrent shard-fetcher with checkpoint resume, local caching, and rate-limit awareness, built against the TickDB REST API.


1. The Problem: Why Batch Historical Data Is Hard

Three constraints make large historical fetches genuinely difficult.

Volume. One regular US equity session spans 390 minutes (9:30 AM – 4:00 PM ET, excluding pre-market and after-hours), so each symbol produces up to 390 one-minute candles per trading day. Over a 10-year window, this adds up to roughly a million rows per symbol, and more if extended hours are included.

Rate limits. TickDB returns error code 3001 when you exceed the per-second request budget. The response carries a Retry-After header, but naive clients either ignore it or retry immediately, which makes the problem worse.

Partial failures. A fetch of 50,000 candles dies at row 24,671. Starting over from the beginning wastes time and burns your rate-limit budget on duplicate work. You need a resumable pipeline.

Most developers hit all three problems at once. The solution is not a single clever trick — it is a layered architecture: concurrent fetching at the top layer, paginated requests at the middle layer, and checkpointed local storage at the bottom layer.
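To put rough numbers on the volume constraint, here is a minimal sketch that counts regular-session minutes only. The 252 trading days per year and the 10,000-candle batch size are assumptions for illustration; extended-hours bars or a different batch limit would change the totals.

```python
import math

REGULAR_SESSION_MINUTES = 390   # 9:30 AM to 4:00 PM ET
TRADING_DAYS_PER_YEAR = 252     # assumption: typical US trading calendar


def estimate_rows(years: int, symbols: int = 1) -> int:
    """Upper bound on 1-minute candles for regular sessions only."""
    return REGULAR_SESSION_MINUTES * TRADING_DAYS_PER_YEAR * years * symbols


def estimate_calls(rows: int, batch_limit: int = 10_000) -> int:
    """Number of paginated requests needed at a given batch size."""
    return math.ceil(rows / batch_limit)


rows_one = estimate_rows(years=10)
rows_fifty = estimate_rows(years=10, symbols=50)
print(rows_one, estimate_calls(rows_one))      # 982800 99
print(rows_fifty, estimate_calls(rows_fifty))  # 49140000 4914
```

Even under these conservative assumptions, a 50-symbol portfolio needs thousands of requests, which is why the rate limit and resumability matter as much as raw volume.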


2. TickDB Kline Endpoint: What You Need to Know First

Before writing code, understand the endpoint you are calling.

The GET /v1/market/kline endpoint returns historical OHLCV candles. Authentication is header-based:

import os

headers = {"X-API-Key": os.environ.get("TICKDB_API_KEY")}

The critical parameters:

Parameter    Type     Notes
symbol       string   Format: AAPL.US (the market suffix is required)
interval     string   1m for minute-level candles
start_time   int      Unix timestamp in milliseconds
end_time     int      Unix timestamp in milliseconds
limit        int      Max candles per call. The API does not enforce a fixed maximum, but a practical ceiling is 10,000 per call

A common mistake: using /kline for live dashboards or using /kline/latest for backtesting. /kline is the historical endpoint. /kline/latest is for current-day real-time data. Mixing them up produces empty result sets and silent bugs.
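The parameter rules above are easy to get wrong in small ways (a missing suffix, seconds instead of milliseconds). The following sketch shows one way to validate them before a request is sent. Both to_ms and build_kline_params are hypothetical helpers written for this article, not part of any TickDB SDK.

```python
from datetime import datetime, timezone


def to_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to Unix milliseconds."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time bugs")
    return int(dt.timestamp() * 1000)


def build_kline_params(symbol, interval, start, end, limit=10_000):
    """Hypothetical helper: validate inputs and return the query parameters."""
    if "." not in symbol:
        raise ValueError(f"symbol {symbol!r} is missing its market suffix, e.g. AAPL.US")
    if start >= end:
        raise ValueError("start must be earlier than end")
    return {
        "symbol": symbol,
        "interval": interval,
        "start_time": to_ms(start),
        "end_time": to_ms(end),
        "limit": limit,
    }


params = build_kline_params(
    "AAPL.US", "1m",
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 1, 2, tzinfo=timezone.utc),
)
print(params["start_time"])  # 1704067200000
```

Failing fast on a malformed symbol or a reversed window is cheaper than spending a request, and part of the rate-limit budget, to learn the same thing from the API.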


3. Base Client: Error Handling and Rate-Limit Awareness

Every production-grade data fetcher needs a client that handles errors systematically. Here is the foundational layer:

import os
import time
import random
import requests

class TickDBClient:
    """Production-grade TickDB client with rate-limit and error handling."""

    BASE_URL = "https://api.tickdb.ai/v1"

    def __init__(self, api_key=None, max_retries=5):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("TICKDB_API_KEY env var is not set")
        self.headers = {"X-API-Key": self.api_key}
        self.max_retries = max_retries

    def _handle_error(self, resp, attempt=0):
        """Return None on success or "retry" on a transient error; raise on a fatal one."""
        if resp.status_code == 200:
            return None

        # Error bodies are not guaranteed to be JSON (e.g. an HTML 502 page from a proxy)
        try:
            body = resp.json()
        except ValueError:
            body = {}
        code = body.get("code", 0)

        # Fatal errors: retrying cannot help
        if code in (1001, 1002):
            raise ValueError("Invalid API key: check TICKDB_API_KEY")
        if code == 2002:
            raise KeyError("Symbol not found: verify via /v1/symbols/available")
        if code == 4001:
            raise ValueError(f"Invalid interval or start_time: {body.get('message')}")

        # Transient errors: give up once the retry budget is spent
        if attempt >= self.max_retries:
            raise RuntimeError(f"Request failed after {self.max_retries} retries: {resp.text}")

        if code == 3001:
            # Rate limited: honor Retry-After (assumed to be a number of seconds)
            try:
                delay = float(resp.headers.get("Retry-After", 5))
            except ValueError:
                delay = 5.0
            print(f"  Rate limited. Sleeping {delay:.0f}s...")
        else:
            # Unknown error: exponential backoff with jitter
            delay = min(2 ** attempt + random.uniform(0, 1), 30)
            print(f"  Error {code}. Retrying in {delay:.1f}s...")
        time.sleep(delay)
        return "retry"

    def kline(self, symbol, interval, start_time, end_time, limit=10000):
        """Fetch kline (OHLCV) data with automatic retry and rate-limit handling."""
        params = {
            "symbol": symbol,
            "interval": interval,
            "start_time": start_time,
            "end_time": end_time,
            "limit": limit,
        }

        for attempt in range(self.max_retries + 1):
            try:
                resp = requests.get(
                    f"{self.BASE_URL}/market/kline",
                    headers=self.headers,
                    params=params,
                    timeout=(3.05, 10)
                )
            except requests.RequestException as exc:
                # Network-level failure (timeout, connection reset): back off and retry
                if attempt >= self.max_retries:
                    raise RuntimeError(f"Request failed after {self.max_retries} retries: {exc}") from exc
                time.sleep(min(2 ** attempt + random.uniform(0, 1), 30))
                continue
            # On the final attempt _handle_error raises, so this loop never falls through
            if self._handle_error(resp, attempt) is None:
                return resp.json()

The error handler differentiates between fatal errors (invalid key, bad symbol) and transient errors (rate limit, server errors). The exponential backoff with jitter prevents thundering-herd behavior — all clients retry at the same moment — which is critical when running concurrent workers.
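The two wait-time calculations can be isolated and tested without touching the network. The sketch below is an illustration, not TickDB's documented behavior: it assumes Retry-After may arrive either as a number of seconds or as an HTTP-date (both forms are allowed by the HTTP specification), and the function names are hypothetical.

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(value, default=5.0, now=None):
    """Return seconds to wait from a Retry-After value (seconds or HTTP-date)."""
    if value is None:
        return default
    try:
        return max(0.0, float(value))
    except ValueError:
        pass  # not a plain number; try the HTTP-date form next
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default  # unparseable header: fall back to the default wait
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


def backoff_delay(attempt, base=1.0, cap=30.0, rng=random):
    """Exponential backoff with up to one second of jitter, capped at `cap`."""
    return min(base * 2 ** attempt + rng.uniform(0, 1), cap)


for attempt in range(6):
    print(attempt, round(backoff_delay(attempt), 2))
```

The jitter term is what keeps concurrent workers from waking up in lockstep: two workers that fail at the same instant get slightly different delays.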


4. Pagination: Conquering the 10-Year Window

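With the 10,000-candle limit used here, a single API call returns at most 10,000 candles. For a 10-year window of regular-session minute data (390 minutes × about 252 trading days × 10 years, roughly 1 million rows), that means on the order of 100 calls per symbol. Pagination is not optional: it is the mechanism that converts a hard problem into a tractable sequence of smaller problems.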

4.1 Sliding Window Pagination

The cleanest pagination strategy uses a sliding time window. Each request asks for up to batch_limit candles starting at start_time, and the next request moves start_time just past the last candle's timestamp.

from datetime import datetime, timezone

def paginate_kline(client, symbol, interval, start_ms, end_ms, batch_limit=10000):
    """
    Paginate through historical kline data using a sliding time window.

    Yields batches of candles, automatically advancing start_time
    based on the last candle's timestamp in each response.
    Assumes candles arrive oldest-first with the timestamp as the first field.
    """
    current_start = start_ms

    while current_start < end_ms:
        # client.kline() already handles retries, so a "retry" marker
        # is never mistaken for a response body here
        result = client.kline(symbol, interval, current_start, end_ms, limit=batch_limit)
        if result is None:
            raise RuntimeError(f"No response for {symbol} starting at {current_start}")

        candles = result.get("data", [])
        if not candles:
            break  # No more data

        yield candles

        # Advance window past the last received candle
        last_ts = candles[-1][0]  # Timestamp is the first field in each candle
        if last_ts < current_start:
            break  # Guard against a window that does not advance (unexpected ordering)
        current_start = last_ts + 1

Important: the last candle's timestamp is included in the response but should not be re-fetched. Setting current_start to last_ts + 1 (one millisecond later) ensures the next call starts immediately after this one ended, with no overlap and no gap.
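The no-overlap, no-gap property can be checked offline. The sketch below reuses the same advancing rule against an in-memory stand-in for the API; fake_fetch_page and the two-field candle layout are assumptions made for the demonstration.

```python
def paginate(fetch_page, start_ms, end_ms, batch_limit):
    """Yield batches, moving the window to one millisecond past the last candle."""
    current_start = start_ms
    while current_start < end_ms:
        candles = fetch_page(current_start, end_ms, batch_limit)
        if not candles:
            break
        yield candles
        current_start = candles[-1][0] + 1


# Stand-in for the API: 25 one-minute candles, oldest first, as [timestamp_ms, close].
MINUTE_MS = 60_000
FAKE_CANDLES = [[i * MINUTE_MS, 100.0 + i] for i in range(25)]


def fake_fetch_page(start_ms, end_ms, limit):
    """Return up to `limit` candles with start_ms <= timestamp <= end_ms."""
    matching = [c for c in FAKE_CANDLES if start_ms <= c[0] <= end_ms]
    return matching[:limit]


batches = list(paginate(fake_fetch_page, 0, 25 * MINUTE_MS, batch_limit=10))
timestamps = [c[0] for batch in batches for c in batch]
print([len(b) for b in batches])                   # [10, 10, 5]
print(timestamps == [c[0] for c in FAKE_CANDLES])  # True
```

Every candle appears exactly once and in order, which is the behavior the real paginator depends on.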

4.2 Bounded vs. Unbounded Windows

Approach           Use case
Bounded window     Known start and end dates, e.g. backtesting a specific period
Unbounded window   Continual incremental updates, e.g. appending new daily data

For backtesting, use bounded windows. For live trading systems, use unbounded windows where end_time is int(datetime.now().timestamp() * 1000).
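As a small illustration, the two window styles differ only in how the bounds are computed. The helper names below are hypothetical, and both expect timezone-aware datetimes.

```python
from datetime import datetime, timezone


def bounded_window(start, end):
    """Fixed (start_ms, end_ms) pair for a known backtest period."""
    return int(start.timestamp() * 1000), int(end.timestamp() * 1000)


def unbounded_window(last_seen_ms, now=None):
    """Window from just after the last stored candle up to the current moment."""
    now = now or datetime.now(timezone.utc)
    return last_seen_ms + 1, int(now.timestamp() * 1000)


start_ms, end_ms = bounded_window(
    datetime(2020, 1, 1, tzinfo=timezone.utc),
    datetime(2021, 1, 1, tzinfo=timezone.utc),
)
print((end_ms - start_ms) // 86_400_000)  # 366 (2020 is a leap year)
```

A bounded window gives reproducible backtests, because the same call always covers the same period; an unbounded window changes on every run by design.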


5. Concurrent Shard Fetching: Parallelizing the Work

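A single-threaded paginator working through 10 years of data for a whole portfolio is slow because every request waits for the previous one to finish. The solution is concurrent shard fetching: divide the work into independent shards, fetch them in parallel, then merge and deduplicate the results. The code below shards by symbol, with one worker per symbol; the same pattern extends to splitting a single symbol's time range into segments.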

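import concurrent.futures
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone  # timedelta is needed by batch_fetch_all

@dataclass
class FetchConfig:
    symbols: list[str]
    interval: str = "1m"
    years: int = 10
    max_workers: int = 10  # Size of the thread pool
    requests_per_second: int = 5  # Rate-limit budget (adjust to your tier)

client = TickDBClient()
config = FetchConfig(symbols=["AAPL.US", "MSFT.US", "GOOGL.US"], interval="1m", years=10)
# Caps how many requests are in flight at once. This is a concurrency cap,
# not a true per-second limiter, so 3001 responses are still possible.
rate_limiter = threading.Semaphore(config.requests_per_second)

def fetch_symbol_range(symbol, start_ms, end_ms, client):
    """Fetch one symbol's full time range using pagination."""
    results = []
    batches = paginate_kline(client, symbol, config.interval, start_ms, end_ms)
    while True:
        # Hold a slot only while one request is in flight, not for the whole symbol
        with rate_limiter:
            batch = next(batches, None)
        if batch is None:
            break
        results.extend(batch)
    return symbol, results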

def batch_fetch_all(config):
    """
    Concurrently fetch multiple symbols.
    Each symbol is processed by a separate worker thread.
    """
    end_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
    start_ms = int((datetime.now(timezone.utc) - timedelta(days=365 * config.years)).timestamp() * 1000)

    with concurrent.futures.ThreadPoolExecutor(max_workers=config.max_workers) as executor:
        futures = {
            executor.submit(fetch_symbol_range, sym, start_ms, end_ms, client): sym
            for sym in config.symbols
        }
        all_data = {}
        for future in concurrent.futures.as_completed(futures):
            symbol = futures[future]
            try:
                sym, candles = future.result()
                all_data[sym] = candles
                print(f"  {sym}: fetched {len(candles):,} candles")
            except Exception as exc:
                print(f"  {symbol} generated an exception: {exc}")
    return all_data
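
Sharding by symbol leaves a single long-history symbol on one worker. One way to parallelize further is to split the time range itself. The following is a sketch under assumptions: split_range and merge_shards are hypothetical helpers, and candles are assumed to be lists with the timestamp as the first field. If end_time is inclusive, a candle sitting exactly on a shard boundary can be returned by two adjacent shards, so the merge step deduplicates by timestamp.

```python
def split_range(start_ms, end_ms, shards):
    """Split [start_ms, end_ms) into contiguous, non-overlapping (start, end) pairs."""
    if shards < 1 or end_ms <= start_ms:
        raise ValueError("need at least one shard and a non-empty range")
    step = -(-(end_ms - start_ms) // shards)  # ceiling division
    bounds = []
    shard_start = start_ms
    while shard_start < end_ms:
        shard_end = min(shard_start + step, end_ms)
        bounds.append((shard_start, shard_end))
        shard_start = shard_end
    return bounds


def merge_shards(shard_results):
    """Merge per-shard candle lists, dropping duplicate timestamps and sorting."""
    by_ts = {}
    for candles in shard_results:
        for candle in candles:
            by_ts[candle[0]] = candle
    return [by_ts[ts] for ts in sorted(by_ts)]


print(split_range(0, 100, 4))  # [(0, 25), (25, 50), (50, 75), (75, 100)]
```

Each (start, end) pair can be submitted to the executor as its own task, and the per-shard results passed to merge_shards once all futures complete.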

Critical parameter: keep max_workers at or below your rate-limit ceiling. If TickDB allows 5 requests per second and 20 workers fire at once, the requests beyond the budget receive a 3001 error and have to wait and retry, which costs time instead of saving it. Calibrate based on your account tier.
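A worker count alone does not bound requests per second, because fast responses let each worker issue several requests within the same second. One common remedy is a shared limiter that spaces requests out in time. The class below is a minimal sketch of that idea; MinIntervalLimiter is a hypothetical name, and the injectable clock and sleep arguments exist only to make it testable.

```python
import threading
import time


class MinIntervalLimiter:
    """Thread-safe limiter that spaces calls at least 1/rate seconds apart."""

    def __init__(self, requests_per_second, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = 1.0 / requests_per_second
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_allowed = clock()

    def acquire(self):
        """Reserve the next free time slot, then wait outside the lock until it arrives."""
        with self._lock:
            now = self._clock()
            slot = max(now, self._next_allowed)
            self._next_allowed = slot + self.min_interval
        wait = slot - now
        if wait > 0:
            self._sleep(wait)


limiter = MinIntervalLimiter(requests_per_second=5)
limiter.acquire()  # the first call returns without waiting
```

Calling limiter.acquire() immediately before each HTTP request keeps all workers, combined, near the configured rate regardless of how many threads are in the pool.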


6. Checkpoint Resume: Surviving Failures Without Starting Over

Network interruptions, OOM kills, and instance restarts are not edge cases — they are the expected state of any long-running data pipeline. Checkpointing is the mechanism that turns a catastrophic failure into a 30-second inconvenience.

6.1 Checkpoint Data Structure

import json
import os
from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
class Checkpoint:
    symbol: str
    interval: str
    start_ms: int
    end_ms: int
    last_fetched_ts: int
    total_batches: int
    fetched_count: int
    resume_token: str = ""  # Optional: stores the pagination cursor

    def save(self, path):
        with open(path, "w") as f:
            json.dump(asdict(self), f, indent=2)
        print(f"  Checkpoint saved: {path}")

    @classmethod
    def load(cls, path):
        if not os.path.exists(path):
            return None
        with open(path, "r") as f:
            return cls(**json.load(f))

6.2 Resumable Fetch Loop

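def fetch_with_checkpoint(symbol, start_ms, end_ms, interval, checkpoint_dir, cache_dir="./tickdb_cache"):
    """Fetch a range resumably. Returns the cached candles as a DataFrame."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    cp_path = os.path.join(checkpoint_dir, f"{symbol.replace('.', '_')}.json")

    checkpoint = Checkpoint.load(cp_path)
    client = TickDBClient()

    if checkpoint:
        # Resume from last checkpoint; earlier candles are already in the parquet cache
        current_start = checkpoint.last_fetched_ts + 1
        total_batches = checkpoint.total_batches
        fetched_count = checkpoint.fetched_count
        print(f"Resuming {symbol}: {fetched_count:,} candles already cached, resuming from {datetime.fromtimestamp(current_start/1000)}")
    else:
        current_start = start_ms
        total_batches = 0
        fetched_count = 0

    pending = []  # Candles fetched since the last flush to disk

    def flush():
        # Write candles first, then the checkpoint, so the checkpoint
        # never points past data that is actually on disk
        if not pending:
            return
        cache_candles(symbol, pending, cache_dir)  # Defined in Section 7
        Checkpoint(
            symbol=symbol,
            interval=interval,
            start_ms=start_ms,
            end_ms=end_ms,
            last_fetched_ts=pending[-1][0],
            total_batches=total_batches,
            fetched_count=fetched_count,
        ).save(cp_path)
        pending.clear()

    try:
        for batch in paginate_kline(client, symbol, interval, current_start, end_ms):
            pending.extend(batch)
            total_batches += 1
            fetched_count += len(batch)

            # Save checkpoint every 10 batches
            if total_batches % 10 == 0:
                flush()
    finally:
        flush()  # Persist whatever arrived, whether the loop finished or failed

    return load_cached_candles(symbol, cache_dir)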

The checkpoint saves the last_fetched_ts — the timestamp of the most recent successfully processed candle. On resume, the fetcher starts at last_fetched_ts + 1. The stored candle data is also kept in local parquet files (see Section 7), so no work is lost.
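A checkpoint is only useful if the file itself survives a crash. If the process dies halfway through writing the JSON, the next run finds a truncated file. A common safeguard, sketched below with hypothetical helper names, is to write to a temporary file in the same directory and then rename it over the target.

```python
import json
import os
import tempfile


def save_json_atomic(path, payload):
    """Write JSON to a temp file in the same directory, then rename over the target."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before the rename
        os.replace(tmp_path, path)  # atomic on POSIX when both paths share a filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_json(path):
    """Return the stored payload, or None when no checkpoint exists yet."""
    if not os.path.exists(path):
        return None
    with open(path, "r") as f:
        return json.load(f)
```

With this approach a reader sees either the previous complete checkpoint or the new complete one, never a partial write.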


7. Local Caching: The Performance Multiplier

Fetching from the API every time you run a backtest is slow and wasteful. A local cache ensures you only pay the API cost for new data.

7.1 Parquet Storage

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

CACHE_DIR = "./tickdb_cache"

def candles_to_df(candles: list) -> pd.DataFrame:
    """Convert TickDB candle list to a pandas DataFrame."""
    if not candles:
        return pd.DataFrame(columns=["timestamp", "open", "high", "low", "close", "volume"])
    df = pd.DataFrame(candles, columns=["timestamp", "open", "high", "low", "close", "volume"])
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True)
    return df

def cache_candles(symbol, candles, cache_dir=CACHE_DIR):
    """Append new candles to a parquet cache file."""
    os.makedirs(cache_dir, exist_ok=True)
    cache_file = os.path.join(cache_dir, f"{symbol.replace('.', '_')}.parquet")

    new_df = candles_to_df(candles)
    if new_df.empty:
        return

    if os.path.exists(cache_file):
        existing = pd.read_parquet(cache_file)
        combined = pd.concat([existing, new_df]).drop_duplicates(subset=["timestamp"]).sort_values("timestamp")
    else:
        combined = new_df

    combined.to_parquet(cache_file, index=False)
    print(f"  Cached {len(new_df)} new candles → {cache_file} ({os.path.getsize(cache_file)/1024/1024:.1f} MB)")

def load_cached_candles(symbol, cache_dir=CACHE_DIR):
    """Load all cached candles for a symbol, returning a DataFrame."""
    cache_file = os.path.join(cache_dir, f"{symbol.replace('.', '_')}.parquet")
    if not os.path.exists(cache_file):
        return pd.DataFrame()
    return pd.read_parquet(cache_file)

Parquet is the right format for this use case: columnar storage, built-in compression, and efficient filtering. A 10-year, 50-symbol parquet dataset is typically under 8 GB on disk and loads in under 30 seconds with predicate pushdown.

7.2 Incremental Fetch Strategy

def fetch_incremental(symbol, interval, years=10, cache_dir=CACHE_DIR):
    """
    Fetch only new data: load the last cached timestamp, then fetch everything
    after it. This keeps API calls to the minimum necessary.
    """
    client = TickDBClient()
    os.makedirs(cache_dir, exist_ok=True)

    # Determine the latest cached timestamp
    cached = load_cached_candles(symbol, cache_dir)
    if not cached.empty:
        last_cached_ts = int(cached["timestamp"].max().value / 1_000_000)
    else:
        last_cached_ts = int((datetime.now(timezone.utc) - timedelta(days=365 * years)).timestamp() * 1000)

    end_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
    new_candles = []

    for batch in paginate_kline(client, symbol, interval, last_cached_ts + 1, end_ms):
        new_candles.extend(batch)

    if new_candles:
        cache_candles(symbol, new_candles, cache_dir)

    full_df = load_cached_candles(symbol, cache_dir)
    print(f"  {symbol}: {len(full_df):,} total candles ({len(new_candles):,} new)")
    return full_df

This pattern — cache-first, incremental fetch — reduces your average fetch time from hours to seconds for daily backtest runs.


8. Production Deployment: From Prototype to Pipeline

Moving from a working script to a reliable production pipeline requires five additional considerations.

Monitoring. Track three metrics during any large fetch job: candles fetched per minute (throughput), rate-limit hits (health), and checkpoint frequency (progress). Log to stdout or a metrics system:

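import time

class FetchMonitor:
    def __init__(self, total_symbols):
        self.start_time = time.time()
        self.total_symbols = total_symbols
        self.completed = 0
        self.rate_limit_hits = 0
        self.checkpoints_saved = 0

    def record_rate_limit(self):
        self.rate_limit_hits += 1  # Call whenever a 3001 response is seen

    def record_checkpoint(self):
        self.checkpoints_saved += 1  # Call after each checkpoint save

    def mark_completed(self):
        self.completed += 1  # Call when a symbol finishes

    def log_batch(self, symbol, batch_count, total_candles):
        elapsed = time.time() - self.start_time
        rate = total_candles / elapsed * 60 if elapsed > 0 else 0
        print(f"  [{self.completed}/{self.total_symbols}] {symbol}: "
              f"batch {batch_count}, {total_candles:,} candles, {rate:.0f} candles/min, "
              f"{self.rate_limit_hits} rate-limit hits, {self.checkpoints_saved} checkpoints")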

Graceful error handling. Wrap the entire fetch loop in a try-except so that an interruption ends with a clear message and a resumable state. Progress is only as safe as the most recent checkpoint, so anything fetched after it is fetched again on the next run:

try:
    fetch_with_checkpoint(...)
except KeyboardInterrupt:
    print("Interrupted. Run again to resume from the last saved checkpoint.")
except Exception as e:
    print(f"Fatal error: {e}")
    raise  # Re-raise so the failure is visible; the last checkpoint stays on disk

Terms of service compliance. Some data sources restrict redistribution or require attribution. Verify your TickDB plan's usage terms before caching large datasets locally or serving cached data to multiple users.

Cache management. Set a retention policy — for example, purge parquet files older than 90 days. Ten years of minute data grows quickly; a portfolio of 200 symbols can reach 30+ GB.

Retry testing. Before running a full fetch, test the retry and resume logic deliberately. Kill a fetch process mid-run with Ctrl+C or a SIGTERM and verify that a subsequent run resumes cleanly from the checkpoint.


9. Framework Integration: Backtesting Setup

Once the data is cached, feeding it into a backtesting framework is straightforward:

import backtrader as bt

class TickDBData(bt.feeds.PandasData):
    params = (
        ("datetime", None),  # None tells backtrader to read datetimes from the index
        ("open", "open"),
        ("high", "high"),
        ("low", "low"),
        ("close", "close"),
        ("volume", "volume"),
        ("openinterest", -1),
    )

def run_backtest(symbol, strategy, cash=100000):
    df = load_cached_candles(symbol)
    data = TickDBData(
        dataname=df.set_index("timestamp"),  # timestamp becomes the index read above
        timeframe=bt.TimeFrame.Minutes,  # 1-minute bars; the default timeframe is daily
        compression=1,
    )
    cerebro = bt.Cerebro()
    cerebro.adddata(data)
    cerebro.addstrategy(strategy)
    cerebro.broker.setcash(cash)
    cerebro.run()
    print(f"Final portfolio value: {cerebro.broker.getvalue():.2f}")

For multi-symbol portfolios, use concurrent.futures.ThreadPoolExecutor to prefetch all symbols into local cache before instantiating the Cerebro engine. Pre-fetching eliminates I/O latency during the backtest run itself.
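The prefetch step might look like the sketch below. prefetch_all is a hypothetical helper, and fetch_one stands for whatever per-symbol function fills the cache, for example fetch_incremental from Section 7.

```python
import concurrent.futures


def prefetch_all(symbols, fetch_one, max_workers=4):
    """Run fetch_one(symbol) for every symbol in parallel; collect results and errors."""
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch_one, sym): sym for sym in symbols}
        for future in concurrent.futures.as_completed(futures):
            sym = futures[future]
            try:
                results[sym] = future.result()
            except Exception as exc:
                # Keep going so one bad symbol does not stop the rest
                errors[sym] = exc
    return results, errors
```

Returning the errors separately makes it easy to decide, before the backtest starts, whether to proceed with a partial universe or retry the failed symbols first.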


10. Key Takeaways

Fetching 10 years of minute-level US stock data is an engineering challenge, not a data problem. The solution is a layered architecture:

  • Error handling and rate-limit awareness at the client layer — exponential backoff with jitter prevents API overload.
  • Sliding-window pagination at the request layer — converts millions of rows into manageable 10,000-candle batches.
  • Concurrent shard fetching at the orchestration layer — parallelize across symbols and time ranges to reduce wall-clock time.
  • Checkpoint resume at the pipeline layer — turns catastrophic failures into 30-second interruptions.
  • Local parquet caching at the storage layer — eliminates redundant API calls for repeated backtest runs.

Each layer is independently testable. Start with the client, verify it handles 3001 errors correctly, then build outward. As rough targets, a single-threaded fetch for one symbol should take under 10 minutes, and a 50-symbol portfolio with 10 years of data should complete in under an hour with the concurrent approach; actual times depend on your rate-limit tier.


Next Steps

If you're ready to start coding: sign up at tickdb.ai and generate a free API key. Set TICKDB_API_KEY as an environment variable, then adapt the code in this article to your specific symbol list and strategy timeframe.

If you want 10+ years of historical OHLCV data for cross-cycle backtesting: TickDB provides clean, time-aligned US equity data via the /v1/market/kline endpoint. The free tier supports development and small-scale backtesting; reach out to enterprise@tickdb.ai for larger datasets.

If you're building automated pipelines: consider installing the tickdb-market-data SKILL in your AI coding assistant to generate custom fetch pipelines tailored to your specific strategy requirements.

This article does not constitute investment advice. Markets involve risk, and neither backtesting results nor past performance guarantee future results.