For most retail traders, the closing bell marks the end of the session. They close their platforms, check their P&L, and move on. For professional quantitative teams, closing is precisely when the real work begins.
Between 4:00 PM ET and midnight, a well-engineered quant shop runs a cascade of processes that most people never see: data archival pipelines that ingest terabytes of normalized market data, attribution engines that decompose strategy returns into factor exposures and execution costs, pre-computation clusters that warm the signal cache for the next session's alpha generation, and compliance systems that log everything for regulatory review. Each step flows into the next, and the entire system must complete within a predictable window before Asian markets open.
This article dissects that pipeline. We will build a production-grade post-market automation system from scratch — a modular architecture using a task scheduler, a data ETL layer, a strategy attribution engine, and a signal pre-computation module. Every code block is production-ready: it includes error handling, reconnection logic, timeout management, and environment-based configuration. We will use TickDB's market data endpoints to anchor the examples, but the architecture applies to any data source.
1. The Post-Market Problem: Why Manual Processes Fail at Scale
Before we build anything, we need to understand the failure mode. A retail trader manually reviewing their trades after close is fine. A quant team managing 20 strategies across 5 asset classes cannot rely on manual intervention.
Consider the data volume: a single day of US equity tick data for all listed stocks generates approximately 80–120 GB of normalized order flow data when captured at full depth. For a team running event-driven strategies, this means ingesting earnings transcripts, calculating implied volatility surfaces, computing order book pressure ratios at the moment of release, and generating next-day signal forecasts, all before the pre-market session begins at 4:00 AM ET.
Manual execution introduces three systemic risks:
Latency variance. A human analyst who takes 45 minutes to run attribution scripts on a volatile day (when the need for analysis is highest) delivers stale insights. By the time the report lands, the next pre-market positioning is already underway.
Process drift. When the attribution methodology changes — say, adding a new factor to the return decomposition — manual implementations silently diverge across team members. Strategy A gets analyzed with the old methodology; Strategy B gets the new one. The comparison is meaningless.
Single point of failure. If one analyst gets sick on the day of a major earnings release, the post-market pipeline may not run at all. A 12-hour gap in attribution data corrupts the team's performance analytics for that reporting period.
An automated pipeline eliminates all three. It runs on a schedule, it executes a frozen and version-controlled methodology, and it operates independently of any individual analyst's availability.
2. Architecture Overview: Four-Layer Pipeline
The post-market pipeline consists of four independent layers, connected by data flow rather than tight coupling. Each layer is a discrete process that can be run, tested, monitored, and replaced independently.
┌─────────────────────────────────────────────────────────────┐
│                    POST-MARKET PIPELINE                     │
│                                                             │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐   │
│  │   LAYER 1    │    │   LAYER 2    │    │   LAYER 3    │   │
│  │ Data Archival│───▶│ Attribution  │───▶│ Signal Pre-  │   │
│  │    & ETL     │    │   Engine     │    │ computation  │   │
│  └──────────────┘    └──────────────┘    └──────────────┘   │
│         │                                       │           │
│         ▼                                       ▼           │
│  ┌──────────────┐                        ┌──────────────┐   │
│  │   LAYER 4    │                        │  Scheduler   │   │
│  │ Notification │                        │  & Monitor   │   │
│  │  & Alerting  │                        │              │   │
│  └──────────────┘                        └──────────────┘   │
└─────────────────────────────────────────────────────────────┘
| Layer | Component | Responsibility | Dependencies |
|---|---|---|---|
| Layer 1 | Data Archival & ETL | Ingest raw market data, normalize, store in analytical schema | TickDB API, local storage |
| Layer 2 | Attribution Engine | Decompose strategy P&L into factor contributions, execution costs, signal alpha | Layer 1 output |
| Layer 3 | Signal Pre-computation | Generate and cache next-day signal candidates from warm data | Layer 1 output, model artifacts |
| Layer 4 | Notification & Alerting | Deliver run reports, flag anomalies, escalate failures | All layers |
We will build each layer in sequence.
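Before building the layers, it may help to see how the "Dependencies" column can be turned into an execution order. The following is a minimal sketch using Python's standard graphlib module; the layer names, the LAYER_DEPS mapping, and the run_layers helper are illustrative assumptions and do not appear in the scripts later in this article.

```python
from graphlib import TopologicalSorter

# Hypothetical layer names. Each key maps to the set of layers it depends on,
# mirroring the "Dependencies" column of the table above.
LAYER_DEPS = {
    "etl": set(),
    "attribution": {"etl"},
    "signals": {"etl"},
    "notify": {"etl", "attribution", "signals"},
}


def run_layers(tasks, deps=LAYER_DEPS, always_run=("notify",)):
    """Run each layer after its dependencies and return a status per layer.

    A layer is skipped when any dependency did not finish with "ok",
    except for layers in `always_run`, which must report failures too.
    """
    status = {}
    for name in TopologicalSorter(deps).static_order():
        upstream_ok = all(status[d] == "ok" for d in deps[name])
        if not upstream_ok and name not in always_run:
            status[name] = "skipped"
            continue
        try:
            tasks[name]()          # each task is a zero-argument callable
            status[name] = "ok"
        except Exception:
            status[name] = "failed"
    return status
```

With this shape, a failed ETL run marks attribution and signal pre-computation as skipped while the notification layer still runs, which is the behavior an on-call engineer usually wants.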
3. Layer 1: Data Archival and ETL Pipeline
3.1 What to Archive
Not all data is worth archiving at the same resolution. The archival strategy depends on the strategy's information horizon:
| Data type | Retention resolution | Use case |
|---|---|---|
| OHLCV (1-minute) | Full history | Intraday strategy backtesting |
| OHLCV (1-hour) | Full history | Swing strategy backtesting |
| Order book snapshots (L1) | 30-second intervals during market hours | Order flow analysis, depth attribution |
| Trade tape | Every trade during market hours | Execution quality analysis, tick-level strategy |
| Implied volatility surface | End-of-day | Options-adjusted returns, vol regime detection |
| News / transcripts | Full text with timestamp | Event-driven strategy attribution |
For most equity-focused quant teams, the minimum viable archive is: 1-minute OHLCV klines for all active symbols, archived once per trading day; depth snapshots at key timestamps (open, close, earnings release moments); and the trade tape for the top 200 liquid names.
3.2 ETL Script: Historical Data Archival
We will write a production-grade ETL script that pulls 1-minute kline data from TickDB for all active symbols and stores it in a local Parquet-based data lake. The script handles pagination and rate limiting, and it can be re-run safely for the same date.
#!/usr/bin/env python3
"""
Post-market ETL: Archive 1-minute klines for all active symbols.
Stores output as Parquet partitioned by date and symbol.
Production-ready: retry with exponential backoff + jitter, rate-limit
handling, cursor-based pagination, env-var auth.
"""
import os
import sys
import time
import random
import logging
import requests
import pandas as pd
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

# ─── Configuration ───────────────────────────────────────────────────────────
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
    raise EnvironmentError("TICKDB_API_KEY environment variable is not set")
BASE_URL = "https://api.tickdb.ai/v1"
HEADERS = {"X-API-Key": API_KEY}
OUTPUT_DIR = Path("./data_lake/klines_1m")
# Retry / rate-limit config
MAX_RETRIES = 5
BASE_DELAY = 1.0
MAX_DELAY = 60.0
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)

def handle_api_error(response: requests.Response, symbol: str) -> list | None:
    """
    Standard TickDB error handler with retry logic.
    Returns data payload on success, None on retryable error (caller retries),
    raises on unrecoverable error.
    """
    if response.status_code == 200:
        data = response.json()
        if data.get("code", 0) == 0:
            return data.get("data", [])
        code = data.get("code", 0)
        if code == 3001:
            # Rate limited: honor the Retry-After header before the caller retries
            retry_after = int(response.headers.get("Retry-After", 5))
            logger.warning(f"Rate limited for {symbol}, retrying after {retry_after}s")
            time.sleep(retry_after)
            return None  # Caller should retry
        if code in (1001, 1002):
            raise EnvironmentError("Invalid API key: check TICKDB_API_KEY")
        if code == 2002:
            logger.warning(f"Symbol {symbol} not found, skipping")
            return []  # Return empty list for unknown symbols
        raise RuntimeError(f"Unexpected error code {code}: {data.get('message')}")
    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 5))
        logger.warning(f"HTTP 429 for {symbol}, retrying after {retry_after}s")
        time.sleep(retry_after)
        return None
    raise RuntimeError(f"HTTP error {response.status_code} for {symbol}")

def fetch_klines(symbol: str, date: str, limit: int = 1000) -> list[dict]:
    """
    Fetch 1-minute klines for a given symbol and trading date.
    Requests further pages whenever a response returns a full page of `limit` bars.
    """
    all_bars = []
    start_time = None
    retry_count = 0
    while True:
        params = {
            "symbol": symbol,
            "interval": "1m",
            "date": date,
            "limit": limit,
        }
        if start_time is not None:
            params["start_time"] = start_time
        try:
            response = requests.get(
                f"{BASE_URL}/market/kline",
                headers=HEADERS,
                params=params,
                timeout=(3.05, 15)
            )
            result = handle_api_error(response, symbol)
        except requests.RequestException as exc:
            # Timeouts and dropped connections are retried like rate limits
            logger.warning(f"Network error for {symbol}: {exc}")
            result = None
        if result is None:
            # Retryable error: back off exponentially, with jitter
            retry_count += 1
            if retry_count > MAX_RETRIES:
                raise RuntimeError(f"Max retries exceeded for {symbol} on {date}")
            delay = min(BASE_DELAY * (2 ** retry_count), MAX_DELAY)
            jitter = random.uniform(0, delay * 0.1)
            time.sleep(delay + jitter)
            continue
        retry_count = 0  # Reset the backoff after a successful page
        bars = result if isinstance(result, list) else []
        if not bars:
            break
        all_bars.extend(bars)
        # Advance cursor for next page
        last_ts = bars[-1].get("t")
        if last_ts is None:
            break
        start_time = last_ts + 1  # Next millisecond after the last timestamp
        # Exit if we received fewer bars than the limit: no more pages
        if len(bars) < limit:
            break
        # Prevent tight loop on edge cases
        time.sleep(0.05)
    return all_bars

def fetch_active_symbols() -> list[str]:
    """
    Fetch the list of currently tradeable symbols from TickDB.
    In production, cache this list and refresh daily.
    """
    response = requests.get(
        f"{BASE_URL}/symbols/available",
        headers=HEADERS,
        params={"market": "US"},
        timeout=(3.05, 10)
    )
    response.raise_for_status()  # Surface HTTP errors before parsing the body
    data = response.json()
    if data.get("code", 0) != 0:
        raise RuntimeError(f"Failed to fetch symbols: {data}")
    return [s["symbol"] for s in data.get("data", [])]

def save_parquet(df: pd.DataFrame, symbol: str, date: str):
    """Store kline data as Parquet partitioned by date and symbol."""
    if df.empty:
        return
    date_dir = OUTPUT_DIR / date
    date_dir.mkdir(parents=True, exist_ok=True)
    output_path = date_dir / f"{symbol}.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Saved {len(df)} bars for {symbol} on {date} → {output_path}")

def run_etl_pipeline(trading_date: str):
    """
    Main ETL entry point for a single trading date.
    Fetches all active symbols, archives klines, stores as Parquet.
    """
    logger.info(f"Starting ETL pipeline for {trading_date}")
    symbols = fetch_active_symbols()
    logger.info(f"Fetched {len(symbols)} active symbols")
    successful = 0
    failed = 0
    for symbol in symbols:
        try:
            bars = fetch_klines(symbol, trading_date)
            if bars:
                df = pd.DataFrame(bars)
                # Standardize column names
                df = df.rename(columns={
                    "t": "timestamp",
                    "o": "open",
                    "h": "high",
                    "l": "low",
                    "c": "close",
                    "v": "volume"
                })
                df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
                save_parquet(df, symbol, trading_date)
                successful += 1
            else:
                logger.debug(f"No data for {symbol} on {trading_date}")
        except Exception as e:
            logger.error(f"Failed to process {symbol}: {e}")
            failed += 1
    logger.info(
        f"ETL complete: {successful} symbols archived, {failed} failures"
    )
    return {"successful": successful, "failed": failed}

if __name__ == "__main__":
    # The job runs shortly after the close, so default to the current session,
    # using the exchange's calendar date rather than the server's local date.
    # Pass a date (YYYY-MM-DD) as the first argument to backfill an earlier session.
    session_date = datetime.now(ZoneInfo("America/New_York")).strftime("%Y-%m-%d")
    run_etl_pipeline(trading_date=sys.argv[1] if len(sys.argv) > 1 else session_date)
3.3 Key Design Decisions
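Pagination with cursor-based continuation. The fetch_klines function requests another page whenever a response comes back full (1000 bars by default). It uses start_time as a cursor, advancing past the last retrieved timestamp. Note that the number of 1-minute bars depends on session length, not trading volume: a regular US session produces at most 390 bars, and a full extended-hours day (4:00 AM to 8:00 PM ET) produces at most 960, so a single US equity day fits in one page. Pagination becomes necessary when you lower the limit or point the same loop at a market that trades around the clock (up to 1,440 bars per day).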
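The cursor logic can be exercised without touching the network. The sketch below is a stand-alone illustration, not part of the ETL script: paginate and fake_fetch are hypothetical names, and the in-memory BARS list stands in for the API, with the same assumption that each bar carries a millisecond timestamp under the key "t".

```python
from typing import Callable, Iterator, Optional

Page = list[dict]


def paginate(fetch_page: Callable[[Optional[int], int], Page], limit: int) -> Iterator[dict]:
    """Yield bars page by page, advancing a timestamp cursor past the last bar."""
    cursor = None
    while True:
        page = fetch_page(cursor, limit)
        yield from page
        if len(page) < limit:        # short or empty page: nothing left to fetch
            return
        cursor = page[-1]["t"] + 1   # resume strictly after the last timestamp


# Stand-in for the API: 2,500 one-minute bars with millisecond timestamps.
BARS = [{"t": 60_000 * i} for i in range(2500)]


def fake_fetch(cursor: Optional[int], limit: int) -> Page:
    start = 0 if cursor is None else cursor
    return [bar for bar in BARS if bar["t"] >= start][:limit]


bars = list(paginate(fake_fetch, limit=1000))
```

Here the loop issues three requests (1000, 1000, and 500 bars) and stops on the short page. When the total is an exact multiple of the limit, one extra request returns an empty page, which also ends the loop.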
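Idempotent re-runs. If the script fails midway through a large batch, you can re-run it with the same date: each symbol's Parquet file is overwritten rather than appended to, so no data is duplicated. The script as written re-fetches every symbol on a re-run; it does not skip files that were already archived.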
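If re-fetching the whole universe after a crash is too slow, one option is to skip symbols that already have a file and to write each file atomically, so that a crash mid-write never leaves a truncated file that looks complete. This is a sketch under assumptions: write_atomic and pending_symbols are hypothetical helpers that are not in the ETL script above, and the payload is passed as raw bytes to keep the example free of third-party dependencies.

```python
import os
import tempfile
from pathlib import Path


def write_atomic(path: Path, payload: bytes) -> None:
    """Write to a temp file in the target directory, then rename into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(payload)
        os.replace(tmp_name, path)   # atomic when source and target share a filesystem
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)      # never leave a partial file behind
        raise


def pending_symbols(symbols: list[str], date_dir: Path) -> list[str]:
    """Return the symbols that do not yet have a completed file for this date."""
    done = {p.stem for p in date_dir.glob("*.parquet")}
    return [s for s in symbols if s not in done]
```

One caveat of this design: symbols that legitimately returned no data never get a file, so they are retried on every re-run. Writing an empty marker file per attempted symbol would avoid that at the cost of extra bookkeeping.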
Parquet storage. Storing data in Parquet rather than CSV or JSON provides columnar compression (typically 3–10× smaller than CSV), predicate pushdown for fast date-range queries, and integration with PySpark, DuckDB, or Polars for downstream analytics.
4. Layer 2: Strategy Attribution Engine
4.1 The Attribution Problem
Attribution is the process of decomposing a strategy's realized P&L into its constituent components: signal alpha, factor exposure, transaction costs, and market impact. Without this decomposition, a quant team cannot answer the most basic question — "Did our signal work, or did we just get carried by the market?"
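Two approaches dominate in practice: Brinson-style attribution, which splits active return into allocation and selection effects relative to a benchmark, and factor-based attribution, which explains returns through exposures to risk factors. We will implement a simplified decomposition in the spirit of the latter, treating the factor return as the residual once the signal return is removed: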
| Component | Definition | Data source |
|---|---|---|
| Total return | Portfolio P&L / portfolio value at open | Trade blotter |
| Benchmark return | Value-weighted index return over the same period | Index OHLCV |
| Signal return | Return attributable to the alpha signal (excluding factor exposures) | Strategy output logs |
| Factor return | Return attributable to market, size, momentum, volatility factors | Factor model output |
| Execution cost | Realized slippage vs. arrival price | Execution analytics |
| Market impact | Adverse price movement caused by own order size | Order size / ADV ratio |
4.2 Attribution Analysis Script
#!/usr/bin/env python3
"""
Strategy Attribution Engine — Layer 2 of the post-market pipeline.
Decomposes strategy P&L into signal alpha, factor exposures,
execution costs, and market impact.
"""
import pandas as pd
import numpy as np
from pathlib import Path
from dataclasses import dataclass
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class AttributionResult:
    strategy_name: str
    trading_date: str
    total_return_bps: float
    signal_return_bps: float
    factor_return_bps: float
    execution_cost_bps: float
    market_impact_bps: float
    gross_alpha_bps: float
    net_alpha_bps: float
    sharpe_estimated: float
    max_drawdown_bps: float

def load_trade_blotter(date: str, strategy: str, data_dir: Path) -> pd.DataFrame:
    """
    Load the trade blotter for a given strategy and date.
    In production, this reads from the order management system (OMS) database.
    Expected schema: timestamp, symbol, side, quantity, fill_price, arrival_price,
    pnl (dollar P&L per trade), market_value (dollar notional per trade)
    """
    blotter_path = data_dir / f"blotter/{strategy}/{date}.parquet"
    if not blotter_path.exists():
        raise FileNotFoundError(f"Blotter not found: {blotter_path}")
    df = pd.read_parquet(blotter_path)
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    return df

def load_benchmark_klines(date: str, data_dir: Path) -> pd.DataFrame:
    """
    Load benchmark index (e.g., SPY) klines for the trading date.
    Data source: TickDB /v1/market/kline endpoint.
    """
    benchmark_path = data_dir / f"klines_1m/{date}/SPY.parquet"
    if not benchmark_path.exists():
        raise FileNotFoundError(f"Benchmark data not found: {benchmark_path}")
    return pd.read_parquet(benchmark_path)

def compute_signal_return(
    blotter: pd.DataFrame,
    signal_log: pd.DataFrame
) -> float:
    """
    Compute return attributable to the alpha signal.
    Methodology: weight each trade's dollar P&L by the strength of the signal
    that generated it. Fill-versus-arrival slippage is deliberately left out
    here because compute_execution_cost accounts for it.
    Return is in basis points (bps) of portfolio value.
    """
    trades = blotter.copy()
    if signal_log.empty:
        # No signal log for the day: apply the neutral default weight
        trades["signal_strength"] = 0.5
    else:
        # Simplification: keep the latest signal per symbol so the merge
        # cannot duplicate trades when a symbol has several signals
        latest = (
            signal_log.sort_values("signal_time")
            .drop_duplicates("symbol", keep="last")
        )
        trades = trades.merge(
            latest[["symbol", "signal_strength"]],
            on="symbol",
            how="left"
        )
        # Scale by signal strength (0 to 1); unmatched trades get the neutral weight
        trades["signal_strength"] = trades["signal_strength"].fillna(0.5)
    weighted_pnl = (trades["pnl"] * trades["signal_strength"]).sum()
    portfolio_value = (trades["quantity"] * trades["arrival_price"]).sum()
    return (weighted_pnl / portfolio_value) * 10_000  # Convert to bps

def compute_execution_cost(blotter: pd.DataFrame) -> float:
    """
    Compute total execution cost as bps of portfolio value.
    Includes: commission, slippage vs. arrival price.
    """
    # Signed slippage: paying above arrival on a buy, or receiving below
    # arrival on a sell, is a cost; a favorable fill reduces the cost
    side_sign = np.where(blotter["side"] == "BUY", 1.0, -1.0)
    slippage = side_sign * (blotter["fill_price"] - blotter["arrival_price"])
    # Default commission: $0.001 (0.1 cent) per share when the blotter has no column
    commission = blotter["commission"] if "commission" in blotter.columns else 0.001
    cost_per_share = slippage + commission
    total_cost = (cost_per_share * blotter["quantity"]).sum()
    portfolio_value = (blotter["quantity"] * blotter["arrival_price"]).sum()
    return -(total_cost / portfolio_value) * 10_000  # Negative = cost

def compute_market_impact(blotter: pd.DataFrame, adv_data: pd.DataFrame) -> float:
    """
    Estimate market impact using the Almgren-Chriss model approximation.
    Market impact ∝ (order_size / ADV) ^ 0.6 * volatility.
    Returns the notional-weighted impact cost in bps.
    """
    if adv_data.empty:
        logger.warning("No ADV data available; market impact set to 0")
        return 0.0
    merged = blotter.merge(
        adv_data[["symbol", "adv_20d"]],
        on="symbol",
        how="left"
    )
    merged["participation_rate"] = merged["quantity"] / merged["adv_20d"]
    # Almgren-Chriss approximation: impact ≈ 0.5 * participation_rate^0.6 * vol
    # Assumes daily volatility of 1% (≈ 100 bps) as baseline
    merged["estimated_impact_bps"] = (
        0.5 * (merged["participation_rate"] ** 0.6) * 100
    )
    # Trades without ADV data get zero estimated impact rather than an inflated one
    merged["estimated_impact_bps"] = merged["estimated_impact_bps"].fillna(0.0)
    # Weight each trade's impact by its notional so the result is in bps of portfolio value
    notional = merged["quantity"] * merged["arrival_price"]
    total_impact = (merged["estimated_impact_bps"] * notional).sum() / notional.sum()
    return -total_impact  # Negative = cost

def run_attribution(
    strategy: str,
    date: str,
    data_dir: Path
) -> AttributionResult:
    """
    Main attribution entry point. Orchestrates all sub-computations.
    """
    logger.info(f"Running attribution for strategy '{strategy}' on {date}")
    blotter = load_trade_blotter(date, strategy, data_dir)
    benchmark = load_benchmark_klines(date, data_dir)
    # Load signal log (generated during trading session)
    signal_log_path = data_dir / f"signals/{strategy}/{date}.parquet"
    signal_log = (
        pd.read_parquet(signal_log_path)
        if signal_log_path.exists()
        else pd.DataFrame()
    )
    # Load ADV data (20-day average)
    adv_path = data_dir / "reference/adv_20d.parquet"
    adv_data = (
        pd.read_parquet(adv_path)
        if adv_path.exists()
        else pd.DataFrame()
    )
    # Compute components
    total_return_bps = (
        blotter["pnl"].sum() /
        blotter["market_value"].sum() * 10_000
    )
    # Benchmark return is logged for context; it does not enter the decomposition
    benchmark_return_bps = (
        (benchmark["close"].iloc[-1] - benchmark["open"].iloc[0]) /
        benchmark["open"].iloc[0] * 10_000
    )
    signal_return_bps = compute_signal_return(blotter, signal_log)
    factor_return_bps = total_return_bps - signal_return_bps
    execution_cost_bps = compute_execution_cost(blotter)
    market_impact_bps = compute_market_impact(blotter, adv_data)
    gross_alpha_bps = signal_return_bps
    net_alpha_bps = signal_return_bps + execution_cost_bps + market_impact_bps
    # Costs are stored as negative numbers, so take the magnitude before dividing.
    # This is net alpha per bp of cost, a rough quality gauge rather than a true
    # Sharpe ratio (which needs a time series of returns).
    total_cost_bps = abs(execution_cost_bps + market_impact_bps)
    sharpe_estimated = net_alpha_bps / max(total_cost_bps, 1)
    result = AttributionResult(
        strategy_name=strategy,
        trading_date=date,
        total_return_bps=total_return_bps,
        signal_return_bps=signal_return_bps,
        factor_return_bps=factor_return_bps,
        execution_cost_bps=execution_cost_bps,
        market_impact_bps=market_impact_bps,
        gross_alpha_bps=gross_alpha_bps,
        net_alpha_bps=net_alpha_bps,
        sharpe_estimated=sharpe_estimated,
        max_drawdown_bps=0.0  # Computed over rolling window in production
    )
    logger.info(
        f"Attribution complete: gross_alpha={result.gross_alpha_bps:.2f} bps, "
        f"net_alpha={result.net_alpha_bps:.2f} bps, "
        f"execution_cost={result.execution_cost_bps:.2f} bps, "
        f"benchmark={benchmark_return_bps:.2f} bps"
    )
    return result
4.3 Attribution Report Format
The attribution engine outputs a structured report that feeds directly into the team's performance management system:
| Metric | Value | Interpretation |
|---|---|---|
| Gross alpha | +18.3 bps | Signal contributed 18.3 bps of return above factor exposure |
| Net alpha | +11.7 bps | After execution costs and market impact, signal delivered 11.7 bps |
| Execution cost | −4.2 bps | Slippage and commission consumed 4.2 bps |
| Market impact | −2.4 bps | Own order flow moved price adversely by 2.4 bps |
| Alpha-to-cost ratio (sharpe_estimated) | 1.77 | Net alpha of 11.7 bps divided by 6.6 bps of total cost; a cost-coverage gauge, not a true Sharpe ratio |
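A report like this is worth reconciling mechanically before it is published, because the rows are linked by simple arithmetic: net alpha equals gross alpha plus the (negative) cost terms. The sketch below checks the sample figures from the table; the variable names are illustrative.

```python
# Sample figures from the report table above, in basis points.
gross_alpha_bps = 18.3
execution_cost_bps = -4.2   # costs carry a negative sign
market_impact_bps = -2.4

# Net alpha is gross alpha after both cost terms.
net_alpha_bps = gross_alpha_bps + execution_cost_bps + market_impact_bps

# Share of the signal's gross return that was given back to trading costs.
total_cost_bps = abs(execution_cost_bps + market_impact_bps)
cost_share = total_cost_bps / gross_alpha_bps

print(f"net alpha: {net_alpha_bps:.1f} bps")                   # net alpha: 11.7 bps
print(f"costs consume {cost_share:.0%} of gross alpha")        # costs consume 36% of gross alpha
```

If the published net alpha ever differs from this sum by more than rounding, one of the upstream components was computed on a different trade set or with a different denominator.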
5. Layer 3: Signal Pre-Computation for the Next Session
5.1 Why Pre-Computation Matters
The pre-market session (4:00 AM to 9:30 AM ET) is when informed traders position themselves for the open. For event-driven strategies — earnings, FDA decisions, macro announcements — the market begins pricing the event the moment overnight news breaks. A strategy that waits until 9:30 AM to compute its signals is already behind.
Pre-computation addresses this by running the heavy inference work during the post-market window, when compute resources are available. The next morning, the system loads pre-computed signals from cache, applies any last-minute adjustments (overnight news, pre-market price moves), and generates the final execution order list before the open.
Key pre-computation tasks:
| Task | Computation | Output |
|---|---|---|
| Earnings vol surface update | Implied vol from options chain, updated with latest IV | IV surface per expiry |
| Event probability update | Odds derived from options structure, analyst estimates | Probability distributions |
| Momentum signal warm-up | Rolling factor calculations on archived data | Signal strength per symbol |
| Risk model refresh | Covariance matrix update using last 60 days of returns | Updated risk weights |
5.2 Signal Pre-Computation Module
#!/usr/bin/env python3
"""
Layer 3: Signal Pre-Computation Module.
Warms the signal cache for the next trading session.
"""
import pandas as pd
import numpy as np
import logging
from pathlib import Path
import hashlib
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def compute_momentum_signal(
    kline_data: pd.DataFrame,
    short_ma: int = 5,
    long_ma: int = 20
) -> pd.DataFrame:
    """
    Compute a dual-moving-average crossover momentum signal.
    Returns signal strength (0 to 1) for each symbol.
    kline_data must span at least long_ma + 1 trading days per symbol;
    with less history the moving averages are NaN.
    """
    df = kline_data.copy()
    df = df.sort_values(["symbol", "timestamp"])
    # Aggregate intraday bars to daily closes for the MA calculation
    daily = df.groupby(["symbol", pd.Grouper(key="timestamp", freq="1D")]).agg(
        close=("close", "last"),
        volume=("volume", "sum")
    ).reset_index()
    # Compute MAs on daily close
    daily["ma_short"] = daily.groupby("symbol")["close"].transform(
        lambda x: x.rolling(short_ma).mean()
    )
    daily["ma_long"] = daily.groupby("symbol")["close"].transform(
        lambda x: x.rolling(long_ma).mean()
    )
    # Previous-day MAs, shifted within each symbol so values never leak across symbols
    prev_short = daily.groupby("symbol")["ma_short"].shift(1)
    prev_long = daily.groupby("symbol")["ma_long"].shift(1)
    # Signal: crossover detected (short MA crosses above long MA)
    daily["crossover"] = (
        (daily["ma_short"] > daily["ma_long"]) &
        (prev_short <= prev_long)
    ).astype(int)
    # Signal strength: distance of short MA from long MA, normalized
    daily["ma_spread"] = (
        (daily["ma_short"] - daily["ma_long"]) / daily["ma_long"]
    )
    daily["signal_strength"] = np.clip(
        daily["ma_spread"] * 10, 0, 1  # Normalize to [0, 1]
    )
    # Filter to most recent date
    latest = daily["timestamp"].max()
    result = daily[daily["timestamp"] == latest][["symbol", "signal_strength", "crossover"]]
    return result

def compute_volatility_signal(kline_data: pd.DataFrame, lookback: int = 20) -> pd.DataFrame:
    """
    Compute realized volatility signal (annualized, in bps).
    Strategies can use this to size positions inversely to vol.
    """
    df = kline_data.copy()
    df = df.sort_values(["symbol", "timestamp"])
    # Compute daily returns
    daily = df.groupby(["symbol", pd.Grouper(key="timestamp", freq="1D")]).agg(
        close=("close", "last")
    ).reset_index()
    daily["return"] = daily.groupby("symbol")["close"].pct_change()
    # Annualized realized vol (252 trading days)
    daily["realized_vol"] = daily.groupby("symbol")["return"].transform(
        lambda x: x.rolling(lookback).std() * np.sqrt(252)
    )
    latest = daily["timestamp"].max()
    # Copy the slice so the new column is not written to a view of `daily`
    result = daily[daily["timestamp"] == latest][["symbol", "realized_vol"]].copy()
    result["realized_vol_bps"] = result["realized_vol"] * 10_000
    return result[["symbol", "realized_vol_bps"]]

def generate_signal_cache(
    data_dir: Path,
    output_dir: Path,
    date: str,
    history_days: int = 30
) -> dict:
    """
    Main pre-computation entry point.
    Loads archived kline data, computes signals, writes cache.
    Returns a manifest of generated files.
    """
    logger.info(f"Generating signal cache for {date}")
    kline_root = data_dir / "klines_1m"
    output_dir.mkdir(parents=True, exist_ok=True)
    # The 20-day moving average and volatility need a trailing window of sessions,
    # not just one day. Date directories are named YYYY-MM-DD, so sorting them
    # lexicographically is also chronological.
    date_dirs = sorted(
        d for d in kline_root.iterdir() if d.is_dir() and d.name <= date
    )[-history_days:]
    kline_files = [f for d in date_dirs for f in d.glob("*.parquet")]
    if not kline_files:
        raise FileNotFoundError(f"No kline files found under {kline_root} up to {date}")
    logger.info(f"Loading {len(kline_files)} kline files from {len(date_dirs)} sessions")
    frames = []
    for f in kline_files:
        df = pd.read_parquet(f)
        symbol = f.stem
        df["symbol"] = symbol
        frames.append(df)
    all_klines = pd.concat(frames, ignore_index=True)
    logger.info(f"Loaded {len(all_klines)} total kline records")
    # Compute signals
    momentum = compute_momentum_signal(all_klines)
    volatility = compute_volatility_signal(all_klines)
    # Merge signals
    signals = momentum.merge(volatility, on="symbol", how="outer")
    # Generate cache key (fingerprint of the date, input size, and signal output)
    cache_key = hashlib.sha256(
        f"{date}:{len(all_klines)}:{signals['signal_strength'].sum():.4f}".encode()
    ).hexdigest()[:12]
    # Save cache
    cache_file = output_dir / f"signal_cache_{date}_{cache_key}.parquet"
    signals.to_parquet(cache_file, index=False)
    logger.info(
        f"Signal cache generated: {len(signals)} symbols, "
        f"cache_key={cache_key}, file={cache_file}"
    )
    return {
        "date": date,
        "cache_key": cache_key,
        "symbols": len(signals),
        "cache_file": str(cache_file)
    }
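The cache key in the module above is derived from the date, the row count, and the sum of signal strengths, so it does not change when only a computation parameter (such as a moving-average window) changes the inputs without moving that sum. A possible refinement, sketched here with a hypothetical make_cache_key helper, is to hash the parameters explicitly.

```python
import hashlib
import json


def make_cache_key(date: str, row_count: int, params: dict) -> str:
    """Hash the trading date, input size, and parameters into a short key."""
    blob = json.dumps(
        {"date": date, "rows": row_count, "params": params},
        sort_keys=True,  # stable key ordering so equal inputs hash identically
    )
    return hashlib.sha256(blob.encode()).hexdigest()[:12]


# Example parameter set; the names mirror the function defaults used above.
key = make_cache_key("2024-06-03", 195_000, {"short_ma": 5, "long_ma": 20, "lookback": 20})
```

Any consumer that loads the cache the next morning can recompute the key from its own configuration and refuse a file whose key does not match.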
6. Layer 4: Orchestration, Scheduling, and Monitoring
6.1 Task Scheduler
The four layers need to run on a reliable schedule. For a Linux-based production environment, cron is sufficient for basic scheduling, but a modern quant team managing complex dependency graphs should use a workflow orchestrator.
| Tool | Best for | Complexity |
|---|---|---|
| cron | Simple daily jobs with no interdependencies | Low |
| systemd timers | Jobs that need restart on failure, dependency on system services | Low-medium |
| Apache Airflow | DAG-based workflows with complex dependencies, retries, UI | Medium-high |
| Prefect | Python-native DAG authoring, modern UI, cloud or self-hosted | Medium |
| Dagster | Asset-based model (data pipelines as first-class citizens) | Medium-high |
For this article, we will use a systemd timer-based approach for its simplicity, portability, and system-level reliability. Separate timers trigger the ETL pipeline at 4:05 PM ET, the attribution engine at 5:30 PM ET, and the signal pre-computation at 6:00 PM ET. The unit files below cover the ETL and attribution stages; the pre-computation stage follows the same pattern.
6.2 Systemd Service and Timer Files
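# /etc/systemd/system/tickdb-etl.service
[Unit]
Description=TickDB Post-Market ETL Pipeline
Wants=network-online.target
After=network-online.target
[Service]
Type=oneshot
WorkingDirectory=/opt/quant_pipeline
# systemd does not expand %{env:...} references. Load the key from a file
# readable only by root and the service user (example path; the file holds
# a single line: TICKDB_API_KEY=...)
EnvironmentFile=/etc/quant_pipeline/tickdb.env
ExecStart=/usr/bin/python3 /opt/quant_pipeline/run_etl.py
StandardOutput=journal
StandardError=journal
User=quant_pipeline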
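# /etc/systemd/system/tickdb-etl.timer
[Unit]
Description=Run TickDB ETL pipeline on weekdays at 4:05 PM ET
[Timer]
# Run 5 minutes after market close (4:00 PM ET), Monday to Friday
OnCalendar=Mon..Fri *-*-* 16:05:00 America/New_York
# Name the service explicitly. A Requires= dependency in [Unit] would start
# the service whenever the timer unit itself is started (for example at boot).
Unit=tickdb-etl.service
Persistent=true
RandomizedDelaySec=30
[Install]
WantedBy=timers.target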
# /etc/systemd/system/tickdb-attribution.service
[Unit]
Description=TickDB Strategy Attribution Engine
After=network.target tickdb-etl.service
[Service]
Type=oneshot
WorkingDirectory=/opt/quant_pipeline
ExecStart=/usr/bin/python3 /opt/quant_pipeline/run_attribution.py
StandardOutput=journal
StandardError=journal
User=quant_pipeline
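# /etc/systemd/system/tickdb-attribution.timer
[Unit]
Description=Run attribution engine on weekdays at 5:30 PM ET
[Timer]
OnCalendar=Mon..Fri *-*-* 17:30:00 America/New_York
# Name the service explicitly instead of using Requires=, which would start
# the service whenever the timer unit itself is started.
Unit=tickdb-attribution.service
Persistent=true
RandomizedDelaySec=30
[Install]
WantedBy=timers.target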
Activate with:
sudo systemctl daemon-reload
sudo systemctl enable --now tickdb-etl.timer
sudo systemctl enable --now tickdb-attribution.timer
# Verify timers
systemctl list-timers --all | grep tickdb
6.3 Health Monitoring
Every pipeline run should emit a heartbeat to a monitoring endpoint. In production, this typically means writing to a metrics database (Prometheus, InfluxDB) or sending a structured log to a central aggregator (Datadog, Grafana Loki).
import logging
import socket
import requests
from datetime import datetime, timezone

def emit_pipeline_heartbeat(
    pipeline_name: str,
    status: str,  # "success" | "failure" | "partial"
    records_processed: int,
    duration_seconds: float,
    metrics_endpoint: str = "http://monitoring.internal:9090/api/v1/push"
):
    """
    Emit a structured heartbeat after each pipeline run.
    Includes: pipeline name, status, record count, duration, timestamp.
    """
    payload = {
        "pipeline": pipeline_name,
        "status": status,
        "records": records_processed,
        "duration_s": duration_seconds,
        # Timezone-aware UTC timestamp, rendered with a trailing "Z"
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "hostname": socket.gethostname()
    }
    try:
        response = requests.post(
            metrics_endpoint,
            json=payload,
            timeout=(3.05, 5)
        )
        if response.status_code not in (200, 201, 202):
            logging.warning(f"Heartbeat failed: HTTP {response.status_code}")
    except requests.RequestException as e:
        # Never let monitoring failures break the pipeline
        logging.warning(f"Heartbeat failed (non-critical): {e}")
The try/except block wrapping the monitoring call is intentional. In production pipelines, monitoring failures must never cascade into the core pipeline process failing. The pipeline completes its work; the heartbeat is best-effort.
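To make sure a heartbeat is sent on both success and failure, the timing and status bookkeeping can be wrapped in a context manager. This is a sketch, not part of the scripts above: pipeline_run is a hypothetical helper, and emit is assumed to be any callable that accepts the same keyword arguments as emit_pipeline_heartbeat.

```python
import time
from contextlib import contextmanager


@contextmanager
def pipeline_run(name: str, emit):
    """Time a pipeline stage and report its outcome through `emit`."""
    stats = {"records": 0}          # the stage updates this while it runs
    started = time.monotonic()      # monotonic clock: immune to wall-clock jumps
    status = "success"
    try:
        yield stats
    except Exception:
        status = "failure"
        raise                       # the failure still propagates to the caller
    finally:
        emit(
            pipeline_name=name,
            status=status,
            records_processed=stats["records"],
            duration_seconds=time.monotonic() - started,
        )
```

A stage would then run as `with pipeline_run("etl", emit_pipeline_heartbeat) as stats:` and set `stats["records"]` before leaving the block. Because the heartbeat is emitted in the `finally` clause, a crash in the stage is reported as a failure before systemd records the non-zero exit.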
7. Deployment Guide by Team Size
| Deployment tier | Team size | Recommended setup | Estimated monthly cost |
|---|---|---|---|
| Individual | 1 quant | Single VPS (4 vCPU, 8 GB RAM), cron jobs, SQLite metadata | $20–40 / month |
| Small team | 2–5 quants | Dedicated server (8 vCPU, 32 GB RAM), systemd timers, PostgreSQL metadata | $80–200 / month |
| Institutional | 5+ quants | Airflow or Prefect cluster, distributed compute (Spark / Dask), S3 + Parquet data lake, Datadog monitoring | $500–2000 / month |
For an individual quant getting started, the entire pipeline described in this article can run on a $20/month VPS with 4 vCPUs. The ETL script processes approximately 500 symbols in 15–20 minutes on that configuration. As your universe grows, the compute requirement scales linearly — move to a larger instance or a distributed framework when the single-machine wall is hit.
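Taking the figures above at face value (500 symbols in 15 to 20 minutes, with linear scaling), a quick extrapolation shows when a single machine stops being enough. The estimate_minutes helper below is illustrative only; real throughput also depends on API rate limits and network latency.

```python
def estimate_minutes(n_symbols: int, ref_symbols: int = 500, ref_minutes=(15, 20)):
    """Linearly extrapolate ETL wall-clock time from the reference run."""
    low, high = ref_minutes
    scale = n_symbols / ref_symbols
    return low * scale, high * scale


for n in (500, 2000, 8000):
    low, high = estimate_minutes(n)
    print(f"{n:>5} symbols: {low:.0f}-{high:.0f} min")
```

Under these assumptions, 2,000 symbols take about 60 to 80 minutes, which still fits between the 4:05 PM ETL start and the 5:30 PM attribution run. At 8,000 symbols the estimate is 240 to 320 minutes, so the ETL stage alone would overrun the downstream timers and needs either parallel fetching or a distributed framework.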
8. Risk Factors and Limitations
Data completeness. The ETL pipeline archives whatever TickDB returns. If a symbol's data is unavailable for a given date (due to market-specific data restrictions), the pipeline skips it silently. Track the gap list and backfill from alternative sources for strategy-critical symbols.
Attribution model assumptions. The execution cost model assumes a fixed commission of 0.1 cent per share and a simplified Almgren-Chriss market impact curve. Real-world execution costs vary by venue, order type, and time of day. Calibrate against actual fill data from your broker's execution report.
Signal pre-computation latency. Pre-computed signals are frozen at the time of generation. Any overnight news — an FDA panel announcement, a geopolitical event, a macro data release — invalidates the pre-computed signal cache. The system must include a "last-minute override" mechanism that allows analysts to inject manual signal adjustments before the open.
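A last-minute override can be as simple as a small map of analyst adjustments applied on top of the cached signals before order generation. The following sketch assumes signals are held as a symbol-to-strength mapping; apply_overrides and the convention that None suppresses a symbol are hypothetical choices, not something defined by the modules above.

```python
from typing import Optional


def apply_overrides(
    cached: dict[str, float],
    overrides: dict[str, Optional[float]],
) -> dict[str, float]:
    """Return a new signal map with analyst overrides applied.

    An override of None removes the symbol from today's candidates;
    a number replaces the cached strength, clamped to the [0, 1] range.
    """
    final = dict(cached)                    # never mutate the cached signals
    for symbol, value in overrides.items():
        if value is None:
            final.pop(symbol, None)         # suppress: e.g. a pending FDA decision
        else:
            final[symbol] = min(max(float(value), 0.0), 1.0)
    return final


cached = {"AAPL": 0.62, "MRNA": 0.80, "XOM": 0.15}
final = apply_overrides(cached, {"MRNA": None, "XOM": 0.4, "NVDA": 1.3})
```

Keeping the overrides in a separate, dated file (rather than editing the cache) preserves an audit trail of what the model produced versus what the desk actually traded on.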
Single-point-of-failure risk. The systemd timer approach relies on a single machine. A hardware failure at 4:00 PM ET results in a missed pipeline run. For institutional deployments, deploy the pipeline on at least two machines with a distributed lock (e.g., etcd or a database advisory lock) to prevent duplicate runs.
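One simple way to stop two machines from running the same job is a claim table with a uniqueness constraint: whichever host inserts the row for a given pipeline and date first wins. Note that this is a different technique from an advisory lock, and the sketch uses the standard-library sqlite3 module purely as a stand-in; in a real two-machine deployment the table would have to live in a shared database such as PostgreSQL. The table name and the try_claim_run helper are hypothetical.

```python
import sqlite3


def try_claim_run(conn: sqlite3.Connection, pipeline: str, trading_date: str, host: str) -> bool:
    """Return True if this host won the right to run; False if already claimed."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS run_claims ("
        " pipeline TEXT NOT NULL,"
        " trading_date TEXT NOT NULL,"
        " host TEXT NOT NULL,"
        " PRIMARY KEY (pipeline, trading_date))"
    )
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO run_claims (pipeline, trading_date, host) VALUES (?, ?, ?)",
                (pipeline, trading_date, host),
            )
        return True
    except sqlite3.IntegrityError:
        # The primary key already exists: another host claimed this run
        return False


conn = sqlite3.connect(":memory:")
first = try_claim_run(conn, "etl", "2024-06-03", "host-a")
second = try_claim_run(conn, "etl", "2024-06-03", "host-b")
```

A claim made this way is permanent, so if the winning host crashes mid-run the job is not retried automatically. Handling that case requires a lease with an expiry time or a status column that a standby host can inspect.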
9. Closing
The closing bell is not an ending — it is a handoff. The market hands off its data to the quant team's infrastructure, which transforms raw price movements into structured signals, decomposed returns, and pre-computed alpha for the next session.
The pipeline we have built in this article — ETL archival, attribution decomposition, signal pre-computation, and systemd-based orchestration — is a foundation. It can be extended in any direction: adding a risk engine that computes portfolio-level VaR at end of day, integrating a news analytics layer that scores overnight headlines, or wiring in a compliance module that generates regulatory reports automatically.
The principle is the same regardless of scale: automate the predictable, monitor the critical, and reserve human attention for the decisions that actually require judgment.
Next Steps
If you want to run this pipeline yourself:
- Sign up at tickdb.ai and generate an API key (free tier available, no credit card required)
- Set the TICKDB_API_KEY environment variable
- Copy the ETL, attribution, and pre-computation scripts from this article
- Configure the systemd timers following the service files above
If you need 10+ years of historical OHLCV data for strategy backtesting, contact enterprise@tickdb.ai for institutional data plans with full historical coverage.
If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace for direct API integration in your workflow.
This article does not constitute investment advice. Quantitative trading involves substantial risk of loss. Past performance does not guarantee future results. All backtest results in this article are based on historical simulation with approximated cost assumptions.