Threading lets you run code concurrently. Here's how to use it safely in Python.
Creating Threads
import threading
import time

def worker(name):
    """Announce start, pause one second, then announce completion."""
    print(f"{name} starting")
    time.sleep(1)
    print(f"{name} done")

# Create and start threads
pair = [
    threading.Thread(target=worker, args=(f"Thread-{i}",))
    for i in (1, 2)
]
for t in pair:
    t.start()

# Wait for completion
for t in pair:
    t.join()
print("All done")

Thread Class
import threading

class WorkerThread(threading.Thread):
    """Thread subclass that announces itself when its run() method executes."""

    def __init__(self, name):
        super().__init__()
        self.name = name  # threading.Thread exposes `name` as a settable property

    def run(self):
        print(f"{self.name} running")
        # Do work here

thread = WorkerThread("Worker-1")
thread.start()
thread.join()

Daemon Threads
Daemon threads exit when the main program exits:
import threading
import time

def background_task():
    """Loop forever, printing a heartbeat message once a second."""
    while True:
        print("Background running...")
        time.sleep(1)

# daemon=True: this thread is killed automatically when the main program exits
thread = threading.Thread(target=background_task, daemon=True)
thread.start()

time.sleep(3)
print("Main exiting")
# Daemon thread stops automatically

Locks
Prevent race conditions:
import threading

counter = 0
lock = threading.Lock()

def increment():
    """Add one to the shared counter while holding the lock."""
    global counter
    with lock:  # Acquire and release automatically
        current = counter
        current += 1
        counter = current

threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # Always 100

RLock (Reentrant Lock)
Can be acquired multiple times by same thread:
import threading

rlock = threading.RLock()

def outer():
    """Take the reentrant lock, then call inner(), which takes it again."""
    with rlock:
        inner()  # Can acquire same lock again

def inner():
    """Re-acquire the lock already held by this thread and print."""
    with rlock:
        print("Inner function")
Events

Signal between threads:
import threading
import time

event = threading.Event()

def waiter():
    """Block until the event is set, then report that it arrived."""
    print("Waiting for event...")
    event.wait()  # Block until set
    print("Event received!")

def setter():
    """Set the event after a two-second delay."""
    time.sleep(2)
    print("Setting event")
    event.set()

threading.Thread(target=waiter).start()
threading.Thread(target=setter).start()

Condition Variables
Wait for specific conditions:
import threading

condition = threading.Condition()
items = []

def consumer():
    """Take one item, sleeping on the condition until one is available."""
    with condition:
        while not items:      # loop guards against spurious wakeups
            condition.wait()  # Release lock and wait
        item = items.pop()
        print(f"Consumed: {item}")

def producer():
    """Append one item while holding the condition's lock."""
    with condition:
        items.append("item")
        condition.notify()  # Wake up one waiter

Semaphores
Limit concurrent access:
import threading
import time

# Allow max 3 concurrent accesses
semaphore = threading.Semaphore(3)

def access_resource(name):
    """Occupy one of the three semaphore slots for one second."""
    with semaphore:
        print(f"{name} acquired")
        time.sleep(1)
        print(f"{name} released")
for i in range(10):
    threading.Thread(target=access_resource, args=(f"Thread-{i}",)).start()

Thread-Safe Queue
Best way to communicate between threads:
import threading
import queue

q = queue.Queue()

def producer():
    """Put five numbers on the queue, then a None sentinel to end the stream."""
    for i in range(5):
        q.put(i)
        print(f"Produced: {i}")
    q.put(None)  # Sentinel

def consumer():
    """Drain the queue until the None sentinel appears."""
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        q.task_done()

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

Thread Pool
Use ThreadPoolExecutor for easier management:
from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    """Return n doubled, after simulating one second of work."""
    time.sleep(1)
    return n * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit individual tasks
    future = executor.submit(task, 5)
    print(future.result())  # 10

    # Map over iterable
    results = executor.map(task, [1, 2, 3, 4, 5])
    print(list(results))  # [2, 4, 6, 8, 10]

Thread Local Data
Data unique to each thread:
import threading

local_data = threading.local()

def process():
    """Stash the current thread's name in thread-local storage, then print it."""
    local_data.value = threading.current_thread().name
    print(f"Thread {local_data.value}")
for i in range(3):
    threading.Thread(target=process).start()

Common Patterns
Worker pool
import threading
import queue

def worker(q):
    """Consume items from q until a None sentinel arrives."""
    while True:
        item = q.get()
        if item is None:
            break
        process(item)  # NOTE(review): `process` must be supplied by the caller
        q.task_done()

q = queue.Queue()
threads = []
for _ in range(4):
    t = threading.Thread(target=worker, args=(q,))
    t.start()
    threads.append(t)

# Add work
for item in items:  # NOTE(review): `items` is assumed to be defined elsewhere
    q.put(item)

# Wait for completion
q.join()

# Stop workers
for _ in threads:
    q.put(None)
for t in threads:
    t.join()

Timeout
import threading
import time  # needed by slow_task; missing from the original snippet

def slow_task():
    """Simulate ten seconds of long-running work."""
    time.sleep(10)

thread = threading.Thread(target=slow_task)
thread.start()
thread.join(timeout=2)  # Wait max 2 seconds
if thread.is_alive():
    print("Task still running")

Thread-safe counter
import threading

class Counter:
    """A counter whose increments and reads are safe across threads."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        """Atomically add one to the count."""
        with self._lock:
            self._value += 1

    @property
    def value(self):
        """Current count, read under the lock for a consistent view."""
        with self._lock:
            return self._value
The GIL

Python's Global Interpreter Lock means:
- Only one thread executes Python bytecode at a time
- Threading helps with I/O-bound tasks (waiting for network, files)
- For CPU-bound tasks, use `multiprocessing` instead
# Good for threading (I/O bound)
def fetch_url(url):
    """Download a URL and return its body text (time is spent waiting on the network)."""
    response = requests.get(url)
    return response.text
# Use multiprocessing instead (CPU bound)
def compute_hash(data):
    return hashlib.sha256(data).hexdigest()

Quick Reference
import threading

# Create thread
t = threading.Thread(target=func, args=(arg,))
t.start()
t.join()

# Lock
lock = threading.Lock()
with lock:
    ...  # Critical section (placeholder so the snippet is valid Python)

# Event
event = threading.Event()
event.set()    # wake current and future waiters
event.wait()   # block until set
event.clear()  # reset to the unset state

# Queue (thread-safe)
import queue
q = queue.Queue()
q.put(item)
item = q.get()
# Thread pool
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(func, items)

Use threading for I/O-bound concurrency. For CPU-bound work, use multiprocessing. When in doubt, use ThreadPoolExecutor.
React to this post: