Lesson 20 • Advanced
Advanced Async & Await Patterns
Asynchronous programming is at the heart of modern Python applications. Whether you're building high-performance web APIs, data pipelines, WebSocket apps, scrapers, or microservices — mastering advanced async/await patterns lets you build systems that scale.
What You'll Learn
This lesson dives beyond the basics. You'll learn event loop mechanics, tasks, concurrency patterns, async iterators, async generators, synchronization primitives, and real-world architectures used in production environments.
1. Understanding the Event Loop Deeply
The Python async system is powered by the event loop, a scheduler that:
| What It Does | Why It Matters |
|---|---|
| Executes async tasks | Runs your async def functions |
| Handles IO events | Network, file, database operations |
| Manages task switching | Pauses waiting tasks, runs ready ones |
| Runs callbacks & timers | Scheduled operations and delays |
The event loop can only switch tasks when your code awaits something. No waiting = no switching!
Basic structure:
Event Loop Basics — running async code with `asyncio.run()`:

```python
import asyncio

async def main():
    print("Hello Async")

asyncio.run(main())
```

`asyncio.run()`:
- Creates an event loop
- Starts running the coroutine
- Closes the loop after completion
2. Coroutine Chaining & Composition
Coroutines can call other coroutines using await:
Coroutine Chaining — calling coroutines from coroutines:

```python
async def fetch_data():
    await asyncio.sleep(1)
    return "data"

async def process_data():
    data = await fetch_data()
    print("Processed:", data)
```

You can chain dozens of async functions without blocking the thread.
3. Running Tasks Concurrently With asyncio.gather
Need several coroutines running at once? Use asyncio.gather(). This is how you run coroutines concurrently (non-blocking):
asyncio.gather — running tasks concurrently:

```python
async def a():
    await asyncio.sleep(1)
    return "A"

async def b():
    await asyncio.sleep(1)
    return "B"

results = await asyncio.gather(a(), b())  # inside a coroutine
```

Time taken: about 1 second, not 2.
Used heavily in:
- Web scraping
- Parallel API calls
- Batch data processing
4. Creating Background Tasks With asyncio.create_task
Tasks let coroutines run independently in the background:
Background Tasks — running tasks independently:

```python
async def background():
    while True:
        print("Running in background")
        await asyncio.sleep(3)

async def main():
    task = asyncio.create_task(background())
    await asyncio.sleep(10)
    task.cancel()
```

Perfect for:
- Polling loops
- Streams
- Websocket heartbeats
- Scheduled jobs
5. Using asyncio.wait() for Advanced Control
gather() waits for ALL tasks to finish. wait() lets you say "tell me when the FIRST one finishes" or "tell me if any of them fails". Unlike gather(), wait() lets you specify:
| Option | When to Use |
|---|---|
| FIRST_COMPLETED | React as soon as ANY task finishes |
| ALL_COMPLETED | Wait for ALL tasks (like gather) |
| FIRST_EXCEPTION | Stop immediately if any task fails |
asyncio.wait — advanced task control:

```python
done, pending = await asyncio.wait(
    {task1, task2, task3},
    return_when=asyncio.FIRST_COMPLETED
)
```

Used for:
- Racing multiple sources
- Timeout systems
- Picking fastest available API
6. Timeouts With asyncio.wait_for
Async Timeouts — handling operation timeouts:

```python
async def fetch():
    await asyncio.sleep(5)
    return "done"

try:
    await asyncio.wait_for(fetch(), timeout=2)
except asyncio.TimeoutError:
    print("Timeout!")
```

Critical for resilient systems that depend on external APIs.
7. Async Context Managers (async with)
async with handles the "enter and exit" pattern asynchronously. Use it for resources that need async setup AND cleanup:
| Use Case | Why Async? |
|---|---|
| Database connections | Connect/disconnect takes network time |
| HTTP clients | Opening/closing sessions is IO |
| WebSockets | Handshake/teardown is async |
Async Context Managers — using async with:

```python
class AsyncResource:
    async def __aenter__(self):
        print("Opening")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Closing")

async with AsyncResource() as r:
    print("Using resource")
```

8. Async Iterators (async for)
Used for:
- Streams
- File IO wrappers
- Real-time data feeds
Example:
Async Iterators — iterating asynchronously:

```python
class AsyncCounter:
    def __init__(self):
        self.value = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.value >= 5:
            raise StopAsyncIteration
        await asyncio.sleep(1)
        self.value += 1
        return self.value

async for x in AsyncCounter():
    print(x)
```

9. Async Generators (async def … yield)
These are perfect for streaming data where each item needs async fetching:
Async Generators — streaming data with yield:

```python
async def stream_numbers():
    for i in range(5):
        await asyncio.sleep(1)
        yield i
```

Consume it with:

```python
async for n in stream_numbers():
    print(n)
```

Used in:
- Websocket data
- Real-time dashboards
- Server push notifications
10. Async Locks, Semaphores & Synchronization

Lock — one at a time. Prevents race conditions when multiple tasks access shared data:

```python
lock = asyncio.Lock()

async with lock:
    # only one task enters here at a time
    ...
```

Semaphore — limit concurrency:

```python
sem = asyncio.Semaphore(10)

async with sem:
    await fetch_api()
```

Event — notify tasks:

```python
event = asyncio.Event()
event.set()  # resume waiting tasks
```

11. Producer–Consumer Pattern (Async Pipeline)
Common pattern for decoupling work:
- AI data pipelines: load data → process → save
- Web crawlers: discover URLs → fetch pages → extract data
- Background jobs: receive requests → queue → process
Producer-Consumer Pattern — async queue-based pipeline:

```python
queue = asyncio.Queue()

async def producer():
    for i in range(5):
        await queue.put(i)

async def consumer():
    while True:
        item = await queue.get()
        print("Got:", item)
        queue.task_done()
```

12. Parallelism vs Concurrency
| Concept | What It Means | Real-World Example |
|---|---|---|
| Concurrency | Handling multiple tasks by switching between them | One chef cooking 3 dishes, switching between them |
| Parallelism | Actually doing multiple things at the same time | Three chefs each cooking one dish simultaneously |
AsyncIO = Concurrency, NOT CPU parallelism!
Choose the right tool:
- asyncio → IO-bound work (network, files, database)
- multiprocessing → CPU-bound work (math, ML, image processing)
- concurrent.futures → mixed workloads
A professional engineer must know when to choose which.
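To make the distinction concrete, here's a small timing sketch using only the standard library: two simulated 1-second IO waits run under gather() finish in roughly one second total, because the event loop interleaves them while they sleep. A CPU-bound loop in their place would get no such speedup.

```python
import asyncio
import time

async def io_task():
    # Simulated IO wait; the event loop is free to run other tasks here.
    await asyncio.sleep(1)

async def main():
    start = time.perf_counter()
    await asyncio.gather(io_task(), io_task())  # concurrent, not sequential
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"Two 1-second waits took {elapsed:.2f}s")
```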
13. Combining Async With APIs (Practical)
Using aiohttp:
Async HTTP Client — fetching APIs with aiohttp:

```python
import aiohttp
import asyncio

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            return await r.json()
```

This lets you send 1000 requests concurrently without blocking.
14. Async Patterns Used in Real Web Frameworks
FastAPI, Starlette, aiohttp, Quart — all rely on:
- async routers
- async database sessions
- async locks for shared state
- async iterators for streaming responses
Example FastAPI stream:
FastAPI Streaming — server-sent events pattern:

```python
async def event_stream():
    for i in range(100):
        yield f"data: {i}\n\n"
        await asyncio.sleep(0.1)
```

15. Full Architecture Demo (Advanced)
A real system might include:
- async file readers
- async DB connections
- async external API fetchers
- queues (producer/consumer)
- batch processors
- background tasks
- cancellation handlers
Async lets each component run without blocking any other.
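As a toy illustration of how a few of these pieces compose — not a production design — here's a sketch wiring a queue-fed processor to a cancellable background task. `fetcher`, `processor`, and `heartbeat` are made-up stand-ins for real components:

```python
import asyncio

results = []

async def fetcher(queue):
    # stand-in for an async external API fetcher feeding the pipeline
    for i in range(5):
        await queue.put(i)
    await queue.put(None)  # sentinel: no more work

async def processor(queue):
    # queue-based consumer stage (producer/consumer pattern)
    while True:
        item = await queue.get()
        if item is None:
            break
        results.append(item * 2)

async def heartbeat():
    # background task running alongside the pipeline
    while True:
        await asyncio.sleep(0.1)

async def main():
    queue = asyncio.Queue()
    hb = asyncio.create_task(heartbeat())
    await asyncio.gather(fetcher(queue), processor(queue))
    hb.cancel()  # cancellation handler for the background task

asyncio.run(main())
print(results)  # [0, 2, 4, 6, 8]
```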
16. Understanding Task Cancellation in Depth
Tasks can be cancelled, but you must handle the cancellation gracefully:
Task Cancellation — graceful task shutdown:

```python
async def worker():
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Worker cancelled safely")
        raise  # re-raise so the awaiting caller sees the cancellation

# Cancelling (inside a coroutine):
task = asyncio.create_task(worker())
await asyncio.sleep(3)
task.cancel()
try:
    await task
except asyncio.CancelledError:
    pass
```

Why this matters:
- graceful shutdowns
- timeouts
- web servers closing requests
- stopping background loops
- cleaning up resources

If you don't handle CancelledError, cleanup code never runs and tasks die mid-operation, leaving shared state inconsistent.
17. Task Groups (Python 3.11+) — Structured Concurrency
TaskGroups improve error handling and cancellation. If one task inside a group fails, the remaining tasks in the group are cancelled.
Task Groups — structured concurrency:

```python
async with asyncio.TaskGroup() as tg:
    tg.create_task(fetch_user())
    tg.create_task(fetch_orders())
```

Benefits:
- predictable cancellation
- easier debugging
- no silent orphan tasks
- safer microservices
- recommended by Python core devs

TaskGroups are expected to replace asyncio.gather() in many new codebases.
18. Async Error Handling (Fail-Fast, Fail-Safe, Recovering)
Async code introduces unique failure modes:
Case 1 — fail fast (stop everything):

```python
await asyncio.gather(a(), b(), c())  # first exception propagates
```

Case 2 — fail safe (continue, collect errors):

```python
results = await asyncio.gather(a(), b(), c(), return_exceptions=True)
```

Case 3 — retry pattern:

```python
async def retry(coro_factory, attempts=3):
    for _ in range(attempts):
        try:
            return await coro_factory()
        except Exception:
            await asyncio.sleep(1)
    raise RuntimeError("all retry attempts failed")
```

Case 4 — circuit breaker (advanced):
Used in microservices to avoid hammering broken APIs.
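The lesson gives no code for this case, so here's a minimal sketch of the idea; the threshold and cooldown values are arbitrary, and real implementations add half-open probing, metrics, and per-endpoint state:

```python
import time

class CircuitBreaker:
    def __init__(self, max_failures=3, cooldown=5.0):
        self.max_failures = max_failures
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = 0.0

    async def call(self, coro_factory):
        # While "open", refuse calls instead of hammering the broken API.
        if self.failures >= self.max_failures:
            if time.monotonic() - self.opened_at < self.cooldown:
                raise RuntimeError("circuit open - skipping call")
            self.failures = 0  # cooldown elapsed: allow a fresh attempt

        try:
            result = await coro_factory()
            self.failures = 0  # success resets the breaker
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
```

After `max_failures` consecutive errors the breaker raises immediately for `cooldown` seconds, giving the downstream service time to recover.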
19. Avoiding Deadlocks in Async Code
Deadlocks occur when tasks wait on each other incorrectly.
Example of a deadlock:
Deadlock Example — what NOT to do:

```python
await lock.acquire()
await lock.acquire()  # never releases — deadlock
```

Proper pattern:

```python
async with lock:
    ...
```

Async deadlocks usually come from:
- forgetting await
- acquiring multiple locks
- circular waiting
- blocking calls inside async code
20. Using Queues for Safe Concurrency
Async queues give safe producer/consumer flow.
Async Queues — safe producer/consumer pattern:

```python
queue = asyncio.Queue()

async def producer():
    for i in range(100):
        await queue.put(i)

async def consumer():
    while True:
        item = await queue.get()
        print("Processing", item)
        queue.task_done()
```

Real uses:
- background job systems
- AI pipeline batching
- distributed crawlers
- video/audio frame processing
- websocket message routing
Queues stop you from overwhelming your system.
21. Building Real-Time Streams With Async Generators
Example — streaming Bitcoin prices, logs, or live chat updates:

```python
async def price_stream():
    while True:
        yield await fetch_price()
        await asyncio.sleep(1)

# Consumption:
async for price in price_stream():
    print(price)
```

This pattern powers:
- real-time dashboards
- monitoring systems
- live analytics
- trading bots
22. Understanding Cooperative Multitasking
time.sleep() or a heavy CPU loop freezes your ENTIRE async app! The event loop can't switch to other tasks because your code never "yields" control. Always use async alternatives.
AsyncIO is not preemptive threading. Your code must yield control to let other tasks run:
Bad (blocks the event loop):

```python
time.sleep(1)
```

Good (yields control properly):

```python
await asyncio.sleep(1)
```

Bad (heavy CPU work blocks the loop):

```python
for i in range(10_000_000): ...
```

Fix — push CPU work to a thread:

```python
await asyncio.to_thread(cpu_heavy_func)
```

23. Mixing AsyncIO With Sync Code (The Right Way)
Sometimes you must call blocking code inside async systems:
Mixing Sync and Async — running sync code in an async context:

```python
result = await asyncio.to_thread(blocking_function)
```

Used for:
- image processing
- file compression
- pandas computations
- machine learning prediction (non-async)

to_thread() prevents freezing the event loop.
24. Handling Bounded Parallelism (Prevent Overload)
Running thousands of tasks at once can overload:
- the network
- memory
- the CPU
- the database
Use semaphores:
Bounded Parallelism — limiting concurrent operations:

```python
sem = asyncio.Semaphore(5)

async def safe_fetch(url):
    async with sem:
        return await fetch(url)
```

Now only 5 requests run at once.
25. Combining AsyncIO With Multiprocessing
For CPU-heavy workloads (AI / ML / video processing), async alone is not enough.
Pattern:
- AsyncIO → handles network + coordination
- ProcessPoolExecutor → handles CPU-heavy tasks
Example:
Async + Multiprocessing — offloading CPU work to processes:

```python
from concurrent.futures import ProcessPoolExecutor

loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
    result = await loop.run_in_executor(pool, cpu_task)
```

(Note: passing None as the executor uses the default thread pool, not processes.)

Used in:
- video rendering
- neural network inference
- compression
- hashing
- scientific computing
This hybrid model is extremely powerful.
26. Async File IO (aiofiles)
Normal file IO blocks:

```python
open("file.txt").read()
```

Async file IO:

```python
import aiofiles

async with aiofiles.open("big.txt") as f:
    async for line in f:
        ...
```

Used in:
- huge logs
- large dataset preprocessing
- AI training input pipelines
27. Restartable Background Loops
This is how websocket heartbeats, game loops, monitoring tasks work.
Restartable Loops — self-healing background tasks:

```python
async def heartbeat():
    while True:
        send_ping()  # placeholder for your ping logic
        await asyncio.sleep(5)

# Restartable version:
async def resilient_heartbeat():
    while True:
        try:
            await heartbeat()
        except Exception:
            await asyncio.sleep(1)
```

Used in:
- Discord bots
- IoT sensors
- distributed systems
28. Backpressure Techniques (Critical for Stability)
Backpressure prevents fast producers from exploding memory.
Backpressure — controlling producer speed:

```python
if queue.qsize() > 1000:
    await asyncio.sleep(0.1)
```

Real-world use:
- Kafka consumers
- Redis Streams workers
- RabbitMQ consumers
- FastAPI streaming endpoints
29. Async Retry Strategies With Exponential Backoff
Exponential Backoff — retry with increasing delays:

```python
async def fetch_with_retry(url, attempts=5):
    delay = 0.5
    for _ in range(attempts):
        try:
            return await fetch(url)
        except Exception:
            await asyncio.sleep(delay)
            delay *= 2
    raise RuntimeError("all attempts failed")
```

This protects:
- ML data ingestion jobs
- microservice calls
- database queries
- message queues
30. Async Bulk Execution Pattern (High Throughput)
Processing thousands of tasks at controlled speed:
Bulk Execution — processing in batches:

```python
async def bulk_process(urls, batch=50):
    for i in range(0, len(urls), batch):
        chunk = urls[i:i+batch]
        await asyncio.gather(*(fetch(u) for u in chunk))
```

Used in:
- bulk API ingestion
- scraping 10,000 pages
- ML dataset downloading
- parallel text generation
31. Async Caching (In-Memory, Time-Based, and Function-Scoped)
Caching async functions requires async-safe patterns — you cannot use normal functools.lru_cache on coroutines.
Basic async cache:

```python
cache = {}

async def cached_fetch(key, func):
    if key in cache:
        return cache[key]
    result = await func()
    cache[key] = result
    return result
```

Time-based expiration:

```python
import time

async_cache = {}

async def timed_cache(key, func, ttl=60):
    if key in async_cache:
        value, timestamp = async_cache[key]
        if time.time() - timestamp < ttl:
            return value
    value = await func()
    async_cache[key] = (value, time.time())
    return value
```

Real use cases:
- ML inference caching
- API rate-limit protection
- expensive SQL queries
- metadata caching
- reusable results in pipelines
32. Async LRU Cache (Custom Implementation)
Async LRU caching requires manual implementation:
Async LRU Cache — least recently used cache:

```python
from collections import OrderedDict

class AsyncLRU:
    def __init__(self, size=128):
        self.cache = OrderedDict()
        self.size = size

    async def get(self, key, func):
        if key in self.cache:
            self.cache.move_to_end(key)
            return self.cache[key]
        result = await func()
        self.cache[key] = result
        if len(self.cache) > self.size:
            self.cache.popitem(last=False)
        return result
```

Used in:
- ML preprocessing
- NLP tokenization
- thumbnail generation
- API microservices
- complex dependency graphs
33. Async Stream Pipelines (End-to-End Processing)
A professional async pipeline processes streaming data in stages.
Stream Pipeline — chained async processing:

```python
async def reader():
    async for line in aio_read_stream():  # assumed async source
        yield line

async def cleaner(stream):
    async for line in stream:
        yield line.strip()

async def tokenizer(stream):
    async for line in stream:
        yield line.split()

async def run_pipeline():
    async for tokens in tokenizer(cleaner(reader())):
        print(tokens)
```

This powers:
- ETL pipelines
- log processors
- real-time analytics
- AI data loaders
- chat message routers

Each stage is async → memory-safe → fully streaming.
34. Async + CPU Hybrid Pipelines
Some tasks are IO-heavy, some are CPU-heavy.
Pattern:
- async → IO
- threads → CPU
- process pool → heavy CPU
Hybrid example — mixing async and CPU work:

```python
async def transform_async(stream):
    async for item in stream:
        result = await asyncio.to_thread(cpu_heavy_compute, item)
        yield result
```

Used in:
- AI preprocessing
- video/audio decoding
- encryption
- hashing pipelines
- huge text transformations
35. Async WebSockets — Real-Time Bi-Directional Streams
WebSocket Server — real-time bi-directional communication:

```python
import asyncio
import websockets

async def handler(ws):
    async for message in ws:
        await ws.send(f"Echo: {message}")

async def main():
    async with websockets.serve(handler, "localhost", 9000):
        await asyncio.Future()  # run until cancelled

asyncio.run(main())
```

Why this matters:
- chat apps
- live dashboards
- multiplayer games
- trading bots
- IoT communications
WebSockets are the backbone of real-time async systems.
36. Async Task Supervision (Supervisor Pattern)
Many real systems run supervised workers that restart when they fail.
Supervisor Pattern — auto-restarting failed tasks:

```python
async def supervise(coro_factory):
    while True:
        try:
            await coro_factory()
        except Exception as e:
            print("Restarting due to:", e)
            await asyncio.sleep(1)
```

Used in:
- monitoring daemons
- heartbeat services
- queue workers
- real-time scrapers
- Discord bots
This prevents silent crashes.
37. Async Retry Queues (Automatic Failure Recovery)
A robust pattern for distributed workers:
Retry Queue — automatic job retry on failure:

```python
retry_queue = asyncio.Queue()

async def worker():
    while True:
        job = await retry_queue.get()
        try:
            await process(job)
        except Exception:
            await asyncio.sleep(1)
            await retry_queue.put(job)  # re-queue the failed job
```

This ensures:
- jobs never disappear
- failed jobs are retried
- the system never overloads
- stable long-running services
38. Backoff, Jitter & Fail-Fast Patterns (Industry Standard)
Avoid DDoSing an API by accident.
Example with jitter:
Backoff with Jitter — industry-standard retry pattern:

```python
import random

async def backoff_retry(func):
    delay = 1
    for _ in range(5):
        try:
            return await func()
        except Exception:
            await asyncio.sleep(delay + random.random())
            delay *= 2
    raise RuntimeError("all attempts failed")
```

This is used in:
- AWS SDKs
- Google Cloud libraries
- Stripe API clients
- Redis cluster drivers
39. Async Cancellation Shields (Protect Tasks)
Sometimes you want a task to finish even if outer tasks are cancelled.
Cancellation Shield — protecting critical tasks:

```python
# shield() protects the inner task from outer cancellation;
# it is awaited directly, not used as a context manager
result = await asyncio.shield(task)
```

Useful when:
- writing logs on shutdown
- saving ML model state
- finishing DB transactions
- flushing message queues
40. Async Testing Patterns (pytest + asyncio)
Professional testing:
Async Testing — testing with pytest-asyncio:

```python
import pytest

@pytest.mark.asyncio
async def test_fetch():
    result = await fetch()
    assert result == 123
```

Mocking async functions:

```python
async def fake_fetch():
    return {"ok": True}

monkeypatch.setattr(module, "fetch", fake_fetch)
```

Testing is essential for async correctness.
41. Async Resource Pools (DB, HTTP, GPU)
Pooling reduces overhead:
Resource Pools — managing shared resources:

```python
class Pool:
    async def acquire(self): ...
    async def release(self, item): ...
```

Used in:
- database connectors
- HTTP clients
- GPU memory managers
- ML model workers
Pools prevent resource exhaustion.
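The Pool above is only an interface. One minimal way to flesh it out — a sketch, not a production pool — is to hand out a fixed set of pre-created resources through an asyncio.Queue, so acquire() simply waits whenever everything is checked out:

```python
import asyncio

class AsyncPool:
    """Hands out a fixed set of pre-created resources, one per acquirer."""

    def __init__(self, resources):
        self._queue = asyncio.Queue()
        for r in resources:
            self._queue.put_nowait(r)

    async def acquire(self):
        # Blocks (cooperatively) if every resource is currently in use.
        return await self._queue.get()

    async def release(self, item):
        await self._queue.put(item)

async def main():
    pool = AsyncPool(["conn-1", "conn-2"])  # stand-ins for real connections
    conn = await pool.acquire()
    try:
        print("using", conn)
    finally:
        await pool.release(conn)

asyncio.run(main())
```

Real pools (e.g. in database drivers) add lazy creation, health checks, and timeouts on acquire; the queue gives you the waiting semantics for free.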
42. Combining Async with Redis, Kafka, RabbitMQ
Real distributed systems use async clients:
- aioredis
- aiokafka
- aio_pika
Example — Kafka consumer:
Kafka Consumer — async message consumption:

```python
async for msg in consumer:
    process(msg.value)
```

Used in:
- event-driven architecture
- streaming analytics
- ML log pipelines
43. Async Microservice Mesh (Framework-Level Example)
A microservice might include:
- async router
- async DB layer
- async background workers
- async external API fetchers
- async timeouts & retries
- async signals
- async message queues
This is how modern companies scale to millions of users.
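As a small taste of how these ingredients compose, here's a hedged sketch combining a semaphore, a timeout, and retry-with-backoff around a single external call. `fetch_one` is a stand-in for whatever client function your service actually uses (assumes Python 3.10+, where asyncio primitives can be created before the loop starts):

```python
import asyncio

sem = asyncio.Semaphore(5)  # bounded parallelism across all calls

async def resilient_fetch(fetch_one, url, attempts=3, timeout=2.0):
    # Combines three mesh ingredients: semaphore + timeout + retry/backoff.
    delay = 0.1
    for _ in range(attempts):
        try:
            async with sem:
                return await asyncio.wait_for(fetch_one(url), timeout)
        except (asyncio.TimeoutError, ConnectionError):
            await asyncio.sleep(delay)
            delay *= 2  # exponential backoff between attempts
    raise RuntimeError(f"{url}: all attempts failed")
```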
44. Avoid These async/await Mistakes (Every Beginner Makes Them)

| Common Mistake | Correct Fix |
|---|---|
| `time.sleep(1)` | `await asyncio.sleep(1)` |
| Forgetting `await` — task never runs | Look for the "coroutine was never awaited" warning |
| Heavy CPU loop inside async | `await asyncio.to_thread(func)` |
| Too many parallel tasks at once | Use semaphores for bounded concurrency |
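The "forgetting await" row is worth seeing in action — calling a coroutine function without await creates a coroutine object but never runs it:

```python
import asyncio

ran = []

async def job():
    ran.append("done")

async def main():
    job()        # BUG: creates a coroutine object, never schedules it;
                 # Python warns "coroutine 'job' was never awaited"
    await job()  # correct: actually runs the coroutine

asyncio.run(main())
print(ran)  # ['done'] — only the awaited call ran
```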
Conclusion
You now understand high-level async engineering concepts:
- Event loop mechanics
- Coroutine composition
- Parallel execution
- Task cancellation
- Task groups
- Error handling patterns
- Deadlock prevention
- Queue-based concurrency
- Real-time streams
- Cooperative multitasking
- Hybrid async/sync
- Bounded parallelism
- Multiprocessing integration
- Async file IO
- Background loops
- Backpressure techniques
- Retry strategies
- Bulk execution
- Async caching
- Stream pipelines
- WebSocket handling
- Supervision patterns
- Retry queues
- Backoff & jitter
- Cancellation shields
- Testing patterns
- Resource pools
- Distributed systems
- Microservice architecture
- Common pitfalls
Quick Reference — Async & Await

| Syntax | What it does |
|---|---|
| `async def fn():` | Define a coroutine function |
| `await fn()` | Pause and wait for a coroutine |
| `asyncio.run(main())` | Run the top-level coroutine |
| `asyncio.gather(*coros)` | Run coroutines concurrently |
| `asyncio.create_task()` | Schedule a coroutine as a Task |
Great work! You've completed this lesson.
You now understand async/await patterns, how to structure concurrent code, and when to use asyncio vs threads.
Up next: AsyncIO Deep Dive — go deeper into the event loop, Tasks, and Futures.