Python's logging module includes handlers for various destinations. Understanding handlers is key to production-ready logging.
Basic Handler Setup
import logging

# Module-level logger; the handlers below decide where records end up.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Console handler: terse format, INFO and above only.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))

# File handler: verbose format, captures everything down to DEBUG.
file_handler = logging.FileHandler('app.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)

logger.addHandler(console_handler)
logger.addHandler(file_handler)

logger.info("This goes to both")
logger.debug("This only goes to file")

RotatingFileHandler
Rotate by file size:
import logging
from logging.handlers import RotatingFileHandler

logger = logging.getLogger('app')
logger.setLevel(logging.DEBUG)

# Roll the log over once it reaches 10 MB, keeping five old copies
# (app.log.1 .. app.log.5); the oldest copy is deleted on each rollover.
size_handler = RotatingFileHandler(
    'app.log', maxBytes=10 * 1024 * 1024, backupCount=5
)
size_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
logger.addHandler(size_handler)
# Creates: app.log, app.log.1, app.log.2, etc.

TimedRotatingFileHandler
Rotate by time:
import logging
from logging.handlers import TimedRotatingFileHandler

# Rotate at midnight local time and keep the last 30 daily files.
# `when` accepts 'S', 'M', 'H', 'D', 'midnight', or 'W0'-'W6' (weekday).
handler = TimedRotatingFileHandler(
    'app.log',
    when='midnight',
    interval=1,
    backupCount=30,
)
# Other schedules:
#   when='H', interval=6 -> every 6 hours
#   when='W0' -> every Monday
#   when='D', interval=1 -> daily
handler.suffix = '%Y-%m-%d' # Filename suffix format

SMTPHandler for Email Alerts
import logging
from logging.handlers import SMTPHandler

# Email ERROR-and-above records. The handler opens a new SMTP session for
# every emitted record, so reserve it for genuinely rare, serious events.
mail_handler = SMTPHandler(
    mailhost=('smtp.example.com', 587),
    fromaddr='app@example.com',
    toaddrs=['admin@example.com'],
    subject='Application Error',
    credentials=('user', 'password'),
    secure=(),  # empty tuple -> upgrade the connection with STARTTLS
)
mail_handler.setLevel(logging.ERROR)

logger = logging.getLogger('app')
logger.addHandler(mail_handler)
# Only ERROR and above trigger emails
logger.error("Database connection failed!")

SysLogHandler
Send to syslog:
import logging
from logging.handlers import SysLogHandler

# Ship records to the local syslog daemon through its Unix socket.
# For a remote collector, use address=('localhost', 514) instead.
handler = SysLogHandler(
    address='/dev/log',
    facility=SysLogHandler.LOG_LOCAL0,
)
handler.setFormatter(logging.Formatter('%(name)s: %(levelname)s %(message)s'))

logger = logging.getLogger('myapp')
logger.addHandler(handler)

HTTPHandler
Send logs to HTTP endpoint:
import logging
from logging.handlers import HTTPHandler

# POST each record's attribute dict to an HTTP(S) endpoint. Note that this
# performs one blocking request per record; wrap it behind a QueueHandler
# in latency-sensitive code.
handler = HTTPHandler(
    host='logs.example.com',
    url='/api/logs',
    method='POST',
    secure=True,  # use HTTPS
)

logger = logging.getLogger('app')
logger.addHandler(handler)

SocketHandler
Send to TCP socket:
import logging
from logging.handlers import SocketHandler

# Pickles each LogRecord and streams it, length-prefixed, over TCP.
# The connection is opened lazily on the first emit.
handler = SocketHandler('localhost', 9999)

logger = logging.getLogger('app')
logger.addHandler(handler)
# Receiver (separate process):
import pickle
import socketserver
# Minimal receiver for SocketHandler traffic: each record arrives as a
# 4-byte big-endian length prefix followed by a pickled LogRecord dict.
class LogRecordHandler(socketserver.StreamRequestHandler):
# Read framed records until the peer closes the connection.
def handle(self):
while True:
# Length prefix first; fewer than 4 bytes means the peer hung up.
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = int.from_bytes(chunk, 'big')
# NOTE(review): recv(slen) may return fewer than slen bytes on a
# stream socket; a robust receiver loops until the full payload has
# arrived (cf. the logging cookbook's socket receiver) — confirm.
data = self.connection.recv(slen)
# SECURITY: pickle.loads executes arbitrary code from the peer —
# only accept connections from trusted sources.
record = pickle.loads(data)
print(f"Received: {record.getMessage()}")

QueueHandler for Async Logging
Non-blocking logging with queue:
import logging
from logging.handlers import QueueHandler, QueueListener
from queue import Queue

# The logger only pays for a fast Queue.put(); the listener's background
# thread does the slow file/console work.
log_queue = Queue()
queue_handler = QueueHandler(log_queue)

# Real output handlers, driven by the listener rather than the logger.
file_handler = logging.FileHandler('app.log')
console_handler = logging.StreamHandler()

listener = QueueListener(
    log_queue,
    file_handler,
    console_handler,
    respect_handler_level=True,  # honour each handler's own level
)
listener.start()

logger = logging.getLogger('app')
logger.addHandler(queue_handler)

# On shutdown, stop() flushes remaining records and joins the worker thread:
listener.stop()

MemoryHandler
Buffer logs, flush on trigger:
import logging
from logging.handlers import MemoryHandler

# Destination that flushed records are forwarded to.
file_handler = logging.FileHandler('app.log')

# Buffer up to 100 records in memory; everything is handed to the target
# as soon as an ERROR-level record arrives or the buffer fills.
memory_handler = MemoryHandler(
    capacity=100,
    flushLevel=logging.ERROR,
    target=file_handler,
    flushOnClose=True,  # don't drop buffered records at shutdown
)

logger = logging.getLogger('app')
logger.addHandler(memory_handler)

# DEBUG/INFO records sit in the buffer until an ERROR occurs or it fills.
for i in range(50):
    logger.debug(f"Debug {i}")
logger.error("Error occurred!") # Flushes all buffered logs

WatchedFileHandler
Handle log rotation by external tools:
import logging
from logging.handlers import WatchedFileHandler

# Stats the file before each emit; if an external tool such as logrotate
# has moved or removed it, the handler reopens the path automatically.
handler = WatchedFileHandler('app.log')

logger = logging.getLogger('app')
logger.addHandler(handler)

Custom Handler
import logging
import json
import requests
class SlackHandler(logging.Handler):
    """Logging handler that posts formatted records to a Slack webhook."""

    def __init__(self, webhook_url: str):
        super().__init__()
        self.webhook_url = webhook_url

    def emit(self, record):
        """Format the record and POST it; never let logging itself raise."""
        try:
            body = {
                'text': f"*{record.levelname}*: {self.format(record)}",
                'username': 'Log Bot',
            }
            requests.post(self.webhook_url, json=body, timeout=5)
        except Exception:
            # Standard handler etiquette: route failures through
            # handleError instead of propagating out of a logging call.
            self.handleError(record)
# Usage: only ERROR and above reach Slack.
slack = SlackHandler('https://hooks.slack.com/services/...')
slack.setFormatter(logging.Formatter('%(name)s - %(message)s'))
slack.setLevel(logging.ERROR)

logger = logging.getLogger('app')
logger.addHandler(slack)

Handler Filters
import logging
class IgnoreHealthChecks(logging.Filter):
    """Drop records whose message mentions the /health endpoint."""

    def filter(self, record):
        message = record.getMessage()
        return '/health' not in message
class OnlyModule(logging.Filter):
    """Accept only records from loggers under a given module prefix."""

    def __init__(self, module_name):
        super().__init__()
        self.module = module_name

    def filter(self, record):
        prefix = self.module
        return record.name.startswith(prefix)
# Handlers run every attached filter; a record is emitted only if all of
# them return truthy (here: from 'myapp' AND not a health-check message).
handler = logging.FileHandler('app.log')
handler.addFilter(IgnoreHealthChecks())
handler.addFilter(OnlyModule('myapp'))

Multiple Formatters by Level
import logging
class LevelFormatter(logging.Formatter):
    """Formatter that picks a different layout per log level.

    Fixes two issues with the naive version:
    * sub-formatters are built once, not on every format() call;
    * tracebacks are appended by Formatter.format() itself when
      record.exc_info is set, instead of interpolating the raw
      '%(exc_info)s' tuple into the format string (which printed the
      tuple repr in addition to the appended traceback).
    """

    FORMATS = {
        logging.DEBUG: '%(name)s: %(message)s',
        logging.INFO: '%(asctime)s - %(message)s',
        logging.WARNING: '%(asctime)s - WARNING - %(message)s',
        logging.ERROR: '%(asctime)s - ERROR - %(name)s - %(message)s',
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # One pre-built Formatter per level; INFO acts as the fallback.
        self._formatters = {
            level: logging.Formatter(fmt)
            for level, fmt in self.FORMATS.items()
        }

    def format(self, record):
        """Delegate to the formatter registered for the record's level."""
        formatter = self._formatters.get(
            record.levelno, self._formatters[logging.INFO]
        )
        return formatter.format(record)
handler = logging.StreamHandler()
handler.setFormatter(LevelFormatter())

Handler Configuration via dictConfig
import logging.config
config = {
    'version': 1,
    # Without this, dictConfig() disables every logger created before it
    # runs (the default is True) — a common source of "missing logs".
    'disable_existing_loggers': False,
    'formatters': {
        'simple': {'format': '%(levelname)s: %(message)s'},
        'detailed': {'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'},
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'simple',
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'app.log',
            'maxBytes': 10485760,  # 10 MB
            'backupCount': 5,
            'formatter': 'detailed',
        },
    },
    'root': {
        'level': 'DEBUG',
        'handlers': ['console', 'file'],
    },
}
logging.config.dictConfig(config)

Handler Reference
| Handler | Use Case |
|---|---|
| StreamHandler | Console output |
| FileHandler | Simple file logging |
| RotatingFileHandler | Size-based rotation |
| TimedRotatingFileHandler | Time-based rotation |
| SMTPHandler | Email alerts |
| SysLogHandler | System logging |
| HTTPHandler | Log aggregators |
| QueueHandler | Async logging |
| MemoryHandler | Buffered logging |
| NullHandler | Silence library logs |
Handlers route your logs wherever they need to go. Combine them for comprehensive logging strategies.
React to this post: