When a market data feed delivers 50,000 price updates per second across 500 symbols, the difference between a well-designed asyncio application and a brittle one reveals itself in milliseconds. Coroutines that fail to shut down cleanly leak memory. Tasks spawned without cancellation boundaries pile up until the event loop grinds to a halt. The 500 concurrent WebSocket connections you tested with mock data collapse in production when the remote server disconnects and your reconnect logic triggers a thundering herd of retries.
This is not a tutorial on what coroutines are. It is a field guide for engineers who need to launch, monitor, and cancel hundreds or thousands of concurrent tasks with production-grade reliability. We will dissect three interlocking mechanisms — asyncio.gather, TaskGroup, and signal handling — and show how they compose into a coherent lifecycle management system.
The Lifecycle Management Problem
Every concurrent system faces four fundamental challenges:
- Launch: Spawning tasks without blocking the event loop
- Observation: Tracking task state without polling overhead
- Cancellation: Terminating tasks cleanly, releasing resources
- Coordination: Waiting for groups of tasks to complete or fail together
Python's asyncio provides multiple primitives that address these challenges, but they have different trade-offs, and mixing them incorrectly produces subtle bugs that survive unit tests and emerge only under load.
asyncio.gather: Launching and Waiting for Task Groups
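asyncio.gather is the workhorse for launching multiple coroutines concurrently and waiting for all of them to complete. It takes the awaitables as separate positional arguments (unpack a list with `*`), wraps bare coroutines in tasks, and returns their results in argument order, regardless of which one finishes first.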
```python
import asyncio

async def fetch_ticker(symbol: str) -> dict:
    await asyncio.sleep(0.1)  # Simulate network I/O
    return {"symbol": symbol, "price": 150.0}

async def fetch_all(symbols: list[str]) -> list[dict]:
    results = await asyncio.gather(
        *[fetch_ticker(s) for s in symbols]
    )
    return results

# Usage: `await` is only valid inside a coroutine, so drive it with asyncio.run()
symbols = ["AAPL", "GOOGL", "MSFT", "NVDA", "TSLA"]
prices = asyncio.run(fetch_all(symbols))
print(prices)
```
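The ordering guarantee is easy to check. In this minimal sketch, `fetch_with_delay` is a hypothetical helper whose delays are chosen so that the last argument finishes first. The results still come back in argument order.

```python
import asyncio

async def fetch_with_delay(symbol: str, delay: float) -> str:
    # Hypothetical helper: shorter delays finish earlier
    await asyncio.sleep(delay)
    return symbol

async def ordered_demo() -> list[str]:
    # The slowest coroutine is passed first, yet its result stays in position 0
    return await asyncio.gather(
        fetch_with_delay("AAPL", 0.03),
        fetch_with_delay("MSFT", 0.02),
        fetch_with_delay("NVDA", 0.01),
    )

if __name__ == "__main__":
    print(asyncio.run(ordered_demo()))  # ['AAPL', 'MSFT', 'NVDA']
```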
Controlling Return Behavior with return_exceptions
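By default, gather does not cancel anything when one awaitable fails. The first exception propagates immediately to the caller, while the remaining awaitables keep running in the background and their results are lost. For resilient systems, pass return_exceptions=True: every awaitable then runs to completion, and any exceptions are returned in the result list alongside the successful values.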
```python
async def robust_fetch_all(symbols: list[str]) -> dict[str, dict | BaseException]:
    tasks = [fetch_ticker(s) for s in symbols]
    # A failed fetch appears as its exception object, in its original position
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return dict(zip(symbols, results))
```
This pattern is essential when consuming real-time market data feeds — you cannot afford to lose visibility into all symbols because one WebSocket connection drops.
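Without return_exceptions=True, gather surfaces the first failure but leaves the other awaitables running. This sketch makes that visible. `failing_fetch` and `slow_fetch` are hypothetical stand-ins for real requests, and the shared `state` dict exists only so the result can be inspected afterwards.

```python
import asyncio

async def failing_fetch() -> None:
    await asyncio.sleep(0.01)
    raise ConnectionError("feed dropped")

async def slow_fetch(state: dict) -> None:
    await asyncio.sleep(0.05)
    state["slow_finished"] = True  # Records that this sibling ran to completion

async def default_gather_demo() -> dict:
    state = {"slow_finished": False}
    slow = asyncio.create_task(slow_fetch(state))
    try:
        await asyncio.gather(failing_fetch(), slow)
    except ConnectionError:
        # The error arrives after ~10 ms, but gather did not cancel the sibling
        state["cancelled_on_error"] = slow.cancelled()
    await slow  # Still running in the background; it finishes on its own
    return state
```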
Limiting Concurrency with Semaphores
Launching 10,000 requests at once can exhaust connection pools and trip rate limits. Use asyncio.Semaphore to cap how many run their I/O concurrently:
```python
import asyncio

async def fetch_with_limit(
    symbols: list[str],
    concurrency: int = 50
) -> list[dict]:
    semaphore = asyncio.Semaphore(concurrency)

    async def throttled_fetch(symbol: str) -> dict:
        # At most `concurrency` coroutines can be inside this block at once
        async with semaphore:
            return await fetch_ticker(symbol)  # fetch_ticker as defined above

    return await asyncio.gather(*[throttled_fetch(s) for s in symbols])
```
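A semaphore value of 50 means at most 50 coroutines hold it at once. gather still creates a task for every symbol up front. The rest wait in the semaphore's internal queue, suspended and using no CPU, until a slot frees up. A suspended task is just a coroutine object, which is far cheaper than a blocked OS thread. That is why this approach stays efficient with thousands of pending requests, where a thread or process per request would not.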
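To confirm the bound empirically, this sketch counts how many jobs are inside the `async with` block at any moment. `bounded_run` and its counters are illustrative only, and the sleep stands in for real I/O.

```python
import asyncio

async def bounded_run(n_tasks: int, limit: int) -> int:
    """Run n_tasks jobs behind Semaphore(limit); return the peak concurrency seen."""
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # Simulated I/O while holding a slot
            running -= 1

    # All n_tasks tasks exist at once; the semaphore only bounds how many
    # are past `async with` at the same time
    await asyncio.gather(*(job() for _ in range(n_tasks)))
    return peak
```

With 200 jobs and a limit of 50, the peak is 50: the first 50 jobs acquire a slot immediately, and every later job waits for a release.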
Gathering with Timeout
Attach a timeout to prevent a single stalled task from blocking the entire group:
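```python
import asyncio

async def fetch_with_timeout(
    symbols: list[str],
    timeout: float = 5.0
) -> list[dict]:
    async def timed_fetch(symbol: str) -> dict:
        # wait_for cancels this one fetch if it exceeds the per-call timeout
        return await asyncio.wait_for(fetch_ticker(symbol), timeout=timeout)

    # return_exceptions=True waits for every fetch, so nothing is left running
    results = await asyncio.gather(
        *[timed_fetch(s) for s in symbols], return_exceptions=True
    )
    # Log which symbols timed out
    timed_out = [s for s, r in zip(symbols, results) if isinstance(r, asyncio.TimeoutError)]
    if timed_out:
        raise RuntimeError(f"Timeout exceeded after {timeout}s for: {timed_out}")
    for r in results:
        if isinstance(r, BaseException):
            raise r  # Surface the first non-timeout failure
    return results
```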
TaskGroup: Structured Concurrency with Guaranteed Cleanup
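Introduced in Python 3.11, asyncio.TaskGroup provides structured concurrency, a paradigm where child tasks are explicitly scoped to a parent context. The async with block does not exit until every child task has finished. If any child raises (or the block body itself raises), the remaining children are cancelled and awaited before the errors propagate as an ExceptionGroup.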
The Fundamental Difference from gather
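```python
import asyncio

async def task1() -> str:
    await asyncio.sleep(0.01)
    raise ValueError("task1 failed")

async def task2() -> str:
    await asyncio.sleep(1)
    return "task2 done"

# gather: explicit result collection
async def gather_pattern():
    try:
        results = await asyncio.gather(task1(), task2())
    except ValueError:
        # task2 keeps running in the background: gather does not cancel it
        pass

# TaskGroup: sibling cancellation and guaranteed waiting on scope exit
async def taskgroup_pattern():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task1())  # Scheduled now; starts at the next await
            tg.create_task(task2())
        # Reached only if both tasks completed successfully
    except* ValueError:
        # task1 failed, so TaskGroup cancelled task2 and waited for it to unwind
        pass
```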
This distinction matters for long-running services. With gather, an exception from one awaitable (a timeout, for example) leaves its siblings running in the background as orphans. With TaskGroup, the context manager cancels the remaining children when one fails and does not exit until all of them have finished unwinding.
Creating Tasks Inside a TaskGroup
```python
import asyncio

async def process_depth_feed(symbol: str) -> None:
    await asyncio.sleep(0.1)  # Placeholder for a real depth-feed consumer

def stream_depth_updates(symbol: str, taskgroup: asyncio.TaskGroup) -> asyncio.Task:
    """Schedule a feed in the caller's TaskGroup, which then owns its lifetime."""
    return taskgroup.create_task(process_depth_feed(symbol))

async def consume_multiple_feeds(symbols: list[str]) -> None:
    tasks: dict[str, asyncio.Task] = {}
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = {symbol: stream_depth_updates(symbol, tg) for symbol in symbols}
    except* Exception as eg:
        # A feed failed: the group cancelled the others before raising
        print(f"{len(eg.exceptions)} feed(s) failed")
    # Every task has completed, failed, or been cancelled at this point,
    # so it is safe to inspect results or clean up resources
    for symbol, task in tasks.items():
        if task.cancelled():
            print(f"Feed {symbol} was cancelled: connection closed")
        elif exc := task.exception():
            print(f"Feed {symbol} failed: {exc}")
```
The shield Primitive for Non-Cancellable Operations
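Some operations, such as writing a checkpoint to disk, sending a final heartbeat, or flushing a buffer, should not be interrupted even when the parent requests shutdown. asyncio.shield protects an awaitable from the cancellation of the task that is awaiting it. The outer await still raises CancelledError immediately, but the shielded operation keeps running to completion: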
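```python
import asyncio

async def flush_state_to_disk() -> None:
    await asyncio.sleep(0.1)  # Placeholder for the real flush

async def run_application() -> None:
    await asyncio.sleep(3600)  # Placeholder for the real service

async def graceful_shutdown(duration: float = 5.0):
    """Attempt to flush state within the shutdown window."""
    flush = asyncio.create_task(flush_state_to_disk())  # Start the flush exactly once
    try:
        # shield(): a timeout or outside cancellation stops our wait, not the flush
        await asyncio.wait_for(asyncio.shield(flush), timeout=duration)
    except asyncio.TimeoutError:
        flush.cancel()  # The window expired: abandon the flush
        print("Flush timed out: proceeding with partial state")
    except asyncio.CancelledError:
        # We were cancelled mid-flush, but the shielded flush is still running:
        # wait for it (still shielded), then re-raise CancelledError
        await asyncio.shield(flush)
        raise

async def main():
    try:
        await run_application()
    except asyncio.CancelledError:
        await graceful_shutdown(duration=5.0)
        raise  # Let asyncio.run() see that main() was cancelled
```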
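To see shield in isolation, this sketch cancels the task that is awaiting a shielded flush and then checks that the flush itself still completed. `flush` and `caller` are hypothetical, and `state` exists only for inspection.

```python
import asyncio

async def flush(state: dict) -> None:
    await asyncio.sleep(0.05)  # Stand-in for writing a checkpoint
    state["flushed"] = True

async def shield_demo() -> dict:
    state = {"flushed": False, "outer_cancelled": False}
    inner = asyncio.create_task(flush(state))

    async def caller() -> None:
        await asyncio.shield(inner)  # Cancelling caller() will not cancel inner

    outer = asyncio.create_task(caller())
    await asyncio.sleep(0.01)
    outer.cancel()  # Cancels the caller's await, not the shielded task
    try:
        await outer
    except asyncio.CancelledError:
        state["outer_cancelled"] = True
    await inner  # The shielded flush kept running and now completes
    return state
```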
Signal Handling: Graceful Shutdown in Production
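Production asyncio services must respond to SIGTERM and SIGINT. The naive approach, catching KeyboardInterrupt in the main coroutine, is unreliable for two reasons. First, the default SIGINT handler raises KeyboardInterrupt wherever the main thread happens to be executing, which is usually inside the event loop's internals rather than in your coroutine. Second, SIGTERM's default action terminates the process without running any cleanup. (Since Python 3.11, asyncio.run() handles SIGINT by cancelling the main task, but SIGTERM still needs explicit handling.)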
The asyncio.Event Shutdown Pattern
A common pattern uses asyncio.Event as a stop signal that every worker checks:
```python
import asyncio
import signal

class MarketDataService:
    def __init__(self, symbols: list[str]):
        self.symbols = symbols
        self.shutdown_event = asyncio.Event()
        self.tasks: list[asyncio.Task] = []

    async def start(self):
        # Register signal handlers before any I/O begins (Unix only)
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            # The callback runs on the event loop thread, so it may touch asyncio objects
            loop.add_signal_handler(sig, self._handle_signal, sig)

        # Launch workers; keeping references stops them being garbage-collected
        self.tasks = [
            asyncio.create_task(self._depth_worker(symbol))
            for symbol in self.symbols
        ]

        # Wait for shutdown signal
        await self.shutdown_event.wait()
        print("Shutdown signal received: initiating graceful shutdown")

    def _handle_signal(self, sig: signal.Signals):
        """Called on signal receipt; triggers shutdown."""
        print(f"Received {sig.name}: requesting shutdown")
        self.shutdown_event.set()

    async def _depth_worker(self, symbol: str):
        """Worker that monitors the shutdown event."""
        try:
            while not self.shutdown_event.is_set():
                data = await self._fetch_depth(symbol)
                await self._process(data)
                await asyncio.sleep(0.05)  # At most ~20 updates per second
        except asyncio.CancelledError:
            print(f"Worker {symbol} cancelled: cleaning up")
            raise  # Re-raise so the task is marked as cancelled
        finally:
            # Guaranteed cleanup: close connections, flush buffers
            await self._cleanup(symbol)

    async def _fetch_depth(self, symbol: str) -> dict:
        # Simulated API call: replace with real TickDB depth endpoint
        return {"symbol": symbol, "bids": [], "asks": []}

    async def _process(self, data: dict):
        pass  # Process depth update

    async def _cleanup(self, symbol: str):
        print(f"Cleaning up resources for {symbol}")

async def main():
    service = MarketDataService(["AAPL", "GOOGL", "NVDA"])
    try:
        await service.start()
    except asyncio.CancelledError:
        print("Application cancelled")
        raise
    finally:
        # Make sure workers stop even if we got here via cancellation,
        # then wait for all of them to finish their cleanup
        service.shutdown_event.set()
        if service.tasks:
            await asyncio.gather(*service.tasks, return_exceptions=True)
        print("Shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())
```
The TaskGroup + Signal Combination
For services where all workers belong to a single scope, combine TaskGroup with the shutdown event:
```python
import asyncio
import signal

async def process_market_data(symbol: str) -> None:
    await asyncio.sleep(0.1)  # Placeholder for real work

async def run_with_structured_shutdown(symbols: list[str]):
    shutdown = asyncio.Event()

    def signal_handler():
        shutdown.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)

    async def watched_worker(symbol: str):
        # No need to catch CancelledError: letting it propagate is correct
        while not shutdown.is_set():
            await process_market_data(symbol)
            await asyncio.sleep(1)

    try:
        async with asyncio.TaskGroup() as tg:
            for symbol in symbols:
                tg.create_task(watched_worker(symbol))
        # The signal handler sets `shutdown`, each worker leaves its loop, and
        # the `async with` block exits only after every worker has returned
    except* Exception as eg:
        # A worker failed: TaskGroup cancelled the others, then raised the errors as a group
        print(f"Encountered {len(eg.exceptions)} worker errors")
```
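A worker that sleeps for a full second between iterations can delay shutdown by up to that second. One way to wake it immediately is to wait on the event with a timeout instead of sleeping. `sleep_or_shutdown` is a hypothetical helper, and `loop.call_later` stands in for a real signal arriving.

```python
import asyncio

async def sleep_or_shutdown(shutdown: asyncio.Event, interval: float) -> bool:
    """Wait up to `interval` seconds; return True as soon as `shutdown` is set."""
    try:
        await asyncio.wait_for(shutdown.wait(), timeout=interval)
        return True
    except asyncio.TimeoutError:
        return False  # Interval elapsed normally; no shutdown requested

async def responsive_worker_demo() -> float:
    shutdown = asyncio.Event()
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, shutdown.set)  # Stands in for a signal handler firing
    start = loop.time()
    while not shutdown.is_set():
        # ... one unit of market-data work would go here ...
        if await sleep_or_shutdown(shutdown, interval=1.0):
            break
    return loop.time() - start
```

The worker exits about 50 ms after the event is set, rather than at the end of its one-second interval.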
Monitoring Task State Without Polling
Polling task.done() in a loop either burns CPU or adds latency. Instead, let asyncio.wait block until a task finishes or a timeout elapses, so you observe state changes without busy-waiting:
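```python
import asyncio

async def monitor_tasks(tasks: list[asyncio.Task], interval: float = 5.0):
    """Periodically report task health without busy-waiting."""
    pending = set(tasks)
    while pending:
        # Returns as soon as any task finishes, or after `interval` seconds
        done, pending = await asyncio.wait(
            pending,
            timeout=interval,
            return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            # exception() raises CancelledError on a cancelled task, so check first
            if not task.cancelled() and (exc := task.exception()):
                print(f"Task failed with: {exc!r}")
        if pending:
            print(f"Active tasks: {len(pending)}")

# Often better: attach callbacks so state changes are pushed to you
def _report_completion(task: asyncio.Task) -> None:
    if task.cancelled():
        print(f"Task {task.get_name()} cancelled")
    elif (exc := task.exception()) is not None:
        print(f"Task {task.get_name()} failed: {exc!r}")
    else:
        print(f"Task {task.get_name()} completed: {task.result()}")

def add_task_observers(tasks: list[asyncio.Task]):
    for task in tasks:
        task.add_done_callback(_report_completion)
```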
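To see done callbacks in action, this self-contained sketch attaches one observer to three tasks: one fails, one succeeds, and one is cancelled. `make_observer` and `boom` are hypothetical helpers. The observer checks `cancelled()` and `exception()` before calling `result()`, because `result()` would raise inside the callback for failed or cancelled tasks.

```python
import asyncio

def make_observer(outcomes: list[tuple[str, str]]):
    """Build a done-callback that records each task's outcome without raising."""
    def observe(task: asyncio.Task) -> None:
        if task.cancelled():
            outcomes.append((task.get_name(), "cancelled"))
        elif (exc := task.exception()) is not None:
            outcomes.append((task.get_name(), f"failed: {exc!r}"))
        else:
            outcomes.append((task.get_name(), f"ok: {task.result()}"))
    return observe

async def boom() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("disconnect")

async def observer_demo() -> list[tuple[str, str]]:
    outcomes: list[tuple[str, str]] = []
    observe = make_observer(outcomes)
    tasks = [
        asyncio.create_task(boom(), name="bad"),
        asyncio.create_task(asyncio.sleep(0.03, result=42), name="ok"),
        asyncio.create_task(asyncio.sleep(10), name="hung"),
    ]
    for task in tasks:
        task.add_done_callback(observe)  # Runs once per task, as each one finishes
    await asyncio.sleep(0.1)
    tasks[2].cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return outcomes
```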
For production monitoring, integrate with structured logging:
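```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def fetch_depth(symbol: str) -> dict:
    await asyncio.sleep(0.05)  # Placeholder for a real depth request
    return {"symbol": symbol, "bids": [], "asks": []}

async def process(depth_data: dict) -> None:
    pass  # Placeholder for real processing

async def monitored_depth_worker(symbol: str, shutdown: asyncio.Event):
    # Attach the task name so log lines from concurrent workers can be told apart
    ctx = {"symbol": symbol, "task": asyncio.current_task().get_name()}
    try:
        while not shutdown.is_set():
            depth_data = await fetch_depth(symbol)
            logger.debug("Depth update", extra={**ctx, "bids": len(depth_data.get("bids", []))})
            await process(depth_data)
    except asyncio.CancelledError:
        logger.info("Worker cancelled", extra=ctx)
        raise
    except Exception:
        # logger.exception also records the traceback
        logger.exception("Worker failed", extra=ctx)
        raise
    finally:
        logger.info("Worker cleanup complete", extra=ctx)
```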
Cancellation: The Complete Protocol
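Task cancellation in asyncio is cooperative. task.cancel() only requests cancellation, and the task receives a CancelledError at its next await. Nothing is released automatically as that exception unwinds the stack, so any resource that is not closed in a finally block (or by an async with) leaks: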
```python
import asyncio
import logging

logger = logging.getLogger(__name__)
# acquire_connection() and process() are placeholders for your own client code

async def bad_worker():
    # WRONG: if cancellation (or any error) arrives at one of the awaits below,
    # nothing ever releases the connection
    conn = await acquire_connection()
    while True:
        data = await conn.recv()
        await process(data)

async def good_worker(shutdown: asyncio.Event):
    # RIGHT: acquire once; try/finally releases it on return, error, or cancellation
    conn = await acquire_connection()
    try:
        while not shutdown.is_set():
            data = await conn.recv()
            await process(data)
    except asyncio.CancelledError:
        logger.info("Worker received cancellation")
        raise  # Always propagate cancellation after logging
    finally:
        await conn.close()  # Guaranteed cleanup on every exit path
        logger.info("Connection closed")
```
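An alternative to hand-written try/finally is an async context manager, which ties cleanup to a block and also runs it on cancellation. This sketch uses `contextlib.asynccontextmanager`. `FakeConnection` is a hypothetical stand-in for a real client.

```python
import asyncio
from contextlib import asynccontextmanager

class FakeConnection:
    """Hypothetical connection, only here to make the sketch runnable."""
    def __init__(self) -> None:
        self.closed = False

    async def recv(self) -> str:
        await asyncio.sleep(0.01)
        return "tick"

    async def close(self) -> None:
        self.closed = True

@asynccontextmanager
async def connection():
    conn = FakeConnection()
    try:
        yield conn
    finally:
        await conn.close()  # Runs on normal exit, error, or cancellation

async def cm_worker(opened: list[FakeConnection]) -> None:
    async with connection() as conn:
        opened.append(conn)
        while True:
            await conn.recv()

async def cancel_cm_demo() -> bool:
    opened: list[FakeConnection] = []
    task = asyncio.create_task(cm_worker(opened))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return opened[0].closed  # True: cleanup ran during cancellation
```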
Cancelling a Specific Task
To cancel a single task without affecting others:
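```python
import asyncio

async def long_running_work() -> None:
    await asyncio.sleep(60)  # Placeholder for real work

async def run_with_deadline() -> None:
    task = asyncio.create_task(long_running_work())
    # Give it 10 seconds to finish. asyncio.wait() does not cancel on timeout
    # (asyncio.wait_for would cancel the task itself), so we decide what happens next
    done, _ = await asyncio.wait({task}, timeout=10.0)
    if task not in done:
        task.cancel()  # Request cancellation
        try:
            await task  # Wait for it to acknowledge cancellation
        except asyncio.CancelledError:
            print("Task was cancelled gracefully")
```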
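If you use asyncio.wait_for instead, note that on timeout it cancels the awaited task itself and waits for it to finish unwinding before raising TimeoutError, so a separate cancel() afterwards is redundant. This sketch demonstrates that with a hypothetical `stubborn_work` coroutine.

```python
import asyncio

async def stubborn_work(log: list[str]) -> None:
    try:
        await asyncio.sleep(10)
    finally:
        log.append("cleanup ran")  # Executes while CancelledError unwinds

async def wait_for_cancels_demo() -> tuple[bool, list[str]]:
    log: list[str] = []
    task = asyncio.create_task(stubborn_work(log))
    try:
        await asyncio.wait_for(task, timeout=0.05)
    except asyncio.TimeoutError:
        pass
    # wait_for has already cancelled the task and waited for its finally block
    return task.cancelled(), log
```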
Cancelling All Pending Tasks
```python
import asyncio

async def cancel_all_tasks(tasks: list[asyncio.Task]):
    for task in tasks:
        task.cancel()  # Request cancellation; already-finished tasks are unaffected
    # Wait for all cancellations to be acknowledged
    await asyncio.gather(*tasks, return_exceptions=True)
```
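To see what that final gather returns, this sketch mixes one task that has already finished with three that are still sleeping. `cancel_all_and_report` is a hypothetical variant that returns gather's results instead of discarding them. Cancelled tasks show up as CancelledError instances rather than being raised.

```python
import asyncio

async def cancel_all_and_report(tasks: list[asyncio.Task]) -> list[object]:
    for task in tasks:
        task.cancel()  # Returns False (and does nothing) for finished tasks
    return await asyncio.gather(*tasks, return_exceptions=True)

async def cancel_all_demo() -> list[str]:
    done_fast = asyncio.create_task(asyncio.sleep(0, result="done"))
    await asyncio.sleep(0.01)  # Let done_fast finish before cancelling
    workers = [asyncio.create_task(asyncio.sleep(10)) for _ in range(3)]
    results = await cancel_all_and_report([done_fast, *workers])
    return [type(r).__name__ for r in results]
```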
Production Patterns: Real-Time Data Pipeline
The following pattern combines these techniques into a consumer for market depth data. The WebSocket subscription itself is simulated and marked for replacement:
```python
import asyncio
import logging
import signal
import time
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class DepthSnapshot:
    symbol: str
    bids: list[tuple[float, float]]  # (price, size)
    asks: list[tuple[float, float]]
    # time.monotonic is the clock the default event loop's time() reads
    timestamp: float = field(default_factory=time.monotonic)

class DepthPipeline:
    def __init__(self, symbols: list[str], max_concurrent: int = 50):
        self.symbols = symbols
        self.max_concurrent = max_concurrent
        self.shutdown = asyncio.Event()
        # Caps concurrent fetches, not the number of symbols being consumed
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def run(self):
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._request_shutdown)

        logger.info(f"Starting depth pipeline for {len(self.symbols)} symbols")

        async with asyncio.TaskGroup() as tg:
            workers = [tg.create_task(self._consume_depth(s)) for s in self.symbols]
            await self.shutdown.wait()
            # Don't wait out in-flight fetches (up to 30 s each): cancel the
            # workers so their except/finally blocks run immediately
            for worker in workers:
                worker.cancel()
        # Leaving the block means every worker has finished unwinding

        logger.info("Pipeline shutdown complete")

    def _request_shutdown(self):
        logger.info("Shutdown requested")
        self.shutdown.set()

    async def _consume_depth(self, symbol: str):
        try:
            await self._depth_loop(symbol)
        except asyncio.CancelledError:
            logger.info(f"Depth consumer cancelled: {symbol}")
            raise
        except Exception:
            logger.exception(f"Depth consumer error: {symbol}")
            raise

    async def _depth_loop(self, symbol: str):
        while not self.shutdown.is_set():
            try:
                # Hold a slot only for the fetch; holding it for the worker's
                # whole lifetime would starve every symbol beyond the limit
                async with self.semaphore:
                    snapshot = await asyncio.wait_for(
                        self._fetch_depth(symbol),
                        timeout=30.0
                    )
                await self._process_snapshot(snapshot)
            except asyncio.TimeoutError:
                logger.warning(f"Depth timeout for {symbol}: reconnecting")
                await asyncio.sleep(1)

    async def _fetch_depth(self, symbol: str) -> DepthSnapshot:
        # Simulated WebSocket depth subscription:
        # replace with real TickDB WebSocket depth subscription
        await asyncio.sleep(0.05)
        return DepthSnapshot(
            symbol=symbol,
            bids=[(150.0, 100), (149.99, 200)],
            asks=[(150.01, 150), (150.02, 100)]
        )

    async def _process_snapshot(self, snapshot: DepthSnapshot):
        # Calculate buy/sell pressure ratio
        bid_volume = sum(size for _, size in snapshot.bids)
        ask_volume = sum(size for _, size in snapshot.asks)
        pressure_ratio = bid_volume / ask_volume if ask_volume > 0 else float('inf')
        logger.debug(
            f"{snapshot.symbol}: pressure={pressure_ratio:.2f}, "
            f"bid_vol={bid_volume}, ask_vol={ask_volume}"
        )
        # Add your trading logic here

async def main():
    symbols = [f"SYMBOL{i}" for i in range(100)]  # 100 concurrent symbols
    pipeline = DepthPipeline(symbols, max_concurrent=50)
    await pipeline.run()

if __name__ == "__main__":
    asyncio.run(main())
```
This pipeline ties the lifecycle pieces together. The signal handlers set the shutdown event, the shutdown event ends each worker's loop, TaskGroup keeps run() from returning until every worker has finished unwinding, and the semaphore caps how many fetches are in flight at once.
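The pipeline retries after a fixed one-second sleep, so symbols that time out together also reconnect together. That lockstep retry is the thundering-herd pattern described at the start. A common mitigation is exponential backoff with random ("full") jitter. This is a sketch only: `connect` is assumed to be a coroutine function supplied by the caller, and the base, cap, and attempt limits are illustrative defaults rather than recommendations.

```python
import asyncio
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Full jitter: a uniform random delay in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * 2**attempt))

async def connect_with_backoff(connect, max_attempts: int = 5,
                               base: float = 0.5, cap: float = 30.0):
    """Retry `connect()` (a caller-supplied coroutine function) with jittered backoff."""
    for attempt in range(max_attempts):
        try:
            return await connect()
        except (OSError, asyncio.TimeoutError):
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the last error
            # Random spread keeps many symbols from reconnecting in lockstep
            await asyncio.sleep(backoff_delay(attempt, base, cap))
```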
Common Pitfalls and How to Avoid Them
| Pitfall | Symptom | Fix |
|---|---|---|
| Forgetting to handle CancelledError | Resource leaks (unclosed connections, unflushed buffers) | Release resources in try/finally (or async with) in every worker |
| No concurrency limit | Connection pool exhaustion, rate limit violations | Bound in-flight work, for example with asyncio.Semaphore |
| Mixing gather and TaskGroup | Inconsistent cancellation semantics | Use TaskGroup for scoped lifecycles; use gather only when you want independent result collection without sibling cancellation (typically with return_exceptions=True) |
| Catching CancelledError without re-raising | Silent cancellation that is hard to debug; TaskGroup and asyncio.timeout() misbehave | Log and clean up if needed, then re-raise |
| Calling add_signal_handler outside the main thread | RuntimeError: set_wakeup_fd only works in main thread | Run the event loop that registers signal handlers in the main thread (add_signal_handler is Unix-only) |
| Creating tasks outside a TaskGroup | Tasks outlive their scope; unreferenced tasks can be garbage-collected mid-run | Keep short-lived tasks within a TaskGroup, or hold strong references to them explicitly |
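For tasks that genuinely must live outside a TaskGroup, the asyncio documentation recommends holding strong references yourself, because the event loop keeps only weak references to tasks. This minimal sketch follows that pattern. `spawn_background` and `background_demo` are hypothetical names.

```python
import asyncio

# Strong references to fire-and-forget tasks; the loop itself holds only weak ones
background_tasks: set[asyncio.Task] = set()

def spawn_background(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference once the task finishes so the set does not grow forever
    task.add_done_callback(background_tasks.discard)
    return task

async def background_demo() -> tuple[int, int]:
    for _ in range(3):
        spawn_background(asyncio.sleep(0.01))
    during = len(background_tasks)
    await asyncio.gather(*background_tasks)
    await asyncio.sleep(0)  # Give any pending done-callbacks a chance to run
    return during, len(background_tasks)
```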
Conclusion
Managing the lifecycle of thousands of coroutines is not about choosing the right primitive — it is about understanding how gather, TaskGroup, and signal handling compose into a coherent shutdown protocol. The rules are simple:
- Launch with limits. Never spawn unbounded concurrent work; semaphores keep you within connection pool and rate-limit boundaries.
- Structure with TaskGroup. Group related tasks into explicit scopes: a failing child cancels its siblings, and the scope does not exit until every child has finished.
- Coordinate with events. Use asyncio.Event as a shared shutdown signal that workers check, and cancel whatever is still blocked when it fires.
- Handle cancellation explicitly. Every worker must release its resources in try/finally (or async with) and let CancelledError propagate. Graceful unwinding is not optional; it is the contract.
The market does not wait for your event loop to shut down cleanly. When the data feed disconnects, when the connection pool saturates, when the server sends a FIN packet — your asyncio application must respond within milliseconds, cancelling thousands of tasks without leaking memory or leaving connections dangling. The patterns in this article are the foundation for building that resilience.
This article provides technical guidance on asyncio programming patterns. Always test shutdown behavior under load before deploying to production.