How to use multiprocessing in Python


Multiprocessing Module in Python: A Comprehensive Guide for Parallel Programming

The multiprocessing module in Python offers a robust toolkit for building parallel programs that can effectively leverage multi-core processors. Unlike standard multithreading, which is limited by the Global Interpreter Lock (GIL), multiprocessing allows you to run multiple processes concurrently, each with its own memory space and independent execution.

Key Principles of Multiprocessing

Multiprocessing in Python relies on creating separate processes that can run in parallel on different processor cores. Each process operates in an isolated memory space, eliminating shared data issues commonly found in multithreading.
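This isolation is easy to observe directly. In the minimal sketch below (the variable and function names are illustrative), a child process modifies a module-level variable, and the parent's copy is unaffected:

```python
import multiprocessing

counter = 0  # module-level variable; each process gets its own copy

def increment():
    """Runs in the child process and changes only the child's copy."""
    global counter
    counter += 100
    print(f"In child:  counter = {counter}")

if __name__ == "__main__":
    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    # The parent's copy is untouched -- process memory is isolated
    print(f"In parent: counter = {counter}")  # still 0
```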

Core Components of the multiprocessing Module:

  • Process: A class for creating and managing individual processes.
  • Pool: A pool of processes for efficiently executing multiple tasks.
  • Queue: Queues for safe data transfer between processes.
  • Lock: Synchronization mechanisms for controlling resource access.
  • Manager: Objects for managing shared data.

When to Use Multiprocessing

Multiprocessing is most effective in the following scenarios:

  • CPU-Intensive Tasks: Mathematical computations, image processing, data analysis, scientific calculations. These tasks benefit from parallel execution on different processor cores.

  • Large-Scale Data Processing: When you need to apply the same function to multiple elements, multiprocessing can significantly speed up execution.

  • Independent Tasks: Operations that don't require frequent data exchange between processes.

For I/O-intensive tasks (file operations, network requests), it's better to use asynchronous programming (asyncio) or multithreading.
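As a point of contrast, here is a sketch of an I/O-bound workload (the sleep stands in for a network request; the names are illustrative) where plain threads already overlap well, because the GIL is released while waiting:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(resource_id):
    """Simulated I/O-bound operation, e.g. a network request."""
    time.sleep(0.2)  # the GIL is released during the wait, so threads overlap
    return f"resource-{resource_id}"

if __name__ == "__main__":
    start = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch, range(5)))
    elapsed = time.time() - start
    print(results)
    # Five 0.2-second waits complete in roughly 0.2 s total, not 1.0 s
    print(f"Finished in ~{elapsed:.2f} seconds")
```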

Creating and Managing Processes

Basic Example with the Process Class

import multiprocessing
import time

def worker_task(name, duration):
    """Function to be executed in a separate process"""
    print(f"Process {name} started")
    time.sleep(duration)
    print(f"Process {name} finished")

if __name__ == "__main__":
    # Creating processes
    process1 = multiprocessing.Process(target=worker_task, args=("Worker-1", 3))
    process2 = multiprocessing.Process(target=worker_task, args=("Worker-2", 2))
    
    # Starting processes
    start_time = time.time()
    process1.start()
    process2.start()
    
    # Waiting for processes to complete
    process1.join()
    process2.join()
    
    end_time = time.time()
    print(f"Total execution time: {end_time - start_time:.2f} seconds")

Inheriting from the Process Class

import multiprocessing
import os
import time

class CustomProcess(multiprocessing.Process):
    def __init__(self, name, work_count):
        super().__init__()
        self.name = name
        self.work_count = work_count
    
    def run(self):
        """Method executed in the process"""
        print(f"Process {self.name} (PID: {os.getpid()}) started")
        
        for i in range(self.work_count):
            print(f"{self.name}: performing task {i+1}")
            time.sleep(1)
        
        print(f"Process {self.name} finished")

if __name__ == "__main__":
    processes = []
    
    # Creating multiple processes
    for i in range(3):
        p = CustomProcess(f"Process-{i+1}", 3)
        processes.append(p)
        p.start()
    
    # Waiting for all processes to complete
    for p in processes:
        p.join()

Working with a Pool of Processes

Pool provides a convenient interface for managing a group of processes and automatically distributing tasks among them.

Main Methods of Pool

import multiprocessing
import time

def compute_square(number):
    """Calculating the square of a number"""
    time.sleep(0.1)  # Simulating computation
    return number ** 2

def compute_factorial(n):
    """Calculating the factorial"""
    if n <= 1:
        return 1
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

if __name__ == "__main__":
    numbers = list(range(1, 21))
    
    # Using map for parallel execution
    with multiprocessing.Pool(processes=4) as pool:
        print("Calculating squares:")
        squares = pool.map(compute_square, numbers)
        print(f"Squares: {squares[:10]}...")  # First 10 results
        
        print("\nCalculating factorials:")
        factorials = pool.map(compute_factorial, numbers[:10])
        print(f"Factorials: {factorials}")
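Besides map, Pool also offers starmap, which unpacks argument tuples for functions taking several parameters, and imap, which yields results lazily in order. A short sketch (the function names are illustrative):

```python
import multiprocessing

def power(base, exponent):
    """Takes two arguments, so plain map() alone will not work."""
    return base ** exponent

def square(n):
    return n * n

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        # starmap unpacks each tuple into the function's arguments
        print(pool.starmap(power, [(2, 3), (3, 2), (5, 2)]))  # [8, 9, 25]
        
        # imap yields results one by one as they become ready (in order)
        for value in pool.imap(square, range(4)):
            print(value, end=" ")  # 0 1 4 9
        print()
```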

Asynchronous Execution with Pool

import multiprocessing
import time

def long_task(task_id):
    """Long-running task"""
    print(f"Task {task_id} started")
    time.sleep(2)
    result = task_id * task_id
    print(f"Task {task_id} finished")
    return result

if __name__ == "__main__":
    with multiprocessing.Pool(processes=3) as pool:
        # Asynchronous task execution
        async_results = []
        
        for i in range(5):
            async_result = pool.apply_async(long_task, (i,))
            async_results.append(async_result)
        
        # Getting results
        results = []
        for async_result in async_results:
            result = async_result.get(timeout=10)
            results.append(result)
        
        print(f"Results: {results}")

Inter-Process Communication

Using Queue for Data Exchange

import multiprocessing
import time
import random

def producer(queue, producer_id):
    """Data producer process"""
    for i in range(5):
        data = f"Data from producer {producer_id}: item {i}"
        queue.put(data)
        print(f"Producer {producer_id} sent: {data}")
        time.sleep(random.uniform(0.1, 0.5))
    
    queue.put(None)  # Completion signal

def consumer(queue, consumer_id):
    """Data consumer process"""
    while True:
        try:
            data = queue.get(timeout=1)
            if data is None:
                break
            print(f"Consumer {consumer_id} received: {data}")
            time.sleep(random.uniform(0.1, 0.3))
        except Exception:  # e.g. queue.Empty if the timeout expires
            break

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    
    # Creating processes
    producer_process = multiprocessing.Process(target=producer, args=(queue, 1))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue, 1))
    
    # Starting processes
    producer_process.start()
    consumer_process.start()
    
    # Waiting for completion
    producer_process.join()
    consumer_process.join()

Using Pipe for Two-Way Communication

import multiprocessing
import time

def sender(conn):
    """Sending process"""
    messages = ["Message 1", "Message 2", "Message 3"]
    
    for msg in messages:
        conn.send(msg)
        print(f"Sent: {msg}")
        time.sleep(1)
    
    conn.send(None)  # Sentinel: tells the receiver to stop
    conn.close()

def receiver(conn):
    """Receiving process"""
    # An explicit sentinel is used instead of waiting for EOFError: under the
    # fork start method other processes may still hold the write end open,
    # in which case recv() would block forever after the sender exits.
    while True:
        msg = conn.recv()
        if msg is None:
            break
        print(f"Received: {msg}")
    
    conn.close()

if __name__ == "__main__":
    # Creating pipe
    parent_conn, child_conn = multiprocessing.Pipe()
    
    # Creating processes
    sender_process = multiprocessing.Process(target=sender, args=(child_conn,))
    receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))
    
    # Starting processes
    sender_process.start()
    receiver_process.start()
    
    # Waiting for completion
    sender_process.join()
    receiver_process.join()
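A Pipe connection is duplex by default, so the same pair of connection objects can carry traffic in both directions, while the example above uses only one. Below is a sketch of a genuine request/response exchange (echo_worker and the sentinel protocol are illustrative choices):

```python
import multiprocessing

def echo_worker(conn):
    """Child process: answer each request with its uppercased form."""
    while True:
        request = conn.recv()       # child <- parent
        if request is None:         # sentinel: time to stop
            break
        conn.send(request.upper())  # child -> parent
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()  # duplex by default
    worker = multiprocessing.Process(target=echo_worker, args=(child_conn,))
    worker.start()
    
    for word in ["ping", "hello"]:
        parent_conn.send(word)
        print(parent_conn.recv())  # PING, then HELLO
    
    parent_conn.send(None)  # ask the child to exit
    worker.join()
```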

Process Synchronization

Using Lock for Mutual Exclusion

import multiprocessing
import time

def worker_with_lock(lock, shared_resource, worker_id):
    """Worker process using a lock"""
    for i in range(3):
        with lock:
            print(f"Process {worker_id} gained access to the resource")
            current_value = shared_resource.value
            time.sleep(0.1)  # Simulating resource work
            shared_resource.value = current_value + 1
            print(f"Process {worker_id} increased the value to {shared_resource.value}")
        
        time.sleep(0.1)  # Working without a lock

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    shared_resource = multiprocessing.Value('i', 0)
    
    processes = []
    for i in range(4):
        p = multiprocessing.Process(
            target=worker_with_lock, 
            args=(lock, shared_resource, i)
        )
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    print(f"Final value: {shared_resource.value}")
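For simple shared counters, a separate Lock is not strictly required: a Value object carries its own lock, accessible through get_lock(). A minimal sketch:

```python
import multiprocessing

def safe_increment(counter, times):
    """Increment a shared counter under the Value's built-in lock."""
    for _ in range(times):
        with counter.get_lock():  # same effect as an external Lock
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value('i', 0)
    workers = [
        multiprocessing.Process(target=safe_increment, args=(counter, 1000))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    
    print(f"Final value: {counter.value}")  # 4000
```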

Using Manager for Shared Objects

import multiprocessing
import time

def update_shared_dict(shared_dict, process_id):
    """Updating a shared dictionary"""
    for i in range(3):
        key = f"process_{process_id}_item_{i}"
        shared_dict[key] = f"Value from process {process_id}"
        print(f"Process {process_id} added {key}")
        time.sleep(0.1)

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        
        processes = []
        for i in range(3):
            p = multiprocessing.Process(
                target=update_shared_dict, 
                args=(shared_dict, i)
            )
            processes.append(p)
            p.start()
        
        for p in processes:
            p.join()
        
        print("Contents of the shared dictionary:")
        for key, value in shared_dict.items():
            print(f"{key}: {value}")

Practical Examples of Applications

Parallel Image Processing

import multiprocessing
from PIL import Image
import os

def process_image(image_path):
    """Processing a single image"""
    try:
        with Image.open(image_path) as img:
            # Resizing
            img_resized = img.resize((800, 600))
            
            # Creating a name for the processed file
            name, ext = os.path.splitext(image_path)
            output_path = f"{name}_processed{ext}"
            
            # Saving the processed image
            img_resized.save(output_path)
            
            return f"Processed: {image_path} -> {output_path}"
    except Exception as e:
        return f"Error processing {image_path}: {str(e)}"

def batch_process_images(image_paths, num_processes=4):
    """Batch image processing"""
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(process_image, image_paths)
    
    return results

if __name__ == "__main__":
    # List of image paths
    image_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]
    
    # Processing images
    results = batch_process_images(image_paths)
    
    for result in results:
        print(result)

Parallel Computation with Big Data

import multiprocessing
import numpy as np
import time

def compute_matrix_chunk(args):
    """Calculating a part of the matrix"""
    start_row, end_row, size = args
    
    # Creating a part of the matrix
    chunk = np.zeros((end_row - start_row, size))
    
    for i in range(start_row, end_row):
        for j in range(size):
            # Complex calculations
            chunk[i - start_row, j] = np.sin(i * j) * np.cos(i + j)
    
    return start_row, chunk

def parallel_matrix_computation(matrix_size, num_processes=4):
    """Parallel matrix computation"""
    # Dividing work among processes
    chunk_size = matrix_size // num_processes
    tasks = []
    
    for i in range(num_processes):
        start_row = i * chunk_size
        end_row = (i + 1) * chunk_size if i < num_processes - 1 else matrix_size
        tasks.append((start_row, end_row, matrix_size))
    
    # Performing calculations
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(compute_matrix_chunk, tasks)
    
    # Assembling results
    final_matrix = np.zeros((matrix_size, matrix_size))
    for start_row, chunk in results:
        end_row = start_row + chunk.shape[0]
        final_matrix[start_row:end_row, :] = chunk
    
    return final_matrix

if __name__ == "__main__":
    matrix_size = 1000
    
    start_time = time.time()
    result_matrix = parallel_matrix_computation(matrix_size, num_processes=4)
    end_time = time.time()
    
    print(f"Calculation of matrix {matrix_size}x{matrix_size} completed")
    print(f"Execution time: {end_time - start_time:.2f} seconds")
    print(f"Size of the resulting matrix: {result_matrix.shape}")

Error Handling and Debugging

Handling Exceptions in Processes

import multiprocessing

def risky_task(task_id):
    """A task that might raise an error"""
    if task_id == 2:
        # The exception is deliberately not caught here: it triggers
        # error_callback and is re-raised in the parent by AsyncResult.get()
        raise ValueError(f"Error in task {task_id}")
    
    return f"Task {task_id} completed successfully: {task_id * 2}"

def error_callback(error):
    """Callback for handling errors"""
    print(f"An error occurred: {error}")

if __name__ == "__main__":
    with multiprocessing.Pool(processes=3) as pool:
        tasks = range(5)
        
        # Using apply_async with error handling
        results = []
        for task_id in tasks:
            result = pool.apply_async(
                risky_task, 
                (task_id,), 
                error_callback=error_callback
            )
            results.append(result)
        
        # Getting results
        for i, result in enumerate(results):
            try:
                output = result.get(timeout=5)
                print(f"Result {i}: {output}")
            except Exception as e:
                print(f"Failed to get result {i}: {e}")

Performance Optimization

Choosing the Optimal Number of Processes

import multiprocessing
import time

def cpu_intensive_task(n):
    """CPU-intensive task"""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

def benchmark_multiprocessing():
    """Benchmarking performance with different numbers of processes"""
    task_size = 1000000
    tasks = [task_size] * 20
    
    cpu_count = multiprocessing.cpu_count()
    print(f"Number of CPU cores: {cpu_count}")
    
    times = {}
    
    for num_processes in [1, 2, 4, cpu_count, cpu_count * 2]:
        print(f"\nTesting with {num_processes} processes:")
        
        start_time = time.time()
        
        if num_processes == 1:
            # Sequential execution (baseline)
            [cpu_intensive_task(task) for task in tasks]
        else:
            # Parallel execution
            with multiprocessing.Pool(processes=num_processes) as pool:
                pool.map(cpu_intensive_task, tasks)
        
        end_time = time.time()
        times[num_processes] = end_time - start_time
        
        print(f"Execution time: {times[num_processes]:.2f} seconds")
        if num_processes == 1:
            print("Baseline time")
        else:
            print(f"Speedup: {times[1] / times[num_processes]:.2f}x")

if __name__ == "__main__":
    benchmark_multiprocessing()

Operating System Specifics

Cross-Platform Compatibility

import multiprocessing
import os
import sys

def cross_platform_worker(data):
    """Worker function compatible with different OS"""
    process_id = os.getpid()
    platform = sys.platform
    
    print(f"Process {process_id} on {platform} is processing: {data}")
    
    # Simulating work
    result = data * 2
    
    return {
        'data': data,
        'result': result,
        'process_id': process_id,
        'platform': platform
    }

def main():
    """Main function with entry point protection"""
    print(f"Running on platform: {sys.platform}")
    print(f"Number of CPUs: {multiprocessing.cpu_count()}")
    
    # Data for processing
    data = list(range(10))
    
    # Using context manager for Pool
    with multiprocessing.Pool() as pool:
        results = pool.map(cross_platform_worker, data)
    
    # Outputting results
    for result in results:
        print(f"Data: {result['data']}, Result: {result['result']}, "
              f"PID: {result['process_id']}, Platform: {result['platform']}")

if __name__ == "__main__":
    # Important: entry-point protection is required on platforms that use
    # the spawn start method (Windows, and macOS by default)
    main()
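Much of the platform difference comes down to the process start method: POSIX systems traditionally use fork, while Windows (and macOS since Python 3.8) default to spawn, which re-imports the main module in each child. A sketch of selecting a method explicitly through get_context:

```python
import multiprocessing

def greet(name):
    return f"Hello, {name}"

if __name__ == "__main__":
    print("Available start methods:", multiprocessing.get_all_start_methods())
    
    # "spawn" is available on every platform; "fork" is POSIX-only.
    # get_context gives a local choice without changing the global default.
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        print(pool.map(greet, ["Alice", "Bob"]))
```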

Best Practices and Recommendations

Resource Management

import multiprocessing
import time
import signal
import sys

class ProcessManager:
    """Coordinates a pool of worker processes"""
    
    def __init__(self, max_processes=4):
        self.max_processes = max_processes
        self.processes = []
        self.shutdown_event = multiprocessing.Event()
        
        # Signal handler for proper termination
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
    
    def signal_handler(self, signum, frame):
        """Signal handler for termination"""
        print(f"\nReceived signal {signum}. Terminating processes...")
        self.shutdown()
        sys.exit(0)
    
    def worker(self, worker_id, task_queue, result_queue):
        """Worker process"""
        print(f"Process {worker_id} started")
        
        while not self.shutdown_event.is_set():
            try:
                # Getting task with timeout
                task = task_queue.get(timeout=1)
                
                if task is None:  # Termination signal
                    break
                
                # Performing task
                result = self.process_task(task)
                result_queue.put(result)
                
            except Exception:  # e.g. queue.Empty if the timeout expires
                continue
        
        print(f"Process {worker_id} terminated")
    
    def process_task(self, task):
        """Processing task"""
        # Simulating work
        time.sleep(0.1)
        return task * 2
    
    def start_workers(self, tasks):
        """Starting worker processes"""
        task_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()
        
        # Adding tasks to queue
        for task in tasks:
            task_queue.put(task)
        
        # Creating and starting processes
        for i in range(self.max_processes):
            p = multiprocessing.Process(
                target=self.worker,
                args=(i, task_queue, result_queue)
            )
            p.start()
            self.processes.append(p)
        
        # Collecting results
        results = []
        for _ in tasks:
            try:
                result = result_queue.get(timeout=5)
                results.append(result)
            except Exception:  # e.g. queue.Empty if results stop arriving
                break
        
        # Terminating processes
        for _ in self.processes:
            task_queue.put(None)
        
        return results
    
    def shutdown(self):
        """Properly terminating all processes"""
        self.shutdown_event.set()
        
        for p in self.processes:
            if p.is_alive():
                p.terminate()
                p.join(timeout=1)
                
                if p.is_alive():
                    p.kill()

if __name__ == "__main__":
    manager = ProcessManager(max_processes=3)
    
    try:
        tasks = list(range(20))
        results = manager.start_workers(tasks)
        
        print(f"Processed {len(results)} tasks")
        print(f"Results: {results}")
        
    finally:
        manager.shutdown()

The multiprocessing module provides powerful capabilities for creating efficient parallel programs in Python. Correctly using its components can significantly accelerate the execution of CPU-intensive tasks, effectively utilize the resources of multi-core systems, and create scalable applications.

When working with multiprocessing, it's important to consider the specifics of inter-process communication, properly manage resources, and handle errors. The choice between different approaches (Process, Pool, Queue) should be based on the specifics of the problem being solved and the performance requirements.
