Как использовать multiprocessing в Python

онлайн тренажер по питону
Онлайн-тренажер Python для начинающих

Изучайте Python легко и без перегрузки теорией. Решайте практические задачи с автоматической проверкой, получайте подсказки на русском языке и пишите код прямо в браузере — без необходимости что-либо устанавливать.

Начать курс

Модуль multiprocessing в Python предоставляет мощный инструментарий для создания параллельных программ, способных эффективно использовать многоядерные процессоры. В отличие от стандартной многопоточности, которая ограничена Global Interpreter Lock (GIL), multiprocessing позволяет запускать несколько процессов одновременно, каждый из которых имеет собственную память и независимое выполнение.

Основные принципы работы multiprocessing

Многопроцессность в Python основана на создании отдельных процессов, которые могут выполняться параллельно на разных ядрах процессора. Каждый процесс работает в изолированном пространстве памяти, что исключает проблемы с разделяемыми данными, характерные для многопоточности.

Ключевые компоненты модуля multiprocessing включают:

  • Process — класс для создания и управления отдельными процессами
  • Pool — пул процессов для эффективного выполнения множества задач
  • Queue — очереди для безопасной передачи данных между процессами
  • Lock — механизмы синхронизации для контроля доступа к ресурсам
  • Manager — объекты для управления разделяемыми данными

Когда использовать multiprocessing

Multiprocessing наиболее эффективен в следующих случаях:

CPU-интенсивные задачи: математические вычисления, обработка изображений, анализ данных, научные расчеты. Такие задачи выигрывают от параллельного выполнения на разных ядрах процессора.

Обработка больших объемов данных: когда необходимо применить одну и ту же функцию к множеству элементов, multiprocessing может значительно ускорить выполнение.

Независимые задачи: операции, которые не требуют частого обмена данными между процессами.

Для I/O-интенсивных задач (работа с файлами, сетевые запросы) лучше использовать асинхронное программирование (asyncio) или многопоточность.

Создание и управление процессами

Базовый пример с классом Process

import multiprocessing
import time

def worker_task(name, duration):
    """Функция для выполнения в отдельном процессе"""
    print(f"Процесс {name} начал работу")
    time.sleep(duration)
    print(f"Процесс {name} завершил работу")

if __name__ == "__main__":
    # Создание процессов
    process1 = multiprocessing.Process(target=worker_task, args=("Worker-1", 3))
    process2 = multiprocessing.Process(target=worker_task, args=("Worker-2", 2))
    
    # Запуск процессов
    start_time = time.time()
    process1.start()
    process2.start()
    
    # Ожидание завершения процессов
    process1.join()
    process2.join()
    
    end_time = time.time()
    print(f"Общее время выполнения: {end_time - start_time:.2f} секунд")

Наследование от класса Process

import multiprocessing
import os

class CustomProcess(multiprocessing.Process):
    def __init__(self, name, work_count):
        super().__init__()
        self.name = name
        self.work_count = work_count
    
    def run(self):
        """Метод, выполняемый в процессе"""
        print(f"Процесс {self.name} (PID: {os.getpid()}) начал работу")
        
        for i in range(self.work_count):
            print(f"{self.name}: выполняю задачу {i+1}")
            time.sleep(1)
        
        print(f"Процесс {self.name} завершил работу")

if __name__ == "__main__":
    processes = []
    
    # Создание нескольких процессов
    for i in range(3):
        p = CustomProcess(f"Process-{i+1}", 3)
        processes.append(p)
        p.start()
    
    # Ожидание завершения всех процессов
    for p in processes:
        p.join()

Работа с пулом процессов

Pool предоставляет удобный интерфейс для управления группой процессов и автоматического распределения задач между ними.

Основные методы Pool

import multiprocessing
import time

def compute_square(number):
    """Вычисление квадрата числа"""
    time.sleep(0.1)  # Имитация вычислений
    return number ** 2

def compute_factorial(n):
    """Вычисление факториала"""
    if n <= 1:
        return 1
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

if __name__ == "__main__":
    numbers = list(range(1, 21))
    
    # Использование map для параллельного выполнения
    with multiprocessing.Pool(processes=4) as pool:
        print("Вычисление квадратов:")
        squares = pool.map(compute_square, numbers)
        print(f"Квадраты: {squares[:10]}...")  # Первые 10 результатов
        
        print("\nВычисление факториалов:")
        factorials = pool.map(compute_factorial, numbers[:10])
        print(f"Факториалы: {factorials}")

Асинхронное выполнение с Pool

import multiprocessing
import time

def long_task(task_id):
    """Длительная задача"""
    print(f"Задача {task_id} начата")
    time.sleep(2)
    result = task_id * task_id
    print(f"Задача {task_id} завершена")
    return result

if __name__ == "__main__":
    with multiprocessing.Pool(processes=3) as pool:
        # Асинхронное выполнение задач
        async_results = []
        
        for i in range(5):
            async_result = pool.apply_async(long_task, (i,))
            async_results.append(async_result)
        
        # Получение результатов
        results = []
        for async_result in async_results:
            result = async_result.get(timeout=10)
            results.append(result)
        
        print(f"Результаты: {results}")

Межпроцессное взаимодействие

Использование Queue для обмена данными

import multiprocessing
import time
import random

def producer(queue, producer_id):
    """Процесс-производитель данных"""
    for i in range(5):
        data = f"Данные от производителя {producer_id}: элемент {i}"
        queue.put(data)
        print(f"Производитель {producer_id} отправил: {data}")
        time.sleep(random.uniform(0.1, 0.5))
    
    queue.put(None)  # Сигнал завершения

def consumer(queue, consumer_id):
    """Процесс-потребитель данных"""
    while True:
        try:
            data = queue.get(timeout=1)
            if data is None:
                break
            print(f"Потребитель {consumer_id} получил: {data}")
            time.sleep(random.uniform(0.1, 0.3))
        except:
            break

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    
    # Создание процессов
    producer_process = multiprocessing.Process(target=producer, args=(queue, 1))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue, 1))
    
    # Запуск процессов
    producer_process.start()
    consumer_process.start()
    
    # Ожидание завершения
    producer_process.join()
    consumer_process.join()

Использование Pipe для двусторонней связи

import multiprocessing
import time

def sender(conn):
    """Отправляющий процесс"""
    messages = ["Сообщение 1", "Сообщение 2", "Сообщение 3"]
    
    for msg in messages:
        conn.send(msg)
        print(f"Отправлено: {msg}")
        time.sleep(1)
    
    conn.close()

def receiver(conn):
    """Принимающий процесс"""
    while True:
        try:
            msg = conn.recv()
            print(f"Получено: {msg}")
        except EOFError:
            break
    
    conn.close()

if __name__ == "__main__":
    # Создание pipe
    parent_conn, child_conn = multiprocessing.Pipe()
    
    # Создание процессов
    sender_process = multiprocessing.Process(target=sender, args=(child_conn,))
    receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))
    
    # Запуск процессов
    sender_process.start()
    receiver_process.start()
    
    # Ожидание завершения
    sender_process.join()
    receiver_process.join()

Синхронизация процессов

Использование Lock для взаимного исключения

import multiprocessing
import time

def worker_with_lock(lock, shared_resource, worker_id):
    """Рабочий процесс с использованием блокировки"""
    for i in range(3):
        with lock:
            print(f"Процесс {worker_id} получил доступ к ресурсу")
            current_value = shared_resource.value
            time.sleep(0.1)  # Имитация работы с ресурсом
            shared_resource.value = current_value + 1
            print(f"Процесс {worker_id} увеличил значение до {shared_resource.value}")
        
        time.sleep(0.1)  # Работа без блокировки

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    shared_resource = multiprocessing.Value('i', 0)
    
    processes = []
    for i in range(4):
        p = multiprocessing.Process(
            target=worker_with_lock, 
            args=(lock, shared_resource, i)
        )
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    print(f"Итоговое значение: {shared_resource.value}")

Использование Manager для разделяемых объектов

import multiprocessing
import time

def update_shared_dict(shared_dict, process_id):
    """Обновление разделяемого словаря"""
    for i in range(3):
        key = f"process_{process_id}_item_{i}"
        shared_dict[key] = f"Значение от процесса {process_id}"
        print(f"Процесс {process_id} добавил {key}")
        time.sleep(0.1)

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        
        processes = []
        for i in range(3):
            p = multiprocessing.Process(
                target=update_shared_dict, 
                args=(shared_dict, i)
            )
            processes.append(p)
            p.start()
        
        for p in processes:
            p.join()
        
        print("Содержимое разделяемого словаря:")
        for key, value in shared_dict.items():
            print(f"{key}: {value}")

Практические примеры применения

Параллельная обработка изображений

import multiprocessing
from PIL import Image
import os

def process_image(image_path):
    """Обработка одного изображения"""
    try:
        with Image.open(image_path) as img:
            # Изменение размера
            img_resized = img.resize((800, 600))
            
            # Создание имени для обработанного файла
            name, ext = os.path.splitext(image_path)
            output_path = f"{name}_processed{ext}"
            
            # Сохранение обработанного изображения
            img_resized.save(output_path)
            
            return f"Обработано: {image_path} -> {output_path}"
    except Exception as e:
        return f"Ошибка при обработке {image_path}: {str(e)}"

def batch_process_images(image_paths, num_processes=4):
    """Пакетная обработка изображений"""
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(process_image, image_paths)
    
    return results

if __name__ == "__main__":
    # Список путей к изображениям
    image_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]
    
    # Обработка изображений
    results = batch_process_images(image_paths)
    
    for result in results:
        print(result)

Параллельные вычисления с большими данными

import multiprocessing
import numpy as np
import time

def compute_matrix_chunk(args):
    """Вычисление части матрицы"""
    start_row, end_row, size = args
    
    # Создание части матрицы
    chunk = np.zeros((end_row - start_row, size))
    
    for i in range(start_row, end_row):
        for j in range(size):
            # Сложные вычисления
            chunk[i - start_row, j] = np.sin(i * j) * np.cos(i + j)
    
    return start_row, chunk

def parallel_matrix_computation(matrix_size, num_processes=4):
    """Параллельное вычисление матрицы"""
    # Разделение работы между процессами
    chunk_size = matrix_size // num_processes
    tasks = []
    
    for i in range(num_processes):
        start_row = i * chunk_size
        end_row = (i + 1) * chunk_size if i < num_processes - 1 else matrix_size
        tasks.append((start_row, end_row, matrix_size))
    
    # Выполнение вычислений
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(compute_matrix_chunk, tasks)
    
    # Сборка результатов
    final_matrix = np.zeros((matrix_size, matrix_size))
    for start_row, chunk in results:
        end_row = start_row + chunk.shape[0]
        final_matrix[start_row:end_row, :] = chunk
    
    return final_matrix

if __name__ == "__main__":
    matrix_size = 1000
    
    start_time = time.time()
    result_matrix = parallel_matrix_computation(matrix_size, num_processes=4)
    end_time = time.time()
    
    print(f"Вычисление матрицы {matrix_size}x{matrix_size} завершено")
    print(f"Время выполнения: {end_time - start_time:.2f} секунд")
    print(f"Размер результирующей матрицы: {result_matrix.shape}")

Обработка ошибок и отладка

Обработка исключений в процессах

import multiprocessing
import traceback

def risky_task(task_id):
    """Задача, которая может вызвать ошибку"""
    try:
        if task_id == 2:
            raise ValueError(f"Ошибка в задаче {task_id}")
        
        result = task_id * 2
        return f"Задача {task_id} успешно выполнена: {result}"
    
    except Exception as e:
        # Возвращаем информацию об ошибке
        return f"Ошибка в задаче {task_id}: {str(e)}\n{traceback.format_exc()}"

def error_callback(error):
    """Callback для обработки ошибок"""
    print(f"Произошла ошибка: {error}")

if __name__ == "__main__":
    with multiprocessing.Pool(processes=3) as pool:
        tasks = range(5)
        
        # Использование apply_async с обработкой ошибок
        results = []
        for task_id in tasks:
            result = pool.apply_async(
                risky_task, 
                (task_id,), 
                error_callback=error_callback
            )
            results.append(result)
        
        # Получение результатов
        for i, result in enumerate(results):
            try:
                output = result.get(timeout=5)
                print(f"Результат {i}: {output}")
            except Exception as e:
                print(f"Не удалось получить результат {i}: {e}")

Оптимизация производительности

Выбор оптимального количества процессов

import multiprocessing
import time
import psutil

def cpu_intensive_task(n):
    """CPU-интенсивная задача"""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

def benchmark_multiprocessing():
    """Тестирование производительности с разным количеством процессов"""
    task_size = 1000000
    tasks = [task_size] * 20
    
    cpu_count = multiprocessing.cpu_count()
    print(f"Количество CPU ядер: {cpu_count}")
    
    results = {}
    
    for num_processes in [1, 2, 4, cpu_count, cpu_count * 2]:
        print(f"\nТестирование с {num_processes} процессами:")
        
        start_time = time.time()
        
        if num_processes == 1:
            # Последовательное выполнение
            results[num_processes] = [cpu_intensive_task(task) for task in tasks]
        else:
            # Параллельное выполнение
            with multiprocessing.Pool(processes=num_processes) as pool:
                results[num_processes] = pool.map(cpu_intensive_task, tasks)
        
        end_time = time.time()
        execution_time = end_time - start_time
        
        print(f"Время выполнения: {execution_time:.2f} секунд")
        print(f"Ускорение: {results[1] / execution_time:.2f}x" if num_processes > 1 else "Базовое время")

if __name__ == "__main__":
    benchmark_multiprocessing()

Особенности работы на разных операционных системах

Кроссплатформенная совместимость

import multiprocessing
import os
import sys

def cross_platform_worker(data):
    """Рабочая функция, совместимая с разными ОС"""
    process_id = os.getpid()
    platform = sys.platform
    
    print(f"Процесс {process_id} на {platform} обрабатывает: {data}")
    
    # Имитация работы
    result = data * 2
    
    return {
        'data': data,
        'result': result,
        'process_id': process_id,
        'platform': platform
    }

def main():
    """Главная функция с защитой точки входа"""
    print(f"Запуск на платформе: {sys.platform}")
    print(f"Количество CPU: {multiprocessing.cpu_count()}")
    
    # Данные для обработки
    data = list(range(10))
    
    # Использование контекстного менеджера для Pool
    with multiprocessing.Pool() as pool:
        results = pool.map(cross_platform_worker, data)
    
    # Вывод результатов
    for result in results:
        print(f"Данные: {result['data']}, Результат: {result['result']}, "
              f"PID: {result['process_id']}, Платформа: {result['platform']}")

if __name__ == "__main__":
    # Важно: защита точки входа необходима для Windows
    main()

Лучшие практики и рекомендации

Управление ресурсами

import multiprocessing
import time
import signal
import sys

class ProcessManager:
    """Менеджер для управления процессами"""
    
    def __init__(self, max_processes=4):
        self.max_processes = max_processes
        self.processes = []
        self.shutdown_event = multiprocessing.Event()
        
        # Обработчик сигналов для корректного завершения
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
    
    def signal_handler(self, signum, frame):
        """Обработчик сигналов завершения"""
        print(f"\nПолучен сигнал {signum}. Завершение процессов...")
        self.shutdown()
        sys.exit(0)
    
    def worker(self, worker_id, task_queue, result_queue):
        """Рабочий процесс"""
        print(f"Процесс {worker_id} запущен")
        
        while not self.shutdown_event.is_set():
            try:
                # Получение задачи с таймаутом
                task = task_queue.get(timeout=1)
                
                if task is None:  # Сигнал завершения
                    break
                
                # Выполнение задачи
                result = self.process_task(task)
                result_queue.put(result)
                
            except:
                continue
        
        print(f"Процесс {worker_id} завершен")
    
    def process_task(self, task):
        """Обработка задачи"""
        # Имитация работы
        time.sleep(0.1)
        return task * 2
    
    def start_workers(self, tasks):
        """Запуск рабочих процессов"""
        task_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()
        
        # Добавление задач в очередь
        for task in tasks:
            task_queue.put(task)
        
        # Создание и запуск процессов
        for i in range(self.max_processes):
            p = multiprocessing.Process(
                target=self.worker,
                args=(i, task_queue, result_queue)
            )
            p.start()
            self.processes.append(p)
        
        # Сбор результатов
        results = []
        for _ in tasks:
            try:
                result = result_queue.get(timeout=5)
                results.append(result)
            except:
                break
        
        # Завершение процессов
        for _ in self.processes:
            task_queue.put(None)
        
        return results
    
    def shutdown(self):
        """Корректное завершение всех процессов"""
        self.shutdown_event.set()
        
        for p in self.processes:
            if p.is_alive():
                p.terminate()
                p.join(timeout=1)
                
                if p.is_alive():
                    p.kill()

if __name__ == "__main__":
    manager = ProcessManager(max_processes=3)
    
    try:
        tasks = list(range(20))
        results = manager.start_workers(tasks)
        
        print(f"Обработано {len(results)} задач")
        print(f"Результаты: {results}")
        
    finally:
        manager.shutdown()

Модуль multiprocessing предоставляет мощные возможности для создания эффективных параллельных программ в Python. Правильное использование его компонентов позволяет значительно ускорить выполнение CPU-интенсивных задач, эффективно использовать ресурсы многоядерных систем и создавать масштабируемые приложения.

При работе с multiprocessing важно учитывать особенности межпроцессного взаимодействия, правильно управлять ресурсами и обрабатывать ошибки. Выбор между различными подходами (Process, Pool, Queue) должен основываться на специфике решаемой задачи и требованиях к производительности.

Новости