The contextlib module provides utilities for working with context managers. Here's what you need to know.
contextmanager Decorator
from contextlib import contextmanager
@contextmanager
def timer(name):
"""Time a block of code."""
import time
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
print(f"{name}: {elapsed:.4f}s")
# Usage
with timer("database query"):
result = db.execute(query)Yielding Values
from contextlib import contextmanager
@contextmanager
def temporary_file(content):
"""Create a temporary file with content."""
import tempfile
import os
fd, path = tempfile.mkstemp()
try:
os.write(fd, content.encode())
os.close(fd)
yield path
finally:
os.unlink(path)
# Usage
with temporary_file("test data") as path:
with open(path) as f:
print(f.read())
# File is deleted after the blocksuppress
from contextlib import suppress
# Instead of try/except/pass
try:
os.remove("file.txt")
except FileNotFoundError:
pass
# Use suppress
with suppress(FileNotFoundError):
os.remove("file.txt")
# Multiple exceptions
with suppress(FileNotFoundError, PermissionError):
os.remove("file.txt")redirect_stdout / redirect_stderr
from contextlib import redirect_stdout, redirect_stderr
from io import StringIO
# Capture stdout
buffer = StringIO()
with redirect_stdout(buffer):
print("This goes to buffer")
output = buffer.getvalue()
print(f"Captured: {output}")
# Redirect to file
with open("output.log", "w") as f:
with redirect_stdout(f):
print("This goes to file")
# Redirect stderr
with redirect_stderr(StringIO()) as err:
import sys
print("error message", file=sys.stderr)ExitStack
from contextlib import ExitStack
# Manage multiple context managers dynamically
def process_files(filenames):
with ExitStack() as stack:
files = [
stack.enter_context(open(fname))
for fname in filenames
]
# All files are open here
for f in files:
process(f.read())
# All files are closed here
# Conditional context managers
with ExitStack() as stack:
if need_lock:
stack.enter_context(lock)
if need_transaction:
stack.enter_context(transaction)
# Do work
# Register cleanup callbacks
with ExitStack() as stack:
resource = acquire_resource()
stack.callback(release_resource, resource)
# resource is released on exitclosing
from contextlib import closing
# For objects with close() but no __exit__
class Connection:
def close(self):
print("Closing connection")
# Manual
conn = Connection()
try:
# use connection
pass
finally:
conn.close()
# With closing
with closing(Connection()) as conn:
# use connection
pass
# close() is called automaticallynullcontext
from contextlib import nullcontext
def process(data, lock=None):
"""Process with optional locking."""
cm = lock if lock else nullcontext()
with cm:
# process data
pass
# With lock
process(data, threading.Lock())
# Without lock
process(data)
# As a stand-in for optional context managers
def get_output(filename=None):
if filename:
return open(filename, "w")
return nullcontext(sys.stdout)
with get_output() as f:
f.write("output")AsyncExitStack
from contextlib import AsyncExitStack
import asyncio
async def process_connections(urls):
async with AsyncExitStack() as stack:
connections = [
await stack.enter_async_context(connect(url))
for url in urls
]
# All connections open
for conn in connections:
await conn.fetch()
# All connections closedaclosing
from contextlib import aclosing
class AsyncResource:
async def aclose(self):
print("Closing async resource")
async def main():
async with aclosing(AsyncResource()) as resource:
# use resource
pass
# aclose() is calledchdir (Python 3.11+)
from contextlib import chdir
import os
print(os.getcwd()) # /home/user
with chdir("/tmp"):
print(os.getcwd()) # /tmp
# Do work in /tmp
print(os.getcwd()) # /home/user (restored)Building Custom Context Managers
from contextlib import contextmanager, ContextDecorator
# Class-based approach
class ManagedResource(ContextDecorator):
def __init__(self, name):
self.name = name
def __enter__(self):
print(f"Acquiring {self.name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print(f"Releasing {self.name}")
return False # Don't suppress exceptions
# Can be used as decorator too
@ManagedResource("database")
def do_database_work():
print("Working...")
# Or as context manager
with ManagedResource("cache") as resource:
print(f"Using {resource.name}")Error Handling
from contextlib import contextmanager
@contextmanager
def transaction(conn):
"""Database transaction with rollback on error."""
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
# Suppress specific exceptions
@contextmanager
def ignore_keyboard_interrupt():
try:
yield
except KeyboardInterrupt:
print("Interrupted, cleaning up...")
# Transform exceptions
@contextmanager
def wrap_errors():
try:
yield
except ValueError as e:
raise CustomError(f"Invalid value: {e}") from eCommon Patterns
from contextlib import contextmanager
import threading
# Thread-safe resource pool
@contextmanager
def pooled_resource(pool):
resource = pool.acquire()
try:
yield resource
finally:
pool.release(resource)
# Temporary state change
@contextmanager
def temporary_env(**env_vars):
import os
original = {k: os.environ.get(k) for k in env_vars}
os.environ.update(env_vars)
try:
yield
finally:
for key, value in original.items():
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value
# Logging context
@contextmanager
def log_block(name, logger):
logger.info(f"Starting {name}")
try:
yield
except Exception:
logger.exception(f"Failed {name}")
raise
else:
logger.info(f"Completed {name}")Reentrant Context Managers
from contextlib import contextmanager
import threading
class ReentrantLock:
"""Context manager that can be entered multiple times."""
def __init__(self):
self._lock = threading.RLock()
self._count = 0
def __enter__(self):
self._lock.acquire()
self._count += 1
return self
def __exit__(self, *args):
self._count -= 1
self._lock.release()
return False
# Can nest safely
lock = ReentrantLock()
with lock:
with lock:
# Both acquisitions work
passcontextlib makes resource management clean and explicit. Use it for setup/teardown patterns, optional context managers, and dynamic resource handling.
React to this post: