"Price is the effect. The order book is the cause."
Every quant strategy begins with the same problem: how do you turn a vague idea — "buy when the short-term average crosses above the long-term average" — into a running system? The strategy logic is simple. The data pipeline is not.
For a programmer with no quant background, the hardest part is not the mathematics. It is the infrastructure: acquiring clean data, handling API rate limits and network failures, and building a backtesting loop that doesn't lie to you. This guide walks through each of these. By the end, you will have a running Python strategy that pulls historical market data via TickDB's API, runs a simple moving average crossover, and generates a backtest performance report.
The goal is not a production-grade hedge fund. The goal is a working skeleton you can extend, break, and learn from.
1. The Three Problems You Must Solve First
Before writing any strategy code, you need to understand the three layers of infrastructure that sit beneath every quant system.
1.1 Data Acquisition
The market data problem has two components: real-time and historical. Real-time data lets you observe the current order book and execute live. Historical data lets you backtest — running your strategy against five years of daily closes to see whether it would have worked.
Most retail developers underestimate how hard it is to get clean historical OHLCV (Open, High, Low, Close, Volume) data for US equities. Public sources have gaps, misalignments, or survivorship bias. The solution is to use a dedicated market data API that provides cleaned, timestamp-aligned data with reliable coverage.
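The defects named above (gaps, misalignments, inconsistent bars) can be checked mechanically before any backtest runs. The following is a minimal sketch, assuming a daily DataFrame indexed by timestamp with open, high, low, close, and volume columns. The function name `audit_ohlcv` and the choice of checks are illustrative assumptions, not part of TickDB's API. The business-day calendar used here does not model exchange holidays, so holidays are reported as missing days.

```python
import pandas as pd


def audit_ohlcv(df: pd.DataFrame) -> dict:
    """Count common defects in a daily OHLCV frame (hypothetical helper)."""
    # Business days with no bar. Exchange holidays are NOT modeled in this sketch.
    expected = pd.bdate_range(df.index.min(), df.index.max())
    missing_days = expected.difference(df.index)
    # Bars whose high is below the low, or whose close falls outside [low, high]
    bad_range = (
        (df["high"] < df["low"])
        | (df["close"] > df["high"])
        | (df["close"] < df["low"])
    )
    prices = df[["open", "high", "low", "close"]]
    return {
        "duplicate_timestamps": int(df.index.duplicated().sum()),
        "missing_business_days": len(missing_days),
        "inconsistent_bars": int(bad_range.sum()),
        "non_positive_prices": int((prices <= 0).any(axis=1).sum()),
    }


# Toy frame: Wednesday 2024-01-03 is missing and the third bar has high < low
idx = pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-04", "2024-01-05"])
sample = pd.DataFrame(
    {
        "open": [10.0, 11.0, 12.0, 13.0],
        "high": [11.0, 12.0, 11.0, 14.0],
        "low": [9.0, 10.0, 12.0, 12.0],
        "close": [10.5, 11.5, 12.0, 13.5],
        "volume": [100, 120, 90, 110],
    },
    index=idx,
)
report = audit_ohlcv(sample)
print(report)
```

Running a check like this on every download makes data problems visible before they silently distort a backtest.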
For this guide, we use TickDB's /v1/market/kline endpoint. It provides 10+ years of cleaned US equity OHLCV data on a single API, which is sufficient for cross-cycle backtesting of a simple strategy.
1.2 Strategy Logic
The moving average crossover is the "Hello World" of quant strategies. You define two windows: a short-term MA (say, 20 days) and a long-term MA (say, 50 days). When the short MA crosses above the long MA, you buy. When it crosses below, you sell.
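To make the crossover rule concrete, here is a toy sketch that uses 2-bar and 3-bar windows on a made-up price series so the result can be checked by hand. The prices and window sizes are illustrative only.

```python
import pandas as pd

# Made-up closing prices, small enough to verify by hand
close = pd.Series([10, 9, 8, 9, 11, 12, 11, 9, 8], dtype=float)
short_ma = close.rolling(2).mean()
long_ma = close.rolling(3).mean()

# True where the short MA sits above the long MA (NaN comparisons are False)
above = short_ma > long_ma
prev_above = above.shift(1, fill_value=False)
# Only count a crossover when both MAs exist on this bar and the previous one
valid = long_ma.notna() & long_ma.shift(1).notna()

cross_up = above & ~prev_above & valid      # buy signal
cross_down = ~above & prev_above & valid    # sell signal

print("buy at bars:", list(cross_up[cross_up].index))
print("sell at bars:", list(cross_down[cross_down].index))
```

On this series the short MA first rises above the long MA at bar 4 and falls back below it at bar 7, so those are the only two signals.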
The logic is trivial. The challenge is implementing it without lookahead bias — meaning your backtest cannot accidentally use tomorrow's data to make today's decision. We address this explicitly in the code below.
1.3 Backtesting Loop
A backtest loop is a function that iterates through historical bars, feeds each bar's data to your strategy logic, records the signal, and accumulates returns. At the end, it computes Sharpe ratio, max drawdown, and win rate.
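The bar-by-bar idea can be sketched as an explicit loop. This is a simplified illustration in plain Python, not the engine built later in this guide. The function name `run_loop_backtest` and the toy momentum rule are assumptions made for the example.

```python
from typing import Callable, List


def run_loop_backtest(
    closes: List[float], decide: Callable[[List[float]], int]
) -> List[float]:
    """Walk bars one at a time and return the equity curve (starting at 1.0)."""
    equity = [1.0]
    for i in range(1, len(closes)):
        # Decision made at the close of bar i-1, using only history up to that bar
        position = decide(closes[:i])
        # Return earned over bar i by the position chosen one bar earlier
        bar_return = closes[i] / closes[i - 1] - 1
        equity.append(equity[-1] * (1 + position * bar_return))
    return equity


def momentum_rule(history: List[float]) -> int:
    """Toy rule: be long (1) if the latest close rose, otherwise flat (0)."""
    return 1 if len(history) >= 2 and history[-1] > history[-2] else 0


closes = [100.0, 102.0, 104.0, 103.0, 105.0]
equity = run_loop_backtest(closes, momentum_rule)
print(equity)
```

Because `decide` only ever receives `closes[:i]`, the strategy cannot see the bar it is about to trade, which is the property the rest of this guide enforces with a one-bar shift.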
We implement a minimal but correct backtester in plain pandas and NumPy, with no backtesting library dependencies. Once you understand how it works, you can replace it with Backtrader, Zipline, or VectorBT.
2. Environment Setup and API Configuration
2.1 Prerequisites
You need Python 3.9+ and the following packages:
pip install requests pandas numpy matplotlib python-dotenv
Create a file named .env in your project root and add your TickDB API key:
TICKDB_API_KEY=your_api_key_here
Do not hardcode your API key into the script. Treat it as a secret, and add .env to your .gitignore. Public repositories are scanned for credentials by automated tools, so a key committed to GitHub can be found and abused within hours.
2.2 Project Structure
quant-project/
├── .env
├── config.py
├── data_fetcher.py
├── strategy.py
├── backtester.py
├── main.py
└── requirements.txt
Each file has a single responsibility. This separation makes it easier to test individual components and swap them out as you learn.
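The structure lists config.py, but the guide never shows its contents. One possible shape is sketched below, assuming settings come from environment variables. The `Settings` class, the `load_settings` function, and the `QUANT_*` variable names are hypothetical. Only `TICKDB_API_KEY` comes from this guide.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    """Immutable run configuration (hypothetical layout)."""
    api_key: str
    symbol: str = "AAPL.US"
    interval: str = "1d"
    short_window: int = 20
    long_window: int = 50


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from a mapping, defaulting to the process environment."""
    env = os.environ if env is None else env
    api_key = env.get("TICKDB_API_KEY", "")
    if not api_key:
        raise RuntimeError("TICKDB_API_KEY is not set; add it to .env or the environment.")
    return Settings(
        api_key=api_key,
        symbol=env.get("QUANT_SYMBOL", "AAPL.US"),          # assumed variable name
        short_window=int(env.get("QUANT_SHORT_WINDOW", "20")),  # assumed variable name
        long_window=int(env.get("QUANT_LONG_WINDOW", "50")),    # assumed variable name
    )


# Passing a plain dict keeps the example independent of the real environment
settings = load_settings({"TICKDB_API_KEY": "demo-key", "QUANT_SHORT_WINDOW": "10"})
print(settings.symbol, settings.short_window, settings.long_window)
```

Accepting the environment as a parameter makes the loader easy to test without touching real secrets.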
3. Data Acquisition: Building a Production-Grade Fetcher
3.1 Why "Just Use requests.get()" Is Not Enough
A naive data fetcher breaks in production. Three failure modes are guaranteed to happen:
- Rate limiting: The API returns code: 3001 and you must respect the Retry-After header.
- Network timeout: The request hangs for 30 seconds and your process stalls.
- Reconnection on failure: The network drops mid-session and you need a clean reconnect with backoff.
The code below handles all three. Read the comments carefully — each engineering decision has a rationale.
3.2 The Data Fetcher Module
# data_fetcher.py
"""
Production-grade market data fetcher for TickDB.
Handles rate limiting, timeouts, reconnection with exponential backoff + jitter.
"""
import os
import time
import random
import requests
from typing import Optional, List, Dict
import pandas as pd


class TickDBFetcher:
    """
    Fetches OHLCV kline data from TickDB with production resilience.

    Key design decisions:
    - Timeout (3.05, 10) means connect timeout 3.05s, read timeout 10s.
      The requests documentation recommends a connect timeout slightly larger
      than a multiple of 3, the default TCP packet retransmission window.
    - Exponential backoff caps at MAX_DELAY to prevent runaway waits.
    - Jitter (random.uniform) prevents thundering herd when reconnecting
      multiple clients simultaneously.
    """
    BASE_URL = "https://api.tickdb.ai/v1"
    MAX_RETRIES = 5
    BASE_DELAY = 1.0
    MAX_DELAY = 32.0

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError(
                "API key not set. Set TICKDB_API_KEY in your .env file "
                "or pass api_key directly."
            )
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": self.api_key})
        self._retry_count = 0

    def _backoff_delay(self) -> float:
        """Exponential backoff with jitter, capped at MAX_DELAY."""
        delay = self.BASE_DELAY * (2 ** self._retry_count) + random.uniform(0, 1)
        return min(delay, self.MAX_DELAY)

    def _handle_rate_limit(self, response: requests.Response) -> float:
        """
        Extract Retry-After from rate-limited response.
        Falls back to exponential backoff if header is absent or not numeric.
        """
        retry_after = response.headers.get("Retry-After")
        if retry_after:
            try:
                return float(retry_after)
            except ValueError:
                pass  # Retry-After may also be an HTTP date; use backoff instead
        return self._backoff_delay()

    def fetch_klines(
        self,
        symbol: str,
        interval: str = "1d",
        limit: int = 500,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None
    ) -> pd.DataFrame:
        """
        Fetch OHLCV kline data for a given symbol.

        Args:
            symbol: Exchange-qualified symbol, e.g. "AAPL.US"
            interval: Candle interval: "1m", "5m", "1h", "1d", "1w"
            limit: Number of candles per request (max 1000)
            start_time: Unix timestamp (ms), optional, for historical range
            end_time: Unix timestamp (ms), optional, for historical range

        Returns:
            DataFrame indexed by timestamp with columns:
            open, high, low, close, volume
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }
        if start_time is not None:
            params["start"] = start_time
        if end_time is not None:
            params["end"] = end_time

        self._retry_count = 0
        while self._retry_count < self.MAX_RETRIES:
            try:
                response = self.session.get(
                    f"{self.BASE_URL}/market/kline",
                    params=params,
                    timeout=(3.05, 10)  # (connect_timeout, read_timeout)
                )
                data = response.json()
                # Check TickDB error codes
                code = data.get("code", 0)
                if code == 0:
                    return self._parse_klines(data.get("data", []))
                if code == 3001:
                    wait_time = self._handle_rate_limit(response)
                    print(f"Rate limited. Waiting {wait_time:.1f}s before retry.")
                    time.sleep(wait_time)
                    self._retry_count += 1
                    continue
                if code in (1001, 1002):
                    raise ValueError(
                        f"Invalid API key (code {code}). "
                        "Verify your TICKDB_API_KEY environment variable."
                    )
                if code == 2002:
                    raise KeyError(
                        f"Symbol {symbol} not found. "
                        "Verify via /v1/symbols/available endpoint."
                    )
                raise RuntimeError(f"Unexpected error code {code}: {data.get('message')}")
            except requests.Timeout:
                self._retry_count += 1
                delay = self._backoff_delay()
                print(f"Request timed out. Retrying in {delay:.1f}s.")
                time.sleep(delay)
                continue
            except requests.RequestException as e:
                # Covers connection errors and non-JSON response bodies
                self._retry_count += 1
                delay = self._backoff_delay()
                print(f"Network error: {e}. Retrying in {delay:.1f}s.")
                time.sleep(delay)
                continue
        raise RuntimeError(f"Failed after {self.MAX_RETRIES} retries.")

    @staticmethod
    def _parse_klines(raw_data: List[Dict]) -> pd.DataFrame:
        """Parse TickDB kline response into a clean DataFrame."""
        if not raw_data:
            return pd.DataFrame(
                columns=["timestamp", "open", "high", "low", "close", "volume"]
            ).set_index("timestamp")
        rows = []
        for candle in raw_data:
            rows.append({
                "timestamp": pd.to_datetime(candle["timestamp"], unit="ms"),
                "open": float(candle["open"]),
                "high": float(candle["high"]),
                "low": float(candle["low"]),
                "close": float(candle["close"]),
                "volume": float(candle["volume"])
            })
        df = pd.DataFrame(rows)
        df.set_index("timestamp", inplace=True)
        return df.sort_index()
3.3 Why This Matters
The fetcher above is not an academic exercise. Every production quant system needs exactly this behavior. The rate-limit handler ensures you do not get banned. The timeout ensures your process does not hang indefinitely. The exponential backoff with jitter ensures that when a server comes back online, you do not overwhelm it with a synchronized wave of requests.
Store this pattern. You will reuse it every time you build a data pipeline.
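The backoff schedule itself is easy to inspect in isolation. The sketch below reproduces the delay formula used by the fetcher (base delay doubled per attempt, plus up to one second of jitter, capped at a maximum) as a standalone function. The name `backoff_delays` and the injectable random generator are assumptions added so the schedule can be examined and tested.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 5,
    base: float = 1.0,
    cap: float = 32.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait before each retry: base * 2**attempt + jitter, capped."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(1, max_retries + 1):
        raw = base * (2 ** attempt) + rng.uniform(0, 1)  # jitter in [0, 1] seconds
        delays.append(min(raw, cap))
    return delays


# A seeded generator makes the schedule reproducible for inspection
delays = backoff_delays(rng=random.Random(42))
for attempt, delay in enumerate(delays, start=1):
    print(f"retry {attempt}: wait {delay:.2f}s")
```

With the defaults, the waits grow from roughly 2 seconds to the 32-second cap, so a client gives up after about a minute in total rather than retrying forever.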
4. The Strategy: Moving Average Crossover
4.1 The Logic
The dual moving average crossover strategy generates signals based on the relationship between two rolling averages:
- Entry signal (long): When the short MA crosses above the long MA.
- Exit signal: When the short MA crosses below the long MA.
In the code below, we use 20-day and 50-day windows — a common beginner configuration. You can experiment with different values once the skeleton is working.
# strategy.py
"""
Simple Moving Average Crossover Strategy.
Design principle: compute signals on closed bars only, so there is no lookahead bias.
"""
import pandas as pd


class MACrossover:
    """
    Implements a basic dual moving average crossover strategy.

    The strategy:
    - BUY when short_ma crosses above long_ma (golden cross)
    - SELL when short_ma crosses below long_ma (death cross)

    Key constraint: signals are generated on bar close, not before.
    This prevents lookahead bias in backtesting.
    """

    def __init__(self, short_window: int = 20, long_window: int = 50):
        if short_window >= long_window:
            raise ValueError(
                f"short_window ({short_window}) must be less than "
                f"long_window ({long_window})"
            )
        self.short_window = short_window
        self.long_window = long_window

    def compute_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Add MA columns to the DataFrame.

        Args:
            df: DataFrame with 'close' column and timestamp index.

        Returns:
            DataFrame with added 'short_ma' and 'long_ma' columns.
        """
        result = df.copy()
        result["short_ma"] = result["close"].rolling(
            window=self.short_window, min_periods=self.short_window
        ).mean()
        result["long_ma"] = result["close"].rolling(
            window=self.long_window, min_periods=self.long_window
        ).mean()
        return result

    def generate_signals(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Generate trading signals based on MA crossovers.

        Signal values:
        - 1.0: Long position
        - 0.0: No position / flat

        The signal is set at the close of the bar where the crossover occurs.
        This is critical: we do not enter a position until the bar closes,
        because the bar's high/low/close are not known until then.
        """
        df = self.compute_indicators(df)
        # Use shifted comparison to detect crossovers
        # cross_up = short_ma was at or below long_ma on the previous bar, is above now
        # cross_down = short_ma was at or above long_ma on the previous bar, is below now
        cross_up = (
            (df["short_ma"] > df["long_ma"]) &
            (df["short_ma"].shift(1) <= df["long_ma"].shift(1))
        )
        cross_down = (
            (df["short_ma"] < df["long_ma"]) &
            (df["short_ma"].shift(1) >= df["long_ma"].shift(1))
        )
        # Start with "no decision" (NaN) and mark only the crossover bars.
        # Initializing to 0.0 here would leave the signal at 1.0 for a single
        # bar, so the position would never be held between entry and exit.
        df["signal"] = float("nan")
        df.loc[cross_up, "signal"] = 1.0
        df.loc[cross_down, "signal"] = 0.0
        # Forward-fill to maintain position between signals; flat before the first one
        df["signal"] = df["signal"].ffill().fillna(0.0)
        return df

    def get_performance_metrics(self, df: pd.DataFrame) -> dict:
        """
        Compute basic performance metrics from backtest results.

        Expects the 'position' column produced by Backtester.run, which is
        already shifted by one bar, so it must not be shifted again here.
        """
        daily_returns = df["close"].pct_change()
        strategy_returns = (daily_returns * df["position"]).fillna(0.0)
        total_return = (1 + strategy_returns).prod() - 1
        sharpe_ratio = (
            strategy_returns.mean() / strategy_returns.std() * (252 ** 0.5)
            if strategy_returns.std() != 0 else 0.0
        )
        # Max drawdown
        cumulative = (1 + strategy_returns).cumprod()
        running_max = cumulative.cummax()
        drawdown = (cumulative - running_max) / running_max
        max_drawdown = drawdown.min()
        # Win rate over days with a non-zero strategy return
        active_days = (strategy_returns != 0).sum()
        win_rate = (
            (strategy_returns > 0).sum() / active_days if active_days > 0 else 0.0
        )
        return {
            "total_return": total_return,
            "sharpe_ratio": sharpe_ratio,
            "max_drawdown": max_drawdown,
            "win_rate": win_rate
        }
4.2 The Lookahead Bias Trap
Notice the comment in generate_signals: "We do not enter a position until the bar closes." This is not a stylistic choice. It is a correctness requirement.
Lookahead bias occurs when a backtest uses information that would not have been available at the time of the decision. If you compute a moving average from today's close and also earn today's return on the resulting position, you are trading on data that did not exist when the trade would have been placed. This makes the backtest unrealistically optimistic. Many beginner quant systems have this bug, and it is one reason "profitable" backtests lose money in live trading.
The rule: all signals are generated on bar close, applied to the next bar.
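The size of the distortion is easy to see on a toy series. The sketch below uses made-up prices and a deliberately naive rule (be long whenever the close is above the previous close) and compares the biased and correct ways of applying the same signal.

```python
import pandas as pd

# Made-up closing prices for illustration
close = pd.Series([100.0, 102.0, 101.0, 105.0, 104.0, 108.0])
returns = close.pct_change().fillna(0.0)

# Naive rule: long (1.0) whenever today's close is above yesterday's close
signal = (close > close.shift(1)).astype(float)

# Biased: the signal, known only at the close, earns the SAME bar's return
biased = (1 + returns * signal).prod() - 1

# Correct: the signal from bar t is applied to the return of bar t+1
position = signal.shift(1).fillna(0.0)
unbiased = (1 + returns * position).prod() - 1

print(f"biased:   {biased:.2%}")
print(f"unbiased: {unbiased:.2%}")
```

The biased version captures every up day by construction and shows a gain of about 10%, while the correctly shifted version of the same rule loses about 2% on this series. The only difference between the two is one call to `shift(1)`.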
5. The Backtesting Engine
5.1 Structure
The backtester takes a DataFrame with signals and computes cumulative returns, position tracking, and performance metrics.
# backtester.py
"""
Minimal backtesting engine for the MA crossover strategy.
No external backtesting library, only pandas and NumPy.
"""
import pandas as pd
import numpy as np
from strategy import MACrossover


class Backtester:
    """
    Runs a backtest on historical OHLCV data.

    Key design decisions:
    - Position is shifted by 1 bar to enforce "signal on close, apply next bar" rule.
    - Slippage and commission are configurable but default to zero for clarity.
      Both are charged as a fraction of traded value on every position change.
      In production, you must add realistic cost estimates.
    """

    def __init__(
        self,
        initial_capital: float = 10000.0,
        commission: float = 0.0,
        slippage_bps: float = 0.0
    ):
        self.initial_capital = initial_capital
        self.commission = commission
        self.slippage_bps = slippage_bps

    def run(self, df: pd.DataFrame, strategy: MACrossover) -> pd.DataFrame:
        """
        Execute backtest on historical data.

        Args:
            df: DataFrame with OHLCV columns.
            strategy: MACrossover instance providing generate_signals.

        Returns:
            DataFrame with additional columns: signal, position, market_return,
            trade_cost, strategy_return, equity_curve.
        """
        df = strategy.generate_signals(df).copy()
        # Position is signal shifted by 1 bar (no lookahead)
        df["position"] = df["signal"].shift(1).fillna(0.0)
        # Daily returns
        df["market_return"] = df["close"].pct_change().fillna(0.0)
        # Costs are paid only on bars where the position changes.
        # Scaling every close by a constant factor would cancel out in
        # pct_change and leave returns unchanged, so costs are deducted here.
        cost_rate = self.commission + self.slippage_bps / 10000
        turnover = df["position"].diff().abs().fillna(0.0)
        df["trade_cost"] = turnover * cost_rate
        df["strategy_return"] = df["market_return"] * df["position"] - df["trade_cost"]
        # Equity curve
        df["equity_curve"] = (
            (1 + df["strategy_return"]).cumprod() * self.initial_capital
        )
        return df

    def generate_report(self, df: pd.DataFrame) -> dict:
        """Generate a performance report from backtest results."""
        metrics = {
            "initial_capital": self.initial_capital,
            "final_equity": df["equity_curve"].iloc[-1],
            "total_return": (df["equity_curve"].iloc[-1] / self.initial_capital - 1),
            # Count position changes (entries and exits); the first diff is NaN
            # and must not be counted as a trade.
            "num_trades": int(df["position"].diff().fillna(0.0).ne(0).sum()),
        }
        # Annualized metrics
        trading_days_per_year = 252
        years = len(df) / trading_days_per_year
        daily_returns = df["strategy_return"].dropna()
        metrics["annualized_return"] = (
            (df["equity_curve"].iloc[-1] / self.initial_capital) ** (1 / years) - 1
        ) if years > 0 else 0.0
        std_daily = daily_returns.std()
        metrics["annualized_volatility"] = std_daily * np.sqrt(trading_days_per_year)
        metrics["sharpe_ratio"] = (
            daily_returns.mean() / std_daily * np.sqrt(trading_days_per_year)
            if std_daily != 0 else 0.0
        )
        # Max drawdown
        cumulative = df["equity_curve"]
        running_max = cumulative.cummax()
        drawdown = (cumulative - running_max) / running_max
        metrics["max_drawdown"] = drawdown.min()
        # Win rate (share of positive days among days with a non-zero return)
        winning_days = (daily_returns > 0).sum()
        losing_days = (daily_returns < 0).sum()
        total_trading_days = winning_days + losing_days
        metrics["win_rate"] = (
            winning_days / total_trading_days if total_trading_days > 0 else 0.0
        )
        return metrics
6. Putting It Together: The Main Script
# main.py
"""
Entry point: fetch data, run backtest, plot results.
"""
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import pandas as pd
from dotenv import load_dotenv
from data_fetcher import TickDBFetcher
from strategy import MACrossover
from backtester import Backtester


def plot_results(
    df: pd.DataFrame,
    metrics: dict,
    symbol: str,
    short_window: int = 20,
    long_window: int = 50,
):
    """Generate a two-panel chart: price + MAs, and equity curve."""
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
    # Panel 1: Price and MAs
    # The window sizes are passed in explicitly: a pandas Series does not
    # remember the rolling window it was computed with.
    ax1.plot(df.index, df["close"], label="Close Price", color="#333333", linewidth=1)
    ax1.plot(df.index, df["short_ma"], label=f"MA{short_window}",
             color="#2563eb", linewidth=1.2, linestyle="--")
    ax1.plot(df.index, df["long_ma"], label=f"MA{long_window}",
             color="#dc2626", linewidth=1.2, linestyle="--")
    # Mark entry/exit points
    entries = df[df["position"].diff() > 0]
    exits = df[df["position"].diff() < 0]
    ax1.scatter(entries.index, entries["close"], marker="^",
                color="green", s=80, label="Entry", zorder=5)
    ax1.scatter(exits.index, exits["close"], marker="v",
                color="red", s=80, label="Exit", zorder=5)
    ax1.set_title(f"{symbol}: MA Crossover Strategy", fontsize=14, fontweight="bold")
    ax1.set_ylabel("Price ($)")
    ax1.legend(loc="upper left")
    ax1.grid(True, alpha=0.3)
    # Panel 2: Equity curve
    ax2.plot(df.index, df["equity_curve"], color="#7c3aed", linewidth=1.5)
    ax2.axhline(
        y=metrics["initial_capital"],
        color="#666666", linestyle="--", linewidth=1
    )
    ax2.set_title(
        f"Equity Curve | Return: {metrics['total_return']:.2%} | "
        f"Sharpe: {metrics['sharpe_ratio']:.2f} | "
        f"Max DD: {metrics['max_drawdown']:.2%}",
        fontsize=12
    )
    ax2.set_ylabel("Portfolio Value ($)")
    ax2.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig("backtest_result.png", dpi=150)
    print("Chart saved to backtest_result.png")
    plt.show()


def main():
    load_dotenv()
    # Initialize data fetcher
    fetcher = TickDBFetcher()
    # Configuration
    symbol = "AAPL.US"
    interval = "1d"
    lookback_years = 3
    end_time = int(datetime.now().timestamp() * 1000)
    start_time = int(
        (datetime.now() - timedelta(days=lookback_years * 365)).timestamp() * 1000
    )
    print(f"Fetching {lookback_years} years of {symbol} daily data from TickDB...")
    # Three years is roughly 750 daily bars, which exceeds the fetcher's
    # default limit of 500, so request the documented maximum of 1000.
    df = fetcher.fetch_klines(
        symbol=symbol,
        interval=interval,
        limit=1000,
        start_time=start_time,
        end_time=end_time
    )
    if df.empty:
        raise SystemExit(f"No data returned for {symbol}; nothing to backtest.")
    print(f"Fetched {len(df)} bars. First: {df.index[0].date()}, Last: {df.index[-1].date()}")
    # Initialize strategy
    strategy = MACrossover(short_window=20, long_window=50)
    # Run backtest
    backtester = Backtester(
        initial_capital=10000.0,
        commission=0.0,  # Set to 0.001 for realistic simulation
        slippage_bps=5   # 5 bps = 0.05% per trade
    )
    results = backtester.run(df, strategy)
    metrics = backtester.generate_report(results)
    # Print report
    print("\n" + "=" * 50)
    print("BACKTEST REPORT")
    print("=" * 50)
    print(f"Symbol: {symbol}")
    print(f"Period: {df.index[0].date()} → {df.index[-1].date()}")
    print(f"Strategy: MA({strategy.short_window}) / MA({strategy.long_window}) Crossover")
    print(f"Initial Capital: ${metrics['initial_capital']:,.2f}")
    print(f"Final Equity: ${metrics['final_equity']:,.2f}")
    print(f"Total Return: {metrics['total_return']:.2%}")
    print(f"Annualized Ret: {metrics['annualized_return']:.2%}")
    print(f"Sharpe Ratio: {metrics['sharpe_ratio']:.2f}")
    print(f"Max Drawdown: {metrics['max_drawdown']:.2%}")
    print(f"Win Rate: {metrics['win_rate']:.2%}")
    print(f"Number of Trades: {int(metrics['num_trades'])}")
    print("=" * 50)
    # Generate chart
    plot_results(results, metrics, symbol, strategy.short_window, strategy.long_window)


if __name__ == "__main__":
    main()
7. What Your First Backtest Results Actually Mean
Running the script above on AAPL.US with a 3-year lookback produces a concrete set of numbers. Do not interpret them as predictions. Interpret them as evidence.
7.1 Metrics Explained
| Metric | What it measures | What a "good" value looks like |
|---|---|---|
| Total return | Absolute performance over the period | Depends on the market environment — compare to buy-and-hold |
| Sharpe ratio | Risk-adjusted return | > 1.0 is acceptable; > 2.0 is strong |
| Max drawdown | Largest peak-to-trough loss | Closer to zero is better; a drawdown no deeper than −20% is acceptable for a long-only strategy |
| Win rate | Percentage of positive-return days | Above 50% is sufficient only when the average gain on winning days at least matches the average loss on losing days |
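The metrics in the table can be worked through by hand on a short series. The sketch below uses only the standard library and a made-up list of six daily returns. It assumes a zero risk-free rate, as the backtester in this guide does, and uses the sample standard deviation to match pandas' default.

```python
import math
import statistics
from typing import List


def sharpe_ratio(daily_returns: List[float], periods_per_year: int = 252) -> float:
    """Annualized Sharpe ratio with a zero risk-free rate (an assumption)."""
    sd = statistics.stdev(daily_returns)  # sample std (ddof=1), like pandas
    if sd == 0:
        return 0.0
    return statistics.mean(daily_returns) / sd * math.sqrt(periods_per_year)


def max_drawdown(daily_returns: List[float]) -> float:
    """Largest peak-to-trough decline of the compounded equity curve (<= 0)."""
    equity, peak, worst = 1.0, 1.0, 0.0
    for r in daily_returns:
        equity *= 1 + r
        peak = max(peak, equity)
        worst = min(worst, equity / peak - 1)
    return worst


def win_rate(daily_returns: List[float]) -> float:
    """Share of positive days among days with a non-zero return."""
    wins = sum(r > 0 for r in daily_returns)
    losses = sum(r < 0 for r in daily_returns)
    return wins / (wins + losses) if wins + losses else 0.0


# Made-up daily returns for illustration
returns = [0.01, -0.02, 0.015, 0.0, -0.01, 0.02]
print(f"Sharpe:       {sharpe_ratio(returns):.2f}")
print(f"Max drawdown: {max_drawdown(returns):.2%}")
print(f"Win rate:     {win_rate(returns):.0%}")
```

Here the worst drawdown is the 2% drop on the second day, and three of the five non-zero days are positive, giving a 60% win rate. A Sharpe ratio computed from six observations is not meaningful on its own; the value only illustrates the arithmetic.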
7.2 The Hidden Failure Modes
A strategy that backtests well against one ticker over three years is not a proven strategy. It is an unproven strategy that has not failed yet.
Common failure modes at this stage:
- Survivorship bias: Your 3-year backtest only includes AAPL — a stock that survived. You did not test it against the universe of stocks that went to zero.
- Parameter overfitting: A 20/50 MA crossover happens to work for AAPL over this specific period. It may not work on MSFT or GOOGL. Try the strategy across 5+ tickers before drawing conclusions.
- Cost blindness: Commission and slippage can transform a profitable strategy into a losing one. Always run the backtest with and without realistic costs.
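Cost blindness in particular is simple to demonstrate. The sketch below applies a per-trade cost to a made-up sequence of small daily returns for a strategy that trades every day. The 15 bps cost (10 bps commission plus 5 bps slippage) is an assumed figure for illustration, not a quoted broker rate.

```python
from typing import List


def net_returns(
    market_returns: List[float], positions: List[float], cost_per_trade: float
) -> List[float]:
    """Per-bar strategy return after charging cost on each position change."""
    net = []
    prev = 0.0  # start flat
    for r, pos in zip(market_returns, positions):
        turnover = abs(pos - prev)  # 1.0 for a full entry or exit
        net.append(pos * r - turnover * cost_per_trade)
        prev = pos
    return net


def total_return(returns: List[float]) -> float:
    """Compound a list of per-bar returns into a total return."""
    equity = 1.0
    for r in returns:
        equity *= 1 + r
    return equity - 1


# Made-up data: the strategy is long on every up day and flat on every down day
market = [0.002, -0.001, 0.003, -0.002, 0.002, -0.001]
positions = [1.0, 0.0, 1.0, 0.0, 1.0, 0.0]

gross = total_return(net_returns(market, positions, cost_per_trade=0.0))
net = total_return(net_returns(market, positions, cost_per_trade=0.0015))
print(f"gross: {gross:.2%}")
print(f"net:   {net:.2%}")
```

Before costs, this sequence gains about 0.7%. After six position changes at 15 bps each, it loses about 0.2%. The more often a strategy trades, the more of its edge goes to costs.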
8. Recommended Tickers for First Experiments
Before committing to a single strategy, test it across multiple tickers to build an intuition for how it behaves in different market regimes.
| Ticker | Company | Why to test it |
|---|---|---|
| AAPL.US | Apple | High liquidity, low spread — clean data |
| MSFT.US | Microsoft | Similar profile to AAPL, for comparison |
| TSLA.US | Tesla | High volatility — tests your strategy's reaction to regime changes |
| SPY.US | SPDR S&P 500 ETF | The market itself — baseline for all strategies |
| QQQ.US | Invesco QQQ | Tech-heavy index — useful for checking how the strategy behaves under sector concentration |
9. Where to Go from Here
You now have a working skeleton: a data fetcher, a strategy logic, a backtesting engine, and a visualization. The next steps depend on what you want to build.
Extend the strategy: Add a second condition — for example, only enter when the 50-day MA is rising, not just flat. Add a stop-loss. Add position sizing (bet a fixed percentage of capital, not a fixed dollar amount).
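As one example of the first extension, a rising-MA condition can be layered on top of the existing signal column. The sketch below assumes a DataFrame that already has the `signal` and `long_ma` columns produced by `generate_signals`. The function name `add_trend_filter`, the `slope_lookback` parameter, and the `filtered_signal` column are hypothetical. Note that this simple version gates the held position, so it also exits whenever the long MA stops rising, which is stricter than filtering entries only.

```python
import pandas as pd


def add_trend_filter(df: pd.DataFrame, slope_lookback: int = 5) -> pd.DataFrame:
    """Keep the long signal only while long_ma is above its value N bars ago."""
    out = df.copy()
    # NaN comparisons evaluate to False, so the warm-up period stays flat
    rising = out["long_ma"] > out["long_ma"].shift(slope_lookback)
    out["filtered_signal"] = out["signal"].where(rising, 0.0)
    return out


# Toy frame: the strategy wants to be long throughout, but the MA only rises mid-series
toy = pd.DataFrame(
    {
        "long_ma": [10.0, 10.0, 11.0, 12.0, 12.0, 11.0],
        "signal": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
    }
)
filtered = add_trend_filter(toy, slope_lookback=1)
print(filtered["filtered_signal"].tolist())
```

The filtered column can then be passed through the same one-bar shift as the original signal, so the lookahead rule still holds.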
Stress-test the backtest: Run the same strategy on 10 different tickers. Compute average Sharpe across the group. If the average Sharpe is below 0.5, the strategy is likely noise.
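The cross-ticker check can be sketched as a small harness that takes per-ticker daily strategy returns and averages the Sharpe ratios. The function names, the ticker labels, and the return values below are placeholders. In practice, the returns would come from the `strategy_return` column of each backtest.

```python
import math
import statistics
from typing import Dict, List


def annualized_sharpe(daily_returns: List[float], periods_per_year: int = 252) -> float:
    """Annualized Sharpe ratio with a zero risk-free rate (an assumption)."""
    if len(daily_returns) < 2:
        return 0.0
    sd = statistics.stdev(daily_returns)
    if sd == 0:
        return 0.0
    return statistics.mean(daily_returns) / sd * math.sqrt(periods_per_year)


def summarize_across_tickers(
    returns_by_ticker: Dict[str, List[float]], threshold: float = 0.5
) -> dict:
    """Average the per-ticker Sharpe ratios and compare against a threshold."""
    sharpes = {t: annualized_sharpe(r) for t, r in returns_by_ticker.items()}
    average = statistics.mean(list(sharpes.values()))
    return {"sharpes": sharpes, "average": average, "passes": average >= threshold}


# Placeholder inputs: hypothetical tickers with made-up returns
summary = summarize_across_tickers(
    {
        "AAA.US": [0.01, 0.02, -0.01, 0.015],
        "BBB.US": [0.0, 0.0, 0.0, 0.0],  # never traded, so Sharpe is 0
    }
)
print(summary)
```

Averaging hides dispersion, so it is worth printing the per-ticker values as well. A high average driven by one outlier is weaker evidence than a modest Sharpe that holds across most tickers.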
Add real-time execution: Connect the data fetcher to a WebSocket stream and replace the backtest loop with a live order-management system. The fetcher class in this guide covers REST retries with backoff only; a streaming client needs its own heartbeat and reconnection logic, which can reuse the same backoff pattern.
10. Next Steps
If you want to explore more ticker combinations and strategy variants, sign up at tickdb.ai for a free API key (no credit card required). The /v1/market/kline endpoint covers 10+ years of US equity OHLCV data across six asset classes, which is sufficient for serious cross-cycle backtesting.
If you need real-time depth data and order flow analysis for live strategy monitoring, explore TickDB's WebSocket subscription channels, which provide sub-second market data for US, HK, and crypto markets.
If you're building a team backtesting pipeline, reach out to enterprise@tickdb.ai for institutional data plans with higher rate limits and dedicated support.
If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace to get native TickDB API integration in your development environment.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. Backtest results are based on historical simulation and are subject to limitations including lookahead bias, survivorship bias, and simplified cost modeling. Always validate strategies with out-of-sample testing and appropriate risk controls before deploying capital.