For most retail traders, the closing bell marks the end of the session. They close their platforms, check their P&L, and move on. For professional quantitative teams, the close is precisely when the real work begins.

Between 4:00 PM ET and midnight, a well-engineered quant shop runs a cascade of processes that most people never see: data archival pipelines that ingest terabytes of normalized market data, attribution engines that decompose strategy returns into factor exposures and execution costs, pre-computation clusters that warm the signal cache for the next session's alpha generation, and compliance systems that log everything for regulatory review. Each step flows into the next, and the entire system must finish inside a predictable window, well before the next session's pre-market begins.

This article dissects that pipeline. We will build a post-market automation system from scratch: a modular architecture using a task scheduler, a data ETL layer, a strategy attribution engine, and a signal pre-computation module. Every code block is written with production concerns in mind, including error handling, retries with backoff, timeout management, and environment-based configuration. We will use TickDB's market data endpoints to anchor the examples, but the architecture applies to any data source.


1. The Post-Market Problem: Why Manual Processes Fail at Scale

Before we build anything, we need to understand the failure mode. A retail trader manually reviewing their trades after close is fine. A quant team managing 20 strategies across 5 asset classes cannot rely on manual intervention.

Consider the data volume: a single day of US equity tick data for all listed stocks generates approximately 80–120 GB of normalized order flow data when captured at full depth. For a team running event-driven strategies, this means ingesting earnings transcripts, calculating implied volatility surfaces, computing order book pressure ratios at the moment of release, and generating next-day signal forecasts, all before the pre-market session begins at 4:00 AM ET.

Manual execution introduces three systemic risks:

Latency variance. A human analyst who takes 45 minutes to run attribution scripts on a volatile day (when the need for analysis is highest) delivers stale insights. By the time the report lands, the next pre-market positioning is already underway.

Process drift. When the attribution methodology changes — say, adding a new factor to the return decomposition — manual implementations silently diverge across team members. Strategy A gets analyzed with the old methodology; Strategy B gets the new one. The comparison is meaningless.

Single point of failure. If one analyst gets sick on the day of a major earnings release, the post-market pipeline may not run at all. A 12-hour gap in attribution data corrupts the team's performance analytics for that reporting period.

An automated pipeline eliminates all three. It runs on a schedule, it executes a frozen and version-controlled methodology, and it operates independently of any individual analyst's availability.


2. Architecture Overview: Four-Layer Pipeline

The post-market pipeline consists of four independent layers, connected by data flow rather than tight coupling. Each layer is a discrete process that can be run, tested, monitored, and replaced independently.

┌────────────────────────────────────────────────────────────┐
│                    POST-MARKET PIPELINE                    │
│                                                            │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │   LAYER 1    │    │   LAYER 2    │    │   LAYER 3    │  │
│  │ Data Archival│───▶│  Attribution │───▶│ Signal Pre-  │  │
│  │    & ETL     │    │    Engine    │    │ computation  │  │
│  └──────────────┘    └──────────────┘    └──────────────┘  │
│         │                                       │          │
│         ▼                                       ▼          │
│  ┌──────────────┐                        ┌──────────────┐  │
│  │   LAYER 4    │                        │  Scheduler   │  │
│  │ Notification │                        │  & Monitor   │  │
│  │  & Alerting  │                        │              │  │
│  └──────────────┘                        └──────────────┘  │
└────────────────────────────────────────────────────────────┘

| Layer | Component | Responsibility | Dependencies |
|---|---|---|---|
| Layer 1 | Data Archival & ETL | Ingest raw market data, normalize, store in analytical schema | TickDB API, local storage |
| Layer 2 | Attribution Engine | Decompose strategy P&L into factor contributions, execution costs, signal alpha | Layer 1 output |
| Layer 3 | Signal Pre-computation | Generate and cache next-day signal candidates from warm data | Layer 1 output, model artifacts |
| Layer 4 | Notification & Alerting | Deliver run reports, flag anomalies, escalate failures | All layers |
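Because the layers communicate only through their outputs, the control flow between them can stay very small. The sketch below is a hypothetical driver (not part of TickDB or any orchestrator): each layer is a plain function that takes the trading date and returns a manifest dict, a layer whose dependency did not succeed is marked as skipped, and the notification step always receives the full run summary.

```python
from typing import Callable

# A layer takes the trading date and returns a manifest describing its output.
Layer = Callable[[str], dict]


def run_pipeline(
    date: str,
    layers: dict[str, Layer],
    depends_on: dict[str, list[str]],
    notify: Callable[[dict], None],
) -> dict:
    """Run layers in insertion order; skip any layer whose dependency did not succeed."""
    summary: dict[str, dict] = {}
    for name, layer in layers.items():
        blocked = [
            dep for dep in depends_on.get(name, [])
            if summary.get(dep, {}).get("status") != "success"
        ]
        if blocked:
            summary[name] = {"status": "skipped", "blocked_by": blocked}
            continue
        try:
            summary[name] = {"status": "success", "manifest": layer(date)}
        except Exception as exc:  # one failing layer must not hide the others' results
            summary[name] = {"status": "failure", "error": str(exc)}
    notify(summary)  # Layer 4 runs whatever happened upstream
    return summary
```

In the table's terms, attribution and pre-computation both depend only on the ETL layer, so a failed attribution run does not block the next session's signals.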

We will build each layer in sequence.


3. Layer 1: Data Archival and ETL Pipeline

3.1 What to Archive

Not all data is worth archiving at the same resolution. The archival strategy depends on the strategy's information horizon:

| Data type | Retention resolution | Use case |
|---|---|---|
| OHLCV (1-minute) | Full history | Intraday strategy backtesting |
| OHLCV (1-hour) | Full history | Swing strategy backtesting |
| Order book snapshots (L1) | 30-second intervals during market hours | Order flow analysis, depth attribution |
| Trade tape | Every trade during market hours | Execution quality analysis, tick-level strategies |
| Implied volatility surface | End of day | Options-adjusted returns, vol regime detection |
| News / transcripts | Full text with timestamp | Event-driven strategy attribution |

For most equity-focused quant teams, the minimum viable archive is: 1-minute OHLCV klines for all active symbols, archived every trading day; depth snapshots at key timestamps (open, close, earnings release moments); and trade tape for the 200 most liquid names.
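One way to cover the 1-hour row of the table without a second download is to derive hourly bars from the 1-minute archive. A minimal pandas sketch, assuming the column names the ETL script below produces (timestamp, open, high, low, close, volume) with naive UTC timestamps; the helper name is hypothetical, and the 30-minute offset aligns bins to the 9:30 ET cash open, which works in UTC because the ET offset is a whole number of hours:

```python
import pandas as pd


def resample_to_hourly(bars_1m: pd.DataFrame) -> pd.DataFrame:
    """Aggregate 1-minute OHLCV bars into 1-hour bars starting at :30 past the hour."""
    hourly = (
        bars_1m.set_index("timestamp")
        .resample("60min", offset="30min")  # bins 09:30-10:29 ET, 10:30-11:29 ET, ...
        .agg({
            "open": "first",
            "high": "max",
            "low": "min",
            "close": "last",
            "volume": "sum",
        })
    )
    # Drop empty bins (overnight gaps) where no 1-minute bar existed
    return hourly.dropna(subset=["open"]).reset_index()
```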

3.2 ETL Script: Historical Data Archival

We will write an ETL script that pulls 1-minute kline data from TickDB for all active symbols and stores it in a local Parquet-based data lake. The script handles pagination, rate limiting, retries, and safe re-runs.

#!/usr/bin/env python3
"""
Post-market ETL: Archive 1-minute klines for all active symbols.
Stores output as Parquet partitioned by date and symbol.
Covers the usual production concerns: retry with exponential backoff
+ jitter, rate-limit handling, cursor-based pagination, env-var auth.
"""

import os
import sys
import time
import random
import logging
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo

import requests
import pandas as pd

# ─── Configuration ───────────────────────────────────────────────────────────
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
    raise EnvironmentError("TICKDB_API_KEY environment variable is not set")

BASE_URL = "https://api.tickdb.ai/v1"
HEADERS = {"X-API-Key": API_KEY}
OUTPUT_DIR = Path("./data_lake/klines_1m")

# Retry / backoff config
MAX_RETRIES = 5
BASE_DELAY = 1.0
MAX_DELAY = 60.0

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


class FatalAPIError(RuntimeError):
    """Unrecoverable API error (e.g. a bad key) that should abort the whole run."""


def handle_api_error(response: requests.Response, symbol: str) -> list | None:
    """
    Standard TickDB error handler.
    Returns the data payload on success, None on a retryable error
    (the caller retries), and raises on an unrecoverable error.
    """
    if response.status_code == 200:
        data = response.json()
        code = data.get("code", 0)
        if code == 0:
            return data.get("data", [])

        if code == 3001:
            # Rate limited: honour Retry-After, then let the caller retry
            retry_after = int(response.headers.get("Retry-After", 5))
            logger.warning(f"Rate limited for {symbol}, retrying after {retry_after}s")
            time.sleep(retry_after)
            return None

        if code in (1001, 1002):
            raise FatalAPIError("Invalid API key: check TICKDB_API_KEY")
        if code == 2002:
            logger.warning(f"Symbol {symbol} not found, skipping")
            return []  # Empty list for unknown symbols

        raise RuntimeError(f"Unexpected error code {code}: {data.get('message')}")

    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 5))
        logger.warning(f"HTTP 429 for {symbol}, retrying after {retry_after}s")
        time.sleep(retry_after)
        return None

    if response.status_code >= 500:
        # Transient server-side error: retryable
        logger.warning(f"HTTP {response.status_code} for {symbol}, will retry")
        return None

    raise RuntimeError(f"HTTP error {response.status_code} for {symbol}")


def fetch_klines(symbol: str, date: str, limit: int = 1000) -> list[dict]:
    """
    Fetch 1-minute klines for a given symbol and trading date.
    Pages through results with a start_time cursor whenever a page
    comes back full (i.e. contains `limit` bars).
    """
    all_bars = []
    start_time = None
    retry_count = 0

    while True:
        params = {
            "symbol": symbol,
            "interval": "1m",
            "date": date,
            "limit": limit,
        }
        if start_time is not None:
            params["start_time"] = start_time

        try:
            response = requests.get(
                f"{BASE_URL}/market/kline",
                headers=HEADERS,
                params=params,
                timeout=(3.05, 15),
            )
            result = handle_api_error(response, symbol)
        except (requests.ConnectionError, requests.Timeout) as e:
            logger.warning(f"Network error for {symbol}: {e}")
            result = None  # Treat network failures as retryable

        if result is None:
            # Retryable error: exponential backoff with jitter
            retry_count += 1
            if retry_count > MAX_RETRIES:
                raise RuntimeError(f"Max retries exceeded for {symbol} on {date}")
            delay = min(BASE_DELAY * (2 ** retry_count), MAX_DELAY)
            jitter = random.uniform(0, delay * 0.1)
            time.sleep(delay + jitter)
            continue

        retry_count = 0  # Reset after every successful page
        bars = result if isinstance(result, list) else []
        if not bars:
            break

        all_bars.extend(bars)
        # Advance the cursor one millisecond past the last bar's timestamp
        last_ts = bars[-1].get("t")
        if last_ts is None:
            break
        start_time = last_ts + 1

        # Fewer bars than the limit means this was the last page
        if len(bars) < limit:
            break

        # Small pause between pages to stay under rate limits
        time.sleep(0.05)

    return all_bars


def fetch_active_symbols() -> list[str]:
    """
    Fetch the list of currently tradeable symbols from TickDB.
    In production, cache this list and refresh daily.
    """
    response = requests.get(
        f"{BASE_URL}/symbols/available",
        headers=HEADERS,
        params={"market": "US"},
        timeout=(3.05, 10)
    )
    response.raise_for_status()
    data = response.json()
    if data.get("code", 0) != 0:
        raise RuntimeError(f"Failed to fetch symbols: {data}")
    return [s["symbol"] for s in data.get("data", [])]


def save_parquet(df: pd.DataFrame, symbol: str, date: str):
    """Store kline data as Parquet partitioned by date and symbol."""
    if df.empty:
        return
    date_dir = OUTPUT_DIR / date
    date_dir.mkdir(parents=True, exist_ok=True)
    output_path = date_dir / f"{symbol}.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Saved {len(df)} bars for {symbol} on {date} → {output_path}")


def run_etl_pipeline(trading_date: str) -> dict:
    """
    Main ETL entry point for a single trading date.
    Fetches all active symbols, archives klines, stores as Parquet.
    """
    logger.info(f"Starting ETL pipeline for {trading_date}")
    symbols = fetch_active_symbols()
    logger.info(f"Fetched {len(symbols)} active symbols")

    successful = 0
    failed_symbols = []

    for symbol in symbols:
        try:
            bars = fetch_klines(symbol, trading_date)
            if not bars:
                logger.debug(f"No data for {symbol} on {trading_date}")
                continue
            df = pd.DataFrame(bars)
            # Standardize column names
            df = df.rename(columns={
                "t": "timestamp",
                "o": "open",
                "h": "high",
                "l": "low",
                "c": "close",
                "v": "volume"
            })
            # Epoch milliseconds → naive UTC timestamps
            df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
            save_parquet(df, symbol, trading_date)
            successful += 1
        except FatalAPIError:
            # A bad key would fail every remaining symbol too: stop now
            raise
        except Exception as e:
            logger.error(f"Failed to process {symbol}: {e}")
            failed_symbols.append(symbol)

    logger.info(
        f"ETL complete: {successful} symbols archived, {len(failed_symbols)} failures"
    )
    return {
        "successful": successful,
        "failed": len(failed_symbols),
        "failed_symbols": failed_symbols,
    }


def latest_session_date(tz: str = "America/New_York") -> str:
    """
    Today's date in New York, rolled back to Friday on weekends.
    Exchange holidays are NOT handled; use a market calendar for that.
    """
    day = datetime.now(ZoneInfo(tz)).date()
    while day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        day -= timedelta(days=1)
    return day.isoformat()


if __name__ == "__main__":
    # The timer fires at 4:05 PM ET, after the close, so default to today's
    # session; pass YYYY-MM-DD as the first argument to re-run a past date.
    trading_date = sys.argv[1] if len(sys.argv) > 1 else latest_session_date()
    run_etl_pipeline(trading_date=trading_date)

3.3 Key Design Decisions

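Pagination with cursor-based continuation. A regular US session produces 390 one-minute bars, and even a full extended session (4:00 AM to 8:00 PM ET) produces at most 960, so one 1000-bar page normally covers a symbol's whole day. The loop in fetch_klines is a safeguard: whenever a page comes back full, it uses start_time as a cursor, advancing one millisecond past the last retrieved timestamp, and it stops at the first short page. That keeps the function correct if it is called with a smaller limit or adapted to finer intervals.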

Safe re-runs. If the script fails midway through a large batch, you can re-run it with the same date: each symbol's Parquet file is rewritten in full, so a re-run never duplicates data. It does, however, re-download symbols that already succeeded.
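To make re-runs genuinely resumable, the loop can skip symbols whose file already exists. That is only safe if a crash can never leave a half-written file behind, so this sketch pairs the existence check with a write-to-temp-then-rename helper. Both helpers are hypothetical additions, not part of the script above:

```python
import os
from pathlib import Path
from typing import Callable


def pending_symbols(symbols: list[str], date: str, output_dir: Path) -> list[str]:
    """Return the symbols that do not yet have a Parquet file for `date`."""
    date_dir = output_dir / date
    return [s for s in symbols if not (date_dir / f"{s}.parquet").exists()]


def write_atomically(path: Path, write: Callable[[Path], None]) -> None:
    """Write via a temp file in the same directory, then rename it into place."""
    tmp = path.with_name(path.name + ".tmp")  # "*.parquet" globs will not match it
    write(tmp)
    os.replace(tmp, path)  # atomic on POSIX when both paths share a filesystem
```

Inside run_etl_pipeline you would iterate over pending_symbols(symbols, trading_date, OUTPUT_DIR) instead of symbols, and have save_parquet call write_atomically(output_path, lambda p: df.to_parquet(p, index=False)).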

Parquet storage. Storing data in Parquet rather than CSV or JSON provides columnar compression (typically 3–10× smaller than CSV), predicate pushdown for fast date-range queries, and integration with PySpark, DuckDB, or Polars for downstream analytics.


4. Layer 2: Strategy Attribution Engine

4.1 The Attribution Problem

Attribution is the process of decomposing a strategy's realized P&L into its constituent components: signal alpha, factor exposure, transaction costs, and market impact. Without this decomposition, a quant team cannot answer the most basic question — "Did our signal work, or did we just get carried by the market?"

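Formal frameworks exist for this: Brinson-style attribution splits benchmark-relative returns into allocation and selection effects, and factor-based attribution is the usual choice for systematic strategies. We will implement a simplified decomposition in the factor-based spirit: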

| Component | Definition | Data source |
|---|---|---|
| Total return | Portfolio P&L / portfolio value at open | Trade blotter |
| Benchmark return | Value-weighted index return over the same period | Index OHLCV |
| Signal return | Return attributable to the alpha signal (excluding factor exposures) | Strategy output logs |
| Factor return | Return attributable to market, size, momentum, and volatility factors | Factor model output |
| Execution cost | Realized slippage vs. arrival price | Execution analytics |
| Market impact | Adverse price movement caused by own order size | Order size / ADV ratio |
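The components combine additively. This sketch mirrors the bookkeeping in the script below (signal return taken as gross alpha, costs carried as negative numbers, factor return as the residual of total minus signal); the dataclass and function names are illustrative only:

```python
from dataclasses import dataclass


@dataclass
class Decomposition:
    gross_alpha_bps: float
    net_alpha_bps: float
    factor_residual_bps: float


def decompose(
    total_bps: float,
    signal_bps: float,
    execution_cost_bps: float,  # negative = cost
    market_impact_bps: float,   # negative = cost
) -> Decomposition:
    return Decomposition(
        gross_alpha_bps=signal_bps,
        net_alpha_bps=signal_bps + execution_cost_bps + market_impact_bps,
        factor_residual_bps=total_bps - signal_bps,
    )
```

With the figures from the example report in section 4.3, decompose(total, 18.3, -4.2, -2.4) gives a net alpha of 18.3 − 4.2 − 2.4 = 11.7 bps.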

4.2 Attribution Analysis Script

#!/usr/bin/env python3
"""
Strategy Attribution Engine — Layer 2 of the post-market pipeline.
Decomposes strategy P&L into signal alpha, factor exposures,
execution costs, and market impact.
"""

import pandas as pd
import numpy as np
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class AttributionResult:
    strategy_name: str
    trading_date: str
    total_return_bps: float
    signal_return_bps: float
    factor_return_bps: float
    execution_cost_bps: float
    market_impact_bps: float
    gross_alpha_bps: float
    net_alpha_bps: float
    sharpe_estimated: float
    max_drawdown_bps: float


def load_trade_blotter(date: str, strategy: str, data_dir: Path) -> pd.DataFrame:
    """
    Load the trade blotter for a given strategy and date.
    In production, this reads from the order management system (OMS) database.
    Expected schema: timestamp, symbol, side, quantity, fill_price, arrival_price,
    pnl, market_value (optional: commission, per share)
    """
    blotter_path = data_dir / f"blotter/{strategy}/{date}.parquet"
    if not blotter_path.exists():
        raise FileNotFoundError(f"Blotter not found: {blotter_path}")
    df = pd.read_parquet(blotter_path)
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    return df


def load_benchmark_klines(date: str, data_dir: Path) -> pd.DataFrame:
    """
    Load benchmark index (e.g., SPY) klines for the trading date.
    Data source: TickDB /v1/market/kline endpoint.
    """
    benchmark_path = data_dir / f"klines_1m/{date}/SPY.parquet"
    if not benchmark_path.exists():
        raise FileNotFoundError(f"Benchmark data not found: {benchmark_path}")
    return pd.read_parquet(benchmark_path)


def compute_signal_return(
    blotter: pd.DataFrame,
    signal_log: pd.DataFrame
) -> float:
    """
    Compute return attributable to the alpha signal.
    Methodology: for each trade, look up the most recent signal for that
    symbol generated at or before the trade, and weight the trade's realized
    P&L by that signal's strength (0 to 1). Trades with no prior signal get
    a neutral weight of 0.5.
    Return is in basis points (bps) of traded market value.
    """
    trades = blotter.sort_values("timestamp")
    required = {"symbol", "signal_time", "signal_strength"}
    if signal_log.empty or not required.issubset(signal_log.columns):
        merged = trades.assign(signal_strength=np.nan)
    else:
        signals = signal_log[["symbol", "signal_time", "signal_strength"]].copy()
        signals["signal_time"] = pd.to_datetime(signals["signal_time"])
        # As-of join: latest signal per symbol at or before each trade
        merged = pd.merge_asof(
            trades,
            signals.sort_values("signal_time"),
            left_on="timestamp",
            right_on="signal_time",
            by="symbol",
            direction="backward",
        )

    strength = merged["signal_strength"].fillna(0.5)
    weighted_pnl = (merged["pnl"] * strength).sum()
    return weighted_pnl / blotter["market_value"].sum() * 10_000  # Convert to bps


def compute_execution_cost(blotter: pd.DataFrame) -> float:
    """
    Compute total execution cost as bps of traded notional.
    Includes: commission and signed slippage vs. arrival price
    (positive slippage = filled worse than arrival).
    """
    df = blotter.copy()  # Don't mutate the caller's blotter
    df["slippage"] = np.where(
        df["side"] == "BUY",
        df["fill_price"] - df["arrival_price"],
        df["arrival_price"] - df["fill_price"],
    )
    # Per-share commission; falls back to $0.001 (0.1 cent) if the blotter lacks it
    commission = df["commission"] if "commission" in df.columns else 0.001
    df["cost_per_share"] = df["slippage"] + commission
    total_cost = (df["cost_per_share"] * df["quantity"]).sum()
    notional = (df["quantity"] * df["arrival_price"]).sum()
    return -(total_cost / notional) * 10_000  # Negative = cost


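def compute_market_impact(blotter: pd.DataFrame, adv_data: pd.DataFrame) -> float:
    """
    Estimate market impact with a simplified power-law curve:
    impact ∝ (order_size / ADV) ^ 0.6 * volatility
    (the 0.6 exponent follows Almgren et al.'s empirical impact estimates).
    Returns the notional-weighted impact cost in bps.
    """
    if adv_data.empty or "adv_20d" not in adv_data.columns:
        logger.warning("ADV data unavailable; market impact set to 0")
        return 0.0

    merged = blotter.merge(
        adv_data[["symbol", "adv_20d"]],
        on="symbol",
        how="left"
    )
    # Skip trades with missing/zero ADV rather than inventing a participation rate
    known = merged[merged["adv_20d"] > 0].copy()
    if known.empty:
        return 0.0

    known["participation_rate"] = known["quantity"] / known["adv_20d"]
    # impact ≈ 0.5 * participation_rate^0.6 * vol, with daily vol assumed 1% (100 bps)
    known["impact_bps"] = 0.5 * (known["participation_rate"] ** 0.6) * 100
    notional = known["quantity"] * known["arrival_price"]
    # Weight by notional so the result is comparable with the other bps figures
    return -(known["impact_bps"] * notional).sum() / notional.sum()  # Negative = cost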


def run_attribution(
    strategy: str,
    date: str,
    data_dir: Path
) -> AttributionResult:
    """
    Main attribution entry point. Orchestrates all sub-computations.
    """
    logger.info(f"Running attribution for strategy '{strategy}' on {date}")

    blotter = load_trade_blotter(date, strategy, data_dir)
    benchmark = load_benchmark_klines(date, data_dir)

    # Load signal log (generated during trading session)
    signal_log_path = data_dir / f"signals/{strategy}/{date}.parquet"
    signal_log = (
        pd.read_parquet(signal_log_path)
        if signal_log_path.exists()
        else pd.DataFrame()
    )

    # Load ADV data (20-day average)
    adv_path = data_dir / f"reference/adv_20d.parquet"
    adv_data = (
        pd.read_parquet(adv_path)
        if adv_path.exists()
        else pd.DataFrame()
    )

    # Compute components (all in bps)
    total_return_bps = (
        blotter["pnl"].sum() /
        blotter["market_value"].sum() * 10_000
    )
    benchmark = benchmark.sort_values("timestamp")
    benchmark_return_bps = (
        (benchmark["close"].iloc[-1] - benchmark["open"].iloc[0]) /
        benchmark["open"].iloc[0] * 10_000
    )
    logger.info(f"Benchmark return: {benchmark_return_bps:.2f} bps")
    signal_return_bps = compute_signal_return(blotter, signal_log)
    # Residual: everything in total return not explained by the signal
    factor_return_bps = total_return_bps - signal_return_bps
    execution_cost_bps = compute_execution_cost(blotter)
    market_impact_bps = compute_market_impact(blotter, adv_data)

    gross_alpha_bps = signal_return_bps
    net_alpha_bps = signal_return_bps + execution_cost_bps + market_impact_bps
    # A Sharpe ratio needs a history of daily net alpha; one day cannot produce
    # it, so leave it as NaN here and compute it over a rolling window.
    sharpe_estimated = float("nan")

    result = AttributionResult(
        strategy_name=strategy,
        trading_date=date,
        total_return_bps=total_return_bps,
        signal_return_bps=signal_return_bps,
        factor_return_bps=factor_return_bps,
        execution_cost_bps=execution_cost_bps,
        market_impact_bps=market_impact_bps,
        gross_alpha_bps=gross_alpha_bps,
        net_alpha_bps=net_alpha_bps,
        sharpe_estimated=sharpe_estimated,
        max_drawdown_bps=0.0  # Computed over rolling window in production
    )

    logger.info(
        f"Attribution complete: gross_alpha={result.gross_alpha_bps:.2f} bps, "
        f"net_alpha={result.net_alpha_bps:.2f} bps, "
        f"execution_cost={result.execution_cost_bps:.2f} bps"
    )
    return result

4.3 Attribution Report Format

The attribution engine's output feeds directly into the team's performance management system. An example report (illustrative values):

| Metric | Value | Interpretation |
|---|---|---|
| Gross alpha | +18.3 bps | Signal contributed 18.3 bps of return above factor exposure |
| Net alpha | +11.7 bps | After execution costs and market impact, the signal delivered 11.7 bps |
| Execution cost | −4.2 bps | Slippage and commission consumed 4.2 bps |
| Market impact | −2.4 bps | Own order flow moved price adversely by 2.4 bps |
| Sharpe (estimated) | 2.79 | Illustrative only: a Sharpe ratio needs a history of daily net alpha, so compute it over a rolling window rather than from a single day |
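Both sharpe_estimated and max_drawdown_bps need a history of daily results rather than a single day. A minimal sketch, assuming you persist each day's net_alpha_bps and treat it as that day's return in bps; the 60-day window and the 252-day annualization are conventional choices, not values taken from the engine above:

```python
import numpy as np
import pandas as pd


def rolling_sharpe(daily_net_alpha_bps: pd.Series, window: int = 60) -> pd.Series:
    """Annualized rolling Sharpe ratio of daily net alpha (252 trading days per year)."""
    mean = daily_net_alpha_bps.rolling(window).mean()
    std = daily_net_alpha_bps.rolling(window).std()  # sample std (ddof=1)
    return mean / std * np.sqrt(252)


def max_drawdown_bps(daily_net_alpha_bps: pd.Series) -> float:
    """Largest peak-to-trough fall of cumulative net alpha, in bps (<= 0)."""
    cumulative = daily_net_alpha_bps.cumsum()
    # Include the starting point (0 bps) as a possible peak
    peak = np.maximum(cumulative.cummax(), 0.0)
    return float((cumulative - peak).min())
```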

5. Layer 3: Signal Pre-Computation for the Next Session

5.1 Why Pre-Computation Matters

The pre-market session (4:00 AM to 9:30 AM ET) is when informed traders position themselves for the open. For event-driven strategies — earnings, FDA decisions, macro announcements — the market begins pricing the event the moment overnight news breaks. A strategy that waits until 9:30 AM to compute its signals is already behind.

Pre-computation addresses this by running the heavy inference work during the post-market window, when compute resources are available. The next morning, the system loads pre-computed signals from cache, applies any last-minute adjustments (overnight news, pre-market price moves), and generates the final execution order list before the open.

Key pre-computation tasks:

| Task | Computation | Output |
|---|---|---|
| Earnings vol surface update | Implied vol from the options chain, refreshed with the latest IV | IV surface per expiry |
| Event probability update | Odds derived from option prices and analyst estimates | Probability distributions |
| Momentum signal warm-up | Rolling factor calculations on archived data | Signal strength per symbol |
| Risk model refresh | Covariance matrix update using the last 60 days of returns | Updated risk weights |
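The risk model refresh is the most mechanical of these tasks. A sketch assuming daily closes in a wide DataFrame (one column per symbol, one row per trading day), for example built from the archived 1-minute bars; the 60-day window comes from the table, while the function name and input layout are assumptions:

```python
import pandas as pd


def refresh_covariance(daily_closes: pd.DataFrame, window: int = 60) -> pd.DataFrame:
    """Sample covariance of the last `window` daily simple returns (symbol x symbol)."""
    # The first row has no previous close, so its return is undefined: drop it
    returns = daily_closes.sort_index().pct_change().iloc[1:]
    recent = returns.tail(window)
    if len(recent) < window:
        raise ValueError(f"need {window} returns, got {len(recent)}")
    return recent.cov()
```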

5.2 Signal Pre-Computation Module

#!/usr/bin/env python3
"""
Layer 3: Signal Pre-Computation Module.
Warms the signal cache for the next trading session.
"""

import pandas as pd
import numpy as np
import logging
from datetime import datetime, timedelta
from pathlib import Path
import hashlib

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def compute_momentum_signal(
    kline_data: pd.DataFrame,
    short_ma: int = 5,
    long_ma: int = 20
) -> pd.DataFrame:
    """
    Compute a dual-moving-average crossover momentum signal.
    Expects 1-minute bars spanning at least `long_ma` + 1 trading days.
    Returns signal strength (0 to 1) and a crossover flag for each symbol.
    """
    df = kline_data.sort_values(["symbol", "timestamp"])

    # Aggregate 1-minute bars to one row per symbol per trading day. Grouping
    # on the normalized date keeps only days that have bars, so weekends and
    # holidays create no NaN gaps. (Timestamps are naive UTC; regular-session
    # US bars all fall on a single UTC date.)
    daily = (
        df.assign(day=df["timestamp"].dt.normalize())
        .groupby(["symbol", "day"])
        .agg(close=("close", "last"), volume=("volume", "sum"))
        .reset_index()
    )

    # Compute MAs on daily close, per symbol
    by_symbol = daily.groupby("symbol")["close"]
    daily["ma_short"] = by_symbol.transform(lambda x: x.rolling(short_ma).mean())
    daily["ma_long"] = by_symbol.transform(lambda x: x.rolling(long_ma).mean())

    # Previous day's MAs, shifted within each symbol (not across symbol boundaries)
    prev_short = daily.groupby("symbol")["ma_short"].shift(1)
    prev_long = daily.groupby("symbol")["ma_long"].shift(1)

    # Signal: crossover detected (short MA crosses above long MA today)
    daily["crossover"] = (
        (daily["ma_short"] > daily["ma_long"]) & (prev_short <= prev_long)
    ).astype(int)

    # Signal strength: relative MA spread, scaled so a 10% spread maps to 1
    daily["ma_spread"] = (daily["ma_short"] - daily["ma_long"]) / daily["ma_long"]
    daily["signal_strength"] = np.clip(daily["ma_spread"] * 10, 0, 1)

    # Filter to most recent date
    latest = daily["day"].max()
    return daily.loc[daily["day"] == latest, ["symbol", "signal_strength", "crossover"]]


def compute_volatility_signal(kline_data: pd.DataFrame, lookback: int = 20) -> pd.DataFrame:
    """
    Compute realized volatility (annualized, in bps) from daily closes.
    Expects 1-minute bars spanning at least `lookback` + 1 trading days.
    Strategies can use this to size positions inversely to vol.
    """
    df = kline_data.sort_values(["symbol", "timestamp"])

    # One close per symbol per trading day (only days that have bars)
    daily = (
        df.assign(day=df["timestamp"].dt.normalize())
        .groupby(["symbol", "day"])
        .agg(close=("close", "last"))
        .reset_index()
    )
    daily["return"] = daily.groupby("symbol")["close"].pct_change()

    # Annualized realized vol (252 trading days)
    daily["realized_vol"] = daily.groupby("symbol")["return"].transform(
        lambda x: x.rolling(lookback).std() * np.sqrt(252)
    )

    latest = daily["day"].max()
    result = daily.loc[daily["day"] == latest, ["symbol", "realized_vol"]].copy()
    result["realized_vol_bps"] = result["realized_vol"] * 10_000
    return result[["symbol", "realized_vol_bps"]]


def generate_signal_cache(
    data_dir: Path,
    output_dir: Path,
    date: str,
    history_days: int = 30
) -> dict:
    """
    Main pre-computation entry point.
    Loads the last `history_days` archived trading days up to and including
    `date` (the 20-day MA and vol windows need more than one day of history),
    computes signals, and writes the cache.
    Returns a manifest of generated files.
    """
    logger.info(f"Generating signal cache for {date}")
    output_dir.mkdir(parents=True, exist_ok=True)

    # Date partitions are named YYYY-MM-DD, so lexical order is chronological
    kline_root = data_dir / "klines_1m"
    date_dirs = sorted(
        d for d in kline_root.iterdir() if d.is_dir() and d.name <= date
    )[-history_days:]
    kline_files = [f for d in date_dirs for f in d.glob("*.parquet")]
    if not kline_files:
        raise FileNotFoundError(f"No archived klines under {kline_root} up to {date}")
    logger.info(f"Loading {len(kline_files)} kline files from {len(date_dirs)} trading days")

    frames = []
    for f in kline_files:
        # Read only the columns the signals need
        df = pd.read_parquet(f, columns=["timestamp", "close", "volume"])
        df["symbol"] = f.stem  # File name is the symbol
        frames.append(df)

    all_klines = pd.concat(frames, ignore_index=True)
    logger.info(f"Loaded {len(all_klines)} total kline records")

    # Compute signals
    momentum = compute_momentum_signal(all_klines)
    volatility = compute_volatility_signal(all_klines)

    # Merge signals
    signals = momentum.merge(volatility, on="symbol", how="outer")

    # Generate cache key (fingerprint of the date, input size, and output)
    cache_key = hashlib.sha256(
        f"{date}:{len(all_klines)}:{signals['signal_strength'].sum():.4f}".encode()
    ).hexdigest()[:12]

    # Save cache
    cache_file = output_dir / f"signal_cache_{date}_{cache_key}.parquet"
    signals.to_parquet(cache_file, index=False)

    logger.info(
        f"Signal cache generated: {len(signals)} symbols, "
        f"cache_key={cache_key}, file={cache_file}"
    )
    return {
        "date": date,
        "cache_key": cache_key,
        "symbols": len(signals),
        "cache_file": str(cache_file)
    }
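The next morning, the execution side has to find the cache written overnight. Since the file name embeds the date and a content hash, a lookup by date is a glob. In this sketch the helper is hypothetical, and picking the newest file by modification time is an assumption for the case where the job ran more than once:

```python
from pathlib import Path


def find_signal_cache(cache_dir: Path, date: str) -> Path:
    """Return the most recently written signal cache for `date`."""
    candidates = list(cache_dir.glob(f"signal_cache_{date}_*.parquet"))
    if not candidates:
        raise FileNotFoundError(f"No signal cache for {date} in {cache_dir}")
    return max(candidates, key=lambda p: p.stat().st_mtime)
```

Failing loudly when no cache exists is deliberate: a morning job that silently falls back to stale signals is harder to catch than one that stops.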

6. Layer 4: Orchestration, Scheduling, and Monitoring

6.1 Task Scheduler

The four layers need to run on a reliable schedule. For a Linux-based production environment, cron is sufficient for basic scheduling, but a modern quant team managing complex dependency graphs should use a workflow orchestrator.

| Tool | Best for | Complexity |
|---|---|---|
| cron | Simple daily jobs with no interdependencies | Low |
| systemd timers | Jobs that need restart on failure or depend on system services | Low-medium |
| Apache Airflow | DAG-based workflows with complex dependencies, retries, UI | Medium-high |
| Prefect | Python-native DAG authoring, modern UI, cloud or self-hosted | Medium |
| Dagster | Asset-based model (data pipelines as first-class citizens) | Medium-high |

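For this article, we will use systemd timers for their simplicity and system-level reliability (journald logging, and catch-up of missed runs via Persistent=true). Separate timers trigger the ETL pipeline at 4:05 PM ET, the attribution engine at 5:30 PM ET, and the signal pre-computation at 6:00 PM ET; the units for the first two are shown below, and the pre-computation timer follows the same pattern.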

6.2 Systemd Service and Timer Files

# /etc/systemd/system/tickdb-etl.service
[Unit]
Description=TickDB Post-Market ETL Pipeline
Wants=network-online.target
After=network-online.target

[Service]
Type=oneshot
WorkingDirectory=/opt/quant_pipeline
# systemd does not inherit your shell environment. Keep the key in a file
# readable only by root (chmod 600) containing the line TICKDB_API_KEY=...
EnvironmentFile=/etc/quant_pipeline/tickdb.env
ExecStart=/usr/bin/python3 /opt/quant_pipeline/run_etl.py
StandardOutput=journal
StandardError=journal
User=quant_pipeline

# /etc/systemd/system/tickdb-etl.timer
[Unit]
Description=Run TickDB ETL pipeline on weekdays at 4:05 PM ET

[Timer]
# Run 5 minutes after market close (4:00 PM ET); activates tickdb-etl.service
OnCalendar=Mon..Fri *-*-* 16:05:00 America/New_York
Persistent=true
RandomizedDelaySec=30

[Install]
WantedBy=timers.target

# /etc/systemd/system/tickdb-attribution.service
[Unit]
Description=TickDB Strategy Attribution Engine
# After= only orders units started in the same transaction; the gap between
# the two timers (4:05 PM vs 5:30 PM) is what actually sequences ETL first.
After=network-online.target tickdb-etl.service

[Service]
Type=oneshot
WorkingDirectory=/opt/quant_pipeline
ExecStart=/usr/bin/python3 /opt/quant_pipeline/run_attribution.py
StandardOutput=journal
StandardError=journal
User=quant_pipeline

# /etc/systemd/system/tickdb-attribution.timer
[Unit]
Description=Run attribution engine on weekdays at 5:30 PM ET

[Timer]
# Activates tickdb-attribution.service
OnCalendar=Mon..Fri *-*-* 17:30:00 America/New_York
Persistent=true
RandomizedDelaySec=30

[Install]
WantedBy=timers.target

Activate with:

sudo systemctl daemon-reload
sudo systemctl enable --now tickdb-etl.timer
sudo systemctl enable --now tickdb-attribution.timer

# Verify timers
systemctl list-timers --all | grep tickdb
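One caveat about these units: After=tickdb-etl.service only orders units that are started in the same transaction, so the two timers are sequenced purely by the gap between 4:05 PM and 5:30 PM. If the ETL overruns, attribution would read a partial archive. A hypothetical guard (not a systemd or TickDB feature): the ETL writes a _SUCCESS marker into the date partition as its last step, and downstream jobs wait for it, with a deadline, before starting.

```python
import time
from pathlib import Path

MARKER = "_SUCCESS"


def mark_success(partition_dir: Path) -> None:
    """Called by the ETL as its very last step."""
    (partition_dir / MARKER).touch()


def wait_for_success(partition_dir: Path, timeout_s: float, poll_s: float = 30.0) -> bool:
    """Poll for the marker; return False if it does not appear before the deadline."""
    deadline = time.monotonic() + timeout_s
    while True:
        if (partition_dir / MARKER).exists():
            return True
        if time.monotonic() >= deadline:
            return False
        # Never sleep past the deadline
        time.sleep(min(poll_s, max(deadline - time.monotonic(), 0.0)))
```

In run_etl_pipeline this would be a final mark_success(OUTPUT_DIR / trading_date); the attribution script should exit non-zero when wait_for_success returns False, so that systemd records the run as failed.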

6.3 Health Monitoring

Every pipeline run should emit a heartbeat to a monitoring endpoint. In production, this typically means writing to a metrics database (Prometheus, InfluxDB) or sending a structured log to a central aggregator (Datadog, Grafana Loki).

import logging
import socket
from datetime import datetime, timezone

import requests


def emit_pipeline_heartbeat(
    pipeline_name: str,
    status: str,  # "success" | "failure" | "partial"
    records_processed: int,
    duration_seconds: float,
    # Hypothetical internal collector that accepts JSON. Prometheus itself does
    # not ingest JSON pushes; use a Pushgateway or an exporter for Prometheus.
    metrics_endpoint: str = "http://monitoring.internal:9090/api/v1/push"
):
    """
    Emit a structured heartbeat after each pipeline run.
    Includes: pipeline name, status, record count, duration, timestamp, host.
    """
    payload = {
        "pipeline": pipeline_name,
        "status": status,
        "records": records_processed,
        "duration_s": duration_seconds,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "hostname": socket.gethostname(),
    }
    try:
        response = requests.post(
            metrics_endpoint,
            json=payload,
            timeout=(3.05, 5)
        )
        if response.status_code not in (200, 201, 202):
            logging.warning(f"Heartbeat failed: HTTP {response.status_code}")
    except requests.RequestException as e:
        # Never let monitoring failures break the pipeline
        logging.warning(f"Heartbeat failed (non-critical): {e}")

The try/except block wrapping the monitoring call is intentional. In production pipelines, monitoring failures must never cascade into the core pipeline process failing. The pipeline completes its work; the heartbeat is best-effort.
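The heartbeat is most useful when it fires on every exit path, including crashes. A sketch of a wrapper that times a job and reports either way; the wrapper name is hypothetical, and emit is expected to accept the same keyword arguments as emit_pipeline_heartbeat (pass that function in production, a fake in tests):

```python
import logging
import time
from typing import Callable


def run_with_heartbeat(
    pipeline_name: str,
    job: Callable[[], int],
    emit: Callable[..., None],
) -> int:
    """Run `job` (which returns a record count), then emit a heartbeat either way."""
    start = time.monotonic()
    records, status = 0, "failure"
    try:
        records = job()
        status = "success"
        return records
    finally:
        # Runs on success and on exceptions; a job exception still propagates afterwards
        try:
            emit(
                pipeline_name=pipeline_name,
                status=status,
                records_processed=records,
                duration_seconds=time.monotonic() - start,
            )
        except Exception as exc:  # monitoring stays best-effort
            logging.warning(f"Heartbeat emit failed: {exc}")
```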


7. Deployment Guide by Team Size

| Deployment tier | Team size | Recommended setup | Estimated monthly cost |
|---|---|---|---|
| Individual | 1 quant | Single VPS (4 vCPU, 8 GB RAM), cron jobs, SQLite metadata | $20–40 |
| Small team | 2–5 quants | Dedicated server (8 vCPU, 32 GB RAM), systemd timers, PostgreSQL metadata | $80–200 |
| Institutional | 5+ quants | Airflow or Prefect cluster, distributed compute (Spark / Dask), S3 + Parquet data lake, Datadog monitoring | $500–2,000 |

For an individual quant getting started, the entire pipeline described in this article can run on a $20/month VPS with 4 vCPUs. The ETL script processes approximately 500 symbols in 15–20 minutes on that configuration. As your universe grows, the compute requirement scales roughly linearly with symbol count; move to a larger instance or a distributed framework once a single machine can no longer finish inside the post-market window.
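The ETL loop in Layer 1 processes symbols one at a time, and its per-symbol work is dominated by network round-trips, so a modest thread pool is usually the first scaling step before a larger instance. A sketch with concurrent.futures, where fetch_and_store stands for the per-symbol body of run_etl_pipeline; max_workers is an assumption and should stay within your API plan's rate limit:

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable


def archive_concurrently(
    symbols: list[str],
    fetch_and_store: Callable[[str], None],
    max_workers: int = 4,
) -> dict:
    """Run the per-symbol ETL step on a bounded thread pool; collect failures per symbol."""
    failed: list[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fetch_and_store, s): s for s in symbols}
        for fut in as_completed(futures):
            symbol = futures[fut]
            try:
                fut.result()  # re-raises the worker's exception, if any
            except Exception as exc:
                logging.error(f"Failed to process {symbol}: {exc}")
                failed.append(symbol)
    return {"successful": len(symbols) - len(failed), "failed": sorted(failed)}
```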


8. Risk Factors and Limitations

Data completeness. The ETL pipeline archives whatever TickDB returns. If a symbol's data is unavailable for a given date (for example, due to market-specific data restrictions), the pipeline skips it and leaves only a log line behind: a warning for unknown symbols, a debug message for empty responses. Track the resulting gap list and backfill strategy-critical symbols from alternative sources.
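A gap list can be computed after each run by comparing the symbols you expected (for example, the list fetch_active_symbols returned for that date) with the files actually written. A sketch with a hypothetical helper; which symbols count as critical is your own input:

```python
from pathlib import Path


def find_gaps(
    expected_symbols: list[str],
    lake_dir: Path,
    date: str,
    critical: frozenset[str] = frozenset(),
) -> dict:
    """Compare expected symbols with the Parquet files actually archived for `date`."""
    date_dir = lake_dir / date
    archived = {f.stem for f in date_dir.glob("*.parquet")} if date_dir.is_dir() else set()
    missing = sorted(set(expected_symbols) - archived)
    return {
        "date": date,
        "missing": missing,
        # Strategy-critical gaps are the ones worth backfilling from another source
        "critical_missing": [s for s in missing if s in critical],
    }
```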

Attribution model assumptions. The execution cost model assumes a default commission of 0.1 cent per share (used when the blotter has no commission column) and a simplified power-law market impact curve with a fixed 1% daily volatility. Real-world execution costs vary by venue, order type, and time of day. Calibrate against actual fill data from your broker's execution report.

Signal pre-computation latency. Pre-computed signals are frozen at the time of generation. Overnight news (an FDA panel announcement, a geopolitical event, a macro data release) can invalidate part or all of the cache. The system must therefore include a "last-minute override" mechanism that lets analysts inject manual signal adjustments before the open.
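One simple shape for that override mechanism is a small analyst-maintained table applied on top of the cached signals. In this sketch the override format is an assumption: a numeric signal_strength replaces the cached value (clipped to [0, 1]), NaN suppresses the symbol entirely, and overrides for symbols absent from the cache are ignored:

```python
import pandas as pd


def apply_overrides(signals: pd.DataFrame, overrides: pd.DataFrame) -> pd.DataFrame:
    """Overlay analyst overrides (columns: symbol, signal_strength) on cached signals."""
    out = signals.set_index("symbol")
    ov = overrides.set_index("symbol")["signal_strength"]

    # NaN override = suppress the symbol (e.g. news pending)
    out = out.drop(index=ov[ov.isna()].index, errors="ignore")

    # Numeric override = replace the cached strength
    replacements = ov.dropna()
    hit = replacements.index.intersection(out.index)
    out.loc[hit, "signal_strength"] = replacements.loc[hit].clip(0, 1)
    out["overridden"] = out.index.isin(replacements.index)
    return out.reset_index()
```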

Single-point-of-failure risk. The systemd timer approach relies on a single machine. A hardware failure at 4:00 PM ET results in a missed pipeline run. For institutional deployments, deploy the pipeline on at least two machines with a distributed lock (e.g., etcd or a database advisory lock) to prevent duplicate runs.
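A minimal sketch of the database-advisory-lock option, assuming PostgreSQL and a DB-API driver such as psycopg; lock_id and already_done are hypothetical (already_done could check a runs table keyed by trading date). The session-level lock stops two machines from running at the same moment; the already_done check stops a standby that fires a few seconds later from re-running a finished job.

```python
from typing import Callable


def run_exclusively(
    conn,  # DB-API connection to PostgreSQL
    lock_id: int,
    job: Callable[[], None],
    already_done: Callable[[], bool],
) -> str:
    """Run `job` on at most one machine at a time via pg_try_advisory_lock."""
    cur = conn.cursor()
    try:
        # Non-blocking: returns false immediately if another session holds the lock
        cur.execute("SELECT pg_try_advisory_lock(%s)", (lock_id,))
        if not cur.fetchone()[0]:
            return "locked_elsewhere"
        try:
            if already_done():
                return "already_done"
            job()
            return "ran"
        finally:
            cur.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
    finally:
        cur.close()
```

If the lock holder crashes, PostgreSQL releases a session-level advisory lock when the connection drops, so a standby machine can take over on its next attempt.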


9. Closing

The closing bell is not an ending — it is a handoff. The market hands off its data to the quant team's infrastructure, which transforms raw price movements into structured signals, decomposed returns, and pre-computed alpha for the next session.

The pipeline we have built in this article — ETL archival, attribution decomposition, signal pre-computation, and systemd-based orchestration — is a foundation. It can be extended in any direction: adding a risk engine that computes portfolio-level VaR at end of day, integrating a news analytics layer that scores overnight headlines, or wiring in a compliance module that generates regulatory reports automatically.

The principle is the same regardless of scale: automate the predictable, monitor the critical, and reserve human attention for the decisions that actually require judgment.


Next Steps

If you want to run this pipeline yourself:

  1. Sign up at tickdb.ai and generate an API key (free tier available, no credit card required)
  2. Set the TICKDB_API_KEY environment variable
  3. Copy the ETL, attribution, and pre-computation scripts from this article
  4. Configure the systemd timers following the service files above

If you need 10+ years of historical OHLCV data for strategy backtesting, contact enterprise@tickdb.ai for institutional data plans with full historical coverage.

If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace for direct API integration in your workflow.


This article does not constitute investment advice. Quantitative trading involves substantial risk of loss. Past performance does not guarantee future results. All backtest results in this article are based on historical simulation with approximated cost assumptions.