asyncio enables concurrent I/O without threads. Here's how to use it effectively.

Basic Coroutines

import asyncio
 
async def hello():
    """Print "Hello", pause for one second, then print "World"."""
    first, second = "Hello", "World"
    print(first)
    # Awaiting sleep suspends only this coroutine for the duration.
    await asyncio.sleep(1)
    print(second)
 
# Run the coroutine
asyncio.run(hello())

Multiple Coroutines

import asyncio
 
async def fetch(name, delay):
    """Simulate a request that takes ``delay`` seconds.

    Prints a start and a finish line tagged with ``name`` and returns the
    string "<name> result".
    """
    print(f"{name} starting")
    await asyncio.sleep(delay)
    print(f"{name} done")
    payload = f"{name} result"
    return payload
 
async def main():
    """Run three fetches concurrently and print their results."""
    jobs = [fetch(label, seconds) for label, seconds in (("A", 2), ("B", 1), ("C", 3))]
    # Run concurrently; results come back in argument order
    results = await asyncio.gather(*jobs)
    print(results)  # ['A result', 'B result', 'C result']
 
asyncio.run(main())

Tasks

import asyncio
 
async def background_task():
    """Print "Working..." once per second, forever, until cancelled."""
    interval = 1
    while True:
        print("Working...")
        await asyncio.sleep(interval)
 
async def main():
    """Let background_task() run for three seconds, then cancel it and report."""
    # create_task() schedules the coroutine on the running loop
    worker = asyncio.create_task(background_task())

    # Stand-in for other work; the worker runs while main() is suspended here
    await asyncio.sleep(3)

    # Request cancellation, then await the task so it can finish unwinding
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        print("Task cancelled")
 
asyncio.run(main())

Gather with Error Handling

import asyncio
 
async def might_fail(n):
    """Return ``n`` after a 0.1 s pause; ``n == 2`` raises ValueError immediately."""
    if n != 2:
        await asyncio.sleep(0.1)
        return n
    raise ValueError("Error on 2")
 
async def main():
    """Gather three might_fail() calls and print one line per outcome."""
    # return_exceptions=True to get exceptions as results
    results = await asyncio.gather(
        *(might_fail(n) for n in (1, 2, 3)),
        return_exceptions=True,
    )

    for result in results:
        line = f"Error: {result}" if isinstance(result, Exception) else f"Success: {result}"
        print(line)
 
asyncio.run(main())

Timeouts

import asyncio
 
async def slow_operation():
    """Stand in for a slow call: wait ten seconds, then return "done"."""
    # sleep() hands back its ``result`` argument once the delay has elapsed.
    return await asyncio.sleep(10, result="done")
 
async def main():
    """Give slow_operation() two seconds and report when it overruns."""
    call = slow_operation()
    try:
        # wait_for() raises TimeoutError if the call has not finished in time
        result = await asyncio.wait_for(call, timeout=2)
    except asyncio.TimeoutError:
        print("Operation timed out")
 
asyncio.run(main())

TaskGroup (Python 3.11+)

import asyncio
 
async def fetch(url):
    """Pretend to download ``url``: pause one second, then return a canned payload."""
    await asyncio.sleep(1)
    body = f"data from {url}"
    return body
 
async def main():
    """Fetch three URLs inside a TaskGroup and print the results together."""
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(url)) for url in ("url1", "url2", "url3")]

    # All tasks complete here
    print(*(task.result() for task in tasks))
 
asyncio.run(main())

Semaphore (Limit Concurrency)

import asyncio
 
# Shared by every limited_task(): at most 3 holders at any one time.
# NOTE(review): on Python < 3.10 asyncio primitives bind to an event loop when
# constructed — confirm the target version, or create this inside main().
semaphore = asyncio.Semaphore(3)
 
async def limited_task(n):
    """Do one second of simulated work for task ``n`` while holding the semaphore."""
    # Waits here whenever all of the semaphore's permits are taken
    async with semaphore:
        print(f"Task {n} starting")
        await asyncio.sleep(1)
        print(f"Task {n} done")
 
async def main():
    """Start ten limited_task() coroutines at once and wait for all of them."""
    await asyncio.gather(*(limited_task(i) for i in range(10)))
 
asyncio.run(main())

Locks

import asyncio
 
# Held by increment() across its read-modify-write of `counter`.
# NOTE(review): on Python < 3.10 asyncio primitives bind to an event loop when
# constructed — confirm the target version, or create this inside main().
lock = asyncio.Lock()
# Shared total; each increment() call adds one.
counter = 0
 
async def increment():
    """Add one to the global ``counter``, holding ``lock`` for the whole update."""
    global counter
    async with lock:
        snapshot = counter
        # Yield mid-update on purpose: without the lock, other tasks would read
        # the same stale value here and increments would be lost.
        await asyncio.sleep(0.01)
        counter = snapshot + 1
 
async def main():
    """Run 100 increments concurrently and print the final count."""
    await asyncio.gather(*(increment() for _ in range(100)))
    print(counter)  # 100
 
asyncio.run(main())

Events

import asyncio
 
# Flag that waiter() blocks on and setter() sets.
# NOTE(review): on Python < 3.10 asyncio primitives bind to an event loop when
# constructed — confirm the target version, or create this inside main().
event = asyncio.Event()
 
async def waiter():
    """Announce the wait, block until the shared event is set, then confirm."""
    print("Waiting for event...")
    # Suspends this coroutine until event.set() is called
    await event.wait()
    print("Event received!")
 
async def setter():
    """Wait two seconds, then set the shared event."""
    await asyncio.sleep(2)
    print("Setting event")
    # Wakes every coroutine currently blocked in event.wait()
    event.set()
 
async def main():
    """Run the waiter and the setter side by side until both have finished."""
    pair = (waiter(), setter())
    await asyncio.gather(*pair)
 
asyncio.run(main())

Queues

import asyncio
 
async def producer(queue):
    """Put 0 through 4 on ``queue`` at 0.1 s intervals, then a ``None`` sentinel."""
    for number in range(5):
        await queue.put(number)
        print(f"Produced {number}")
        await asyncio.sleep(0.1)
    # Sentinel: signals that nothing more will be produced
    await queue.put(None)
 
async def consumer(queue):
    """Print items taken from ``queue`` until the ``None`` sentinel arrives.

    Every retrieved item, the sentinel included, is acknowledged with
    ``task_done()`` so that ``await queue.join()`` can complete afterwards.

    Args:
        queue: An ``asyncio.Queue`` whose producer ends the stream with ``None``.
    """
    while True:
        item = await queue.get()
        if item is None:
            # Acknowledge the sentinel too; otherwise the queue's count of
            # unfinished items never reaches zero and join() waits forever.
            queue.task_done()
            break
        print(f"Consumed {item}")
        queue.task_done()
 
async def main():
    """Connect one producer and one consumer through a shared queue and run both."""
    channel = asyncio.Queue()
    await asyncio.gather(producer(channel), consumer(channel))
 
asyncio.run(main())

Running in Executor (Blocking Code)

import asyncio
import time
 
def blocking_io():
    """Stand in for a blocking call: sleep one second, then return "result"."""
    # Simulates blocking I/O; time.sleep() blocks the thread that calls it
    time.sleep(1)
    outcome = "result"
    return outcome
 
def cpu_bound(n=10_000_000):
    """Return the sum of the squares of the integers in ``range(n)``.

    Args:
        n: Exclusive upper bound of the range. Defaults to 10_000_000, the
            workload that used to be hard-coded, so a bare ``cpu_bound()``
            call computes the same value as before.

    Returns:
        ``0**2 + 1**2 + ... + (n - 1)**2`` as an int (0 when ``n <= 0``).
    """
    # Heavy computation
    return sum(i * i for i in range(n))
 
async def main():
    """Offload a blocking call to a thread and a CPU-bound call to a process pool.

    Prints the value returned by ``blocking_io`` and then the value returned
    by ``cpu_bound``.
    """
    # Inside a coroutine, get_running_loop() is the supported way to reach the
    # loop; it always returns the loop that is actually executing this code.
    loop = asyncio.get_running_loop()

    # Run blocking I/O in thread pool (None selects the loop's default executor)
    result = await loop.run_in_executor(None, blocking_io)
    print(result)

    # Run CPU-bound in process pool
    import concurrent.futures
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print(result)


# Guard the entry point: with the "spawn" start method, ProcessPoolExecutor
# workers re-import this module, and an unguarded asyncio.run(main()) would be
# executed again inside every worker.
if __name__ == "__main__":
    asyncio.run(main())

Async Context Managers

import asyncio
 
class AsyncResource:
    """Toy resource whose acquire and release steps each await briefly."""

    async def do_work(self):
        """Print a marker showing that the resource is being used."""
        print("Working")

    async def __aenter__(self):
        """Announce acquisition, pause 0.1 s, and return the resource itself."""
        print("Acquiring resource")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Announce release and pause 0.1 s; returns None, so errors propagate."""
        print("Releasing resource")
        await asyncio.sleep(0.1)
 
async def main():
    """Acquire an AsyncResource, use it once, and release it on leaving the block."""
    async with AsyncResource() as res:
        await res.do_work()
 
asyncio.run(main())

Async Iterators

import asyncio
 
class AsyncRange:
    """Asynchronous counterpart of ``range(stop)`` that pauses 0.1 s per item."""

    def __init__(self, stop):
        self.current, self.stop = 0, stop

    def __aiter__(self):
        """Return self: the object is its own asynchronous iterator."""
        return self

    async def __anext__(self):
        """Return the next integer, or raise StopAsyncIteration at ``stop``."""
        if self.current < self.stop:
            await asyncio.sleep(0.1)
            value, self.current = self.current, self.current + 1
            return value
        raise StopAsyncIteration
 
async def main():
    """Print each value produced by AsyncRange(5) as it arrives."""
    numbers = AsyncRange(5)
    async for number in numbers:
        print(number)
 
asyncio.run(main())

Async Generators

import asyncio
 
async def async_range(stop):
    """Asynchronously yield the integers of ``range(stop)``, pausing 0.1 s before each."""
    for value in range(stop):
        await asyncio.sleep(0.1)
        yield value
 
async def main():
    """Print each value produced by async_range(5) as it arrives."""
    stream = async_range(5)
    async for number in stream:
        print(number)
 
asyncio.run(main())

HTTP Example (aiohttp)

import asyncio
import aiohttp
 
async def fetch(session, url):
    """Issue a GET for ``url`` on ``session`` and return the body from ``response.text()``."""
    async with session.get(url) as resp:
        body = await resp.text()
    return body
 
async def main():
    """Fetch a fixed pair of URLs concurrently over one session and print each body's length."""
    urls = [
        "https://example.com",
        "https://httpbin.org/get",
    ]
    # One session is shared by every request
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch(session, url) for url in urls))
        for url, result in zip(urls, results):
            print(f"{url}: {len(result)} bytes")
 
asyncio.run(main())

Common Patterns

import asyncio
 
# Retry pattern
async def retry(coro_func, retries=3, delay=1):
    """Await ``coro_func()`` until it succeeds, trying at most ``retries`` times.

    Args:
        coro_func: Zero-argument callable that returns a fresh awaitable on
            every call.
        retries: Total number of attempts; must be at least 1.
        delay: Seconds to sleep between a failed attempt and the next one.

    Returns:
        Whatever the first successful attempt returns.

    Raises:
        ValueError: If ``retries`` is less than 1.
        Exception: The error raised by the final attempt, re-raised unchanged.
    """
    if retries < 1:
        # Without this check the loop body never runs and the call would
        # return None without invoking coro_func at all.
        raise ValueError("retries must be at least 1")

    for attempt in range(1, retries + 1):
        try:
            return await coro_func()
        except Exception:
            if attempt == retries:
                raise
        # Only reached after a failed attempt that was not the last one
        await asyncio.sleep(delay)
 
# Batch processing
async def process_batch(items, batch_size=10):
    """Process ``items`` in consecutive slices of ``batch_size``, one slice at a time.

    The items of a slice are passed to ``process()`` concurrently; the next
    slice starts only after the current one has finished.
    """
    # NOTE(review): process() is not defined in this snippet — supply it.
    for start in range(0, len(items), batch_size):
        stop = start + batch_size
        await asyncio.gather(*(process(item) for item in items[start:stop]))
 
# Rate limiting
class RateLimiter:
    """Allow at most ``rate`` acquisitions per ``period`` seconds.

    Each ``acquire()`` takes a semaphore permit and schedules a background
    task that hands the permit back ``period`` seconds later.

    Args:
        rate: Number of permits, i.e. acquisitions allowed per period.
        period: Seconds before a taken permit is returned. Defaults to 1,
            the interval that used to be hard-coded.
    """

    def __init__(self, rate, period=1):
        self.rate = rate
        self.period = period
        self.semaphore = asyncio.Semaphore(rate)
        # Strong references to in-flight release tasks. The event loop keeps
        # only weak references to tasks, so an otherwise unreferenced task
        # could be garbage-collected before it releases its permit.
        self._release_tasks = set()

    async def acquire(self):
        """Wait for a free permit, then schedule its release after ``period`` seconds."""
        await self.semaphore.acquire()
        task = asyncio.create_task(self._release_later())
        self._release_tasks.add(task)
        # Drop the reference once the task has finished
        task.add_done_callback(self._release_tasks.discard)

    async def _release_later(self):
        """Return one permit to the semaphore after ``period`` seconds."""
        await asyncio.sleep(self.period)
        self.semaphore.release()

Best Practices

# NOTE: the fragments below are illustrative, not a runnable script — the
# `await` / `async with` lines belong inside a coroutine, and main, task1,
# task2, external_call, make_request, semaphore, loop and blocking_func are
# placeholders that this snippet does not define.

# Use asyncio.run() as entry point
asyncio.run(main())

# Prefer gather for concurrent tasks
await asyncio.gather(task1(), task2())

# Use TaskGroup (3.11+) for better error handling
async with asyncio.TaskGroup() as tg:
    tg.create_task(task1())
    tg.create_task(task2())

# Set timeouts for external calls (here: give up after 30 seconds)
await asyncio.wait_for(external_call(), timeout=30)

# Use semaphore to limit concurrency
async with semaphore:
    await make_request()

# Run blocking code in executor (None selects the loop's default executor)
await loop.run_in_executor(None, blocking_func)

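asyncio excels at I/O-bound concurrent code. Use it for network requests and any task that spends most of its time waiting. File I/O and other blocking calls should be handed to an executor, as shown above, because asyncio has no native asynchronous file API.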

React to this post: