"Price is the effect. The order book is the cause."
Every quantitative researcher who has attempted a 10-year backtest has hit the same wall: the data exists, but getting it is the real engineering problem. A decade of 1-minute OHLCV candles for a single US equity comes to roughly 1 million rows for regular sessions alone (390 minutes × about 252 trading days × 10 years). For a portfolio of 50 stocks, that is close to 50 million rows, and that is before you account for extended-hours bars, half-day sessions, and data anomalies that require re-fetching.
The naive approach, sequential REST calls in a loop, will time out, get rate-limited, or take 72+ hours. This article walks through a production-grade solution: a concurrent shard-fetcher with checkpoint resume, local caching, and rate-limit awareness, built against the TickDB REST API.
1. The Problem: Why Batch Historical Data Is Hard
Three constraints make large historical fetches genuinely difficult.
Volume. A regular US equity session runs 390 minutes per symbol (9:30 AM – 4:00 PM ET, excluding pre-market and after-hours), so 1-minute candles produce up to 390 rows per symbol per trading day. Over a 10-year window, this compounds to roughly a million regular-session rows per symbol.
Rate limits. TickDB returns error code 3001 when you exceed the per-second request budget. The API surfaces a Retry-After header, but naive clients either ignore it or retry immediately, making the problem worse.
Partial failures. A fetch of 50,000 candles dies at row 24,671. Starting over from the beginning wastes time and burns your rate-limit budget on duplicate work. You need a resumable pipeline.
Most developers hit all three problems at once. The solution is not a single clever trick — it is a layered architecture: concurrent fetching at the top layer, paginated requests at the middle layer, and checkpointed local storage at the bottom layer.
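The volume constraint can be sanity-checked with quick arithmetic. The sketch below assumes regular sessions only (390 minutes) and about 252 trading days per year; the function names are illustrative and not part of any TickDB library.

```python
import math


def estimate_candles(years, symbols=1, minutes_per_session=390, sessions_per_year=252):
    """Estimate regular-session 1-minute candles (assumed trading calendar)."""
    return years * sessions_per_year * minutes_per_session * symbols


def estimate_calls(total_candles, batch_limit=10_000):
    """Requests needed if every call returns a full batch."""
    return math.ceil(total_candles / batch_limit)


per_symbol = estimate_candles(years=10)
print(per_symbol)                        # 982800
print(estimate_calls(per_symbol))        # 99
print(estimate_candles(10, symbols=50))  # 49140000
```

Extended-hours data or a different batch limit would change these figures, so treat them as a lower bound for planning.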
2. TickDB Kline Endpoint: What You Need to Know First
Before writing code, understand the endpoint you are calling.
The GET /v1/market/kline endpoint returns historical OHLCV candles. Authentication is header-based:
import os
headers = {"X-API-Key": os.environ.get("TICKDB_API_KEY")}
The critical parameters:
| Parameter | Type | Notes |
|---|---|---|
| symbol | string | Format: AAPL.US (the suffix is required) |
| interval | string | 1m for minute-level |
| start_time | int | Unix timestamp in milliseconds |
| end_time | int | Unix timestamp in milliseconds |
| limit | int | Max candles per call. The API does not enforce a fixed maximum, but a practical ceiling is 10,000 per call |
A common mistake: using /kline for live dashboards or using /kline/latest for backtesting. /kline is the historical endpoint. /kline/latest is for current-day real-time data. Mixing them up produces empty result sets and silent bugs.
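Because start_time and end_time are millisecond timestamps, it helps to build them from timezone-aware datetimes rather than by hand. The helper names below (to_ms, build_kline_params) are hypothetical; only the parameter names come from the table above.

```python
from datetime import datetime, timezone


def to_ms(dt):
    """Convert a timezone-aware datetime to a Unix timestamp in milliseconds."""
    if dt.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid local-time surprises")
    return int(dt.timestamp() * 1000)


def build_kline_params(symbol, start, end, interval="1m", limit=10_000):
    """Assemble the query parameters for a historical kline request."""
    if "." not in symbol:
        raise ValueError("symbol needs a market suffix, e.g. AAPL.US")
    if start >= end:
        raise ValueError("start must be earlier than end")
    return {
        "symbol": symbol,
        "interval": interval,
        "start_time": to_ms(start),
        "end_time": to_ms(end),
        "limit": limit,
    }


params = build_kline_params(
    "AAPL.US",
    datetime(2020, 1, 1, tzinfo=timezone.utc),
    datetime(2020, 1, 2, tzinfo=timezone.utc),
)
print(params["start_time"])  # 1577836800000
```

Rejecting naive datetimes up front avoids a class of bugs where the same script produces different windows on machines in different time zones.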
3. Base Client: Error Handling and Rate-Limit Awareness
Every production-grade data fetcher needs a client that handles errors systematically. Here is the foundational layer:
import os
import random
import time
import requests
class TickDBClient:
    """Production-grade TickDB client with rate-limit and error handling."""
    BASE_URL = "https://api.tickdb.ai/v1"
    def __init__(self, api_key=None, max_retries=5):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("TICKDB_API_KEY env var is not set")
        self.headers = {"X-API-Key": self.api_key}
        self.max_retries = max_retries
    def _handle_error(self, resp, attempt=0):
        """Return None on success or "retry" after sleeping; raise on fatal errors."""
        if resp.status_code == 200:
            return None
        try:
            body = resp.json()
        except ValueError:
            body = {}  # Non-JSON error body, e.g. a gateway error page
        code = body.get("code", 0)
        # Fatal errors: retrying cannot fix these
        if code in (1001, 1002):
            raise ValueError("Invalid API key: check TICKDB_API_KEY")
        if code == 2002:
            raise KeyError("Symbol not found: verify via /v1/symbols/available")
        if code == 4001:
            raise ValueError(f"Invalid interval or start_time: {body.get('message')}")
        # Rate limited: honor the server's Retry-After hint
        if code == 3001:
            retry_after = int(resp.headers.get("Retry-After", 5))
            print(f"  Rate limited. Sleeping {retry_after}s...")
            time.sleep(retry_after)
            return "retry"
        # Unknown error: exponential backoff with jitter, capped at 30s
        if attempt < self.max_retries:
            delay = min(2 ** attempt + random.uniform(0, 1), 30)
            print(f"  Error {code}. Retrying in {delay:.1f}s...")
            time.sleep(delay)
            return "retry"
        raise RuntimeError(f"Request failed after {self.max_retries} retries: {resp.text}")
    def kline(self, symbol, interval, start_time, end_time, limit=10000):
        """Fetch kline (OHLCV) data with automatic retry and rate-limit handling."""
        params = {
            "symbol": symbol,
            "interval": interval,
            "start_time": start_time,
            "end_time": end_time,
            "limit": limit,
        }
        for attempt in range(self.max_retries + 1):
            resp = requests.get(
                f"{self.BASE_URL}/market/kline",
                headers=self.headers,
                params=params,
                timeout=(3.05, 10),  # (connect, read) timeouts in seconds
            )
            action = self._handle_error(resp, attempt)
            if action is None:
                return resp.json()
        # Every attempt asked for a retry (e.g. repeated 3001 responses)
        raise RuntimeError(f"{symbol}: still failing after {self.max_retries + 1} attempts")
The error handler differentiates between fatal errors (invalid key, bad symbol) and transient errors (rate limit, server errors). The exponential backoff with jitter prevents thundering-herd behavior, where all clients retry at the same moment. This matters most when running concurrent workers.
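To see what the backoff schedule looks like in isolation, here is a small sketch of the same formula used in the client (2 ** attempt plus up to one second of jitter, capped at 30 seconds). The function name and the injectable rng argument are illustrative additions, not part of the client.

```python
import random


def backoff_delay(attempt, cap=30.0, rng=random):
    """Exponential delay plus up to one second of jitter, capped at `cap` seconds."""
    return min(2 ** attempt + rng.uniform(0, 1), cap)


rng = random.Random(42)  # seeded only so the demo is repeatable
delays = [backoff_delay(attempt, rng=rng) for attempt in range(7)]
for attempt, delay in enumerate(delays):
    print(f"attempt {attempt}: sleep {delay:.2f}s")
```

The first five delays fall in the ranges 1 to 2, 2 to 3, 4 to 5, 8 to 9, and 16 to 17 seconds; from attempt 5 onward the cap applies and every delay is exactly 30 seconds. Note that once the cap is reached the jitter disappears, so a production version might apply jitter after capping.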
4. Pagination: Conquering the 10-Year Window
In practice, a single API call returns at most 10,000 candles. A 10-year window of regular-session minute data is about 980,000 candles per symbol (390 × 252 × 10), so you need roughly 100 calls per symbol. Pagination is not optional; it is the mechanism that converts a hard problem into a tractable sequence of smaller problems.
4.1 Sliding Window Pagination
The cleanest pagination strategy uses a sliding time window. Each request asks for up to limit candles from current_start onward, and the next request starts just after the last candle received.
from datetime import datetime, timezone
def paginate_kline(client, symbol, interval, start_ms, end_ms, batch_limit=10000):
    """
    Paginate through historical kline data using a sliding time window.
    Yields batches of candles, automatically advancing start_time
    based on the last candle's timestamp in each response.
    """
    current_start = start_ms
    while current_start < end_ms:
        # client.kline() already retries rate limits and transient errors
        result = client.kline(symbol, interval, current_start, end_ms, limit=batch_limit)
        if result is None:
            raise RuntimeError(f"{symbol}: no response after retries")
        candles = result.get("data", [])
        if not candles:
            break  # No more data
        yield candles
        # Advance window past the last received candle
        last_ts = candles[-1][0]  # Timestamp is the first field in each candle
        if last_ts < current_start:
            raise RuntimeError(f"{symbol}: candle at {last_ts} precedes the requested start")
        current_start = last_ts + 1
Important: the last candle's timestamp is included in the response and should not be re-fetched. Setting current_start to last_ts + 1 starts the next call one millisecond after the last candle received, so consecutive batches neither overlap nor leave a gap (candle timestamps fall on interval boundaries).
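The last_ts + 1 rule can be checked offline against a fake data source before spending any API budget. In the sketch below, fake_fetch stands in for the API and is an assumption: it returns candles oldest first and treats both bounds as inclusive, which may differ from the real endpoint.

```python
def fake_fetch(all_ts, start_ms, end_ms, limit):
    """Stand-in for the API: candles with start_ms <= ts <= end_ms, oldest first."""
    rows = [[ts, 0, 0, 0, 0, 0] for ts in all_ts if start_ms <= ts <= end_ms]
    return rows[:limit]


def paginate(all_ts, start_ms, end_ms, limit):
    """Same sliding-window loop as paginate_kline, minus the network."""
    current = start_ms
    while current < end_ms:
        batch = fake_fetch(all_ts, current, end_ms, limit)
        if not batch:
            break
        yield batch
        current = batch[-1][0] + 1  # start just past the last candle received


MINUTE = 60_000
history = [i * MINUTE for i in range(25)]  # 25 one-minute candles
batches = list(paginate(history, 0, 25 * MINUTE, limit=10))
seen = [candle[0] for batch in batches for candle in batch]

print([len(b) for b in batches])  # [10, 10, 5]
print(seen == history)            # True
```

Every timestamp appears exactly once, which is the property the pagination loop needs to preserve.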
4.2 Bounded vs. Unbounded Windows
| Approach | Use case |
|---|---|
| Bounded window | Known start and end dates — backtesting a specific period |
| Unbounded window | Continual incremental updates — appending new daily data |
For backtesting, use bounded windows. For live trading systems, use unbounded windows where end_time is int(datetime.now().timestamp() * 1000).
5. Concurrent Shard Fetching: Parallelizing the Work
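A single-threaded paginator working through a large symbol list is slow, because every request waits for the previous one to finish. The solution is concurrent fetching: run several paginators in parallel while staying within your rate limit. The code below parallelizes across symbols, one worker per symbol. The same pattern extends to time shards: divide one symbol's range into segments, fetch them in parallel, then merge and deduplicate the results.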
import concurrent.futures
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
@dataclass
class FetchConfig:
    symbols: list[str]
    interval: str = "1m"
    years: int = 10
    max_workers: int = 10         # Worker threads (one symbol each)
    requests_per_second: int = 5  # Rate limit (adjust to your tier)
client = TickDBClient()
config = FetchConfig(symbols=["AAPL.US", "MSFT.US", "GOOGL.US"], interval="1m", years=10)
# Caps how many requests are in flight at once. This bounds concurrency;
# it does not by itself guarantee a per-second rate.
rate_limiter = threading.Semaphore(config.requests_per_second)
def fetch_symbol_range(symbol, start_ms, end_ms, client):
    """Fetch one symbol's full time range using pagination."""
    results = []
    batches = paginate_kline(client, symbol, config.interval, start_ms, end_ms)
    while True:
        # Hold the semaphore only while one request runs, so a long symbol
        # fetch cannot block the other workers for its whole duration.
        with rate_limiter:
            batch = next(batches, None)
        if batch is None:
            break
        results.extend(batch)
    return symbol, results
def batch_fetch_all(config):
    """
    Concurrently fetch multiple symbols.
    Each symbol is processed by a separate worker thread.
    """
    now = datetime.now(timezone.utc)
    end_ms = int(now.timestamp() * 1000)
    start_ms = int((now - timedelta(days=365 * config.years)).timestamp() * 1000)
    with concurrent.futures.ThreadPoolExecutor(max_workers=config.max_workers) as executor:
        futures = {
            executor.submit(fetch_symbol_range, sym, start_ms, end_ms, client): sym
            for sym in config.symbols
        }
        all_data = {}
        for future in concurrent.futures.as_completed(futures):
            symbol = futures[future]
            try:
                sym, candles = future.result()
                all_data[sym] = candles
                print(f"  {sym}: fetched {len(candles):,} candles")
            except Exception as exc:
                print(f"  {symbol} generated an exception: {exc}")
    return all_data
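The text above mentions dividing a time range into segments and merging the results, while the code parallelizes by symbol. A minimal sketch of the time-shard half might look like the following. Both helpers are hypothetical, and because it is an assumption whether the API treats end_time as inclusive, adjacent shards share a boundary and the merge step drops any duplicate timestamps.

```python
def split_range(start_ms, end_ms, shards):
    """Split [start_ms, end_ms] into contiguous (start, end) pairs sharing boundaries."""
    if shards < 1 or end_ms <= start_ms:
        raise ValueError("need shards >= 1 and end_ms > start_ms")
    step = -(-(end_ms - start_ms) // shards)  # ceiling division
    return [(s, min(s + step, end_ms)) for s in range(start_ms, end_ms, step)]


def merge_candles(shard_results):
    """Merge shard outputs, dropping duplicate timestamps and sorting by time."""
    by_ts = {}
    for candles in shard_results:
        for candle in candles:
            by_ts[candle[0]] = candle  # candle[0] is the timestamp
    return [by_ts[ts] for ts in sorted(by_ts)]


print(split_range(0, 100, 4))  # [(0, 25), (25, 50), (50, 75), (75, 100)]
print(split_range(0, 10, 3))   # [(0, 4), (4, 8), (8, 10)]
```

Each (start, end) pair could then be submitted to the executor as its own paginate_kline job, with merge_candles applied to the collected results per symbol.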
Critical parameter: max_workers should not exceed what your rate limit can sustain. If TickDB allows 5 requests per second and you set max_workers=20, many of the extra requests will receive a 3001 error and retry, which wastes calls and lengthens the fetch. Calibrate based on your account tier.
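A semaphore bounds how many requests run at once, but it does not space them in time. One possible way to enforce a per-second budget across threads is a small interval-based limiter like the sketch below. The class name is hypothetical, and the clock and sleep arguments are injectable only so the behavior can be tested without real waiting.

```python
import threading
import time


class IntervalRateLimiter:
    """Space calls at least 1/rate seconds apart across all threads."""

    def __init__(self, rate_per_second, clock=time.monotonic, sleep=time.sleep):
        self._interval = 1.0 / rate_per_second
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_slot = clock()

    def wait(self):
        """Reserve the next free time slot, then sleep until it arrives."""
        with self._lock:
            now = self._clock()
            slot = max(self._next_slot, now)
            self._next_slot = slot + self._interval
        delay = slot - now
        if delay > 0:
            self._sleep(delay)  # sleep outside the lock so other threads can reserve


limiter = IntervalRateLimiter(rate_per_second=5)
# In a worker: call limiter.wait() immediately before each API request.
```

Reserving the slot under the lock and sleeping outside it keeps the lock hold time short, so twenty workers can queue up without serializing on the sleep itself.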
6. Checkpoint Resume: Surviving Failures Without Starting Over
Network interruptions, OOM kills, and instance restarts are not edge cases — they are the expected state of any long-running data pipeline. Checkpointing is the mechanism that turns a catastrophic failure into a 30-second inconvenience.
6.1 Checkpoint Data Structure
import json
import os
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class Checkpoint:
    symbol: str
    interval: str
    start_ms: int
    end_ms: int
    last_fetched_ts: int
    total_batches: int
    fetched_count: int
    resume_token: str = ""  # Optional: stores the pagination cursor
    def save(self, path):
        with open(path, "w") as f:
            json.dump(asdict(self), f, indent=2)
        print(f"  Checkpoint saved: {path}")
    @classmethod
    def load(cls, path):
        if not os.path.exists(path):
            return None
        with open(path, "r") as f:
            return cls(**json.load(f))
6.2 Resumable Fetch Loop
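from datetime import datetime, timezone
def fetch_with_checkpoint(symbol, start_ms, end_ms, interval, checkpoint_dir):
    """Fetch a range, flushing candles to the parquet cache and saving a
    checkpoint every 10 batches. Returns the cached data as a DataFrame."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    cp_path = os.path.join(checkpoint_dir, f"{symbol.replace('.', '_')}.json")
    checkpoint = Checkpoint.load(cp_path)
    client = TickDBClient()
    if checkpoint:
        # Resume from last checkpoint; earlier candles are already cached (Section 7)
        current_start = checkpoint.last_fetched_ts + 1
        total_batches = checkpoint.total_batches
        fetched_count = checkpoint.fetched_count
        resume_at = datetime.fromtimestamp(current_start / 1000, tz=timezone.utc)
        print(f"Resuming {symbol}: {fetched_count:,} candles cached, resuming from {resume_at}")
    else:
        current_start = start_ms
        total_batches = 0
        fetched_count = 0
    pending = []  # Candles fetched since the last flush
    def flush():
        # Persist candles first, then the checkpoint, so the checkpoint never
        # points past data that is not yet on disk.
        cache_candles(symbol, pending, checkpoint_dir)
        Checkpoint(
            symbol=symbol,
            interval=interval,
            start_ms=start_ms,
            end_ms=end_ms,
            last_fetched_ts=pending[-1][0],
            total_batches=total_batches,
            fetched_count=fetched_count,
        ).save(cp_path)
        pending.clear()
    for batch in paginate_kline(client, symbol, interval, current_start, end_ms):
        pending.extend(batch)
        total_batches += 1
        fetched_count += len(batch)
        # Save checkpoint every 10 batches
        if total_batches % 10 == 0:
            flush()
    if pending:
        flush()  # Persist the final partial group of batches
    return load_cached_candles(symbol, checkpoint_dir)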
The checkpoint saves the last_fetched_ts — the timestamp of the most recent successfully processed candle. On resume, the fetcher starts at last_fetched_ts + 1. The stored candle data is also kept in local parquet files (see Section 7), so no work is lost.
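A checkpoint is only useful if the file itself survives a crash mid-write. One common approach, sketched here under the assumption that the checkpoint is plain JSON, is to write to a temporary file in the same directory and rename it over the target. The helper names are illustrative and are not part of the Checkpoint class above.

```python
import json
import os
import tempfile


def save_json_atomic(path, payload):
    """Write JSON to a temp file in the same directory, then rename it over `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # the rename is atomic on POSIX filesystems
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_json(path):
    """Return the saved payload, or None if no checkpoint exists yet."""
    if not os.path.exists(path):
        return None
    with open(path, "r") as f:
        return json.load(f)
```

With this pattern a reader sees either the previous complete checkpoint or the new complete checkpoint, never a half-written file.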
7. Local Caching: The Performance Multiplier
Fetching from the API every time you run a backtest is slow and wasteful. A local cache ensures you only pay the API cost for new data.
7.1 Parquet Storage
import os
import pyarrow as pa  # pandas uses pyarrow as its parquet engine
import pyarrow.parquet as pq
import pandas as pd
CACHE_DIR = "./tickdb_cache"
def candles_to_df(candles: list) -> pd.DataFrame:
    """Convert TickDB candle list to a pandas DataFrame."""
    if not candles:
        return pd.DataFrame(columns=["timestamp", "open", "high", "low", "close", "volume"])
    df = pd.DataFrame(candles, columns=["timestamp", "open", "high", "low", "close", "volume"])
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True)
    return df
def cache_candles(symbol, candles, cache_dir=CACHE_DIR):
    """Append new candles to a parquet cache file."""
    os.makedirs(cache_dir, exist_ok=True)
    cache_file = os.path.join(cache_dir, f"{symbol.replace('.', '_')}.parquet")
    new_df = candles_to_df(candles)
    if new_df.empty:
        return
    if os.path.exists(cache_file):
        # Merge with the existing file; duplicate timestamps are dropped
        existing = pd.read_parquet(cache_file)
        combined = pd.concat([existing, new_df]).drop_duplicates(subset=["timestamp"]).sort_values("timestamp")
    else:
        combined = new_df
    combined.to_parquet(cache_file, index=False)
    print(f"  Cached {len(new_df)} new candles → {cache_file} ({os.path.getsize(cache_file)/1024/1024:.1f} MB)")
def load_cached_candles(symbol, cache_dir=CACHE_DIR):
    """Load all cached candles for a symbol, returning a DataFrame."""
    cache_file = os.path.join(cache_dir, f"{symbol.replace('.', '_')}.parquet")
    if not os.path.exists(cache_file):
        return pd.DataFrame()
    return pd.read_parquet(cache_file)
Parquet is the right format for this use case: columnar storage, built-in compression, and efficient filtering. A 10-year, 50-symbol parquet dataset is typically under 8 GB on disk and loads in under 30 seconds with predicate pushdown.
7.2 Incremental Fetch Strategy
from datetime import datetime, timedelta, timezone
def fetch_incremental(symbol, interval, years=10, cache_dir=CACHE_DIR):
    """
    Fetch only new data: load the last cached timestamp, then fetch everything
    after it. This keeps API calls to the minimum necessary.
    """
    client = TickDBClient()
    os.makedirs(cache_dir, exist_ok=True)
    # Determine the latest cached timestamp
    cached = load_cached_candles(symbol, cache_dir)
    if not cached.empty:
        # Timestamp.value is nanoseconds since the epoch; convert to whole milliseconds
        last_cached_ts = cached["timestamp"].max().value // 1_000_000
    else:
        last_cached_ts = int((datetime.now(timezone.utc) - timedelta(days=365 * years)).timestamp() * 1000)
    end_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
    new_candles = []
    for batch in paginate_kline(client, symbol, interval, last_cached_ts + 1, end_ms):
        new_candles.extend(batch)
    if new_candles:
        cache_candles(symbol, new_candles, cache_dir)
    full_df = load_cached_candles(symbol, cache_dir)
    print(f"  {symbol}: {len(full_df):,} total candles ({len(new_candles):,} new)")
    return full_df
This pattern — cache-first, incremental fetch — reduces your average fetch time from hours to seconds for daily backtest runs.
8. Production Deployment: From Prototype to Pipeline
Moving from a working script to a reliable production pipeline requires five additional considerations.
Monitoring. Track three metrics during any large fetch job: candles fetched per minute (throughput), rate-limit hits (health), and checkpoint frequency (progress). Log to stdout or a metrics system:
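import time
class FetchMonitor:
    """Track throughput, rate-limit hits, and progress for a multi-symbol fetch."""
    def __init__(self, total_symbols):
        self.start_time = time.time()
        self.total_symbols = total_symbols
        self.completed = 0
        self.rate_limit_hits = 0  # Increment wherever a 3001 response is handled
    def log_batch(self, symbol, batch_count, total_candles):
        elapsed = time.time() - self.start_time
        rate = total_candles / elapsed if elapsed > 0 else 0
        print(f"  [{self.completed}/{self.total_symbols}] {symbol}: "
              f"batch {batch_count}, {total_candles:,} candles, {rate:.0f} candles/sec, "
              f"{self.rate_limit_hits} rate-limit hits")
    def symbol_done(self):
        """Call once per finished symbol so the progress counter advances."""
        self.completed += 1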
Graceful error handling. Wrap the entire fetch loop in a try-except so that an interruption ends with a clear message and a usable checkpoint on disk. Never let a raw exception crash the pipeline without knowing what state was saved:
try:
    fetch_with_checkpoint(...)
except KeyboardInterrupt:
    # Progress up to the most recent periodic checkpoint is already on disk
    print("Interrupted. Run again to resume from the last saved checkpoint.")
except Exception as e:
    print(f"Fatal error: {e}")
    raise  # Re-raise after logging; the last periodic checkpoint stays on disk
Terms of service compliance. Some data sources restrict redistribution or require attribution. Verify your TickDB plan's usage terms before caching large datasets locally or serving cached data to multiple users.
Cache management. Set a retention policy — for example, purge parquet files older than 90 days. Ten years of minute data grows quickly; a portfolio of 200 symbols can reach 30+ GB.
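A retention policy like the one described can be a few lines of standard-library code. This sketch judges age by file modification time, which is an assumption: a file that is rewritten on every incremental fetch will always look fresh, so you may prefer a different criterion. The function name is hypothetical.

```python
import os
import time


def purge_old_files(cache_dir, max_age_days=90, suffix=".parquet", now=None):
    """Delete files in cache_dir not modified within max_age_days; return their names."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86_400  # seconds per day
    removed = []
    for name in sorted(os.listdir(cache_dir)):
        path = os.path.join(cache_dir, name)
        if not (name.endswith(suffix) and os.path.isfile(path)):
            continue  # leave checkpoints and subdirectories alone
        if os.path.getmtime(path) < cutoff:
            os.remove(path)
            removed.append(name)
    return removed
```

Running this from a scheduled job, and logging the returned names, gives you an audit trail of what was evicted.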
Retry testing. Before running a full fetch, test the retry and resume logic deliberately. Kill a fetch process mid-run with Ctrl+C or a SIGTERM and verify that a subsequent run resumes cleanly from the checkpoint.
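The same check can be rehearsed offline before touching the real API. The sketch below uses a fake source that fails once at a chosen call and an in-memory dict standing in for the checkpoint and cache; all names are hypothetical.

```python
class FlakySource:
    """Fake API: serves integer timestamps in batches and fails once at a chosen call."""

    def __init__(self, timestamps, batch_size, fail_on_call):
        self.timestamps = timestamps
        self.batch_size = batch_size
        self.fail_on_call = fail_on_call
        self.calls = 0

    def fetch(self, start_ts):
        self.calls += 1
        if self.calls == self.fail_on_call:
            raise ConnectionError("simulated network drop")
        return [ts for ts in self.timestamps if ts >= start_ts][: self.batch_size]


def run_fetch(source, state):
    """Fetch until exhausted, recording progress in `state` after each batch."""
    while True:
        batch = source.fetch(state["last_ts"] + 1)
        if not batch:
            return
        state["rows"].extend(batch)
        state["last_ts"] = batch[-1]


source = FlakySource(list(range(100)), batch_size=10, fail_on_call=4)
state = {"last_ts": -1, "rows": []}  # stands in for checkpoint + cache on disk
try:
    run_fetch(source, state)
except ConnectionError:
    pass
rows_before_resume = len(state["rows"])
run_fetch(source, state)  # second run resumes from state["last_ts"] + 1

print(rows_before_resume)                 # 30
print(state["rows"] == list(range(100)))  # True
```

The first run stores three batches before the simulated failure, and the second run completes the set with no duplicates and no gaps, which is the property to verify against the real pipeline.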
9. Framework Integration: Backtesting Setup
Once the data is cached, feeding it into a backtesting framework is straightforward:
import backtrader as bt
class TickDBData(bt.feeds.PandasData):
    params = (
        ("datetime", None),  # None tells backtrader to use the DataFrame index
        ("open", "open"),
        ("high", "high"),
        ("low", "low"),
        ("close", "close"),
        ("volume", "volume"),
        ("openinterest", -1),
    )
def run_backtest(symbol, strategy, cash=100000):
    df = load_cached_candles(symbol)
    data = TickDBData(
        dataname=df.set_index("timestamp"),
        timeframe=bt.TimeFrame.Minutes,  # Bars are 1-minute candles
        compression=1,
    )
    cerebro = bt.Cerebro()
    cerebro.adddata(data)
    cerebro.addstrategy(strategy)
    cerebro.broker.setcash(cash)
    cerebro.run()
    print(f"Final portfolio value: {cerebro.broker.getvalue():.2f}")
For multi-symbol portfolios, use concurrent.futures.ThreadPoolExecutor to prefetch all symbols into local cache before instantiating the Cerebro engine. Pre-fetching eliminates I/O latency during the backtest run itself.
10. Key Takeaways
Fetching 10 years of minute-level US stock data is an engineering challenge, not a data problem. The solution is a layered architecture:
- Error handling and rate-limit awareness at the client layer — exponential backoff with jitter prevents API overload.
- Sliding-window pagination at the request layer — converts millions of rows into manageable 10,000-candle batches.
- Concurrent shard fetching at the orchestration layer — parallelize across symbols and time ranges to reduce wall-clock time.
- Checkpoint resume at the pipeline layer — turns catastrophic failures into 30-second interruptions.
- Local parquet caching at the storage layer — eliminates redundant API calls for repeated backtest runs.
Each layer is independently testable. Start with the client, verify it handles 3001 errors correctly, then build outward. As a rough target, a single-threaded fetch for one symbol should take under 10 minutes, and a 50-symbol portfolio with 10 years of data should complete in under an hour with the concurrent approach; actual times depend on your rate-limit tier.
Next Steps
If you're ready to start coding: sign up at tickdb.ai and generate a free API key. Set TICKDB_API_KEY as an environment variable, then adapt the code in this article to your specific symbol list and strategy timeframe.
If you want 10+ years of historical OHLCV data for cross-cycle backtesting: TickDB provides clean, time-aligned US equity data via the /v1/market/kline endpoint. The free tier supports development and small-scale backtesting; reach out to enterprise@tickdb.ai for larger datasets.
If you're building automated pipelines: consider installing the tickdb-market-data SKILL in your AI coding assistant to generate custom fetch pipelines tailored to your specific strategy requirements.
This article does not constitute investment advice. Backtesting results do not guarantee future performance. Markets involve risk; past performance does not guarantee future results.