Как передавать аргументы в Python multiprocessing

PythonBeginner
Практиковаться сейчас

Введение

Модуль multiprocessing в Python обеспечивает параллельные вычисления, используя несколько ядер процессора, что может значительно повысить производительность вычислений для задач, интенсивно использующих CPU. Эта лабораторная работа исследует различные методы передачи аргументов процессам в модуле multiprocessing, решая общие проблемы в параллельном программировании и демонстрируя практические стратегии эффективного распараллеливания.

К концу этой лабораторной работы вы поймете, как использовать различные методы для передачи данных параллельным процессам и применять эти методы в реальных сценариях.

Введение в Multiprocessing в Python

На этом шаге мы изучим основы multiprocessing в Python и создадим нашу первую параллельную программу.

Что такое Multiprocessing?

Multiprocessing - это модуль Python, который позволяет нам запускать несколько процессов параллельно, эффективно используя несколько ядер CPU. Это отличается от потоков (threading), которые ограничены Global Interpreter Lock (GIL), что препятствует истинному параллельному выполнению кода Python.

Вот простое сравнение:

Характеристика Multiprocessing Threading
Память Отдельные пространства памяти Общее пространство памяти
Параллелизм Истинное параллельное выполнение Ограничено GIL
Область применения CPU-bound задачи I/O-bound задачи
Накладные расходы Выше (отдельные процессы) Ниже

Создание вашей первой программы Multiprocessing

Давайте создадим простой пример multiprocessing. Откройте новый файл в WebIDE и назовите его simple_process.py:

import multiprocessing
import time
import os

def worker():
    """Simple function to demonstrate a process"""
    print(f"Worker process id: {os.getpid()}")
    print(f"Worker parent process id: {os.getppid()}")
    time.sleep(1)
    print("Worker process completed")

if __name__ == "__main__":
    ## Print information about the main process
    print(f"Main process id: {os.getpid()}")

    ## Create a process
    process = multiprocessing.Process(target=worker)

    ## Start the process
    print("Starting worker process...")
    process.start()

    ## Wait for the process to complete
    process.join()

    print("Main process completed")

Теперь давайте запустим эту программу:

python3 simple_process.py

Вы должны увидеть вывод, похожий на:

Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed

Точные идентификаторы процессов будут отличаться в вашей системе. Обратите внимание, что рабочий процесс имеет другой идентификатор процесса, чем основной процесс, подтверждая, что это отдельные процессы.

Понимание создания процесса

Когда мы создаем процесс multiprocessing и запускаем его, Python:

  1. Создает новый процесс
  2. Импортирует модуль, содержащий целевую функцию
  3. Выполняет целевую функцию в новом процессе
  4. Возвращает управление в родительский процесс

Защита if __name__ == "__main__": важна при работе с multiprocessing в Python. Это предотвращает создание дубликатов процессов при импорте модуля.

Несколько процессов

Давайте изменим наш пример, чтобы создать несколько процессов. Создайте новый файл с именем multiple_processes.py:

import multiprocessing
import time
import os

def worker(worker_id):
    """Worker function that accepts an argument"""
    print(f"Worker {worker_id} (PID: {os.getpid()}) started")
    time.sleep(1)
    print(f"Worker {worker_id} completed")
    return worker_id * 2  ## This return value won't be captured

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    ## Create multiple processes
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    ## Wait for all processes to complete
    for p in processes:
        p.join()

    print("All processes completed")

Запустите этот код:

python3 multiple_processes.py

Вы должны увидеть вывод, похожий на:

Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed

Обратите внимание, как мы передали аргумент (worker_id) каждому процессу, используя параметр args. Это наш первый пример передачи аргументов процессу.

Также обратите внимание, что процессы могут выполняться не обязательно в предсказуемом порядке, так как они выполняются параллельно.

Передача базовых аргументов с использованием Pool

На предыдущем шаге мы передали простой аргумент процессу. Теперь давайте рассмотрим более эффективный способ обработки нескольких задач с использованием класса Pool и узнаем о различных способах передачи аргументов.

Класс Pool

Класс Pool предоставляет удобный способ распараллеливания выполнения функции для нескольких входных значений. Он распределяет входные данные между процессами и собирает результаты.

Давайте создадим простой пример, чтобы понять, как работает Pool. Создайте новый файл с именем pool_example.py:

import multiprocessing
import time

def square(x):
    """Function that squares a number and returns the result"""
    print(f"Processing {x}...")
    time.sleep(1)  ## Simulate a time-consuming operation
    return x * x

if __name__ == "__main__":
    ## Create a list of numbers to process
    numbers = [1, 2, 3, 4, 5]

    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        ## Map the square function to the numbers
        start_time = time.time()
        results = pool.map(square, numbers)
        end_time = time.time()

        print(f"Results: {results}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")

    ## Compare with sequential processing
    start_time = time.time()
    sequential_results = [square(n) for n in numbers]
    end_time = time.time()

    print(f"Sequential results: {sequential_results}")
    print(f"Sequential time: {end_time - start_time:.2f} seconds")

Запустите код:

python3 pool_example.py

Вы должны увидеть вывод, похожий на:

Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds

Обратите внимание, как Pool работает быстрее, потому что он обрабатывает несколько чисел параллельно. С размером пула 2, он может обрабатывать 2 числа одновременно, в то время как последовательный подход обрабатывает их одно за другим.

Передача одиночных аргументов с помощью Pool.map()

Метод map() идеально подходит, когда ваша функция принимает один аргумент. Он применяет функцию к каждому элементу в итерируемом объекте параллельно.

Давайте рассмотрим это подробнее на другом примере. Создайте файл с именем pool_map.py:

import multiprocessing
import time
import os

def process_item(item):
    """Process a single item"""
    process_id = os.getpid()
    print(f"Process {process_id} processing item {item}")
    time.sleep(1)  ## Simulate work
    return item * 10

if __name__ == "__main__":
    ## Create a list of items to process
    items = list(range(10))

    ## Get the number of CPU cores available
    num_cores = multiprocessing.cpu_count()
    print(f"System has {num_cores} CPU cores")

    ## Create a Pool with the number of available cores
    with multiprocessing.Pool(processes=num_cores) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        ## Process all items in parallel
        results = pool.map(process_item, items)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

Запустите код:

python3 pool_map.py

Ваш вывод должен показать, что элементы обрабатываются параллельно в нескольких процессах, со временем, значительно меньшим, чем 10 секунд (что было бы временем последовательной обработки для 10 элементов с задержкой в 1 секунду каждый).

Передача нескольких аргументов с помощью Pool.starmap()

Что делать, если вашей функции требуется несколько аргументов? Здесь на помощь приходит starmap().

Создайте файл с именем pool_starmap.py:

import multiprocessing
import time

def process_data(id, value, multiplier):
    """Process data with multiple arguments"""
    print(f"Processing item {id}: {value} with multiplier {multiplier}")
    time.sleep(1)  ## Simulate work
    return value * multiplier

if __name__ == "__main__":
    ## Create a list of argument tuples
    ## Each tuple contains all arguments for one function call
    arguments = [
        (1, 5, 2),    ## id=1, value=5, multiplier=2
        (2, 10, 3),   ## id=2, value=10, multiplier=3
        (3, 15, 2),   ## id=3, value=15, multiplier=2
        (4, 20, 4),   ## id=4, value=20, multiplier=4
        (5, 25, 5)    ## id=5, value=25, multiplier=5
    ]

    ## Create a Pool with 3 processes
    with multiprocessing.Pool(processes=3) as pool:
        print("Starting parallel processing with multiple arguments...")
        start_time = time.time()

        ## Process all items in parallel using starmap
        results = pool.starmap(process_data, arguments)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

Запустите код:

python3 pool_starmap.py

Вы должны увидеть вывод, показывающий, что каждый элемент обрабатывается со своим набором аргументов, и окончательный список результатов, содержащий все вычисленные значения.

Ключевые моменты о Pool

  • Pool автоматически обрабатывает распределение задач между процессами
  • Он управляет очередью задач и назначает их доступным рабочим процессам
  • Он собирает результаты и возвращает их в том же порядке, что и входные данные
  • Количество процессов может быть оптимизировано на основе ядер CPU вашей системы и характера вашей задачи

Продвинутые методы передачи аргументов

На предыдущих шагах мы узнали о базовой передаче аргументов с использованием объектов Process и класса Pool. Теперь давайте рассмотрим более продвинутые методы передачи аргументов в сценариях multiprocessing.

Передача сложных данных с помощью apply() и apply_async()

Иногда вам нужна большая гибкость, чем предоставляют map() и starmap(). Методы apply() и apply_async() позволяют вам выполнять функцию с определенными аргументами и обрабатывать результаты более динамично.

Создайте файл с именем pool_apply.py:

import multiprocessing
import time
import random

def process_task(task_id, duration, data):
    """Process a task with multiple arguments including complex data"""
    print(f"Starting task {task_id} with duration {duration}s and data: {data}")
    time.sleep(duration)  ## Simulate variable work time
    result = sum(data) * task_id
    print(f"Task {task_id} completed")
    return result

if __name__ == "__main__":
    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        print("Starting tasks with apply()...")

        ## Execute tasks with apply() - blocking
        result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
        print(f"Result 1: {result1}")

        result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
        print(f"Result 2: {result2}")

        print("\nStarting tasks with apply_async()...")

        ## Execute tasks with apply_async() - non-blocking
        async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
        async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))

        ## Do other work while tasks are running
        print("Tasks are running in the background...")
        time.sleep(1)
        print("Main process is doing other work...")

        ## Get results when needed (will wait if not yet available)
        print(f"Result 3: {async_result1.get()}")
        print(f"Result 4: {async_result2.get()}")

Запустите код:

python3 pool_apply.py

Вывод покажет, как apply() является блокирующим (последовательным), в то время как apply_async() позволяет задачам выполняться параллельно.

Использование functools.partial для частичного применения функции

Когда у вас есть функция, которая принимает несколько аргументов, но вы хотите зафиксировать некоторые из них, functools.partial очень полезен.

Создайте файл с именем partial_example.py:

import multiprocessing
from functools import partial
import time

def process_with_config(data, config_a, config_b):
    """Process data with specific configuration parameters"""
    print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
    time.sleep(1)
    return data * config_a + config_b

if __name__ == "__main__":
    ## Data to process
    data_items = [1, 2, 3, 4, 5]

    ## Configuration values
    config_a = 10
    config_b = 5

    ## Create a partially applied function with fixed config values
    partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)

    ## Create a Pool
    with multiprocessing.Pool(processes=3) as pool:
        ## Use map with the partially applied function
        print("Starting processing with partial function...")
        results = pool.map(partial_process, data_items)
        print(f"Results: {results}")

Запустите код:

python3 partial_example.py

Вывод покажет, как каждый элемент данных обрабатывается с теми же значениями конфигурации, которые были зафиксированы с помощью partial().

Передача аргументов через общую память

Для больших данных, которые не следует копировать для каждого процесса, общая память может быть более эффективной. Давайте рассмотрим эту технику.

Создайте файл с именем shared_memory_example.py:

import multiprocessing
import numpy as np
import time

def process_array_chunk(array, start_idx, end_idx):
    """Process a chunk of a shared array"""
    print(f"Processing chunk from index {start_idx} to {end_idx}")

    ## Simulate processing each element
    for i in range(start_idx, end_idx):
        array[i] = array[i] ** 2

    time.sleep(1)  ## Simulate additional work
    return start_idx, end_idx

