Python’s async/await support has been production-ready since Python 3.7. Five years later, async code is common, but well-written async code is not. The patterns that look reasonable in tutorials often fall apart under production load. Here are the ones that actually work.

The Event Loop Is Not a Thread Pool

The first mistake most people make is treating async Python like it solves all concurrency problems. It does not.

async/await is cooperative multitasking. Your code yields control only at await points. If anything blocks between those points - a synchronous database driver, a time.sleep, a CPU-heavy computation - the event loop stalls and everything else waits.

# BAD: blocks the event loop for all requests
async def get_user(user_id: int):
    time.sleep(2)  # This blocks EVERYTHING
    return db.query(user_id)  # Synchronous driver

# GOOD: use async-native libraries
async def get_user(user_id: int):
    await asyncio.sleep(2)  # Yields control
    return await db.query(user_id)  # async-native driver

Check every library you use for async support. SQLAlchemy 2.0 ships an async API, psycopg3 is async-native, and aiohttp and httpx are async HTTP clients. The old synchronous versions will silently kill your concurrency: every blocking call stalls the whole loop.
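When a sync-only dependency truly can't be replaced, asyncio.to_thread (Python 3.9+) at least keeps it off the event loop by running it in a worker thread. A minimal sketch, with a stand-in blocking function:

```python
import asyncio
import time

def blocking_work(x: int) -> int:
    # Stand-in for a sync-only library call you can't replace yet
    time.sleep(0.1)
    return x * 2

async def main() -> list[int]:
    # to_thread runs each blocking call in a worker thread,
    # so the event loop stays free to serve other tasks
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_work, i) for i in range(5))
    )

print(asyncio.run(main()))  # [0, 2, 4, 6, 8]
```

This is a stopgap, not a fix: threads add overhead, and the default thread pool is small, so reserve it for calls you can't migrate.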

TaskGroup for Concurrent Operations

The most useful addition in Python 3.11 is asyncio.TaskGroup. Before it, managing concurrent tasks and handling their errors was awkward.

# Before TaskGroup: hard to handle cancellation correctly
tasks = [asyncio.create_task(fetch(url)) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)

# With TaskGroup: cancels remaining tasks if one fails
async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(fetch(url)) for url in urls]

results = [t.result() for t in tasks]

TaskGroup cancels all remaining tasks as soon as any task raises. gather with return_exceptions=True instead returns exceptions as values mixed into the results, where they are easy to overlook. For most production use cases, failing fast and cancelling is the right behavior.

Semaphores for Concurrency Control

Launching 10,000 concurrent HTTP requests sounds great until you hit rate limits or exhaust file descriptors. Semaphores let you bound concurrency.

async def fetch_all(urls: list[str]) -> list[str]:
    semaphore = asyncio.Semaphore(50)  # Max 50 concurrent

    async def fetch_one(client: httpx.AsyncClient, url: str) -> str:
        async with semaphore:
            resp = await client.get(url, timeout=10)
            return resp.text

    # One shared client, so connections are pooled (see next section)
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_one(client, url)) for url in urls]

    return [t.result() for t in tasks]

Tune the semaphore limit based on your downstream service’s rate limits and your available connections.

Connection Pools, Not Client Per Request

Creating a new HTTP client or database connection per request is expensive. Use pools.

# BAD: new connection per call
async def get_data(key: str):
    async with httpx.AsyncClient() as client:
        return await client.get(f"https://api.example.com/{key}")

# GOOD: shared client with connection pool
_client: httpx.AsyncClient | None = None

async def get_client() -> httpx.AsyncClient:
    global _client
    if _client is None:
        _client = httpx.AsyncClient(
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
        )
    return _client

async def get_data(key: str):
    client = await get_client()
    return await client.get(f"https://api.example.com/{key}")

For databases, asyncpg has its own pool:

pool = await asyncpg.create_pool(dsn=DATABASE_URL, min_size=5, max_size=20)
async with pool.acquire() as conn:
    row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)

Timeouts on Everything

Network calls that never time out will eventually hang your service. Wrap every external call with a timeout.

async def fetch_with_timeout(url: str, timeout_seconds: float = 5.0) -> str:
    try:
        async with asyncio.timeout(timeout_seconds):
            async with httpx.AsyncClient() as client:
                resp = await client.get(url)
                return resp.text
    except TimeoutError:
        # ServiceUnavailableError: your application's own exception type
        raise ServiceUnavailableError(f"Timeout fetching {url}")

asyncio.timeout arrived in Python 3.11. On earlier versions, asyncio.wait_for provides roughly the same behavior, though it wraps a single awaitable rather than acting as a context manager.
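For code still on 3.10 or earlier, the wait_for form looks like this sketch, with a toy slow() coroutine standing in for the network call:

```python
import asyncio

async def slow() -> str:
    await asyncio.sleep(10)
    return "done"

async def main() -> str:
    try:
        # wait_for cancels slow() if it exceeds the timeout
        return await asyncio.wait_for(slow(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # timed out
```

On 3.11+ asyncio.TimeoutError is an alias for the builtin TimeoutError, so either spelling works in the except clause.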

Background Tasks Done Right

Running background work from a web handler is common. The wrong way crashes silently.

# BAD: fire and forget with no error handling
@app.post("/events")
async def track_event(event: Event):
    asyncio.create_task(process_event(event))  # Errors disappear
    return {"ok": True}

# GOOD: keep a reference and attach an error handler
_background_tasks: set[asyncio.Task] = set()

def handle_task_error(task: asyncio.Task):
    _background_tasks.discard(task)
    if not task.cancelled() and task.exception():
        logger.error("Background task failed", exc_info=task.exception())

@app.post("/events")
async def track_event(event: Event):
    task = asyncio.create_task(process_event(event))
    _background_tasks.add(task)  # Strong reference: the loop keeps only a weak one
    task.add_done_callback(handle_task_error)
    return {"ok": True}

The task set matters: the event loop holds only a weak reference to tasks, so a bare create_task result can be garbage-collected mid-flight.

For anything that must not be lost, use a real queue (Celery, ARQ, Redis Streams) instead of background tasks. Background tasks die with the process.

The Libraries That Work in Production

Use case        Sync               Async
HTTP client     requests           httpx, aiohttp
PostgreSQL      psycopg2           asyncpg / psycopg3
Redis           redis-py (sync)    redis-py (async)
ORM             SQLAlchemy 1.x     SQLAlchemy 2.0 (async engine)
Web framework   Flask              FastAPI, Starlette

Bottom Line

Async Python works well when you use async-native libraries, bound your concurrency with semaphores, share connection pools, and put timeouts on every external call. The mistakes - blocking the event loop with sync code, creating connections per request, losing errors in fire-and-forget tasks - all show up as mysterious slowdowns under load. Get these patterns right from the start and async Python will handle high concurrency efficiently.