Python’s async/await support has been production-ready since Python 3.7. Five years later, async code is common, but well-written async code is not. The patterns that look reasonable in tutorials often fall apart under production load. Here are the ones that actually work.
## The Event Loop Is Not a Thread Pool
The first mistake most people make is treating async Python like it solves all concurrency problems. It does not.
async/await is cooperative multitasking. Your code yields control only at await points. If anything between those points blocks the event loop (a blocking database driver, a time.sleep call, a CPU-heavy computation), everything else waits.
```python
# BAD: blocks the event loop for all requests
async def get_user(user_id: int):
    time.sleep(2)             # This blocks EVERYTHING
    return db.query(user_id)  # Synchronous driver
```

```python
# GOOD: use async-native libraries
async def get_user(user_id: int):
    await asyncio.sleep(2)          # Yields control
    return await db.query(user_id)  # async-native driver
```
Check every library you use for async support. SQLAlchemy 2.0 has first-class async support, psycopg3 is async-native, and aiohttp and httpx are async HTTP clients. The old synchronous versions will silently serialize your requests and kill your concurrency.
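When a dependency simply has no async version, you can still keep the loop responsive by pushing the blocking call onto a worker thread. A minimal sketch using asyncio.to_thread, where blocking_work is a hypothetical stand-in for a sync-only library call:

```python
import asyncio
import time

def blocking_work(x: int) -> int:
    # Hypothetical stand-in for a sync-only library call
    time.sleep(0.05)
    return x * 2

async def main() -> list[int]:
    # asyncio.to_thread runs each blocking call in a worker thread,
    # so the event loop stays free to schedule other tasks meanwhile
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_work, i) for i in range(3))
    )

print(asyncio.run(main()))  # [0, 2, 4]
```

This is a workaround, not a fix: threads have their own overhead, so prefer an async-native library when one exists.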
## TaskGroup for Concurrent Operations
The most useful addition in Python 3.11 is asyncio.TaskGroup. Before it, managing concurrent tasks and handling their errors was awkward.
```python
# Before TaskGroup: hard to handle cancellation correctly
tasks = [asyncio.create_task(fetch(url)) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
```

```python
# With TaskGroup: cancels remaining tasks if one fails
async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(fetch(url)) for url in urls]
results = [t.result() for t in tasks]
```
TaskGroup cancels all remaining tasks if any task raises an exception. gather with return_exceptions=True keeps running every task and returns exceptions as ordinary values, so failures can slip through unnoticed. For most production use cases, failing fast and cancelling is the right behavior.
## Semaphores for Concurrency Control
Launching 10,000 concurrent HTTP requests sounds great until you hit rate limits or exhaust file descriptors. Semaphores let you bound concurrency.
```python
async def fetch_all(urls: list[str]) -> list[str]:
    semaphore = asyncio.Semaphore(50)  # Max 50 concurrent

    async def fetch_one(url: str) -> str:
        async with semaphore:
            async with httpx.AsyncClient() as client:
                resp = await client.get(url, timeout=10)
                return resp.text

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_one(url)) for url in urls]
    return [t.result() for t in tasks]
```
Tune the semaphore limit based on your downstream service’s rate limits and your available connections.
## Connection Pools, Not Client Per Request
Creating a new HTTP client or database connection per request is expensive. Use pools.
```python
# BAD: new connection per call
async def get_data(key: str):
    async with httpx.AsyncClient() as client:
        return await client.get(f"https://api.example.com/{key}")
```

```python
# GOOD: shared client with connection pool
_client: httpx.AsyncClient | None = None

async def get_client() -> httpx.AsyncClient:
    global _client
    if _client is None:
        _client = httpx.AsyncClient(
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
        )
    return _client

async def get_data(key: str):
    client = await get_client()
    return await client.get(f"https://api.example.com/{key}")
```
For databases, asyncpg has its own pool:
```python
pool = await asyncpg.create_pool(dsn=DATABASE_URL, min_size=5, max_size=20)

async with pool.acquire() as conn:
    row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
```
## Timeouts on Everything
Network calls that never time out will eventually hang your service. Wrap every external call with a timeout.
```python
async def fetch_with_timeout(url: str, timeout_seconds: float = 5.0) -> str:
    try:
        async with asyncio.timeout(timeout_seconds):
            async with httpx.AsyncClient() as client:
                resp = await client.get(url)
                return resp.text
    except TimeoutError:
        # ServiceUnavailableError: your application's own error type
        raise ServiceUnavailableError(f"Timeout fetching {url}")
```
asyncio.timeout is available from Python 3.11. On earlier versions, asyncio.wait_for gives you similar behavior, though it wraps a single awaitable rather than acting as a context manager.
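For pre-3.11 interpreters, the equivalent pattern looks like this, where slow_call is a hypothetical stand-in for a hung network request:

```python
import asyncio

async def slow_call() -> str:
    await asyncio.sleep(10)  # hypothetical stand-in for a hung network call
    return "done"

async def main() -> str:
    try:
        # wait_for cancels the wrapped coroutine once the timeout expires
        return await asyncio.wait_for(slow_call(), timeout=0.05)
    except asyncio.TimeoutError:  # an alias of the builtin TimeoutError since 3.11
        return "timed out"

print(asyncio.run(main()))  # timed out
```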
## Background Tasks Done Right
Running background work from a web handler is common. The wrong way loses errors silently.
```python
# BAD: fire and forget with no error handling
@app.post("/events")
async def track_event(event: Event):
    asyncio.create_task(process_event(event))  # Errors disappear
    return {"ok": True}
```

```python
# GOOD: keep a reference and attach an error handler
_background_tasks: set[asyncio.Task] = set()

def handle_task_error(task: asyncio.Task):
    _background_tasks.discard(task)
    if not task.cancelled() and task.exception():
        logger.error("Background task failed", exc_info=task.exception())

@app.post("/events")
async def track_event(event: Event):
    task = asyncio.create_task(process_event(event))
    _background_tasks.add(task)  # The loop holds only a weak reference
    task.add_done_callback(handle_task_error)
    return {"ok": True}
```

Keeping the task in a set matters: the event loop holds only a weak reference to running tasks, so a fire-and-forget task can be garbage collected mid-flight.
For anything that must not be lost, use a real queue (Celery, ARQ, Redis Streams) instead of background tasks. Background tasks die with the process.
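The done-callback pattern is simple to exercise end to end. A minimal sketch, with a plain list standing in for a real logger, confirming the callback actually sees the exception:

```python
import asyncio

captured: list[str] = []  # stand-in for logger.error

def handle_task_error(task: asyncio.Task) -> None:
    if not task.cancelled() and task.exception():
        captured.append(str(task.exception()))

async def failing_job() -> None:
    raise RuntimeError("job failed")

async def main() -> None:
    task = asyncio.create_task(failing_job())
    task.add_done_callback(handle_task_error)
    await asyncio.sleep(0.01)  # give the task and its callback a chance to run

asyncio.run(main())
print(captured)  # ['job failed']
```

Because the callback retrieves task.exception(), you also avoid the "Task exception was never retrieved" warning asyncio emits for unobserved failures.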
## The Libraries That Work in Production
| Use case | Sync | Async |
|---|---|---|
| HTTP client | requests | httpx |
| PostgreSQL | psycopg2 | asyncpg / psycopg3 |
| Redis | redis-py (sync) | redis-py (async) |
| ORM | SQLAlchemy 1.x | SQLAlchemy 2.0 |
| Web framework | Flask | FastAPI, Starlette |
## Bottom Line
Async Python works well when you use async-native libraries, bound your concurrency with semaphores, share connection pools, and put timeouts on every external call. The mistakes - blocking the event loop with sync code, creating connections per request, losing errors in fire-and-forget tasks - all show up as mysterious slowdowns under load. Get these patterns right from the start and async Python will handle high concurrency efficiently.