Введение
Модуль multiprocessing в Python обеспечивает параллельные вычисления, используя несколько ядер процессора, что может значительно повысить производительность вычислений для задач, интенсивно использующих CPU. Эта лабораторная работа исследует различные методы передачи аргументов процессам в модуле multiprocessing, решая общие проблемы в параллельном программировании и демонстрируя практические стратегии эффективного распараллеливания.
К концу этой лабораторной работы вы поймете, как использовать различные методы для передачи данных параллельным процессам и применять эти методы в реальных сценариях.
Введение в Multiprocessing в Python
На этом шаге мы изучим основы multiprocessing в Python и создадим нашу первую параллельную программу.
Что такое Multiprocessing?
Multiprocessing - это модуль Python, который позволяет нам запускать несколько процессов параллельно, эффективно используя несколько ядер CPU. Это отличается от потоков (threading), которые ограничены Global Interpreter Lock (GIL), что препятствует истинному параллельному выполнению кода Python.
Вот простое сравнение:
| Характеристика | Multiprocessing | Threading |
|---|---|---|
| Память | Отдельные пространства памяти | Общее пространство памяти |
| Параллелизм | Истинное параллельное выполнение | Ограничено GIL |
| Область применения | CPU-bound задачи | I/O-bound задачи |
| Накладные расходы | Выше (отдельные процессы) | Ниже |
Создание вашей первой программы Multiprocessing
Давайте создадим простой пример multiprocessing. Откройте новый файл в WebIDE и назовите его simple_process.py:
import multiprocessing
import time
import os
def worker():
"""Simple function to demonstrate a process"""
print(f"Worker process id: {os.getpid()}")
print(f"Worker parent process id: {os.getppid()}")
time.sleep(1)
print("Worker process completed")
if __name__ == "__main__":
## Print information about the main process
print(f"Main process id: {os.getpid()}")
## Create a process
process = multiprocessing.Process(target=worker)
## Start the process
print("Starting worker process...")
process.start()
## Wait for the process to complete
process.join()
print("Main process completed")
Теперь давайте запустим эту программу:
python3 simple_process.py
Вы должны увидеть вывод, похожий на:
Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed
Точные идентификаторы процессов будут отличаться в вашей системе. Обратите внимание, что рабочий процесс имеет другой идентификатор процесса, чем основной процесс, подтверждая, что это отдельные процессы.
Понимание создания процесса
Когда мы создаем процесс multiprocessing и запускаем его, Python:
- Создает новый процесс
- Импортирует модуль, содержащий целевую функцию
- Выполняет целевую функцию в новом процессе
- Возвращает управление в родительский процесс
Защита if __name__ == "__main__": важна при работе с multiprocessing в Python. Это предотвращает создание дубликатов процессов при импорте модуля.
Несколько процессов
Давайте изменим наш пример, чтобы создать несколько процессов. Создайте новый файл с именем multiple_processes.py:
import multiprocessing
import time
import os
def worker(worker_id):
"""Worker function that accepts an argument"""
print(f"Worker {worker_id} (PID: {os.getpid()}) started")
time.sleep(1)
print(f"Worker {worker_id} completed")
return worker_id * 2 ## This return value won't be captured
if __name__ == "__main__":
print(f"Main process PID: {os.getpid()}")
## Create multiple processes
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
## Wait for all processes to complete
for p in processes:
p.join()
print("All processes completed")
Запустите этот код:
python3 multiple_processes.py
Вы должны увидеть вывод, похожий на:
Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed
Обратите внимание, как мы передали аргумент (worker_id) каждому процессу, используя параметр args. Это наш первый пример передачи аргументов процессу.
Также обратите внимание, что процессы могут выполняться не обязательно в предсказуемом порядке, так как они выполняются параллельно.
Передача базовых аргументов с использованием Pool
На предыдущем шаге мы передали простой аргумент процессу. Теперь давайте рассмотрим более эффективный способ обработки нескольких задач с использованием класса Pool и узнаем о различных способах передачи аргументов.
Класс Pool
Класс Pool предоставляет удобный способ распараллеливания выполнения функции для нескольких входных значений. Он распределяет входные данные между процессами и собирает результаты.
Давайте создадим простой пример, чтобы понять, как работает Pool. Создайте новый файл с именем pool_example.py:
import multiprocessing
import time
def square(x):
"""Function that squares a number and returns the result"""
print(f"Processing {x}...")
time.sleep(1) ## Simulate a time-consuming operation
return x * x
if __name__ == "__main__":
## Create a list of numbers to process
numbers = [1, 2, 3, 4, 5]
## Create a Pool with 2 processes
with multiprocessing.Pool(processes=2) as pool:
## Map the square function to the numbers
start_time = time.time()
results = pool.map(square, numbers)
end_time = time.time()
print(f"Results: {results}")
print(f"Time taken: {end_time - start_time:.2f} seconds")
## Compare with sequential processing
start_time = time.time()
sequential_results = [square(n) for n in numbers]
end_time = time.time()
print(f"Sequential results: {sequential_results}")
print(f"Sequential time: {end_time - start_time:.2f} seconds")
Запустите код:
python3 pool_example.py
Вы должны увидеть вывод, похожий на:
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds
Обратите внимание, как Pool работает быстрее, потому что он обрабатывает несколько чисел параллельно. С размером пула 2, он может обрабатывать 2 числа одновременно, в то время как последовательный подход обрабатывает их одно за другим.
Передача одиночных аргументов с помощью Pool.map()
Метод map() идеально подходит, когда ваша функция принимает один аргумент. Он применяет функцию к каждому элементу в итерируемом объекте параллельно.
Давайте рассмотрим это подробнее на другом примере. Создайте файл с именем pool_map.py:
import multiprocessing
import time
import os
def process_item(item):
"""Process a single item"""
process_id = os.getpid()
print(f"Process {process_id} processing item {item}")
time.sleep(1) ## Simulate work
return item * 10
if __name__ == "__main__":
## Create a list of items to process
items = list(range(10))
## Get the number of CPU cores available
num_cores = multiprocessing.cpu_count()
print(f"System has {num_cores} CPU cores")
## Create a Pool with the number of available cores
with multiprocessing.Pool(processes=num_cores) as pool:
print("Starting parallel processing...")
start_time = time.time()
## Process all items in parallel
results = pool.map(process_item, items)
end_time = time.time()
print(f"All processing completed in {end_time - start_time:.2f} seconds")
print(f"Results: {results}")
Запустите код:
python3 pool_map.py
Ваш вывод должен показать, что элементы обрабатываются параллельно в нескольких процессах, со временем, значительно меньшим, чем 10 секунд (что было бы временем последовательной обработки для 10 элементов с задержкой в 1 секунду каждый).
Передача нескольких аргументов с помощью Pool.starmap()
Что делать, если вашей функции требуется несколько аргументов? Здесь на помощь приходит starmap().
Создайте файл с именем pool_starmap.py:
import multiprocessing
import time
def process_data(id, value, multiplier):
"""Process data with multiple arguments"""
print(f"Processing item {id}: {value} with multiplier {multiplier}")
time.sleep(1) ## Simulate work
return value * multiplier
if __name__ == "__main__":
## Create a list of argument tuples
## Each tuple contains all arguments for one function call
arguments = [
(1, 5, 2), ## id=1, value=5, multiplier=2
(2, 10, 3), ## id=2, value=10, multiplier=3
(3, 15, 2), ## id=3, value=15, multiplier=2
(4, 20, 4), ## id=4, value=20, multiplier=4
(5, 25, 5) ## id=5, value=25, multiplier=5
]
## Create a Pool with 3 processes
with multiprocessing.Pool(processes=3) as pool:
print("Starting parallel processing with multiple arguments...")
start_time = time.time()
## Process all items in parallel using starmap
results = pool.starmap(process_data, arguments)
end_time = time.time()
print(f"All processing completed in {end_time - start_time:.2f} seconds")
print(f"Results: {results}")
Запустите код:
python3 pool_starmap.py
Вы должны увидеть вывод, показывающий, что каждый элемент обрабатывается со своим набором аргументов, и окончательный список результатов, содержащий все вычисленные значения.
Ключевые моменты о Pool
Poolавтоматически обрабатывает распределение задач между процессами- Он управляет очередью задач и назначает их доступным рабочим процессам
- Он собирает результаты и возвращает их в том же порядке, что и входные данные
- Количество процессов может быть оптимизировано на основе ядер CPU вашей системы и характера вашей задачи
Продвинутые методы передачи аргументов
На предыдущих шагах мы узнали о базовой передаче аргументов с использованием объектов Process и класса Pool. Теперь давайте рассмотрим более продвинутые методы передачи аргументов в сценариях multiprocessing.
Передача сложных данных с помощью apply() и apply_async()
Иногда вам нужна большая гибкость, чем предоставляют map() и starmap(). Методы apply() и apply_async() позволяют вам выполнять функцию с определенными аргументами и обрабатывать результаты более динамично.
Создайте файл с именем pool_apply.py:
import multiprocessing
import time
import random
def process_task(task_id, duration, data):
"""Process a task with multiple arguments including complex data"""
print(f"Starting task {task_id} with duration {duration}s and data: {data}")
time.sleep(duration) ## Simulate variable work time
result = sum(data) * task_id
print(f"Task {task_id} completed")
return result
if __name__ == "__main__":
## Create a Pool with 2 processes
with multiprocessing.Pool(processes=2) as pool:
print("Starting tasks with apply()...")
## Execute tasks with apply() - blocking
result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
print(f"Result 1: {result1}")
result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
print(f"Result 2: {result2}")
print("\nStarting tasks with apply_async()...")
## Execute tasks with apply_async() - non-blocking
async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))
## Do other work while tasks are running
print("Tasks are running in the background...")
time.sleep(1)
print("Main process is doing other work...")
## Get results when needed (will wait if not yet available)
print(f"Result 3: {async_result1.get()}")
print(f"Result 4: {async_result2.get()}")
Запустите код:
python3 pool_apply.py
Вывод покажет, как apply() является блокирующим (последовательным), в то время как apply_async() позволяет задачам выполняться параллельно.
Использование functools.partial для частичного применения функции
Когда у вас есть функция, которая принимает несколько аргументов, но вы хотите зафиксировать некоторые из них, functools.partial очень полезен.
Создайте файл с именем partial_example.py:
import multiprocessing
from functools import partial
import time
def process_with_config(data, config_a, config_b):
"""Process data with specific configuration parameters"""
print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
time.sleep(1)
return data * config_a + config_b
if __name__ == "__main__":
## Data to process
data_items = [1, 2, 3, 4, 5]
## Configuration values
config_a = 10
config_b = 5
## Create a partially applied function with fixed config values
partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)
## Create a Pool
with multiprocessing.Pool(processes=3) as pool:
## Use map with the partially applied function
print("Starting processing with partial function...")
results = pool.map(partial_process, data_items)
print(f"Results: {results}")
Запустите код:
python3 partial_example.py
Вывод покажет, как каждый элемент данных обрабатывается с теми же значениями конфигурации, которые были зафиксированы с помощью partial().
Передача аргументов через общую память
Для больших данных, которые не следует копировать для каждого процесса, общая память может быть более эффективной. Давайте рассмотрим эту технику.
Создайте файл с именем shared_memory_example.py:
import multiprocessing
import numpy as np
import time
def process_array_chunk(array, start_idx, end_idx):
"""Process a chunk of a shared array"""
print(f"Processing chunk from index {start_idx} to {end_idx}")
## Simulate processing each element
for i in range(start_idx, end_idx):
array[i] = array[i] ** 2
time.sleep(1) ## Simulate additional work
return start_idx, end_idx
if __name__ == "__main__":
## Create a shared array
array_size = 1000000
shared_array = multiprocessing.Array('d', array_size) ## 'd' for double precision
## Fill the array with initial values
temp_array = np.frombuffer(shared_array.get_obj())
temp_array[:] = np.arange(array_size)
print(f"Array initialized with size {array_size}")
print(f"First 5 elements: {temp_array[:5]}")
## Define chunk size and create tasks
num_processes = 4
chunk_size = array_size // num_processes
tasks = []
for i in range(num_processes):
start = i * chunk_size
end = start + chunk_size if i < num_processes - 1 else array_size
tasks.append((start, end))
## Process chunks in parallel
with multiprocessing.Pool(processes=num_processes) as pool:
## Create arguments for each task
args = [(shared_array, start, end) for start, end in tasks]
print("Starting parallel processing...")
start_time = time.time()
## Using starmap to pass multiple arguments
results = pool.starmap(process_array_chunk, args)
end_time = time.time()
print(f"Processing completed in {end_time - start_time:.2f} seconds")
## Verify results
print(f"First 5 elements after processing: {temp_array[:5]}")
print(f"Results summary: {results}")
Запустите код:
python3 shared_memory_example.py
Вывод покажет, как большой массив обрабатывается по частям разными процессами, при этом все процессы имеют доступ к одной и той же общей памяти.
Сравнение производительности
Давайте сравним эти различные методы для реальной задачи.
Создайте файл с именем performance_comparison.py:
import multiprocessing
import time
import numpy as np
from functools import partial
def process_data_simple(data):
"""Simple processing function"""
return sum(x ** 2 for x in data)
def process_data_with_config(data, multiplier):
"""Processing with additional configuration"""
return sum(x ** 2 for x in data) * multiplier
def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
"""Run a test and measure performance"""
start_time = time.time()
with multiprocessing.Pool(processes=pool_size) as pool:
if kwargs:
partial_func = partial(func, **kwargs)
results = pool.map(partial_func, data_chunks)
else:
results = pool.map(func, data_chunks)
end_time = time.time()
print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
return results
if __name__ == "__main__":
## Create test data
chunk_size = 1000000
num_chunks = 8
data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]
print(f"Created {num_chunks} data chunks of size {chunk_size} each")
## Test 1: Simple map
results1 = run_test("Simple map", process_data_simple, data_chunks)
## Test 2: Using partial
results2 = run_test("Map with partial", process_data_with_config,
data_chunks, multiplier=2)
## Test 3: Sequential processing for comparison
start_time = time.time()
seq_results = [process_data_simple(chunk) for chunk in data_chunks]
end_time = time.time()
print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")
## Validate results
print(f"\nFirst result from each test:")
print(f"Test 1 first result: {results1[0]:.4f}")
print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
print(f"Sequential first result: {seq_results[0]:.4f}")
Запустите сравнение производительности:
python3 performance_comparison.py
Вывод покажет различия в производительности между подходами, демонстрируя преимущества параллельной обработки и накладные расходы различных методов передачи аргументов.
Реальное применение: параллельная обработка изображений
Теперь, когда мы понимаем различные методы передачи аргументов в Python multiprocessing, давайте применим эти концепции к реальному сценарию: параллельной обработке изображений. Это распространенный вариант использования, в котором multiprocessing может значительно повысить производительность.
Настройка среды
Сначала установим необходимые пакеты:
pip install Pillow numpy
Создание образцов изображений
Давайте создадим скрипт для генерации нескольких образцов изображений для нашей обработки. Создайте файл с именем create_images.py:
from PIL import Image
import numpy as np
import os
def create_sample_image(filename, width=800, height=600):
"""Create a sample image with random data"""
## Create random array data
data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
## Create image from array
img = Image.fromarray(data, 'RGB')
## Save the image
img.save(filename)
print(f"Created image: {filename}")
if __name__ == "__main__":
## Create a directory for sample images
os.makedirs("sample_images", exist_ok=True)
## Create 5 sample images
for i in range(5):
create_sample_image(f"sample_images/sample_{i+1}.png")
print("All sample images created successfully")
Запустите этот скрипт, чтобы создать образцы изображений:
python3 create_images.py
Последовательная обработка изображений
Сначала реализуем последовательную версию наших задач обработки изображений. Создайте файл с именем sequential_image_processing.py:
from PIL import Image, ImageFilter
import os
import time
def apply_filters(image_path, output_dir):
"""Apply multiple filters to an image and save the results"""
## Load the image
image_name = os.path.basename(image_path)
print(f"Processing image: {image_name}")
img = Image.open(image_path)
## Apply filters
## 1. Blur filter
start_time = time.time()
blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
blurred.save(os.path.join(output_dir, f"blur_{image_name}"))
## 2. Edge detection filter
edges = img.filter(ImageFilter.FIND_EDGES)
edges.save(os.path.join(output_dir, f"edges_{image_name}"))
## 3. Emboss filter
emboss = img.filter(ImageFilter.EMBOSS)
emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))
## 4. Sharpen filter
sharpen = img.filter(ImageFilter.SHARPEN)
sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))
end_time = time.time()
processing_time = end_time - start_time
print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
return image_name, processing_time
def process_images_sequentially(image_dir, output_dir):
"""Process all images in a directory sequentially"""
## Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process")
## Process each image sequentially
start_time = time.time()
results = []
for image_path in image_files:
result = apply_filters(image_path, output_dir)
results.append(result)
end_time = time.time()
total_time = end_time - start_time
print(f"\nSequential processing completed")
print(f"Total processing time: {total_time:.2f} seconds")
return results
if __name__ == "__main__":
## Define directories
image_dir = "sample_images"
output_dir = "processed_sequential"
## Process images
results = process_images_sequentially(image_dir, output_dir)
## Print summary
print("\nProcessing summary:")
for image_name, proc_time in results:
print(f" {image_name}: {proc_time:.2f} seconds")
Запустите последовательную обработку:
python3 sequential_image_processing.py
Параллельная обработка изображений
Теперь давайте реализуем параллельную обработку изображений, используя изученные нами методы. Создайте файл с именем parallel_image_processing.py:
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
def apply_filters(args):
"""Apply multiple filters to an image and save the results"""
image_path, output_dir = args
## Load the image
image_name = os.path.basename(image_path)
print(f"Process {os.getpid()} processing image: {image_name}")
img = Image.open(image_path)
## Apply filters
## 1. Blur filter
start_time = time.time()
blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
blurred.save(os.path.join(output_dir, f"blur_{image_name}"))
## 2. Edge detection filter
edges = img.filter(ImageFilter.FIND_EDGES)
edges.save(os.path.join(output_dir, f"edges_{image_name}"))
## 3. Emboss filter
emboss = img.filter(ImageFilter.EMBOSS)
emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))
## 4. Sharpen filter
sharpen = img.filter(ImageFilter.SHARPEN)
sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))
end_time = time.time()
processing_time = end_time - start_time
print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
return image_name, processing_time
def process_images_in_parallel(image_dir, output_dir, num_processes=None):
"""Process all images in a directory in parallel"""
## Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process")
## Determine number of processes
if num_processes is None:
num_processes = min(multiprocessing.cpu_count(), len(image_files))
print(f"Using {num_processes} processes for parallel processing")
## Create arguments for each task
task_args = [(image_path, output_dir) for image_path in image_files]
## Process images in parallel
start_time = time.time()
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.map(apply_filters, task_args)
end_time = time.time()
total_time = end_time - start_time
print(f"\nParallel processing completed")
print(f"Total processing time: {total_time:.2f} seconds")
return results
if __name__ == "__main__":
## Define directories
image_dir = "sample_images"
output_dir = "processed_parallel"
## Process images
results = process_images_in_parallel(image_dir, output_dir)
## Print summary
print("\nProcessing summary:")
for image_name, proc_time in results:
print(f" {image_name}: {proc_time:.2f} seconds")
Запустите параллельную обработку:
python3 parallel_image_processing.py
Продвинутая параллельная обработка с различными фильтрами
Теперь давайте реализуем более сложную версию, в которой мы применяем разные фильтры к одному и тому же изображению параллельно. Создайте файл с именем advanced_parallel_processing.py:
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial
def apply_filter(filter_info, image_path, output_dir):
"""Apply a specific filter to an image"""
filter_name, filter_obj = filter_info
## Load the image
image_name = os.path.basename(image_path)
print(f"Process {os.getpid()} applying {filter_name} to {image_name}")
## Open image each time to avoid potential issues with sharing PIL objects
img = Image.open(image_path)
## Apply filter
start_time = time.time()
filtered = img.filter(filter_obj)
## Save the filtered image
output_filename = f"{filter_name}_{image_name}"
output_path = os.path.join(output_dir, output_filename)
filtered.save(output_path)
end_time = time.time()
processing_time = end_time - start_time
print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
return filter_name, image_name, processing_time
def process_image_with_multiple_filters(image_path, output_dir, filters):
"""Process an image with multiple filters in parallel"""
## Create a partial function with fixed image_path and output_dir
apply_filter_to_image = partial(apply_filter,
image_path=image_path,
output_dir=output_dir)
## Process the image with multiple filters in parallel
with multiprocessing.Pool() as pool:
results = pool.map(apply_filter_to_image, filters)
return results
def main():
## Define directories
image_dir = "sample_images"
output_dir = "processed_advanced"
os.makedirs(output_dir, exist_ok=True)
## Define filters to apply
filters = [
("blur", ImageFilter.GaussianBlur(radius=5)),
("edges", ImageFilter.FIND_EDGES),
("emboss", ImageFilter.EMBOSS),
("sharpen", ImageFilter.SHARPEN),
("contour", ImageFilter.CONTOUR),
("detail", ImageFilter.DETAIL),
("smooth", ImageFilter.SMOOTH),
("smoothmore", ImageFilter.SMOOTH_MORE)
]
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process with {len(filters)} filters each")
## Process each image with all filters
all_results = []
overall_start = time.time()
for image_path in image_files:
image_name = os.path.basename(image_path)
print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")
start_time = time.time()
results = process_image_with_multiple_filters(image_path, output_dir, filters)
end_time = time.time()
image_time = end_time - start_time
print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")
all_results.extend(results)
overall_end = time.time()
total_time = overall_end - overall_start
## Print summary
print("\nProcessing Summary:")
print(f"Total processing time: {total_time:.2f} seconds")
print(f"Total operations: {len(all_results)}")
## Calculate average time per filter
avg_time = sum(time for _, _, time in all_results) / len(all_results)
print(f"Average processing time per filter: {avg_time:.2f} seconds")
if __name__ == "__main__":
main()
Запустите продвинутую параллельную обработку:
python3 advanced_parallel_processing.py
Сравнение производительности
Давайте создадим скрипт для сравнения производительности различных подходов. Создайте файл с именем compare_performance.py:
import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np
def run_script(script_name):
"""Run a Python script and measure its execution time"""
print(f"Running {script_name}...")
start_time = time.time()
## Run the script
process = subprocess.run(['python3', script_name],
capture_output=True, text=True)
end_time = time.time()
execution_time = end_time - start_time
print(f"Completed {script_name} in {execution_time:.2f} seconds")
print(f"Output: {process.stdout[-200:]}") ## Show last part of output
return execution_time
def main():
## Scripts to compare
scripts = [
'sequential_image_processing.py',
'parallel_image_processing.py',
'advanced_parallel_processing.py'
]
## Run each script and measure time
times = []
for script in scripts:
if os.path.exists(script):
times.append(run_script(script))
else:
print(f"Script {script} not found!")
times.append(0)
## Plot results
if any(times):
labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
x = np.arange(len(labels))
plt.figure(figsize=(10, 6))
plt.bar(x, times, color=['red', 'blue', 'green'])
plt.xlabel('Processing Method')
plt.ylabel('Execution Time (s)')
plt.title('Image Processing Performance Comparison')
plt.xticks(x, labels)
## Add execution time as text on bars
for i, v in enumerate(times):
plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')
## Save the plot
plt.tight_layout()
plt.savefig('performance_comparison.png')
print("Performance comparison plot saved as 'performance_comparison.png'")
else:
print("No valid execution times to plot")
if __name__ == "__main__":
main()
Давайте запустим сравнение:
python3 compare_performance.py
Просмотрите результаты и изучите изображение performance_comparison.png, чтобы увидеть различия во времени выполнения между различными подходами.
Основные выводы
Из этого реального примера вы можете наблюдать несколько важных аспектов multiprocessing:
Параллельная обработка может значительно сократить время выполнения при работе с задачами, интенсивно использующими CPU, такими как обработка изображений.
Метод передачи аргументов влияет как на сложность кода, так и на производительность:
- Simple map() для простых функций с одним аргументом
- Частичные функции для фиксации определенных параметров
- Продвинутые методы для сложных рабочих процессов
Соображения для реальных приложений:
- Управление ресурсами (CPU, память)
- Стратегии распределения задач
- Обработка ошибок в параллельных средах
- Координация между процессами
Этот практический пример демонстрирует, как применить концепции multiprocessing, которые мы изучили в этой лабораторной работе, для эффективного решения реальных задач.
Резюме
В этой лабораторной работе вы узнали о Python multiprocessing и различных методах передачи аргументов параллельным процессам. Вот краткое изложение того, что вы сделали:
Основы Multiprocessing: Вы создали свою первую программу multiprocessing и поняли разницу между процессами и потоками.
Методы передачи аргументов: Вы изучили различные методы передачи аргументов:
- Базовая передача аргументов с помощью
Process.args - Передача одного аргумента с помощью
Pool.map() - Несколько аргументов с помощью
Pool.starmap() - Динамическое выполнение с помощью
apply()иapply_async() - Использование
functools.partialдля частичного применения функции - Общая память для эффективного обмена данными
- Базовая передача аргументов с помощью
Реальное применение: Вы применили эти концепции к практическому примеру обработки изображений, продемонстрировав значительное повышение производительности за счет распараллеливания.
Навыки, которые вы получили в этой лабораторной работе, ценны для любого приложения Python, интенсивно использующего CPU, позволяя вам использовать всю вычислительную мощность многоядерных систем. При разработке более сложных приложений не забывайте учитывать компромиссы между различными методами передачи аргументов и выбирать наиболее подходящий метод для вашего конкретного случая использования.



