When you need true parallelism for CPU-bound work, Python's multiprocessing module bypasses the GIL by spawning separate processes.
Basic Process Creation
from multiprocessing import Process
import os
def worker(name):
    """Print this worker's label and the PID of the process running it.

    Args:
        name: Label identifying the worker; interpolated into the message.
    """
    print(f"Worker {name}, PID: {os.getpid()}")
if __name__ == "__main__":
processes = []
for i in range(4):
p = Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()Always guard with if __name__ == "__main__": — required on Windows, good practice everywhere.
Process Pool for Batch Work
from multiprocessing import Pool
def square(x):
    """Return ``x`` multiplied by itself.

    Defined at module level so a worker process can import it by name.
    """
    return x * x
if __name__ == "__main__":
with Pool(processes=4) as pool:
# Map function across inputs
results = pool.map(square, range(10))
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]Async Pool Operations
from multiprocessing import Pool
import time
def slow_square(x, delay=0.5):
    """Return ``x * x`` after sleeping, standing in for a slow computation.

    Args:
        x: Value to square.
        delay: Seconds to sleep before returning. Defaults to 0.5, the value
            that used to be hard-coded, so one-argument calls are unchanged.

    Returns:
        The square of ``x``.
    """
    time.sleep(delay)
    return x * x
if __name__ == "__main__":
with Pool(4) as pool:
# Non-blocking
result = pool.map_async(slow_square, range(8))
# Do other work while waiting
print("Doing other work...")
# Get results (blocks until done)
values = result.get(timeout=10)
print(values)imap for Memory Efficiency
Process results one at a time as they become available, instead of waiting for the whole result list:
from multiprocessing import Pool
def process_chunk(data):
    """Return the sum of the numbers in ``data``.

    Args:
        data: Any iterable of numbers (the example passes ``range`` objects).
    """
    return sum(data)
if __name__ == "__main__":
large_data = [range(1000) for _ in range(100)]
with Pool(4) as pool:
# Yields results as completed
for result in pool.imap(process_chunk, large_data):
print(f"Got result: {result}")starmap for Multiple Arguments
from multiprocessing import Pool
def multiply(a, b):
    """Return the product of ``a`` and ``b``."""
    return a * b
if __name__ == "__main__":
pairs = [(2, 3), (4, 5), (6, 7)]
with Pool(4) as pool:
results = pool.starmap(multiply, pairs)
print(results) # [6, 20, 42]Queue-Based Communication
from multiprocessing import Process, Queue
def producer(queue):
    """Put three fruit names on ``queue``, then a ``None`` sentinel.

    Args:
        queue: Object with a ``put`` method, e.g. ``multiprocessing.Queue``.
    """
    for item in ["apple", "banana", "cherry"]:
        queue.put(item)
    # Sentinel: sent once, after all items, to tell the consumer to stop.
    queue.put(None)
def consumer(queue):
    """Print every item taken from ``queue`` until a ``None`` sentinel arrives.

    Args:
        queue: Object with a blocking ``get`` method, e.g.
            ``multiprocessing.Queue``.
    """
    while True:
        item = queue.get()
        # None is the agreed end-of-stream marker; it is not processed.
        if item is None:
            break
        print(f"Processing: {item}")
if __name__ == "__main__":
queue = Queue()
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()Shared Memory with Value/Array
from multiprocessing import Process, Value, Array
def increment_counter(counter, iterations):
    """Add 1 to ``counter.value`` ``iterations`` times, locking each update.

    Args:
        counter: Object exposing ``value`` and ``get_lock()``, such as a
            ``multiprocessing.Value``.
        iterations: Number of increments to perform.
    """
    for _ in range(iterations):
        # ``+=`` is a read-modify-write; the lock keeps concurrent
        # increments from other processes from being lost.
        with counter.get_lock():
            counter.value += 1
if __name__ == "__main__":
counter = Value('i', 0) # 'i' = integer
processes = [
Process(target=increment_counter, args=(counter, 1000))
for _ in range(4)
]
for p in processes:
p.start()
for p in processes:
p.join()
print(counter.value) # 4000SharedMemory (Python 3.8+)
Fast shared memory for numpy-like data:
from multiprocessing import shared_memory, Process
import numpy as np
def worker(name, size):
    """Double, in place, a float64 array stored in an existing shared block.

    Args:
        name: Name of the shared-memory block to attach to.
        size: Number of float64 elements stored in the block.
    """
    # Attach to existing shared memory
    shm = shared_memory.SharedMemory(name=name)
    try:
        arr = np.ndarray((size,), dtype=np.float64, buffer=shm.buf)
        # Modify in place
        arr *= 2
        # Drop the view so nothing refers to the mapping once it is closed.
        del arr
    finally:
        # Release this process's mapping even if the update failed; the
        # creator remains responsible for unlink().
        shm.close()
if __name__ == "__main__":
    # Create shared memory
    data = np.array([1.0, 2.0, 3.0, 4.0])
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    try:
        # Copy data to shared memory
        arr = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        arr[:] = data
        # Pass only the block's name and element count; the child attaches
        # to the same memory instead of receiving a copy.
        p = Process(target=worker, args=(shm.name, len(data)))
        p.start()
        p.join()
        print(arr)  # [2. 4. 6. 8.]
        # Drop the view before the mapping goes away.
        del arr
    finally:
        # Release this process's mapping, then destroy the block itself.
        # Doing this in ``finally`` avoids leaking the block on errors.
        shm.close()
        shm.unlink()

ProcessPoolExecutor
Higher-level interface from concurrent.futures, mirroring ThreadPoolExecutor:
from concurrent.futures import ProcessPoolExecutor, as_completed
def heavy_computation(n):
    """Return the sum of ``i * i`` for every ``i`` in ``range(n)``.

    Computed with a plain loop on purpose, as a stand-in for CPU-bound work.
    """
    return sum(i * i for i in range(n))
if __name__ == "__main__":
    numbers = [1000000, 2000000, 3000000, 4000000]
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Map each future back to its input so results can be labelled.
        futures = {
            executor.submit(heavy_computation, n): n
            for n in numbers
        }
        # as_completed yields futures in finish order, not submission order.
        for future in as_completed(futures):
            n = futures[future]
            result = future.result()
            print(f"Sum of squares to {n}: {result}")

Timeouts and Cancellation
from concurrent.futures import ProcessPoolExecutor, TimeoutError
import time
def slow_task(seconds):
    """Sleep for ``seconds`` and return a message reporting the duration.

    Args:
        seconds: How long to sleep, in seconds.

    Returns:
        The string ``"Slept {seconds}s"``.
    """
    time.sleep(seconds)
    return f"Slept {seconds}s"
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
future = executor.submit(slow_task, 10)
try:
result = future.result(timeout=2)
except TimeoutError:
future.cancel()
print("Task timed out")Manager for Complex Shared State
from multiprocessing import Process, Manager
def update_dict(shared_dict, key, value):
    """Store ``value`` under ``key`` in ``shared_dict``.

    Args:
        shared_dict: Mapping to update; the example passes a
            ``Manager().dict()`` proxy.
        key: Key to set.
        value: Value to store.
    """
    shared_dict[key] = value
if __name__ == "__main__":
with Manager() as manager:
shared_dict = manager.dict()
shared_list = manager.list()
processes = [
Process(target=update_dict, args=(shared_dict, i, i*2))
for i in range(5)
]
for p in processes:
p.start()
for p in processes:
p.join()
print(dict(shared_dict)) # {0: 0, 1: 2, 2: 4, 3: 6, 4: 8}Real-World: Parallel Image Processing
from multiprocessing import Pool
from pathlib import Path
def process_image(path):
    """Read the file at ``path`` and return its path, byte size and MD5 digest.

    Args:
        path: Location of the file, as a ``str`` or ``Path``.

    Returns:
        A dict with keys ``"path"`` (``str(path)``), ``"size"`` (number of
        bytes read) and ``"hash"`` (hex MD5 digest of the contents).
    """
    # Simulate CPU-intensive image processing
    import hashlib
    data = Path(path).read_bytes()
    return {
        "path": str(path),
        "size": len(data),
        "hash": hashlib.md5(data).hexdigest()
    }
if __name__ == "__main__":
image_paths = list(Path("images").glob("*.jpg"))
with Pool() as pool: # Uses cpu_count() by default
results = pool.map(process_image, image_paths)
for r in results:
print(f"{r['path']}: {r['hash']}")Choosing the Right Tool
| Use Case | Tool |
|---|---|
| Simple parallelism | Pool.map() |
| Complex shared state | Manager |
| Numeric arrays | SharedMemory |
| Async patterns | ProcessPoolExecutor |
| Fine control | Process + Queue |
Multiprocessing adds overhead — process creation isn't free, and arguments and results must be pickled to cross process boundaries. Use it when computation time dominates, not for quick operations.
React to this post: