6. Async Context Managers
Proper resource management with async operations:
```python
import asyncio


class AsyncDatabaseConnection:
    """Async database connection with automatic cleanup."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.connection = None

    async def __aenter__(self):
        print("Opening connection")
        await asyncio.sleep(0.1)  # Simulate connection setup
        self.connection = {"dsn": self.dsn, "connected": True}
        return self.connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection")
        await asyncio.sleep(0.1)  # Simulate cleanup
        self.connection = None


async def query_database():
    async with AsyncDatabaseConnection("postgresql://localhost") as conn:
        # Connection is automatically closed on exit, even if the query raises
        await asyncio.sleep(0.1)  # Simulate running a query against conn
        return {"dsn": conn["dsn"], "rows": []}
```
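For simpler cases, the same behavior can be written as a decorated generator with `contextlib.asynccontextmanager` instead of a full class. A minimal sketch, reusing the simulated connection from above:
```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def database_connection(dsn: str):
    print("Opening connection")
    await asyncio.sleep(0.1)  # Simulate connection setup
    connection = {"dsn": dsn, "connected": True}
    try:
        yield connection  # The body of the `async with` block runs here
    finally:
        print("Closing connection")
        await asyncio.sleep(0.1)  # Cleanup always runs, even on error
```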
Use cases (an aiohttp sketch follows this list):
- Database connections (asyncpg, motor)
- HTTP sessions (aiohttp.ClientSession)
- File I/O (aiofiles)
- Locks and semaphores
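To illustrate the aiohttp case, a minimal sketch (assuming `aiohttp` is installed; the URL is a placeholder):
```python
import aiohttp


async def fetch_json(url: str) -> dict:
    # Both the session and the response are async context managers,
    # so connections are released even if the request fails.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()
```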
7. Async Iterators and Generators
Stream data asynchronously:
```python
import asyncio
from typing import AsyncIterator


async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
    """Fetch paginated data lazily."""
    for page in range(1, max_pages + 1):
        await asyncio.sleep(0.2)  # Simulate an API call
        yield {
            "page": page,
            "url": f"{url}?page={page}",
            "data": [f"item_{page}_{i}" for i in range(5)],
        }


async def process_stream():
    async for page_data in fetch_pages("https://api.example.com", 10):
        # Process each page as it arrives (memory efficient)
        print(f"Processing page {page_data['page']}")
```
Benefits (see the consumption sketch after this list):
- Memory efficient for large datasets
- Start processing before all data arrives
- Natural backpressure handling
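A minimal consumption sketch (the stopping threshold is hypothetical) showing that iteration can stop early, so later pages are never fetched:
```python
async def first_pages(limit: int) -> list:
    collected = []
    async for page_data in fetch_pages("https://api.example.com", max_pages=100):
        collected.append(page_data)
        if len(collected) >= limit:
            break  # Remaining pages are never requested
    return collected


# asyncio.run(first_pages(3)) issues only 3 simulated API calls
```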
8. Producer-Consumer with Queues
Coordinate work between producers and consumers:
```python
import asyncio
from asyncio import Queue


async def producer(queue: Queue, producer_id: int, num_items: int):
    for i in range(num_items):
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)  # Suspends when the queue is full (backpressure)
        await asyncio.sleep(0.1)


async def consumer(queue: Queue, consumer_id: int):
    while True:
        item = await queue.get()
        print(f"Consumer {consumer_id} processing: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()


async def run_pipeline():
    queue = Queue(maxsize=10)
    # 2 producers, 3 consumers
    producers = [asyncio.create_task(producer(queue, i, 5)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    await asyncio.gather(*producers)  # Wait until everything is enqueued
    await queue.join()  # Wait until every item has been processed
    for c in consumers:  # Consumers loop forever, so cancel them once done
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
```
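A quick way to run the pipeline (a minimal sketch). Note that `Queue(maxsize=10)` is what provides backpressure: `queue.put` suspends producers whenever consumers fall more than ten items behind.
```python
import asyncio

# Producers enqueue 2 x 5 = 10 items; consumers drain them concurrently.
asyncio.run(run_pipeline())
```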
9. Rate Limiting with Semaphores
Control concurrent operations:
```python
import asyncio
from typing import List


async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:  # Only N operations run at once
        print(f"Calling {url}")
        await asyncio.sleep(0.5)  # Simulate the request
        return {"url": url, "status": 200}


async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [api_call(url, semaphore) for url in urls]
    return await asyncio.gather(*tasks)

# Limits to 5 concurrent requests regardless of the total number of URLs
```
Use cases (a timing sketch follows this list):
- API rate limiting (respect API quotas)
- Database connection limits
- File descriptor limits
- Memory-constrained operations
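A rough way to see the limiting in action (timings are approximate and assume the 0.5 s simulated call above; the URLs are placeholders):
```python
import asyncio
import time


async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(15)]
    start = time.perf_counter()
    results = await rate_limited_requests(urls, max_concurrent=5)
    elapsed = time.perf_counter() - start
    # 15 calls at 0.5 s each, 5 at a time -> roughly 3 batches, ~1.5 s total
    print(f"{len(results)} responses in {elapsed:.1f}s")


asyncio.run(main())
```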
10. Async Locks for Shared State
Protect shared state that is modified across `await` points, where other tasks could otherwise interleave:
```python
import asyncio


class AsyncCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:
            current = self.value
            await asyncio.sleep(0.01)  # Simulate work between read and write
            self.value = current + 1

    async def get_value(self) -> int:
        async with self.lock:
            return self.value
```
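A quick usage sketch: without the lock, the read-sleep-write sequence in `increment` would let tasks interleave and lose updates; with it, the final count is exact.
```python
import asyncio


async def main():
    counter = AsyncCounter()
    # 50 concurrent increments; each waits its turn inside the lock
    await asyncio.gather(*(counter.increment() for _ in range(50)))
    print(await counter.get_value())  # 50


asyncio.run(main())
```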
Synchronization primitives (an Event sketch follows this list):
- Lock: mutual exclusion
- Event: signal between tasks
- Condition: wait for a condition
- Semaphore: limit concurrent access
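A minimal Event sketch (hypothetical names), where one task waits until another signals readiness:
```python
import asyncio


async def waiter(ready: asyncio.Event):
    print("Waiting for the signal")
    await ready.wait()  # Suspends until set() is called
    print("Signal received, continuing")


async def signaler(ready: asyncio.Event):
    await asyncio.sleep(0.5)  # Simulate setup work
    ready.set()  # Wakes every task waiting on the event


async def main():
    ready = asyncio.Event()
    await asyncio.gather(waiter(ready), signaler(ready))


asyncio.run(main())
```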