"Price is not data. Price plus context is data. Price plus context plus history is a strategy."
For quant traders and systems traders, this distinction matters more than it might first appear. A single candlestick tells you what happened. A year's worth of candlesticks, stored locally with clean timestamps and consistent formatting, tells you what usually happens — and how to position for the deviation.
This article walks through a concrete, production-ready system for archiving daily market data into a local database. The focus is on incremental updates: how to fetch today's minute-level OHLCV data each day after market close, store it without duplication, and do it reliably enough that you can walk away from the machine. The system uses TickDB's GET /v1/market/kline endpoint, Python for orchestration, and either SQLite (for individual developers) or ClickHouse (for teams that need query performance at scale).
Why Local Archiving Is Worth the Effort
Most traders start with a live data feed and deal with history as a later problem. This creates two compounding issues.
First, the live feed is not designed for historical retrieval. Real-time WebSocket streams are optimized for latency — they deliver the current state, not a retrievable record. If your connection drops mid-session or you want to re-analyze last Tuesday's intraday structure, the live feed gives you nothing to fall back on.
Second, market data vendors change their APIs, retire endpoints, or modify historical adjustments without notice. A vendor that offers clean data today might reclassify adjustments six months from now, and if you have no local copy, you lose the ability to reproduce past results. Backtests become non-reproducible, and strategies that looked promising become unvalidatable.
Local archiving solves both problems. By pulling a daily snapshot of OHLCV data after market close and storing it with immutable timestamps, you build a time capsule that you own and control. The vendor's reliability becomes irrelevant to your research pipeline. Your backtests become reproducible because the data never changes.
TickDB's /v1/market/kline endpoint is well-suited for this use case. It returns historical candlestick data with consistent formatting, handles a wide range of symbols and intervals, and does not require WebSocket infrastructure — a simple HTTPS request with an API key is sufficient.
Understanding TickDB's Kline Endpoint
Before writing code, it helps to understand the parameters that control what data you receive.
The core endpoint is:
GET https://api.tickdb.ai/v1/market/kline
Authentication uses the X-API-Key header:
headers = {"X-API-Key": os.environ.get("TICKDB_API_KEY")}
The request parameters that matter most for daily archiving:
| Parameter | Type | Description |
|---|---|---|
symbol |
String | Exchange:symbol format. e.g., AAPL.US, BTC.USDT, 0700.HK |
interval |
String | Candlestick interval. For minute-level data, use 1m, 5m, 15m, 30m, 60m. For daily, use 1d |
start_time |
Integer | Unix timestamp (seconds) for the start of the query range |
end_time |
Integer | Unix timestamp (seconds) for the end of the query range |
limit |
Integer | Maximum number of candlesticks per request. Maximum 1000 |
For a daily archiving job that runs after market close, the typical configuration is:
interval:1mfor intraday analysis needs, or5mfor moderate-resolution storagestart_timeandend_time: bounded to the trading day (market open to close)limit: set to 1000, which comfortably covers a full US trading day at 1-minute resolution (US equities generate roughly 390 one-minute candles per regular trading day)
The response structure contains an array of candles, each with:
time: Unix timestamp of the candle open timeopen,high,low,close: OHLC valuesvolume: trading volume for the period
This is a standard OHLCV format that maps directly to any time-series database schema.
The Daily Archiving Workflow
A reliable daily archiving system is not just a script. It is a pipeline with three phases: preparation, extraction, and storage.
Phase 1: Preparation — Determine the Trading Day
The first step is identifying what "today" means for the market in question. For US equities, trading days are weekdays (excluding market holidays). For crypto, every day is a trading day. For HK equities, trading days depend on the HK Exchange calendar.
The safest approach is to use the known trading window. For US equities, the regular trading session runs from 9:30 AM to 4:00 PM ET. Converting these to Unix timestamps gives you a deterministic start and end for the query range.
from datetime import datetime, timezone, timedelta
def get_us_equity_trading_window(date: datetime.date):
"""
Returns (start_time, end_time) as Unix timestamps for the US equity trading day.
Assumes date is a weekday and not a market holiday — holiday validation
should be added in production via a market calendar library.
"""
eastern = timezone(timedelta(hours=-5)) # EST (UTC-5), simplified — use pytz or zoneinfo in production
start = datetime.combine(date, datetime.min.time().replace(hour=9, minute=30), tzinfo=eastern)
end = datetime.combine(date, datetime.min.time().replace(hour=16, minute=0), tzinfo=eastern)
return int(start.timestamp()), int(end.timestamp())
⚠️ Engineering note: This simplified timezone handling uses a fixed UTC-5 offset and does not account for daylight saving time transitions. For production systems, use zoneinfo (Python 3.9+) or pytz to convert ET correctly across DST boundaries.
For more robust holiday detection, integrate with a market calendar library such as pandas_market_calendars or exchange_calendars. These libraries handle US market holiday schedules (Good Friday closure, Thanksgiving early close, etc.) and prevent your pipeline from querying non-existent trading sessions.
Phase 2: Extraction — Pull Data from TickDB
With the time window established, the next step is fetching the data. The key design decision is how to handle the pagination implicit in the 1000-candle limit.
A US equity trading day at 1-minute resolution generates approximately 390 candles — well within the 1000-candle limit. An active crypto symbol trading 24 hours generates 1,440 candles — also within the limit. However, an extended historical backfill across multiple days, or fetching higher-frequency data (15-second candles if supported), could exceed the limit and require multiple paginated requests.
The extraction function should handle both cases:
import os
import time
import requests
from typing import Optional
def fetch_klines(
symbol: str,
interval: str,
start_time: int,
end_time: int,
limit: int = 1000
) -> list[dict]:
"""
Fetches all available klines for the given time range, handling pagination
automatically via the 'more_data' field in TickDB's response.
"""
headers = {"X-API-Key": os.environ.get("TICKDB_API_KEY")}
all_candles = []
current_end = end_time
max_retries = 3
retry_delay = 2.0
while True:
params = {
"symbol": symbol,
"interval": interval,
"start_time": start_time,
"end_time": current_end,
"limit": limit
}
for attempt in range(max_retries):
try:
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers=headers,
params=params,
timeout=(3.05, 10)
)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", retry_delay))
time.sleep(retry_after)
continue
response.raise_for_status()
break
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
time.sleep(retry_delay * (2 ** attempt))
else:
raise RuntimeError(f"Kline fetch failed after {max_retries} attempts: {e}")
data = response.json()
code = data.get("code", 0)
if code == 3001:
retry_after = int(data.get("retry_after", 5))
time.sleep(retry_after)
continue
if code != 0:
raise RuntimeError(f"TickDB API error {code}: {data.get('message')}")
candles = data.get("data", [])
if not candles:
break
all_candles.extend(candles)
# Check for pagination — if 'more_data' indicates there are older candles,
# adjust the window to fetch the next batch
if len(candles) < limit:
break
# Advance the window backward by one candle to avoid overlap
oldest_candle_time = candles[0].get("time")
if oldest_candle_time is None:
break
current_end = oldest_candle_time
# Sort by timestamp to ensure chronological order
all_candles.sort(key=lambda x: x["time"])
return all_candles
⚠️ Engineering note: The pagination loop uses the oldest candle's timestamp as the new end_time boundary, ensuring no gaps or overlaps across pages. This is a standard sliding-window pagination pattern. Adjust the end_time granularity based on the interval — for 1-minute data, a 1-second adjustment is sufficient.
Phase 3: Storage — SQLite and ClickHouse
The storage layer has two viable paths depending on your scale and query needs.
Option A: SQLite (Individual Developer)
SQLite is embedded, requires no server process, and handles millions of rows without performance issues for personal research. The schema is straightforward:
import sqlite3
from datetime import datetime
def init_sqlite_db(db_path: str):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS kline_1m (
symbol TEXT NOT NULL,
interval TEXT NOT NULL,
open_time INTEGER NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume REAL NOT NULL,
fetched_at INTEGER NOT NULL,
PRIMARY KEY (symbol, interval, open_time)
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_symbol_time
ON kline_1m (symbol, open_time)
""")
conn.commit()
return conn
The PRIMARY KEY (symbol, interval, open_time) enforces uniqueness at the database level. If you attempt to insert a candle that already exists, SQLite raises a constraint violation. This is the deduplication mechanism built into the schema itself — no application-level deduplication logic needed for the primary key.
def upsert_candles_sqlite(conn: sqlite3.Connection, symbol: str, interval: str, candles: list[dict]):
cursor = conn.cursor()
fetched_at = int(datetime.now().timestamp())
rows = [
(symbol, interval, c["time"], c["open"], c["high"], c["low"], c["close"], c["volume"], fetched_at)
for c in candles
]
cursor.executemany("""
INSERT OR REPLACE INTO kline_1m
(symbol, interval, open_time, open, high, low, close, volume, fetched_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", rows)
conn.commit()
return len(rows)
The INSERT OR REPLACE idiom is critical. It means: if this (symbol, interval, open_time) combination already exists, update the OHLCV values and the fetched_at timestamp. This is the correct behavior for incremental updates — a candlestick is immutable once the period closes, but updating fetched_at lets you track when the record was last confirmed.
For a US equity daily job, the query window is narrow enough that INSERT OR REPLACE will only ever affect today's candles. Historical candles from prior days are never touched. This makes the daily job idempotent by design — running it twice on the same day produces the same database state.
Option B: ClickHouse (Team / Production Scale)
For teams that need fast aggregation queries across months of data, ClickHouse provides column-oriented storage with significant compression and query acceleration.
from clickhouse_driver import Client
def init_clickhouse_table(client: Client, table_name: str = "kline_1m"):
client.execute(f"""
CREATE TABLE IF NOT EXISTS {table_name} (
symbol String,
interval String,
open_time Int64,
open Float64,
high Float64,
low Float64,
close Float64,
volume Float64,
fetched_at Int64
) ENGINE = ReplacingMergeTree(fetched_at)
ORDER BY (symbol, interval, open_time)
""")
ClickHouse's ReplacingMergeTree engine with fetched_at as the version column is functionally equivalent to SQLite's INSERT OR REPLACE. When duplicate inserts arrive (same symbol, interval, and open_time), ClickHouse keeps the row with the higher fetched_at value and discards the older one. This is automatic, server-side deduplication that scales to billions of rows.
def upsert_candles_clickhouse(client: Client, symbol: str, interval: str, candles: list[dict]):
fetched_at = int(datetime.now().timestamp())
rows = [
(symbol, interval, c["time"], c["open"], c["high"], c["low"], c["close"], c["volume"], fetched_at)
for c in candles
]
client.execute(
f"INSERT INTO kline_1m (symbol, interval, open_time, open, high, low, close, volume, fetched_at) VALUES",
rows
)
return len(rows)
For teams running multiple ingestion pipelines or backfill jobs in parallel, ClickHouse's merge-tree engine ensures that concurrent writes with overlapping data are resolved correctly by the background merge process.
Incremental Updates: The Core Design Pattern
The phrase "incremental update" sounds simple, but it has a specific meaning in this context: only fetch and store data that is new or changed since the last successful run.
There are two competing philosophies for implementing incremental updates:
Philosophy 1: Time-window-based (preferred for daily jobs)
Store the timestamp of the last successful fetch. On the next run, query from that timestamp to the current time. This is the approach used in the fetch_klines function above — start_time is derived from the last run's timestamp, and end_time is the current time (or market close).
import json
import os
def load_last_fetch_timestamp(symbol: str) -> Optional[int]:
"""Returns the Unix timestamp of the last successful fetch for this symbol."""
checkpoint_file = f"./checkpoints/{symbol.replace('/', '_')}.json"
if not os.path.exists(checkpoint_file):
return None
with open(checkpoint_file, "r") as f:
data = json.load(f)
return data.get("last_fetch_time")
def save_last_fetch_timestamp(symbol: str, timestamp: int):
os.makedirs("./checkpoints", exist_ok=True)
checkpoint_file = f"./checkpoints/{symbol.replace('/', '_')}.json"
with open(checkpoint_file, "w") as f:
json.dump({"last_fetch_time": timestamp, "saved_at": int(datetime.now().timestamp())}, f)
For a daily job, the checkpoint is typically updated to the end-of-day timestamp after a successful run. On the next calendar day, the job fetches from the checkpoint to the new end-of-day.
Philosophy 2: Change-detection-based (preferred for high-frequency updates)
Query the database for the latest stored timestamp and use it as the start_time for the next fetch. This approach is database-driven rather than file-driven and is more robust when multiple processes write to the same database.
def get_latest_stored_timestamp_sqlite(conn: sqlite3.Connection, symbol: str, interval: str) -> Optional[int]:
cursor = conn.cursor()
cursor.execute("""
SELECT MAX(open_time) FROM kline_1m
WHERE symbol = ? AND interval = ?
""", (symbol, interval))
row = cursor.fetchone()
return row[0] if row and row[0] is not None else None
For a daily job running after market close, the time-window approach (Philosophy 1) is simpler and more deterministic. For intraday jobs running every 15 minutes, the change-detection approach (Philosophy 2) is more reliable because it does not depend on external checkpoint files.
Putting It All Together: The Daily Archiving Script
Here is the complete orchestration script that ties the components together. It is designed to run as a daily cron job or a scheduled task after US market close (approximately 4:05 PM ET).
#!/usr/bin/env python3
"""
Daily market data archiver for TickDB.
Run after market close (e.g., via cron: 0 16 * * 1-5 /path/to/archiver.py)
Environment variables required:
TICKDB_API_KEY - Your TickDB API key
ARCHIVE_DB_PATH - Path to SQLite database (or omit for default: ./data/archive.db)
ARCHIVE_SYMBOLS - Comma-separated list of symbols to archive (default: AAPL.US,NVDA.US,MSFT.US)
ARCHIVE_INTERVAL - Candlestick interval (default: 1m)
"""
import os
import sys
import time
import json
import logging
from datetime import datetime, date, timedelta, timezone
from typing import Optional
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("./logs/archive.log")
]
)
logger = logging.getLogger(__name__)
def get_trading_day_window(trade_date: date) -> tuple[int, int]:
"""
Returns (start_time, end_time) as Unix timestamps for the US equity trading session.
Note: This simplified version does not account for market holidays.
For production use, integrate with exchange_calendars or pandas_market_calendars.
"""
eastern = timezone(timedelta(hours=-5))
start = datetime.combine(trade_date, datetime.min.time().replace(hour=9, minute=30), tzinfo=eastern)
end = datetime.combine(trade_date, datetime.min.time().replace(hour=16, minute=0), tzinfo=eastern)
return int(start.timestamp()), int(end.timestamp())
def get_last_checkpoint(symbol: str) -> Optional[int]:
checkpoint_file = f"./checkpoints/{symbol.replace('/', '_')}.json"
if os.path.exists(checkpoint_file):
with open(checkpoint_file, "r") as f:
return json.load(f).get("last_fetch_time")
return None
def save_checkpoint(symbol: str, timestamp: int):
os.makedirs("./checkpoints", exist_ok=True)
checkpoint_file = f"./checkpoints/{symbol.replace('/', '_')}.json"
with open(checkpoint_file, "w") as f:
json.dump({"last_fetch_time": timestamp, "saved_at": int(datetime.now().timestamp())}, f)
def archive_symbol(symbol: str, interval: str, db_conn) -> int:
"""
Archives a single symbol for the current trading day.
Uses incremental fetch: starts from the last checkpoint, ends at market close.
"""
today = date.today()
start_time, end_time = get_trading_day_window(today)
# Use checkpoint for incremental fetch
last_checkpoint = get_last_checkpoint(symbol)
if last_checkpoint and last_checkpoint >= start_time:
start_time = last_checkpoint
logger.info(f"[{symbol}] Incremental fetch: start_time updated to {start_time}")
logger.info(f"[{symbol}] Fetching {interval} klines from {start_time} to {end_time}")
candles = fetch_klines(symbol, interval, start_time, end_time)
if not candles:
logger.warning(f"[{symbol}] No candles returned — check symbol and date")
return 0
logger.info(f"[{symbol}] Received {len(candles)} candles")
stored = upsert_candles_sqlite(db_conn, symbol, interval, candles)
logger.info(f"[{symbol}] Stored {stored} candles")
# Update checkpoint to end of trading day
save_checkpoint(symbol, end_time)
return stored
def main():
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
logger.error("TICKDB_API_KEY environment variable is not set")
sys.exit(1)
symbols = os.environ.get("ARCHIVE_SYMBOLS", "AAPL.US,NVDA.US,MSFT.US").split(",")
interval = os.environ.get("ARCHIVE_INTERVAL", "1m")
db_path = os.environ.get("ARCHIVE_DB_PATH", "./data/archive.db")
os.makedirs(os.path.dirname(db_path) or "./data", exist_ok=True)
conn = init_sqlite_db(db_path)
total_candles = 0
for symbol in symbols:
symbol = symbol.strip()
try:
count = archive_symbol(symbol, interval, conn)
total_candles += count
except Exception as e:
logger.error(f"[{symbol}] Archive failed: {e}", exc_info=True)
continue
# Rate-limit between symbols to avoid hitting the API limit
time.sleep(1.0)
conn.close()
logger.info(f"Archive run complete. Total candles stored: {total_candles}")
if __name__ == "__main__":
main()
⚠️ Engineering note: This script does not handle market holidays. A production deployment should query a market calendar before attempting to fetch data. Skipping non-trading days prevents wasted API calls and avoids empty responses that could trigger false warnings.
Cron Configuration for Daily Execution
For a Linux-based deployment, add the following entry to your crontab:
# Run at 4:10 PM ET every weekday (10 minutes after market close)
10 16 * * 1-5 cd /opt/archiver && /usr/bin/python3 archiver.py >> /var/log/archiver.log 2>&1
For Windows-based deployments, use Task Scheduler with a trigger set to fire at 4:10 PM on weekdays.
For cloud-based deployments (AWS Lambda, Google Cloud Functions), the execution model differs: the trigger fires at the scheduled time, the function determines the correct trading day internally (using the market calendar), and then executes the fetch. The checkpoint mechanism works identically in a serverless context.
Handling Edge Cases
A daily archiver will encounter several situations that the happy path does not cover.
Market halts: If trading is halted for a symbol during the session (regulatory halt, unusual volatility), the kline data for that period will reflect the halt — open, high, low, close values may be identical or the candle may be absent entirely. Your monitoring should detect this and flag it rather than treating it as a data quality failure.
Partial last candle: Near the end of the trading day, the current incomplete candle (say, the 3:57 PM candle when running at 4:05 PM) should be excluded from permanent storage. Filter by ensuring open_time < end_time - 60 for 1-minute data, or adjust the threshold based on your interval. Storing partial candles distorts backtest results.
API rate limits: If the job processes many symbols and encounters a 429 response, the retry logic in fetch_klines handles the backoff. For large symbol lists (50+), add an exponential backoff with jitter between symbols to prevent cascading failures:
import random
def fetch_with_backoff(symbol: str, interval: str, start_time: int, end_time: int, max_retries: int = 5):
for attempt in range(max_retries):
try:
return fetch_klines(symbol, interval, start_time, end_time)
except Exception as e:
if attempt < max_retries - 1:
base_delay = 2.0
jitter = random.uniform(0, base_delay * 0.1)
delay = min(base_delay * (2 ** attempt), 30.0) + jitter
logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay:.1f}s: {e}")
time.sleep(delay)
else:
raise
Data quality validation: Before storing candles, validate that high >= max(open, close) and low <= min(open, close). Any candle violating this invariant indicates a data source error and should be logged and excluded.
def validate_candle(candle: dict) -> bool:
o, h, l, c = candle["open"], candle["high"], candle["low"], candle["close"]
return h >= max(o, c) and l <= min(o, c)
Validating the Archive
After a run completes, query the database to verify the data integrity:
import sqlite3
def validate_archive(db_path: str, symbol: str, interval: str):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Check row count per day
cursor.execute("""
SELECT
date(open_time, 'unixepoch', 'localtime') as trade_date,
COUNT(*) as candle_count,
MIN(close) as day_low_approx,
MAX(close) as day_high_approx
FROM kline_1m
WHERE symbol = ? AND interval = ?
GROUP BY trade_date
ORDER BY trade_date DESC
LIMIT 30
""", (symbol, interval))
print(f"Recent archive summary for {symbol} ({interval}):")
for row in cursor.fetchall():
print(f" {row[0]}: {row[1]} candles, low={row[2]:.2f}, high={row[3]:.2f}")
conn.close()
For a US equity 1-minute archive, each trading day should produce approximately 390 candles (9:30 AM to 4:00 PM). Days with significantly fewer candles warrant investigation — either the market closed early (half-day sessions produce ~195 candles), the data was unavailable, or the archiver ran before the session completed.
Deployment Guide by Scale
| Use case | Recommended stack | Key configuration |
|---|---|---|
| Individual researcher | SQLite + cron | ARCHIVE_DB_PATH=~/tickdata/archive.db, daily cron, 3–10 symbols |
| Small team | ClickHouse + scheduled task | Shared ClickHouse instance, team-specific tables, monitoring alerts |
| Quantitative fund | ClickHouse + orchestration platform | Apache Airflow or Prefect for DAG orchestration, Slack alerts on failure, historical backfill pipeline separate from daily incremental |
What You Have Now
After running this pipeline for several weeks, your local database becomes a historical record that you own completely. The next steps depend on your analytical goals.
If you are building intraday strategies, the minute-level archive enables pattern recognition across thousands of trading sessions. You can compute rolling volatility, detect volume spikes, and identify order flow anomalies that only appear at high resolution.
If you are running statistical arbitrage, the archive provides the training dataset for your model. The incremental update pattern ensures that your model is always trained on the most recent data without requiring a full backfill on every run.
If you are benchmarking strategies, the archive serves as the single source of truth. When you need to reproduce a backtest from six months ago, you query your local database — not the vendor's API — and the results are identical every time.
The pipeline itself, once deployed, requires no manual intervention. It runs daily, updates incrementally, and accumulates a historical record that grows more valuable with every passing month.
Next Steps
If you're an individual quant developer, start with the SQLite version. Set up a cron job tonight — the script above is complete and runnable with a free TickDB API key. Three symbols, daily run, and within a month you have a personal historical database that no vendor can take from you.
If you want to run this yourself:
- Sign up at tickdb.ai (free, no credit card required)
- Generate an API key in the dashboard
- Set the
TICKDB_API_KEYenvironment variable and copy the script above
If you need 10+ years of historical OHLCV data to seed the database before running the daily incremental job, reach out to enterprise@tickdb.ai for institutional plans covering extended historical coverage.
If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace for faster prototyping of market data pipelines.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.