The most expensive mistake a quant researcher makes is not a bad signal — it is trusting a backtest built on the wrong architecture.
Vectorized backtests are fast. They are also dangerously deceptive for anything beyond simple moving-average crossovers. The moment your strategy involves position sizing with portfolio-level risk limits, order-state dependencies across bars, or fill simulation that respects order books, vectorized frameworks silently discard the interactions that determine real-world performance.
Event-driven architectures solve this. They simulate market reality one tick at a time, with full fidelity to the order lifecycle. The tradeoff is complexity — and without a disciplined design, that complexity becomes unmaintainable.
This article builds a production-grade event-driven backtesting engine from scratch. We will design the event loop, implement a robust Order state machine, and construct a realistic matching engine. By the end, you will have an extensible framework that handles complex multi-asset strategies without the silent shortcuts that plague naive implementations.
Why Event-Driven Outperforms Vectorized: A Technical Breakdown
Before writing code, we need to understand exactly why the architectural choice matters.
The Vectorized Trap
Vectorized backtesting treats each bar independently. You compute signals for every timestamp, apply them to positions, and calculate P&L. The math is clean. The simulation is wrong.
Consider a strategy that holds a position until the close, then rebalances at the next open. In vectorized form, you compute the close-of-day signal, then the next-day open signal — as if the world pauses between bars. In reality, the market moves continuously. Your order submission, queue position, fill price, and slippage all depend on events that happen inside the bar — events that vectorized frameworks simply never model.
Vectorized backtests systematically understate transaction costs and overstate capacity. The gap is small for low-frequency strategies with wide stop-losses. It is catastrophic for high-frequency event-driven strategies.
What Event-Driven Architecture Provides
An event-driven backtester processes market data sequentially:
MarketData Event → Strategy Signal → Order Event →
Matching Engine → Fill Event → Portfolio Update →
Risk Check → MarketData Event...
Every stage can introduce delay, rejection, or modification. The strategy does not simply "know" its position — it receives confirmation via a fill event. Risk limits can reject signals mid-simulation. Order state machine transitions are explicit and auditable.
This fidelity comes with costs: simulation speed drops by 10x–100x compared to vectorized approaches, and the code complexity increases substantially. For strategies where simulation accuracy determines whether the strategy is viable, this tradeoff is not optional.
Decision Matrix
| Characteristic | Vectorized | Event-Driven |
|---|---|---|
| Execution speed | Fast (10x–100x) | Slow |
| Order lifecycle fidelity | None | Full |
| Fill simulation | Fixed slippage | Queue-aware |
| Portfolio-level risk | Approximated | Exact |
| Multi-asset correlation | Assumed independent | Naturally modeled |
| Ideal strategy frequency | Daily+ | Intraday+ |
| Code complexity | Low | Medium–High |
If your strategy trades on daily bars with fixed position sizing, vectorized is acceptable. If you are building anything with intraday signals, conditional orders, or portfolio-level constraints, event-driven is mandatory.
Core Architecture: The Event System
Event Taxonomy
The backbone of the framework is a clean event type hierarchy. Each event carries a timestamp and immutable payload:
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from typing import Optional, Dict, Any
import uuid
class EventType(Enum):
MARKET_DATA = auto()
ORDER_SUBMIT = auto()
ORDER_ACCEPT = auto()
ORDER_REJECT = auto()
ORDER_PARTIAL_FILL = auto()
ORDER_FILLED = auto()
ORDER_CANCEL = auto()
ORDER_MODIFY = auto()
PORTFOLIO_UPDATE = auto()
RISK_CHECK = auto()
SIGNAL = auto()
@dataclass
class BaseEvent:
"""Every event carries a timestamp and unique identifier."""
timestamp: datetime
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
@property
def event_type(self) -> EventType:
raise NotImplementedError
@dataclass
class MarketDataEvent(BaseEvent):
"""Market data events carry price and volume information."""
symbol: str
open_price: float
high_price: float
low_price: float
close_price: float
volume: float
bid_price: Optional[float] = None
ask_price: Optional[float] = None
bid_size: Optional[float] = None
ask_size: Optional[float] = None
@property
def event_type(self) -> EventType:
return EventType.MARKET_DATA
@property
def mid_price(self) -> float:
if self.bid_price is not None and self.ask_price is not None:
return (self.bid_price + self.ask_price) / 2.0
return self.close_price
@dataclass
class OrderEvent(BaseEvent):
"""Order events carry full order specification."""
symbol: str
side: str # 'BUY' or 'SELL'
quantity: float
order_type: str # 'MARKET', 'LIMIT', 'STOP', 'STOP_LIMIT'
price: Optional[float] = None
stop_price: Optional[float] = None
tif: str = 'DAY' # 'DAY', 'IOC', 'FOK', 'GTC'
@property
def event_type(self) -> EventType:
return EventType.ORDER_SUBMIT
@dataclass
class FillEvent(BaseEvent):
"""Fill events record actual execution details."""
order_id: str
symbol: str
side: str
filled_quantity: float
fill_price: float
commission: float
slippage_bps: float
@property
def event_type(self) -> EventType:
return EventType.ORDER_FILLED
This event taxonomy is deliberately explicit. Every state transition is an event. There are no silent side effects.
Event Queue Design
The event queue is the central nervous system. It must guarantee ordering, support priority, and enable look-ahead prevention.
Critical design decision: The queue must prevent look-ahead bias by enforcing that only events with timestamps greater than the current simulation time are visible to downstream components. This is the single most common source of backtesting overfitting.
import heapq
from collections import deque
from typing import Callable, List, Optional, TypeVar, Generic
T = TypeVar('T', bound=BaseEvent)
class EventQueue:
"""
Thread-safe priority queue with look-ahead protection.
Events are ordered by timestamp. Components consuming from
this queue can only see events at or after the current
simulation time — preventing look-ahead bias.
"""
def __init__(self):
self._heap: List[BaseEvent] = []
self._processed_ids: set = set()
self._current_time: Optional[datetime] = None
def publish(self, event: BaseEvent) -> None:
"""
Add an event to the queue. Pre-publish validation
prevents duplicate event IDs.
"""
if event.event_id in self._processed_ids:
raise ValueError(
f"Event {event.event_id} has already been processed"
)
heapq.heappush(self._heap, event)
def advance_time(self, new_time: datetime) -> None:
"""
Advance simulation time. After calling this, events
with timestamps > new_time are visible; events with
timestamps <= new_time have been consumed.
⚠️ This is the core look-ahead protection mechanism.
"""
self._current_time = new_time
def get_next(self) -> Optional[BaseEvent]:
"""
Pop the next eligible event. Returns None if no events
exist for the current simulation time.
"""
while self._heap:
event = heapq.heappop(self._heap)
if event.timestamp > self._current_time:
# Event is in the future — push it back
# (we've peeked too far)
heapq.heappush(self._heap, event)
return None
if event.event_id in self._processed_ids:
continue
self._processed_ids.add(event.event_id)
return event
return None
def peek(self) -> Optional[BaseEvent]:
"""Peek at the next event without consuming it."""
if not self._heap:
return None
return min(self._heap, key=lambda e: e.timestamp)
def drain_until(self, end_time: datetime) -> List[BaseEvent]:
"""
Drain all events up to and including end_time.
Used for bar-based strategies that process complete bars.
"""
events = []
while True:
next_event = self.peek()
if next_event is None or next_event.timestamp > end_time:
break
event = self.get_next()
if event:
events.append(event)
return events
def __len__(self) -> int:
return len(self._heap)
Event Bus and Handler Registration
The event bus decouples producers from consumers. Strategies, risk managers, and portfolio handlers register interest in specific event types:
EventHandler = Callable[[BaseEvent], None]
class EventBus:
"""
Publish-subscribe event bus for component decoupling.
Each handler receives events in order. Handlers are
executed synchronously — if you need async processing,
implement a separate thread pool.
"""
def __init__(self):
self._handlers: Dict[EventType, List[EventHandler]] = {
event_type: [] for event_type in EventType
}
self._global_handlers: List[EventHandler] = []
def subscribe(
self,
event_type: EventType,
handler: EventHandler
) -> None:
"""Register a handler for a specific event type."""
if handler not in self._handlers[event_type]:
self._handlers[event_type].append(handler)
def subscribe_all(self, handler: EventHandler) -> None:
"""Register a handler for all event types."""
if handler not in self._global_handlers:
self._global_handlers.append(handler)
def unsubscribe(
self,
event_type: EventType,
handler: EventHandler
) -> None:
"""Remove a handler registration."""
if handler in self._handlers[event_type]:
self._handlers[event_type].remove(handler)
def publish_event(self, event: BaseEvent) -> None:
"""
Dispatch event to all registered handlers.
⚠️ Handler execution order matters. Register handlers
in dependency order: MarketData → Strategy → Order
→ Matching → Risk → Portfolio.
"""
# Type-specific handlers
for handler in self._handlers[event.event_type]:
handler(event)
# Global handlers (e.g., logging, persistence)
for handler in self._global_handlers:
handler(event)
Order State Machine: Explicit Lifecycle Management
The order state machine is where many backtesting frameworks cut corners — and where silent bugs proliferate. A correctly implemented state machine makes every order transition explicit and prevents invalid states.
State Definition
from enum import Enum, auto
class OrderState(Enum):
"""
Order lifecycle states. Transitions are strictly enforced.
State diagram:
PENDING → ACCEPTED → (PARTIAL_FILLED) → FILLED
↓
REJECTED
Any state (except FILLED, REJECTED) → CANCELLED
ACCEPTED → MODIFIED → ACCEPTED
"""
PENDING = auto() # Submitted, not yet processed
ACCEPTED = auto() # Exchange acknowledged
PARTIAL_FILLED = auto() # Some quantity filled
FILLED = auto() # Complete execution
REJECTED = auto() # Exchange rejected
CANCELLED = auto() # User-requested cancellation
EXPIRED = auto() # Time-in-force expired
class OrderSide(Enum):
BUY = auto()
SELL = auto()
class OrderType(Enum):
MARKET = auto()
LIMIT = auto()
STOP = auto()
STOP_LIMIT = auto()
State Machine Implementation
@dataclass
class Order:
"""
Order entity with explicit state machine.
State transitions are validated before application.
Invalid transitions raise StateTransitionError.
"""
order_id: str
symbol: str
side: OrderSide
order_type: OrderType
quantity: float
filled_quantity: float = 0.0
price: Optional[float] = None
stop_price: Optional[float] = None
tif: str = 'DAY'
# Timestamps
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
# State machine
state: OrderState = OrderState.PENDING
# Metadata
parent_order_id: Optional[str] = None # For linked orders
tags: Dict[str, Any] = field(default_factory=dict)
def _validate_transition(self, new_state: OrderState) -> None:
"""Enforce valid state transitions."""
valid_transitions = {
OrderState.PENDING: {
OrderState.ACCEPTED,
OrderState.REJECTED,
OrderState.CANCELLED,
},
OrderState.ACCEPTED: {
OrderState.PARTIAL_FILLED,
OrderState.FILLED,
OrderState.REJECTED,
OrderState.CANCELLED,
OrderState.EXPIRED,
},
OrderState.PARTIAL_FILLED: {
OrderState.PARTIAL_FILLED, # More fills
OrderState.FILLED,
OrderState.CANCELLED,
},
OrderState.FILLED: set(), # Terminal state
OrderState.REJECTED: set(), # Terminal state
OrderState.CANCELLED: set(), # Terminal state
OrderState.EXPIRED: set(), # Terminal state
}
if new_state not in valid_transitions.get(self.state, set()):
raise StateTransitionError(
f"Invalid transition: {self.state.name} → {new_state.name} "
f"for order {self.order_id}"
)
def transition_to(self, new_state: OrderState) -> None:
"""
Apply a state transition with validation.
⚠️ Always use this method instead of direct state assignment.
"""
self._validate_transition(new_state)
old_state = self.state
self.state = new_state
self.updated_at = datetime.now()
@property
def remaining_quantity(self) -> float:
return self.quantity - self.filled_quantity
@property
def is_active(self) -> bool:
"""Order can still receive fills."""
return self.state in (
OrderState.PENDING,
OrderState.ACCEPTED,
OrderState.PARTIAL_FILLED,
)
@property
def is_buy(self) -> bool:
return self.side == OrderSide.BUY
def __repr__(self) -> str:
return (
f"Order({self.order_id}, {self.symbol}, "
f"{self.side.name}, qty={self.quantity}, "
f"filled={self.filled_quantity}, state={self.state.name})"
)
class StateTransitionError(Exception):
"""Raised when an invalid state transition is attempted."""
pass
Order Repository
The order repository tracks all active and historical orders. It supports querying by symbol, state, and time range:
class OrderRepository:
"""
Manages order lifecycle and persistence.
Provides efficient lookup by order_id, symbol, and state.
"""
def __init__(self):
self._orders: Dict[str, Order] = {}
self._by_symbol: Dict[str, List[str]] = {}
self._by_state: Dict[OrderState, List[str]] = {
state: [] for state in OrderState
}
def add(self, order: Order) -> None:
"""Register a new order."""
self._orders[order.order_id] = order
self._by_symbol.setdefault(order.symbol, []).append(order.order_id)
self._by_state[order.state].append(order.order_id)
def update_state(self, order_id: str, new_state: OrderState) -> None:
"""Update order state with index maintenance."""
order = self._orders[order_id]
old_state = order.state
# Update order
order.transition_to(new_state)
# Update state index
self._by_state[old_state].remove(order_id)
self._by_state[new_state].append(order_id)
def get(self, order_id: str) -> Optional[Order]:
return self._orders.get(order_id)
def get_by_symbol(self, symbol: str) -> List[Order]:
return [
self._orders[oid]
for oid in self._by_symbol.get(symbol, [])
]
def get_active_by_symbol(self, symbol: str) -> List[Order]:
"""Get all active (non-terminal) orders for a symbol."""
return [
order for order in self.get_by_symbol(symbol)
if order.is_active
]
def get_by_state(self, state: OrderState) -> List[Order]:
return [
self._orders[oid]
for oid in self._by_state.get(state, [])
]
@property
def active_orders(self) -> List[Order]:
"""All orders in non-terminal states."""
return self.get_by_state(OrderState.PENDING) + \
self.get_by_state(OrderState.ACCEPTED) + \
self.get_by_state(OrderState.PARTIAL_FILLED)
Matching Engine: Realistic Fill Simulation
The matching engine determines fill prices. Naive implementations use close_price ± fixed_slippage. Production-grade engines simulate queue dynamics, market impact, and adverse selection.
Fill Model Architecture
@dataclass
class FillModel:
"""
Configurable fill model supporting multiple simulation approaches.
Available models:
- FIXED_SLIPPAGE: Constant slippage regardless of order size
- VOLATILITY_ADJUSTED: Slippage scales with current volatility
- QUEUE_AWARE: Simulates queue position and time-to-fill
"""
model_type: str = 'VOLATILITY_ADJUSTED'
base_slippage_bps: float = 2.0
market_impact_coefficient: float = 0.1
def calculate_fill(
self,
order: Order,
market_data: MarketDataEvent,
current_volatility: Optional[float] = None,
) -> tuple[float, float, float]:
"""
Calculate fill price, slippage, and commission.
Returns: (fill_price, slippage_bps, commission)
"""
if self.model_type == 'FIXED_SLIPPAGE':
return self._fixed_slippage_fill(order, market_data)
elif self.model_type == 'VOLATILITY_ADJUSTED':
return self._volatility_adjusted_fill(
order, market_data, current_volatility
)
elif self.model_type == 'QUEUE_AWARE':
return self._queue_aware_fill(order, market_data)
else:
raise ValueError(f"Unknown fill model: {self.model_type}")
def _fixed_slippage_fill(
self,
order: Order,
market_data: MarketDataEvent
) -> tuple[float, float, float]:
"""Apply fixed slippage regardless of conditions."""
reference_price = market_data.mid_price
if order.is_buy:
fill_price = reference_price * (1 + self.base_slippage_bps / 10000)
else:
fill_price = reference_price * (1 - self.base_slippage_bps / 10000)
commission = self._calculate_commission(order, fill_price)
return fill_price, self.base_slippage_bps, commission
def _volatility_adjusted_fill(
self,
order: Order,
market_data: MarketDataEvent,
volatility: Optional[float]
) -> tuple[float, float, float]:
"""
Slippage scales with realized volatility.
Higher volatility → wider spreads → more slippage.
"""
reference_price = market_data.mid_price
if volatility is not None:
vol_scalar = max(1.0, volatility / 0.02) # Normalize to 2% vol
else:
vol_scalar = 1.0
slippage = self.base_slippage_bps * vol_scalar
if order.is_buy:
fill_price = reference_price * (1 + slippage / 10000)
else:
fill_price = reference_price * (1 - slippage / 10000)
commission = self._calculate_commission(order, fill_price)
return fill_price, slippage, commission
def _queue_aware_fill(
self,
order: Order,
market_data: MarketDataEvent
) -> tuple[float, float, float]:
"""
Queue-aware fill model.
⚠️ Requires bid_size/ask_size from market_data.
Simulates time-to-fill based on queue depth.
"""
if market_data.bid_size is None or market_data.ask_size is None:
# Fall back to volatility model
return self._volatility_adjusted_fill(order, market_data, None)
reference_price = market_data.mid_price
if order.is_buy:
queue_ahead = market_data.bid_size
fill_price = market_data.ask_price
else:
queue_ahead = market_data.ask_size
fill_price = market_data.bid_price
# Market impact: larger orders face more impact
queue_fraction = min(order.quantity / max(queue_ahead, 1), 1.0)
market_impact = (
self.market_impact_coefficient *
queue_fraction *
(fill_price - reference_price) / reference_price
)
fill_price *= (1 + market_impact)
slippage = abs(fill_price - reference_price) / reference_price * 10000
commission = self._calculate_commission(order, fill_price)
return fill_price, slippage, commission
def _calculate_commission(
self,
order: Order,
fill_price: float
) -> float:
"""
Calculate commission. Default: per-share/contract model.
Override this for custom fee schedules.
"""
# Example: $0.005 per share, minimum $1
shares = order.quantity
commission = max(shares * 0.005, 1.0)
return commission
Matching Engine Core
The matching engine processes order events and generates fill events:
class MatchingEngine:
"""
Core matching engine with fill model integration.
Processing flow:
1. Receive ORDER_SUBMIT event
2. Validate order
3. Apply fill model
4. Generate FILL event
5. Update order state
"""
def __init__(
self,
order_repository: OrderRepository,
event_bus: EventBus,
fill_model: FillModel,
):
self.order_repository = order_repository
self.event_bus = event_bus
self.fill_model = fill_model
self.current_volatility: Optional[float] = None
# Register handlers
self.event_bus.subscribe(EventType.ORDER_SUBMIT, self.on_order)
def set_volatility(self, vol: float) -> None:
"""Update current volatility estimate for fill modeling."""
self.current_volatility = vol
def on_order(self, event: OrderEvent) -> None:
"""Process incoming order submission."""
order = Order(
order_id=event.event_id,
symbol=event.symbol,
side=OrderSide.BUY if event.side == 'BUY' else OrderSide.SELL,
order_type=self._map_order_type(event.order_type),
quantity=event.quantity,
price=event.price,
stop_price=event.stop_price,
tif=event.tif,
)
# Add to repository
self.order_repository.add(order)
# Transition to ACCEPTED (simplified; real impl validates)
self.order_repository.update_state(
order.order_id, OrderState.ACCEPTED
)
# Publish accept event
self.event_bus.publish_event(OrderAcceptEvent(
timestamp=event.timestamp,
order_id=order.order_id,
symbol=order.symbol,
status='ACCEPTED',
))
# Process fill immediately for MARKET orders
if order.order_type == OrderType.MARKET:
self._fill_order(order, event.timestamp)
def _fill_order(
self,
order: Order,
timestamp: datetime
) -> None:
"""Execute fill for an order."""
# Get current market data (from event bus state)
market_data = self._get_latest_market_data(order.symbol)
if market_data is None:
self._reject_order(order, timestamp, "No market data available")
return
# Calculate fill
fill_price, slippage, commission = self.fill_model.calculate_fill(
order, market_data, self.current_volatility
)
# Validate fill price (basic sanity check)
if fill_price <= 0:
self._reject_order(order, timestamp, "Invalid fill price")
return
# Update order
order.filled_quantity = order.quantity
self.order_repository.update_state(
order.order_id, OrderState.FILLED
)
# Generate fill event
fill_event = FillEvent(
timestamp=timestamp,
order_id=order.order_id,
symbol=order.symbol,
side=order.side.name,
filled_quantity=order.quantity,
fill_price=fill_price,
commission=commission,
slippage_bps=slippage,
)
self.event_bus.publish_event(fill_event)
def _reject_order(
self,
order: Order,
timestamp: datetime,
reason: str
) -> None:
"""Reject an order with a reason."""
self.order_repository.update_state(
order.order_id, OrderState.REJECTED
)
self.event_bus.publish_event(OrderRejectEvent(
timestamp=timestamp,
order_id=order.order_id,
symbol=order.symbol,
reason=reason,
))
def _get_latest_market_data(
self,
symbol: str
) -> Optional[MarketDataEvent]:
"""Retrieve latest market data for a symbol."""
# Implementation depends on data storage strategy
# This is a simplified placeholder
return None
@staticmethod
def _map_order_type(order_type: str) -> OrderType:
"""Map string order type to OrderType enum."""
mapping = {
'MARKET': OrderType.MARKET,
'LIMIT': OrderType.LIMIT,
'STOP': OrderType.STOP,
'STOP_LIMIT': OrderType.STOP_LIMIT,
}
return mapping.get(order_type.upper(), OrderType.MARKET)
@dataclass
class OrderAcceptEvent(BaseEvent):
"""Event published when order is accepted."""
order_id: str
symbol: str
status: str
@property
def event_type(self) -> EventType:
return EventType.ORDER_ACCEPT
@dataclass
class OrderRejectEvent(BaseEvent):
"""Event published when order is rejected."""
order_id: str
symbol: str
reason: str
@property
def event_type(self) -> EventType:
return EventType.ORDER_REJECT
Portfolio and Risk Management Integration
Portfolio Tracker
The portfolio tracker receives fill events and maintains positions and cash:
@dataclass
class Position:
"""Represents a current market position."""
symbol: str
quantity: float
average_entry_price: float
unrealized_pnl: float = 0.0
realized_pnl: float = 0.0
@property
def market_value(self) -> float:
return self.quantity * self.average_entry_price
@property
def is_long(self) -> bool:
return self.quantity > 0
@property
def is_short(self) -> bool:
return self.quantity < 0
class Portfolio:
"""
Portfolio tracker with position management.
Receives FILL events and updates positions accordingly.
Tracks cash, margin, and P&L in real-time.
"""
def __init__(self, initial_cash: float = 100_000.0):
self.initial_cash = initial_cash
self.cash = initial_cash
self.positions: Dict[str, Position] = {}
self.orders: OrderRepository = OrderRepository()
# P&L tracking
self.realized_pnl = 0.0
self.trades: List[FillEvent] = []
# Event tracking for metrics
self.equity_curve: List[tuple[datetime, float]] = []
def on_fill(self, event: FillEvent) -> None:
"""Process a fill event and update portfolio."""
self.trades.append(event)
# Update cash
self.cash -= event.filled_quantity * event.fill_price
self.cash -= event.commission
# Update or create position
if event.symbol not in self.positions:
self.positions[event.symbol] = Position(
symbol=event.symbol,
quantity=0.0,
average_entry_price=0.0,
)
position = self.positions[event.symbol]
self._update_position(position, event)
# Update realized P&L
self.realized_pnl = sum(
p.realized_pnl for p in self.positions.values()
)
def _update_position(
self,
position: Position,
fill: FillEvent
) -> None:
"""Update position with new fill."""
if position.quantity == 0:
# Opening position
position.quantity = fill.filled_quantity if fill.side == 'BUY' else -fill.filled_quantity
position.average_entry_price = fill.fill_price
elif (position.quantity > 0) == (fill.side == 'BUY'):
# Adding to position
total_cost = (
position.quantity * position.average_entry_price +
fill.filled_quantity * fill.fill_price
)
position.quantity += fill.filled_quantity if fill.side == 'BUY' else -fill.filled_quantity
position.average_entry_price = total_cost / abs(position.quantity)
else:
# Reducing or closing position
closing_qty = min(abs(position.quantity), fill.filled_quantity)
# Realized P&L on the closed portion
pnl = closing_qty * (
position.average_entry_price - fill.fill_price
) * (-1 if position.is_short else 1)
position.realized_pnl += pnl
position.quantity += fill.filled_quantity if fill.side == 'BUY' else -fill.filled_quantity
if abs(position.quantity) < 1e-8:
position.quantity = 0.0
def update_market_values(self, prices: Dict[str, float]) -> None:
"""Update unrealized P&L based on current prices."""
for symbol, position in self.positions.items():
if symbol in prices and position.quantity != 0:
current_price = prices[symbol]
position.unrealized_pnl = position.quantity * (
current_price - position.average_entry_price
)
# Record equity for curve
total_equity = self.cash + sum(
p.market_value for p in self.positions.values()
)
self.equity_curve.append((datetime.now(), total_equity))
@property
def total_equity(self) -> float:
return self.cash + sum(
p.market_value for p in self.positions.values()
)
@property
def total_pnl(self) -> float:
return self.realized_pnl + sum(
p.unrealized_pnl for p in self.positions.values()
)
@property
def leverage(self) -> float:
"""Current portfolio leverage."""
long_value = sum(
p.market_value for p in self.positions.values() if p.is_long
)
short_value = sum(
abs(p.market_value) for p in self.positions.values() if p.is_short
)
total_exposure = long_value + short_value
return total_exposure / max(self.total_equity, 1e-8)
Risk Manager
The risk manager checks orders against portfolio constraints before execution:
@dataclass
class RiskLimits:
"""Risk limit configuration."""
max_position_size: float = 10_000 # Max shares per position
max_portfolio_leverage: float = 2.0
max_sector_exposure: float = 0.30 # 30% of portfolio
max_order_value: float = 50_000.0
max_drawdown_pct: float = 0.20
class RiskManager:
"""
Pre-trade and post-trade risk management.
Pre-trade: Validates order against risk limits before submission.
Post-trade: Monitors portfolio-level risk and triggers liquidation.
"""
def __init__(
self,
portfolio: Portfolio,
limits: RiskLimits,
):
self.portfolio = portfolio
self.limits = limits
self.high_water_mark = portfolio.initial_cash
self.is_breaching = False
def pre_trade_check(self, order: Order, current_price: float) -> tuple[bool, str]:
"""
Validate order against risk limits before submission.
Returns: (approved, rejection_reason)
"""
order_value = order.quantity * current_price
# Check order size
if order.quantity > self.limits.max_position_size:
return False, f"Order size {order.quantity} exceeds max {self.limits.max_position_size}"
# Check order value
if order_value > self.limits.max_order_value:
return False, f"Order value ${order_value:,.0f} exceeds max ${self.limits.max_order_value:,.0f}"
# Check leverage
current_leverage = self.portfolio.leverage
if current_leverage > self.limits.max_portfolio_leverage:
return False, f"Portfolio leverage {current_leverage:.2f}x exceeds max {self.limits.max_portfolio_leverage}x"
# Check drawdown
if self.is_breaching:
return False, "Account is in drawdown breach — new orders blocked"
return True, ""
def post_trade_update(self) -> None:
"""
Update risk state after trade execution.
Checks drawdown against high water mark.
"""
current_equity = self.portfolio.total_equity
self.high_water_mark = max(self.high_water_mark, current_equity)
drawdown = (self.high_water_mark - current_equity) / self.high_water_mark
if drawdown > self.limits.max_drawdown_pct:
self.is_breaching = True
# Trigger risk event for potential liquidation
Putting It Together: The Backtest Runner
Simulation Loop
class BacktestRunner:
"""
Orchestrates the event-driven backtest simulation.
Processing order:
1. Advance time
2. Drain events for current time
3. Process through event bus
4. Update portfolio and risk state
5. Record metrics
"""
def __init__(
self,
event_queue: EventQueue,
event_bus: EventBus,
portfolio: Portfolio,
risk_manager: RiskManager,
matching_engine: MatchingEngine,
):
self.event_queue = event_queue
self.event_bus = event_bus
self.portfolio = portfolio
self.risk_manager = risk_manager
self.matching_engine = matching_engine
# Metrics collection
self.metrics: List[dict] = []
# Register handlers
self.event_bus.subscribe(EventType.FILL, self.portfolio.on_fill)
def run(
self,
start_date: datetime,
end_date: datetime,
strategy: Optional['BaseStrategy'] = None,
) -> dict:
"""
Execute the backtest simulation.
Args:
start_date: Simulation start time
end_date: Simulation end time
strategy: Optional strategy instance (will receive events)
Returns:
Backtest results including equity curve, trades, and metrics
"""
self.event_queue.advance_time(start_date)
current_time = start_date
while current_time <= end_date:
# Drain all events up to current time
events = self.event_queue.drain_until(current_time)
for event in events:
# Publish to event bus
self.event_bus.publish_event(event)
# Update portfolio prices if market data
if event.event_type == EventType.MARKET_DATA:
self.portfolio.update_market_values(
{event.symbol: event.close_price}
)
# Record metrics snapshot
self._record_metrics(current_time)
# Advance to next event time
next_event = self.event_queue.peek()
if next_event is None:
break
current_time = next_event.timestamp
self.event_queue.advance_time(current_time)
return self._generate_results()
def _record_metrics(self, timestamp: datetime) -> None:
"""Record current state for metrics."""
self.metrics.append({
'timestamp': timestamp,
'equity': self.portfolio.total_equity,
'cash': self.portfolio.cash,
'positions': len(self.portfolio.positions),
'leverage': self.portfolio.leverage,
'realized_pnl': self.portfolio.realized_pnl,
})
def _generate_results(self) -> dict:
"""Compile backtest results."""
equity_curve = [m['equity'] for m in self.metrics]
# Calculate performance metrics
total_return = (equity_curve[-1] - equity_curve[0]) / equity_curve[0]
# Max drawdown
peak = equity_curve[0]
max_dd = 0.0
for equity in equity_curve:
if equity > peak:
peak = equity
dd = (peak - equity) / peak
max_dd = max(max_dd, dd)
return {
'initial_cash': self.portfolio.initial_cash,
'final_equity': equity_curve[-1],
'total_return': total_return,
'total_trades': len(self.portfolio.trades),
'max_drawdown': max_dd,
'equity_curve': equity_curve,
'trades': self.portfolio.trades,
'metrics': self.metrics,
}
Performance Considerations
Common Pitfalls
| Pitfall | Symptom | Solution |
|---|---|---|
| Look-ahead bias | Unrealistic returns, negative drawdown | Validate event queue time boundaries; use only data available at simulation time |
| Survivorship bias | Overestimated returns | Include delisted/ bankrupt symbols in historical data |
| Position leak | P&L attribution errors | Close all positions at end of simulation; use proper settlement |
| Overfitting | Live performance >> backtest | Out-of-sample testing; parameter sensitivity analysis |
| Ignoring transaction costs | Strategy looks profitable but loses money | Include realistic commission, slippage, and market impact |
Optimization Strategies
For strategies requiring higher simulation throughput:
Bar aggregation: For lower-frequency strategies, aggregate tick data into 1-minute or 5-minute bars before simulation. Reduces event count by 100x–1000x.
Vectorized components: Hybrid architecture — use vectorized computation for signal generation where order independence is verified, event-driven for order lifecycle.
Parallelization: Partition simulation by symbol or time window. Merge results at end. Requires careful handling of cross-symbol correlations.
Caching: Cache historical data in memory during simulation. Disk I/O is the primary bottleneck for long backtests.
class CachedDataProvider:
"""High-performance data provider with in-memory caching."""
def __init__(self, cache_size: int = 10_000):
self._cache: LRUCache = LRUCache(maxsize=cache_size)
self._data_source = None # Your data source
def get_bars(
self,
symbol: str,
start: datetime,
end: datetime,
) -> pd.DataFrame:
"""Retrieve bars with LRU caching."""
cache_key = (symbol, start, end)
if cache_key in self._cache:
return self._cache[cache_key]
data = self._data_source.fetch(symbol, start, end)
self._cache[cache_key] = data
return data
Extensibility Points
This framework is designed for extension. Key extension points:
New asset classes: Implement a new AssetAdapter that maps asset-specific behaviors (settlement, margin requirements, contract specifications) to the standard event interface.
Alternative data: Create a new AlternativeDataProvider that publishes events in the same format. The event bus architecture ensures strategies can consume traditional and alternative data through the same interface.
Execution strategies: Implement TWAP, VWAP, or POV execution algorithms by creating a new ExecutionHandler that manages child orders rather than submitting parent orders directly.
Risk models: Extend RiskManager with factor-based risk models, VaR calculations, or custom sector constraints by overriding the check methods.
Closing Thoughts
An event-driven backtesting framework is not a luxury — it is the minimum infrastructure required to trust any strategy that operates on intraday data, involves complex order logic, or manages portfolio-level constraints.
The architecture presented here is intentionally explicit. Every state transition is auditable. Every event has a timestamp. Look-ahead bias is structurally prevented by the event queue's time boundary enforcement.
The tradeoff is complexity — but this complexity is manageable when organized around clear interfaces. The event bus decouples components. The order repository centralizes state. The matching engine abstracts fill simulation.
Build the foundation correctly once. The strategies you test on top of it will be worth more because of it.
Next Steps
If you are building your first event-driven strategy, start with the architecture in this article. Validate it against a simple moving-average crossover before adding complexity.
If you need institutional-grade historical data to power your backtests, reach out to enterprise@tickdb.ai for data coverage spanning multiple asset classes with 10+ years of cleaned, timestamped historical data.
If you are evaluating backtesting frameworks, compare the event taxonomy and state machine implementation against your requirements. A framework that cuts corners on state management will surface bugs only in production.
This article does not constitute investment advice. Backtesting results do not guarantee future performance. Markets involve risk; past performance does not guarantee future results.