The contextlib module provides utilities for creating and working with context managers beyond the basic __enter__/__exit__ protocol.
## @contextmanager Decorator
Create context managers from generators:
from contextlib import contextmanager
import time
@contextmanager
def timer(label: str):
    """Time the body of a ``with`` block and print the elapsed seconds.

    Args:
        label: Text printed in front of the measurement.

    Yields:
        None. The measurement covers the time spent in the ``with`` body.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        # ``finally`` reports the timing even when the body raises.
        elapsed = time.perf_counter() - start
        print(f"{label}: {elapsed:.3f}s")
with timer("Processing"):
    time.sleep(0.5)
# Processing: 0.500s

## Yielding Values
from contextlib import contextmanager
@contextmanager
def temp_file(content: str):
    """Create a temporary file holding *content* and delete it on exit.

    Args:
        content: Text written to the file before the ``with`` body runs.

    Yields:
        The path of the temporary file.
    """
    import os
    import tempfile

    fd, path = tempfile.mkstemp()
    try:
        # fdopen takes ownership of the descriptor and closes it when done.
        with os.fdopen(fd, 'w') as f:
            f.write(content)
        yield path  # Value available as 'as' target
    finally:
        try:
            os.unlink(path)
        except FileNotFoundError:
            # The body already removed the file; nothing is left to clean up.
            pass
with temp_file("test data") as path:
    print(f"File at: {path}")
    with open(path) as f:
        print(f.read())
# File automatically deleted

## suppress() - Ignore Exceptions
import os
from contextlib import suppress

# Instead of try/except pass
try:
    os.remove('nonexistent.txt')
except FileNotFoundError:
    pass
# Use suppress
from contextlib import suppress
import os

with suppress(FileNotFoundError):
    os.remove('nonexistent.txt')

# Multiple exception types
with suppress(FileNotFoundError, PermissionError):
    os.remove('file.txt')

## redirect_stdout / redirect_stderr
from contextlib import redirect_stdout, redirect_stderr
from io import StringIO
# Capture stdout
buffer = StringIO()
with redirect_stdout(buffer):
    print("This goes to buffer")
    print("So does this")
output = buffer.getvalue()
print(f"Captured: {output}")

# Redirect to file
with open('log.txt', 'w') as f:
    with redirect_stdout(f):
        print("This goes to file")

# Suppress output
import os

# Open the null device in its own ``with`` item so the handle is closed
# again; redirect_stdout does not close the stream it is given.
with open(os.devnull, 'w') as devnull, redirect_stdout(devnull):
    print("This is discarded")

## ExitStack - Dynamic Context Management
from contextlib import ExitStack
# Manage variable number of context managers
def process_files(paths: list):
    """Open every path, print each file's contents, then close them all.

    Args:
        paths: File paths to open for reading.
    """
    with ExitStack() as stack:
        # Each file is registered with the stack as soon as it is opened, so
        # files opened so far are closed even if a later open() fails.
        files = [stack.enter_context(open(p)) for p in paths]
        for f in files:
            print(f.read())
    # All files closed automatically
# Conditional context managers
def maybe_transaction(use_transaction: bool):
    """Run the work, inside a database transaction only when requested.

    Args:
        use_transaction: When true, ``database.transaction()`` is entered
            before the work and exited when the stack unwinds.
    """
    with ExitStack() as stack:
        if use_transaction:
            # ``database`` is not defined in this snippet; it is assumed to
            # be provided by the surrounding application.
            stack.enter_context(database.transaction())
        # Do work...

## ExitStack Callbacks
from contextlib import ExitStack
def cleanup(name):
    """Print a message saying that *name* is being cleaned up."""
    print(f"Cleaning up {name}")
with ExitStack() as stack:
    stack.callback(cleanup, "resource1")
    stack.callback(cleanup, "resource2")
    print("Doing work...")
# Output (callbacks run in reverse order of registration):
# Doing work...
# Cleaning up resource2
# Cleaning up resource1

## nullcontext - Optional Context Manager
from contextlib import nullcontext
import threading

# One lock shared by every call. Creating a new Lock inside the function
# would give each call its own private lock, which never excludes anyone.
_PROCESS_LOCK = threading.Lock()


def process(use_lock: bool = False):
    """Run ``do_work()``, optionally serialised by a module-wide lock.

    Args:
        use_lock: When true, the shared lock is held for the duration of the
            work; otherwise a no-op ``nullcontext()`` is used instead.
    """
    # NOTE: the lock is not reentrant, so do_work() must not call
    # process(use_lock=True) itself.
    lock = _PROCESS_LOCK if use_lock else nullcontext()
    with lock:
        # Work is done with or without lock
        do_work()
# Also useful for optional file handling
import sys
from typing import Optional


def read_data(path: Optional[str] = None):
    """Return the full contents of *path*, or of standard input.

    Args:
        path: File to read. When omitted (or empty), ``sys.stdin`` is read.

    Returns:
        Everything read from the chosen stream, as a string.
    """
    # nullcontext hands back sys.stdin unchanged and does not close it on
    # exit, whereas a file opened here is closed by the ``with`` statement.
    cm = open(path) if path else nullcontext(sys.stdin)
    with cm as f:
        return f.read()

## closing() - Ensure close() is Called
from contextlib import closing
from urllib.request import urlopen
# urlopen returns object with close() but isn't a context manager in older Python
with closing(urlopen('https://example.com')) as response:
    html = response.read()
# Works with any object that has close()
class Connection:
    """Minimal stand-in for a resource that only offers ``close()``."""

    def close(self):
        """Announce that the connection has been closed."""
        print("Connection closed")
with closing(Connection()) as conn:
    pass  # Connection.close() called automatically

## Async Context Managers
from contextlib import asynccontextmanager
import asyncio
@asynccontextmanager
async def async_timer(label: str):
    """Time the body of an ``async with`` block using the event loop clock.

    Args:
        label: Text printed in front of the measurement.

    Yields:
        None. The measurement covers the time spent in the ``async with`` body.
    """
    # This always runs inside a coroutine, so ask for the running loop
    # directly instead of going through get_event_loop().
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        yield
    finally:
        # ``finally`` reports the timing even when the body raises.
        elapsed = loop.time() - start
        print(f"{label}: {elapsed:.3f}s")
async def main():
    """Sleep briefly inside ``async_timer`` to demonstrate it."""
    async with async_timer("Async operation"):
        await asyncio.sleep(0.5)


asyncio.run(main())

## AsyncExitStack
from contextlib import AsyncExitStack
import aiofiles
async def process_files_async(paths: list):
    """Open every path with aiofiles, print each file's contents, close all.

    Args:
        paths: File paths to open for reading.
    """
    async with AsyncExitStack() as stack:
        # Each file is registered with the stack as soon as it is opened; the
        # stack closes all of them when the ``async with`` block ends.
        files = [
            await stack.enter_async_context(aiofiles.open(p))
            for p in paths
        ]
        for f in files:
            content = await f.read()
            print(content)

## Reusable Context Manager
from contextlib import contextmanager
# Problem: generator-based CMs are single-use
@contextmanager
def single_use():
    """Print "Enter" on entry and "Exit" on a normal exit.

    Each call returns a context manager backed by a fresh generator, which
    can be entered only once.
    """
    print("Enter")
    yield
    # No try/finally here: this line is skipped if the body raises.
    print("Exit")
cm = single_use()
with cm:
    pass  # Works
# with cm:  # Fails: the generator is already exhausted
#           # (documented as "RuntimeError: generator didn't yield")
# Solution: use a class-based context manager, which holds no generator
# state, so the same instance can be entered again and again
class ReusableTimer:
    """Timer context manager whose instances can be used more than once."""

    def __init__(self, label):
        """Remember the *label* printed in front of every measurement."""
        self.label = label

    def __enter__(self):
        """Start (or restart) the clock and return the timer itself."""
        self.start = time.perf_counter()
        return self

    def __exit__(self, *args):
        """Print the elapsed time; exceptions from the body still propagate."""
        elapsed = time.perf_counter() - self.start
        print(f"{self.label}: {elapsed:.3f}s")
# Named reusable_timer so it does not rebind the timer() function above.
reusable_timer = ReusableTimer("Operation")
with reusable_timer:
    pass
with reusable_timer:  # Works!
    pass

## Exception Handling in Context Managers
from contextlib import contextmanager
@contextmanager
def handle_errors():
    """Suppress ``ValueError`` from the body; log and re-raise ``TypeError``.

    Any other exception type passes through untouched.
    """
    try:
        yield
    except ValueError as e:
        print(f"Handled ValueError: {e}")
        # Don't re-raise, exception is suppressed
    except TypeError as e:
        print(f"Logging TypeError: {e}")
        raise  # Re-raise the exception
with handle_errors():
    raise ValueError("This is handled")
# with handle_errors():
#     raise TypeError("This propagates")

## Chained Context Managers
from contextlib import contextmanager
@contextmanager
def logged(name: str):
    """Print a message when the ``with`` block is entered and when it exits.

    Args:
        name: Label included in both messages.
    """
    print(f"Entering {name}")
    try:
        yield
    finally:
        # ``finally`` prints the exit message even when the body raises.
        print(f"Exiting {name}")
@contextmanager
def timed():
    """Print how long the body of the ``with`` block took, in seconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # ``finally`` reports the timing even when the body raises.
        print(f"Elapsed: {time.perf_counter() - start:.3f}s")
# Nest them
with logged("operation"), timed():
    time.sleep(0.1)
# Or compose
@contextmanager
def logged_and_timed(name: str):
    """Combine ``logged(name)`` and ``timed()`` into one context manager.

    Args:
        name: Label passed on to ``logged``.
    """
    with logged(name), timed():
        yield

## Database Transaction Pattern
from contextlib import contextmanager
@contextmanager
def transaction(connection):
    """Run the ``with`` body in a transaction on *connection*.

    Commits when the body finishes normally, rolls back and re-raises when it
    does not, and always closes the cursor.

    Args:
        connection: Object providing ``cursor()``, ``commit()`` and
            ``rollback()``.

    Yields:
        A cursor obtained from ``connection.cursor()``.
    """
    cursor = connection.cursor()
    try:
        yield cursor
        connection.commit()
    except BaseException:
        # BaseException rather than Exception, so that KeyboardInterrupt and
        # similar exits also undo the partial work before propagating.
        connection.rollback()
        raise
    finally:
        cursor.close()
# db_connection is assumed to be an open database connection created
# elsewhere; it is not defined in this snippet.
with transaction(db_connection) as cursor:
    cursor.execute("INSERT INTO users VALUES (?)", ("Alice",))
    cursor.execute("INSERT INTO users VALUES (?)", ("Bob",))
