You are running 2,000 coroutines. Every 50 milliseconds, they hit an external API, collect market data, and aggregate results. At 2:47 AM, your system receives a shutdown signal. You have approximately 3 seconds before the container orchestrator kills your process. What happens to the data in flight? The connections still open? The files still being written?
If your answer is "I hope nothing breaks," you have a coroutine lifecycle problem.
Lifecycle management is the difference between a system that survives production and one that simply happens to work most of the time. In this article, we dissect the full arc: spawning hundreds of concurrent tasks, monitoring their health, handling cancellation requests, and shutting down gracefully — all with production-grade patterns you can deploy today.
The Problem with "Fire and Forget" Coroutines
The naive approach looks deceptively simple:
import asyncio
async def fetch_data(symbol: str):
    await asyncio.sleep(0.1)  # Simulate API call
    return {"symbol": symbol, "price": 100.0}
async def main():
    symbols = [f"STOCK_{i}" for i in range(2000)]
    tasks = [asyncio.create_task(fetch_data(s)) for s in symbols]
    results = await asyncio.gather(*tasks)
    return results
asyncio.run(main())
This code works. It will even work reliably under normal conditions. But it has three failure modes that matter in production:
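Failure Mode 1: No cancellation path. Nothing in this code reacts to SIGTERM, and the default action for SIGTERM is to terminate the process immediately. In-flight requests are abandoned and no cleanup code runs. Letting the tasks finish is no safer: with a 30-second API timeout, a single hung request can keep the process alive far longer than a grace period measured in seconds.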
Failure Mode 2: No health monitoring. You have no visibility into which tasks are still running, which have failed, or which are stuck in a retry loop. A single task that hangs indefinitely is invisible.
Failure Mode 3: No resource cleanup. Database connections, file handles, and WebSocket sessions opened by individual tasks will not close cleanly if the process terminates mid-execution.
The solution is not more code. The solution is structured lifecycle management — a discipline, not a library.
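Some of that discipline costs nothing. Here is a minimal sketch for Failure Mode 2, assuming Python 3.9+. Give every task a name when you create it, and the event loop can then tell you which tasks are still pending. The `slow_fetch` coroutine, the `fetch:` name prefix, and the delays are hypothetical stand-ins for real API calls.

```python
import asyncio

async def slow_fetch(symbol: str, delay: float) -> str:
    # Hypothetical stand-in for an API call that takes `delay` seconds
    await asyncio.sleep(delay)
    return symbol

async def pending_report() -> list[str]:
    tasks = [
        asyncio.create_task(slow_fetch(f"STOCK_{i}", delay), name=f"fetch:STOCK_{i}")
        for i, delay in enumerate([0.01, 0.01, 5.0])
    ]
    await asyncio.sleep(0.1)  # long enough for the two fast tasks to finish
    current = asyncio.current_task()
    # all_tasks() returns every unfinished task on the loop, including this one
    stuck = sorted(t.get_name() for t in asyncio.all_tasks() if t is not current)
    for t in tasks:  # clean up the demo
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return stuck

print(asyncio.run(pending_report()))  # ['fetch:STOCK_2']
```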
Spawning Tasks: Beyond create_task
The create_task Spectrum
asyncio.create_task() is the fundamental primitive, but it is only the start of a spectrum of spawning strategies.
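| Strategy | Use case | Cancellation support | Exception propagation |
|---|---|---|---|
| create_task(coro()) | Fire-and-forget with optional awaiting (keep a reference to the task) | Manual: call task.cancel() | Raised when you await the task or call task.result() |
| asyncio.gather(*tasks) | Parallel execution of a known set of tasks | Cancelling the gather cancels every unfinished child | The first exception propagates while the other tasks keep running; with return_exceptions=True, exceptions are returned in the result list |
| asyncio.TaskGroup (3.11+) | Structured concurrency with automatic cleanup | Propagates to all children | The first failure cancels the remaining tasks; errors are raised together as an ExceptionGroup |
| TaskGroup with a semaphore | Bounded concurrency | Same as TaskGroup; the semaphore only limits how many tasks run at once | Same as TaskGroup |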
For lifecycle management purposes, we care about two things: whether tasks can be cancelled collectively, and whether exceptions in one task can bring down others.
Structured Concurrency with TaskGroup (Python 3.11+)
TaskGroup is the modern answer to the cancellation problem. It provides structured concurrency — a guarantee that all child tasks are awaited before the group exits, and that a cancellation signal propagates to all children automatically.
import asyncio
async def fetch_with_timeout(symbol: str, timeout: float = 5.0):
    """Fetch data with individual timeout protection."""
    try:
        async with asyncio.timeout(timeout):
            await asyncio.sleep(0.1 * (hash(symbol) % 10) / 10)  # Variable delay
            return {"symbol": symbol, "status": "success"}
    except asyncio.TimeoutError:
        return {"symbol": symbol, "status": "timeout"}
async def batch_fetch(symbols: list[str], concurrency: int = 100):
    """Fetch data with bounded concurrency using TaskGroup."""
    results = []
    semaphore = asyncio.Semaphore(concurrency)
    async def bounded_fetch(symbol: str):
        async with semaphore:
            return await fetch_with_timeout(symbol)
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(bounded_fetch(s)) for s in symbols]
    # All tasks have completed: collect results
    for task in tasks:
        results.append(task.result())
    return results
# Usage
asyncio.run(batch_fetch([f"STOCK_{i}" for i in range(2000)]))
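Because fetch_with_timeout turns a timeout into a status value instead of an exception, one slow symbol does not abort the batch. If a task does raise an unhandled exception, TaskGroup cancels the remaining tasks, lets them run their cleanup code, waits for all of them, and then raises the errors as an ExceptionGroup. Either way, no task outlives the async with block. This is structured concurrency's core promise.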
Implementing a Custom Task Manager
For systems that need more granular control — real-time health dashboards, dynamic task prioritization, or custom cancellation policies — you need a task manager that tracks state.
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Optional
class TaskState(Enum):
    PENDING = auto()
    RUNNING = auto()
    COMPLETED = auto()
    FAILED = auto()
    CANCELLED = auto()
    TIMED_OUT = auto()
@dataclass
class TrackedTask:
    task_id: str
    task: asyncio.Task
    state: TaskState = TaskState.PENDING
    created_at: float = field(default_factory=time.time)
    completed_at: Optional[float] = None
    result: Optional[Any] = None
    error: Optional[Exception] = None
class TaskManager:
    """Manages a pool of tasks with lifecycle visibility."""
    def __init__(self, max_concurrency: int = 100):
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.tasks: dict[str, TrackedTask] = {}
        self._lock = asyncio.Lock()
        self._shutdown_event = asyncio.Event()  # set by cancel_all(); blocks new spawns
    async def spawn(
        self,
        task_id: str,
        coro,
        timeout: Optional[float] = None
    ) -> asyncio.Task:
        """Spawn a task with tracking and optional timeout."""
        async def tracked_coro():
            try:
                async with self.semaphore:
                    # A task counts as RUNNING only once it holds a concurrency slot
                    self._update_state(task_id, TaskState.RUNNING)
                    if timeout:
                        result = await asyncio.wait_for(coro, timeout=timeout)
                    else:
                        result = await coro
                self._update_state(task_id, TaskState.COMPLETED)
                self._update_result(task_id, result)
                return result
            except asyncio.TimeoutError:
                self._update_state(task_id, TaskState.TIMED_OUT)
                return None
            except Exception as e:
                self._update_state(task_id, TaskState.FAILED)
                self._update_error(task_id, e)
                raise
        # Check and register in one critical section so duplicate IDs cannot slip in
        async with self._lock:
            if self._shutdown_event.is_set():
                coro.close()  # avoid a "coroutine was never awaited" warning
                raise RuntimeError("TaskManager is shutting down")
            if task_id in self.tasks:
                coro.close()
                raise ValueError(f"Task {task_id} already exists")
            task = asyncio.create_task(tracked_coro())
            self.tasks[task_id] = TrackedTask(task_id=task_id, task=task)
        # Record cancellation in a done callback: it can strike before tracked_coro starts
        task.add_done_callback(lambda t: self._on_done(task_id, t, coro))
        return task
    def _on_done(self, task_id: str, task: asyncio.Task, coro):
        if task.cancelled():
            self._update_state(task_id, TaskState.CANCELLED)
            coro.close()  # no-op if it ran; silences the warning if it never started
        else:
            task.exception()  # mark as retrieved; FAILED tasks keep it in .error
    def _update_state(self, task_id: str, state: TaskState):
        if task_id in self.tasks:
            self.tasks[task_id].state = state
    def _update_result(self, task_id: str, result: Any):
        if task_id in self.tasks:
            self.tasks[task_id].result = result
            self.tasks[task_id].completed_at = time.time()
    def _update_error(self, task_id: str, error: Exception):
        if task_id in self.tasks:
            self.tasks[task_id].error = error
            self.tasks[task_id].completed_at = time.time()
    async def cancel_all(self, timeout: float = 5.0):
        """Reject new spawns, then cancel every unfinished task with a timeout."""
        self._shutdown_event.set()
        async with self._lock:
            # PENDING tasks may still be waiting on the semaphore, so cancel them too
            unfinished = [t.task for t in self.tasks.values() if not t.task.done()]
        if not unfinished:
            return
        # Schedule cancellation for all unfinished tasks
        for task in unfinished:
            task.cancel()
        # Wait for all to acknowledge cancellation; stragglers simply stay pending
        await asyncio.wait(
            unfinished,
            timeout=timeout,
            return_when=asyncio.ALL_COMPLETED
        )
    def get_stats(self) -> dict:
        """Return aggregate statistics."""
        states = [t.state for t in self.tasks.values()]
        return {
            "total": len(self.tasks),
            "pending": states.count(TaskState.PENDING),
            "running": states.count(TaskState.RUNNING),
            "completed": states.count(TaskState.COMPLETED),
            "failed": states.count(TaskState.FAILED),
            "cancelled": states.count(TaskState.CANCELLED),
            "timed_out": states.count(TaskState.TIMED_OUT),
        }
This task manager gives you:
- State tracking for every task, updated in real-time
- Semaphore-based concurrency control to prevent resource exhaustion
- Graceful cancellation with a configurable timeout
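The same state visibility can also come from the event loop itself. Instead of wrapping every coroutine, you can classify each task's outcome with `Task.add_done_callback`. The sketch below shows this as an assumption, not as a replacement for the class above. `classify`, `track`, and the every-fifth-item failure rule are hypothetical.

```python
import asyncio
from collections import Counter

def classify(task: asyncio.Task) -> str:
    # A finished task is exactly one of: cancelled, failed, or completed.
    # Check cancelled() first: exception() raises CancelledError on a cancelled task.
    if task.cancelled():
        return "cancelled"
    if task.exception() is not None:
        return "failed"
    return "completed"

async def track() -> Counter:
    outcomes: Counter = Counter()

    async def work(i: int) -> int:
        await asyncio.sleep(0.01)
        if i % 5 == 0:
            raise ValueError(f"item {i} failed")
        return i

    tasks = [asyncio.create_task(work(i)) for i in range(10)]
    for t in tasks:
        t.add_done_callback(lambda done: outcomes.update([classify(done)]))
    tasks[1].cancel()  # cancel one task before it finishes
    await asyncio.gather(*tasks, return_exceptions=True)
    await asyncio.sleep(0)  # give any remaining done callbacks a chance to run
    return outcomes

print(asyncio.run(track()))
```

Because done callbacks fire even for tasks cancelled before their first step, this approach never leaves a task stuck in a PENDING-like state.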
Signal Handling: The Missing Piece
The scenario at the top of this article — 2,000 tasks, 3-second shutdown window — is a signal handling problem. In Unix-like systems, your process receives SIGTERM when the orchestrator wants it to stop. You have a limited window to clean up.
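The core mechanism is `loop.add_signal_handler`. It runs a callback inside the event loop when the signal arrives, so the callback can safely touch asyncio objects. Below is a minimal, Unix-only sketch (the Windows event loops do not implement `add_signal_handler`). `wait_for_sigterm` is a hypothetical name, and the process signals itself only to demonstrate the flow.

```python
import asyncio
import os
import signal

async def wait_for_sigterm() -> str:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    # The callback runs on the event loop, so setting an Event here is safe
    loop.add_signal_handler(signal.SIGTERM, stop.set)
    try:
        # Simulate the orchestrator sending SIGTERM shortly after startup
        loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)
        await asyncio.wait_for(stop.wait(), timeout=2.0)
        return "SIGTERM received; starting graceful shutdown"
    finally:
        loop.remove_signal_handler(signal.SIGTERM)  # restore the default action

print(asyncio.run(wait_for_sigterm()))
```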
import asyncio
import signal
from typing import Optional
class GracefulShutdown:
    """Handles SIGTERM/SIGINT with structured shutdown (Unix event loops only)."""
    def __init__(self, task_manager: TaskManager, shutdown_timeout: float = 5.0):
        self.task_manager = task_manager
        self.shutdown_timeout = shutdown_timeout
        self._shutdown_event = asyncio.Event()
        self.shutdown_task: Optional[asyncio.Task] = None
    def start(self):
        """Register signal handlers on the running event loop."""
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            # add_signal_handler runs the callback inside the loop, where creating
            # a task is safe; a plain signal.signal() handler is not
            loop.add_signal_handler(sig, self._on_signal)
    def _on_signal(self):
        # Keep a reference so the shutdown task cannot be garbage-collected mid-run
        if self.shutdown_task is None:
            self.shutdown_task = asyncio.create_task(self.trigger_shutdown())
    async def trigger_shutdown(self):
        """Initiate the shutdown sequence."""
        if self._shutdown_event.is_set():
            return  # Already shutting down
        print("Shutdown requested: initiating graceful shutdown...")
        self._shutdown_event.set()
        # Step 1: Stop accepting new tasks
        # (spawn loops check shutdown_requested before each spawn)
        # Step 2: Cancel all running tasks, using 70% of the budget
        await self.task_manager.cancel_all(timeout=self.shutdown_timeout * 0.7)
        # Step 3: Reserve a fixed 20% of the budget for cleanup work
        # (flushing buffers, closing connections) to finish
        await asyncio.sleep(self.shutdown_timeout * 0.2)
        # Step 4: Report stragglers. Anything still running is cancelled when
        # asyncio.run() exits, and SIGKILL follows if the grace period runs out
        stats = self.task_manager.get_stats()
        if stats["running"] > 0:
            print(f"WARNING: {stats['running']} tasks still running; exiting anyway")
        print("Shutdown complete.")
    @property
    def shutdown_requested(self) -> bool:
        return self._shutdown_event.is_set()
    async def wait_for_shutdown(self):
        """Block until a shutdown signal is received."""
        await self._shutdown_event.wait()
# Usage in your main function
async def main():
    task_manager = TaskManager(max_concurrency=100)
    shutdown_handler = GracefulShutdown(task_manager, shutdown_timeout=5.0)
    shutdown_handler.start()
    # Your main loop
    await run_market_data_pipeline(task_manager, shutdown_handler)
    # Or, run until shutdown is requested
    # await shutdown_handler.wait_for_shutdown()
    # If a signal arrived, let the shutdown sequence finish: asyncio.run()
    # cancels any task still pending when main() returns
    if shutdown_handler.shutdown_task is not None:
        await shutdown_handler.shutdown_task
async def fetch_market_data(symbol: str) -> dict:
    """Hypothetical placeholder for a real API call."""
    await asyncio.sleep(0.05)
    return {"symbol": symbol}
async def run_market_data_pipeline(
    task_manager: TaskManager,
    shutdown_handler: GracefulShutdown
):
    """Example: running a pipeline with shutdown support."""
    symbols = [f"STOCK_{i}" for i in range(2000)]
    # Spawn tasks with individual timeouts
    for symbol in symbols:
        if shutdown_handler.shutdown_requested:
            break
        await task_manager.spawn(
            task_id=symbol,
            coro=fetch_market_data(symbol),
            timeout=30.0
        )
    # Monitor and aggregate
    while not shutdown_handler.shutdown_requested:
        stats = task_manager.get_stats()
        print(f"Progress: {stats['completed']}/{stats['total']} completed, "
              f"{stats['running']} running, {stats['failed']} failed")
        # Check if we're done
        if stats["pending"] == 0 and stats["running"] == 0:
            break
        await asyncio.sleep(1)
The SIGTERM vs SIGINT Distinction
In containerized environments, SIGTERM is the standard termination signal. Your orchestrator (Kubernetes, Docker Compose) sends SIGTERM first, then waits for a grace period (default: 30 seconds in Kubernetes), then sends SIGKILL if the process hasn't exited.
Design your shutdown handler for SIGTERM. SIGINT (Ctrl+C) is a human-initiated interrupt — it should trigger the same cleanup path, but the human expects it to be instantaneous. For human-initiated interrupts, reduce the shutdown timeout to 2 seconds.
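One way to honor that distinction is to bind a different shutdown budget to each signal with `functools.partial`. This sketch assumes that a 5-second SIGTERM budget and a 2-second SIGINT budget suit your workload. It is Unix-only, and the `Shutdown` class and `cancel_with_budget` helper are hypothetical and separate from `GracefulShutdown` above.

```python
import asyncio
import functools
import os
import signal
from typing import Optional

class Shutdown:
    """Hypothetical helper: records which budget applies to the signal that fired."""

    BUDGETS = {signal.SIGTERM: 5.0, signal.SIGINT: 2.0}

    def __init__(self) -> None:
        self.event = asyncio.Event()
        self.budget: Optional[float] = None

    def install(self) -> None:
        loop = asyncio.get_running_loop()
        for sig, budget in self.BUDGETS.items():
            loop.add_signal_handler(sig, functools.partial(self._on_signal, budget))

    def _on_signal(self, budget: float) -> None:
        if not self.event.is_set():  # the first signal wins; repeats are ignored
            self.budget = budget
            self.event.set()

async def cancel_with_budget(tasks: list[asyncio.Task], budget: float) -> int:
    """Cancel tasks, wait up to `budget` seconds, return how many are still pending."""
    for t in tasks:
        t.cancel()
    _, still_pending = await asyncio.wait(tasks, timeout=budget)
    return len(still_pending)

async def demo() -> tuple[Optional[float], int]:
    loop = asyncio.get_running_loop()
    shutdown = Shutdown()
    shutdown.install()
    workers = [asyncio.create_task(asyncio.sleep(60)) for _ in range(3)]
    # Simulate Ctrl+C arriving while the workers are busy
    loop.call_later(0.05, os.kill, os.getpid(), signal.SIGINT)
    await shutdown.event.wait()
    left = await cancel_with_budget(workers, shutdown.budget)
    for sig in Shutdown.BUDGETS:
        loop.remove_signal_handler(sig)
    return shutdown.budget, left

print(asyncio.run(demo()))  # (2.0, 0)
```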
Real-World Pattern: Bounded Task Queue with Backpressure
A common production scenario is a task queue that processes items at a bounded rate. When the queue is full, you need backpressure — not rejection, not unbounded growth.
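The backpressure itself comes from `asyncio.Queue(maxsize=...)`. Once the queue holds `maxsize` items, `await queue.put(...)` suspends the producer until a consumer calls `get()`. In the minimal sketch below, the item values and the 0.1-second probe timeout are arbitrary.

```python
import asyncio

async def backpressure_demo() -> list[str]:
    events: list[str] = []
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    await queue.put(1)
    await queue.put(2)  # the queue is now full
    try:
        # A third put cannot complete until something is consumed
        await asyncio.wait_for(queue.put(3), timeout=0.1)
    except asyncio.TimeoutError:
        events.append("producer blocked while full")

    async def consumer() -> None:
        await asyncio.sleep(0.05)
        events.append(f"consumed {queue.get_nowait()}")

    consumer_task = asyncio.create_task(consumer())
    await queue.put(3)  # suspends until the consumer frees a slot
    events.append("producer resumed")
    await consumer_task
    return events

print(asyncio.run(backpressure_demo()))
```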
import asyncio
import itertools
import time
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass
class WorkItem:
    item_id: str
    payload: Any
    # time.monotonic() is unaffected by wall-clock adjustments
    enqueued_at: float = field(default_factory=time.monotonic)
    @property
    def age(self) -> float:
        return time.monotonic() - self.enqueued_at
class BoundedTaskQueue:
    """A bounded priority queue with backpressure and graceful shutdown."""
    def __init__(
        self,
        max_size: int = 10_000,
        max_concurrent_tasks: int = 200,
        task_timeout: float = 30.0,
        shutdown_handler: Optional["GracefulShutdown"] = None
    ):
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize=max_size)
        self.max_concurrent = max_concurrent_tasks
        self.task_timeout = task_timeout
        self.shutdown_handler = shutdown_handler
        self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
        self.active_tasks: set[asyncio.Task] = set()
        # Tie-breaker: equal priorities must never fall through to comparing WorkItems
        self._counter = itertools.count()
        self._closing = False
        self._worker_task: Optional[asyncio.Task] = None
    async def enqueue(self, item: WorkItem, priority: int = 5):
        """Enqueue an item with priority (lower = higher priority).
        Blocks while the queue is full; raises TimeoutError after 5 seconds."""
        shutting_down = self.shutdown_handler and self.shutdown_handler.shutdown_requested
        if self._closing or shutting_down:
            raise RuntimeError("Cannot enqueue: shutdown in progress")
        entry = (priority, next(self._counter), item)
        await asyncio.wait_for(self.queue.put(entry), timeout=5.0)
    async def start(self):
        """Start the consumer worker loop."""
        self._worker_task = asyncio.create_task(self._consume_loop())
    async def stop(self, timeout: float = 10.0):
        """Stop accepting items, drain the queue, then stop the worker."""
        self._closing = True
        try:
            # join() returns once every dequeued item has called task_done()
            await asyncio.wait_for(self.queue.join(), timeout=timeout)
        except asyncio.TimeoutError:
            print(f"Queue not drained after {timeout}s; cancelling remaining work")
        if self._worker_task:
            self._worker_task.cancel()
            await asyncio.gather(self._worker_task, return_exceptions=True)
        for task in list(self.active_tasks):
            task.cancel()
        await asyncio.gather(*self.active_tasks, return_exceptions=True)
    async def _consume_loop(self):
        """Main consumer loop.
        A worker slot is reserved before an item is taken, so when every worker
        is busy, items stay queued, the queue fills, and producers block."""
        while True:
            await self.semaphore.acquire()
            try:
                _priority, _seq, item = await self.queue.get()
            except BaseException:
                self.semaphore.release()  # e.g. cancelled by stop()
                raise
            task = asyncio.create_task(self._process_item(item))
            self.active_tasks.add(task)
            task.add_done_callback(self.active_tasks.discard)
    async def _process_item(self, item: WorkItem):
        """Process a single work item, then free its slot."""
        try:
            return await asyncio.wait_for(
                self._process_fn(item),
                timeout=self.task_timeout
            )
        except asyncio.TimeoutError:
            print(f"Task {item.item_id} timed out after {self.task_timeout}s")
        except Exception as e:
            print(f"Task {item.item_id} failed: {e}")
        finally:
            self.semaphore.release()
            self.queue.task_done()  # exactly one task_done() per get(), after processing
    async def _process_fn(self, item: WorkItem) -> Any:
        """Override this to define actual processing logic."""
        raise NotImplementedError
    @property
    def stats(self) -> dict:
        return {
            "queue_size": self.queue.qsize(),
            "max_size": self.queue.maxsize,
            "concurrent_tasks": len(self.active_tasks),
        }
This bounded queue pattern gives you:
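- Backpressure: When the queue is full, producers block in enqueue() (for up to 5 seconds) instead of growing memory without bound.
- Priority ordering: Lower priority numbers are served first. Promoting old items (aging) to prevent starvation needs a re-prioritization step on top of this class.
- Graceful shutdown: stop() waits for the queue to drain before stopping the worker, so accepted items are not silently dropped.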
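Aging means that an item's effective priority improves the longer it waits, so a steady stream of high-priority work cannot starve it. Below is a minimal, self-contained sketch of that idea. It assumes one level of promotion per `aging_interval` seconds (a hypothetical parameter) and uses an O(n) scan on each pop. That is fine for small queues but not for 10,000 items. `AgingQueue` and `AgingEntry` are hypothetical names.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class AgingEntry:
    priority: int
    payload: Any
    enqueued_at: float

class AgingQueue:
    """Hypothetical helper: the lowest effective priority is served first."""

    def __init__(self, aging_interval: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.aging_interval = aging_interval
        self.clock = clock
        self._entries: list[AgingEntry] = []

    def push(self, payload: Any, priority: int = 5) -> None:
        self._entries.append(AgingEntry(priority, payload, self.clock()))

    def _effective(self, entry: AgingEntry, now: float) -> float:
        # Each full aging_interval spent waiting lowers (improves) the priority by one
        waited = now - entry.enqueued_at
        return entry.priority - waited // self.aging_interval

    def pop(self) -> Any:
        if not self._entries:
            raise IndexError("pop from empty AgingQueue")
        now = self.clock()
        # min() returns the first minimum, so ties stay in FIFO order
        best = min(range(len(self._entries)),
                   key=lambda i: self._effective(self._entries[i], now))
        return self._entries.pop(best).payload

    def __len__(self) -> int:
        return len(self._entries)

# Usage with a fake clock so the example is deterministic
fake_now = [0.0]
q = AgingQueue(aging_interval=60.0, clock=lambda: fake_now[0])
q.push("old-low", priority=5)
fake_now[0] = 150.0  # old-low has now waited 150 s
q.push("new-high", priority=4)
print(q.pop())  # old-low: effective 5 - 2 = 3 beats new-high's 4
```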
Monitoring: The Observability Layer
Lifecycle management without observability is guesswork. You need to answer three questions in real-time:
- How many tasks are running right now?
- Which tasks have failed, and why?
- Is the system making progress or stalling?
import asyncio
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class LifecycleMetrics:
    started_at: float
    total_spawned: int = 0
    total_completed: int = 0
    total_failed: int = 0
    total_cancelled: int = 0
    total_timed_out: int = 0
    active_count: int = 0
    error_rate: float = 0.0
    throughput: float = 0.0  # completions per second
    def update_rates(self):
        elapsed = time.time() - self.started_at
        self.throughput = self.total_completed / max(elapsed, 1)
        total_finished = self.total_completed + self.total_failed
        self.error_rate = self.total_failed / max(total_finished, 1)
class LifecycleMonitor:
    """Real-time monitoring for task lifecycle."""
    def __init__(self, sample_interval: float = 5.0):
        self.sample_interval = sample_interval
        self.metrics = LifecycleMetrics(started_at=time.time())
        self._monitor_task: Optional[asyncio.Task] = None
        self._task_manager: Optional["TaskManager"] = None
    def attach(self, task_manager: "TaskManager"):
        """Attach to a TaskManager for live monitoring."""
        self._task_manager = task_manager
    async def start(self):
        """Start the monitoring loop."""
        self._monitor_task = asyncio.create_task(self._monitor_loop())
    async def stop(self):
        """Stop the monitoring loop."""
        if self._monitor_task:
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except asyncio.CancelledError:
                pass
    async def _monitor_loop(self):
        """Collect metrics at regular intervals."""
        while True:
            await asyncio.sleep(self.sample_interval)
            if self._task_manager:
                stats = self._task_manager.get_stats()
                self.metrics.total_spawned = stats["total"]
                self.metrics.total_completed = stats["completed"]
                self.metrics.total_failed = stats["failed"]
                self.metrics.total_cancelled = stats["cancelled"]
                self.metrics.total_timed_out = stats["timed_out"]
                self.metrics.active_count = stats["running"]
                self.metrics.update_rates()
                self._log_metrics(stats)
    def _log_metrics(self, stats: dict):
        """Log current metrics."""
        print(
            f"[{time.strftime('%H:%M:%S')}] "
            f"Tasks: {stats['total']} total | "
            f"{stats['completed']} done, {stats['running']} active, "
            f"{stats['failed']} failed | "
            f"Throughput: {self.metrics.throughput:.2f}/s | "
            f"Error rate: {self.metrics.error_rate:.1%}"
        )
    def get_metrics(self) -> LifecycleMetrics:
        return self.metrics
With this monitoring layer, you can build dashboards, alert on stalled progress (no completions in 60 seconds), and generate post-mortem reports from the metrics captured during execution.
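The stalled-progress alert needs one more piece of state: when the completed count last changed. Below is a sketch with hypothetical names (`StallDetector`, `stall_after`). It assumes the caller feeds it the `completed` figure from `get_stats()` on each sample. In practice you would alert only when tasks are actually running, because an idle system also makes no progress.

```python
import time
from typing import Callable, Optional

class StallDetector:
    """Hypothetical helper: flags a stall when completions stop increasing."""

    def __init__(self, stall_after: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.stall_after = stall_after
        self.clock = clock
        self._last_count: Optional[int] = None
        self._last_change = clock()

    def observe(self, completed: int) -> bool:
        """Record the latest completed count; return True if progress has stalled."""
        now = self.clock()
        if completed != self._last_count:
            self._last_count = completed
            self._last_change = now
            return False
        return now - self._last_change >= self.stall_after

# Usage with a fake clock
now = [0.0]
detector = StallDetector(stall_after=60.0, clock=lambda: now[0])
print(detector.observe(10))  # False: first sample
now[0] = 30.0
print(detector.observe(10))  # False: only 30 s without progress
now[0] = 61.0
print(detector.observe(10))  # True: no new completions for 61 s
```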
Cancellation: The Fine-Grained Approach
Sometimes you need to cancel specific tasks rather than all tasks. Perhaps a user cancelled a specific order, or a data source became stale, or you hit a rate limit and need to pause a subset of workers.
import asyncio
from typing import Any, Callable
class CancellableTaskRegistry:
    """Registry for individually cancellable tasks."""
    def __init__(self):
        self._tasks: dict[str, asyncio.Task] = {}
        self._lock = asyncio.Lock()
    async def register(self, task_id: str, coro) -> asyncio.Task:
        """Register a task with an ID."""
        async with self._lock:
            if task_id in self._tasks:
                coro.close()  # avoid a "coroutine was never awaited" warning
                raise ValueError(f"Task {task_id} already exists")
            task = asyncio.create_task(coro)
            self._tasks[task_id] = task
            return task
    async def cancel(self, task_id: str, timeout: float = 2.0) -> bool:
        """Cancel a specific task by ID and wait briefly for it to finish."""
        async with self._lock:
            task = self._tasks.pop(task_id, None)
        if task is None:
            return False
        task.cancel()
        # asyncio.wait never raises the task's exception or CancelledError,
        # so a cancellation aimed at the caller itself is not swallowed here
        await asyncio.wait({task}, timeout=timeout)
        return True
    async def cancel_where(self, predicate: Callable[[str], bool]) -> int:
        """Cancel all tasks where predicate(task_id) is True (does not wait)."""
        async with self._lock:
            to_cancel = [
                (task_id, self._tasks.pop(task_id))
                for task_id in list(self._tasks.keys())
                if predicate(task_id)
            ]
        for _task_id, task in to_cancel:
            task.cancel()
        return len(to_cancel)
    async def wait_any(self, task_ids: list[str]) -> tuple[str, Any]:
        """Wait for the first of the given tasks to finish and cancel the rest."""
        if not task_ids:
            raise ValueError("No task IDs provided")
        async with self._lock:
            tasks = {
                tid: self._tasks[tid]
                for tid in task_ids
                if tid in self._tasks
            }
        if not tasks:
            raise ValueError("None of the specified task IDs are registered")
        done, pending = await asyncio.wait(
            set(tasks.values()),
            return_when=asyncio.FIRST_COMPLETED
        )
        # Cancel all pending
        for task in pending:
            task.cancel()
        # Every involved task is now finished or cancelled: drop them from the registry
        async with self._lock:
            for tid in tasks:
                self._tasks.pop(tid, None)
        # Return the completed task's ID and its result (or exception)
        completed_task = next(iter(done))
        completed_id = next(tid for tid, t in tasks.items() if t is completed_task)
        if completed_task.cancelled():
            return completed_id, asyncio.CancelledError()
        exc = completed_task.exception()
        return completed_id, exc if exc is not None else completed_task.result()
The cancel_where method is particularly useful for mass cancellations — for example, cancelling all tasks related to a specific market when that market halts trading.
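If you would rather not maintain a separate dictionary, asyncio task names can carry the same routing information. The sketch below handles the market-halt case. It assumes a hypothetical `MARKET:SYMBOL` naming convention and that every relevant task was created with `name=`. `cancel_by_prefix` and `halt_demo` are hypothetical names.

```python
import asyncio

def cancel_by_prefix(prefix: str) -> int:
    """Cancel every task on the running loop whose name starts with prefix."""
    current = asyncio.current_task()
    victims = [t for t in asyncio.all_tasks()
               if t is not current and t.get_name().startswith(prefix)]
    for t in victims:
        t.cancel()
    return len(victims)

async def halt_demo() -> tuple[int, list[str]]:
    names = ["NYSE:IBM", "NYSE:KO", "LSE:BP"]
    tasks = [asyncio.create_task(asyncio.sleep(60), name=n) for n in names]
    await asyncio.sleep(0)  # let the tasks start
    cancelled = cancel_by_prefix("NYSE:")
    await asyncio.sleep(0.01)  # let the cancellations be delivered
    survivors = [t.get_name() for t in tasks if not t.done()]
    for t in tasks:  # clean up the demo
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return cancelled, survivors

print(asyncio.run(halt_demo()))  # (2, ['LSE:BP'])
```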
Putting It All Together: A Production-Ready Template
Here is a complete, production-ready template that ties together all the patterns discussed in this article:
import asyncio
import signal
import time
from typing import Optional
# (Assume the classes from previous sections are defined here)
# from task_manager import TaskManager, TrackedTask
# from graceful_shutdown import GracefulShutdown
# from lifecycle_monitor import LifecycleMonitor
class MarketDataPipeline:
    """Production-ready template for async task lifecycle management."""
    def __init__(
        self,
        max_concurrency: int = 200,
        task_timeout: float = 30.0,
        shutdown_timeout: float = 10.0
    ):
        self.max_concurrency = max_concurrency
        self.task_timeout = task_timeout
        self.shutdown_timeout = shutdown_timeout
        self.task_manager = TaskManager(max_concurrency=max_concurrency)
        self.shutdown_handler = GracefulShutdown(
            self.task_manager,
            shutdown_timeout=shutdown_timeout
        )
        self.monitor = LifecycleMonitor(sample_interval=5.0)
        self._running = False
    async def start(self):
        """Start the pipeline."""
        self.shutdown_handler.start()
        self.monitor.attach(self.task_manager)
        await self.monitor.start()
        self._running = True
    async def stop(self):
        """Stop the pipeline gracefully."""
        print("Pipeline stopping...")
        self._running = False
        await self.monitor.stop()
        await self.shutdown_handler.trigger_shutdown()
        print(f"Final stats: {self.task_manager.get_stats()}")
    async def process_symbols(self, symbols: list[str]):
        """Process a list of symbols with full lifecycle management."""
        for symbol in symbols:
            if not self._running:
                break
            if self.shutdown_handler.shutdown_requested:
                break
            try:
                await self.task_manager.spawn(
                    task_id=symbol,
                    coro=self._fetch_and_process(symbol),
                    timeout=self.task_timeout
                )
            except Exception as e:
                print(f"Failed to spawn task for {symbol}: {e}")
        # Wait for remaining tasks to complete
        while True:
            stats = self.task_manager.get_stats()
            if stats["running"] == 0 and stats["pending"] == 0:
                break
            if self.shutdown_handler.shutdown_requested:
                await self.task_manager.cancel_all(timeout=5.0)
                break
            await asyncio.sleep(1)
    async def _fetch_and_process(self, symbol: str):
        """Example work function: replace with actual implementation."""
        await asyncio.sleep(0.1)  # Simulate API call
        return {"symbol": symbol, "processed": True}
async def main():
    pipeline = MarketDataPipeline(
        max_concurrency=100,
        task_timeout=30.0,
        shutdown_timeout=10.0
    )
    try:
        await pipeline.start()
        await pipeline.process_symbols([f"STOCK_{i}" for i in range(2000)])
    except KeyboardInterrupt:
        # Rarely reached: GracefulShutdown's SIGINT handler intercepts Ctrl+C
        print("Interrupted by user")
    finally:
        await pipeline.stop()
if __name__ == "__main__":
    asyncio.run(main())
Key Takeaways
| Principle | Implementation | Why it matters |
|---|---|---|
| Structured concurrency | TaskGroup (3.11+) or custom manager | Guarantees cleanup on exit |
| Bounded concurrency | Semaphore-based throttling | Prevents resource exhaustion |
| Graceful shutdown | SIGTERM handler with timeout | Finishes cleanup before the orchestrator escalates to SIGKILL |
| Individual cancellation | Registry with cancel-by-predicate | Handles stale data sources and user cancellations |
| Observability | Real-time metrics collection | Debugging and alerting in production |
| Timeout per task | asyncio.wait_for or asyncio.timeout | Prevents indefinite hangs |
The difference between a system that "usually works" and one that survives production is not more features. It is the discipline to handle the lifecycle of every task from spawn to shutdown — including the failures, the timeouts, and the signals you cannot control.
Next Steps
If you are building a data pipeline with asyncio, start with the TaskManager and GracefulShutdown patterns. They give you the foundation you need before you add complexity.
If you are running hundreds of concurrent tasks, implement the LifecycleMonitor before you need it. By the time you realize you need observability, you will already be flying blind.
If you are on Python 3.11+, prefer TaskGroup over asyncio.gather for new code. It is a clearer signal of intent and provides automatic cleanup.
This article does not constitute investment advice. Markets involve risk; past performance does not guarantee future results.