In 2021, a proprietary trading firm in Singapore spent three months building a custom adapter layer just to normalize TickDB's raw order book data into the format their internal risk engine expected. The adapter lived outside TickDB entirely — a separate microservice that their DevOps team had to maintain, monitor, and debug whenever the upstream data schema changed. Every schema update broke their pipeline. Every new market they wanted to add required changes in two places.
This is the problem the SKILL protocol was designed to eliminate.
The SKILL protocol is not merely an API wrapper. It is an extension architecture that lets enterprise teams embed their proprietary business logic directly into the data delivery layer — without forking the core platform, without maintaining brittle sidecar services, and without waiting for a product roadmap to catch up with their specific requirements. Whether you need custom normalization rules for a non-standard asset class, proprietary volatility surface calculations, or enterprise-grade audit logging that satisfies regulatory requirements in a specific jurisdiction, the SKILL protocol provides a structured path to extend TickDB's capabilities while maintaining a clean separation between your intellectual property and the underlying data infrastructure.
This article walks through the full SKILL development lifecycle: from understanding the protocol architecture, to writing your first custom function, to deploying in a private or hybrid environment that satisfies enterprise security and compliance requirements.
1. Understanding the SKILL Protocol Architecture
The SKILL protocol operates on a simple but powerful principle: instead of building every conceivable data transformation into the core platform, TickDB exposes a well-defined extension surface that third parties can implement. Think of it as a plugin architecture with strong contracts — you control the logic, TickDB controls the data plane.
1.1 Core Components
A SKILL package consists of three essential components:
The Manifest (skill.yaml)
The manifest declares what the SKILL does, which TickDB capabilities it extends, and what parameters it accepts. This is the contract between your extension and the TickDB runtime.
apiVersion: tickdb.ai/v1
kind: Skill
metadata:
name: volatility-surface-builder
version: 1.0.0
description: "Computes implied volatility surface from options chain data"
author: "enterprise-quant-team"
spec:
capabilities:
- type: function
name: compute_iv_surface
input: options_chain_snapshot
output: volatility_surface_3d
- type: pipeline
name: iv_enrichment
triggers_on:
- symbol
- interval
config:
- name: tenor_range
type: string
default: "1D,1W,2W,1M,2M,3M,6M,1Y"
- name: moneyness_strikes
type: integer
default: 11
- name: model
type: enum
default: black_scholes
options: [black_scholes, binomial, sabr]
runtime:
python_version: "3.11"
memory_limit: "512Mi"
timeout_seconds: 30
The Function Implementation
The actual business logic lives in one or more function files. The protocol supports Python and JavaScript as first-class runtimes, with WebAssembly support on the roadmap for Q3 2026.
The Data Binding Layer
This is where the magic happens. The binding layer connects your custom functions to TickDB's internal data streams. You declare which data feeds your function subscribes to, and TickDB handles the subscription lifecycle, backpressure management, and error propagation.
# skill_function.py
import numpy as np
from scipy.stats import norm
from typing import Dict, List, Any
from tickdb.skill import on_data, register_function
class VolatilitySurfaceBuilder:
"""
Builds a 3D implied volatility surface from an options chain snapshot.
Uses Black-Scholes implied volatility inversion via Newton-Raphson.
"""
def __init__(self, tenor_range: List[str], moneyness_strikes: int, model: str = "black_scholes"):
self.tenor_range = tenor_range
self.moneyness_strikes = moneyness_strikes
self.model = model
@register_function("compute_iv_surface")
@on_data(triggers=["options_chain"])
def compute_iv_surface(self, chain_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Main entry point for the IV surface computation.
Args:
chain_data: Raw options chain with strikes, expiries, bids, asks
Returns:
3D volatility surface as a dictionary with tenor x moneyness grid
"""
surface = {}
spot = chain_data.get("underlying_price")
for expiry in chain_data.get("expiries", []):
tenor_key = self._expiry_to_tenor(expiry)
if tenor_key not in self.tenor_range:
continue
strikes = chain_data.get("strikes", {}).get(expiry, [])
iv_row = {}
for strike_data in strikes:
iv = self._compute_iv_single(
spot=spot,
strike=strike_data["strike"],
expiry=expiry,
option_price=strike_data.get("mid_price"),
option_type=strike_data.get("type"), # call or put
risk_free=self._get_risk_free_rate(expiry)
)
moneyness = np.log(strike_data["strike"] / spot)
iv_row[moneyness] = iv
surface[tenor_key] = iv_row
return {
"surface": surface,
"spot": spot,
"timestamp": chain_data["timestamp"],
"model": self.model
}
def _compute_iv_single(
self,
spot: float,
strike: float,
expiry: str,
option_price: float,
option_type: str,
risk_free: float
) -> float:
"""Newton-Raphson IV inversion for a single strike."""
T = self._expiry_to_year_fraction(expiry)
if T <= 0 or option_price <= 0:
return np.nan
# Initial guess: rough approximation using intrinsic value
intrinsic = max(0, spot - strike) if option_type == "call" else max(0, strike - spot)
sigma = 0.30 # Start with 30% vol assumption
for _ in range(100): # Max 100 iterations
d1 = (np.log(spot / strike) + (risk_free + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == "call":
price = spot * norm.cdf(d1) - strike * np.exp(-risk_free * T) * norm.cdf(d2)
else:
price = strike * np.exp(-risk_free * T) * norm.cdf(-d2) - spot * norm.cdf(-d1)
vega = spot * norm.pdf(d1) * np.sqrt(T) # Analytical vega
if abs(vega) < 1e-10:
break
diff = option_price - price
if abs(diff) < 1e-6:
break
sigma += diff / vega # Newton-Raphson step
sigma = max(0.01, min(sigma, 5.0)) # Bound sigma to reasonable range
return sigma
def _expiry_to_tenor(self, expiry: str) -> str:
"""Convert expiration string to tenor notation."""
# Implementation maps expiry dates to standard tenor buckets
pass
def _expiry_to_year_fraction(self, expiry: str) -> float:
"""Convert expiration to year fraction for pricing models."""
pass
def _get_risk_free_rate(self, expiry: str) -> float:
"""Fetch interpolated risk-free rate for the given expiry."""
# In production, this would call an internal rate service
return 0.05
1.2 Execution Model
When you deploy a SKILL, the TickDB runtime manages its lifecycle. Understanding this execution model is critical for writing performant, resilient extensions.
| Lifecycle Phase | What Happens | Your Responsibility |
|---|---|---|
| Initialization | SKILL package loaded; config validated; resources allocated | Define sensible defaults; validate inputs in __init__ |
| Data subscription | SKILL binds to specified data streams | Use the @on_data decorator to declare subscriptions |
| Execution | Functions called with normalized input data | Keep functions stateless where possible; handle errors gracefully |
| State management | Optional in-memory or persistent state between calls | Use tickdb.skill.get_state() / set_state() for cross-call persistence |
| Teardown | Resources released; subscriptions cancelled | Clean up any external connections in finalizer |
# State management example
from tickdb.skill import get_state, set_state, on_disconnect
class RollingVolatilityTracker:
"""Tracks rolling volatility with state persistence across calls."""
def __init__(self, window: int = 20):
self.window = window
self.prices = get_state("rolling_prices", default=[])
@register_function("update_rolling_vol")
@on_data(triggers=["trade"])
def update_rolling_vol(self, trade: Dict) -> Dict:
self.prices.append(trade["price"])
if len(self.prices) > self.window:
self.prices = self.prices[-self.window:]
# Persist state for next invocation
set_state("rolling_prices", self.prices)
returns = np.diff(np.log(self.prices))
vol = np.std(returns) * np.sqrt(252 * 390) # Annualized, ~390 1-min bars/day
return {"rolling_volatility": vol, "sample_size": len(self.prices)}
@on_disconnect
def cleanup(self):
"""Called when SKILL is unloaded. Save state to persistent storage."""
# In production: write to database or object storage
pass
2. Function Extension Patterns for Common Enterprise Use Cases
The SKILL protocol's flexibility makes it suitable for a wide range of enterprise scenarios. This section documents the three most common extension patterns observed in production deployments.
2.1 Pattern 1: Custom Data Normalization for Non-Standard Asset Classes
Institutional teams often need to work with asset classes that TickDB does not natively support at the data level — exotic derivatives, structured products, OTC instruments. A normalization SKILL bridges the gap.
Use case: A commodity trading desk needs to normalize LNG (liquefied natural gas) forward curves from proprietary broker feeds into the same format as their liquid futures data.
Architecture:
Proprietary Broker Feed → Custom Ingestion SKILL → Normalized Curve → TickDB Storage
↓
Vol Surface SKILL
↓
Risk Engine (downstream)
# lng_curve_normalizer.py
from typing import Dict, List, Tuple
from datetime import datetime, timedelta
from tickdb.skill import register_function, schedule
class LNGCurveNormalizer:
"""
Normalizes broker LNG forward curves into standardized format.
Handles the peculiarities of TTF, JKM, and Henry Hub benchmarks.
"""
def __init__(self, benchmark: str = "TTF"):
self.benchmark = benchmark
self.curve_cache = {}
@register_function("normalize_lng_curve")
def normalize_lng_curve(self, raw_broker_data: Dict) -> Dict:
"""
Transforms broker proprietary format into TickDB standard curve.
Input: {"broker_id": "GFI", "timestamp": "...", "bids": [...], "asks": [...]}
Output: {"symbol": "LNG-TTF", "tenors": [...], "mid_prices": [...], ...}
"""
tenors = self._parse_tenor_dates(raw_broker_data["delivery_periods"])
bids = raw_broker_data["bids"]
asks = raw_broker_data["asks"]
mid_prices = [(b + a) / 2 for b, a in zip(bids, asks)]
spreads = [a - b for a, b in zip(asks, bids)]
normalized = {
"symbol": f"LNG-{self.benchmark}",
"benchmark": self.benchmark,
"timestamp": raw_broker_data["timestamp"],
"broker_id": raw_broker_data["broker_id"],
"tenors": tenors,
"mid_prices": mid_prices,
"bid_prices": bids,
"ask_prices": asks,
"spread": spreads,
"liquidity_score": self._compute_liquidity_score(spreads),
"unit": "USD/MMBtu"
}
return normalized
def _parse_tenor_dates(self, periods: List[str]) -> List[str]:
"""Convert broker period codes to ISO date strings."""
tenor_map = {
"W+1": (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d"),
"W+2": (datetime.now() + timedelta(days=8)).strftime("%Y-%m-%d"),
"M+1": (datetime.now() + timedelta(days=30)).strftime("%Y-%m-%d"),
"M+2": (datetime.now() + timedelta(days=60)).strftime("%Y-%m-%d"),
"Q+1": (datetime.now() + timedelta(days=90)).strftime("%Y-%m-%d"),
}
return [tenor_map.get(p, p) for p in periods]
def _compute_liquidity_score(self, spreads: List[float]) -> float:
"""
Scores curve liquidity based on spread tightness.
Returns 0-1, where 1 = highly liquid.
"""
avg_spread = sum(spreads) / len(spreads)
# Threshold calibration: 0.05 USD/MMBtu = tight, 0.50 = illiquid
score = max(0, 1 - (avg_spread - 0.05) / 0.45)
return round(score, 3)
2.2 Pattern 2: Regulatory Compliance and Audit Logging
Financial institutions operating under MiFID II, Dodd-Frank, or MAS regulations face strict requirements around data lineage, audit trails, and algorithmic trading records. A compliance SKILL handles this without modifying the core TickDB code.
Use case: An asset manager needs to log every data point consumed by their trading algorithms, including timestamps, data source, and the algorithm's response, for regulatory audit purposes.
# compliance_audit_logger.py
import json
import hashlib
from datetime import datetime
from typing import Any, Dict, Optional
from tickdb.skill import (
register_function,
on_data,
on_function_call,
AuditLogger,
get_config
)
class ComplianceAuditLogger:
"""
MiFID II / Dodd-Frank compliant audit logger.
Logs all data consumption and function calls to immutable audit store.
"""
def __init__(self, audit_bucket: str, retention_days: int = 2555):
"""
Args:
audit_bucket: S3/GCS bucket for audit log storage
retention_days: Minimum 7 years for MiFID II compliance (2555 days)
"""
self.audit_logger = AuditLogger(
destination=audit_bucket,
retention_days=retention_days,
encryption_key_id=get_config("audit_encryption_key"),
partition_by="day" # One file per day for efficient querying
)
self.entity_id = get_config("legal_entity_id")
self.counter = 0
@on_function_call # Intercept ALL function calls, not just specific triggers
def log_function_call(self, func_name: str, args: tuple, kwargs: dict, result: Any) -> None:
"""Called automatically after every registered function executes."""
self.counter += 1
audit_record = {
"audit_id": self._generate_audit_id(),
"timestamp": datetime.utcnow().isoformat() + "Z",
"legal_entity_id": self.entity_id,
"function_name": func_name,
"input_hash": self._hash_payload({"args": args, "kwargs": kwargs}),
"output_hash": self._hash_payload(result),
"execution_time_ms": getattr(result, "_execution_time_ms", None),
"data_source": getattr(result, "_data_source", "unknown"),
"sequence_number": self.counter,
"record_type": "FUNC_CALL"
}
self.audit_logger.write(audit_record)
@on_data(triggers=["*"]) # Wildcard = all data feeds
def log_data_consumption(self, data: Any, source: str, metadata: Dict) -> None:
"""Audit trail for every data point consumed."""
audit_record = {
"audit_id": self._generate_audit_id(),
"timestamp": datetime.utcnow().isoformat() + "Z",
"legal_entity_id": self.entity_id,
"data_source": source,
"data_hash": self._hash_payload(data),
"metadata": metadata,
"record_type": "DATA_CONSUMPTION"
}
self.audit_logger.write(audit_record)
def _generate_audit_id(self) -> str:
"""Generate unique, monotonically increasing audit ID."""
ts = datetime.utcnow().strftime("%Y%m%d%H%M%S%f")
return f"AUD-{self.entity_id}-{ts}-{self.counter:06d}"
def _hash_payload(self, payload: Any) -> str:
"""SHA-256 hash of payload for tamper evidence."""
serialized = json.dumps(payload, sort_keys=True, default=str)
return hashlib.sha256(serialized.encode()).hexdigest()
2.3 Pattern 3: Pre-Trade Risk Checks
Enterprise trading systems require real-time risk controls that operate on incoming market data before any order is submitted. A risk SKILL integrates directly into the TickDB data pipeline.
Use case: A prime brokerage needs to enforce position limits, check volatility thresholds, and block orders during scheduled risk windows — all before the order reaches the execution venue.
# pre_trade_risk_engine.py
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import time
from tickdb.skill import register_function, on_data, RiskViolation
@dataclass
class RiskLimit:
name: str
threshold: float
current: float
def is_breached(self) -> bool:
return abs(self.current) >= abs(self.threshold)
class PreTradeRiskEngine:
"""
Real-time pre-trade risk checks integrated with TickDB data pipeline.
Blocks orders that violate risk parameters before they reach the market.
"""
def __init__(
self,
position_limits: Dict[str, float],
vol_limit: float = 0.30,
max_order_size: int = 10000,
risk_window_start: time = time(9, 30),
risk_window_end: time = time(15, 45)
):
self.position_limits = {
symbol: RiskLimit(name=f"pos_{symbol}", threshold=limit, current=0.0)
for symbol, limit in position_limits.items()
}
self.vol_limit = vol_limit
self.max_order_size = max_order_size
self.risk_window_start = risk_window_start
self.risk_window_end = risk_window_end
@register_function("pre_trade_check")
@on_data(triggers=["trade", "depth"])
def pre_trade_check(self, order: Dict, market_data: Dict) -> Dict:
"""
Full pre-trade risk check.
Raises RiskViolation if any check fails.
"""
violations = []
# Check 1: Position limit
symbol = order["symbol"]
if symbol in self.position_limits:
new_position = self.position_limits[symbol].current + order["quantity"]
self.position_limits[symbol].current = new_position
if self.position_limits[symbol].is_breached():
violations.append(
f"Position limit breached for {symbol}: "
f"{new_position} vs limit {self.position_limits[symbol].threshold}"
)
# Check 2: Order size
if abs(order["quantity"]) > self.max_order_size:
violations.append(
f"Order size {order['quantity']} exceeds maximum {self.max_order_size}"
)
# Check 3: Volatility threshold (uses current market data)
if "volatility" in market_data:
if market_data["volatility"] > self.vol_limit:
violations.append(
f"Volatility {market_data['volatility']:.2%} exceeds limit {self.vol_limit:.2%}"
)
# Check 4: Risk window
current_time = market_data.get("timestamp")
if current_time:
ct = current_time.time() if hasattr(current_time, "time") else None
if ct and not (self.risk_window_start <= ct <= self.risk_window_end):
violations.append(
f"Order outside risk window ({self.risk_window_start}-{self.risk_window_end})"
)
# Check 5: Spread sanity (depth-based)
if "spread" in market_data and "bid" in market_data and "ask" in market_data:
spread_bps = (market_data["ask"] - market_data["bid"]) / market_data["bid"] * 10000
if spread_bps > 100: # > 100 bps spread is suspicious
violations.append(
f"Abnormally wide spread {spread_bps:.1f} bps — possible stale data"
)
if violations:
raise RiskViolation(
message="Pre-trade risk check failed",
violations=violations,
blocking=True # True = reject order; False = warn but allow
)
return {
"status": "APPROVED",
"risk_checks_passed": len(violations) == 0,
"remaining_capacity": {
symbol: limit.threshold - limit.current
for symbol, limit in self.position_limits.items()
}
}
3. Private Deployment: Running SKILLs Behind Your Firewall
Standard TickDB SKILL deployment assumes cloud connectivity. Enterprise environments with data sovereignty requirements, air-gapped networks, or ultra-low-latency demands need a different approach. This section covers the private deployment model.
3.1 Deployment Architecture Options
| Architecture | Use Case | Latency | Data Sovereignty | Complexity |
|---|---|---|---|---|
| TickDB Cloud (default) | Standard enterprise | ~100 ms round-trip | Managed by TickDB | Lowest |
| Hybrid Connect | Regulatory isolation, multi-region | ~50 ms | Configurable per region | Medium |
| On-Premises Runtime | Air-gapped, maximum control | ~5 ms (local network) | Full local control | Highest |
| Co-location | HFT / ULL requirements | Sub-ms | Full local | Highest + specialized |
3.2 On-Premises SKILL Runtime
The on-premises runtime is a self-contained container that runs your SKILL packages alongside a lightweight TickDB data relay. You maintain the infrastructure; TickDB provides the software.
Minimum requirements:
- 4 CPU cores, 8 GB RAM (per SKILL runtime instance)
- Docker 20.10+ or Kubernetes 1.25+
- 50 GB SSD for local state and logs
- Network access to TickDB relay endpoint (port 443)
Setup:
# 1. Download the private runtime installer
$ wget https://enterprise.tickdb.ai/skill-runtime-2.1.0-linux-amd64.tar.gz
# 2. Extract and configure
$ tar -xzf skill-runtime-2.1.0-linux-amd64.tar.gz
$ cd skill-runtime
$ ./configure.sh \
--enterprise-id "ACME-TRADING" \
--relay-endpoint "relay-private.tickdb.ai:8443" \
--tls-mode mutual \
--audit-destination "s3://acme-audit-logs/risk-logs/" \
--log-level info
# 3. Start the runtime
$ docker-compose up -d
# 4. Verify connectivity
$ ./tickdb-cli skill list
NAME VERSION STATUS MEMORY CPU
volatility-surface-builder 1.0.0 RUNNING 256Mi 0.12 cores
compliance-audit-logger 2.1.0 RUNNING 128Mi 0.08 cores
pre-trade-risk-engine 1.3.2 RUNNING 384Mi 0.34 cores
3.3 SKILL Deployment in On-Premises Mode
Deploying SKILLs to the on-premises runtime follows the same manifest structure, with additional configuration for local resource allocation and offline operation.
# skill.yaml (on-premises variant)
apiVersion: tickdb.ai/v1
kind: Skill
metadata:
name: volatility-surface-builder
version: 1.0.0
spec:
# ... base configuration ...
deployment:
mode: on_premises
runtime: tickdb-runtime:2.1.0
resources:
requests:
memory: "512Mi"
cpu: "0.5"
limits:
memory: "1Gi"
cpu: "2"
gpu: "0" # GPU passthrough for ML SKILLs (requires enterprise+)
persistence:
state_storage: local # vs "cloud" (default)
checkpoint_interval_seconds: 30
state_path: /data/skill-state/vol-surface
network:
allow_internet: false # Strict air-gap mode
relay_fallback: null # No cloud relay — fully local
isolation:
sandbox: docker # vs "process" (shared namespace)
read_only_filesystem: false
3.4 Handling Offline Operation
One of the key challenges in private deployment is gracefully degrading when the relay connection is interrupted. The SKILL runtime provides built-in resilience:
# skill_function.py — offline-aware implementation
from tickdb.skill import get_connectivity_status, on_reconnect
class OfflineAwareFunction:
"""
Demonstrates graceful degradation when on-premises runtime
loses connection to the TickDB relay.
"""
def __init__(self):
self.connectivity = get_connectivity_status()
self.local_cache = {}
@register_function("get_vol_surface")
def get_vol_surface(self, symbol: str, force_refresh: bool = False) -> Dict:
"""
Returns cached vol surface if offline, fresh data if online.
"""
cache_key = f"vs_{symbol}"
if not self.connectivity.is_online() and not force_refresh:
# Offline mode: serve from local cache
if cache_key in self.local_cache:
cached = self.local_cache[cache_key]
cached["_data_freshness"] = "cached_offline"
return cached
else:
raise RuntimeError(
f"No local cache available for {symbol}. "
"Connectivity required for first-time data fetch."
)
# Online mode: fetch fresh data
fresh_data = self._fetch_from_relay(symbol)
self.local_cache[cache_key] = fresh_data
fresh_data["_data_freshness"] = "live"
return fresh_data
@on_reconnect
def sync_on_reconnect(self):
"""
Automatically called when connectivity is restored.
Use this to flush local state to central systems.
"""
# Sync any local state accumulated during offline period
self._flush_state_to_central_store()
4. Security and Access Control
Enterprise SKILL deployment requires robust security controls. The protocol addresses this at multiple layers.
4.1 Authentication and Authorization
# skill.yaml — security configuration
spec:
security:
# Who can invoke this SKILL
allowed_callers:
- "arn:aws:iam::123456789:role/trading-algo-prod"
- "arn:aws:iam::123456789:role/risk-engine-prod"
- "user:quant-researcher@acme.com" # Individual users
# Rate limiting per caller
rate_limits:
default: 100 # calls per minute
overrides:
"arn:aws:iam::...risk-engine-prod": 10000
# Data access controls
data_scope:
# Restrict SKILL to specific symbols or asset classes
symbols:
- "AAPL.US"
- "TSLA.US"
- "BTC.*" # Wildcard for crypto pairs
markets:
- us_equity
- crypto
# Encryption
input_encryption: required
output_encryption: required
at_rest_encryption:
enabled: true
key_id: "arn:aws:kms:us-east-1:123456789:key/enterprise-skill-key"
4.2 Code Signing
All SKILL packages must be signed before deployment. The runtime rejects unsigned packages.
# Signing a SKILL package
$ tickdb-cli skill sign \
--package ./volatility-surface-builder-1.0.0.skill \
--signing-key "./keys/acme-trading-skill-signing.pem" \
--output ./volatility-surface-builder-1.0.0.signed.skill
$ tickdb-cli skill verify \
--package ./volatility-surface-builder-1.0.0.signed.skill \
--public-key "./keys/tickdb-enterprise-ca.pem"
Signature valid. Package integrity verified.
5. Development, Testing, and CI/CD
Professional SKILL development follows a structured lifecycle. This section documents the recommended workflow.
5.1 Local Development Environment
# Install the SKILL SDK
$ pip install tickdb-skill-sdk==2.1.0
# Initialize a new SKILL project
$ tickdb-skill init --name volatility-surface-builder --runtime python3.11
Created project structure:
volatility-surface-builder/
├── skill.yaml # Manifest
├── src/
│ ├── __init__.py
│ ├── functions/
│ │ ├── __init__.py
│ │ └── surface_builder.py
│ └── utils/
│ ├── __init__.py
│ └── numerics.py
├── tests/
│ ├── __init__.py
│ ├── test_surface_builder.py
│ └── fixtures/
│ └── sample_chain.json
├── Makefile
└── README.md
5.2 Unit Testing
# tests/test_surface_builder.py
import pytest
import numpy as np
from src.functions.surface_builder import VolatilitySurfaceBuilder
class TestVolatilitySurfaceBuilder:
"""Unit tests for the IV surface computation."""
@pytest.fixture
def builder(self):
return VolatilitySurfaceBuilder(
tenor_range=["1W", "1M", "2M"],
moneyness_strikes=5,
model="black_scholes"
)
@pytest.fixture
def sample_chain(self):
"""Standard at-the-money options chain for testing."""
return {
"underlying_price": 100.0,
"timestamp": "2026-04-15T10:00:00Z",
"expiries": ["2026-04-22", "2026-05-15", "2026-06-15"],
"strikes": {
"2026-04-22": [
{"strike": 95.0, "mid_price": 5.50, "type": "call"},
{"strike": 100.0, "mid_price": 2.10, "type": "call"},
{"strike": 105.0, "mid_price": 0.60, "type": "call"},
],
# ... additional expirations
}
}
def test_surface_output_structure(self, builder, sample_chain):
"""Surface output contains required fields."""
result = builder.compute_iv_surface(sample_chain)
assert "surface" in result
assert "spot" in result
assert "timestamp" in result
assert result["spot"] == 100.0
def test_iv_within_reasonable_bounds(self, builder, sample_chain):
"""Computed IV should be between 1% and 200%."""
result = builder.compute_iv_surface(sample_chain)
for tenor, moneyness_grid in result["surface"].items():
for moneyness, iv in moneyness_grid.items():
assert 0.01 < iv < 2.0, f"IV {iv} out of bounds at tenor={tenor}, moneyness={moneyness}"
def test_call_parity(self, builder):
"""Synthetic put IV should match call IV at same moneyness (put-call parity)."""
# Construct a chain where put-call parity should hold exactly
# (ignoring early exercise for short expirations)
# Verify computed IVs match within tolerance
pass
def test_edge_case_zero_vega(self, builder):
"""Very short-dated options with tiny vega should not crash."""
short_chain = {
"underlying_price": 100.0,
"timestamp": "2026-04-15T10:00:00Z",
"expiries": ["2026-04-16"],
"strikes": {
"2026-04-16": [
{"strike": 100.0, "mid_price": 0.01, "type": "call"},
]
}
}
result = builder.compute_iv_surface(short_chain)
assert not np.isnan(result["surface"].get("1W", {}).get(0.0, np.nan))
5.3 CI/CD Pipeline
# .github/workflows/skill-ci.yml
name: SKILL CI/CD Pipeline
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install dependencies
run: |
pip install tickdb-skill-sdk pytest pytest-cov black flake8
- name: Lint
run: |
black --check src/ tests/
flake8 src/ --max-line-length=100 --ignore=E501,W503
- name: Unit tests
run: |
pytest tests/ --cov=src --cov-report=xml --cov-fail-under=80
- name: Build SKILL package
run: |
tickdb-cli skill build --package-name volatility-surface-builder
- name: Sign package
run: |
echo "${{ secrets.SKILL_SIGNING_KEY }}" > signing-key.pem
tickdb-cli skill sign --package ./*.skill --signing-key signing-key.pem
- name: Deploy to staging (on PR merge)
if: github.ref == 'refs/heads/main'
run: |
tickdb-cli skill deploy \
--package ./*.skill \
--target staging \
--runtime-url https://staging-api.tickdb.ai \
--api-key ${{ secrets.TICKDB_STAGING_KEY }}
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run security scan
uses: tickdb/skill-security-scan@v2
with:
package-path: "./volatility-surface-builder.skill"
scan-level: enterprise
6. Enterprise Considerations and Migration Strategies
6.1 Moving from Ad-Hoc Adapters to SKILLs
If your team currently maintains external adapter services (as described in the opening scenario), the migration path is straightforward:
| Phase | Action | Duration |
|---|---|---|
| 1 | Audit existing adapter logic; identify transformation rules | 1–2 weeks |
| 2 | Write equivalent SKILL functions; wrap in skill.yaml | 2–3 weeks |
| 3 | Run SKILL in shadow mode alongside existing adapter | 1–2 weeks |
| 4 | Validate output parity (byte-level comparison of transformed data) | 1 week |
| 5 | Cut over to SKILL; decommission adapter | 1 day |
6.2 SKILL Governance in Large Organizations
Enterprise teams with multiple SKILLs benefit from centralized governance:
| Governance Concern | Recommended Practice |
|---|---|
| SKILL ownership | One team owns each SKILL; ownership recorded in manifest |
| Version management | Semantic versioning (MAJOR.MINOR.PATCH); MAJOR changes require review |
| Dependency management | SKILLs declare dependencies on specific TickDB API versions |
| Deprecation | 90-day deprecation notice via manifest deprecation field |
| Rollback | Last-3-versions rollback via tickdb-cli skill rollback |
7. Closing
The SKILL protocol transforms TickDB from a data delivery platform into a programmable market data infrastructure. The three patterns covered here — custom normalization, compliance logging, and pre-trade risk — represent the most common enterprise use cases, but the architecture is deliberately general. Any transformation, enrichment, or gate that your business logic requires can be expressed as a SKILL.
The key architectural advantages this delivers in practice:
- Single source of truth: Business logic lives alongside the data, not in a separate microservice that can drift out of sync.
- Consistent operational model: SKILLs share the same deployment, monitoring, and security framework as TickDB itself.
- Portable: A SKILL written for the cloud runtime runs unchanged in a private deployment (subject to resource configuration differences).
For teams currently maintaining adapter layers, migration to the SKILL model typically pays for itself within three months in reduced maintenance burden and incident resolution time.
Next Steps
If you're an individual quant developer exploring SKILLs for personal use, start with the free tier at tickdb.ai — you can deploy one SKILL with limited resources to experiment with the extension model.
If you're an enterprise team evaluating SKILLs for production use, schedule a technical architecture review with the TickDB enterprise team at enterprise@tickdb.ai. Bring your current adapter architecture diagrams — the team will map them to the equivalent SKILL topology.
If you're ready to build your first SKILL, install the SDK and run through the local development quickstart at docs.tickdb.ai/skill-sdk/quickstart. The first function takes under an hour to write and deploy.
If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace to get native TickDB function calling support.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.