In 2021, a proprietary trading firm in Singapore spent three months building a custom adapter layer just to normalize TickDB's raw order book data into the format their internal risk engine expected. The adapter lived outside TickDB entirely — a separate microservice that their DevOps team had to maintain, monitor, and debug whenever the upstream data schema changed. Every schema update broke their pipeline. Every new market they wanted to add required changes in two places.

This is the problem the SKILL protocol was designed to eliminate.

The SKILL protocol is not merely an API wrapper. It is an extension architecture that lets enterprise teams embed their proprietary business logic directly into the data delivery layer — without forking the core platform, without maintaining brittle sidecar services, and without waiting for a product roadmap to catch up with their specific requirements. Whether you need custom normalization rules for a non-standard asset class, proprietary volatility surface calculations, or enterprise-grade audit logging that satisfies regulatory requirements in a specific jurisdiction, the SKILL protocol provides a structured path to extend TickDB's capabilities while maintaining a clean separation between your intellectual property and the underlying data infrastructure.

This article walks through the full SKILL development lifecycle: from understanding the protocol architecture, to writing your first custom function, to deploying in a private or hybrid environment that satisfies enterprise security and compliance requirements.


1. Understanding the SKILL Protocol Architecture

The SKILL protocol operates on a simple but powerful principle: instead of building every conceivable data transformation into the core platform, TickDB exposes a well-defined extension surface that third parties can implement. Think of it as a plugin architecture with strong contracts — you control the logic, TickDB controls the data plane.

1.1 Core Components

A SKILL package consists of three essential components:

The Manifest (skill.yaml)

The manifest declares what the SKILL does, which TickDB capabilities it extends, and what parameters it accepts. This is the contract between your extension and the TickDB runtime.

apiVersion: tickdb.ai/v1
kind: Skill
metadata:
  name: volatility-surface-builder
  version: 1.0.0
  description: "Computes implied volatility surface from options chain data"
  author: "enterprise-quant-team"
spec:
  capabilities:
    - type: function
      name: compute_iv_surface
      input: options_chain_snapshot
      output: volatility_surface_3d
    - type: pipeline
      name: iv_enrichment
      triggers_on:
        - symbol
        - interval
  config:
    - name: tenor_range
      type: string
      default: "1D,1W,2W,1M,2M,3M,6M,1Y"
    - name: moneyness_strikes
      type: integer
      default: 11
    - name: model
      type: enum
      default: black_scholes
      options: [black_scholes, binomial, sabr]
  runtime:
    python_version: "3.11"
    memory_limit: "512Mi"
    timeout_seconds: 30
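To make the config contract concrete, the sketch below shows one way a runtime could merge caller overrides onto the manifest defaults and reject invalid values. The `resolve_config` function, the `CONFIG_SPEC` structure, and the type mapping are hypothetical illustrations, not part of the TickDB SDK.

```python
from typing import Any, Dict, List

# Hypothetical in-memory copy of the `config` section from the manifest above.
CONFIG_SPEC: List[Dict[str, Any]] = [
    {"name": "tenor_range", "type": "string", "default": "1D,1W,2W,1M,2M,3M,6M,1Y"},
    {"name": "moneyness_strikes", "type": "integer", "default": 11},
    {"name": "model", "type": "enum", "default": "black_scholes",
     "options": ["black_scholes", "binomial", "sabr"]},
]

# Maps manifest type names to Python types (an assumption for this sketch).
TYPE_MAP = {"string": str, "integer": int, "enum": str}


def resolve_config(spec: List[Dict[str, Any]], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Merge caller overrides onto manifest defaults, rejecting invalid values."""
    known = {entry["name"] for entry in spec}
    unknown = set(overrides) - known
    if unknown:
        raise KeyError(f"Unknown config parameters: {sorted(unknown)}")

    resolved: Dict[str, Any] = {}
    for entry in spec:
        value = overrides.get(entry["name"], entry["default"])
        expected = TYPE_MAP[entry["type"]]
        # bool is a subclass of int, so it is excluded explicitly.
        if not isinstance(value, expected) or isinstance(value, bool):
            raise TypeError(f"{entry['name']} must be of type {entry['type']}")
        if entry["type"] == "enum" and value not in entry["options"]:
            raise ValueError(f"{entry['name']} must be one of {entry['options']}")
        resolved[entry["name"]] = value
    return resolved


# Override only the pricing model; everything else falls back to defaults.
resolved = resolve_config(CONFIG_SPEC, {"model": "sabr"})
```

Validating at load time means a misconfigured SKILL fails during initialization rather than partway through a data stream.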

The Function Implementation

The actual business logic lives in one or more function files. The protocol supports Python and JavaScript as first-class runtimes, with WebAssembly support on the roadmap for Q3 2026.

The Data Binding Layer

The binding layer connects your custom functions to TickDB's internal data streams. You declare which data feeds your function subscribes to, and TickDB handles the subscription lifecycle, backpressure management, and error propagation.

# skill_function.py
import numpy as np
from scipy.stats import norm
from typing import Dict, List, Any
from tickdb.skill import on_data, register_function

class VolatilitySurfaceBuilder:
    """
    Builds a 3D implied volatility surface from an options chain snapshot.
    Uses Black-Scholes implied volatility inversion via Newton-Raphson.
    """
    
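    def __init__(self, tenor_range: "List[str] | str", moneyness_strikes: int, model: str = "black_scholes"):
        # The manifest declares tenor_range as a comma-separated string,
        # so accept either that form or an already-split list.
        if isinstance(tenor_range, str):
            tenor_range = [t.strip() for t in tenor_range.split(",")]
        self.tenor_range = tenor_range
        self.moneyness_strikes = moneyness_strikes
        self.model = model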
    
    @register_function("compute_iv_surface")
    @on_data(triggers=["options_chain"])
    def compute_iv_surface(self, chain_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Main entry point for the IV surface computation.
        
        Args:
            chain_data: Raw options chain with strikes, expiries, bids, asks
            
        Returns:
            3D volatility surface as a dictionary with tenor x moneyness grid
        """
        surface = {}
        spot = chain_data.get("underlying_price")
        
        for expiry in chain_data.get("expiries", []):
            tenor_key = self._expiry_to_tenor(expiry)
            if tenor_key not in self.tenor_range:
                continue
                
            strikes = chain_data.get("strikes", {}).get(expiry, [])
            iv_row = {}
            
            for strike_data in strikes:
                iv = self._compute_iv_single(
                    spot=spot,
                    strike=strike_data["strike"],
                    expiry=expiry,
                    option_price=strike_data.get("mid_price"),
                    option_type=strike_data.get("type"),  # call or put
                    risk_free=self._get_risk_free_rate(expiry)
                )
                moneyness = np.log(strike_data["strike"] / spot)
                iv_row[moneyness] = iv
            
            surface[tenor_key] = iv_row
        
        return {
            "surface": surface,
            "spot": spot,
            "timestamp": chain_data["timestamp"],
            "model": self.model
        }
    
    def _compute_iv_single(
        self,
        spot: float,
        strike: float,
        expiry: str,
        option_price: float,
        option_type: str,
        risk_free: float
    ) -> float:
        """Newton-Raphson IV inversion for a single strike."""
        T = self._expiry_to_year_fraction(expiry)
        # Skip strikes with missing inputs or a non-positive expiry/price
        if T is None or option_price is None or T <= 0 or option_price <= 0:
            return np.nan
        
        sigma = 0.30  # Initial guess: flat 30% volatility
        
        for _ in range(100):  # Max 100 iterations
            d1 = (np.log(spot / strike) + (risk_free + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
            d2 = d1 - sigma * np.sqrt(T)
            
            if option_type == "call":
                price = spot * norm.cdf(d1) - strike * np.exp(-risk_free * T) * norm.cdf(d2)
            else:
                price = strike * np.exp(-risk_free * T) * norm.cdf(-d2) - spot * norm.cdf(-d1)
            
            vega = spot * norm.pdf(d1) * np.sqrt(T)  # Analytical vega
            
            if abs(vega) < 1e-10:
                break
                
            diff = option_price - price
            if abs(diff) < 1e-6:
                break
                
            sigma += diff / vega  # Newton-Raphson step
            sigma = max(0.01, min(sigma, 5.0))  # Bound sigma to reasonable range
        
        return sigma
    
    def _expiry_to_tenor(self, expiry: str) -> str:
        """Convert expiration string to tenor notation."""
        # Implementation maps expiry dates to standard tenor buckets
        pass
    
    def _expiry_to_year_fraction(self, expiry: str) -> float:
        """Convert expiration to year fraction for pricing models."""
        pass
    
    def _get_risk_free_rate(self, expiry: str) -> float:
        """Fetch interpolated risk-free rate for the given expiry."""
        # In production, this would call an internal rate service
        return 0.05
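A quick way to sanity-check a Newton-Raphson inversion like the one above is a round trip: price an option at a known volatility, then confirm the solver recovers it. The sketch below uses only the standard library (the normal CDF comes from `math.erf`); the function names and sample inputs are illustrative assumptions, and it returns NaN on non-convergence rather than the last iterate.

```python
import math


def norm_cdf(x: float) -> float:
    """Standard normal CDF via the error function."""
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))


def norm_pdf(x: float) -> float:
    """Standard normal density."""
    return math.exp(-0.5 * x * x) / math.sqrt(2.0 * math.pi)


def d1_term(spot: float, strike: float, t: float, rate: float, sigma: float) -> float:
    return (math.log(spot / strike) + (rate + 0.5 * sigma ** 2) * t) / (sigma * math.sqrt(t))


def bs_call_price(spot: float, strike: float, t: float, rate: float, sigma: float) -> float:
    """Black-Scholes price of a European call without dividends."""
    d1 = d1_term(spot, strike, t, rate, sigma)
    d2 = d1 - sigma * math.sqrt(t)
    return spot * norm_cdf(d1) - strike * math.exp(-rate * t) * norm_cdf(d2)


def implied_vol_call(price: float, spot: float, strike: float, t: float, rate: float,
                     sigma: float = 0.30, tol: float = 1e-8, max_iter: int = 100) -> float:
    """Newton-Raphson inversion; returns NaN if the iteration does not converge."""
    for _ in range(max_iter):
        diff = price - bs_call_price(spot, strike, t, rate, sigma)
        if abs(diff) < tol:
            return sigma
        vega = spot * norm_pdf(d1_term(spot, strike, t, rate, sigma)) * math.sqrt(t)
        if vega < 1e-10:
            break  # Flat price curve: the Newton step would be unstable
        sigma = max(0.01, min(sigma + diff / vega, 5.0))
    return math.nan


# Round trip: price at 25% vol, then recover the vol from the price.
true_sigma = 0.25
price = bs_call_price(spot=100.0, strike=105.0, t=0.5, rate=0.05, sigma=true_sigma)
recovered = implied_vol_call(price, spot=100.0, strike=105.0, t=0.5, rate=0.05)
```

Returning NaN on failure makes non-convergence visible to downstream consumers instead of silently reporting a clamped value.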

1.2 Execution Model

When you deploy a SKILL, the TickDB runtime manages its lifecycle. Understanding this execution model is critical for writing performant, resilient extensions.

| Lifecycle Phase | What Happens | Your Responsibility |
|---|---|---|
| Initialization | SKILL package loaded; config validated; resources allocated | Define sensible defaults; validate inputs in __init__ |
| Data subscription | SKILL binds to specified data streams | Use the @on_data decorator to declare subscriptions |
| Execution | Functions called with normalized input data | Keep functions stateless where possible; handle errors gracefully |
| State management | Optional in-memory or persistent state between calls | Use tickdb.skill.get_state() / set_state() for cross-call persistence |
| Teardown | Resources released; subscriptions cancelled | Clean up any external connections in finalizer |

# State management example
import numpy as np
from typing import Dict
from tickdb.skill import get_state, set_state, on_data, on_disconnect, register_function

class RollingVolatilityTracker:
    """Tracks rolling volatility with state persistence across calls."""
    
    def __init__(self, window: int = 20):
        self.window = window
        self.prices = get_state("rolling_prices", default=[])
    
    @register_function("update_rolling_vol")
    @on_data(triggers=["trade"])
    def update_rolling_vol(self, trade: Dict) -> Dict:
        self.prices.append(trade["price"])
        if len(self.prices) > self.window:
            self.prices = self.prices[-self.window:]
        
        # Persist state for next invocation
        set_state("rolling_prices", self.prices)
        
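        if len(self.prices) < 2:
            # A single price yields no returns, so volatility is undefined
            return {"rolling_volatility": float("nan"), "sample_size": len(self.prices)}
        
        returns = np.diff(np.log(self.prices))
        # Annualization assumes one observation per minute (~390 per trading day)
        vol = np.std(returns) * np.sqrt(252 * 390)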
        
        return {"rolling_volatility": vol, "sample_size": len(self.prices)}
    
    @on_disconnect
    def cleanup(self):
        """Called when SKILL is unloaded. Save state to persistent storage."""
        # In production: write to database or object storage
        pass

2. Function Extension Patterns for Common Enterprise Use Cases

The SKILL protocol's flexibility makes it suitable for a wide range of enterprise scenarios. This section documents the three most common extension patterns observed in production deployments.

2.1 Pattern 1: Custom Data Normalization for Non-Standard Asset Classes

Institutional teams often need to work with asset classes that TickDB does not natively support at the data level — exotic derivatives, structured products, OTC instruments. A normalization SKILL bridges the gap.

Use case: A commodity trading desk needs to normalize LNG (liquefied natural gas) forward curves from proprietary broker feeds into the same format as their liquid futures data.

Architecture:

Proprietary Broker Feed → Custom Ingestion SKILL → Normalized Curve → TickDB Storage
                                              ↓
                                     Vol Surface SKILL
                                              ↓
                                     Risk Engine (downstream)

# lng_curve_normalizer.py
from typing import Dict, List
from datetime import datetime, timedelta
from tickdb.skill import register_function

class LNGCurveNormalizer:
    """
    Normalizes broker LNG forward curves into standardized format.
    Handles the peculiarities of TTF, JKM, and Henry Hub benchmarks.
    """
    
    def __init__(self, benchmark: str = "TTF"):
        self.benchmark = benchmark
        self.curve_cache = {}
    
    @register_function("normalize_lng_curve")
    def normalize_lng_curve(self, raw_broker_data: Dict) -> Dict:
        """
        Transforms broker proprietary format into TickDB standard curve.
        
        Input: {"broker_id": "GFI", "timestamp": "...", "delivery_periods": [...], "bids": [...], "asks": [...]}
        Output: {"symbol": "LNG-TTF", "tenors": [...], "mid_prices": [...], ...}
        """
        tenors = self._parse_tenor_dates(raw_broker_data["delivery_periods"])
        bids = raw_broker_data["bids"]
        asks = raw_broker_data["asks"]
        
        mid_prices = [(b + a) / 2 for b, a in zip(bids, asks)]
        spreads = [a - b for a, b in zip(asks, bids)]
        
        normalized = {
            "symbol": f"LNG-{self.benchmark}",
            "benchmark": self.benchmark,
            "timestamp": raw_broker_data["timestamp"],
            "broker_id": raw_broker_data["broker_id"],
            "tenors": tenors,
            "mid_prices": mid_prices,
            "bid_prices": bids,
            "ask_prices": asks,
            "spread": spreads,
            "liquidity_score": self._compute_liquidity_score(spreads),
            "unit": "USD/MMBtu"
        }
        
        return normalized
    
    def _parse_tenor_dates(self, periods: List[str]) -> List[str]:
        """Convert broker period codes to ISO date strings."""
        # Illustrative calendar-day offsets from today; production code
        # should resolve periods against the broker's delivery calendar.
        today = datetime.now()
        offsets = {"W+1": 1, "W+2": 8, "M+1": 30, "M+2": 60, "Q+1": 90}
        tenor_map = {
            code: (today + timedelta(days=days)).strftime("%Y-%m-%d")
            for code, days in offsets.items()
        }
        return [tenor_map.get(p, p) for p in periods]
    
    def _compute_liquidity_score(self, spreads: List[float]) -> float:
        """
        Scores curve liquidity based on spread tightness.
        Returns 0-1, where 1 = highly liquid.
        """
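        if not spreads:
            return 0.0  # No quotes at all: treat the curve as illiquid
        avg_spread = sum(spreads) / len(spreads)
        # Threshold calibration: 0.05 USD/MMBtu = tight, 0.50 = illiquid
        # Clamp to [0, 1] so spreads tighter than 0.05 do not score above 1
        score = min(1.0, max(0.0, 1 - (avg_spread - 0.05) / 0.45))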
        return round(score, 3)
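As a worked example of the liquidity score, the sketch below applies the same linear mapping between the tight (0.05) and illiquid (0.50) thresholds and clamps the result to the documented 0 to 1 range. The standalone function name and the constant names are assumptions for illustration.

```python
from typing import List

TIGHT_SPREAD = 0.05     # USD/MMBtu, scored as fully liquid
ILLIQUID_SPREAD = 0.50  # USD/MMBtu, scored as illiquid


def liquidity_score(spreads: List[float]) -> float:
    """Linear score between the two thresholds, clamped to [0, 1]."""
    if not spreads:
        return 0.0  # No quotes: treat as illiquid
    avg_spread = sum(spreads) / len(spreads)
    raw = 1 - (avg_spread - TIGHT_SPREAD) / (ILLIQUID_SPREAD - TIGHT_SPREAD)
    return round(min(1.0, max(0.0, raw)), 3)


tight = liquidity_score([0.05, 0.05])   # At the tight threshold
midway = liquidity_score([0.275])       # Halfway between the thresholds
wide = liquidity_score([0.50])          # At the illiquid threshold
very_tight = liquidity_score([0.02])    # Tighter than the threshold, clamped
```

Without the upper clamp, a spread of 0.02 would score above 1, which contradicts the stated range.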

2.2 Pattern 2: Regulatory Compliance and Audit Logging

Financial institutions operating under MiFID II, Dodd-Frank, or MAS regulations face strict requirements around data lineage, audit trails, and algorithmic trading records. A compliance SKILL handles this without modifying the core TickDB code.

Use case: An asset manager needs to log every data point consumed by their trading algorithms, including timestamps, data source, and the algorithm's response, for regulatory audit purposes.

# compliance_audit_logger.py
import json
import hashlib
from datetime import datetime
from typing import Any, Dict, Optional
from tickdb.skill import (
    register_function, 
    on_data, 
    on_function_call,
    AuditLogger,
    get_config
)

class ComplianceAuditLogger:
    """
    MiFID II / Dodd-Frank compliant audit logger.
    Logs all data consumption and function calls to immutable audit store.
    """
    
    def __init__(self, audit_bucket: str, retention_days: int = 2555):
        """
        Args:
            audit_bucket: S3/GCS bucket for audit log storage
            retention_days: Defaults to 2555 days (about 7 years). MiFID II requires
                five years of retention, extendable to seven at the regulator's request.
        """
        self.audit_logger = AuditLogger(
            destination=audit_bucket,
            retention_days=retention_days,
            encryption_key_id=get_config("audit_encryption_key"),
            partition_by="day"  # One file per day for efficient querying
        )
        self.entity_id = get_config("legal_entity_id")
        self.counter = 0
    
    @on_function_call  # Intercept ALL function calls, not just specific triggers
    def log_function_call(self, func_name: str, args: tuple, kwargs: dict, result: Any) -> None:
        """Called automatically after every registered function executes."""
        self.counter += 1
        
        audit_record = {
            "audit_id": self._generate_audit_id(),
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "legal_entity_id": self.entity_id,
            "function_name": func_name,
            "input_hash": self._hash_payload({"args": args, "kwargs": kwargs}),
            "output_hash": self._hash_payload(result),
            "execution_time_ms": getattr(result, "_execution_time_ms", None),
            "data_source": getattr(result, "_data_source", "unknown"),
            "sequence_number": self.counter,
            "record_type": "FUNC_CALL"
        }
        
        self.audit_logger.write(audit_record)
    
    @on_data(triggers=["*"])  # Wildcard = all data feeds
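    def log_data_consumption(self, data: Any, source: str, metadata: Dict) -> None:
        """Audit trail for every data point consumed."""
        # Advance the shared counter so audit IDs stay unique even when two
        # records are written within the same microsecond
        self.counter += 1
        
        audit_record = {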
            "audit_id": self._generate_audit_id(),
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "legal_entity_id": self.entity_id,
            "data_source": source,
            "data_hash": self._hash_payload(data),
            "metadata": metadata,
            "record_type": "DATA_CONSUMPTION"
        }
        
        self.audit_logger.write(audit_record)
    
    def _generate_audit_id(self) -> str:
        """Generate unique, monotonically increasing audit ID."""
        ts = datetime.utcnow().strftime("%Y%m%d%H%M%S%f")
        return f"AUD-{self.entity_id}-{ts}-{self.counter:06d}"
    
    def _hash_payload(self, payload: Any) -> str:
        """SHA-256 hash of payload for tamper evidence."""
        serialized = json.dumps(payload, sort_keys=True, default=str)
        return hashlib.sha256(serialized.encode()).hexdigest()
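The tamper-evidence property of `_hash_payload` depends on canonical serialization: `sort_keys=True` makes the hash independent of dictionary key order, while any change to a value produces a different digest. A minimal standalone sketch, with sample payloads invented for illustration:

```python
import hashlib
import json
from typing import Any


def hash_payload(payload: Any) -> str:
    """SHA-256 over a canonical JSON encoding of the payload."""
    serialized = json.dumps(payload, sort_keys=True, default=str)
    return hashlib.sha256(serialized.encode()).hexdigest()


# Same content, different key order: the digests match.
hash_a = hash_payload({"symbol": "AAPL.US", "price": 187.5})
hash_b = hash_payload({"price": 187.5, "symbol": "AAPL.US"})

# One changed value: the digest differs.
hash_c = hash_payload({"symbol": "AAPL.US", "price": 187.51})
```

Note that `default=str` falls back to `str()` for objects JSON cannot encode, so two distinct objects with the same string form would hash identically.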

2.3 Pattern 3: Pre-Trade Risk Checks

Enterprise trading systems require real-time risk controls that operate on incoming market data before any order is submitted. A risk SKILL integrates directly into the TickDB data pipeline.

Use case: A prime brokerage needs to enforce position limits, check volatility thresholds, and reject orders submitted outside the permitted trading window, all before the order reaches the execution venue.

# pre_trade_risk_engine.py
from typing import Dict
from dataclasses import dataclass
from datetime import time
from tickdb.skill import register_function, on_data, RiskViolation

@dataclass
class RiskLimit:
    name: str
    threshold: float
    current: float
    
    def is_breached(self) -> bool:
        return abs(self.current) >= abs(self.threshold)

class PreTradeRiskEngine:
    """
    Real-time pre-trade risk checks integrated with TickDB data pipeline.
    Blocks orders that violate risk parameters before they reach the market.
    """
    
    def __init__(
        self,
        position_limits: Dict[str, float],
        vol_limit: float = 0.30,
        max_order_size: int = 10000,
        risk_window_start: time = time(9, 30),
        risk_window_end: time = time(15, 45)
    ):
        self.position_limits = {
            symbol: RiskLimit(name=f"pos_{symbol}", threshold=limit, current=0.0)
            for symbol, limit in position_limits.items()
        }
        self.vol_limit = vol_limit
        self.max_order_size = max_order_size
        self.risk_window_start = risk_window_start
        self.risk_window_end = risk_window_end
    
    @register_function("pre_trade_check")
    @on_data(triggers=["trade", "depth"])
    def pre_trade_check(self, order: Dict, market_data: Dict) -> Dict:
        """
        Full pre-trade risk check.
        Raises RiskViolation if any check fails.
        """
        violations = []
        
        # Check 1: Position limit (evaluate the projected position without
        # committing it, so a rejected order leaves the book unchanged)
        symbol = order["symbol"]
        limit = self.position_limits.get(symbol)
        new_position = None
        if limit is not None:
            new_position = limit.current + order["quantity"]
            if abs(new_position) >= abs(limit.threshold):
                violations.append(
                    f"Position limit breached for {symbol}: "
                    f"{new_position} vs limit {limit.threshold}"
                )
        
        # Check 2: Order size
        if abs(order["quantity"]) > self.max_order_size:
            violations.append(
                f"Order size {order['quantity']} exceeds maximum {self.max_order_size}"
            )
        
        # Check 3: Volatility threshold (uses current market data)
        if "volatility" in market_data:
            if market_data["volatility"] > self.vol_limit:
                violations.append(
                    f"Volatility {market_data['volatility']:.2%} exceeds limit {self.vol_limit:.2%}"
                )
        
        # Check 4: Risk window
        current_time = market_data.get("timestamp")
        if current_time:
            ct = current_time.time() if hasattr(current_time, "time") else None
            if ct and not (self.risk_window_start <= ct <= self.risk_window_end):
                violations.append(
                    f"Order outside risk window ({self.risk_window_start}-{self.risk_window_end})"
                )
        
        # Check 5: Spread sanity (depth-based); skip when bid is zero to avoid
        # dividing by zero
        if (
            "spread" in market_data
            and "bid" in market_data
            and "ask" in market_data
            and market_data["bid"] > 0
        ):
            spread_bps = (market_data["ask"] - market_data["bid"]) / market_data["bid"] * 10000
            if spread_bps > 100:  # > 100 bps spread is suspicious
                violations.append(
                    f"Abnormally wide spread {spread_bps:.1f} bps; possible stale data"
                )
        
        if violations:
            raise RiskViolation(
                message="Pre-trade risk check failed",
                violations=violations,
                blocking=True  # True = reject order; False = warn but allow
            )
        
        # All checks passed: commit the projected position
        if limit is not None:
            limit.current = new_position
        
        return {
            "status": "APPROVED",
            "risk_checks_passed": len(violations) == 0,
            "remaining_capacity": {
                symbol: limit.threshold - limit.current
                for symbol, limit in self.position_limits.items()
            }
        }
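One design point worth isolating is the order of checking and committing: a rejected order should not change the tracked position. The sketch below shows a check-then-commit pattern in a self-contained form; `PositionBook` and `check_and_commit` are hypothetical names introduced for illustration and are not part of the TickDB SDK.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class PositionBook:
    """Hypothetical helper holding per-symbol limits and current positions."""
    limits: Dict[str, float]
    positions: Dict[str, float] = field(default_factory=dict)

    def check_and_commit(self, symbol: str, quantity: float) -> List[str]:
        """Return violations; update the position only when there are none."""
        violations: List[str] = []
        projected = self.positions.get(symbol, 0.0) + quantity
        limit = self.limits.get(symbol)
        if limit is not None and abs(projected) >= abs(limit):
            violations.append(
                f"Position limit breached for {symbol}: {projected} vs limit {limit}"
            )
        if not violations:
            self.positions[symbol] = projected  # Commit only on approval
        return violations


book = PositionBook(limits={"AAPL.US": 1000.0})
first = book.check_and_commit("AAPL.US", 600.0)   # Accepted: 600 < 1000
second = book.check_and_commit("AAPL.US", 500.0)  # Rejected: 1100 >= 1000
```

After the rejected second order, the book still shows 600, so subsequent orders are evaluated against the true position.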

3. Private Deployment: Running SKILLs Behind Your Firewall

Standard TickDB SKILL deployment assumes cloud connectivity. Enterprise environments with data sovereignty requirements, air-gapped networks, or ultra-low-latency demands need a different approach. This section covers the private deployment model.

3.1 Deployment Architecture Options

| Architecture | Use Case | Latency | Data Sovereignty | Complexity |
|---|---|---|---|---|
| TickDB Cloud (default) | Standard enterprise | ~100 ms round-trip | Managed by TickDB | Lowest |
| Hybrid Connect | Regulatory isolation, multi-region | ~50 ms | Configurable per region | Medium |
| On-Premises Runtime | Air-gapped, maximum control | ~5 ms (local network) | Full local control | Highest |
| Co-location | HFT / ULL requirements | Sub-ms | Full local | Highest + specialized |

3.2 On-Premises SKILL Runtime

The on-premises runtime is a self-contained container that runs your SKILL packages alongside a lightweight TickDB data relay. You maintain the infrastructure; TickDB provides the software.

Minimum requirements:

  • 4 CPU cores, 8 GB RAM (per SKILL runtime instance)
  • Docker 20.10+ or Kubernetes 1.25+
  • 50 GB SSD for local state and logs
  • Network access to TickDB relay endpoint (port 443)

Setup:

# 1. Download the private runtime installer
$ wget https://enterprise.tickdb.ai/skill-runtime-2.1.0-linux-amd64.tar.gz

# 2. Extract and configure
$ tar -xzf skill-runtime-2.1.0-linux-amd64.tar.gz
$ cd skill-runtime
$ ./configure.sh \
    --enterprise-id "ACME-TRADING" \
    --relay-endpoint "relay-private.tickdb.ai:8443" \
    --tls-mode mutual \
    --audit-destination "s3://acme-audit-logs/risk-logs/" \
    --log-level info

# 3. Start the runtime
$ docker-compose up -d

# 4. Verify connectivity
$ ./tickdb-cli skill list
NAME                        VERSION   STATUS   MEMORY   CPU
volatility-surface-builder  1.0.0     RUNNING  256Mi   0.12 cores
compliance-audit-logger     2.1.0     RUNNING  128Mi   0.08 cores
pre-trade-risk-engine       1.3.2     RUNNING  384Mi   0.34 cores

3.3 SKILL Deployment in On-Premises Mode

Deploying SKILLs to the on-premises runtime follows the same manifest structure, with additional configuration for local resource allocation and offline operation.

# skill.yaml (on-premises variant)
apiVersion: tickdb.ai/v1
kind: Skill
metadata:
  name: volatility-surface-builder
  version: 1.0.0
spec:
  # ... base configuration ...
  
  deployment:
    mode: on_premises
    runtime: tickdb-runtime:2.1.0
    
    resources:
      requests:
        memory: "512Mi"
        cpu: "0.5"
      limits:
        memory: "1Gi"
        cpu: "2"
        gpu: "0"  # GPU passthrough for ML SKILLs (requires enterprise+)
    
    persistence:
      state_storage: local  # vs "cloud" (default)
      checkpoint_interval_seconds: 30
      state_path: /data/skill-state/vol-surface
    
    network:
      allow_internet: false  # Strict air-gap mode
      relay_fallback: null   # No cloud relay — fully local
    
    isolation:
      sandbox: docker  # vs "process" (shared namespace)
      read_only_filesystem: false
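The `resources` block uses Kubernetes-style quantities, so a pre-deployment check can catch a request that exceeds its limit before the manifest reaches the runtime. The sketch below is an assumption-laden illustration: `parse_memory` and `validate_resources` are hypothetical helpers, and whether the TickDB runtime enforces the same rule is not stated in this article.

```python
from typing import Dict

# Binary suffixes as used in the manifest ("512Mi", "1Gi").
_UNITS = {"Ki": 1024, "Mi": 1024 ** 2, "Gi": 1024 ** 3}


def parse_memory(quantity: str) -> int:
    """Convert a quantity such as '512Mi' to bytes; bare numbers are bytes."""
    for suffix, factor in _UNITS.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    return int(quantity)


def validate_resources(resources: Dict[str, Dict[str, str]]) -> None:
    """Raise ValueError when a request exceeds the corresponding limit."""
    requests, limits = resources["requests"], resources["limits"]
    if parse_memory(requests["memory"]) > parse_memory(limits["memory"]):
        raise ValueError("memory request exceeds memory limit")
    if float(requests["cpu"]) > float(limits["cpu"]):
        raise ValueError("cpu request exceeds cpu limit")


# Values copied from the on-premises manifest above.
manifest_resources = {
    "requests": {"memory": "512Mi", "cpu": "0.5"},
    "limits": {"memory": "1Gi", "cpu": "2"},
}
validate_resources(manifest_resources)  # Passes silently
```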

3.4 Handling Offline Operation

One of the key challenges in private deployment is gracefully degrading when the relay connection is interrupted. The SKILL runtime provides built-in resilience:

# skill_function.py: offline-aware implementation
from typing import Dict
from tickdb.skill import get_connectivity_status, on_reconnect, register_function

class OfflineAwareFunction:
    """
    Demonstrates graceful degradation when on-premises runtime
    loses connection to the TickDB relay.
    """
    
    def __init__(self):
        self.connectivity = get_connectivity_status()
        self.local_cache = {}
    
    @register_function("get_vol_surface")
    def get_vol_surface(self, symbol: str, force_refresh: bool = False) -> Dict:
        """
        Returns cached vol surface if offline, fresh data if online.
        """
        cache_key = f"vs_{symbol}"
        
        if not self.connectivity.is_online() and not force_refresh:
            # Offline mode: serve from local cache
            if cache_key in self.local_cache:
                # Copy so the freshness tag is not written back into the cache
                cached = dict(self.local_cache[cache_key])
                cached["_data_freshness"] = "cached_offline"
                return cached
            else:
                raise RuntimeError(
                    f"No local cache available for {symbol}. "
                    "Connectivity required for first-time data fetch."
                )
        
        # Online mode: fetch fresh data
        fresh_data = self._fetch_from_relay(symbol)
        self.local_cache[cache_key] = fresh_data
        fresh_data["_data_freshness"] = "live"
        return fresh_data
    
    @on_reconnect
    def sync_on_reconnect(self):
        """
        Automatically called when connectivity is restored.
        Use this to flush local state to central systems.
        """
        # Sync any local state accumulated during offline period
        self._flush_state_to_central_store()
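The fallback logic can be tested without a relay by injecting the fetch function and the connectivity check. The sketch below is one possible shape under that assumption; `CachedFetcher` and its constructor arguments are hypothetical and stand in for the runtime's connectivity status object and relay client.

```python
import copy
from typing import Callable, Dict


class CachedFetcher:
    """Serves live data when online and the last good copy when offline."""

    def __init__(self, fetch: Callable[[str], Dict], is_online: Callable[[], bool]):
        self._fetch = fetch
        self._is_online = is_online
        self._cache: Dict[str, Dict] = {}

    def get(self, symbol: str) -> Dict:
        if self._is_online():
            fresh = self._fetch(symbol)
            # Store an untagged deep copy so callers cannot mutate the cache.
            self._cache[symbol] = copy.deepcopy(fresh)
            return {**fresh, "_data_freshness": "live"}
        if symbol in self._cache:
            cached = copy.deepcopy(self._cache[symbol])
            return {**cached, "_data_freshness": "cached_offline"}
        raise RuntimeError(f"No local cache available for {symbol}")


# Simulated connectivity flag and relay call for demonstration only.
link = {"online": True}
fetcher = CachedFetcher(
    fetch=lambda symbol: {"symbol": symbol, "atm_vol": 0.2},
    is_online=lambda: link["online"],
)
live = fetcher.get("AAPL.US")
link["online"] = False
offline = fetcher.get("AAPL.US")
```

Tagging a copy rather than the cached object keeps the stored entry free of freshness markers, so each response reflects the mode in which it was actually served.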

4. Security and Access Control

Enterprise SKILL deployment requires robust security controls. The protocol addresses this at multiple layers.

4.1 Authentication and Authorization

# skill.yaml — security configuration
spec:
  security:
    # Who can invoke this SKILL
    allowed_callers:
      - "arn:aws:iam::123456789:role/trading-algo-prod"
      - "arn:aws:iam::123456789:role/risk-engine-prod"
      - "user:quant-researcher@acme.com"  # Individual users
    
    # Rate limiting per caller
    rate_limits:
      default: 100  # calls per minute
      overrides:
        "arn:aws:iam::...risk-engine-prod": 10000
    
    # Data access controls
    data_scope:
      # Restrict SKILL to specific symbols or asset classes
      symbols:
        - "AAPL.US"
        - "TSLA.US"
        - "BTC.*"  # Wildcard for crypto pairs
      markets:
        - us_equity
        - crypto
      
    # Encryption
    input_encryption: required
    output_encryption: required
    at_rest_encryption:
      enabled: true
      key_id: "arn:aws:kms:us-east-1:123456789:key/enterprise-skill-key"
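To illustrate how a `data_scope.symbols` list with wildcards might be evaluated, the sketch below treats each entry as a shell-style glob and matches it with the standard library's `fnmatch.fnmatchcase`. Glob semantics are an assumption here; the article does not specify whether the runtime interprets `BTC.*` as a glob or a regular expression.

```python
from fnmatch import fnmatchcase
from typing import Iterable

# Patterns copied from the data_scope section above.
ALLOWED_SYMBOLS = ["AAPL.US", "TSLA.US", "BTC.*"]


def symbol_in_scope(symbol: str, patterns: Iterable[str] = ALLOWED_SYMBOLS) -> bool:
    """True when the symbol matches at least one allowed pattern (case-sensitive)."""
    return any(fnmatchcase(symbol, pattern) for pattern in patterns)


in_scope = symbol_in_scope("BTC.USDT")      # Matches the BTC.* wildcard
out_of_scope = symbol_in_scope("ETH.USDT")  # No matching pattern
```

Under glob rules the dot is literal, so `BTC.*` covers `BTC.USDT` but not `BTCUSDT`; a regex interpretation would accept both.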

4.2 Code Signing

All SKILL packages must be signed before deployment. The runtime rejects unsigned packages.

# Signing a SKILL package
$ tickdb-cli skill sign \
    --package ./volatility-surface-builder-1.0.0.skill \
    --signing-key "./keys/acme-trading-skill-signing.pem" \
    --output ./volatility-surface-builder-1.0.0.signed.skill

$ tickdb-cli skill verify \
    --package ./volatility-surface-builder-1.0.0.signed.skill \
    --public-key "./keys/tickdb-enterprise-ca.pem"
Signature valid. Package integrity verified.

5. Development, Testing, and CI/CD

Professional SKILL development follows a structured lifecycle. This section documents the recommended workflow.

5.1 Local Development Environment

# Install the SKILL SDK
$ pip install tickdb-skill-sdk==2.1.0

# Initialize a new SKILL project
$ tickdb-skill init --name volatility-surface-builder --runtime python3.11

Created project structure:
volatility-surface-builder/
├── skill.yaml              # Manifest
├── src/
│   ├── __init__.py
│   ├── functions/
│   │   ├── __init__.py
│   │   └── surface_builder.py
│   └── utils/
│       ├── __init__.py
│       └── numerics.py
├── tests/
│   ├── __init__.py
│   ├── test_surface_builder.py
│   └── fixtures/
│       └── sample_chain.json
├── Makefile
└── README.md

5.2 Unit Testing

# tests/test_surface_builder.py
import pytest
import numpy as np
from src.functions.surface_builder import VolatilitySurfaceBuilder

class TestVolatilitySurfaceBuilder:
    """Unit tests for the IV surface computation."""
    
    @pytest.fixture
    def builder(self):
        return VolatilitySurfaceBuilder(
            tenor_range=["1W", "1M", "2M"],
            moneyness_strikes=5,
            model="black_scholes"
        )
    
    @pytest.fixture
    def sample_chain(self):
        """Standard at-the-money options chain for testing."""
        return {
            "underlying_price": 100.0,
            "timestamp": "2026-04-15T10:00:00Z",
            "expiries": ["2026-04-22", "2026-05-15", "2026-06-15"],
            "strikes": {
                "2026-04-22": [
                    {"strike": 95.0, "mid_price": 5.50, "type": "call"},
                    {"strike": 100.0, "mid_price": 2.10, "type": "call"},
                    {"strike": 105.0, "mid_price": 0.60, "type": "call"},
                ],
                # ... additional expirations
            }
        }
    
    def test_surface_output_structure(self, builder, sample_chain):
        """Surface output contains required fields."""
        result = builder.compute_iv_surface(sample_chain)
        
        assert "surface" in result
        assert "spot" in result
        assert "timestamp" in result
        assert result["spot"] == 100.0
    
    def test_iv_within_reasonable_bounds(self, builder, sample_chain):
        """Computed IV should be between 1% and 200%."""
        result = builder.compute_iv_surface(sample_chain)
        
        for tenor, moneyness_grid in result["surface"].items():
            for moneyness, iv in moneyness_grid.items():
                assert 0.01 < iv < 2.0, f"IV {iv} out of bounds at tenor={tenor}, moneyness={moneyness}"
    
    def test_put_call_parity(self, builder):
        """Put IV should match call IV at the same strike and expiry (put-call parity)."""
        # Construct a chain where put-call parity should hold exactly
        # (ignoring early exercise for short expirations)
        # Verify computed IVs match within tolerance
        pytest.skip("Not yet implemented")  # Avoid a vacuous pass
    
    def test_edge_case_zero_vega(self, builder):
        """Very short-dated options with tiny vega should not crash."""
        short_chain = {
            "underlying_price": 100.0,
            "timestamp": "2026-04-15T10:00:00Z",
            "expiries": ["2026-04-16"],
            "strikes": {
                "2026-04-16": [
                    {"strike": 100.0, "mid_price": 0.01, "type": "call"},
                ]
            }
        }
        
        result = builder.compute_iv_surface(short_chain)
        assert not np.isnan(result["surface"].get("1W", {}).get(0.0, np.nan))
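The parity test above is left as a stub. As a starting point, the sketch below checks the price-level identity that underlies it: for European options without dividends, C - P = S - K·exp(-rT). The pricing function and sample inputs are illustrative assumptions and use only the standard library, so the check runs without the SKILL SDK.

```python
import math


def norm_cdf(x: float) -> float:
    """Standard normal CDF via the error function."""
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))


def bs_price(spot: float, strike: float, t: float, rate: float,
             sigma: float, option_type: str) -> float:
    """Black-Scholes price of a European call or put without dividends."""
    d1 = (math.log(spot / strike) + (rate + 0.5 * sigma ** 2) * t) / (sigma * math.sqrt(t))
    d2 = d1 - sigma * math.sqrt(t)
    discounted_strike = strike * math.exp(-rate * t)
    if option_type == "call":
        return spot * norm_cdf(d1) - discounted_strike * norm_cdf(d2)
    return discounted_strike * norm_cdf(-d2) - spot * norm_cdf(-d1)


def test_put_call_parity_prices():
    """Call minus put equals spot minus discounted strike at a shared vol."""
    spot, strike, t, rate, sigma = 100.0, 105.0, 0.25, 0.05, 0.22
    call = bs_price(spot, strike, t, rate, sigma, "call")
    put = bs_price(spot, strike, t, rate, sigma, "put")
    forward_value = spot - strike * math.exp(-rate * t)
    assert abs((call - put) - forward_value) < 1e-9
```

Because both prices are generated from the same volatility, inverting either one should recover that volatility, which is the property the stubbed test is meant to verify against the builder.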

5.3 CI/CD Pipeline

# .github/workflows/skill-ci.yml
name: SKILL CI/CD Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      
      - name: Install dependencies
        run: |
          pip install tickdb-skill-sdk pytest pytest-cov black flake8
      
      - name: Lint
        run: |
          black --check src/ tests/
          flake8 src/ --max-line-length=100 --ignore=W503
      
      - name: Unit tests
        run: |
          pytest tests/ --cov=src --cov-report=xml --cov-fail-under=80
      
      - name: Build SKILL package
        run: |
          tickdb-cli skill build --package-name volatility-surface-builder
      
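      - name: Sign package
        env:
          SKILL_SIGNING_KEY: ${{ secrets.SKILL_SIGNING_KEY }}
        run: |
          # Write the key with owner-only permissions and remove it after signing
          umask 077
          printf '%s\n' "$SKILL_SIGNING_KEY" > signing-key.pem
          tickdb-cli skill sign --package ./*.skill --signing-key signing-key.pem
          rm -f signing-key.pem
      
      - name: Deploy to staging (on PR merge)
        if: github.ref == 'refs/heads/main'
        env:
          TICKDB_STAGING_KEY: ${{ secrets.TICKDB_STAGING_KEY }}
        run: |
          tickdb-cli skill deploy \
            --package ./*.skill \
            --target staging \
            --runtime-url https://staging-api.tickdb.ai \
            --api-key "$TICKDB_STAGING_KEY"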

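  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      
      # Each job runs on a fresh runner, so rebuild the package before scanning it
      - name: Build SKILL package
        run: |
          pip install tickdb-skill-sdk
          tickdb-cli skill build --package-name volatility-surface-builder
      - name: Run security scan
        uses: tickdb/skill-security-scan@v2
        with:
          package-path: "./volatility-surface-builder.skill"
          scan-level: enterprise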

6. Enterprise Considerations and Migration Strategies

6.1 Moving from Ad-Hoc Adapters to SKILLs

If your team currently maintains external adapter services (as described in the opening scenario), the migration path is straightforward:

| Phase | Action | Duration |
|-------|--------|----------|
| 1 | Audit existing adapter logic; identify transformation rules | 1–2 weeks |
| 2 | Write equivalent SKILL functions; wrap in skill.yaml | 2–3 weeks |
| 3 | Run SKILL in shadow mode alongside existing adapter | 1–2 weeks |
| 4 | Validate output parity (byte-level comparison of transformed data) | 1 week |
| 5 | Cut over to SKILL; decommission adapter | 1 day |
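
The parity validation in phase 4 can be sketched as follows. This is an illustration under stated assumptions: both the legacy adapter and the SKILL are assumed to emit records as JSON-serializable dictionaries in the same order, and "byte-level" is interpreted as comparing a canonical JSON encoding. The function names `canonical_bytes`, `digest`, and `find_mismatches` are hypothetical and not part of any TickDB tooling.

```python
import hashlib
import json
from typing import Any, Iterable


def canonical_bytes(record: dict[str, Any]) -> bytes:
    """Serialize a record deterministically so key order cannot cause false diffs."""
    return json.dumps(record, sort_keys=True, separators=(",", ":")).encode("utf-8")


def digest(record: dict[str, Any]) -> str:
    """SHA-256 hex digest of the canonical encoding."""
    return hashlib.sha256(canonical_bytes(record)).hexdigest()


def find_mismatches(
    adapter_out: Iterable[dict[str, Any]],
    skill_out: Iterable[dict[str, Any]],
) -> list[int]:
    """Return the positions at which the two outputs differ byte-for-byte."""
    adapter_list = list(adapter_out)
    skill_list = list(skill_out)
    if len(adapter_list) != len(skill_list):
        raise ValueError(
            f"record count differs: {len(adapter_list)} vs {len(skill_list)}"
        )
    return [
        i
        for i, (a, s) in enumerate(zip(adapter_list, skill_list))
        if digest(a) != digest(s)
    ]


# Made-up sample data: the first pair differs only in key order, the second in value.
adapter = [
    {"symbol": "ES", "bid": 100.0, "ask": 100.25},
    {"symbol": "NQ", "bid": 1.0, "ask": 2.0},
]
skill = [
    {"ask": 100.25, "symbol": "ES", "bid": 100.0},
    {"symbol": "NQ", "bid": 1.0, "ask": 2.5},
]
mismatches = find_mismatches(adapter, skill)
```

A comparison this strict also flags representation differences such as `100` versus `100.0`, which is usually what a byte-level check is meant to catch before cutover.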

6.2 SKILL Governance in Large Organizations

Enterprise teams with multiple SKILLs benefit from centralized governance:

| Governance Concern | Recommended Practice |
|--------------------|----------------------|
| SKILL ownership | One team owns each SKILL; ownership recorded in manifest |
| Version management | Semantic versioning (MAJOR.MINOR.PATCH); MAJOR changes require review |
| Dependency management | SKILLs declare dependencies on specific TickDB API versions |
| Deprecation | 90-day deprecation notice via manifest deprecation field |
| Rollback | Last-3-versions rollback via tickdb-cli skill rollback |
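
The version-management and rollback rows can be illustrated with a short sketch. It assumes plain MAJOR.MINOR.PATCH strings without pre-release tags, and it interprets "last-3-versions rollback" as the three most recent versions preceding the current one. The helpers `parse_version`, `requires_review`, and `rollback_candidates` are hypothetical names for illustration, not part of tickdb-cli.

```python
from typing import NamedTuple


class Version(NamedTuple):
    major: int
    minor: int
    patch: int


def parse_version(text: str) -> Version:
    """Parse a strict MAJOR.MINOR.PATCH string into a comparable tuple."""
    parts = text.split(".")
    if len(parts) != 3 or not all(p.isdigit() for p in parts):
        raise ValueError(f"not a MAJOR.MINOR.PATCH version: {text!r}")
    return Version(*(int(p) for p in parts))


def requires_review(current: str, proposed: str) -> bool:
    """A MAJOR bump signals a breaking change and should trigger review."""
    return parse_version(proposed).major > parse_version(current).major


def rollback_candidates(deployed: list[str], keep: int = 3) -> list[str]:
    """Versions eligible for rollback: the `keep` newest below the current one."""
    ordered = sorted(set(deployed), key=parse_version, reverse=True)
    return ordered[1 : keep + 1]


# Made-up deployment history, deliberately out of order.
history = ["1.0.0", "1.2.0", "1.10.0", "2.0.0", "1.9.3"]
candidates = rollback_candidates(history)
```

Comparing parsed tuples rather than raw strings matters here: a string sort would place 1.10.0 before 1.9.3.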

7. Closing

The SKILL protocol transforms TickDB from a data delivery platform into a programmable market data infrastructure. The three patterns covered here — custom normalization, compliance logging, and pre-trade risk — represent the most common enterprise use cases, but the architecture is deliberately general. Any transformation, enrichment, or gate that your business logic requires can be expressed as a SKILL.

The key architectural advantages this delivers in practice:

  • Single source of truth: Business logic lives alongside the data, not in a separate microservice that can drift out of sync.
  • Consistent operational model: SKILLs share the same deployment, monitoring, and security framework as TickDB itself.
  • Portable: A SKILL written for the cloud runtime runs unchanged in a private deployment (subject to resource configuration differences).

For teams currently maintaining adapter layers, migration to the SKILL model typically pays for itself within three months through a lower maintenance burden and faster incident resolution.


Next Steps

If you're an individual quant developer exploring SKILLs for personal use, start with the free tier at tickdb.ai — you can deploy one SKILL with limited resources to experiment with the extension model.

If you're an enterprise team evaluating SKILLs for production use, schedule a technical architecture review with the TickDB enterprise team at enterprise@tickdb.ai. Bring your current adapter architecture diagrams — the team will map them to the equivalent SKILL topology.

If you're ready to build your first SKILL, install the SDK and run through the local development quickstart at docs.tickdb.ai/skill-sdk/quickstart. The first function takes under an hour to write and deploy.

If you use AI coding assistants, search for and install the tickdb-market-data SKILL in your AI tool's marketplace to get native TickDB function calling support.


This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.