The most expensive mistake a quant researcher makes is not a bad signal — it is trusting a backtest built on the wrong architecture.

Vectorized backtests are fast. They are also dangerously deceptive for anything beyond simple moving-average crossovers. The moment your strategy involves position sizing with portfolio-level risk limits, order-state dependencies across bars, or fill simulation that respects order books, vectorized frameworks silently discard the interactions that determine real-world performance.

Event-driven architectures solve this. They simulate market reality one tick at a time, with full fidelity to the order lifecycle. The tradeoff is complexity — and without a disciplined design, that complexity becomes unmaintainable.

This article builds a production-grade event-driven backtesting engine from scratch. We will design the event loop, implement a robust Order state machine, and construct a realistic matching engine. By the end, you will have an extensible framework that handles complex multi-asset strategies without the silent shortcuts that plague naive implementations.

Why Event-Driven Outperforms Vectorized: A Technical Breakdown

Before writing code, we need to understand exactly why the architectural choice matters.

The Vectorized Trap

Vectorized backtesting treats each bar independently. You compute signals for every timestamp, apply them to positions, and calculate P&L. The math is clean. The simulation is wrong.

Consider a strategy that holds a position until the close, then rebalances at the next open. In vectorized form, you compute the close-of-day signal, then the next-day open signal — as if the world pauses between bars. In reality, the market moves continuously. Your order submission, queue position, fill price, and slippage all depend on events that happen inside the bar — events that vectorized frameworks simply never model.

Vectorized backtests systematically understate transaction costs and overstate capacity. The gap is small for low-frequency strategies with wide stop-losses. It is catastrophic for high-frequency event-driven strategies.
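
To make the gap concrete, here is a minimal sketch that scores the same signals two ways. The function names, the toy bars, and the flat per-share cost are illustrative assumptions, not part of the framework built later in this article. The first function enters at the close that produced the signal at no cost; the second enters at the next open and pays a cost whenever the position changes.

```python
from typing import List, Tuple

Bar = Tuple[float, float]  # (open, close)


def naive_pnl(bars: List[Bar], signals: List[int]) -> float:
    """Enter the position implied by signals[i] at close[i], free of cost."""
    pnl = 0.0
    for i in range(len(bars) - 1):
        pnl += signals[i] * (bars[i + 1][1] - bars[i][1])
    return pnl


def next_open_pnl(
    bars: List[Bar], signals: List[int], cost_per_share: float
) -> float:
    """Act on signals[i] at open[i + 1], paying a cost per share traded."""
    pnl = 0.0
    position = 0
    for i in range(len(bars) - 1):
        next_open, next_close = bars[i + 1]
        # The existing position is carried from close[i] to the next open.
        pnl += position * (next_open - bars[i][1])
        target = signals[i]
        if target != position:
            pnl -= abs(target - position) * cost_per_share
            position = target
        # The (possibly new) position is held from the open to the close.
        pnl += position * (next_close - next_open)
    return pnl


# Toy data: the signal is known only at each bar's close.
bars = [(100.0, 100.0), (101.0, 102.0), (102.0, 101.0), (101.0, 103.0)]
signals = [1, 1, 0, 0]

naive = naive_pnl(bars, signals)
realistic = next_open_pnl(bars, signals, cost_per_share=0.05)
```

On this toy data the naive figure is positive while the next-open figure is slightly negative. Nothing about the signal changed; only the execution assumptions did.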

What Event-Driven Architecture Provides

An event-driven backtester processes market data sequentially:

MarketData Event → Strategy Signal → Order Event → 
    Matching Engine → Fill Event → Portfolio Update → 
    Risk Check → MarketData Event...

Every stage can introduce delay, rejection, or modification. The strategy does not simply "know" its position — it receives confirmation via a fill event. Risk limits can reject signals mid-simulation. Order state machine transitions are explicit and auditable.
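
The flow can be sketched in a few lines before any real design work. This toy loop is an illustration only: the `Event` class, the string event kinds, and the single-rule strategy are assumptions made for the sketch and are much simpler than the classes developed below. The point it shows is that the strategy's view of its position changes only when a fill event is processed.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Event:
    kind: str      # "MARKET", "SIGNAL", "ORDER" or "FILL"
    payload: dict


def run(bars):
    """Process bars one at a time, draining follow-on events before the next bar."""
    queue = deque()
    log = []
    position = 0
    for bar in bars:
        queue.append(Event("MARKET", bar))
        while queue:
            event = queue.popleft()
            log.append(event.kind)
            if event.kind == "MARKET":
                # Toy strategy: buy once when flat and the close exceeds 100.
                if position == 0 and event.payload["close"] > 100:
                    queue.append(Event("SIGNAL", {"qty": 10}))
            elif event.kind == "SIGNAL":
                queue.append(Event("ORDER", event.payload))
            elif event.kind == "ORDER":
                # A real engine would validate and price the order here.
                queue.append(Event("FILL", event.payload))
            elif event.kind == "FILL":
                # Position changes only on confirmation of the fill.
                position += event.payload["qty"]
    return position, log


position, log = run([{"close": 99.0}, {"close": 101.0}, {"close": 102.0}])
```

The third bar produces no new signal because the fill from the second bar has already updated the position by the time it arrives.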

This fidelity comes with costs: simulation speed drops by 10x–100x compared to vectorized approaches, and the code complexity increases substantially. For strategies where simulation accuracy determines whether the strategy is viable, this tradeoff is not optional.

Decision Matrix

| Characteristic | Vectorized | Event-Driven |
|---|---|---|
| Execution speed | Fast (10x–100x) | Slow |
| Order lifecycle fidelity | None | Full |
| Fill simulation | Fixed slippage | Queue-aware |
| Portfolio-level risk | Approximated | Exact |
| Multi-asset correlation | Assumed independent | Naturally modeled |
| Ideal strategy frequency | Daily+ | Intraday+ |
| Code complexity | Low | Medium–High |

If your strategy trades on daily bars with fixed position sizing, vectorized is acceptable. If you are building anything with intraday signals, conditional orders, or portfolio-level constraints, event-driven is mandatory.

Core Architecture: The Event System

Event Taxonomy

The backbone of the framework is a clean event type hierarchy. Each event carries a timestamp, a unique identifier, and a payload that handlers should treat as read-only:

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from typing import Optional, Dict, Any
import uuid


class EventType(Enum):
    MARKET_DATA = auto()
    ORDER_SUBMIT = auto()
    ORDER_ACCEPT = auto()
    ORDER_REJECT = auto()
    ORDER_PARTIAL_FILL = auto()
    ORDER_FILLED = auto()
    ORDER_CANCEL = auto()
    ORDER_MODIFY = auto()
    PORTFOLIO_UPDATE = auto()
    RISK_CHECK = auto()
    SIGNAL = auto()


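@dataclass(kw_only=True)
class BaseEvent:
    """
    Every event carries a timestamp and unique identifier.

    Base fields are keyword-only (Python 3.10+) so that subclasses can
    declare required fields after the defaulted event_id.
    """
    timestamp: datetime
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))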
    
    @property
    def event_type(self) -> EventType:
        raise NotImplementedError


@dataclass
class MarketDataEvent(BaseEvent):
    """Market data events carry price and volume information."""
    symbol: str
    open_price: float
    high_price: float
    low_price: float
    close_price: float
    volume: float
    bid_price: Optional[float] = None
    ask_price: Optional[float] = None
    bid_size: Optional[float] = None
    ask_size: Optional[float] = None
    
    @property
    def event_type(self) -> EventType:
        return EventType.MARKET_DATA
    
    @property
    def mid_price(self) -> float:
        if self.bid_price is not None and self.ask_price is not None:
            return (self.bid_price + self.ask_price) / 2.0
        return self.close_price


@dataclass
class OrderEvent(BaseEvent):
    """Order events carry full order specification."""
    symbol: str
    side: str  # 'BUY' or 'SELL'
    quantity: float
    order_type: str  # 'MARKET', 'LIMIT', 'STOP', 'STOP_LIMIT'
    price: Optional[float] = None
    stop_price: Optional[float] = None
    tif: str = 'DAY'  # 'DAY', 'IOC', 'FOK', 'GTC'
    
    @property
    def event_type(self) -> EventType:
        return EventType.ORDER_SUBMIT


@dataclass
class FillEvent(BaseEvent):
    """Fill events record actual execution details."""
    order_id: str
    symbol: str
    side: str
    filled_quantity: float
    fill_price: float
    commission: float
    slippage_bps: float
    
    @property
    def event_type(self) -> EventType:
        return EventType.ORDER_FILLED

This event taxonomy is deliberately explicit. Every state transition is an event. There are no silent side effects.

Event Queue Design

The event queue is the central nervous system. It must guarantee chronological ordering, break timestamp ties deterministically, and prevent look-ahead.

Critical design decision: The queue must prevent look-ahead bias by releasing only events whose timestamps are at or before the current simulation time; anything stamped later stays hidden from downstream components. Look-ahead bias is one of the most common reasons a backtest looks better than live trading.

import heapq
import itertools
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple


class EventQueue:
    """
    Priority queue with look-ahead protection (not thread-safe).
    
    Events are ordered by timestamp, with ties broken by insertion 
    order. Consumers can only pop events stamped at or before the 
    current simulation time, which prevents look-ahead bias.
    """
    
    def __init__(self):
        # Heap entries are (timestamp, sequence, event) tuples so that
        # event objects themselves are never compared.
        self._heap: List[Tuple[datetime, int, BaseEvent]] = []
        self._sequence = itertools.count()
        self._processed_ids: set = set()
        self._current_time: Optional[datetime] = None
    
    def publish(self, event: BaseEvent) -> None:
        """
        Add an event to the queue. Events whose IDs have already 
        been processed are rejected.
        """
        if event.event_id in self._processed_ids:
            raise ValueError(
                f"Event {event.event_id} has already been processed"
            )
        heapq.heappush(
            self._heap, (event.timestamp, next(self._sequence), event)
        )
    
    def advance_time(self, new_time: datetime) -> None:
        """
        Advance simulation time. After calling this, events 
        with timestamps <= new_time can be consumed; events with 
        timestamps > new_time stay hidden.
        
        ⚠️ This is the core look-ahead protection mechanism.
        """
        self._current_time = new_time
    
    def get_next(self) -> Optional[BaseEvent]:
        """
        Pop the next eligible event. Returns None if no events 
        exist at or before the current simulation time.
        """
        if self._current_time is None:
            # Simulation time has not been set, so nothing is visible yet
            return None
        
        while self._heap:
            timestamp, _, event = self._heap[0]
            
            if timestamp > self._current_time:
                # Earliest event is in the future: leave it queued
                return None
            
            heapq.heappop(self._heap)
            
            if event.event_id in self._processed_ids:
                continue
                
            self._processed_ids.add(event.event_id)
            return event
        
        return None
    
    def peek(self) -> Optional[BaseEvent]:
        """Peek at the earliest queued event without consuming it."""
        if not self._heap:
            return None
        return self._heap[0][2]
    
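    def drain_until(self, end_time: datetime) -> List[BaseEvent]:
        """
        Drain all events up to and including end_time.
        Used for bar-based strategies that process complete bars.
        
        Advances simulation time to end_time if it is currently earlier.
        """
        if self._current_time is None or self._current_time < end_time:
            self.advance_time(end_time)
        
        events = []
        while True:
            next_event = self.peek()
            if next_event is None or next_event.timestamp > end_time:
                break
            event = self.get_next()
            if event is None:
                # Nothing eligible remains; stop rather than spin
                break
            events.append(event)
        return events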
    
    def __len__(self) -> int:
        return len(self._heap)
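
The release rule is easier to see in isolation. The following is a stripped-down sketch with hypothetical helper names (`push`, `pop_due`), not the `EventQueue` class itself: entries are `(timestamp, sequence, label)` tuples, and only entries stamped at or before `now` are released. The sequence counter keeps same-timestamp entries in publication order.

```python
import heapq
import itertools
from datetime import datetime, timedelta


def push(heap, counter, timestamp, label):
    """Queue a label; the counter breaks ties between equal timestamps."""
    heapq.heappush(heap, (timestamp, next(counter), label))


def pop_due(heap, now):
    """Pop every entry stamped at or before `now`, oldest first."""
    due = []
    while heap and heap[0][0] <= now:
        due.append(heapq.heappop(heap)[2])
    return due


t0 = datetime(2024, 1, 2, 9, 30)
heap, counter = [], itertools.count()

push(heap, counter, t0, "bar 09:30")
push(heap, counter, t0 + timedelta(minutes=1), "bar 09:31")
push(heap, counter, t0, "fill 09:30")  # same timestamp, published later

released = pop_due(heap, now=t0)
```

With simulation time at 09:30, both 09:30 entries are released in the order they were published, and the 09:31 bar stays in the heap until time advances.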

Event Bus and Handler Registration

The event bus decouples producers from consumers. Strategies, risk managers, and portfolio handlers register interest in specific event types:

EventHandler = Callable[[BaseEvent], None]


class EventBus:
    """
    Publish-subscribe event bus for component decoupling.
    
    Each handler receives events in order. Handlers are 
    executed synchronously — if you need async processing, 
    implement a separate thread pool.
    """
    
    def __init__(self):
        self._handlers: Dict[EventType, List[EventHandler]] = {
            event_type: [] for event_type in EventType
        }
        self._global_handlers: List[EventHandler] = []
    
    def subscribe(
        self, 
        event_type: EventType, 
        handler: EventHandler
    ) -> None:
        """Register a handler for a specific event type."""
        if handler not in self._handlers[event_type]:
            self._handlers[event_type].append(handler)
    
    def subscribe_all(self, handler: EventHandler) -> None:
        """Register a handler for all event types."""
        if handler not in self._global_handlers:
            self._global_handlers.append(handler)
    
    def unsubscribe(
        self, 
        event_type: EventType, 
        handler: EventHandler
    ) -> None:
        """Remove a handler registration."""
        if handler in self._handlers[event_type]:
            self._handlers[event_type].remove(handler)
    
    def publish_event(self, event: BaseEvent) -> None:
        """
        Dispatch event to all registered handlers.
        
        ⚠️ Handler execution order matters. Register handlers 
        in dependency order: MarketData → Strategy → Order 
        → Matching → Risk → Portfolio.
        """
        # Type-specific handlers
        for handler in self._handlers[event.event_type]:
            handler(event)
        
        # Global handlers (e.g., logging, persistence)
        for handler in self._global_handlers:
            handler(event)
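
A consequence of synchronous dispatch that is worth seeing once: when a handler publishes a new event, that event is delivered immediately, before the remaining handlers of the original event run. The sketch below uses a hypothetical `MiniBus` with string topics to show the resulting call order; it is a simplified stand-in, not the `EventBus` class above.

```python
from collections import defaultdict


class MiniBus:
    """Tiny synchronous publish-subscribe bus keyed by topic name."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._handlers[topic].append(handler)

    def publish(self, topic, payload):
        # Handlers run in registration order, one after another.
        for handler in self._handlers[topic]:
            handler(payload)


trace = []
bus = MiniBus()


def matching(order):
    trace.append("matching:order")
    bus.publish("FILL", order)  # nested dispatch happens right here


def audit(order):
    trace.append("audit:order")


def portfolio(fill):
    trace.append("portfolio:fill")


bus.subscribe("ORDER", matching)
bus.subscribe("ORDER", audit)
bus.subscribe("FILL", portfolio)

bus.publish("ORDER", {"qty": 1})
```

Here the portfolio sees the fill before the audit handler sees the order that caused it. If that ordering matters for a component, one option is to route follow-on events through the queue instead of publishing them from inside a handler.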

Order State Machine: Explicit Lifecycle Management

The order state machine is where many backtesting frameworks cut corners — and where silent bugs proliferate. A correctly implemented state machine makes every order transition explicit and prevents invalid states.

State Definition

from enum import Enum, auto


class OrderState(Enum):
    """
    Order lifecycle states. Transitions are strictly enforced.
    
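    Allowed transitions:
    
    PENDING        → ACCEPTED, REJECTED, CANCELLED
    ACCEPTED       → PARTIAL_FILLED, FILLED, REJECTED, CANCELLED, EXPIRED
    PARTIAL_FILLED → PARTIAL_FILLED, FILLED, CANCELLED
    
    FILLED, REJECTED, CANCELLED, and EXPIRED are terminal.
    Order modification is not modeled as a separate state.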
    """
    PENDING = auto()      # Submitted, not yet processed
    ACCEPTED = auto()     # Exchange acknowledged
    PARTIAL_FILLED = auto()  # Some quantity filled
    FILLED = auto()       # Complete execution
    REJECTED = auto()     # Exchange rejected
    CANCELLED = auto()    # User-requested cancellation
    EXPIRED = auto()      # Time-in-force expired


class OrderSide(Enum):
    BUY = auto()
    SELL = auto()


class OrderType(Enum):
    MARKET = auto()
    LIMIT = auto()
    STOP = auto()
    STOP_LIMIT = auto()

State Machine Implementation

@dataclass
class Order:
    """
    Order entity with explicit state machine.
    
    State transitions are validated before application.
    Invalid transitions raise StateTransitionError.
    """
    order_id: str
    symbol: str
    side: OrderSide
    order_type: OrderType
    quantity: float
    filled_quantity: float = 0.0
    price: Optional[float] = None
    stop_price: Optional[float] = None
    tif: str = 'DAY'
    
    # Timestamps
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    
    # State machine
    state: OrderState = OrderState.PENDING
    
    # Metadata
    parent_order_id: Optional[str] = None  # For linked orders
    tags: Dict[str, Any] = field(default_factory=dict)
    
    def _validate_transition(self, new_state: OrderState) -> None:
        """Enforce valid state transitions."""
        valid_transitions = {
            OrderState.PENDING: {
                OrderState.ACCEPTED,
                OrderState.REJECTED,
                OrderState.CANCELLED,
            },
            OrderState.ACCEPTED: {
                OrderState.PARTIAL_FILLED,
                OrderState.FILLED,
                OrderState.REJECTED,
                OrderState.CANCELLED,
                OrderState.EXPIRED,
            },
            OrderState.PARTIAL_FILLED: {
                OrderState.PARTIAL_FILLED,  # More fills
                OrderState.FILLED,
                OrderState.CANCELLED,
            },
            OrderState.FILLED: set(),  # Terminal state
            OrderState.REJECTED: set(),  # Terminal state
            OrderState.CANCELLED: set(),  # Terminal state
            OrderState.EXPIRED: set(),  # Terminal state
        }
        
        if new_state not in valid_transitions.get(self.state, set()):
            raise StateTransitionError(
                f"Invalid transition: {self.state.name} → {new_state.name} "
                f"for order {self.order_id}"
            )
    
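    def transition_to(
        self,
        new_state: OrderState,
        timestamp: Optional[datetime] = None,
    ) -> None:
        """
        Apply a state transition with validation.
        
        Pass the simulation timestamp so updated_at reflects simulated 
        time; wall-clock time is used only as a fallback.
        
        ⚠️ Always use this method instead of direct state assignment.
        """
        self._validate_transition(new_state)
        self.state = new_state
        self.updated_at = (
            timestamp if timestamp is not None else datetime.now()
        )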
    
    @property
    def remaining_quantity(self) -> float:
        return self.quantity - self.filled_quantity
    
    @property
    def is_active(self) -> bool:
        """Order can still receive fills."""
        return self.state in (
            OrderState.PENDING,
            OrderState.ACCEPTED,
            OrderState.PARTIAL_FILLED,
        )
    
    @property
    def is_buy(self) -> bool:
        return self.side == OrderSide.BUY
    
    def __repr__(self) -> str:
        return (
            f"Order({self.order_id}, {self.symbol}, "
            f"{self.side.name}, qty={self.quantity}, "
            f"filled={self.filled_quantity}, state={self.state.name})"
        )


class StateTransitionError(Exception):
    """Raised when an invalid state transition is attempted."""
    pass
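
The core of the pattern is a lookup table from each state to the set of states it may move to. The sketch below is a reduced illustration with hypothetical names (`DemoState`, `ALLOWED`, `advance`, `replay`) and fewer states than the `Order` class; it shows a valid lifecycle being replayed and how an attempt to leave a terminal state fails.

```python
from enum import Enum, auto


class DemoState(Enum):
    PENDING = auto()
    ACCEPTED = auto()
    PARTIAL_FILLED = auto()
    FILLED = auto()
    CANCELLED = auto()


# Each state maps to the set of states it may move to.
ALLOWED = {
    DemoState.PENDING: {DemoState.ACCEPTED, DemoState.CANCELLED},
    DemoState.ACCEPTED: {
        DemoState.PARTIAL_FILLED, DemoState.FILLED, DemoState.CANCELLED,
    },
    DemoState.PARTIAL_FILLED: {
        DemoState.PARTIAL_FILLED, DemoState.FILLED, DemoState.CANCELLED,
    },
    DemoState.FILLED: set(),     # terminal
    DemoState.CANCELLED: set(),  # terminal
}


def advance(current, new):
    """Return the new state, or raise if the transition is not allowed."""
    if new not in ALLOWED[current]:
        raise ValueError(f"Invalid transition: {current.name} -> {new.name}")
    return new


def replay(path):
    """Apply a sequence of transitions starting from PENDING."""
    state = DemoState.PENDING
    for new in path:
        state = advance(state, new)
    return state


final = replay(
    [DemoState.ACCEPTED, DemoState.PARTIAL_FILLED, DemoState.FILLED]
)
```

Calling `replay([DemoState.ACCEPTED, DemoState.FILLED, DemoState.CANCELLED])` raises `ValueError`, because a filled order cannot be cancelled.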

Order Repository

The order repository tracks all active and historical orders. It supports lookup by order ID, symbol, and state:

class OrderRepository:
    """
    Manages order lifecycle and persistence.
    
    Provides efficient lookup by order_id, symbol, and state.
    """
    
    def __init__(self):
        self._orders: Dict[str, Order] = {}
        self._by_symbol: Dict[str, List[str]] = {}
        self._by_state: Dict[OrderState, List[str]] = {
            state: [] for state in OrderState
        }
    
    def add(self, order: Order) -> None:
        """Register a new order."""
        self._orders[order.order_id] = order
        self._by_symbol.setdefault(order.symbol, []).append(order.order_id)
        self._by_state[order.state].append(order.order_id)
    
    def update_state(self, order_id: str, new_state: OrderState) -> None:
        """Update order state with index maintenance."""
        order = self._orders[order_id]
        old_state = order.state
        
        # Update order
        order.transition_to(new_state)
        
        # Update state index
        self._by_state[old_state].remove(order_id)
        self._by_state[new_state].append(order_id)
    
    def get(self, order_id: str) -> Optional[Order]:
        return self._orders.get(order_id)
    
    def get_by_symbol(self, symbol: str) -> List[Order]:
        return [
            self._orders[oid] 
            for oid in self._by_symbol.get(symbol, [])
        ]
    
    def get_active_by_symbol(self, symbol: str) -> List[Order]:
        """Get all active (non-terminal) orders for a symbol."""
        return [
            order for order in self.get_by_symbol(symbol)
            if order.is_active
        ]
    
    def get_by_state(self, state: OrderState) -> List[Order]:
        return [
            self._orders[oid] 
            for oid in self._by_state.get(state, [])
        ]
    
    @property
    def active_orders(self) -> List[Order]:
        """All orders in non-terminal states."""
        return self.get_by_state(OrderState.PENDING) + \
               self.get_by_state(OrderState.ACCEPTED) + \
               self.get_by_state(OrderState.PARTIAL_FILLED)

Matching Engine: Realistic Fill Simulation

The matching engine determines fill prices. Naive implementations use close_price ± fixed_slippage. Production-grade engines simulate queue dynamics, market impact, and adverse selection.

Fill Model Architecture

@dataclass
class FillModel:
    """
    Configurable fill model supporting multiple simulation approaches.
    
    Available models:
    - FIXED_SLIPPAGE: Constant slippage regardless of order size
    - VOLATILITY_ADJUSTED: Slippage scales with current volatility
    - QUEUE_AWARE: Simulates queue position and time-to-fill
    """
    
    model_type: str = 'VOLATILITY_ADJUSTED'
    base_slippage_bps: float = 2.0
    market_impact_coefficient: float = 0.1
    
    def calculate_fill(
        self,
        order: Order,
        market_data: MarketDataEvent,
        current_volatility: Optional[float] = None,
    ) -> tuple[float, float, float]:
        """
        Calculate fill price, slippage, and commission.
        
        Returns: (fill_price, slippage_bps, commission)
        """
        if self.model_type == 'FIXED_SLIPPAGE':
            return self._fixed_slippage_fill(order, market_data)
        elif self.model_type == 'VOLATILITY_ADJUSTED':
            return self._volatility_adjusted_fill(
                order, market_data, current_volatility
            )
        elif self.model_type == 'QUEUE_AWARE':
            return self._queue_aware_fill(order, market_data)
        else:
            raise ValueError(f"Unknown fill model: {self.model_type}")
    
    def _fixed_slippage_fill(
        self,
        order: Order,
        market_data: MarketDataEvent
    ) -> tuple[float, float, float]:
        """Apply fixed slippage regardless of conditions."""
        reference_price = market_data.mid_price
        
        if order.is_buy:
            fill_price = reference_price * (1 + self.base_slippage_bps / 10000)
        else:
            fill_price = reference_price * (1 - self.base_slippage_bps / 10000)
        
        commission = self._calculate_commission(order, fill_price)
        return fill_price, self.base_slippage_bps, commission
    
    def _volatility_adjusted_fill(
        self,
        order: Order,
        market_data: MarketDataEvent,
        volatility: Optional[float]
    ) -> tuple[float, float, float]:
        """
        Slippage scales with realized volatility.
        
        Higher volatility → wider spreads → more slippage.
        """
        reference_price = market_data.mid_price
        
        if volatility is not None:
            vol_scalar = max(1.0, volatility / 0.02)  # Normalize to 2% vol
        else:
            vol_scalar = 1.0
        
        slippage = self.base_slippage_bps * vol_scalar
        
        if order.is_buy:
            fill_price = reference_price * (1 + slippage / 10000)
        else:
            fill_price = reference_price * (1 - slippage / 10000)
        
        commission = self._calculate_commission(order, fill_price)
        return fill_price, slippage, commission
    
    def _queue_aware_fill(
        self,
        order: Order,
        market_data: MarketDataEvent
    ) -> tuple[float, float, float]:
        """
        Queue-aware fill model for marketable orders.
        
        ⚠️ Requires bid/ask prices and sizes from market_data.
        Scales market impact by order size relative to the displayed 
        size on the side of the book the order takes liquidity from.
        """
        quote_fields = (
            market_data.bid_price, market_data.ask_price,
            market_data.bid_size, market_data.ask_size,
        )
        if any(value is None for value in quote_fields):
            # Fall back to volatility model
            return self._volatility_adjusted_fill(order, market_data, None)
        
        reference_price = market_data.mid_price
        
        if order.is_buy:
            # A marketable buy lifts the offer
            available_size = market_data.ask_size
            fill_price = market_data.ask_price
        else:
            # A marketable sell hits the bid
            available_size = market_data.bid_size
            fill_price = market_data.bid_price
        
        # Market impact: larger orders face more impact
        queue_fraction = min(order.quantity / max(available_size, 1), 1.0)
        market_impact = (
            self.market_impact_coefficient * 
            queue_fraction * 
            (fill_price - reference_price) / reference_price
        )
        
        # Impact is positive for buys and negative for sells, so it
        # always moves the fill price away from the mid
        fill_price *= (1 + market_impact)
        slippage = abs(fill_price - reference_price) / reference_price * 10000
        commission = self._calculate_commission(order, fill_price)
        
        return fill_price, slippage, commission
    
    def _calculate_commission(
        self,
        order: Order,
        fill_price: float
    ) -> float:
        """
        Calculate commission. Default: per-share/contract model.
        
        Override this for custom fee schedules.
        """
        # Example: $0.005 per share, minimum $1
        shares = order.quantity
        commission = max(shares * 0.005, 1.0)
        return commission
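
As a worked example of the basis-point arithmetic in the volatility-adjusted model, the standalone function below reproduces the same formula outside the class. The name `slipped_price` and the `vol_anchor` parameter are illustrative; the 2% anchor and 2 bps base mirror the defaults shown above.

```python
def slipped_price(reference_price, side, base_bps, volatility=None,
                  vol_anchor=0.02):
    """Return (fill_price, slippage_bps) for a marketable order."""
    # Slippage never drops below the base level, and scales up linearly
    # once volatility exceeds the anchor.
    if volatility is None:
        vol_scalar = 1.0
    else:
        vol_scalar = max(1.0, volatility / vol_anchor)
    slippage_bps = base_bps * vol_scalar
    # Buys pay up, sells receive less. One basis point is 1/10,000.
    direction = 1 if side == "BUY" else -1
    fill_price = reference_price * (1 + direction * slippage_bps / 10_000)
    return fill_price, slippage_bps


# Volatility at twice the anchor doubles the 2 bps base to 4 bps.
buy_price, buy_bps = slipped_price(100.0, "BUY", base_bps=2.0, volatility=0.04)
sell_price, sell_bps = slipped_price(100.0, "SELL", base_bps=2.0, volatility=0.04)
calm_price, calm_bps = slipped_price(100.0, "BUY", base_bps=2.0, volatility=0.01)
```

At a reference price of 100, 4 bps moves the buy fill to about 100.04 and the sell fill to about 99.96. With volatility below the anchor, the scalar floors at 1 and the base 2 bps applies.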

Matching Engine Core

The matching engine processes order events and generates fill events:

class MatchingEngine:
    """
    Core matching engine with fill model integration.
    
    Processing flow:
    1. Receive ORDER_SUBMIT event
    2. Validate order
    3. Apply fill model
    4. Generate FILL event
    5. Update order state
    """
    
    def __init__(
        self,
        order_repository: OrderRepository,
        event_bus: EventBus,
        fill_model: FillModel,
    ):
        self.order_repository = order_repository
        self.event_bus = event_bus
        self.fill_model = fill_model
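        self.current_volatility: Optional[float] = None
        # Most recent market data per symbol, used to price fills
        self._latest_market_data: Dict[str, MarketDataEvent] = {}
        
        # Register handlers
        self.event_bus.subscribe(EventType.MARKET_DATA, self.on_market_data)
        self.event_bus.subscribe(EventType.ORDER_SUBMIT, self.on_order)
    
    def set_volatility(self, vol: float) -> None:
        """Update current volatility estimate for fill modeling."""
        self.current_volatility = vol
    
    def on_market_data(self, event: MarketDataEvent) -> None:
        """Cache the most recent market data for each symbol."""
        self._latest_market_data[event.symbol] = event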
    
    def on_order(self, event: OrderEvent) -> None:
        """Process incoming order submission."""
        order = Order(
            order_id=event.event_id,
            symbol=event.symbol,
            side=OrderSide.BUY if event.side == 'BUY' else OrderSide.SELL,
            order_type=self._map_order_type(event.order_type),
            quantity=event.quantity,
            price=event.price,
            stop_price=event.stop_price,
            tif=event.tif,
        )
        
        # Add to repository
        self.order_repository.add(order)
        
        # Transition to ACCEPTED (simplified; real impl validates)
        self.order_repository.update_state(
            order.order_id, OrderState.ACCEPTED
        )
        
        # Publish accept event
        self.event_bus.publish_event(OrderAcceptEvent(
            timestamp=event.timestamp,
            order_id=order.order_id,
            symbol=order.symbol,
            status='ACCEPTED',
        ))
        
        # Process fill immediately for MARKET orders
        if order.order_type == OrderType.MARKET:
            self._fill_order(order, event.timestamp)
    
    def _fill_order(
        self, 
        order: Order, 
        timestamp: datetime
    ) -> None:
        """Execute fill for an order."""
        # Get current market data (from event bus state)
        market_data = self._get_latest_market_data(order.symbol)
        
        if market_data is None:
            self._reject_order(order, timestamp, "No market data available")
            return
        
        # Calculate fill
        fill_price, slippage, commission = self.fill_model.calculate_fill(
            order, market_data, self.current_volatility
        )
        
        # Validate fill price (basic sanity check)
        if fill_price <= 0:
            self._reject_order(order, timestamp, "Invalid fill price")
            return
        
        # Update order
        order.filled_quantity = order.quantity
        self.order_repository.update_state(
            order.order_id, OrderState.FILLED
        )
        
        # Generate fill event
        fill_event = FillEvent(
            timestamp=timestamp,
            order_id=order.order_id,
            symbol=order.symbol,
            side=order.side.name,
            filled_quantity=order.quantity,
            fill_price=fill_price,
            commission=commission,
            slippage_bps=slippage,
        )
        
        self.event_bus.publish_event(fill_event)
    
    def _reject_order(
        self,
        order: Order,
        timestamp: datetime,
        reason: str
    ) -> None:
        """Reject an order with a reason."""
        self.order_repository.update_state(
            order.order_id, OrderState.REJECTED
        )
        
        self.event_bus.publish_event(OrderRejectEvent(
            timestamp=timestamp,
            order_id=order.order_id,
            symbol=order.symbol,
            reason=reason,
        ))
    
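    def _get_latest_market_data(
        self, 
        symbol: str
    ) -> Optional[MarketDataEvent]:
        """
        Retrieve latest market data for a symbol.
        
        Reads from a per-symbol cache that a MARKET_DATA handler is 
        expected to populate. Returns None if nothing has been cached, 
        in which case the order is rejected.
        """
        cache = getattr(self, "_latest_market_data", {})
        return cache.get(symbol)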
    
    @staticmethod
    def _map_order_type(order_type: str) -> OrderType:
        """Map string order type to OrderType enum."""
        mapping = {
            'MARKET': OrderType.MARKET,
            'LIMIT': OrderType.LIMIT,
            'STOP': OrderType.STOP,
            'STOP_LIMIT': OrderType.STOP_LIMIT,
        }
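        # Fail loudly: silently treating an unknown type as MARKET
        # would turn a typo into an immediate fill
        try:
            return mapping[order_type.upper()]
        except KeyError:
            raise ValueError(f"Unknown order type: {order_type}") from None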


@dataclass
class OrderAcceptEvent(BaseEvent):
    """Event published when order is accepted."""
    order_id: str
    symbol: str
    status: str
    
    @property
    def event_type(self) -> EventType:
        return EventType.ORDER_ACCEPT


@dataclass
class OrderRejectEvent(BaseEvent):
    """Event published when order is rejected."""
    order_id: str
    symbol: str
    reason: str
    
    @property
    def event_type(self) -> EventType:
        return EventType.ORDER_REJECT
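
The engine above fills only MARKET orders at submission time. As a rough sketch of how a resting LIMIT order might be checked against an OHLC bar, the hypothetical function below applies a common bar-based convention: the order fills if the bar's range reaches the limit, at the limit price, or at the open when the bar gaps through the limit. It is an assumption for illustration and is optimistic, since it ignores queue position when the price merely touches the limit.

```python
from typing import Optional


def limit_fill_price(
    side: str,
    limit_price: float,
    bar_open: float,
    bar_high: float,
    bar_low: float,
) -> Optional[float]:
    """Return the fill price for a resting limit order, or None if unfilled."""
    if side == "BUY":
        if bar_low > limit_price:
            return None  # price never came down to the limit
        # If the bar opens below the limit, the order fills at the open.
        return min(limit_price, bar_open)
    if side == "SELL":
        if bar_high < limit_price:
            return None  # price never rose to the limit
        # If the bar opens above the limit, the order fills at the open.
        return max(limit_price, bar_open)
    raise ValueError(f"Unknown side: {side}")


touched = limit_fill_price("BUY", 100.0, bar_open=101.0, bar_high=102.0, bar_low=99.5)
gapped = limit_fill_price("BUY", 100.0, bar_open=99.0, bar_high=99.8, bar_low=98.5)
missed = limit_fill_price("BUY", 100.0, bar_open=101.0, bar_high=102.0, bar_low=100.5)
sell_gap = limit_fill_price("SELL", 105.0, bar_open=106.0, bar_high=107.0, bar_low=105.5)
```

A stricter variant would require the bar to trade through the limit by at least one tick before granting the fill.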

Portfolio and Risk Management Integration

Portfolio Tracker

The portfolio tracker receives fill events and maintains positions and cash:

@dataclass
class Position:
    """Represents a current market position."""
    symbol: str
    quantity: float
    average_entry_price: float
    unrealized_pnl: float = 0.0
    realized_pnl: float = 0.0
    
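    @property
    def market_value(self) -> float:
        # Cost basis plus unrealized P&L equals quantity × last marked
        # price once unrealized_pnl has been refreshed from market data.
        return self.quantity * self.average_entry_price + self.unrealized_pnl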
    
    @property
    def is_long(self) -> bool:
        return self.quantity > 0
    
    @property
    def is_short(self) -> bool:
        return self.quantity < 0


class Portfolio:
    """
    Portfolio tracker with position management.
    
    Receives FILL events and updates positions accordingly.
    Tracks cash, positions, and P&L as fills arrive.
    """
    
    def __init__(self, initial_cash: float = 100_000.0):
        self.initial_cash = initial_cash
        self.cash = initial_cash
        self.positions: Dict[str, Position] = {}
        self.orders: OrderRepository = OrderRepository()
        
        # P&L tracking
        self.realized_pnl = 0.0
        self.trades: List[FillEvent] = []
        
        # Event tracking for metrics
        self.equity_curve: List[tuple[datetime, float]] = []
    
    def on_fill(self, event: FillEvent) -> None:
        """Process a fill event and update portfolio."""
        self.trades.append(event)
        
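        # Update cash: buys consume cash, sells release it
        signed_quantity = (
            event.filled_quantity if event.side == 'BUY'
            else -event.filled_quantity
        )
        self.cash -= signed_quantity * event.fill_price
        self.cash -= event.commission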
        
        # Update or create position
        if event.symbol not in self.positions:
            self.positions[event.symbol] = Position(
                symbol=event.symbol,
                quantity=0.0,
                average_entry_price=0.0,
            )
        
        position = self.positions[event.symbol]
        self._update_position(position, event)
        
        # Update realized P&L
        self.realized_pnl = sum(
            p.realized_pnl for p in self.positions.values()
        )
    
    def _update_position(
        self, 
        position: Position, 
        fill: FillEvent
    ) -> None:
        """Update position with new fill."""
        signed_fill = (
            fill.filled_quantity if fill.side == 'BUY'
            else -fill.filled_quantity
        )
        
        if position.quantity == 0:
            # Opening position
            position.quantity = signed_fill
            position.average_entry_price = fill.fill_price
        elif (position.quantity > 0) == (fill.side == 'BUY'):
            # Adding to position: size-weighted average entry price
            total_cost = (
                abs(position.quantity) * position.average_entry_price +
                fill.filled_quantity * fill.fill_price
            )
            position.quantity += signed_fill
            position.average_entry_price = total_cost / abs(position.quantity)
        else:
            # Reducing, closing, or reversing position
            was_long = position.is_long
            closing_qty = min(abs(position.quantity), fill.filled_quantity)
            
            # Realized P&L on the closed portion: exit minus entry for
            # longs, entry minus exit for shorts
            pnl = closing_qty * (
                fill.fill_price - position.average_entry_price
            ) * (1 if was_long else -1)
            position.realized_pnl += pnl
            
            position.quantity += signed_fill
            
            if abs(position.quantity) < 1e-8:
                position.quantity = 0.0
            elif position.is_long != was_long:
                # The fill exceeded the old position: the remainder
                # opens a new position at the fill price
                position.average_entry_price = fill.fill_price
    
    def update_market_values(
        self,
        prices: Dict[str, float],
        timestamp: Optional[datetime] = None,
    ) -> None:
        """
        Update unrealized P&L based on current prices.
        
        Pass the simulation timestamp so the equity curve is stamped 
        in simulated time; wall-clock time is used only as a fallback.
        """
        for symbol, position in self.positions.items():
            if position.quantity == 0:
                # A flat position carries no unrealized P&L
                position.unrealized_pnl = 0.0
            elif symbol in prices:
                current_price = prices[symbol]
                position.unrealized_pnl = position.quantity * (
                    current_price - position.average_entry_price
                )
        
        # Record equity for curve
        stamp = timestamp if timestamp is not None else datetime.now()
        self.equity_curve.append((stamp, self.total_equity))
    
    @property
    def total_equity(self) -> float:
        return self.cash + sum(
            p.market_value for p in self.positions.values()
        )
    
    @property
    def total_pnl(self) -> float:
        return self.realized_pnl + sum(
            p.unrealized_pnl for p in self.positions.values()
        )
    
    @property
    def leverage(self) -> float:
        """Current portfolio leverage."""
        long_value = sum(
            p.market_value for p in self.positions.values() if p.is_long
        )
        short_value = sum(
            abs(p.market_value) for p in self.positions.values() if p.is_short
        )
        total_exposure = long_value + short_value
        return total_exposure / max(self.total_equity, 1e-8)
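
The accounting rules in the fill handler are easiest to verify in isolation. The following is a minimal standalone sketch, assuming signed quantities (positive for long, negative for short) and string sides as used above. The function name `apply_fill` and its tuple-based state are hypothetical and are not part of the Portfolio class.

```python
from typing import Tuple


def apply_fill(
    quantity: float,
    avg_price: float,
    side: str,
    fill_qty: float,
    fill_price: float,
) -> Tuple[float, float, float]:
    """Return (new_quantity, new_avg_price, realized_pnl) after one fill.

    quantity is signed: positive for long, negative for short.
    """
    signed_fill = fill_qty if side == "BUY" else -fill_qty

    # Opening from flat, or adding in the same direction: blend the entry price.
    if quantity == 0 or (quantity > 0) == (signed_fill > 0):
        new_qty = quantity + signed_fill
        total_cost = abs(quantity) * avg_price + fill_qty * fill_price
        return new_qty, total_cost / abs(new_qty), 0.0

    # Reducing, closing, or flipping: realize P&L on the closed portion only.
    closing_qty = min(abs(quantity), fill_qty)
    direction = 1.0 if quantity > 0 else -1.0
    realized = closing_qty * (fill_price - avg_price) * direction
    new_qty = quantity + signed_fill

    if abs(new_qty) < 1e-8:
        return 0.0, 0.0, realized  # fully closed
    if (new_qty > 0) != (quantity > 0):
        return new_qty, fill_price, realized  # flipped: remainder opens at fill price
    return new_qty, avg_price, realized  # partially reduced: entry price unchanged
```

Buying 100 at 10 and then 100 at 12 yields a long of 200 at an average of 11. Covering a short of 100 entered at 20 with a buy of 150 at 18 realizes 200 on the closed portion and leaves a new long of 50 entered at 18.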

Risk Manager

The risk manager checks orders against portfolio constraints before execution:

@dataclass
class RiskLimits:
    """Risk limit configuration."""
    max_position_size: float = 10_000  # Max shares per position
    max_portfolio_leverage: float = 2.0
    max_sector_exposure: float = 0.30  # 30% of portfolio
    max_order_value: float = 50_000.0
    max_drawdown_pct: float = 0.20


class RiskManager:
    """
    Pre-trade and post-trade risk management.
    
    Pre-trade: Validates order against risk limits before submission.
    Post-trade: Monitors portfolio-level risk and triggers liquidation.
    """
    
    def __init__(
        self,
        portfolio: Portfolio,
        limits: RiskLimits,
    ):
        self.portfolio = portfolio
        self.limits = limits
        self.high_water_mark = portfolio.initial_cash
        self.is_breaching = False
    
    def pre_trade_check(self, order: Order, current_price: float) -> tuple[bool, str]:
        """
        Validate order against risk limits before submission.
        
        Returns: (approved, rejection_reason)
        """
        order_value = order.quantity * current_price
        
        # Check order size
        if order.quantity > self.limits.max_position_size:
            return False, f"Order size {order.quantity} exceeds max {self.limits.max_position_size}"
        
        # Check order value
        if order_value > self.limits.max_order_value:
            return False, f"Order value ${order_value:,.0f} exceeds max ${self.limits.max_order_value:,.0f}"
        
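        # Check projected leverage (conservative: treats the full order value
        # as additional exposure, even when the order reduces a position)
        equity = max(self.portfolio.total_equity, 1e-8)
        projected_leverage = self.portfolio.leverage + order_value / equity
        if projected_leverage > self.limits.max_portfolio_leverage:
            return False, f"Projected leverage {projected_leverage:.2f}x exceeds max {self.limits.max_portfolio_leverage}x"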
        
        # Check drawdown
        if self.is_breaching:
            return False, "Account is in drawdown breach — new orders blocked"
        
        return True, ""
    
    def post_trade_update(self) -> None:
        """
        Update risk state after trade execution.
        
        Checks drawdown against high water mark.
        """
        current_equity = self.portfolio.total_equity
        self.high_water_mark = max(self.high_water_mark, current_equity)
        
        drawdown = (self.high_water_mark - current_equity) / self.high_water_mark
        
        if drawdown > self.limits.max_drawdown_pct:
            self.is_breaching = True
            # Trigger risk event for potential liquidation
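
The high-water-mark logic can be exercised on its own before wiring it into a portfolio. This is a minimal sketch, assuming equity is fed in as a plain float after each update. `DrawdownMonitor` is a hypothetical helper that mirrors post_trade_update; it is not a replacement for RiskManager.

```python
from dataclasses import dataclass


@dataclass
class DrawdownMonitor:
    """Tracks the high-water mark and flags a breach (hypothetical helper)."""

    high_water_mark: float
    max_drawdown_pct: float = 0.20
    is_breaching: bool = False

    def update(self, equity: float) -> float:
        """Feed the latest equity; return the current drawdown as a fraction."""
        self.high_water_mark = max(self.high_water_mark, equity)
        if self.high_water_mark <= 0:
            return 0.0  # drawdown is undefined without a positive peak
        drawdown = (self.high_water_mark - equity) / self.high_water_mark
        if drawdown > self.max_drawdown_pct:
            self.is_breaching = True  # latched: stays set until reset by the caller
        return drawdown
```

Note that the breach flag is latched, as in the class above: a recovery to new highs does not clear it. Whether a breach should clear automatically is a policy decision that this sketch leaves to the caller.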

Putting It Together: The Backtest Runner

Simulation Loop

class BacktestRunner:
    """
    Orchestrates the event-driven backtest simulation.
    
    Processing order:
    1. Advance time
    2. Drain events for current time
    3. Process through event bus
    4. Update portfolio and risk state
    5. Record metrics
    """
    
    def __init__(
        self,
        event_queue: EventQueue,
        event_bus: EventBus,
        portfolio: Portfolio,
        risk_manager: RiskManager,
        matching_engine: MatchingEngine,
    ):
        self.event_queue = event_queue
        self.event_bus = event_bus
        self.portfolio = portfolio
        self.risk_manager = risk_manager
        self.matching_engine = matching_engine
        
        # Metrics collection
        self.metrics: List[dict] = []
        
        # Register handlers
        self.event_bus.subscribe(EventType.FILL, self.portfolio.on_fill)
    
    def run(
        self,
        start_date: datetime,
        end_date: datetime,
        strategy: Optional['BaseStrategy'] = None,
    ) -> dict:
        """
        Execute the backtest simulation.
        
        Args:
            start_date: Simulation start time
            end_date: Simulation end time
            strategy: Optional strategy instance. The runner does not subscribe
                it; register its handlers on the event bus before calling run()
        
        Returns:
            Backtest results including equity curve, trades, and metrics
        """
        self.event_queue.advance_time(start_date)
        current_time = start_date
        
        while current_time <= end_date:
            # Drain all events up to current time
            events = self.event_queue.drain_until(current_time)
            
            for event in events:
                # Publish to event bus
                self.event_bus.publish_event(event)
                
                # Update portfolio prices if market data
                if event.event_type == EventType.MARKET_DATA:
                    self.portfolio.update_market_values(
                        {event.symbol: event.close_price}
                    )
            
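            # Update drawdown state now that fills and prices are applied
            self.risk_manager.post_trade_update()
            
            # Record metrics snapshot
            self._record_metrics(current_time)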
            
            # Advance to next event time
            next_event = self.event_queue.peek()
            if next_event is None:
                break
            current_time = next_event.timestamp
            self.event_queue.advance_time(current_time)
        
        return self._generate_results()
    
    def _record_metrics(self, timestamp: datetime) -> None:
        """Record current state for metrics."""
        self.metrics.append({
            'timestamp': timestamp,
            'equity': self.portfolio.total_equity,
            'cash': self.portfolio.cash,
            'positions': len(self.portfolio.positions),
            'leverage': self.portfolio.leverage,
            'realized_pnl': self.portfolio.realized_pnl,
        })
    
    def _generate_results(self) -> dict:
        """Compile backtest results."""
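        equity_curve = [m['equity'] for m in self.metrics]
        if not equity_curve:
            raise ValueError("No metrics recorded; check that start_date <= end_date")
        
        # Calculate performance metrics against starting capital, since the
        # first snapshot may already include P&L from events at start_date
        initial_cash = self.portfolio.initial_cash
        total_return = (equity_curve[-1] - initial_cash) / initial_cash
        
        # Max drawdown, measured from the running peak (starting capital included)
        peak = initial_cash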
        max_dd = 0.0
        for equity in equity_curve:
            if equity > peak:
                peak = equity
            dd = (peak - equity) / peak
            max_dd = max(max_dd, dd)
        
        return {
            'initial_cash': self.portfolio.initial_cash,
            'final_equity': equity_curve[-1],
            'total_return': total_return,
            'total_trades': len(self.portfolio.trades),
            'max_drawdown': max_dd,
            'equity_curve': equity_curve,
            'trades': self.portfolio.trades,
            'metrics': self.metrics,
        }
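
The time-ordered drain pattern that the runner relies on can be illustrated with the standard library alone. This is a minimal sketch built on `heapq`. `MiniQueue` and `replay` are hypothetical stand-ins that mirror the shape of the EventQueue calls above (peek, drain_until) but are not its implementation.

```python
import heapq
import itertools
from datetime import datetime
from typing import Any, List, Optional, Tuple


class MiniQueue:
    """Time-ordered queue; a counter breaks ties so equal timestamps stay FIFO."""

    def __init__(self) -> None:
        self._heap: List[Tuple[datetime, int, Any]] = []
        self._counter = itertools.count()

    def push(self, timestamp: datetime, payload: Any) -> None:
        heapq.heappush(self._heap, (timestamp, next(self._counter), payload))

    def peek_time(self) -> Optional[datetime]:
        return self._heap[0][0] if self._heap else None

    def drain_until(self, now: datetime) -> List[Tuple[datetime, Any]]:
        """Pop every event with timestamp <= now, never anything later."""
        out = []
        while self._heap and self._heap[0][0] <= now:
            ts, _, payload = heapq.heappop(self._heap)
            out.append((ts, payload))
        return out


def replay(queue: MiniQueue, end: datetime) -> List[Tuple[datetime, Any]]:
    """Jump from event time to event time until the queue empties or `end` passes."""
    seen = []
    now = queue.peek_time()
    while now is not None and now <= end:
        seen.extend(queue.drain_until(now))
        now = queue.peek_time()
    return seen
```

The tie-breaking counter matters for two reasons: it keeps events with identical timestamps in submission order, and it prevents the heap from ever comparing payloads, which may not be orderable.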

Performance Considerations

Common Pitfalls

| Pitfall | Symptom | Solution |
|---|---|---|
| Look-ahead bias | Unrealistically high returns, implausibly shallow drawdowns | Validate event queue time boundaries; use only data available at simulation time |
| Survivorship bias | Overestimated returns | Include delisted and bankrupt symbols in historical data |
| Position leak | P&L attribution errors | Close all positions at end of simulation; use proper settlement |
| Overfitting | Live performance far below backtest | Out-of-sample testing; parameter sensitivity analysis |
| Ignoring transaction costs | Strategy looks profitable in backtest but loses money live | Include realistic commission, slippage, and market impact |
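
For the look-ahead row above, one way to enforce "only data available at simulation time" is to route every historical lookup through a point-in-time accessor. This is a minimal sketch using only the standard library. `PointInTimeSeries` and `as_of` are hypothetical names, and the series is assumed to be sorted by timestamp.

```python
from bisect import bisect_right
from datetime import datetime
from typing import List, Sequence, Tuple


class PointInTimeSeries:
    """Time series that only reveals observations up to a given timestamp."""

    def __init__(self, timestamps: Sequence[datetime], values: Sequence[float]) -> None:
        self._timestamps = list(timestamps)
        self._values = list(values)
        if len(self._timestamps) != len(self._values):
            raise ValueError("timestamps and values must have the same length")
        if any(a > b for a, b in zip(self._timestamps, self._timestamps[1:])):
            raise ValueError("timestamps must be sorted ascending")

    def as_of(self, now: datetime) -> List[Tuple[datetime, float]]:
        """Return observations with timestamp <= now; later rows stay hidden."""
        cutoff = bisect_right(self._timestamps, now)
        return list(zip(self._timestamps[:cutoff], self._values[:cutoff]))
```

A strategy that can only call `as_of(current_time)` cannot read a future bar by accident. This does not protect against data that was revised after the fact, such as restated fundamentals or retroactively adjusted prices.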

Optimization Strategies

For strategies requiring higher simulation throughput:

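  1. Bar aggregation: For lower-frequency strategies, aggregate tick data into 1-minute or 5-minute bars before simulation. For liquid instruments this can cut the event count by two to three orders of magnitude, at the cost of losing intra-bar fill detail.

  2. Vectorized components: Hybrid architecture. Use vectorized computation for signal generation where order independence is verified, and event-driven processing for the order lifecycle.

  3. Parallelization: Partition the simulation by symbol or time window and merge results at the end. Partitioning by symbol requires careful handling of cross-symbol dependencies such as shared cash, leverage limits, and correlated risk; partitioning by time requires carrying open positions and indicator state across window boundaries.

  4. Caching: Cache historical data in memory during simulation. Disk I/O is often the primary bottleneck for long backtests.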

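from datetime import datetime

import pandas as pd
# Assumed to be cachetools' LRUCache (third-party), which matches the
# maxsize= usage below
from cachetools import LRUCache


class CachedDataProvider:
    """High-performance data provider with in-memory caching."""
    
    def __init__(self, data_source, cache_size: int = 10_000):
        # data_source: any object exposing fetch(symbol, start, end) -> DataFrame
        self._cache: LRUCache = LRUCache(maxsize=cache_size)
        self._data_source = data_source
    
    def get_bars(
        self,
        symbol: str,
        start: datetime,
        end: datetime,
    ) -> pd.DataFrame:
        """Retrieve bars with LRU caching.

        The cached DataFrame is returned by reference; copy it before mutating.
        """
        cache_key = (symbol, start, end)
        
        # Cache hit: the lookup also marks the key as recently used
        if cache_key in self._cache:
            return self._cache[cache_key]
        
        # Cache miss: fetch from the underlying source and store the result
        data = self._data_source.fetch(symbol, start, end)
        self._cache[cache_key] = data
        return data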
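
Bar aggregation, the first optimization listed above, can be sketched without any third-party dependency. The example assumes ticks arrive as time-sorted `(timestamp, price, size)` tuples and that the bar width divides evenly into a day. `Bar` and `aggregate_ticks` are hypothetical names used for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable, List, Tuple


@dataclass
class Bar:
    start: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float


def aggregate_ticks(
    ticks: Iterable[Tuple[datetime, float, float]],
    bar_seconds: int = 60,
) -> List[Bar]:
    """Fold time-sorted (timestamp, price, size) ticks into fixed-width OHLCV bars."""
    bars: List[Bar] = []
    for ts, price, size in ticks:
        # Floor the timestamp to the start of its bar (measured from midnight).
        midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
        elapsed = int((ts - midnight).total_seconds())
        start = midnight + timedelta(seconds=elapsed - elapsed % bar_seconds)

        if bars and bars[-1].start == start:
            # Same bar: extend the range, move the close, accumulate volume.
            bar = bars[-1]
            bar.high = max(bar.high, price)
            bar.low = min(bar.low, price)
            bar.close = price
            bar.volume += size
        else:
            # First tick of a new bar sets all four prices.
            bars.append(Bar(start, price, price, price, price, size))
    return bars
```

Intervals with no ticks produce no bar in this sketch. Whether to emit empty bars or forward-fill the last close depends on how the strategy treats gaps, so that choice is left open here.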

Extensibility Points

This framework is designed for extension. Key extension points:

New asset classes: Implement a new AssetAdapter that maps asset-specific behaviors (settlement, margin requirements, contract specifications) to the standard event interface.

Alternative data: Create a new AlternativeDataProvider that publishes events in the same format. The event bus architecture ensures strategies can consume traditional and alternative data through the same interface.

Execution strategies: Implement TWAP, VWAP, or POV execution algorithms by creating a new ExecutionHandler that manages child orders rather than submitting parent orders directly.

Risk models: Extend RiskManager with factor-based risk models, VaR calculations, or custom sector constraints by overriding the check methods.
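
As an illustration of the execution-strategy extension point, the scheduling half of a TWAP algorithm fits in a few lines. This sketch only computes when each child order should be released and for what size. How an ExecutionHandler would submit those child orders is not shown in this article, so `twap_schedule` is a hypothetical helper and integer share quantities are an assumption.

```python
from datetime import datetime
from typing import List, Tuple


def twap_schedule(
    total_quantity: int,
    start: datetime,
    end: datetime,
    n_slices: int,
) -> List[Tuple[datetime, int]]:
    """Split a parent order into n_slices child orders spaced evenly in time."""
    if n_slices <= 0 or total_quantity <= 0 or end <= start:
        raise ValueError("need positive quantity, positive slices, and end > start")

    base, remainder = divmod(total_quantity, n_slices)
    interval = (end - start) / n_slices

    schedule = []
    for i in range(n_slices):
        # Spread the remainder over the earliest slices so sizes differ by at most 1.
        qty = base + (1 if i < remainder else 0)
        if qty > 0:
            schedule.append((start + i * interval, qty))
    return schedule
```

Splitting 1,000 shares into four slices between 10:00 and 11:00 yields child orders of 250 shares at 10:00, 10:15, 10:30, and 10:45. Each child order would then pass through the same pre-trade checks and matching engine as any other order.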

Closing Thoughts

An event-driven backtesting framework is not a luxury — it is the minimum infrastructure required to trust any strategy that operates on intraday data, involves complex order logic, or manages portfolio-level constraints.

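The architecture presented here is intentionally explicit. Every state transition is auditable. Every event has a timestamp. The event queue's time-boundary enforcement keeps handlers from seeing events ahead of simulation time, which removes the most common structural source of look-ahead bias, provided the data fed into the queue is itself point-in-time correct.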

The tradeoff is complexity — but this complexity is manageable when organized around clear interfaces. The event bus decouples components. The order repository centralizes state. The matching engine abstracts fill simulation.

Build the foundation correctly once. The strategies you test on top of it will be worth more because of it.


Next Steps

If you are building your first event-driven strategy, start with the architecture in this article. Validate it against a simple moving-average crossover before adding complexity.

If you are evaluating backtesting frameworks, compare the event taxonomy and state machine implementation against your requirements. A framework that cuts corners on state management will surface bugs only in production.


This article does not constitute investment advice. Markets involve risk, and neither backtesting results nor past performance guarantee future results.