"We never thought the regulator would ask for order-level timestamps from three years ago."
That single sentence cost a systematic fund $2.3 million in a 2022 SEC settlement — not for the original violation, but for failing to produce records within the required window. The strategy itself was sound. The infrastructure was not.
Quantitative trading firms operate in one of the most heavily scrutinized environments in finance. Every order, every fill, every parameter change and model update falls under the gaze of regulators, internal auditors, and counterparties who may someday challenge your execution quality. Building a profitable alpha model matters. Building one that survives a regulatory audit three years later matters more.
This article dissects what compliance auditors actually look for, what internal risk teams genuinely need, and how to architect a data retention infrastructure that serves both mandates without ballooning costs into the stratosphere. We will cover the regulatory landscape, the specific data schemas you must preserve, the retention windows that apply to different instrument classes, and — most importantly — how to automate the entire archive pipeline so that "compliance" becomes an engineering feature rather than a quarterly panic.
The Regulatory Landscape: Who Wants Your Data and Why
Compliance retention requirements emerge from two distinct layers: external regulatory mandates and internal governance policies. These layers often overlap, but they have different enforcement mechanisms, different consequence profiles, and different technical implications for your data pipeline.
External Regulatory Requirements
The regulatory environment varies significantly by jurisdiction and instrument type. For US equities quant strategies, four frameworks dominate the compliance retention conversation.
SEC Rule 17a-4 is the foundational requirement for US broker-dealers and investment advisers. It mandates preservation of business-related communications, trade records, and supporting documentation for a minimum of six years — with the first two years in an immediately accessible location. The rule is prescriptive about format: records must be "easily readable" and accessible within reasonable timeframes upon regulatory request. For quantitative trading firms, this covers order records, executions, cancelled orders, and all communications related to trading decisions.
Dodd-Frank Act provisions introduce additional requirements for firms engaged in derivatives and swaps. Position data, transaction-level records, and risk calculations must be retained for five years. The Commodity Futures Trading Commission (CFTC) enforces parallel requirements under Part 45 and Part 49 of its regulations, covering swap execution data and customer records.
MiFID II, while European in origin, affects any quant firm trading EU-listed instruments or serving EU institutional clients. Article 16 mandates record-keeping for five years, extended to seven years upon request. The regulation's order recording requirements are granular: every order must carry a unique identifier, timestamp (in nanoseconds where technologically feasible), and full audit trail of modifications and cancellations.
FINRA Rule 4511 extends similar requirements to member firms, covering order memoranda, trade blotters, and customer account records. Firms running algorithmic strategies must additionally document the algorithm's parameters, the person responsible for the strategy, and any changes made over time.
Internal Risk Control Requirements
Internal governance requirements often exceed regulatory minimums. Internal audit teams, risk committees, and prime brokers all generate their own data retention demands.
Prime brokers typically require maintenance of pre-trade and post-trade records for 5–7 years to support margin calculations, reconciliation disputes, and performance attribution. Risk management teams need to preserve position snapshots at intraday intervals (typically 5-minute granularity for real-time risk monitoring) to reconstruct portfolio state during incident investigations. Model risk management — a growing compliance domain following SR 11-7 guidance from the Federal Reserve — requires retention of model version history, feature engineering pipelines, training datasets, and all backtest results associated with production models.
The critical insight: regulatory requirements establish the floor, not the ceiling. Your data retention architecture must accommodate both.
The Six Categories of Data You Must Retain
A comprehensive compliance data retention framework addresses six distinct data categories. Each serves a different compliance purpose and carries different technical requirements for preservation.
Category 1: Order Management System Records
Order records form the backbone of any compliance archive. Every order submitted through your OMS — whether filled, cancelled, or rejected — must be preserved with full lineage.
The critical metadata fields for order records:
import time
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
@dataclass
class ComplianceOrderRecord:
"""
Schema-compliant order record for regulatory retention.
Fields align with FINRA OATS requirements and SEC Rule 17a-4.
"""
order_id: str # Unique order identifier
client_order_id: Optional[str] # Client-assigned reference
account_id: str # Account or fund identifier
strategy_id: str # Algo/strategy that generated the order
# Timestamps (UTC, nanosecond precision where available)
created_at: datetime
submitted_at: datetime
modified_at: datetime # Any modification event
filled_at: Optional[datetime] = None
cancelled_at: Optional[datetime] = None
rejected_at: Optional[datetime] = None
# Instrument identification
symbol: str # Normalized symbol (e.g., "AAPL.US")
exchange: str # Primary execution venue
instrument_type: str # equity / option / future / swap
# Order details
side: str # buy / sell
order_type: str # market / limit / stop / algo
quantity: float
filled_quantity: float = 0.0
price: Optional[float] = None # Limit price if applicable
stop_price: Optional[float] = None
# Execution metadata
execution_venue: Optional[str] = None # Specific venue if multi-venue routing
fill_price: Optional[float] = None
fill_id: Optional[str] = None # Exchange-assigned fill ID
# Algo parameters at submission time
algo_params: dict = field(default_factory=dict)
# Compliance identifiers
compliance_uuid: str = field(default_factory=lambda: generate_uuid())
firm_tag: str = "" # For multi-strategy compliance segregation
def to_archive_row(self) -> dict:
"""Convert to immutable archive format."""
return {
"compliance_uuid": self.compliance_uuid,
"order_id": self.order_id,
"account_id": self.account_id,
"strategy_id": self.strategy_id,
"symbol": self.symbol,
"instrument_type": self.instrument_type,
"side": self.side,
"order_type": self.order_type,
"quantity": self.quantity,
"filled_quantity": self.filled_quantity,
"price": self.price,
"stop_price": self.stop_price,
"exchange": self.exchange,
"execution_venue": self.execution_venue,
"fill_price": self.fill_price,
"fill_id": self.fill_id,
"algo_params_json": json_dumps(self.algo_params),
"created_at": self.created_at.isoformat(),
"submitted_at": self.submitted_at.isoformat(),
"modified_at": self.modified_at.isoformat(),
"filled_at": self.filled_at.isoformat() if self.filled_at else None,
"cancelled_at": self.cancelled_at.isoformat() if self.cancelled_at else None,
"rejected_at": self.rejected_at.isoformat() if self.rejected_at else None,
"archive_timestamp": datetime.utcnow().isoformat(),
}
Each OMS record must be immutable once written. Any correction or amendment must be appended as a new version, never an in-place update. This append-only log architecture mirrors the write-ahead log patterns used in financial databases and satisfies regulatory requirements for non-repudiation.
Category 2: Execution and Fill Data
Execution records capture what actually happened in the market. Unlike order records, which capture intent, execution records capture outcomes.
Required fields for execution records:
| Field | Regulatory relevance | Retention period |
|---|---|---|
| Fill ID (exchange-assigned) | Primary key for trade matching | 7 years |
| Order ID (firm-assigned) | Links execution to original order | 7 years |
| Timestamp (microsecond minimum) | Cross-venue reconstruction | 7 years |
| Price and quantity | P&L calculation, transaction tax | 7 years |
| Commission and fees | Cost basis for performance | 7 years |
| Venue and routing path | Best execution compliance | 5 years |
| Counterparty information | Cross-border reporting (Dodd-Frank) | 5 years |
The venue and routing path field deserves particular attention. Best execution analysis under RegNMS requires firms to demonstrate that order flow was directed to venues offering the best available terms. Without granular routing logs, this analysis collapses.
Category 3: Position and Portfolio Snapshots
Regulatory requirements and internal risk management both demand historical position reconstruction. The technical challenge: you need enough granularity to reconstruct portfolio state at any point in time, without storing petabytes of tick data.
A standard approach is a tiered snapshot strategy:
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List
class PositionSnapshotArchiver:
"""
Tiered position snapshot strategy balancing storage costs
against reconstruction granularity requirements.
"""
def __init__(self, archive_writer, snapshot_writer):
self.archive = archive_writer # S3 / HDFS / cold storage interface
self.snapshots = snapshot_writer # Queryable database (TimescaleDB / ClickHouse)
def capture_intraday_snapshots(self, portfolio_state: dict, timestamp: datetime):
"""
Capture position snapshots at configurable intervals.
Standard: every 5 minutes during market hours.
"""
snapshot = {
"timestamp": timestamp,
"portfolio_id": portfolio_state["portfolio_id"],
"positions": portfolio_state["positions"],
"total_market_value": portfolio_state["market_value"],
"total_exposure": portfolio_state["net_exposure"],
"cash_balance": portfolio_state["cash"],
"margin_used": portfolio_state["margin_used"],
}
self.snapshots.write("position_snapshots", snapshot)
def generate_daily_snapshots(self, date: datetime.date):
"""
End-of-day snapshots for long-term retention.
These go to cold storage (S3 with appropriate lifecycle policies).
"""
query = f"""
SELECT * FROM position_snapshots
WHERE timestamp >= '{date} 16:00:00'
AND timestamp < '{date + timedelta(days=1)} 09:30:00'
ORDER BY timestamp DESC LIMIT 1
"""
eod_snapshots = self.snapshots.execute(query)
for snapshot in eod_snapshots:
partition_key = f"year={date.year}/month={date.month:02d}/day={date.day:02d}"
object_key = f"snapshots/positions/{partition_key}/{snapshot['portfolio_id']}.parquet"
self.archive.write(object_key, snapshot.to_parquet())
def reconstruction_query(self, portfolio_id: str, target_time: datetime) -> dict:
"""
Reconstruct portfolio state at a specific historical point.
"""
# Find the nearest snapshot before target time
query = f"""
SELECT * FROM position_snapshots
WHERE portfolio_id = '{portfolio_id}'
AND timestamp <= '{target_time.isoformat()}'
ORDER BY timestamp DESC LIMIT 1
"""
return self.snapshots.execute(query)
Snapshot frequency guidelines:
| Retention tier | Frequency | Storage medium | Typical use |
|---|---|---|---|
| Intraday (0–90 days) | Every 5 minutes during trading hours | Hot storage (TimescaleDB / ClickHouse) | Real-time risk, incident reconstruction |
| Near-term (90 days – 2 years) | End-of-day + intraday on event days | Warm storage | Internal audit, quarterly reviews |
| Long-term (2–7 years) | End-of-day only | Cold storage (S3 / Glacier) | Regulatory inquiries, litigation |
Category 4: Strategy and Model Change Logs
Model risk management requirements under SR 11-7 and similar frameworks demand complete audit trails for any production model change. This goes beyond trading logs — it encompasses the model development lifecycle.
Every model deployment must be accompanied by:
- Version identifier: Semantic versioning scheme (e.g.,
v2.3.1-alpha) - Feature set hash: SHA-256 of the feature engineering pipeline configuration
- Training dataset reference: Pointer to the training dataset with its timestamp range
- Backtest results summary: Win rate, Sharpe, max drawdown, sample size
- Deployment timestamp and responsible party: Who deployed and why
- Pre-deployment approval record: Model risk sign-off documentation
import hashlib
import json
@dataclass
class ModelDeploymentRecord:
model_id: str
version: str
feature_set_hash: str # SHA-256 of feature config
training_start: datetime
training_end: datetime
training_dataset_uri: str
backtest_results: dict # Summary stats from backtesting
approved_by: str
approved_at: datetime
deployed_by: str
deployed_at: datetime
environment: str # production / staging
strategy_id: str
def to_compliance_archive(self) -> dict:
"""Format for regulatory retention."""
return {
"record_type": "model_deployment",
"model_id": self.model_id,
"version": self.version,
"feature_hash": self.feature_set_hash,
"training_window": f"{self.training_start.isoformat()}/{self.training_end.isoformat()}",
"dataset_uri": self.training_dataset_uri,
"backtest_summary": json.dumps(self.backtest_results),
"approved_by": self.approved_by,
"approved_at": self.approved_at.isoformat(),
"deployed_by": self.deployed_by,
"deployed_at": self.deployed_at.isoformat(),
"strategy_id": self.strategy_id,
"environment": self.environment,
"archive_timestamp": datetime.utcnow().isoformat(),
}
Category 5: Pre-Trade Risk Events
Internal risk systems generate their own compliance-relevant data. Risk limit breaches, automatic circuit breakers, and manual overrides of risk controls all require documentation.
@dataclass
class RiskEventRecord:
event_id: str
event_type: str # limit_breach / circuit_trigger / manual_override
timestamp: datetime
portfolio_id: str
strategy_id: str
# Event details
limit_type: str # max_position / max_loss / max_exposure
limit_value: float
actual_value: float
breach_magnitude: float # actual / limit - 1
# Response taken
action: str # order_rejected / position_closed / alert_sent
auto_triggered: bool
override_by: Optional[str] = None # If manual override
override_reason: Optional[str] = None
def to_archive(self) -> dict:
return {
"event_id": self.event_id,
"event_type": self.event_type,
"timestamp": self.timestamp.isoformat(),
"portfolio_id": self.portfolio_id,
"strategy_id": self.strategy_id,
"limit_type": self.limit_type,
"limit_value": self.limit_value,
"actual_value": self.actual_value,
"breach_magnitude_pct": self.breach_magnitude * 100,
"action": self.action,
"auto_triggered": self.auto_triggered,
"override_by": self.override_by,
"override_reason": self.override_reason,
}
Pre-trade risk events are often required by prime brokers under Basel III / Basel IV frameworks and by internal risk committees tracking systematic behavior patterns. Most firms retain these for 5–7 years.
Category 6: System and Infrastructure Logs
Regulatory inquiries frequently ask for system logs during disputed execution periods. Network latency anomalies, system failures, and configuration changes can all become evidence in a compliance investigation.
Infrastructure logs to preserve:
- Gateway logs: Order submission timestamps, network latency measurements, message acknowledgment times
- OMS configuration changes: Any modification to routing logic, venue selection, or execution parameters
- Risk system alerts: Warnings generated by pre-trade risk systems before a breach
- Authentication and access logs: Who accessed which systems and when
Retention recommendation: system logs for 90 days in hot storage, 2 years in cold storage. The older retention window is acceptable because system logs are high-volume and lower compliance value compared to trade records.
Data Retention Windows by Instrument Class
Retention requirements vary by instrument type. Here is the compliance retention reference for the most common quant trading scenarios:
| Instrument | Regulatory basis | Minimum retention | Recommended retention |
|---|---|---|---|
| US equities | SEC Rule 17a-4 | 6 years | 7 years |
| US options | FINRA Rule 4511 | 6 years | 7 years |
| US futures | CFTC Part 31 | 5 years | 7 years |
| Crypto (US-facing) | SEC/FINRA guidance | 5 years | 7 years |
| EU equities | MiFID II | 5 years | 7 years |
| Swaps / derivatives | Dodd-Frank | 5 years | 7 years |
| FX (spot) | No direct SEC/CFTC mandate | 5 years | 5 years (institutional best practice) |
The key principle: when in doubt, retain longer. The cost of over-retention is storage. The cost of under-retention is regulatory sanction and litigation exposure.
Automating Compliance Data Archiving
Manual archiving — exporting logs to spreadsheets, burning DVDs, or relying on backups — fails at scale and fails under pressure. A robust compliance archiving pipeline treats data retention as a first-class engineering concern.
The Archive Pipeline Architecture
import logging
import boto3
from datetime import datetime, timedelta
from kafka import KafkaConsumer, KafkaProducer
from typing import Optional
class ComplianceArchiver:
"""
Automated compliance data archiver.
Ingests from Kafka topics, applies retention policies,
writes to immutable cold storage with appropriate lifecycle policies.
"""
def __init__(
self,
kafka_bootstrap: str,
archive_s3_bucket: str,
kms_key_id: str,
retention_config: dict,
heartbeat_interval: int = 30,
):
self.logger = logging.getLogger("compliance_archiver")
# Kafka consumer for order/fill events
self.consumer = KafkaConsumer(
"oms.orders",
"oms.executions",
"risk.events",
bootstrap_servers=kafka_bootstrap,
group_id="compliance_archiver",
auto_offset_reset="earliest",
enable_auto_commit=False,
)
self.s3 = boto3.client("s3")
self.kms_key_id = kms_key_id
self.bucket = archive_s3_bucket
self.retention_config = retention_config
# Heartbeat for monitoring
self.heartbeat_interval = heartbeat_interval
self.last_heartbeat = datetime.utcnow()
# Reconnection state
self._max_retries = 5
self._base_delay = 1.0
def run(self):
"""
Main processing loop with heartbeat and graceful shutdown.
"""
self.logger.info("Compliance archiver starting")
try:
while True:
messages = self.consumer.poll(timeout_ms=1000)
if not messages:
self._emit_heartbeat()
continue
for topic_partition, records in messages.items():
self._process_records(topic_partition.topic, records)
self.consumer.commit()
except KeyboardInterrupt:
self.logger.info("Shutdown signal received")
self._flush_pending()
self.consumer.close()
def _process_records(self, topic: str, records):
"""
Process batch of records from a Kafka topic.
"""
for record in records:
try:
data = json.loads(record.value())
archive_path = self._build_archive_path(topic, record.timestamp, data)
# Encrypt before writing (SSE-KMS)
encrypted_data = self._encrypt_record(data)
# Write with metadata tags for future retrieval
self.s3.put_object(
Bucket=self.bucket,
Key=archive_path,
Body=encrypted_data,
ServerSideEncryption="aws:kms",
SSEKMSKeyId=self.kms_key_id,
Metadata={
"topic": topic,
"partition": str(record.partition),
"offset": str(record.offset),
"ingested_at": datetime.utcnow().isoformat(),
},
Tagging=f"topic={topic}&retention_class={self._retention_class(topic)}",
)
except Exception as e:
# Dead letter queue for failed records
self._send_to_dlq(topic, record, str(e))
self.logger.error(f"Failed to archive record from {topic}: {e}")
def _build_archive_path(self, topic: str, timestamp: int, data: dict) -> str:
"""
Build hierarchical archive path following compliance naming conventions.
Format: entity/record_type/YYYY/MM/DD/HH/partition/offset.parquet
"""
ts = datetime.fromtimestamp(timestamp / 1000)
if topic.startswith("oms.orders"):
entity = "orders"
elif topic.startswith("oms.executions"):
entity = "executions"
elif topic.startswith("risk.events"):
entity = "risk_events"
else:
entity = "misc"
record_type = data.get("record_type", "unknown")
return (
f"{entity}/{record_type}/"
f"year={ts.year}/month={ts.month:02d}/day={ts.day:02d}/"
f"hour={ts.hour:02d}/"
f"p{record.partition:04d}_o{record.offset:012d}.json"
)
def _retention_class(self, topic: str) -> str:
"""
Map topics to retention classes.
"""
retention_map = {
"oms.orders": "trade_records", # 7 years
"oms.executions": "trade_records", # 7 years
"risk.events": "risk_events", # 7 years
"system.logs": "infra_logs", # 2 years
}
return retention_map.get(topic, "default")
def _encrypt_record(self, data: dict) -> bytes:
"""
Encrypt record data using KMS before cold storage write.
"""
import json
plaintext = json.dumps(data).encode("utf-8")
# In production: use boto3 kms.encrypt() or envelope encryption
return plaintext
def _emit_heartbeat(self):
"""
Periodic heartbeat for monitoring system health.
"""
now = datetime.utcnow()
if (now - self.last_heartbeat).total_seconds() >= self.heartbeat_interval:
self.logger.info(f"Archiver heartbeat: {now.isoformat()}, "
f"consumer lag: {self.consumer.metrics()}")
self.last_heartbeat = now
def _send_to_dlq(self, topic: str, record, error: str):
"""
Send failed records to dead letter queue for manual review.
"""
self.logger.warning(f"DLQ: topic={topic}, offset={record.offset}, error={error}")
# Implementation: write to DLQ Kafka topic or S3 DLQ bucket
def _flush_pending(self):
"""
Flush any pending records before shutdown.
"""
self.logger.info("Flushing pending records before shutdown")
messages = self.consumer.poll(timeout_ms=5000)
for topic_partition, records in messages.items():
self._process_records(topic_partition.topic, records)
self.consumer.commit()
S3 Lifecycle Configuration
Beyond the application-level pipeline, S3 lifecycle policies enforce retention requirements automatically:
{
"Rules": [
{
"ID": "trade_records_retention",
"Status": "Enabled",
"Filter": {
"Tag": {
"Key": "retention_class",
"Value": "trade_records"
}
},
"Expiration": {
"Days": 2555
},
"Transitions": [
{
"Days": 90,
"StorageClass": "GLACIER"
}
]
},
{
"ID": "risk_events_retention",
"Status": "Enabled",
"Filter": {
"Tag": {
"Key": "retention_class",
"Value": "risk_events"
}
},
"Expiration": {
"Days": 2555
}
},
{
"ID": "infra_logs_retention",
"Status": "Enabled",
"Filter": {
"Tag": {
"Key": "retention_class",
"Value": "infra_logs"
}
},
"Expiration": {
"Days": 730
}
}
]
}
This lifecycle configuration ensures that data transitions to cheaper storage as it ages, while the expiration policy guarantees deletion after the retention window — critical for managing storage costs over a 7-year retention horizon.
Immutable Write Pattern for Compliance Records
A critical compliance requirement: once written, records must not be modifiable. Any alteration — even corrections — must be append-only with a complete audit trail.
from abc import ABC, abstractmethod
class ImmutableRecordStore(ABC):
"""
Abstract base for immutable compliance record storage.
Enforces append-only semantics with cryptographic verification.
"""
@abstractmethod
def append(self, record_type: str, data: dict) -> str:
"""Append a record and return its immutable identifier."""
pass
@abstractmethod
def read(self, record_id: str) -> Optional[dict]:
"""Read a specific record by identifier."""
pass
@abstractmethod
def verify_chain(self, start_id: str, end_id: str) -> bool:
"""Verify integrity of a record chain."""
pass
class AppendOnlyS3Store(ImmutableRecordStore):
"""
S3-based immutable record store.
Each write is a new object; no object is ever overwritten.
"""
def __init__(self, bucket: str, kms_key_id: str):
self.bucket = bucket
self.kms = boto3.client("kms")
self.dynamodb = boto3.resource("dynamodb")
self.chain_table = self.dynamodb.Table("compliance_record_chain")
def append(self, record_type: str, data: dict) -> str:
# Generate immutable ID with hash of content for integrity
content_hash = hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()
record_id = f"{record_type}_{datetime.utcnow().strftime('%Y%m%d%H%M%S%f')}_{content_hash[:8]}"
# Write to S3 (never overwrite existing key)
key = f"compliance/{record_type}/{record_id}.json"
# Check for key existence (failsafe)
existing = self.s3.list_objects_v2(Bucket=self.bucket, Prefix=key, MaxKeys=1)
if existing.get("Contents"):
raise ImmutableStoreViolation(f"Key already exists: {key}")
self.s3.put_object(Bucket=self.bucket, Key=key, Body=json.dumps(data))
# Update chain index
self.chain_table.put_item(Item={
"record_id": record_id,
"record_type": record_type,
"content_hash": content_hash,
"written_at": datetime.utcnow().isoformat(),
"s3_key": key,
})
return record_id
def read(self, record_id: str) -> Optional[dict]:
item = self.chain_table.get_item(Key={"record_id": record_id})
if not item:
return None
response = self.s3.get_object(Bucket=self.bucket, Key=item["s3_key"])
return json.loads(response["Body"].read())
def verify_chain(self, start_id: str, end_id: str) -> bool:
"""Verify integrity of a sequence of records."""
# Implementation: fetch chain items and verify content hashes
pass
class ImmutableStoreViolation(Exception):
"""Raised when an attempt to overwrite immutable storage is detected."""
pass
The append-only pattern is not optional — it is the technical foundation of non-repudiation. A regulator reviewing your records needs confidence that what was written three years ago is exactly what exists today, with no post-hoc modifications.
Cross-Venue Reconciliation and Data Alignment
Compliance audits frequently require cross-venue reconstruction — for example, reconstructing a multi-venue execution to verify best execution compliance under RegNMS.
This requires careful timestamp alignment across systems. Clock synchronization errors as small as 200ms can generate phantom arbitrage signals in cross-venue analysis, and regulators are well aware of this. Document your time synchronization methodology.
from datetime import datetime
from typing import List, Dict
class CrossVenueReconstructor:
"""
Reconstructs a unified execution timeline from multi-venue order data.
Critical for RegNMS best execution compliance verification.
"""
def __init__(self, time_reference: str = "NIST UTC"):
self.time_reference = time_reference
self.clock_skew_tolerance_ms = 50 # Maximum acceptable clock skew
def reconstruct_timeline(
self,
order_records: List[Dict],
execution_records: List[Dict],
system_logs: List[Dict],
) -> Dict:
"""
Build a unified timeline from all data sources.
Flags clock skew anomalies for compliance review.
"""
# Normalize all timestamps to UTC microseconds
all_events = []
for record in order_records:
all_events.append({
"source": "oms",
"event_type": record["order_type"],
"timestamp": self._normalize_timestamp(record["submitted_at"]),
"order_id": record["order_id"],
})
for record in execution_records:
all_events.append({
"source": "execution",
"event_type": "fill",
"timestamp": self._normalize_timestamp(record["fill_timestamp"]),
"fill_id": record["fill_id"],
"venue": record["venue"],
"price": record["price"],
"quantity": record["quantity"],
})
for log in system_logs:
all_events.append({
"source": "system",
"event_type": log["event_type"],
"timestamp": self._normalize_timestamp(log["timestamp"]),
"details": log["details"],
})
# Sort by normalized timestamp
all_events.sort(key=lambda e: e["timestamp"])
# Detect clock skew anomalies
anomalies = self._detect_clock_skew(all_events)
return {
"timeline": all_events,
"clock_skew_anomalies": anomalies,
"reconstruction_timestamp": datetime.utcnow().isoformat(),
"time_reference": self.time_reference,
}
def _normalize_timestamp(self, ts) -> int:
"""
Convert various timestamp formats to UTC microseconds since epoch.
"""
if isinstance(ts, int):
# Already epoch microseconds
return ts
elif isinstance(ts, datetime):
return int(ts.timestamp() * 1_000_000)
elif isinstance(ts, str):
return int(datetime.fromisoformat(ts).timestamp() * 1_000_000)
def _detect_clock_skew(self, events: List[Dict]) -> List[Dict]:
"""
Detect abnormal time deltas between consecutive events from different sources.
"""
anomalies = []
for i in range(1, len(events)):
prev = events[i - 1]
curr = events[i]
delta_us = curr["timestamp"] - prev["timestamp"]
# Negative deltas or excessive deltas indicate clock issues
if delta_us < 0:
anomalies.append({
"severity": "critical",
"description": f"Negative delta between {prev['source']} and {curr['source']}",
"delta_us": delta_us,
})
elif delta_us > self.clock_skew_tolerance_ms * 1000 and prev["source"] != curr["source"]:
anomalies.append({
"severity": "warning",
"description": f"Large delta ({delta_us/1000:.1f}ms) between {prev['source']} and {curr['source']}",
"delta_us": delta_us,
})
return anomalies
Internal Audit Queries: What Your Own Team Will Ask
Beyond regulatory inquiries, your internal audit team will regularly query the compliance archive for routine reviews. Design your archive access patterns for these common queries:
| Query type | Typical frequency | Archive access pattern |
|---|---|---|
| Best execution review | Monthly | Time-range scan on executions table |
| Strategy P&L attribution | Quarterly | Join orders + executions + positions |
| Risk limit breach review | Weekly | Filter on risk_events table |
| Model change audit | Per deployment | Filter on model_deployments table |
| Cross-venue reconciliation | Monthly | Time-aligned multi-table join |
| Regulatory reporting | Per event | Pre-computed aggregations on cold storage |
For queries against cold storage (GLACIER), note that retrieval times can range from 3–12 hours for standard Glacier access. Build your compliance reporting pipeline to pre-fetch the relevant data windows before they are needed.
The Compliance Archive as Engineering Infrastructure
The firms that survive regulatory scrutiny longest are those that treat compliance data as a first-class engineering product — not a compliance department requirement imposed on an unwilling engineering team.
The framework we have outlined:
- Identifies all six categories of compliance-relevant data — from order records to model change logs to infrastructure events.
- Assigns appropriate retention windows aligned with regulatory minimums and institutional best practices.
- Automates the archive pipeline using Kafka ingestion, S3 cold storage, and lifecycle policies.
- Enforces immutability at the storage layer, preventing any post-hoc modification of compliance records.
- Provides native reconstruction capabilities for cross-venue reconciliation and clock skew analysis.
When a regulator asks for order-level data from a specific time window three years ago, the answer should be a query — not a project.
Next Steps
If you're a quant fund or systematic trading firm and have not yet implemented a compliance archive pipeline, treat this as a P0 infrastructure initiative. The regulatory and litigation risk of under-retention far exceeds the storage costs of over-retention.
If you're an institutional quant team running multi-venue strategies, cross-venue reconstruction capabilities deserve special attention. Best execution compliance is increasingly enforced, and your ability to demonstrate compliance depends on your data architecture.
If you need a data infrastructure foundation for compliance archiving, TickDB provides order book data, historical OHLCV, and execution data via a unified API — serving as the upstream data source for your compliance pipeline.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. Compliance requirements vary by jurisdiction and instrument class; consult qualified legal counsel for jurisdiction-specific guidance.