if __name__ == "__main__":
    ## Create a shared array
    array_size = 1000000
    shared_array = multiprocessing.Array('d', array_size)  ## 'd' for double precision

    ## Fill the array with initial values
    temp_array = np.frombuffer(shared_array.get_obj())
    temp_array[:] = np.arange(array_size)

    print(f"Array initialized with size {array_size}")
    print(f"First 5 elements: {temp_array[:5]}")

    ## Define chunk size and create tasks
    num_processes = 4
    chunk_size = array_size // num_processes
    tasks = []

    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else array_size
        tasks.append((start, end))

    ## Process chunks in parallel
    with multiprocessing.Pool(processes=num_processes) as pool:
        ## Create arguments for each task
        args = [(shared_array, start, end) for start, end in tasks]

        print("Starting parallel processing...")
        start_time = time.time()

        ## Using starmap to pass multiple arguments
        results = pool.starmap(process_array_chunk, args)

        end_time = time.time()
        print(f"Processing completed in {end_time - start_time:.2f} seconds")

    ## Verify results
    print(f"First 5 elements after processing: {temp_array[:5]}")
    print(f"Results summary: {results}")

Запустите код:

python3 shared_memory_example.py

Вывод покажет, как большой массив обрабатывается по частям разными процессами, при этом все процессы имеют доступ к одной и той же общей памяти.

Сравнение производительности

Давайте сравним эти различные методы для реальной задачи.

Создайте файл с именем performance_comparison.py:

import multiprocessing
import time
import numpy as np
from functools import partial

def process_data_simple(data):
    """Simple processing function"""
    return sum(x ** 2 for x in data)

def process_data_with_config(data, multiplier):
    """Processing with additional configuration"""
    return sum(x ** 2 for x in data) * multiplier

def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
    """Run a test and measure performance"""
    start_time = time.time()

    with multiprocessing.Pool(processes=pool_size) as pool:
        if kwargs:
            partial_func = partial(func, **kwargs)
            results = pool.map(partial_func, data_chunks)
        else:
            results = pool.map(func, data_chunks)

    end_time = time.time()
    print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
    return results

if __name__ == "__main__":
    ## Create test data
    chunk_size = 1000000
    num_chunks = 8
    data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]

    print(f"Created {num_chunks} data chunks of size {chunk_size} each")

    ## Test 1: Simple map
    results1 = run_test("Simple map", process_data_simple, data_chunks)

    ## Test 2: Using partial
    results2 = run_test("Map with partial", process_data_with_config,
                        data_chunks, multiplier=2)

    ## Test 3: Sequential processing for comparison
    start_time = time.time()
    seq_results = [process_data_simple(chunk) for chunk in data_chunks]
    end_time = time.time()
    print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")

    ## Validate results
    print(f"\nFirst result from each test:")
    print(f"Test 1 first result: {results1[0]:.4f}")
    print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
    print(f"Sequential first result: {seq_results[0]:.4f}")

Запустите сравнение производительности:

python3 performance_comparison.py

Вывод покажет различия в производительности между подходами, демонстрируя преимущества параллельной обработки и накладные расходы различных методов передачи аргументов.

Реальное применение: параллельная обработка изображений

Теперь, когда мы понимаем различные методы передачи аргументов в Python multiprocessing, давайте применим эти концепции к реальному сценарию: параллельной обработке изображений. Это распространенный вариант использования, в котором multiprocessing может значительно повысить производительность.

Настройка среды

Сначала установим необходимые пакеты:

pip install Pillow numpy

Создание образцов изображений

Давайте создадим скрипт для генерации нескольких образцов изображений для нашей обработки. Создайте файл с именем create_images.py:

from PIL import Image
import numpy as np
import os

def create_sample_image(filename, width=800, height=600):
    """Create a sample image with random data"""
    ## Create random array data
    data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)

    ## Create image from array
    img = Image.fromarray(data, 'RGB')

    ## Save the image
    img.save(filename)
    print(f"Created image: {filename}")

if __name__ == "__main__":
    ## Create a directory for sample images
    os.makedirs("sample_images", exist_ok=True)

    ## Create 5 sample images
    for i in range(5):
        create_sample_image(f"sample_images/sample_{i+1}.png")

    print("All sample images created successfully")

Запустите этот скрипт, чтобы создать образцы изображений:

python3 create_images.py

Последовательная обработка изображений

Сначала реализуем последовательную версию наших задач обработки изображений. Создайте файл с именем sequential_image_processing.py:

from PIL import Image, ImageFilter
import os
import time

def apply_filters(image_path, output_dir):
    """Apply multiple filters to an image and save the results"""
    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_sequentially(image_dir, output_dir):
    """Process all images in a directory sequentially"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Process each image sequentially
    start_time = time.time()
    results = []

    for image_path in image_files:
        result = apply_filters(image_path, output_dir)
        results.append(result)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nSequential processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_sequential"

    ## Process images
    results = process_images_sequentially(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

Запустите последовательную обработку:

python3 sequential_image_processing.py

Параллельная обработка изображений

Теперь давайте реализуем параллельную обработку изображений, используя изученные нами методы. Создайте файл с именем parallel_image_processing.py:

from PIL import Image, ImageFilter
import multiprocessing
import os
import time

def apply_filters(args):
    """Apply multiple filters to an image and save the results"""
    image_path, output_dir = args

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_in_parallel(image_dir, output_dir, num_processes=None):
    """Process all images in a directory in parallel"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Determine number of processes
    if num_processes is None:
        num_processes = min(multiprocessing.cpu_count(), len(image_files))

    print(f"Using {num_processes} processes for parallel processing")

    ## Create arguments for each task
    task_args = [(image_path, output_dir) for image_path in image_files]

    ## Process images in parallel
    start_time = time.time()

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(apply_filters, task_args)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nParallel processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_parallel"

    ## Process images
    results = process_images_in_parallel(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

Запустите параллельную обработку:

python3 parallel_image_processing.py

Продвинутая параллельная обработка с различными фильтрами

Теперь давайте реализуем более сложную версию, в которой мы применяем разные фильтры к одному и тому же изображению параллельно. Создайте файл с именем advanced_parallel_processing.py:

from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial

def apply_filter(filter_info, image_path, output_dir):
    """Apply a specific filter to an image"""
    filter_name, filter_obj = filter_info

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} applying {filter_name} to {image_name}")

    ## Open image each time to avoid potential issues with sharing PIL objects
    img = Image.open(image_path)

    ## Apply filter
    start_time = time.time()
    filtered = img.filter(filter_obj)

    ## Save the filtered image
    output_filename = f"{filter_name}_{image_name}"
    output_path = os.path.join(output_dir, output_filename)
    filtered.save(output_path)

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
    return filter_name, image_name, processing_time

def process_image_with_multiple_filters(image_path, output_dir, filters):
    """Process an image with multiple filters in parallel"""
    ## Create a partial function with fixed image_path and output_dir
    apply_filter_to_image = partial(apply_filter,
                                   image_path=image_path,
                                   output_dir=output_dir)

    ## Process the image with multiple filters in parallel
    with multiprocessing.Pool() as pool:
        results = pool.map(apply_filter_to_image, filters)

    return results

def main():
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_advanced"
    os.makedirs(output_dir, exist_ok=True)

    ## Define filters to apply
    filters = [
        ("blur", ImageFilter.GaussianBlur(radius=5)),
        ("edges", ImageFilter.FIND_EDGES),
        ("emboss", ImageFilter.EMBOSS),
        ("sharpen", ImageFilter.SHARPEN),
        ("contour", ImageFilter.CONTOUR),
        ("detail", ImageFilter.DETAIL),
        ("smooth", ImageFilter.SMOOTH),
        ("smoothmore", ImageFilter.SMOOTH_MORE)
    ]

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process with {len(filters)} filters each")

    ## Process each image with all filters
    all_results = []
    overall_start = time.time()

    for image_path in image_files:
        image_name = os.path.basename(image_path)
        print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")

        start_time = time.time()
        results = process_image_with_multiple_filters(image_path, output_dir, filters)
        end_time = time.time()

        image_time = end_time - start_time
        print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")

        all_results.extend(results)

    overall_end = time.time()
    total_time = overall_end - overall_start

    ## Print summary
    print("\nProcessing Summary:")
    print(f"Total processing time: {total_time:.2f} seconds")
    print(f"Total operations: {len(all_results)}")

    ## Calculate average time per filter
    avg_time = sum(time for _, _, time in all_results) / len(all_results)
    print(f"Average processing time per filter: {avg_time:.2f} seconds")

if __name__ == "__main__":
    main()

Запустите продвинутую параллельную обработку:

python3 advanced_parallel_processing.py

Сравнение производительности

Давайте создадим скрипт для сравнения производительности различных подходов. Создайте файл с именем compare_performance.py:

import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np

def run_script(script_name):
    """Run a Python script and measure its execution time"""
    print(f"Running {script_name}...")
    start_time = time.time()

    ## Run the script
    process = subprocess.run(['python3', script_name],
                            capture_output=True, text=True)

    end_time = time.time()
    execution_time = end_time - start_time

    print(f"Completed {script_name} in {execution_time:.2f} seconds")
    print(f"Output: {process.stdout[-200:]}") ## Show last part of output

    return execution_time

def main():
    ## Scripts to compare
    scripts = [
        'sequential_image_processing.py',
        'parallel_image_processing.py',
        'advanced_parallel_processing.py'
    ]

    ## Run each script and measure time
    times = []
    for script in scripts:
        if os.path.exists(script):
            times.append(run_script(script))
        else:
            print(f"Script {script} not found!")
            times.append(0)

    ## Plot results
    if any(times):
        labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
        x = np.arange(len(labels))

        plt.figure(figsize=(10, 6))
        plt.bar(x, times, color=['red', 'blue', 'green'])
        plt.xlabel('Processing Method')
        plt.ylabel('Execution Time (s)')
        plt.title('Image Processing Performance Comparison')
        plt.xticks(x, labels)

        ## Add execution time as text on bars
        for i, v in enumerate(times):
            plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')

        ## Save the plot
        plt.tight_layout()
        plt.savefig('performance_comparison.png')
        print("Performance comparison plot saved as 'performance_comparison.png'")
    else:
        print("No valid execution times to plot")

if __name__ == "__main__":
    main()

Давайте запустим сравнение:

python3 compare_performance.py

Просмотрите результаты и изучите изображение performance_comparison.png, чтобы увидеть различия во времени выполнения между различными подходами.

Основные выводы

Из этого реального примера вы можете наблюдать несколько важных аспектов multiprocessing:

  1. Параллельная обработка может значительно сократить время выполнения при работе с задачами, интенсивно использующими CPU, такими как обработка изображений.

  2. Метод передачи аргументов влияет как на сложность кода, так и на производительность:

    • Simple map() для простых функций с одним аргументом
    • Частичные функции для фиксации определенных параметров
    • Продвинутые методы для сложных рабочих процессов
  3. Соображения для реальных приложений:

    • Управление ресурсами (CPU, память)
    • Стратегии распределения задач
    • Обработка ошибок в параллельных средах
    • Координация между процессами

Этот практический пример демонстрирует, как применить концепции multiprocessing, которые мы изучили в этой лабораторной работе, для эффективного решения реальных задач.

Резюме

В этой лабораторной работе вы узнали о Python multiprocessing и различных методах передачи аргументов параллельным процессам. Вот краткое изложение того, что вы сделали:

  1. Основы Multiprocessing: Вы создали свою первую программу multiprocessing и поняли разницу между процессами и потоками.

  2. Методы передачи аргументов: Вы изучили различные методы передачи аргументов:

    • Базовая передача аргументов с помощью Process.args
    • Передача одного аргумента с помощью Pool.map()
    • Несколько аргументов с помощью Pool.starmap()
    • Динамическое выполнение с помощью apply() и apply_async()
    • Использование functools.partial для частичного применения функции
    • Общая память для эффективного обмена данными
  3. Реальное применение: Вы применили эти концепции к практическому примеру обработки изображений, продемонстрировав значительное повышение производительности за счет распараллеливания.

Навыки, которые вы получили в этой лабораторной работе, ценны для любого приложения Python, интенсивно использующего CPU, позволяя вам использовать всю вычислительную мощность многоядерных систем. При разработке более сложных приложений не забывайте учитывать компромиссы между различными методами передачи аргументов и выбирать наиболее подходящий метод для вашего конкретного случая использования.