The moment the closing bell rings on Wall Street, most retail traders shut down their terminals and call it a day. Professional quantitative teams do the opposite. The four hours after the close are the highest-signal window in their entire workflow: the market is closed, but the data never stops flowing.

During those hours, quant teams execute a precise sequence of operations: historical data archival, strategy performance attribution, risk factor decomposition, and next-day signal pre-computation. These tasks are not optional cleanup work. They are the structural foundation of every alpha-generating decision that happens at 9:30 AM the following day. Teams that automate this pipeline gain a compounding edge. Teams that run it manually watch their edge decay as their engineers spend Thursday's premium research hours fixing Tuesday's spreadsheet errors.

This article walks through the complete post-market automation architecture—timer-triggered data archival, Python-based attribution scripts, signal pre-computation pipelines, and the production-grade code that ties it all together.

The Post-Market Problem Statement

Before building the system, it is worth articulating why the post-market window is uniquely demanding. During trading hours, quant teams focus on execution and real-time signal generation. After close, the focus shifts to synthesis and preparation—and the constraints change in three critical ways.

Volume spikes. The closing auction alone generates 15–25% of total daily volume in US equities. Storing, processing, and analyzing this data requires infrastructure that can absorb a short, high-intensity burst without accumulating backlog that bleeds into the next morning's workflow.

Latency tolerance changes shape. During the trading day, a 200-millisecond delay in signal generation is a minor inconvenience. After the close, the relevant unit is the phase deadline: a 90-minute backlog in data archival that starts at 4:05 PM pushes every downstream phase back, and the next morning's pre-computation pipeline starts with stale context.

Failure modes are expensive. A failed nightly ETL job does not just delay a report. It means the next morning's strategy runs without Tuesday's updated factor exposures, Wednesday's correlation matrices, or Thursday's revised volatility surface. The failure cost compounds across every subsequent trading day until someone catches and corrects it.

The solution is a fully automated post-market pipeline with explicit error handling, dependency management, and alerting.

Architecture Overview: The Four-Phase Pipeline

The post-market workflow breaks into four sequential phases, each with specific inputs, outputs, and failure tolerances.

| Phase | Name | Start time | Duration target | Key outputs |
|---|---|---|---|---|
| P1 | Data archival | 4:00 PM ET | ≤ 30 min | Raw OHLCV snapshots, order book closes, trade prints |
| P2 | Attribution analysis | 4:45 PM ET | ≤ 60 min | P&L attribution, factor exposure report, Sharpe update |
| P3 | Risk model refresh | 5:45 PM ET | ≤ 45 min | Updated correlation matrix, volatility surface, risk limits |
| P4 | Signal pre-computation | 6:30 PM ET | ≤ 90 min | Next-day signal candidates, regime classification, watchlist |

Each phase is independently schedulable, but with hard dependency ordering—P3 cannot start until P2's factor exposure report is written to disk, and P4 cannot start until P3's correlation matrix is finalized.
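
The dependency ordering described above can be enforced with a simple file-based gate. The following is a minimal sketch, not the article's own implementation: `wait_for_artifact` and `upstream_ready` are hypothetical helper names, the manifest and attribution paths follow the scripts in this article, and the `./risk_reports` location is an assumption because the Phase 3 listing does not write to disk by itself.

```python
import time
from pathlib import Path


def wait_for_artifact(path: Path, timeout_s: float = 3600.0, poll_s: float = 15.0) -> bool:
    """Block until `path` exists and is non-empty, or until the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        if path.exists() and path.stat().st_size > 0:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)


def upstream_ready(phase: str, date_str: str,
                   timeout_s: float = 3600.0, poll_s: float = 15.0) -> bool:
    """Return True once the artifact that `phase` depends on has been written."""
    # Assumed artifact locations; adjust to wherever each phase writes its output.
    required = {
        "P2": Path(f"./market_data_archive/manifest_{date_str}.json"),
        "P3": Path(f"./attribution_reports/attribution_{date_str}.json"),
        "P4": Path(f"./risk_reports/risk_{date_str}.json"),  # hypothetical path
    }
    return wait_for_artifact(required[phase], timeout_s, poll_s)
```

A phase would call `upstream_ready("P3", date_str)` before doing any work and raise an alert instead of running if it returns `False`. Checking for a non-empty file is a weak guarantee on its own; it is most reliable when the upstream phase writes its output atomically.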

Phase 1: Automated Data Archival

The Data Acquisition Layer

The archival pipeline starts by pulling the day's complete market data footprint from TickDB. For US equities, this means kline snapshots at multiple intervals (1m, 5m, 15m, 1h, 1d), depth channel snapshots at market close, and the closing trade print sequence from the final auction.

The following code implements a production-grade archival script with automatic retry, rate-limit handling, and idempotent writes.

import os
import time
import json
import logging
from datetime import datetime, timezone
from pathlib import Path

import requests

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)

# Load API credentials from environment variables — never hardcode keys
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
if not TICKDB_API_KEY:
    raise EnvironmentError("TICKDB_API_KEY environment variable is not set")

BASE_URL = "https://api.tickdb.ai/v1"
HEADERS = {"X-API-Key": TICKDB_API_KEY}

# ─── Configuration ───────────────────────────────────────────────────────────
WATCHLIST = ["AAPL.US", "MSFT.US", "NVDA.US", "SPY.US"]  # Expand as needed
ARCHIVE_ROOT = Path("./market_data_archive")
ARCHIVE_ROOT.mkdir(parents=True, exist_ok=True)

# ─── Rate-limit handler ───────────────────────────────────────────────────────
def handle_rate_limit(response):
    """Extract Retry-After from rate-limited response and sleep accordingly."""
    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 5))
        logger.warning(f"Rate limited — sleeping for {retry_after} seconds")
        time.sleep(retry_after)
        return True
    return False


# ─── Core HTTP client with timeout ───────────────────────────────────────────
def get_json(endpoint, params=None, retries=3, backoff_base=2.0):
    """Fetch JSON from TickDB API with exponential backoff and timeout enforcement."""
    url = f"{BASE_URL}{endpoint}"
    for attempt in range(retries):
        try:
            response = requests.get(
                url,
                headers=HEADERS,
                params=params,
                timeout=(3.05, 10.0)  # (connect_timeout, read_timeout)
            )

            if response.status_code == 429:
                handle_rate_limit(response)
                continue

            response.raise_for_status()
            return response.json()

        except requests.exceptions.Timeout:
            logger.warning(f"Timeout on attempt {attempt + 1} — retrying")
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {e}")

        if attempt < retries - 1:
            wait = backoff_base ** attempt
            jitter = 0.1 * wait * (datetime.now().timestamp() % 1)  # Prevent herd
            time.sleep(wait + jitter)
        else:
            raise RuntimeError(f"Failed after {retries} attempts for {endpoint}")

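    # Reached only when every attempt was rate limited (each 429 skips to the next attempt)
    raise RuntimeError(f"Rate limited on all {retries} attempts for {endpoint}")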


# ─── Kline archival ────────────────────────────────────────────────────────────
def archive_daily_klines(symbol, date_str):
    """Fetch and persist daily OHLCV kline for a given symbol and date."""
    params = {
        "symbol": symbol,
        "interval": "1d",
        "start_time": f"{date_str}T00:00:00Z",
        "end_time": f"{date_str}T23:59:59Z",
        "limit": 5
    }

    data = get_json("/market/kline", params=params)
    if not data or "data" not in data:
        logger.warning(f"No kline data returned for {symbol} on {date_str}")
        return False

    # Write under kline/1d so the path matches what the attribution and signal loaders read
    output_dir = ARCHIVE_ROOT / symbol / "kline" / "1d"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"{date_str}.json"

    with open(output_file, "w") as f:
        json.dump({"symbol": symbol, "date": date_str, "data": data["data"]}, f, indent=2)

    logger.info(f"Archived: {symbol} kline → {output_file}")
    return True


# ─── Intraday archival (1m candles — needed for attribution) ─────────────────
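def archive_intraday_klines(symbol, date_str, intervals=("1m", "5m", "15m")):
    """Fetch intraday kline data at multiple intervals for a single trading day."""
    # The regular session runs 09:30-16:00 New York time. Convert to UTC because
    # the "Z" suffix denotes UTC, and the offset changes with daylight saving.
    from zoneinfo import ZoneInfo

    eastern = ZoneInfo("America/New_York")
    session_day = datetime.strptime(date_str, "%Y-%m-%d")
    open_utc = session_day.replace(hour=9, minute=30, tzinfo=eastern).astimezone(timezone.utc)
    close_utc = session_day.replace(hour=16, minute=0, tzinfo=eastern).astimezone(timezone.utc)

    for interval in intervals:
        params = {
            "symbol": symbol,
            "interval": interval,
            "start_time": open_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),  # Market open
            "end_time": close_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),   # Market close
            "limit": 500
        }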

        data = get_json("/market/kline", params=params)
        if not data or "data" not in data:
            logger.warning(f"No {interval} data for {symbol} on {date_str}")
            continue

        output_dir = ARCHIVE_ROOT / symbol / "kline" / interval
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_dir / f"{date_str}.json"

        with open(output_file, "w") as f:
            json.dump({"symbol": symbol, "interval": interval, "date": date_str, "data": data["data"]}, f, indent=2)

        logger.info(f"Archived: {symbol} {interval} → {output_file}")
        time.sleep(0.5)  # Respectful spacing between requests


# ─── Main archival orchestrator ───────────────────────────────────────────────
def run_daily_archive(target_date=None):
    """Orchestrate full data archival for all symbols in the watchlist."""
    if target_date is None:
        target_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    logger.info(f"Starting daily archive for {target_date}")
    results = {}

    for symbol in WATCHLIST:
        try:
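            daily_ok = archive_daily_klines(symbol, target_date)
            archive_intraday_klines(symbol, target_date)
            # Report success only when the daily kline was actually written
            results[symbol] = "success" if daily_ok else "missing_daily_kline"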
        except Exception as e:
            logger.error(f"Failed to archive {symbol}: {e}")
            results[symbol] = f"failed: {e}"

    # Write manifest
    manifest = {
        "date": target_date,
        "symbols": results,
        "archived_at": datetime.now(timezone.utc).isoformat()
    }
    manifest_file = ARCHIVE_ROOT / f"manifest_{target_date}.json"
    with open(manifest_file, "w") as f:
        json.dump(manifest, f, indent=2)

    logger.info(f"Archive complete. Manifest written to {manifest_file}")
    return results


# ─── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    run_daily_archive()

Engineering notes: The script uses an idempotent write strategy—re-running it for the same date overwrites the JSON files with fresh data. This eliminates the need for complex state tracking. The 0.5-second sleep between intraday interval fetches is a deliberate choice to avoid triggering rate limits on a crowded watchlist. For watchlists exceeding 50 symbols, increase the sleep to 1.0 second and monitor the 3001 error code response.
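
Overwriting in place is idempotent, but a crash in the middle of a write can leave a truncated JSON file that a later phase then reads. One common way to close that gap is to write to a temporary file and rename it into place. The sketch below is an illustration under that assumption, and `atomic_write_json` is a hypothetical helper name, not part of the archival script above.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(payload: dict, output_file: Path) -> None:
    """Write JSON to a temp file in the target directory, then rename it into place."""
    output_file.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file next to the target so both sit on the same filesystem
    fd, tmp_name = tempfile.mkstemp(dir=output_file.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before the rename
        os.replace(tmp_name, output_file)  # readers see the old file or the new one
    except BaseException:
        # Clean up the partial temp file and re-raise the original error
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
```

Swapping this in for the `open(..., "w")` plus `json.dump` pairs keeps re-runs idempotent while making sure downstream phases never see a half-written archive.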

Phase 2: Strategy Attribution Analysis

What Attribution Actually Measures

Attribution analysis answers a specific question: for every basis point of portfolio return, which factor or decision was responsible? The naive answer—"the strategy made money"—is useless for improvement. The actionable answer segments return into its components.

A standard Brinson attribution model breaks total return into:

  • Allocation effect: Did we overweight the sectors that outperformed?
  • Selection effect: Within each sector, did our individual picks beat the benchmark?
  • Interaction effect: The residual from the two above, often attributed to the interaction of allocation and selection decisions.
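
The three effects above can be computed per sector in a few lines. This is a minimal sketch of the Brinson-Hood-Beebower variant, in which the allocation effect uses the benchmark sector return directly; other variants (for example Brinson-Fachler) define allocation relative to the total benchmark return. The function name and the sample weights and returns are made up for illustration.

```python
def brinson_attribution(w_p: dict, w_b: dict, r_p: dict, r_b: dict) -> dict:
    """Per-sector allocation, selection, and interaction effects (BHB variant).

    w_p, w_b: portfolio and benchmark sector weights
    r_p, r_b: portfolio and benchmark sector returns
    """
    effects = {}
    for sector in w_b:
        active_weight = w_p[sector] - w_b[sector]
        active_return = r_p[sector] - r_b[sector]
        effects[sector] = {
            "allocation": active_weight * r_b[sector],
            "selection": w_b[sector] * active_return,
            "interaction": active_weight * active_return,
        }
    return effects


# Illustrative two-sector example (all numbers are invented)
w_p = {"tech": 0.6, "energy": 0.4}
w_b = {"tech": 0.5, "energy": 0.5}
r_p = {"tech": 0.020, "energy": -0.010}
r_b = {"tech": 0.015, "energy": -0.005}

effects = brinson_attribution(w_p, w_b, r_p, r_b)
total_effect = sum(sum(e.values()) for e in effects.values())
active_return = sum(w_p[s] * r_p[s] for s in w_p) - sum(w_b[s] * r_b[s] for s in w_b)
```

In this variant the three effects sum exactly to the portfolio's active return, so `total_effect` and `active_return` agree up to floating-point error. That identity is a useful sanity check on any attribution run.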

For quant strategies, the decomposition extends further. A factor-based strategy's return breaks into:

  • Factor exposure return: The return attributable to our factor tilts (momentum, value, quality, volatility).
  • Idiosyncratic return: Stock-specific alpha not explained by factors.
  • Execution drag: The cost of fills, spreads, and slippage relative to the signal's theoretical edge.
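
As a small worked illustration of the three components above, the sketch below measures execution drag as slippage against the price at signal time and treats the idiosyncratic return as the residual. The function names, the sign convention (positive drag is a cost), and the sample numbers are assumptions made for this example.

```python
def execution_drag_bps(side: str, signal_price: float, fill_price: float) -> float:
    """Slippage in basis points versus the price at signal time; positive means a cost."""
    direction = 1.0 if side == "buy" else -1.0
    return direction * (fill_price - signal_price) / signal_price * 1e4


def decompose_return(total_return: float, factor_return: float, drag_bps: float) -> dict:
    """Split a realized return into factor, execution, and idiosyncratic parts."""
    execution = -drag_bps / 1e4  # drag reduces the realized return
    idiosyncratic = total_return - factor_return - execution
    return {
        "factor": factor_return,
        "execution": execution,
        "idiosyncratic": idiosyncratic,
    }


# A buy filled 5 cents above a $100.00 signal price costs roughly 5 bps
drag = execution_drag_bps("buy", signal_price=100.00, fill_price=100.05)
parts = decompose_return(total_return=0.0100, factor_return=0.0060, drag_bps=drag)
```

Because the idiosyncratic term is defined as a residual, the three parts always add back to the realized return. Any error in the factor model therefore shows up as apparent stock-specific alpha.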

The following script computes daily attribution for a multi-factor portfolio, reading from the archived data.

import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List

import numpy as np

logger = logging.getLogger(__name__)

ARCHIVE_ROOT = Path("./market_data_archive")

# ─── Factor definitions ────────────────────────────────────────────────────────
# Each factor carries a weight in the portfolio construction
FACTOR_WEIGHTS = {
    "momentum_20d": 0.30,
    "momentum_5d": 0.15,
    "value_ebp": 0.20,
    "quality_roa": 0.25,
    "low_volatility": 0.10
}

# ─── Data loader ───────────────────────────────────────────────────────────────
def load_daily_klines(symbol: str, date_str: str) -> List[Dict]:
    """Load archived 1-day kline for a symbol."""
    filepath = ARCHIVE_ROOT / symbol / "kline" / "1d" / f"{date_str}.json"
    if not filepath.exists():
        logger.warning(f"Missing archive for {symbol} on {date_str}")
        return []

    with open(filepath) as f:
        payload = json.load(f)

    return payload.get("data", [])


# ─── Simple factor exposure estimator ────────────────────────────────────────
# ⚠️ Production attribution requires vendor factor data (Bloomberg, MSCI Barra).
# This example uses synthetic factor returns for illustration only.
def estimate_factor_exposure(symbol: str, date_str: str) -> Dict[str, float]:
    """
    Return simulated factor exposures for a symbol.
    In production: replace with actual factor data from a risk vendor.
    """
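    import hashlib

    # Derive a stable per-(symbol, date) seed and use a local generator so the
    # global NumPy random state is left untouched.
    seed = int(hashlib.md5(f"{symbol}{date_str}".encode()).hexdigest(), 16) % 10000
    rng = np.random.default_rng(seed)

    exposures = {}
    for factor in FACTOR_WEIGHTS:
        exposures[factor] = float(rng.uniform(-1.0, 1.0))  # Range: [-1, 1)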

    return exposures


def compute_factor_return_contribution(exposures: Dict[str, float]) -> float:
    """Compute portfolio return contribution from factor exposures."""
    # Fixed illustrative factor returns (replace with actual factor return series in production)
    factor_returns = {
        "momentum_20d": 0.0012,
        "momentum_5d": 0.0008,
        "value_ebp": -0.0003,
        "quality_roa": 0.0015,
        "low_volatility": 0.0006
    }

    contribution = 0.0
    for factor, exposure in exposures.items():
        factor_ret = factor_returns.get(factor, 0.0)
        contribution += exposure * factor_ret

    return contribution


# ─── Attribution report generator ───────────────────────────────────────────
def generate_daily_attribution(symbols: List[str], date_str: str) -> Dict:
    """
    Compute daily attribution for a list of symbols.
    Returns factor exposure breakdown and total return attribution.
    """
    report = {
        "date": date_str,
        "symbols_processed": 0,
        "factor_contributions": {},
        "total_return": 0.0,
        "sharpe_rolling_5d": 0.0
    }

    daily_returns = []

    for symbol in symbols:
        klines = load_daily_klines(symbol, date_str)
        if not klines:
            continue

        # Parse OHLCV. This is the open-to-close session return, so the overnight gap is excluded.
        candle = klines[-1]
        close_price = float(candle.get("close", 0))
        open_price = float(candle.get("open", 0))
        daily_return = (close_price - open_price) / open_price if open_price > 0 else 0.0

        exposures = estimate_factor_exposure(symbol, date_str)
        factor_contrib = compute_factor_return_contribution(exposures)

        logger.info(
            f"{symbol}: return={daily_return:.4f}, "
            f"factor_contrib={factor_contrib:.6f}, "
            f"idiosyncratic={daily_return - factor_contrib:.6f}"
        )

        report["factor_contributions"][symbol] = {
            "daily_return": daily_return,
            "factor_contribution": factor_contrib,
            "idiosyncratic_return": daily_return - factor_contrib,
            "exposures": exposures
        }

        daily_returns.append(daily_return)
        report["symbols_processed"] += 1

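    if daily_returns:
        report["total_return"] = float(np.mean(daily_returns))  # Equal-weighted mean across symbols

        # A rolling Sharpe needs a time series of daily portfolio returns, not a
        # cross-section of symbols: use today plus the four latest earlier reports.
        history = []
        for path in sorted(Path("./attribution_reports").glob("attribution_*.json")):
            if path.stem < f"attribution_{date_str}":
                with open(path) as f:
                    history.append(float(json.load(f).get("total_return", 0.0)))
        window = np.array(history[-4:] + [report["total_return"]])
        if len(window) == 5 and window.std(ddof=1) > 0:
            report["sharpe_rolling_5d"] = float(window.mean() / window.std(ddof=1) * np.sqrt(252))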

    return report


# ─── Output handler ────────────────────────────────────────────────────────────
def write_attribution_report(report: Dict, date_str: str):
    """Write attribution report to disk with timestamp."""
    output_dir = Path("./attribution_reports")
    output_dir.mkdir(parents=True, exist_ok=True)

    output_file = output_dir / f"attribution_{date_str}.json"
    report["generated_at"] = datetime.now(timezone.utc).isoformat()

    with open(output_file, "w") as f:
        json.dump(report, f, indent=2)

    logger.info(f"Attribution report written: {output_file}")


# ─── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import sys
    target_date = sys.argv[1] if len(sys.argv) > 1 else datetime.now(timezone.utc).strftime("%Y-%m-%d")
    symbols = ["AAPL.US", "MSFT.US", "NVDA.US", "SPY.US"]

    report = generate_daily_attribution(symbols, target_date)
    write_attribution_report(report, target_date)
    print(json.dumps(report, indent=2))

Phase 3: Risk Model Refresh

The third phase updates the risk model inputs for the next trading day. This includes the covariance matrix, the volatility surface, and the updated correlation structure across the portfolio's positions.

A practical approach uses exponentially weighted covariance (EWMA) to prioritize recent market regimes over distant history. A decay factor of 0.94 (roughly equivalent to a half-life of 11 trading days) balances stability with responsiveness.
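
The half-life figure follows from solving decay^h = 0.5 for h. The short sketch below shows the arithmetic in both directions; the function names are hypothetical and used only for this illustration.

```python
import math


def ewma_half_life(decay: float) -> float:
    """Number of periods after which an observation's weight has halved."""
    if not 0.0 < decay < 1.0:
        raise ValueError("decay must be strictly between 0 and 1")
    return math.log(0.5) / math.log(decay)


def decay_for_half_life(half_life: float) -> float:
    """Inverse mapping: the decay factor that produces a given half-life."""
    if half_life <= 0:
        raise ValueError("half_life must be positive")
    return 0.5 ** (1.0 / half_life)


half_life = ewma_half_life(0.94)          # about 11.2 trading days
decay_22d = decay_for_half_life(22.0)     # a slower-moving alternative
```

A higher decay factor lengthens the half-life and smooths the estimate; a lower one reacts faster to regime changes at the cost of noisier covariances.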

import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List

import numpy as np

logger = logging.getLogger(__name__)


def build_ewma_covariance_matrix(returns: np.ndarray, lookback: int = 60, decay: float = 0.94) -> np.ndarray:
    """
    Compute exponentially weighted covariance matrix from a returns matrix.
    
    Args:
        returns: 2D array (days x symbols)
        lookback: Number of trading days to use
        decay: EWMA decay factor (0.94 ≈ 11-day half-life)
    
    Returns:
        Covariance matrix (symbols x symbols)
    """
    if returns.shape[0] > lookback:
        returns = returns[-lookback:]

    # The most recent observation gets weight decay**0, the oldest decay**(n-1)
    weights = decay ** np.arange(len(returns) - 1, -1, -1)
    weights = weights / weights.sum()

    # Weighted mean
    weighted_mean = np.sum(returns * weights[:, np.newaxis], axis=0)
    demeaned = returns - weighted_mean

    # Weighted covariance: sum over days of w_t * outer(d_t, d_t), as one matrix product
    cov = (demeaned * weights[:, np.newaxis]).T @ demeaned

    return cov


def compute_risk_contribution(cov_matrix: np.ndarray, weights: np.ndarray) -> np.ndarray:
    """
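    Compute each position's share of total portfolio volatility.

    The marginal risk of a position is (cov @ weights) / portfolio_vol; its
    contribution is weight * marginal risk, normalized here to sum to 1.
    
    Args:
        cov_matrix: NxN covariance matrix
        weights: N-length position weights vector
    
    Returns:
        N-length vector of risk shares (fractions of total risk, summing to 1)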
    """
    portfolio_variance = np.dot(weights, np.dot(cov_matrix, weights))
    portfolio_vol = np.sqrt(portfolio_variance) if portfolio_variance > 0 else 1e-10

    marginal_risks = np.dot(cov_matrix, weights)
    risk_contrib = weights * marginal_risks / portfolio_vol

    return risk_contrib / risk_contrib.sum()


def refresh_risk_model(date_str: str, symbols: List[str], portfolio_weights: Dict[str, float]) -> Dict:
    """
    Refresh risk model for the next trading day.
    In production: load multi-year historical returns from TickDB kline archives.
    """
    # Build synthetic return matrix (in production: load from 1d kline archives)
    np.random.seed(int(date_str.replace("-", "")) % 10000)
    n_days = 60
    n_symbols = len(symbols)

    # Simulated returns: random walk with realistic vol (~20% annualized = ~1.25% daily std)
    returns = np.random.randn(n_days, n_symbols) * 0.0125

    cov_matrix = build_ewma_covariance_matrix(returns, lookback=60, decay=0.94)

    weights = np.array([portfolio_weights.get(s, 0.0) for s in symbols])
    risk_contrib = compute_risk_contribution(cov_matrix, weights)

    # Portfolio volatility
    portfolio_vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights))) * np.sqrt(252)

    risk_report = {
        "date": date_str,
        "symbols": symbols,
        "portfolio_annual_vol": round(float(portfolio_vol * 100), 2),
        "covariance_matrix": cov_matrix.tolist(),
        "risk_contributions": {s: round(float(r), 4) for s, r in zip(symbols, risk_contrib)},
        "max_position_risk": round(float(max(risk_contrib) * 100), 2)
    }

    logger.info(f"Risk model refreshed: portfolio vol = {risk_report['portfolio_annual_vol']}%")
    top_idx = int(np.argmax(risk_contrib))
    logger.info(f"Top risk contributor: {symbols[top_idx]} at {risk_contrib[top_idx] * 100:.1f}%")

    return risk_report
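
Phase 4 depends on a finalized correlation matrix, while the listing above returns a covariance matrix in memory. A minimal sketch of the missing step, assuming the report is persisted as JSON: `covariance_to_correlation` and `write_risk_report` are hypothetical helpers, and the `./risk_reports` directory and `risk_<date>.json` file name are assumptions chosen to mirror the attribution report layout.

```python
import json
from pathlib import Path

import numpy as np


def covariance_to_correlation(cov: np.ndarray) -> np.ndarray:
    """Rescale a covariance matrix to a correlation matrix."""
    std = np.sqrt(np.diag(cov))
    std = np.where(std > 0, std, 1.0)  # avoid dividing by zero for flat series
    corr = cov / np.outer(std, std)
    np.fill_diagonal(corr, 1.0)
    return corr


def write_risk_report(risk_report: dict, date_str: str,
                      output_dir: Path = Path("./risk_reports")) -> Path:
    """Persist the risk report with an added correlation matrix; return the file path."""
    cov = np.array(risk_report["covariance_matrix"])
    payload = dict(risk_report)
    payload["correlation_matrix"] = covariance_to_correlation(cov).tolist()

    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"risk_{date_str}.json"
    with open(output_file, "w") as f:
        json.dump(payload, f, indent=2)
    return output_file
```

Calling `write_risk_report(refresh_risk_model(...), date_str)` at the end of Phase 3 gives Phase 4 a concrete file to check for before it starts.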

Phase 4: Next-Day Signal Pre-Computation

The final phase prepares the signals that the strategy will use at the next open. This includes:

  • Regime classification: Is the market in a trending phase, a mean-reversion phase, or a high-volatility consolidation?
  • Signal candidates: Pre-ranked stock list based on the latest factor scores and the day's closing data.
  • Watchlist updates: Symbols whose 20-day volatility exceeds a threshold are flagged for pre-market review.
  • Execution schedule: Pre-computed limit order price levels based on the day's range and the closing auction volume profile.

import json
import logging
from datetime import datetime, timezone
from pathlib import Path

import numpy as np

logger = logging.getLogger(__name__)


def classify_market_regime(klines_20d: list, volatility_window: int = 20) -> str:
    """
    Classify market regime based on recent price action.
    
    Returns:
        "trending" | "mean_reverting" | "high_vol_consolidation" | "low_vol_trendless",
        or "insufficient_data" when fewer than volatility_window candles are supplied
    """
    if len(klines_20d) < volatility_window:
        return "insufficient_data"

    closes = np.array([float(k.get("close", 0)) for k in klines_20d[-volatility_window:]])
    returns = np.diff(closes) / closes[:-1]

    trend_strength = abs(np.mean(returns)) / (np.std(returns) + 1e-10)
    annualized_vol = np.std(returns) * np.sqrt(252)  # Annualized volatility, not a percentile

    if annualized_vol > 0.30:
        return "high_vol_consolidation"
    elif trend_strength > 1.5:
        return "trending"
    elif np.std(returns) < 0.005:
        return "low_vol_trendless"
    else:
        return "mean_reverting"


def compute_signal_candidates(symbols: list, date_str: str) -> list:
    """
    Pre-rank stock list based on factor scores and momentum.
    In production: this integrates with the strategy's scoring engine.
    """
    candidates = []

    for symbol in symbols:
        # Each archive file holds one trading day, so build the close series from
        # the most recent daily files up to and including date_str.
        archive_dir = Path(f"./market_data_archive/{symbol}/kline/1d")
        files = sorted(p for p in archive_dir.glob("*.json") if p.stem <= date_str)[-21:]

        closes = []
        for path in files:
            with open(path) as f:
                data = json.load(f).get("data", [])
            if data:
                closes.append(float(data[-1].get("close", 0)))

        if len(closes) < 6:  # Need at least 6 closes for a 5-day momentum
            continue

        mom_5d = (closes[-1] - closes[-6]) / closes[-6]
        mom_20d = (closes[-1] - closes[-21]) / closes[-21] if len(closes) >= 21 else 0.0
        vol_20d = float(np.std(np.diff(closes) / np.array(closes[:-1])) * np.sqrt(252))

        # Composite score: momentum-weighted, vol-penalized
        score = mom_5d * 0.6 + mom_20d * 0.4 - vol_20d * 0.05

        candidates.append({
            "symbol": symbol,
            "momentum_5d": round(mom_5d, 4),
            "momentum_20d": round(mom_20d, 4),
            "vol_20d": round(vol_20d, 4),
            "score": round(score, 4)
        })

    # Sort by composite score descending
    candidates.sort(key=lambda x: x["score"], reverse=True)
    return candidates


def generate_pre_market_watchlist(candidates: list, vol_threshold: float = 0.25) -> list:
    """
    Flag symbols with volatility above threshold for pre-market review.
    """
    watchlist = [c for c in candidates if c["vol_20d"] > vol_threshold]
    return watchlist


def precompute_execution_levels(klines_5m: list, last_close: float) -> dict:
    """
    Compute suggested limit order price levels based on the day's range profile.
    
    Args:
        klines_5m: List of 5-minute OHLCV candles for the last trading day
        last_close: Closing price of the last trading day
    
    Returns:
        dict with bid_level, ask_level, and spread_width (all in price units), plus day_high and day_low
    """
    if not klines_5m:
        return {"bid_level": last_close, "ask_level": last_close, "spread_width": 0.01}

    highs = [float(k.get("high", 0)) for k in klines_5m]
    lows = [float(k.get("low", 0)) for k in klines_5m]

    day_high = max(highs)
    day_low = min(lows)
    day_range = day_high - day_low

    # Place each level 25% of the day's range inside the low and the high
    bid_level = day_low + 0.25 * day_range
    ask_level = day_high - 0.25 * day_range
    spread_width = ask_level - bid_level

    return {
        "bid_level": round(bid_level, 2),
        "ask_level": round(ask_level, 2),
        "spread_width": round(spread_width, 2),
        "day_high": round(day_high, 2),
        "day_low": round(day_low, 2)
    }


def generate_signal_report(symbols: list, date_str: str) -> dict:
    """
    Orchestrate full pre-market signal generation.
    """
    candidates = compute_signal_candidates(symbols, date_str)
    watchlist = generate_pre_market_watchlist(candidates)

    # Example: compute execution levels for top candidate
    top_candidate = candidates[0] if candidates else None
    execution_levels = {}
    if top_candidate:
        # precompute_execution_levels expects 5-minute candles, so read the 5m archive
        filepath = Path(f"./market_data_archive/{top_candidate['symbol']}/kline/5m/{date_str}.json")
        if filepath.exists():
            with open(filepath) as f:
                intraday_data = json.load(f).get("data", [])
            execution_levels = precompute_execution_levels(
                intraday_data, 
                float(intraday_data[-1].get("close", 0)) if intraday_data else 0.0
            )

    report = {
        "date": date_str,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "signal_candidates": candidates[:10],  # Top 10
        "watchlist_vol_flags": watchlist,
        "execution_levels_sample": {
            "symbol": top_candidate["symbol"] if top_candidate else None,
            **execution_levels
        }
    }

    output_dir = Path("./signal_reports")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"signals_{date_str}.json"

    with open(output_file, "w") as f:
        json.dump(report, f, indent=2)

    logger.info(f"Signal report generated: {len(candidates)} candidates, {len(watchlist)} on watchlist")
    return report

Orchestrating the Full Pipeline

With all four phases implemented, the complete pipeline is orchestrated by a single scheduler script. The example below uses schedule (a lightweight Python library). Cron or a Kubernetes CronJob, covered in the deployment section, can trigger the same entry point with the --run-now flag.

import schedule
import time
import logging
from datetime import datetime, timezone
from pathlib import Path

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s"
)
logger = logging.getLogger("scheduler")

from market_data_archive import run_daily_archive
from attribution_analysis import generate_daily_attribution, write_attribution_report
from risk_model_refresh import refresh_risk_model
from signal_precompute import generate_signal_report

SYMBOLS = ["AAPL.US", "MSFT.US", "NVDA.US", "SPY.US"]
PORTFOLIO_WEIGHTS = {"AAPL.US": 0.25, "MSFT.US": 0.25, "NVDA.US": 0.30, "SPY.US": 0.20}


def phase1_archival():
    date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    logger.info("=== Phase 1: Data Archival ===")
    run_daily_archive(date_str)
    return date_str


def phase2_attribution(date_str):
    logger.info("=== Phase 2: Strategy Attribution ===")
    report = generate_daily_attribution(SYMBOLS, date_str)
    write_attribution_report(report, date_str)


def phase3_risk(date_str):
    logger.info("=== Phase 3: Risk Model Refresh ===")
    refresh_risk_model(date_str, SYMBOLS, PORTFOLIO_WEIGHTS)


def phase4_signals(date_str):
    logger.info("=== Phase 4: Next-Day Signal Pre-Computation ===")
    generate_signal_report(SYMBOLS, date_str)


def run_post_market_pipeline():
    """Execute all four phases in sequence."""
    date_str = phase1_archival()
    phase2_attribution(date_str)
    phase3_risk(date_str)
    phase4_signals(date_str)
    logger.info("=== Post-market pipeline complete ===")


def main():
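    # Schedule: run at 4:05 PM ET (market close + 5 min buffer).
    # The explicit time zone keeps the trigger correct on hosts that run in UTC;
    # it needs schedule >= 1.2.0 with pytz installed. Weekends and exchange
    # holidays are not filtered out here.
    schedule.every().day.at("16:05", "America/New_York").do(run_post_market_pipeline)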

    # For testing: also allow manual trigger
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "--run-now":
        logger.info("Manual trigger — running post-market pipeline immediately")
        run_post_market_pipeline()
        return

    logger.info("Scheduler running. Next pipeline execution at 4:05 PM ET")
    while True:
        schedule.run_pending()
        time.sleep(60)


if __name__ == "__main__":
    main()
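
The sequence above stops at the first uncaught exception, and an exception raised inside a scheduled job also propagates out of `schedule.run_pending()` and ends the scheduler loop. One way to make the failure explicit and alertable is sketched below. `send_alert` and `run_phases` are hypothetical names, and the alert hook only logs; a real deployment would post to Slack, PagerDuty, or email.

```python
import logging
from typing import Callable, List, Tuple

logger = logging.getLogger("pipeline")


def send_alert(message: str) -> None:
    """Placeholder alert hook (assumption): replace with a real notification channel."""
    logger.critical("ALERT: %s", message)


def run_phases(phases: List[Tuple[str, Callable[[], None]]],
               alert: Callable[[str], None] = send_alert) -> bool:
    """Run phases in order; on the first failure, alert and skip everything downstream."""
    for name, phase_fn in phases:
        try:
            logger.info("Starting %s", name)
            phase_fn()
        except Exception as exc:
            logger.exception("%s failed", name)
            alert(f"{name} failed: {exc}; downstream phases skipped")
            return False
    return True
```

Inside `run_post_market_pipeline`, the four calls could be wrapped as `run_phases([("P1 archival", lambda: run_daily_archive(date_str)), ...])`. Stale outputs are then never consumed silently, and the scheduler process survives a bad night.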

Deployment Configuration by Team Size

| Team size | Deployment recommendation | Scheduler choice | Alerting |
|---|---|---|---|
| Individual quant | Local cron job + Python virtualenv | schedule library or cron | Email via sendmail or Gmail API |
| Small team (2–5) | Docker container on a single VPS | cron inside container | Slack webhook alert |
| Institutional (5+) | Kubernetes CronJob + persistent volume | Kubernetes CronJob | PagerDuty + Slack integration |

For institutional teams, the Kubernetes CronJob manifest below provides a production-ready deployment template.

apiVersion: batch/v1
kind: CronJob
metadata:
  name: post-market-pipeline
  namespace: quant-systems
spec:
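  schedule: "5 16 * * 1-5"  # 4:05 PM, Monday–Friday; exchange holidays still need an in-job calendar check
  timeZone: "America/New_York"  # Without this, the schedule is read in the controller's time zone (typically UTC)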
  successfulJobsHistoryLimit: 7
  failedJobsHistoryLimit: 3
  concurrencyPolicy: Forbid  # Prevent overlapping runs
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: pipeline
            image: tickdb/post-market-pipeline:latest
            env:
            - name: TICKDB_API_KEY
              valueFrom:
                secretKeyRef:
                  name: tickdb-credentials
                  key: api-key
            resources:
              requests:
                memory: "512Mi"
                cpu: "250m"
              limits:
                memory: "1Gi"
                cpu: "500m"
            volumeMounts:
            - name: archive-data
              mountPath: /app/market_data_archive
            - name: reports-data
              mountPath: /app/attribution_reports
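          volumes:
          - name: archive-data
            persistentVolumeClaim:
              claimName: tickdb-archive-pvc
          - name: reports-data  # Backs the reports-data mount declared above
            persistentVolumeClaim:
              claimName: tickdb-reports-pvc  # Hypothetical claim name; substitute your own PVC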
          restartPolicy: OnFailure

Data Source Attribution

The historical OHLCV data referenced in this pipeline is sourced from TickDB's /v1/market/kline endpoint, covering 10+ years of US equity daily and intraday data for cross-cycle strategy backtesting and multi-year covariance estimation.


Next Steps

If you're building your first post-market pipeline, start with Phase 1 (data archival) and run it manually for one week to verify data completeness before automating the scheduler.
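
The week of manual verification can be partly mechanized by counting candles in each archived file. The sketch below assumes a full regular session of 6.5 hours with one candle per interval (390 one-minute, 78 five-minute, and 26 fifteen-minute bars). Early-close days, and feeds that omit bars with no trades, will legitimately report fewer. `check_completeness` is a hypothetical helper name.

```python
import json
from pathlib import Path
from typing import Dict, List

# Expected candle counts for a full 09:30-16:00 session (assumption: one bar per interval)
EXPECTED_BARS = {"1m": 390, "5m": 78, "15m": 26}


def check_completeness(archive_root: Path, symbols: List[str], date_str: str) -> Dict[str, str]:
    """Return a mapping of 'symbol/interval' to a description of each problem found."""
    issues = {}
    for symbol in symbols:
        for interval, expected in EXPECTED_BARS.items():
            path = archive_root / symbol / "kline" / interval / f"{date_str}.json"
            key = f"{symbol}/{interval}"
            if not path.exists():
                issues[key] = "missing file"
                continue
            with open(path) as f:
                n_bars = len(json.load(f).get("data", []))
            if n_bars != expected:
                issues[key] = f"{n_bars} bars, expected {expected}"
    return issues
```

An empty result for every trading day in the trial week is a reasonable bar to clear before handing the pipeline to a scheduler.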

If you want to test this with real data:

  1. Sign up at tickdb.ai — free tier available, no credit card required
  2. Generate an API key in the dashboard
  3. Set TICKDB_API_KEY as an environment variable, then run the archival script

If you need 10+ years of historical OHLCV data for multi-year covariance estimation, reach out to enterprise@tickdb.ai for institutional data plans.

If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace for direct API integration in your own workflows.


This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.