Limitations of GIL (Global Interpreter Lock)
The GIL in Python is a mechanism in the CPython interpreter that ensures only one thread executes Python bytecode at a time. This is a bottleneck for CPU-bound pure Python programs, because multiple threads cannot execute Python code in parallel even on a machine with multiple CPU cores.
Advantages of Multiprocessing
Using multiprocessing can bypass the GIL limitation since each process has its own independent Python interpreter and memory space, allowing for true parallel computation.
Methods for Implementing Multiprocessing
1. Using `os.fork()` (Unix/Linux Systems)
import os
import time
def child_process():
    """Report this process's PID and its parent's PID, then count 0 to 4.

    One line is printed per count, each followed by a one-second pause.
    """
    own_pid = os.getpid()
    parent_pid = os.getppid()
    print(f"Child process PID: {own_pid}, Parent process PID: {parent_pid}")

    tick = 0
    while tick < 5:
        print(f"Child process: Counting {tick}")
        time.sleep(1)
        tick += 1
def parent_process():
    """Fork a child, run ``child_process`` in it, and wait for it to exit.

    The parent prints its own PID, forks, reports the child's PID and then
    blocks in ``os.waitpid`` until the child has terminated. Relies on
    ``os.fork``, so it only works on Unix-like systems.

    The child always terminates through ``os._exit``: with status 0 when
    ``child_process`` returns normally, with status 1 when it raises.
    """
    import sys  # local imports: only needed for the child-side cleanup
    import traceback

    # Flush before forking so that text still sitting in the stdio buffer is
    # not inherited (and later re-emitted) by the child.
    print(f"Parent process PID: {os.getpid()}", flush=True)

    # Create child process
    pid = os.fork()
    if pid == 0:
        # Child process. os._exit() skips normal interpreter cleanup, so:
        #  * it must be reached on every path, otherwise an exception would
        #    let the child unwind into code meant for the parent only;
        #  * buffered output has to be flushed by hand before calling it.
        exit_code = 0
        try:
            child_process()
        except BaseException:
            traceback.print_exc()
            exit_code = 1
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)  # Exit child process
    else:
        # Parent process
        print(f"Parent process: Created child process, PID: {pid}")
        # Wait for child process to finish
        os.waitpid(pid, 0)
        print("Parent process: Child process has ended")
if __name__ == "__main__":
    # Script entry point: run the fork demonstration.
    parent_process()
2. Using the `multiprocessing` Module (Cross-Platform)
import multiprocessing
import time
import os
def cpu_intensive_task(n, name):
    """Add up the squares of 0..n-1, announcing start and finish.

    Args:
        n: Exclusive upper bound of the integers that are squared and summed.
        name: Label used in the progress messages.

    Returns:
        The sum of ``i * i`` for every ``i`` in ``range(n)``.
    """
    print(f"Process {name} (PID: {os.getpid()}) starts calculating {n}")
    total = sum(i * i for i in range(n))
    print(f"Process {name} completed calculation, result: {total}")
    return total
def io_intensive_task(name, duration):
    """Stand in for a blocking I/O call by sleeping.

    Args:
        name: Label used in the progress messages and in the return value.
        duration: Number of seconds to sleep.

    Returns:
        The string ``"<name> completed"``.
    """
    start_message = f"Process {name} starts I/O operation"
    print(start_message)

    time.sleep(duration)

    end_message = f"Process {name} completed I/O operation"
    print(end_message)
    return f"{name} completed"
if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    # One (target, args) pair per worker: two CPU-bound jobs, one I/O-bound.
    job_specs = [
        (cpu_intensive_task, (10000000, "CPU Worker 1")),
        (cpu_intensive_task, (15000000, "CPU Worker 2")),
        (io_intensive_task, ("IO Worker 1", 3)),
    ]
    processes = [
        multiprocessing.Process(target=task, args=task_args)
        for task, task_args in job_specs
    ]

    # Launch every worker first, then block until each one has exited, so
    # the measured time covers the workers running side by side.
    start_time = time.time()
    for worker in processes:
        worker.start()
    for worker in processes:
        worker.join()
    elapsed = time.time() - start_time

    print(f"All processes completed, total time: {elapsed:.2f} seconds")
3. Using a Process Pool (`Pool`)
import multiprocessing
import time
import math
def is_prime(n):
    """Check by trial division whether an integer is prime.

    Args:
        n: The integer to test.

    Returns:
        ``True`` if ``n`` is prime, ``False`` otherwise. Every value below 2
        (including negatives) is reported as not prime.
    """
    if n < 2:
        return False
    # math.isqrt works on exact integers. Going through float
    # (int(math.sqrt(n))) loses precision above 2**53 and raises
    # OverflowError once n no longer fits in a float.
    limit = math.isqrt(n)
    # n % divisor is 0 (falsy) exactly when divisor divides n, so all() stops
    # at the first divisor found.
    return all(n % divisor for divisor in range(2, limit + 1))
def process_numbers(numbers):
    """Return the primes contained in ``numbers``, keeping their order.

    Args:
        numbers: Iterable of integers to test with ``is_prime``.

    Returns:
        A new list holding only the elements for which ``is_prime`` is true.
    """
    return [candidate for candidate in numbers if is_prime(candidate)]
if __name__ == "__main__":
    # Test data: 100,000 consecutive integers, cut into quarter-sized slices.
    numbers = list(range(100000, 200000))
    chunk_size = len(numbers) // 4
    chunk_starts = range(0, len(numbers), chunk_size)
    chunks = [numbers[start:start + chunk_size] for start in chunk_starts]

    # Baseline: everything in the current process.
    print("Using single process...")
    start_time = time.time()
    single_result = process_numbers(numbers)
    single_time = time.time() - start_time
    print(f"Single process time: {single_time:.2f} seconds, found {len(single_result)} primes")

    # Same work, one chunk per pool task.
    print("\nUsing multiprocessing pool...")
    start_time = time.time()
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(process_numbers, chunks)
    # Flatten the per-chunk lists into a single list of primes.
    multi_result = [prime for partial in results for prime in partial]
    multi_time = time.time() - start_time
    print(f"Multiprocessing time: {multi_time:.2f} seconds, found {len(multi_result)} primes")

    print(f"Speedup ratio: {single_time / multi_time:.2f}x")
4. Inter-Process Communication
import multiprocessing
import time
import random
def producer(queue, name, count=5, delay_range=(0.1, 0.5)):
    """Put ``count`` labelled items on ``queue``, pausing after each one.

    The producer deliberately does NOT enqueue a ``None`` end marker.
    Consumers treat ``None`` as "stop" and pass it on to each other, so a
    marker sent by whichever producer finishes first would shut every
    consumer down while other producers are still adding work. The end
    marker has to be sent once, by the code that has joined all producers.

    Args:
        queue: Object with a ``put`` method that receives the items.
        name: Producer label, embedded in every item and log line.
        count: Number of items to produce.
        delay_range: ``(low, high)`` bounds, in seconds, of the random pause
            taken after each item.
    """
    low, high = delay_range
    for i in range(count):
        item = f"{name} produced item {i}"
        print(f"Producer {name}: {item}")
        queue.put(item)
        time.sleep(random.uniform(low, high))
def consumer(queue, name):
    """Take items off ``queue`` and process them until ``None`` arrives.

    On receiving ``None`` the consumer puts a ``None`` back on the queue, so
    that another consumer reading the same queue also sees the end marker,
    and then stops.

    Args:
        queue: Object with ``get`` and ``put`` methods.
        name: Consumer label used in the log lines.
    """
    while True:
        item = queue.get()
        if item is not None:
            print(f"Consumer {name}: Processing {item}")
            time.sleep(random.uniform(0.2, 0.8))
            continue
        # End marker: hand it on to the next consumer and stop.
        queue.put(None)
        break
    print(f"Consumer {name}: Completed")
if __name__ == "__main__":
    # Single queue shared by every producer and consumer process.
    queue = multiprocessing.Queue()

    # Two producers (P0, P1) and three consumers (C0..C2).
    producers = []
    for index in range(2):
        producers.append(
            multiprocessing.Process(target=producer, args=(queue, f"P{index}"))
        )
    consumers = []
    for index in range(3):
        consumers.append(
            multiprocessing.Process(target=consumer, args=(queue, f"C{index}"))
        )

    for proc in [*producers, *consumers]:
        proc.start()

    # Once every producer has exited, queue the end marker for the consumers.
    for proc in producers:
        proc.join()
    queue.put(None)

    for proc in consumers:
        proc.join()

    print("All processes completed")
5. Shared Memory and Process Locks
import multiprocessing
import time
def worker_with_lock(shared_value, lock, worker_id):
    """Increment ``shared_value.value`` five times, each time under ``lock``.

    The read, the 0.1 second pause and the write-back all happen while the
    lock is held.

    Args:
        shared_value: Object exposing a readable and writable ``value``.
        lock: Context manager guarding access to ``shared_value``.
        worker_id: Identifier shown in the log lines.
    """
    rounds_left = 5
    while rounds_left > 0:
        with lock:
            before = shared_value.value
            print(f"Worker {worker_id}: Read value {before}")
            time.sleep(0.1)  # pretend to do some work between read and write
            after = before + 1
            shared_value.value = after
            print(f"Worker {worker_id}: Wrote value {after}")
        rounds_left -= 1
def worker_without_lock(shared_value, worker_id):
    """Increment ``shared_value.value`` five times with no synchronization.

    Each round reads the value, pauses 0.1 seconds and writes back the read
    value plus one. Nothing guards the gap between read and write, so an
    update made by another worker in that gap is overwritten.

    Args:
        shared_value: Object exposing a readable and writable ``value``.
        worker_id: Identifier shown in the log lines.
    """
    rounds_left = 5
    while rounds_left > 0:
        before = shared_value.value
        print(f"Worker {worker_id} (no lock): Read value {before}")
        time.sleep(0.1)
        after = before + 1
        shared_value.value = after
        print(f"Worker {worker_id} (no lock): Wrote value {after}")
        rounds_left -= 1
if __name__ == "__main__":
    worker_ids = range(3)

    # Run 1: three workers that take the lock around each update.
    print("=== Version with lock ===")
    shared_value = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()
    processes = [
        multiprocessing.Process(
            target=worker_with_lock,
            args=(shared_value, lock, worker_id),
        )
        for worker_id in worker_ids
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    print(f"Final value: {shared_value.value}")

    # Run 2: same workload on a fresh counter, with no lock at all.
    print("\n=== Version without lock (may have race conditions) ===")
    shared_value = multiprocessing.Value('i', 0)
    processes = [
        multiprocessing.Process(
            target=worker_without_lock,
            args=(shared_value, worker_id),
        )
        for worker_id in worker_ids
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    print(f"Final value: {shared_value.value} (may be incorrect)")
6. Practical Application: Parallel Image Processing
import multiprocessing
from PIL import Image, ImageFilter
import os
import time
def process_image(args):
    """Open one image, apply the requested filter and save the result.

    Args:
        args: ``(input_path, output_path, filter_type)`` tuple. A
            ``filter_type`` of ``'blur'``, ``'sharpen'`` or ``'edges'``
            selects a filter; any other value saves the image unfiltered.

    Returns:
        ``True`` when the image was saved, ``False`` when any exception was
        raised while opening, filtering or saving (the error is printed).
    """
    input_path, output_path, filter_type = args
    try:
        with Image.open(input_path) as img:
            print(f"Processing image: {os.path.basename(input_path)}")

            # Choose the filter first, then apply it in a single place.
            if filter_type == 'blur':
                chosen_filter = ImageFilter.GaussianBlur(2)
            elif filter_type == 'sharpen':
                chosen_filter = ImageFilter.SHARPEN
            elif filter_type == 'edges':
                chosen_filter = ImageFilter.FIND_EDGES
            else:
                chosen_filter = None

            if chosen_filter is None:
                processed = img
            else:
                processed = img.filter(chosen_filter)

            processed.save(output_path)
    except Exception as e:
        print(f"Error processing {input_path}: {e}")
        return False
    return True
def batch_process_images(input_dir, output_dir, filter_type, num_processes=4):
    """Run ``process_image`` over every image in a directory using a pool.

    Entries of ``input_dir`` whose names end in ``.png``, ``.jpg`` or
    ``.jpeg`` (case-insensitive) are processed. Each result is written to
    ``output_dir`` as ``processed_<original name>``. ``output_dir`` is
    created if it does not exist. A summary is printed; nothing is returned.

    Args:
        input_dir: Directory that is scanned for images.
        output_dir: Directory that receives the processed images.
        filter_type: Filter name handed to ``process_image`` for every file.
        num_processes: Size of the worker pool.
    """
    os.makedirs(output_dir, exist_ok=True)

    image_suffixes = ('.png', '.jpg', '.jpeg')
    tasks = [
        (
            os.path.join(input_dir, entry),
            os.path.join(output_dir, f"processed_{entry}"),
            filter_type,
        )
        for entry in os.listdir(input_dir)
        if entry.lower().endswith(image_suffixes)
    ]

    if not tasks:
        print("No image files found")
        return

    print(f"Found {len(tasks)} image files, processing with {num_processes} processes")

    started = time.time()
    with multiprocessing.Pool(processes=num_processes) as pool:
        outcomes = pool.map(process_image, tasks)
    elapsed = time.time() - started

    # Each outcome is the True/False returned by process_image.
    successful = sum(1 for ok in outcomes if ok)
    print(f"Processing completed: {successful}/{len(tasks)} successful")
    print(f"Total time: {elapsed:.2f} seconds")
if __name__ == "__main__":
    # Structure demo only: put some test images in place first, then enable
    # the call below to actually run the batch.
    print("Image processing example code")
    # batch_process_images("input_images", "output_images", "blur", 4)
Best Practices for Multiprocessing Programming
Select the Appropriate Number of Processes:
- CPU-bound tasks: Number of processes ≈ Number of CPU cores
- I/O-bound tasks: More processes can be created
Avoid Shared State:
- Prefer message passing (queues) over shared memory
- If sharing is necessary, use appropriate synchronization mechanisms
Resource Management:
- Use the `with` statement to manage process pools
- Close and clean up resources in a timely manner
Error Handling:
- Handle exceptions in child processes
- Set appropriate timeout mechanisms
Platform Compatibility:
- Use `multiprocessing` instead of `os.fork` to ensure cross-platform compatibility
- Be aware of differences in multiprocessing implementation between Windows and Unix systems
Conclusion
Multiprocessing is an effective way to bypass Python’s GIL limitation and achieve true parallel computation. By using multiprocessing wisely, the execution efficiency of CPU-bound tasks can be significantly improved. The key is to choose the appropriate inter-process communication mechanism based on the specific task type and to pay attention to resource management and error handling.