The $10-per-Month Trading System That Actually Works

The hardest part of quantitative trading isn't finding alpha. It's surviving the infrastructure.

I know this because I've been that developer — burning through $400/month on a VPS that ran three Docker containers, a Postgres database I never optimized, and a monitoring stack that ate more RAM than my strategy. I watched my cloud bill climb while my backtest edge evaporated under execution costs. The irony was brutal: I was spending more on infrastructure than I was making on the strategy.

That experience forced a reckoning. What if you stripped everything back to the minimum viable architecture — one developer, one cloud server, one coherent system — and rebuilt from first principles? No Kubernetes. No microservices. No managed services that bill by the API call. Just a lean, resilient, production-grade quant stack that costs less than a monthly coffee habit.

This article is the blueprint I wish I'd had. It covers the complete architecture, the exact cost breakdown, production-ready code for every layer, and the monitoring stack that will wake you up before your strategy bleeds. Every component has been battle-tested on a budget.


The Pain Point Nobody Talks About

Individual quant developers face a structural disadvantage that institutional shops don't: infrastructure overhead consumes a disproportionate share of their time and money.

| Problem | Institutional approach | Individual developer reality |
|---|---|---|
| Market data costs | Bundled in prime brokerage | Per-API-call pricing adds up fast |
| Infrastructure | Dedicated ops team | You are the ops team |
| Monitoring | Enterprise dashboards | $0 budget for Datadog |
| Disaster recovery | Hot-hot multi-region | Single point of failure |
| Development time | 90% research, 10% infra | 40% research, 40% infra, 20% firefighting |

The result is predictable. Most individual quant projects either collapse under infrastructure complexity or die quietly after a single weekend outage wipes out three months of backtesting.

The solution isn't to fight these constraints; it's to design a system that works within them.


Architecture Overview: The Lean Stack

The architecture follows a single governing principle: monolith first, separate only when proven necessary. On a budget, the overhead of managing distributed components destroys more value than it creates.

┌─────────────────────────────────────────────────────────────────┐
│                        Cloud VPS (~$10/mo)                      │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                    Systemd Services                       │   │
│  │  ┌─────────────────┐  ┌─────────────────┐                 │   │
│  │  │  Market Data    │  │  Strategy      │                 │   │
│  │  │  Collector      │  │  Engine        │                 │   │
│  │  │  (Python)       │  │  (Python)      │                 │   │
│  │  └────────┬────────┘  └────────┬────────┘                 │   │
│  │           │                    │                           │   │
│  │           └──────────┬─────────┘                           │   │
│  │                      │ SQLite/CSV                          │   │
│  │           ┌─────────┴──────────┐                          │   │
│  │           │  Data Store        │                          │   │
│  │           │  (SQLite + CSV)    │                          │   │
│  │           └───────────────────┘                          │   │
│  └──────────────────────────────────────────────────────────┘   │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                    Systemd Services                        │   │
│  │  ┌─────────────────┐  ┌─────────────────┐                 │   │
│  │  │  Grafana        │  │  Prometheus     │                 │   │
│  │  │  Dashboard      │  │  Metrics        │                 │   │
│  │  │  (:3000)        │  │  (:9090)        │                 │   │
│  │  └─────────────────┘  └─────────────────┘                 │   │
│  └──────────────────────────────────────────────────────────┘   │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                    Systemd Services                        │   │
│  │  ┌─────────────────┐  ┌─────────────────┐                 │   │
│  │  │  Alert Manager  │  │  Health Check   │                 │   │
│  │  │  (ntfy/SMTP)    │  │  Cron Job       │                 │   │
│  │  └─────────────────┘  └─────────────────┘                 │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

Design Principles

  1. Single server, single failure domain: Accept this. Build recovery automation instead of redundancy.
  2. SQLite over PostgreSQL: No daemon, no connection pool, no tuning. SQLite handles 100k+ rows for strategy data without complaint.
  3. Flat files for market data: CSV is your friend. It's portable, readable with any tool, and an append-only file typically loses at most its last partial line in a crash.
  4. Systemd over Docker for simple services: Lower overhead, easier debugging, native integration with logging.
  5. Prometheus + Grafana for $0: The open-source monitoring stack that won't bankrupt you.
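
Principles 1 and 2 can be made concrete with a few lines of standard-library Python. The sketch below is an illustration, not part of the collector that follows: `open_db` and `snapshot` are hypothetical helper names, and the paths are up to you. It assumes write-ahead logging suits your workload, which lets the strategy engine read while the collector writes, and it uses SQLite's online backup API to take a consistent copy for recovery.

```python
import sqlite3
from pathlib import Path


def open_db(path: Path) -> sqlite3.Connection:
    """Open a SQLite database tuned for one writer and a few readers."""
    conn = sqlite3.connect(path, timeout=5.0)  # wait up to 5 s on a locked database
    # WAL lets readers proceed while a single writer appends
    conn.execute("PRAGMA journal_mode=WAL")
    # NORMAL trades a little durability on power loss for fewer fsyncs
    conn.execute("PRAGMA synchronous=NORMAL")
    return conn


def snapshot(src_path: Path, dest_path: Path) -> None:
    """Copy a live database to dest_path using SQLite's online backup API."""
    src = sqlite3.connect(src_path)
    dest = sqlite3.connect(dest_path)
    try:
        src.backup(dest)  # consistent copy even while other connections write
    finally:
        dest.close()
        src.close()
```

Running `snapshot` from a nightly cron job and copying the result off the server is one inexpensive form of the "recovery automation" that principle 1 asks for.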

Production-Grade Data Collector

The data collector is the foundation. It must be resilient, handle API rate limits gracefully, and never corrupt your data store.

Project Structure

/opt/quant-stack/
├── collectors/
│   ├── market_data_collector.py
│   └── requirements.txt
├── strategies/
│   └── example_strategy.py
├── data/
│   ├── market_data/
│   └── signals/
├── monitoring/
│   ├── prometheus.yml
│   └── grafana/
├── alerts/
│   └── alert_manager.py
└── scripts/
    └── health_check.sh

Data Collector Implementation

#!/usr/bin/env python3
"""
Market Data Collector — Production-Grade Implementation
Features: heartbeat, exponential backoff, rate-limit handling, timeout management
"""

import os
import time
import json
import sqlite3
import logging
import signal
import random
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any, List
import threading
from dataclasses import dataclass

import requests

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    handlers=[
        logging.FileHandler('/var/log/quant-collector.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('market_data_collector')


@dataclass
class CollectorConfig:
    api_key: str
    symbols: List[str]
    poll_interval: int = 5  # seconds
    base_url: str = 'https://api.tickdb.ai/v1'
    max_retries: int = 5
    base_delay: float = 1.0
    max_delay: float = 60.0
    request_timeout: tuple = (3.05, 10)  # (connect, read) seconds


class MarketDataCollector:
    """Production-grade market data collector with resilience patterns."""
    
    def __init__(self, config: CollectorConfig):
        self.config = config
        self._running = False
        self._shutdown_event = threading.Event()
        self._consecutive_errors = 0
        self._last_heartbeat = None
        
        # Setup signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)
        
        # Initialize SQLite database
        self._init_database()
        
    def _signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully."""
        logger.info(f"Received signal {signum}, initiating graceful shutdown...")
        self._running = False
        self._shutdown_event.set()
        
    def _init_database(self):
        """Initialize SQLite database with schema."""
        db_path = Path('/opt/quant-stack/data/market_data/collector.db')
        db_path.parent.mkdir(parents=True, exist_ok=True)
        
        with sqlite3.connect(db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS kline_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL,
                    interval TEXT NOT NULL,
                    open_time INTEGER NOT NULL,
                    open REAL NOT NULL,
                    high REAL NOT NULL,
                    low REAL NOT NULL,
                    close REAL NOT NULL,
                    volume REAL NOT NULL,
                    close_time INTEGER NOT NULL,
                    is_final INTEGER DEFAULT 0,
                    collected_at REAL NOT NULL,
                    UNIQUE(symbol, interval, open_time)
                )
            ''')
            
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_symbol_interval_time
                ON kline_data(symbol, interval, open_time)
            ''')
            
            conn.execute('''
                CREATE TABLE IF NOT EXISTS collector_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp REAL NOT NULL,
                    symbols_processed INTEGER,
                    records_inserted INTEGER,
                    latency_ms REAL,
                    error_count INTEGER
                )
            ''')
            
        logger.info(f"Database initialized at {db_path}")
        
    def _send_heartbeat(self):
        """Record a liveness timestamp after each polling cycle (nothing is sent over the network)."""
        self._last_heartbeat = time.time()
        logger.debug(f"Heartbeat recorded at {datetime.now().isoformat()}")
        
    def _calculate_backoff(self, retry_count: int) -> float:
        """Exponential backoff with jitter."""
        delay = min(self.config.base_delay * (2 ** retry_count), self.config.max_delay)
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter
        
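    def _handle_rate_limit(self, response: requests.Response) -> Optional[float]:
        """Return the number of seconds to wait if the response signals a rate limit."""
        if response.status_code == 429 or (
            response.headers.get('X-RateLimit-Remaining', '100') == '0'
        ):
            raw = response.headers.get('Retry-After',
                                       response.headers.get('X-RateLimit-Reset', '5'))
            try:
                retry_after = float(raw)
            except ValueError:
                # Retry-After may also be an HTTP date; fall back to a fixed wait
                retry_after = 5.0
            # Clamp so an epoch-style reset value cannot stall the collector
            retry_after = min(max(retry_after, 0.0), self.config.max_delay)
            logger.warning(f"Rate limit hit. Waiting {retry_after:.0f} seconds.")
            return retry_after
        return None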
        
    def _fetch_kline(self, symbol: str, interval: str = '1h', limit: int = 100) -> Optional[Dict]:
        """Fetch kline data from TickDB with full error handling."""
        url = f"{self.config.base_url}/market/kline"
        params = {'symbol': symbol, 'interval': interval, 'limit': limit}
        headers = {'X-API-Key': self.config.api_key}
        
        try:
            response = requests.get(
                url,
                params=params,
                headers=headers,
                timeout=self.config.request_timeout
            )
            
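            # Handle rate limiting: wait, then retry exactly once (no recursion)
            retry_after = self._handle_rate_limit(response)
            if retry_after is not None:
                self._shutdown_event.wait(timeout=retry_after)
                response = requests.get(
                    url,
                    params=params,
                    headers=headers,
                    timeout=self.config.request_timeout
                )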
                
            response.raise_for_status()
            data = response.json()
            
            if data.get('code') == 0:
                self._consecutive_errors = 0
                return data.get('data')
            else:
                self._handle_api_error(data)
                
        except requests.exceptions.Timeout:
            logger.error(f"Request timeout for {symbol}")
            self._consecutive_errors += 1
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed for {symbol}: {e}")
            self._consecutive_errors += 1
            
        return None
        
    def _handle_api_error(self, response: Dict):
        """Standard TickDB error handler."""
        code = response.get('code', 0)
        message = response.get('message', 'Unknown error')
        
        error_handlers = {
            1001: "Invalid API key — check TICKDB_API_KEY environment variable",
            1002: "Invalid API key — key may have expired",
            2002: f"Symbol not found — verify via /v1/symbols/available endpoint",
            3001: "Rate limit exceeded — implementing backoff",
        }
        
        if code in error_handlers:
            logger.error(f"API Error {code}: {error_handlers[code]}")
        else:
            logger.error(f"API Error {code}: {message}")
            
        self._consecutive_errors += 1
        
        if code == 3001:
            # Trigger immediate backoff
            time.sleep(self._calculate_backoff(self.config.max_retries))
            
    def _store_kline_data(self, symbol: str, interval: str, data: List[Dict]) -> int:
        """Upsert kline data into SQLite and return the number of rows written."""
        if not data:
            return 0
            
        db_path = Path('/opt/quant-stack/data/market_data/collector.db')
        inserted = 0
        
        with sqlite3.connect(db_path) as conn:
            for candle in data:
                try:
                    conn.execute('''
                        INSERT OR REPLACE INTO kline_data 
                        (symbol, interval, open_time, open, high, low, close, 
                         volume, close_time, is_final, collected_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        symbol,
                        interval,
                        candle.get('openTime'),
                        candle.get('open'),
                        candle.get('high'),
                        candle.get('low'),
                        candle.get('close'),
                        candle.get('volume'),
                        candle.get('closeTime'),
                        1 if candle.get('isFinal') else 0,
                        time.time()
                    ))
                    inserted += 1
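                except sqlite3.IntegrityError:
                    # INSERT OR REPLACE overwrites duplicates, so this only
                    # fires for malformed candles (e.g. a NULL required field)
                    logger.warning(f"Skipping malformed candle for {symbol}: {candle}")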
                    
        return inserted
        
    def _record_metrics(self, symbols_processed: int, records_inserted: int, latency_ms: float):
        """Record collector metrics for monitoring."""
        db_path = Path('/opt/quant-stack/data/market_data/collector.db')
        
        with sqlite3.connect(db_path) as conn:
            conn.execute('''
                INSERT INTO collector_metrics 
                (timestamp, symbols_processed, records_inserted, latency_ms, error_count)
                VALUES (?, ?, ?, ?, ?)
            ''', (time.time(), symbols_processed, records_inserted, latency_ms, 
                  self._consecutive_errors))
                  
    def collect_batch(self) -> Dict[str, int]:
        """Collect data for all configured symbols."""
        start_time = time.time()
        results = {'processed': 0, 'inserted': 0, 'errors': 0}
        
        for symbol in self.config.symbols:
            data = self._fetch_kline(symbol, limit=100)
            
            if data:
                inserted = self._store_kline_data(symbol, '1h', data)
                results['processed'] += 1
                results['inserted'] += inserted
                logger.info(f"{symbol}: fetched {len(data)} candles, {inserted} rows upserted")
            else:
                results['errors'] += 1
                
        latency_ms = (time.time() - start_time) * 1000
        self._record_metrics(results['processed'], results['inserted'], latency_ms)
        self._send_heartbeat()
        
        return results
        
    def run(self):
        """Main collection loop with graceful shutdown support."""
        logger.info("Market Data Collector starting...")
        self._running = True
        
        while self._running and not self._shutdown_event.is_set():
            try:
                results = self.collect_batch()
                
                if self._consecutive_errors >= self.config.max_retries:
                    delay = self._calculate_backoff(self._consecutive_errors)
                    logger.warning(f"Too many errors. Entering extended backoff for {delay:.1f}s")
                    self._shutdown_event.wait(timeout=delay)
                else:
                    self._shutdown_event.wait(timeout=self.config.poll_interval)
                    
            except Exception as e:
                logger.exception(f"Unexpected error in collection loop: {e}")
                self._shutdown_event.wait(timeout=self.config.poll_interval)
                
        logger.info("Market Data Collector stopped gracefully.")
        
    def get_status(self) -> Dict[str, Any]:
        """Return current collector status for health checks."""
        return {
            'running': self._running,
            'last_heartbeat': self._last_heartbeat,
            'consecutive_errors': self._consecutive_errors,
            'configured_symbols': len(self.config.symbols)
        }


def main():
    """Entry point with configuration from environment variables."""
    api_key = os.environ.get('TICKDB_API_KEY')
    if not api_key:
        logger.error("TICKDB_API_KEY environment variable not set")
        raise ValueError("TICKDB_API_KEY is required")
        
    # Configure symbols — extend as needed
    symbols = os.environ.get('COLLECTOR_SYMBOLS', 'AAPL.US,TSLA.US,NVDA.US').split(',')
    
    config = CollectorConfig(
        api_key=api_key,
        symbols=[s.strip() for s in symbols],
        poll_interval=int(os.environ.get('POLL_INTERVAL', '60'))
    )
    
    collector = MarketDataCollector(config)
    collector.run()


if __name__ == '__main__':
    main()
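
To see what the collector's backoff actually does, it helps to tabulate the delays. The sketch below reproduces the calculation in `_calculate_backoff` as a standalone function (the name `backoff_delay` is mine) using the defaults from `CollectorConfig`: a 1-second base and a 60-second cap. Jitter is exposed as a parameter and switched off by default so the printed schedule is reproducible.

```python
import random


def backoff_delay(retry_count: int, base_delay: float = 1.0,
                  max_delay: float = 60.0, jitter_fraction: float = 0.0) -> float:
    """Exponential backoff capped at max_delay, plus optional proportional jitter."""
    delay = min(base_delay * (2 ** retry_count), max_delay)
    # Jitter spreads retries out so several clients do not retry in lockstep
    jitter = random.uniform(0, delay * jitter_fraction)
    return delay + jitter


if __name__ == "__main__":
    # Without jitter the schedule doubles until it reaches the cap
    print([backoff_delay(n) for n in range(8)])
    # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With the default `max_retries = 5`, the extended backoff in `run` starts at 32 seconds after five consecutive errors and is capped at 60 seconds, plus up to 10% jitter.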

Systemd Service Configuration

[Unit]
Description=TickDB Market Data Collector
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=5

[Service]
Type=simple
User=quant
Group=quant
WorkingDirectory=/opt/quant-stack
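# systemd has no %env() expansion. Load TICKDB_API_KEY from a root-owned file (chmod 600).
# The path below is an assumption; the file holds a line like TICKDB_API_KEY=...
EnvironmentFile=/etc/quant-stack/collector.env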
Environment="COLLECTOR_SYMBOLS=AAPL.US,TSLA.US,NVDA.US,MSFT.US,GOOGL.US"
Environment="POLL_INTERVAL=60"
ExecStart=/usr/bin/python3 /opt/quant-stack/collectors/market_data_collector.py
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal

# Resource limits for a budget VPS
MemoryMax=256M
CPUQuota=50%

# Hardening
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/opt/quant-stack/data /var/log

[Install]
WantedBy=multi-user.target
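
`Restart=on-failure` only helps when the process exits. A collector that is still running but has stopped writing data needs a separate check. The following is a minimal sketch of such a check, assuming the `kline_data` table and `collected_at` column defined in the collector; the function names and the 300-second threshold are illustrative choices, not part of the original stack.

```python
import sqlite3
import time
from typing import Optional


def seconds_since_last_write(db_path: str, now: Optional[float] = None) -> Optional[float]:
    """Return the age in seconds of the newest row in kline_data, or None if empty."""
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute("SELECT MAX(collected_at) FROM kline_data").fetchone()
    finally:
        conn.close()
    if row is None or row[0] is None:
        return None
    current = time.time() if now is None else now
    return current - row[0]


def check(db_path: str, max_age: float = 300.0) -> int:
    """Return 0 if data is fresh, 1 if stale or missing (usable as an exit code)."""
    age = seconds_since_last_write(db_path)
    if age is None or age > max_age:
        print(f"STALE: newest row age = {age}")
        return 1
    print(f"OK: newest row is {age:.0f} seconds old")
    return 0
```

A cron job could call `sys.exit(check("/opt/quant-stack/data/market_data/collector.db"))` and treat a non-zero exit as the trigger for an alert.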

Monitoring Stack: Prometheus + Grafana on $0

Monitoring is non-negotiable. A strategy that runs unattended without alerts is a liability, not an asset.

Prometheus Configuration

# /opt/quant-stack/monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets: []

rule_files:
  - "alert_rules.yml"

scrape_configs:
  # Node Exporter for system metrics
  - job_name: 'node'
    static_configs:
      - targets: ['localhost:9100']

  # Market Data Collector metrics
  - job_name: 'collector'
    static_configs:
      - targets: ['localhost:9091']
    metrics_path: /metrics

  # Custom application metrics
  - job_name: 'quant-strategy'
    static_configs:
      - targets: ['localhost:9092']

Alert Rules

# /opt/quant-stack/monitoring/alert_rules.yml
groups:
  - name: quant_stack_alerts
    rules:
      - alert: CollectorDown
        expr: up{job="collector"} == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Market Data Collector is down"
          description: "Collector has been offline for more than 2 minutes"
          
      - alert: HighErrorRate
        expr: rate(collector_errors_total[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate in data collector"
          description: "Collector is averaging more than 0.1 errors per second (rate over a 5-minute window)"
          
      - alert: DiskSpaceLow
        expr: (node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"}) < 0.15
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Disk space running low"
          description: "Less than 15% disk space remaining"
          
      - alert: StrategyDrawdown
        expr: strategy_pnl_ratio < -0.05
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Strategy experiencing significant drawdown"
          description: "Strategy drawdown exceeds 5%"
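
The threshold in `HighErrorRate` is easier to reason about with numbers. Prometheus' `rate()` returns the average per-second increase of a counter over the window, so `> 0.1` means more than 0.1 errors per second, not 10% of requests. The sketch below approximates that calculation from two counter samples (it ignores counter resets and the extrapolation Prometheus applies at window edges). The function name is illustrative.

```python
def per_second_rate(count_start: float, count_end: float, window_seconds: float) -> float:
    """Approximate PromQL rate(): counter increase divided by window length."""
    return (count_end - count_start) / window_seconds


if __name__ == "__main__":
    window = 5 * 60  # the [5m] range in the alert expression
    # 30 errors in 5 minutes sits exactly on the 0.1/s threshold
    print(per_second_rate(0, 30, window))
    # Five symbols polled every 60 s, every request failing: 25 errors per window
    print(per_second_rate(0, 25, window))
```

If `collector_errors_total` increments once per failed request, five symbols on a 60-second poll produce roughly 0.083 errors per second even during a total outage, which stays below 0.1. At that cadence a lower threshold, or a ratio of errors to total requests, may be a better fit.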

Alert Manager: NTFY Integration

For individual developers, expensive PagerDuty plans are not viable. NTFY provides free push notifications to your phone.

#!/usr/bin/env python3
"""
Alert Manager — Sends alerts via NTFY (free) or SMTP
"""

import os
import smtplib
import logging
from email.mime.text import MIMEText
from dataclasses import dataclass
from typing import Optional
from urllib.request import Request, urlopen
from urllib.error import URLError

import yaml

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('alert_manager')


@dataclass
class AlertConfig:
    ntfy_topic: Optional[str] = None
    ntfy_server: str = 'https://ntfy.sh'
    smtp_host: Optional[str] = None
    smtp_port: int = 587
    smtp_user: Optional[str] = None
    smtp_password: Optional[str] = None
    alert_email: Optional[str] = None
    from_email: str = 'quant-alerts@localhost'


class AlertManager:
    """Flexible alert dispatcher supporting NTFY and email."""
    
    def __init__(self, config: AlertConfig):
        self.config = config
        
    def send_ntfy(self, title: str, message: str, priority: str = 'default', tags: str = '') -> bool:
        """Send alert via NTFY push notifications."""
        if not self.config.ntfy_topic:
            return False
            
        try:
            url = f"{self.config.ntfy_server}/{self.config.ntfy_topic}"
            data = message.encode('utf-8')
            headers = {
                'Title': title,
                'Priority': priority,
                'Tags': tags,
                'Content-Type': 'text/plain'
            }
            
            request = Request(url, data=data, headers=headers)
            with urlopen(request, timeout=10) as response:
                logger.info(f"NTFY alert sent: {title}")
                return response.status == 200
                
        except URLError as e:
            logger.error(f"Failed to send NTFY alert: {e}")
            return False
            
    def send_email(self, subject: str, body: str) -> bool:
        """Send alert via email."""
        if not all([self.config.smtp_host, self.config.smtp_user, 
                    self.config.smtp_password, self.config.alert_email]):
            return False
            
        try:
            msg = MIMEText(body, 'plain')
            msg['Subject'] = f"[QUANT ALERT] {subject}"
            msg['From'] = self.config.from_email
            msg['To'] = self.config.alert_email
            
            with smtplib.SMTP(self.config.smtp_host, self.config.smtp_port) as server:
                server.starttls()
                server.login(self.config.smtp_user, self.config.smtp_password)
                server.send_message(msg)
                
            logger.info(f"Email alert sent: {subject}")
            return True
            
        except Exception as e:
            logger.error(f"Failed to send email alert: {e}")
            return False
            
    def send_alert(self, title: str, message: str, severity: str = 'warning'):
        """Send alert through all configured channels."""
        priority_map = {
            'critical': 'urgent',
            'warning': 'high',
            'info': 'default'
        }
        tags_map = {
            'critical': 'warning,red_circle',
            'warning': 'warning,yellow_circle',
            'info': 'info'
        }
        
        # Always try NTFY if configured
        if self.config.ntfy_topic:
            self.send_ntfy(
                title, message,
                priority=priority_map.get(severity, 'default'),
                tags=tags_map.get(severity, '')
            )
            
        # Try email as backup
        if not self.config.ntfy_topic or severity == 'critical':
            self.send_email(title, message)


def main():
    """Example alert dispatch."""
    config = AlertConfig(
        ntfy_topic=os.environ.get('NTFY_TOPIC'),
        smtp_host=os.environ.get('SMTP_HOST'),
        smtp_user=os.environ.get('SMTP_USER'),
        smtp_password=os.environ.get('SMTP_PASSWORD'),
        alert_email=os.environ.get('ALERT_EMAIL')
    )
    
    manager = AlertManager(config)
    
    # Example: Alert on strategy drawdown
    manager.send_alert(
        "Strategy Drawdown Alert",
        "Strategy has exceeded 5% drawdown threshold. Review positions immediately.",
        severity='critical'
    )


if __name__ == '__main__':
    main()

Strategy Engine: Minimal but Resilient

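The strategy engine reads from the local data store, turns the data into signals, and acts on them. In this example the broker calls are stubs that only log the intended order. It follows the same resilience patterns as the collector.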

#!/usr/bin/env python3
"""
Example Strategy Engine: polling loop with momentum signals (broker calls are stubbed)
"""

import os
import time
import logging
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import signal

import pandas as pd

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(message)s'
)
logger = logging.getLogger('strategy_engine')


@dataclass
class StrategyConfig:
    symbols: List[str]
    interval: str = '1h'
    lookback_periods: int = 20
    entry_threshold: float = 0.02  # 2% signal threshold
    position_size: float = 0.1     # 10% of capital per position
    max_positions: int = 3


class StrategyEngine:
    """Lightweight strategy engine with signal generation."""
    
    def __init__(self, config: StrategyConfig):
        self.config = config
        self.positions: Dict[str, dict] = {}  # symbol -> {signal, size, entry_time, entry_price}
        self.signals: Dict[str, str] = {}  # 'long', 'short', 'neutral'
        self._running = False
        
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)
        
    def _signal_handler(self, signum, frame):
        logger.info("Received shutdown signal, closing positions gracefully...")
        self._running = False
        self._close_all_positions()
        
    def _load_data(self, symbol: str, periods: int = 50) -> Optional[pd.DataFrame]:
        """Load market data from SQLite."""
        db_path = Path('/opt/quant-stack/data/market_data/collector.db')
        
        if not db_path.exists():
            logger.warning(f"Database not found at {db_path}")
            return None
            
        try:
            with sqlite3.connect(db_path) as conn:
                query = '''
                    SELECT open_time, open, high, low, close, volume
                    FROM kline_data
                    WHERE symbol = ? AND interval = ?
                    ORDER BY open_time DESC
                    LIMIT ?
                '''
                df = pd.read_sql_query(query, conn, params=[symbol, self.config.interval, periods])
                
            if df.empty:
                return None
                
            # Convert to numeric and handle missing values
            for col in ['open', 'high', 'low', 'close', 'volume']:
                df[col] = pd.to_numeric(df[col], errors='coerce')
            df = df.dropna()
            
            return df.sort_values('open_time')
            
        except Exception as e:
            logger.error(f"Failed to load data for {symbol}: {e}")
            return None
            
    def _calculate_momentum(self, df: pd.DataFrame) -> float:
        """Simple momentum calculation: rate of change over lookback period."""
        if len(df) < self.config.lookback_periods + 1:
            return 0.0
            
        current_price = df['close'].iloc[-1]
        # Step back exactly lookback_periods bars from the latest close
        historical_price = df['close'].iloc[-(self.config.lookback_periods + 1)]
        
        return (current_price - historical_price) / historical_price
        
    def _generate_signal(self, symbol: str) -> str:
        """Generate trading signal based on momentum."""
        df = self._load_data(symbol)
        
        if df is None or len(df) < self.config.lookback_periods:
            return 'neutral'
            
        momentum = self._calculate_momentum(df)
        
        if momentum > self.config.entry_threshold:
            return 'long'
        elif momentum < -self.config.entry_threshold:
            return 'short'
        else:
            return 'neutral'
            
    def _calculate_position_size(self, symbol: str, signal: str) -> Optional[float]:
        """Calculate position size based on capital allocation."""
        if signal == 'neutral' or len(self.positions) >= self.config.max_positions:
            return None
            
        # In production, fetch account balance from broker API
        mock_capital = 10000.0
        position_value = mock_capital * self.config.position_size
        
        # Fetch current price
        df = self._load_data(symbol)
        if df is None:
            return None
            
        current_price = df['close'].iloc[-1]
        shares = position_value / current_price
        
        return float(int(shares))  # round down so the order never exceeds the allocation
        
    def _open_position(self, symbol: str, signal: str):
        """Execute position opening."""
        size = self._calculate_position_size(symbol, signal)
        
        if size is None or size <= 0:
            return
            
        # Re-read the latest close; bail out if the data store is unavailable
        df = self._load_data(symbol)
        if df is None:
            return
            
        # In production, call broker API here
        logger.info(f"OPEN {signal.upper()}: {symbol} x {size} shares")
        
        self.positions[symbol] = {
            'signal': signal,
            'size': size,
            'entry_time': datetime.now(),
            'entry_price': float(df['close'].iloc[-1])
        }
        
    def _close_position(self, symbol: str):
        """Execute position closing."""
        if symbol not in self.positions:
            return
            
        position = self.positions[symbol]
        logger.info(f"CLOSE: {symbol} x {position['size']} shares (P&L calculation would go here)")
        
        del self.positions[symbol]
        
    def _close_all_positions(self):
        """Emergency position close on shutdown."""
        for symbol in list(self.positions.keys()):
            self._close_position(symbol)
            
    def run_cycle(self):
        """Single strategy evaluation cycle."""
        for symbol in self.config.symbols:
            signal = self._generate_signal(symbol)
            self.signals[symbol] = signal
            
            current_position = self.positions.get(symbol)
            
            if signal == 'neutral' and current_position:
                self._close_position(symbol)
            elif signal in ('long', 'short') and not current_position:
                self._open_position(symbol, signal)
            elif current_position and current_position['signal'] != signal:
                self._close_position(symbol)
                if signal != 'neutral':
                    self._open_position(symbol, signal)
                    
        logger.info(f"Strategy cycle complete. Active positions: {len(self.positions)}")
        
    def run(self, cycle_interval: int = 300):
        """Main strategy loop."""
        logger.info("Strategy Engine starting...")
        self._running = True
        
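        while self._running:
            try:
                self.run_cycle()
                pause = cycle_interval
            except Exception as e:
                logger.exception(f"Strategy cycle error: {e}")
                pause = 60
            # Sleep in 1-second slices so a shutdown signal takes effect promptly
            deadline = time.monotonic() + pause
            while self._running and time.monotonic() < deadline:
                time.sleep(1)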
                
        logger.info("Strategy Engine stopped.")
        
    def get_status(self) -> Dict:
        """Return current strategy status."""
        return {
            'running': self._running,
            'active_positions': len(self.positions),
            'positions': self.positions,
            'signals': self.signals
        }


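def main():
    """Entry point."""
    symbols = os.environ.get('STRATEGY_SYMBOLS', 'AAPL.US,TSLA.US,NVDA.US').split(',')
    # Drop empty entries left by stray commas or whitespace.
    symbols = [s.strip() for s in symbols if s.strip()]
    # Honor STRATEGY_CYCLE_INTERVAL (seconds) from the environment; default is 300.
    cycle_interval = int(os.environ.get('STRATEGY_CYCLE_INTERVAL', '300'))

    config = StrategyConfig(symbols=symbols)
    engine = StrategyEngine(config)
    engine.run(cycle_interval=cycle_interval)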


if __name__ == '__main__':
    main()
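
The CLOSE log line above leaves the P&L calculation as a placeholder. Below is a minimal sketch of what could go there, assuming the position fields shown earlier (signal, size, entry_price) and a hypothetical flat per-share fee. ClosedTrade and realized_pnl are illustrative names, not part of the engine.

```python
from dataclasses import dataclass


@dataclass
class ClosedTrade:
    """Hypothetical record of one completed round trip."""
    symbol: str
    signal: str          # 'long' or 'short', as stored in self.positions
    size: float          # number of shares
    entry_price: float
    exit_price: float


def realized_pnl(trade: ClosedTrade, fee_per_share: float = 0.0) -> float:
    """Gross P&L for one round trip, minus a flat per-share fee on both legs.

    The fee model is an assumption; substitute your broker's actual schedule.
    """
    if trade.signal not in ('long', 'short'):
        raise ValueError(f"unexpected signal: {trade.signal!r}")
    direction = 1 if trade.signal == 'long' else -1
    gross = direction * (trade.exit_price - trade.entry_price) * trade.size
    fees = 2 * fee_per_share * trade.size  # entry leg + exit leg
    return gross - fees


if __name__ == '__main__':
    trade = ClosedTrade('AAPL.US', 'short', 10, entry_price=200.0, exit_price=190.0)
    print(f"{trade.symbol}: {realized_pnl(trade):+.2f}")
```

Inside _close_position, the exit price could be read the same way the entry price is (the last close from _load_data), with the result appended to the log message.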

Cost Breakdown: The Real Numbers

Here's what your $10/month actually buys.

| Component | Option | Cost | Notes |
|---|---|---|---|
| VPS | Oracle Cloud Always Free / Hetzner / Contabo | $0–$10/mo | 1 vCPU, 1–2 GB RAM, 20–50 GB SSD |
| Market Data | TickDB Free Tier | $0 | 1M API calls/mo, 10 symbols |
| Monitoring | Prometheus + Grafana (self-hosted) | $0 | Full observability |
| Alerts | NTFY (free tier) | $0 | Unlimited push notifications |
| Database | SQLite (embedded) | $0 | Zero overhead |
| Domain/SSL | Cloudflare + Let's Encrypt | $0 | If needed for dashboards |
| Total | | $0–$10/mo | |

Scaling Threshold Guide

| Strategy AUM | Recommended upgrade | Monthly cost |
|---|---|---|
| < $10,000 | Current setup | $0–$10 |
| $10k–$100k | Add broker prime data | +$50–$200 |
| $100k–$1M | Upgrade to 2-core VPS | +$10–$20 |
| > $1M | Consider managed infrastructure | +$200+ |

Deployment Guide: Step by Step

Phase 1: Server Setup (30 minutes)

# SSH into your VPS
ssh root@your-server-ip

# Create quant user
useradd -m -s /bin/bash quant
usermod -aG sudo quant

# Install dependencies
apt update && apt upgrade -y
apt install -y python3 python3-pip git htop iotop sqlite3 prometheus prometheus-node-exporter

# Create directory structure
mkdir -p /opt/quant-stack/{collectors,strategies,data/market_data,monitoring,alerts,scripts,logs}
chown -R quant:quant /opt/quant-stack

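# Install Python packages
# Note: Debian 12+ and Ubuntu 23.04+ refuse system-wide pip installs (PEP 668);
# in that case install into a virtual environment (python3 -m venv) instead.
pip3 install requests pandas prometheus-client pyyaml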

# Set up log rotation
cat > /etc/logrotate.d/quant-stack <<EOF
/var/log/quant-*.log {
    daily
    missingok
    rotate 14
    compress
    delaycompress
    notifempty
    create 0640 quant quant
}
EOF

Phase 2: Environment Configuration

# Create environment file for quant user
# (the quoted 'EOF' stops the shell from expanding $ characters in keys or passwords)
cat > /home/quant/.env <<'EOF'
# TickDB API Configuration
TICKDB_API_KEY=your_api_key_here
COLLECTOR_SYMBOLS=AAPL.US,TSLA.US,NVDA.US,MSFT.US,GOOGL.US
POLL_INTERVAL=60

# Strategy Configuration
STRATEGY_SYMBOLS=AAPL.US,TSLA.US,NVDA.US
STRATEGY_CYCLE_INTERVAL=300

# Alert Configuration (optional)
NTFY_TOPIC=your_secret_topic_name
SMTP_HOST=smtp.gmail.com
SMTP_USER=your_email@gmail.com
SMTP_PASSWORD=your_app_password
ALERT_EMAIL=your_phone@carrier.com
EOF

chmod 600 /home/quant/.env
chown quant:quant /home/quant/.env
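
Before starting the collector, it is worth checking that the polling settings above fit within the free tier's 1M API calls per month. The arithmetic below is a rough sketch under two assumptions that the article does not confirm: the collector polls around the clock, and each poll costs one API call per symbol. The function names are illustrative.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def monthly_api_calls(n_symbols: int, poll_interval_s: int, days: int = 31,
                      calls_per_symbol_per_poll: int = 1) -> int:
    """Estimate API calls per month for a fixed polling interval.

    Assumes 24/7 polling and (by default) one call per symbol per poll.
    """
    if poll_interval_s <= 0:
        raise ValueError("poll_interval_s must be positive")
    polls_per_day = SECONDS_PER_DAY // poll_interval_s
    return n_symbols * calls_per_symbol_per_poll * polls_per_day * days


def min_poll_interval(n_symbols: int, monthly_quota: int, days: int = 31) -> int:
    """Smallest whole-second interval that keeps the estimate within quota."""
    total_symbol_seconds = n_symbols * SECONDS_PER_DAY * days
    # Ceiling division without floats.
    return -(-total_symbol_seconds // monthly_quota)


if __name__ == '__main__':
    calls = monthly_api_calls(n_symbols=5, poll_interval_s=60)
    print(f"5 symbols @ 60s: {calls:,} calls/month")
    print(f"Fastest safe interval: {min_poll_interval(5, 1_000_000)}s")
```

With five symbols and POLL_INTERVAL=60 the estimate stays well below the quota, which leaves room for retries and ad hoc history requests.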

Phase 3: Service Installation

# Copy systemd service files
cp /opt/quant-stack/collectors/market-data-collector.service /etc/systemd/system/
cp /opt/quant-stack/strategies/strategy-engine.service /etc/systemd/system/

# Reload systemd and enable services
systemctl daemon-reload
systemctl enable market-data-collector
systemctl enable strategy-engine

# Start services
systemctl start market-data-collector
systemctl start strategy-engine

# Verify services are running
systemctl status market-data-collector
systemctl status strategy-engine

Phase 4: Health Check Automation

#!/bin/bash
# /opt/quant-stack/scripts/health_check.sh
# Run via cron: */5 * * * * /opt/quant-stack/scripts/health_check.sh

LOG_FILE="/var/log/quant-health.log"
EMAIL="your_email@example.com"

echo "=== Health Check $(date) ===" >> "$LOG_FILE"

# Check service status (restarting a unit requires root, so run this from root's crontab)
for service in market-data-collector strategy-engine; do
    if ! systemctl is-active --quiet "$service"; then
        echo "ALERT: $service is not running!" >> "$LOG_FILE"
        systemctl restart "$service"
        echo "Restarted $service at $(date)" >> "$LOG_FILE"
    else
        echo "OK: $service is running" >> "$LOG_FILE"
    fi
done

# Check disk space on the root filesystem
DISK_USAGE=$(df / | tail -1 | awk '{print $5}' | sed 's/%//')
if [ "$DISK_USAGE" -gt 85 ]; then
    echo "WARNING: Disk usage at ${DISK_USAGE}%" >> "$LOG_FILE"
fi

# Check memory (used / total, as a whole percentage)
MEM_USAGE=$(free | awk '/^Mem:/ {printf "%.0f", $3/$2 * 100}')
if [ "$MEM_USAGE" -gt 90 ]; then
    echo "WARNING: Memory usage at ${MEM_USAGE}%" >> "$LOG_FILE"
fi

# Make the script executable, then add it to root's crontab:
# chmod +x /opt/quant-stack/scripts/health_check.sh
# crontab -e
# */5 * * * * /opt/quant-stack/scripts/health_check.sh >> /var/log/quant-cron.log 2>&1
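
The health check above only writes to a log file, so nobody is notified when a service is restarted. A minimal sketch of pushing such events to the NTFY topic configured in Phase 2 follows. It assumes the public ntfy.sh server and ntfy's plain HTTP publish interface (POST the message body to the topic URL, with optional Title and Priority headers). The helper names are hypothetical.

```python
import os
import urllib.request

# Assumption: the public ntfy server. Change this if you self-host ntfy.
NTFY_BASE_URL = "https://ntfy.sh"


def build_ntfy_request(topic: str, message: str, title: str = "quant-stack",
                       priority: str = "high") -> urllib.request.Request:
    """Build (but do not send) a publish request for one notification."""
    return urllib.request.Request(
        url=f"{NTFY_BASE_URL}/{topic}",
        data=message.encode("utf-8"),
        headers={"Title": title, "Priority": priority},
        method="POST",
    )


def send_alert(message: str, timeout: float = 10.0) -> bool:
    """Send a push notification; return False instead of raising on failure.

    Reads the topic from NTFY_TOPIC, as set in the environment file.
    """
    topic = os.environ.get("NTFY_TOPIC")
    if not topic:
        return False  # alerts are optional; stay silent if unconfigured
    request = build_ntfy_request(topic, message)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return 200 <= response.status < 300
    except OSError:
        # URLError and HTTPError are OSError subclasses; an alert failure
        # should never take down the caller.
        return False
```

A call such as send_alert("strategy-engine restarted by health check") could be made from a small wrapper that the shell script invokes after each restart. Returning False rather than raising keeps a notification outage from masking the original problem.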

The Monitoring Dashboard

Deploy Grafana with a pre-built dashboard for instant visibility.

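# Install Grafana (not in the default Debian/Ubuntu repositories; add Grafana's APT repo first)
apt install -y apt-transport-https wget gnupg
mkdir -p /etc/apt/keyrings
wget -q -O - https://apt.grafana.com/gpg.key | gpg --dearmor > /etc/apt/keyrings/grafana.gpg
echo "deb [signed-by=/etc/apt/keyrings/grafana.gpg] https://apt.grafana.com stable main" > /etc/apt/sources.list.d/grafana.list
apt update
apt install -y grafana
systemctl enable grafana-server
systemctl start grafana-server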

# Access Grafana at http://your-server-ip:3000
# Default credentials: admin / admin (change immediately)

# Import the dashboard JSON or create manually with these panels:
# 1. Collector Status (up/down indicator)
# 2. Records Inserted (rate per minute)
# 3. API Error Rate (per minute)
# 4. Strategy Signals (current positions)
# 5. System Resources (CPU, RAM, Disk)
# 6. Network I/O
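
The "Strategy Signals" panel needs the engine's state in a form Prometheus can scrape. The sketch below renders the dictionary returned by get_status() in the Prometheus text exposition format, using only the standard library. The metric names (quant_engine_running, quant_active_positions, quant_signal) and the numeric signal encoding are assumptions. In a real deployment the prometheus-client package installed in Phase 1 (Gauge, start_http_server) would typically do this work.

```python
from typing import Dict

# Hypothetical numeric encoding so signals can be plotted as a gauge.
SIGNAL_VALUES = {'long': 1, 'short': -1, 'neutral': 0}


def render_metrics(status: Dict) -> str:
    """Render a get_status()-style dict as Prometheus exposition text."""
    lines = [
        "# HELP quant_engine_running 1 if the strategy loop is running, else 0",
        "# TYPE quant_engine_running gauge",
        f"quant_engine_running {int(bool(status['running']))}",
        "# HELP quant_active_positions Number of open positions",
        "# TYPE quant_active_positions gauge",
        f"quant_active_positions {status['active_positions']}",
        "# HELP quant_signal Latest signal per symbol (1=long, -1=short, 0=neutral)",
        "# TYPE quant_signal gauge",
    ]
    # Sort for stable output. Symbols such as AAPL.US need no label escaping;
    # values containing quotes or backslashes would.
    for symbol, signal in sorted(status['signals'].items()):
        value = SIGNAL_VALUES.get(signal, 0)
        lines.append(f'quant_signal{{symbol="{symbol}"}} {value}')
    return "\n".join(lines) + "\n"


if __name__ == '__main__':
    example = {
        'running': True,
        'active_positions': 1,
        'positions': {},
        'signals': {'AAPL.US': 'long', 'TSLA.US': 'neutral'},
    }
    print(render_metrics(example), end="")
```

Serving this text on an HTTP endpoint and adding that endpoint as a Prometheus scrape target would make the values available to panels 1 and 4 above.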

Next Steps

If you're an individual developer building your first quant system: Start with the $0 Oracle Cloud Always Free tier and the TickDB free tier. Get a strategy running end-to-end before spending a cent on infrastructure.

If you want to deploy this stack yourself:

  1. Sign up at tickdb.ai (free API key, no credit card required)
  2. Set the TICKDB_API_KEY environment variable on your VPS
  3. Clone the project structure from this article and deploy as described

If you need historical backtesting data spanning market cycles: Reach out to enterprise@tickdb.ai for extended OHLCV history. Backtesting across bull and bear cycles is essential for strategy validation, although the free tier covers most development needs.

If you're interested in AI-assisted development: Search for and install the tickdb-market-data SKILL in your AI tool's marketplace. It provides pre-built patterns for the architecture described in this article.


This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results. Always validate strategies with out-of-sample testing before live deployment.