Multiprocessing Module in Python: A Comprehensive Guide for Parallel Programming
The multiprocessing module in Python offers a robust toolkit for building parallel programs that effectively leverage multi-core processors. Unlike multithreading, where CPU-bound Python code is serialized by the Global Interpreter Lock (GIL), multiprocessing runs multiple processes truly in parallel, each with its own interpreter, memory space, and independent execution.
Key Principles of Multiprocessing
Multiprocessing in Python relies on creating separate processes that can run in parallel on different processor cores. Each process operates in an isolated memory space, eliminating shared data issues commonly found in multithreading.
Core Components of the multiprocessing Module:
- Process: A class for creating and managing individual processes.
- Pool: A pool of processes for efficiently executing multiple tasks.
- Queue: Queues for safe data transfer between processes.
- Lock: Synchronization mechanisms for controlling resource access.
- Manager: Objects for managing shared data.
When to Use Multiprocessing
Multiprocessing is most effective in the following scenarios:
- CPU-Intensive Tasks: mathematical computations, image processing, data analysis, scientific calculations. These tasks benefit from parallel execution on different processor cores.
- Large-Scale Data Processing: when the same function must be applied to many elements, multiprocessing can significantly speed up execution.
- Independent Tasks: operations that don't require frequent data exchange between processes.
For I/O-intensive tasks (file operations, network requests), it's better to use asynchronous programming (asyncio) or multithreading.
Creating and Managing Processes
Basic Example with the Process Class
import multiprocessing
import time
def worker_task(name, duration):
"""Function to be executed in a separate process"""
print(f"Process {name} started")
time.sleep(duration)
print(f"Process {name} finished")
if __name__ == "__main__":
# Creating processes
process1 = multiprocessing.Process(target=worker_task, args=("Worker-1", 3))
process2 = multiprocessing.Process(target=worker_task, args=("Worker-2", 2))
# Starting processes
start_time = time.time()
process1.start()
process2.start()
# Waiting for processes to complete
process1.join()
process2.join()
end_time = time.time()
print(f"Total execution time: {end_time - start_time:.2f} seconds")
Inheriting from the Process Class
import multiprocessing
import os
import time
class CustomProcess(multiprocessing.Process):
def __init__(self, name, work_count):
super().__init__()
self.name = name
self.work_count = work_count
def run(self):
"""Method executed in the process"""
print(f"Process {self.name} (PID: {os.getpid()}) started")
for i in range(self.work_count):
print(f"{self.name}: performing task {i+1}")
time.sleep(1)
print(f"Process {self.name} finished")
if __name__ == "__main__":
processes = []
# Creating multiple processes
for i in range(3):
p = CustomProcess(f"Process-{i+1}", 3)
processes.append(p)
p.start()
# Waiting for all processes to complete
for p in processes:
p.join()
Working with a Pool of Processes
Pool provides a convenient interface for managing a group of processes and automatically distributing tasks among them.
Main Methods of Pool
import multiprocessing
import time
def compute_square(number):
"""Calculating the square of a number"""
time.sleep(0.1) # Simulating computation
return number ** 2
def compute_factorial(n):
"""Calculating the factorial"""
if n <= 1:
return 1
result = 1
for i in range(2, n + 1):
result *= i
return result
if __name__ == "__main__":
numbers = list(range(1, 21))
# Using map for parallel execution
with multiprocessing.Pool(processes=4) as pool:
print("Calculating squares:")
squares = pool.map(compute_square, numbers)
print(f"Squares: {squares[:10]}...") # First 10 results
print("\nCalculating factorials:")
factorials = pool.map(compute_factorial, numbers[:10])
print(f"Factorials: {factorials}")
Asynchronous Execution with Pool
import multiprocessing
import time
def long_task(task_id):
"""Long-running task"""
print(f"Task {task_id} started")
time.sleep(2)
result = task_id * task_id
print(f"Task {task_id} finished")
return result
if __name__ == "__main__":
with multiprocessing.Pool(processes=3) as pool:
# Asynchronous task execution
async_results = []
for i in range(5):
async_result = pool.apply_async(long_task, (i,))
async_results.append(async_result)
# Getting results
results = []
for async_result in async_results:
result = async_result.get(timeout=10)
results.append(result)
print(f"Results: {results}")
Inter-Process Communication
Using Queue for Data Exchange
import multiprocessing
import time
import random
def producer(queue, producer_id):
"""Data producer process"""
for i in range(5):
data = f"Data from producer {producer_id}: item {i}"
queue.put(data)
print(f"Producer {producer_id} sent: {data}")
time.sleep(random.uniform(0.1, 0.5))
queue.put(None) # Completion signal
def consumer(queue, consumer_id):
"""Data consumer process"""
while True:
try:
data = queue.get(timeout=1)
if data is None:
break
print(f"Consumer {consumer_id} received: {data}")
time.sleep(random.uniform(0.1, 0.3))
        except Exception:
            # Nothing arrived within the timeout: stop consuming
            break
if __name__ == "__main__":
queue = multiprocessing.Queue()
# Creating processes
producer_process = multiprocessing.Process(target=producer, args=(queue, 1))
consumer_process = multiprocessing.Process(target=consumer, args=(queue, 1))
# Starting processes
producer_process.start()
consumer_process.start()
# Waiting for completion
producer_process.join()
consumer_process.join()
Using Pipe for Two-Way Communication
import multiprocessing
import time
def sender(conn):
"""Sending process"""
messages = ["Message 1", "Message 2", "Message 3"]
for msg in messages:
conn.send(msg)
print(f"Sent: {msg}")
time.sleep(1)
conn.close()
def receiver(conn):
"""Receiving process"""
while True:
try:
msg = conn.recv()
print(f"Received: {msg}")
except EOFError:
break
conn.close()
if __name__ == "__main__":
# Creating pipe
parent_conn, child_conn = multiprocessing.Pipe()
# Creating processes
sender_process = multiprocessing.Process(target=sender, args=(child_conn,))
receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))
    # Starting processes
    sender_process.start()
    receiver_process.start()
    # Closing the parent's copies of the connections: the receiver only
    # gets EOFError once every handle to the sending end has been closed
    child_conn.close()
    parent_conn.close()
    # Waiting for completion
    sender_process.join()
    receiver_process.join()
Process Synchronization
Using Lock for Mutual Exclusion
import multiprocessing
import time
def worker_with_lock(lock, shared_resource, worker_id):
"""Worker process using a lock"""
for i in range(3):
with lock:
print(f"Process {worker_id} gained access to the resource")
current_value = shared_resource.value
time.sleep(0.1) # Simulating resource work
shared_resource.value = current_value + 1
print(f"Process {worker_id} increased the value to {shared_resource.value}")
time.sleep(0.1) # Working without a lock
if __name__ == "__main__":
lock = multiprocessing.Lock()
shared_resource = multiprocessing.Value('i', 0)
processes = []
for i in range(4):
p = multiprocessing.Process(
target=worker_with_lock,
args=(lock, shared_resource, i)
)
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Final value: {shared_resource.value}")
Using Manager for Shared Objects
import multiprocessing
import time
def update_shared_dict(shared_dict, process_id):
"""Updating a shared dictionary"""
for i in range(3):
key = f"process_{process_id}_item_{i}"
shared_dict[key] = f"Value from process {process_id}"
print(f"Process {process_id} added {key}")
time.sleep(0.1)
if __name__ == "__main__":
with multiprocessing.Manager() as manager:
shared_dict = manager.dict()
processes = []
for i in range(3):
p = multiprocessing.Process(
target=update_shared_dict,
args=(shared_dict, i)
)
processes.append(p)
p.start()
for p in processes:
p.join()
print("Contents of the shared dictionary:")
for key, value in shared_dict.items():
print(f"{key}: {value}")
Practical Examples of Applications
Parallel Image Processing
import multiprocessing
from PIL import Image
import os
def process_image(image_path):
"""Processing a single image"""
try:
with Image.open(image_path) as img:
# Resizing
img_resized = img.resize((800, 600))
# Creating a name for the processed file
name, ext = os.path.splitext(image_path)
output_path = f"{name}_processed{ext}"
# Saving the processed image
img_resized.save(output_path)
return f"Processed: {image_path} -> {output_path}"
except Exception as e:
return f"Error processing {image_path}: {str(e)}"
def batch_process_images(image_paths, num_processes=4):
"""Batch image processing"""
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.map(process_image, image_paths)
return results
if __name__ == "__main__":
# List of image paths
image_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]
# Processing images
results = batch_process_images(image_paths)
for result in results:
print(result)
Parallel Computation with Big Data
import multiprocessing
import numpy as np
import time
def compute_matrix_chunk(args):
"""Calculating a part of the matrix"""
start_row, end_row, size = args
# Creating a part of the matrix
chunk = np.zeros((end_row - start_row, size))
for i in range(start_row, end_row):
for j in range(size):
# Complex calculations
chunk[i - start_row, j] = np.sin(i * j) * np.cos(i + j)
return start_row, chunk
def parallel_matrix_computation(matrix_size, num_processes=4):
"""Parallel matrix computation"""
# Dividing work among processes
chunk_size = matrix_size // num_processes
tasks = []
for i in range(num_processes):
start_row = i * chunk_size
end_row = (i + 1) * chunk_size if i < num_processes - 1 else matrix_size
tasks.append((start_row, end_row, matrix_size))
# Performing calculations
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.map(compute_matrix_chunk, tasks)
# Assembling results
final_matrix = np.zeros((matrix_size, matrix_size))
for start_row, chunk in results:
end_row = start_row + chunk.shape[0]
final_matrix[start_row:end_row, :] = chunk
return final_matrix
if __name__ == "__main__":
matrix_size = 1000
start_time = time.time()
result_matrix = parallel_matrix_computation(matrix_size, num_processes=4)
end_time = time.time()
print(f"Calculation of matrix {matrix_size}x{matrix_size} completed")
print(f"Execution time: {end_time - start_time:.2f} seconds")
print(f"Size of the resulting matrix: {result_matrix.shape}")
Error Handling and Debugging
Handling Exceptions in Processes
import multiprocessing

def risky_task(task_id):
    """A task that might raise an error"""
    if task_id == 2:
        # An uncaught exception triggers the pool's error_callback
        # and is re-raised when get() is called on the result
        raise ValueError(f"Error in task {task_id}")
    result = task_id * 2
    return f"Task {task_id} completed successfully: {result}"
def error_callback(error):
"""Callback for handling errors"""
print(f"An error occurred: {error}")
if __name__ == "__main__":
with multiprocessing.Pool(processes=3) as pool:
tasks = range(5)
# Using apply_async with error handling
results = []
for task_id in tasks:
result = pool.apply_async(
risky_task,
(task_id,),
error_callback=error_callback
)
results.append(result)
# Getting results
for i, result in enumerate(results):
try:
output = result.get(timeout=5)
print(f"Result {i}: {output}")
except Exception as e:
print(f"Failed to get result {i}: {e}")
Performance Optimization
Choosing the Optimal Number of Processes
import multiprocessing
import time
def cpu_intensive_task(n):
"""CPU-intensive task"""
result = 0
for i in range(n):
result += i ** 2
return result
def benchmark_multiprocessing():
"""Benchmarking performance with different numbers of processes"""
task_size = 1000000
tasks = [task_size] * 20
cpu_count = multiprocessing.cpu_count()
print(f"Number of CPU cores: {cpu_count}")
    times = {}
    for num_processes in [1, 2, 4, cpu_count, cpu_count * 2]:
        print(f"\nTesting with {num_processes} processes:")
        start_time = time.time()
        if num_processes == 1:
            # Sequential execution as the baseline
            [cpu_intensive_task(task) for task in tasks]
        else:
            # Parallel execution
            with multiprocessing.Pool(processes=num_processes) as pool:
                pool.map(cpu_intensive_task, tasks)
        execution_time = time.time() - start_time
        times[num_processes] = execution_time
        print(f"Execution time: {execution_time:.2f} seconds")
        if num_processes > 1:
            print(f"Speedup: {times[1] / execution_time:.2f}x")
        else:
            print("Baseline time")
if __name__ == "__main__":
benchmark_multiprocessing()
Operating System Specifics
Cross-Platform Compatibility
import multiprocessing
import os
import sys
def cross_platform_worker(data):
"""Worker function compatible with different OS"""
process_id = os.getpid()
platform = sys.platform
print(f"Process {process_id} on {platform} is processing: {data}")
# Simulating work
result = data * 2
return {
'data': data,
'result': result,
'process_id': process_id,
'platform': platform
}
def main():
"""Main function with entry point protection"""
print(f"Running on platform: {sys.platform}")
print(f"Number of CPUs: {multiprocessing.cpu_count()}")
# Data for processing
data = list(range(10))
# Using context manager for Pool
with multiprocessing.Pool() as pool:
results = pool.map(cross_platform_worker, data)
# Outputting results
for result in results:
print(f"Data: {result['data']}, Result: {result['result']}, "
f"PID: {result['process_id']}, Platform: {result['platform']}")
if __name__ == "__main__":
# Important: entry point protection is necessary for Windows
main()
Best Practices and Recommendations
Resource Management
import multiprocessing
import time
import signal
import sys
class ProcessManager:
    """Coordinates worker processes and their orderly shutdown"""
def __init__(self, max_processes=4):
self.max_processes = max_processes
self.processes = []
self.shutdown_event = multiprocessing.Event()
# Signal handler for proper termination
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, signum, frame):
"""Signal handler for termination"""
print(f"\nReceived signal {signum}. Terminating processes...")
self.shutdown()
sys.exit(0)
def worker(self, worker_id, task_queue, result_queue):
"""Worker process"""
print(f"Process {worker_id} started")
while not self.shutdown_event.is_set():
try:
# Getting task with timeout
task = task_queue.get(timeout=1)
if task is None: # Termination signal
break
# Performing task
result = self.process_task(task)
result_queue.put(result)
            except Exception:
                # Queue empty within the timeout: re-check the shutdown flag
                continue
print(f"Process {worker_id} terminated")
def process_task(self, task):
"""Processing task"""
# Simulating work
time.sleep(0.1)
return task * 2
def start_workers(self, tasks):
"""Starting worker processes"""
task_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
# Adding tasks to queue
for task in tasks:
task_queue.put(task)
# Creating and starting processes
for i in range(self.max_processes):
p = multiprocessing.Process(
target=self.worker,
args=(i, task_queue, result_queue)
)
p.start()
self.processes.append(p)
# Collecting results
results = []
for _ in tasks:
try:
result = result_queue.get(timeout=5)
results.append(result)
            except Exception:
                # Timed out waiting for a result
                break
# Terminating processes
for _ in self.processes:
task_queue.put(None)
return results
def shutdown(self):
"""Properly terminating all processes"""
self.shutdown_event.set()
for p in self.processes:
if p.is_alive():
p.terminate()
p.join(timeout=1)
if p.is_alive():
p.kill()
if __name__ == "__main__":
manager = ProcessManager(max_processes=3)
try:
tasks = list(range(20))
results = manager.start_workers(tasks)
print(f"Processed {len(results)} tasks")
print(f"Results: {results}")
finally:
manager.shutdown()
The multiprocessing module provides powerful capabilities for creating efficient parallel programs in Python. Correctly using its components can significantly accelerate the execution of CPU-intensive tasks, effectively utilize the resources of multi-core systems, and create scalable applications.
When working with multiprocessing, it's important to consider the specifics of inter-process communication, properly manage resources, and handle errors. The choice between different approaches (Process, Pool, Queue) should be based on the specifics of the problem being solved and the performance requirements.