A Comprehensive Guide to Python Multiprocessing Programming

Limitations of GIL (Global Interpreter Lock)

The GIL in Python is a mechanism in the CPython interpreter that ensures only one thread executes Python bytecode at a time. This is a bottleneck for CPU-bound pure Python programs, as true parallel execution cannot be achieved even with multiple CPU cores.

Advantages of Multiprocessing

Using multiprocessing can bypass the GIL limitation since each process has its own independent Python interpreter and memory space, allowing for true parallel computation.

Methods for Implementing Multiprocessing

1. Using `os.fork()` (Unix/Linux Systems)

import os
import time

def child_process():
    """Announce this process and its parent, then count 0..4 once per second."""
    own_pid, parent_pid = os.getpid(), os.getppid()
    print(f"Child process PID: {own_pid}, Parent process PID: {parent_pid}")

    step = 0
    while step < 5:
        print(f"Child process: Counting {step}")
        time.sleep(1)  # one-second pause between counts
        step += 1

def parent_process():
    """Fork a child, run ``child_process`` in it, and wait for it to exit.

    The parent prints its own PID, forks, reports the child's PID, blocks in
    ``os.waitpid`` until the child terminates, and then prints a final
    message.

    The child runs ``child_process`` and always terminates through
    ``os._exit`` -- with status 0 on success and 1 if ``child_process``
    raised -- so it can never fall through into code meant for the parent.

    Requires a platform that provides ``os.fork`` (Unix-like systems).
    """
    # Local imports: only this function needs them for flushing/reporting.
    import sys
    import traceback

    print(f"Parent process PID: {os.getpid()}")

    # Flush before forking: anything still sitting in the stdio buffers would
    # be copied into the child and written a second time when it flushes.
    sys.stdout.flush()
    sys.stderr.flush()

    # Create child process
    pid = os.fork()

    if pid == 0:
        # Child process
        exit_code = 1
        try:
            child_process()
            exit_code = 0
        except Exception:
            # Report the failure here; the child must not unwind into the
            # caller's stack, which belongs to the parent's control flow.
            traceback.print_exc()
        finally:
            try:
                # os._exit() does not flush stdio buffers, so without this the
                # child's output is lost when stdout is a pipe or a file.
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)  # Exit child process without cleanup handlers
    else:
        # Parent process
        print(f"Parent process: Created child process, PID: {pid}")
        # Wait for child process to finish (also reaps it, avoiding a zombie)
        os.waitpid(pid, 0)
        print("Parent process: Child process has ended")

# Run the fork demonstration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    parent_process()

2. Using the `multiprocessing` Module (Cross-Platform)

import multiprocessing
import time
import os

def cpu_intensive_task(n, name):
    """Return the sum of squares of 0..n-1, logging start and finish.

    ``name`` is only used to label the two progress messages.
    """
    pid = os.getpid()
    print(f"Process {name} (PID: {pid}) starts calculating {n}")

    # Pure-Python arithmetic on purpose: this is the CPU-bound workload.
    total = sum(i * i for i in range(n))

    print(f"Process {name} completed calculation, result: {total}")
    return total

def io_intensive_task(name, duration):
    """Simulate a blocking I/O call by sleeping ``duration`` seconds.

    Returns the string ``"<name> completed"``.
    """
    outcome = f"{name} completed"

    print(f"Process {name} starts I/O operation")
    time.sleep(duration)  # stands in for the blocking I/O wait
    print(f"Process {name} completed I/O operation")

    return outcome

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    # One (target, args) entry per worker: two CPU-bound jobs, one I/O-bound job.
    job_specs = [
        (cpu_intensive_task, (10000000, "CPU Worker 1")),
        (cpu_intensive_task, (15000000, "CPU Worker 2")),
        (io_intensive_task, ("IO Worker 1", 3)),
    ]
    processes = [
        multiprocessing.Process(target=func, args=params)
        for func, params in job_specs
    ]

    # Launch everything first, then wait, so the workers overlap in time.
    start_time = time.time()
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    elapsed = time.time() - start_time

    print(f"All processes completed, total time: {elapsed:.2f} seconds")

3. Using a Process Pool (`Pool`)

import multiprocessing
import time
import math

def is_prime(n):
    """Return True when ``n`` is prime, using trial division up to sqrt(n)."""
    if n < 2:
        return False

    # A composite number always has a divisor no larger than its square root.
    limit = int(math.sqrt(n))
    return all(n % divisor != 0 for divisor in range(2, limit + 1))

def process_numbers(numbers):
    """Return the primes found in ``numbers``, in their original order."""
    return [candidate for candidate in numbers if is_prime(candidate)]

if __name__ == "__main__":
    # Workload: every integer in [100000, 200000)
    numbers = list(range(100000, 200000))

    # Slice the workload into consecutive pieces of a quarter of its length
    chunk_size = len(numbers) // 4
    chunks = []
    for offset in range(0, len(numbers), chunk_size):
        chunks.append(numbers[offset:offset + chunk_size])

    # Baseline: everything in the current process
    print("Using single process...")
    start_time = time.time()
    single_result = process_numbers(numbers)
    single_time = time.time() - start_time
    print(f"Single process time: {single_time:.2f} seconds, found {len(single_result)} primes")

    # Same work, one chunk per task, spread over a pool of 4 workers
    print("\nUsing multiprocessing pool...")
    start_time = time.time()

    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(process_numbers, chunks)
        # map() keeps chunk order, so flattening preserves ascending order
        multi_result = [prime for partial in results for prime in partial]

    multi_time = time.time() - start_time
    print(f"Multiprocessing time: {multi_time:.2f} seconds, found {len(multi_result)} primes")
    print(f"Speedup ratio: {single_time / multi_time:.2f}x")

4. Inter-Process Communication

import multiprocessing
import time
import random

def producer(queue, name, count=5, delay_range=(0.1, 0.5)):
    """Put ``count`` labelled items on ``queue``, pausing randomly between them.

    Args:
        queue: Any object with a ``put(item)`` method, e.g. a
            ``multiprocessing.Queue``.
        name: Label embedded in every produced item and log line.
        count: Number of items to produce (default 5).
        delay_range: ``(low, high)`` bounds, in seconds, of the random pause
            taken after each item (default ``(0.1, 0.5)``).

    The producer does NOT enqueue the ``None`` end-of-stream sentinel itself.
    With several producers sharing one queue, a sentinel sent by whichever
    producer finishes first would shut the consumers down while the other
    producers are still emitting items, and those items would never be
    processed. The coordinating process must enqueue the sentinel once, after
    every producer has been joined.
    """
    low, high = delay_range
    for i in range(count):
        item = f"{name} produced item {i}"
        print(f"Producer {name}: {item}")
        queue.put(item)
        time.sleep(random.uniform(low, high))

def consumer(queue, name):
    """Drain ``queue`` until a ``None`` sentinel arrives.

    Every real item is logged and followed by a short random pause. On
    receiving the sentinel the consumer puts it back on the queue, so that
    sibling consumers sharing the queue also see it, and then stops.
    """
    item = queue.get()
    while item is not None:
        print(f"Consumer {name}: Processing {item}")
        time.sleep(random.uniform(0.2, 0.8))  # simulated processing time
        item = queue.get()

    # Hand the sentinel on to the next consumer before leaving.
    queue.put(None)
    print(f"Consumer {name}: Completed")

if __name__ == "__main__":
    # Single queue shared by every producer and consumer
    queue = multiprocessing.Queue()

    # Two producers (P0, P1) and three consumers (C0..C2)
    producers = []
    for idx in range(2):
        producers.append(
            multiprocessing.Process(target=producer, args=(queue, f"P{idx}"))
        )

    consumers = []
    for idx in range(3):
        consumers.append(
            multiprocessing.Process(target=consumer, args=(queue, f"C{idx}"))
        )

    # Producers are started before consumers
    for proc in (*producers, *consumers):
        proc.start()

    # Block until no producer can add anything more
    for proc in producers:
        proc.join()

    # Enqueue a None sentinel; consumers re-queue it for one another
    queue.put(None)

    # Block until every consumer has seen the sentinel and exited
    for proc in consumers:
        proc.join()

    print("All processes completed")

5. Shared Memory and Process Locks

import multiprocessing
import time

def worker_with_lock(shared_value, lock, worker_id):
    """Increment ``shared_value.value`` five times, holding ``lock`` each time.

    The whole read / pause / write sequence runs inside the lock, so another
    worker using the same lock cannot interleave with it.
    """
    rounds_left = 5
    while rounds_left:
        rounds_left -= 1
        with lock:
            before = shared_value.value
            print(f"Worker {worker_id}: Read value {before}")
            time.sleep(0.1)  # Simulate processing time
            after = before + 1
            shared_value.value = after
            print(f"Worker {worker_id}: Wrote value {after}")

def worker_without_lock(shared_value, worker_id):
    """Increment ``shared_value.value`` five times with no synchronization.

    The value is read, the worker pauses, and only then is the incremented
    value written back; nothing stops another worker from writing in between,
    so concurrent updates may be lost.
    """
    rounds_left = 5
    while rounds_left:
        rounds_left -= 1
        before = shared_value.value
        print(f"Worker {worker_id} (no lock): Read value {before}")
        time.sleep(0.1)  # gap between read and write, during which others may write
        after = before + 1
        shared_value.value = after
        print(f"Worker {worker_id} (no lock): Wrote value {after}")

if __name__ == "__main__":
    # --- Run 1: three workers that serialize their updates with a lock ---
    print("=== Version with lock ===")
    counter = multiprocessing.Value('i', 0)
    guard = multiprocessing.Lock()

    locked_workers = []
    for worker_id in range(3):
        proc = multiprocessing.Process(
            target=worker_with_lock,
            args=(counter, guard, worker_id),
        )
        proc.start()
        locked_workers.append(proc)

    for proc in locked_workers:
        proc.join()

    print(f"Final value: {counter.value}")

    # --- Run 2: same work on a fresh counter, but with no lock at all ---
    print("\n=== Version without lock (may have race conditions) ===")
    counter = multiprocessing.Value('i', 0)

    unlocked_workers = []
    for worker_id in range(3):
        proc = multiprocessing.Process(
            target=worker_without_lock,
            args=(counter, worker_id),
        )
        proc.start()
        unlocked_workers.append(proc)

    for proc in unlocked_workers:
        proc.join()

    print(f"Final value: {counter.value} (may be incorrect)")

6. Practical Application: Parallel Image Processing

import multiprocessing
from PIL import Image, ImageFilter
import os
import time

def process_image(args):
    """Filter one image and save the result.

    ``args`` is an ``(input_path, output_path, filter_type)`` tuple, packed
    into a single argument so the function can be used with ``Pool.map``.
    ``filter_type`` may be ``'blur'``, ``'sharpen'`` or ``'edges'``; any other
    value saves the image unfiltered.

    Returns True on success. Any exception is reported on stdout and turned
    into a False return value.
    """
    input_path, output_path, filter_type = args

    try:
        with Image.open(input_path) as img:
            print(f"Processing image: {os.path.basename(input_path)}")

            # Pick the filter to apply, if any
            if filter_type == 'blur':
                chosen = ImageFilter.GaussianBlur(2)
            elif filter_type == 'sharpen':
                chosen = ImageFilter.SHARPEN
            elif filter_type == 'edges':
                chosen = ImageFilter.FIND_EDGES
            else:
                chosen = None

            processed = img if chosen is None else img.filter(chosen)

            # Write the result while the source image is still open
            processed.save(output_path)
    except Exception as e:
        print(f"Error processing {input_path}: {e}")
        return False

    return True

def batch_process_images(input_dir, output_dir, filter_type, num_processes=4):
    """Apply ``filter_type`` to every image in ``input_dir`` using a process pool.

    Files whose names end in .png, .jpg or .jpeg (case-insensitive) are
    processed by ``process_image`` and written to ``output_dir`` as
    ``processed_<original name>``. ``output_dir`` is created if missing.
    A summary of successes and elapsed time is printed; nothing is returned.
    """
    os.makedirs(output_dir, exist_ok=True)

    # One (input_path, output_path, filter_type) task per image file
    image_suffixes = ('.png', '.jpg', '.jpeg')
    tasks = [
        (
            os.path.join(input_dir, entry),
            os.path.join(output_dir, f"processed_{entry}"),
            filter_type,
        )
        for entry in os.listdir(input_dir)
        if entry.lower().endswith(image_suffixes)
    ]

    if len(tasks) == 0:
        print("No image files found")
        return

    print(f"Found {len(tasks)} image files, processing with {num_processes} processes")

    # Fan the tasks out over the pool and time the whole run
    started = time.time()
    with multiprocessing.Pool(processes=num_processes) as pool:
        outcomes = pool.map(process_image, tasks)
    elapsed = time.time() - started

    # Each outcome is a bool, so summing counts the successes
    successful = sum(outcomes)
    print(f"Processing completed: {successful}/{len(tasks)} successful")
    print(f"Total time: {elapsed:.2f} seconds")

if __name__ == "__main__":
    # This entry point only demonstrates the code structure: it prints a
    # message and processes nothing.
    print("Image processing example code")
    # The real call is left commented out because it reads from an
    # "input_images" directory that must be prepared first. Uncomment to run:
    # batch_process_images("input_images", "output_images", "blur", 4)

Best Practices for Multiprocessing Programming

  1. Select the Appropriate Number of Processes:

    • CPU-bound tasks: Number of processes ≈ Number of CPU cores
    • I/O-bound tasks: More processes than CPU cores can be used, since each one spends most of its time waiting
  2. Avoid Shared State:

    • Prefer message passing (queues) over shared memory
    • If sharing is necessary, use appropriate synchronization mechanisms
  3. Resource Management:

    • Use the `with` statement to manage process pools
    • Close and clean up resources in a timely manner
  4. Error Handling:

    • Handle exceptions in child processes
    • Set appropriate timeout mechanisms
  5. Platform Compatibility:

    • Use `multiprocessing` instead of `os.fork` to ensure cross-platform compatibility
    • Be aware of differences in multiprocessing implementation between Windows and Unix systems

    Conclusion

    Multiprocessing is an effective way to bypass Python’s GIL limitation and achieve true parallel computation. By using multiprocessing wisely, the execution efficiency of CPU-bound tasks can be significantly improved. The key is to choose the appropriate inter-process communication mechanism based on the specific task type and to pay attention to resource management and error handling.

    Leave a Comment