# Auto-commit on success, rollback on exception

## Resource Pool Pattern
from contextlib import contextmanager
from queue import Queue
class ConnectionPool:
    """Fixed-size pool that lends out connections through a context manager."""

    def __init__(self, size: int):
        """Create the pool and fill it with *size* placeholder connections."""
        self._pool = Queue(maxsize=size)
        # Number of connections created so far; also the most recent id.
        self._created = 0
        for _ in range(size):
            self._pool.put(self._create_connection())

    def _create_connection(self):
        """Return a placeholder connection with an id unique in this pool."""
        # id(object()) is unsuitable here: the temporary object is freed at
        # once, so successive calls can report the same id.
        self._created += 1
        return {"id": self._created}  # Placeholder

    @contextmanager
    def connection(self):
        """Borrow a connection for the ``with`` body and return it afterwards.

        Blocks until a connection is free, because ``Queue.get()`` is called
        without a timeout.

        Yields:
            One connection taken from the pool.
        """
        conn = self._pool.get()
        try:
            yield conn
        finally:
            # Hand the connection back even when the body raises.
            self._pool.put(conn)
pool = ConnectionPool(5)
with pool.connection() as conn:
    print(f"Using connection {conn['id']}")

## Summary Table
| Utility | Use Case |
|---|---|
| @contextmanager | Create CM from generator |
| @asynccontextmanager | Async version |
| suppress() | Ignore specific exceptions |
| redirect_stdout/stderr | Capture or redirect output |
| ExitStack | Dynamic/conditional CMs |
| nullcontext() | Optional/no-op CM |
| closing() | Ensure close() is called |
The contextlib module transforms resource management from boilerplate into clean, composable patterns.
React to this post: