Introduction
Python's multiprocessing module enables parallel computation by taking advantage of multiple CPU cores, which can significantly improve performance for CPU-bound tasks. This lab explores the different techniques for passing arguments to processes with the multiprocessing module, addresses common challenges in concurrent programming, and demonstrates practical strategies for effective parallelization.

By completing this lab, you will learn how to pass data to parallel processes using different methods and apply these techniques to real-world scenarios.
In this step, we will learn the basics of multiprocessing in Python and create our first parallel program.

multiprocessing is a Python module that lets us run multiple processes in parallel, making effective use of multiple CPU cores. This differs from threading, which is constrained by the Global Interpreter Lock (GIL), a lock that prevents true parallel execution of Python code.

Here is a quick comparison:
| Feature | multiprocessing | threading |
|---|---|---|
| Memory | Separate memory spaces | Shared memory space |
| Parallelism | True parallel execution | Limited by the GIL |
| Use case | CPU-bound tasks | I/O-bound tasks |
| Overhead | Higher (separate processes) | Lower |
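The contrast in the table can be demonstrated with a small benchmark. This is a minimal sketch (the worker count and loop size are illustrative assumptions): it runs the same CPU-bound loop with threads and with processes, and on a multi-core machine the process version typically finishes noticeably faster because each process has its own interpreter and GIL.

```python
import multiprocessing
import threading
import time

def cpu_task(n):
    """Busy loop of pure Python arithmetic, so the GIL matters."""
    total = 0
    for i in range(n):
        total += i * i
    return total

def run_threads(n, workers=4):
    """Run the task in several threads and return the elapsed time."""
    threads = [threading.Thread(target=cpu_task, args=(n,)) for _ in range(workers)]
    start = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

def run_processes(n, workers=4):
    """Run the task in several processes and return the elapsed time."""
    procs = [multiprocessing.Process(target=cpu_task, args=(n,)) for _ in range(workers)]
    start = time.time()
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return time.time() - start

if __name__ == "__main__":
    n = 2_000_000
    print(f"threads:   {run_threads(n):.2f}s")
    print(f"processes: {run_processes(n):.2f}s")
```

The absolute timings depend on your machine; the point is the ratio between the two.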
Let's create a simple multiprocessing example. Open a new file in the WebIDE and name it simple_process.py:
```python
import multiprocessing
import time
import os

def worker():
    """Simple function to demonstrate a process"""
    print(f"Worker process id: {os.getpid()}")
    print(f"Worker parent process id: {os.getppid()}")
    time.sleep(1)
    print("Worker process completed")

if __name__ == "__main__":
    # Print information about the main process
    print(f"Main process id: {os.getpid()}")

    # Create a process
    process = multiprocessing.Process(target=worker)

    # Start the process
    print("Starting worker process...")
    process.start()

    # Wait for the process to complete
    process.join()
    print("Main process completed")
```
Now let's run the program:

```bash
python3 simple_process.py
```

You should see output similar to this:
```
Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed
```
The exact process IDs will differ on your system. Note that the worker's process ID differs from the main process ID, confirming that they are separate processes.
When we create a multiprocessing Process and start it, Python will:

1. Launch a new operating-system process with its own Python interpreter and memory space.
2. Make the module's code available in that process (inherited under the fork start method, re-imported under spawn).
3. Call the target function (here, worker) in the new process.

The if __name__ == "__main__": guard is essential when using multiprocessing in Python. It prevents duplicate processes from being created when the module is imported.
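A minimal sketch of why the guard matters: under the spawn start method (the default on Windows and macOS), the child process re-imports the module, so any unguarded process creation at module level would spawn processes recursively.

```python
import multiprocessing

def worker():
    print("child running")

# Module-level code like this line runs again in every child under 'spawn'.
# If a Process were created here, outside the guard, each child would try to
# create another child, which Python rejects with an error.
print("module loaded")

if __name__ == "__main__":
    # Safe: this only runs in the original (main) process
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
```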
Let's modify our example to create multiple processes. Create a new file named multiple_processes.py:
```python
import multiprocessing
import time
import os

def worker(worker_id):
    """Worker function that accepts an argument"""
    print(f"Worker {worker_id} (PID: {os.getpid()}) started")
    time.sleep(1)
    print(f"Worker {worker_id} completed")
    return worker_id * 2  # This return value won't be captured

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    # Create multiple processes
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    # Wait for all processes to complete
    for p in processes:
        p.join()

    print("All processes completed")
```
Run the code:

```bash
python3 multiple_processes.py
```

You should see output similar to this:
```
Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed
```
Note how we use the args parameter to pass an argument (worker_id) to each process. This is our first example of passing arguments to processes.

Also note that the processes will not necessarily run in a predictable order, because they execute in parallel.
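As the comment in worker() notes, the return value of a Process target is simply discarded. One common way to get results back from raw Process objects, sketched here, is to pass a multiprocessing.Queue to each worker:

```python
import multiprocessing

def worker(worker_id, result_queue):
    """Compute a result and send it back through the queue."""
    result_queue.put((worker_id, worker_id * 2))

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(i, queue))
                 for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    # Collect one result per worker; arrival order may vary
    results = dict(queue.get() for _ in range(3))
    print(results)
```

In the next step we will see that Pool handles result collection for us, which is usually more convenient than managing a queue by hand.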
In the previous step, we passed a single argument to a process. Now let's explore a more efficient way to handle many tasks using the Pool class, and look at the different ways of passing arguments to it.

The Pool class provides a convenient way to execute a function in parallel across multiple input values. It distributes the input data among the processes and collects the results.

Let's create a simple example to understand how Pool works. Create a new file named pool_example.py:
```python
import multiprocessing
import time

def square(x):
    """Function that squares a number and returns the result"""
    print(f"Processing {x}...")
    time.sleep(1)  # Simulate a time-consuming operation
    return x * x

if __name__ == "__main__":
    # Create a list of numbers to process
    numbers = [1, 2, 3, 4, 5]

    # Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        # Map the square function to the numbers
        start_time = time.time()
        results = pool.map(square, numbers)
        end_time = time.time()

    print(f"Results: {results}")
    print(f"Time taken: {end_time - start_time:.2f} seconds")

    # Compare with sequential processing
    start_time = time.time()
    sequential_results = [square(n) for n in numbers]
    end_time = time.time()

    print(f"Sequential results: {sequential_results}")
    print(f"Sequential time: {end_time - start_time:.2f} seconds")
```
Run the code:

```bash
python3 pool_example.py
```

You should see output similar to this:
```
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds
```
Notice how the Pool version is faster because it processes several numbers in parallel. With a pool of size 2, it can work on 2 numbers at a time, while the sequential approach handles them one by one.

The map() method is perfect when your function takes exactly one argument. It applies the function to every item in an iterable, in parallel.

Let's explore this further with another example. Create a file named pool_map.py:
```python
import multiprocessing
import time
import os

def process_item(item):
    """Process a single item"""
    process_id = os.getpid()
    print(f"Process {process_id} processing item {item}")
    time.sleep(1)  # Simulate work
    return item * 10

if __name__ == "__main__":
    # Create a list of items to process
    items = list(range(10))

    # Get the number of CPU cores available
    num_cores = multiprocessing.cpu_count()
    print(f"System has {num_cores} CPU cores")

    # Create a Pool with the number of available cores
    with multiprocessing.Pool(processes=num_cores) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        # Process all items in parallel
        results = pool.map(process_item, items)
        end_time = time.time()

    print(f"All processing completed in {end_time - start_time:.2f} seconds")
    print(f"Results: {results}")
```
Run the code:

```bash
python3 pool_map.py
```

Your output should show the items being processed in parallel across multiple processes, in noticeably less than 10 seconds (which is what processing 10 items sequentially with a 1-second delay each would take).
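As a side note, map() has useful siblings. The sketch below (the pool size is an illustrative choice) uses imap_unordered(), which yields each result as soon as its task finishes rather than waiting for the whole list, which is handy for reporting progress on long jobs:

```python
import multiprocessing

def process_item(item):
    """Same single-argument worker as before, without the delay."""
    return item * 10

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # Results arrive in completion order, not submission order
        for result in pool.imap_unordered(process_item, range(10)):
            print(result)
```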
What if your function needs multiple arguments? That's where starmap() comes in.

Create a file named pool_starmap.py:
```python
import multiprocessing
import time

def process_data(task_id, value, multiplier):
    """Process data with multiple arguments"""
    print(f"Processing item {task_id}: {value} with multiplier {multiplier}")
    time.sleep(1)  # Simulate work
    return value * multiplier

if __name__ == "__main__":
    # Create a list of argument tuples
    # Each tuple contains all arguments for one function call
    arguments = [
        (1, 5, 2),   # task_id=1, value=5, multiplier=2
        (2, 10, 3),  # task_id=2, value=10, multiplier=3
        (3, 15, 2),  # task_id=3, value=15, multiplier=2
        (4, 20, 4),  # task_id=4, value=20, multiplier=4
        (5, 25, 5)   # task_id=5, value=25, multiplier=5
    ]

    # Create a Pool with 3 processes
    with multiprocessing.Pool(processes=3) as pool:
        print("Starting parallel processing with multiple arguments...")
        start_time = time.time()

        # Process all items in parallel using starmap
        results = pool.starmap(process_data, arguments)
        end_time = time.time()

    print(f"All processing completed in {end_time - start_time:.2f} seconds")
    print(f"Results: {results}")
```
Run the code:

```bash
python3 pool_starmap.py
```

You should see each item processed with its own set of arguments, and a final results list containing all the computed values.
Pool automatically handles distributing the tasks across processes.

In the previous steps, we covered basic argument passing with Process objects and the Pool class. Now let's explore more advanced techniques for passing arguments in multiprocessing scenarios.

Sometimes you need more flexibility than map() and starmap() provide. The apply() and apply_async() methods let you execute a function with specific arguments and handle results more dynamically.

Create a file named pool_apply.py:
```python
import multiprocessing
import time

def process_task(task_id, duration, data):
    """Process a task with multiple arguments including complex data"""
    print(f"Starting task {task_id} with duration {duration}s and data: {data}")
    time.sleep(duration)  # Simulate variable work time
    result = sum(data) * task_id
    print(f"Task {task_id} completed")
    return result

if __name__ == "__main__":
    # Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        print("Starting tasks with apply()...")

        # Execute tasks with apply() - blocking
        result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
        print(f"Result 1: {result1}")
        result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
        print(f"Result 2: {result2}")

        print("\nStarting tasks with apply_async()...")

        # Execute tasks with apply_async() - non-blocking
        async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
        async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))

        # Do other work while tasks are running
        print("Tasks are running in the background...")
        time.sleep(1)
        print("Main process is doing other work...")

        # Get results when needed (will wait if not yet available)
        print(f"Result 3: {async_result1.get()}")
        print(f"Result 4: {async_result2.get()}")
```
Run the code:

```bash
python3 pool_apply.py
```

The output will show that apply() blocks (the tasks run one after another), while apply_async() lets tasks run in parallel.
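A related pattern, sketched below with an illustrative task function, is to pass a callback to apply_async() so the main process reacts to each result as it arrives instead of blocking on get():

```python
import multiprocessing

def compute(task_id, data):
    """Illustrative task: return its id along with a sum."""
    return task_id, sum(data)

def on_done(result):
    """Runs in the main process whenever a task finishes."""
    task_id, total = result
    print(f"task {task_id} finished with total {total}")

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        for i, data in enumerate([[1, 2], [3, 4], [5, 6]]):
            pool.apply_async(compute, args=(i, data), callback=on_done)
        pool.close()
        pool.join()  # wait for all tasks (and their callbacks) to finish
```

There is also an error_callback parameter for handling exceptions raised inside workers.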
functools.partial is useful when you have a function that takes several arguments but you want to fix some of them.

Create a file named partial_example.py:
```python
import multiprocessing
from functools import partial
import time

def process_with_config(data, config_a, config_b):
    """Process data with specific configuration parameters"""
    print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
    time.sleep(1)
    return data * config_a + config_b

if __name__ == "__main__":
    # Data to process
    data_items = [1, 2, 3, 4, 5]

    # Configuration values
    config_a = 10
    config_b = 5

    # Create a partially applied function with fixed config values
    partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)

    # Create a Pool
    with multiprocessing.Pool(processes=3) as pool:
        # Use map with the partially applied function
        print("Starting processing with partial function...")
        results = pool.map(partial_process, data_items)

    print(f"Results: {results}")
```
Run the code:

```bash
python3 partial_example.py
```

The output will show each data item being processed with the same configuration values fixed by partial().
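For comparison, the same fixed-configuration call can be expressed without partial by pairing each data item with repeated config values via itertools.repeat and starmap(). This sketch reuses a simplified version of the function above (the print and sleep are dropped for brevity):

```python
import multiprocessing
from itertools import repeat

def process_with_config(data, config_a, config_b):
    """Simplified version of the function from partial_example.py"""
    return data * config_a + config_b

if __name__ == "__main__":
    data_items = [1, 2, 3, 4, 5]
    with multiprocessing.Pool(processes=3) as pool:
        # zip pairs each item with the fixed values 10 and 5
        results = pool.starmap(process_with_config,
                               zip(data_items, repeat(10), repeat(5)))
    print(results)  # [15, 25, 35, 45, 55]
```

Which style to prefer is mostly a readability choice; partial keeps the call site closer to plain map().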
For large data that should not be copied into every process, shared memory can be more efficient. Let's explore this technique.

Create a file named shared_memory_example.py:
```python
import multiprocessing
import numpy as np
import time

def init_worker(array):
    """Store the shared array in a global for each worker.

    A synchronized Array cannot be passed as a Pool task argument; it must
    be inherited, for example via the Pool initializer as done here."""
    global shared_array
    shared_array = array

def process_array_chunk(start_idx, end_idx):
    """Process a chunk of the shared array"""
    print(f"Processing chunk from index {start_idx} to {end_idx}")
    array = np.frombuffer(shared_array.get_obj())
    # Simulate processing each element
    for i in range(start_idx, end_idx):
        array[i] = array[i] ** 2
    time.sleep(1)  # Simulate additional work
    return start_idx, end_idx

if __name__ == "__main__":
    # Create a shared array ('d' means double precision)
    array_size = 1000000
    shared_array = multiprocessing.Array('d', array_size)

    # Fill the array with initial values
    temp_array = np.frombuffer(shared_array.get_obj())
    temp_array[:] = np.arange(array_size)
    print(f"Array initialized with size {array_size}")
    print(f"First 5 elements: {temp_array[:5]}")

    # Define chunk boundaries, one chunk per process
    num_processes = 4
    chunk_size = array_size // num_processes
    tasks = []
    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else array_size
        tasks.append((start, end))

    # Process chunks in parallel; the initializer hands each worker the array
    with multiprocessing.Pool(processes=num_processes,
                              initializer=init_worker,
                              initargs=(shared_array,)) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        # Using starmap to pass the (start, end) pairs
        results = pool.starmap(process_array_chunk, tasks)
        end_time = time.time()

    print(f"Processing completed in {end_time - start_time:.2f} seconds")

    # Verify results
    print(f"First 5 elements after processing: {temp_array[:5]}")
    print(f"Results summary: {results}")
```
Run the code:

```bash
python3 shared_memory_example.py
```

The output will show a large array being processed in chunks by different processes, all of which access the same shared memory.
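As an aside for Python 3.8 and later, the multiprocessing.shared_memory module offers another way to share a large buffer: blocks are attached by name rather than inherited, and unlike multiprocessing.Array no lock is provided, so workers must write to disjoint slices. A minimal sketch (the array size and chunk boundaries are illustrative):

```python
import numpy as np
from multiprocessing import Pool, shared_memory

def square_chunk(shm_name, n, start, end):
    """Attach to the named shared block and square a slice in place."""
    shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray((n,), dtype=np.float64, buffer=shm.buf)
    arr[start:end] **= 2
    shm.close()  # detach; the parent still owns the block

if __name__ == "__main__":
    n = 10
    shm = shared_memory.SharedMemory(create=True, size=n * 8)  # 8 bytes per float64
    arr = np.ndarray((n,), dtype=np.float64, buffer=shm.buf)
    arr[:] = np.arange(n)

    with Pool(2) as pool:
        # Workers receive only the block's name, not the data itself
        pool.starmap(square_chunk, [(shm.name, n, 0, 5), (shm.name, n, 5, 10)])

    print(arr[:5])
    shm.close()
    shm.unlink()  # free the shared block
```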
Let's compare these different techniques on a concrete task.

Create a file named performance_comparison.py:
```python
import multiprocessing
import time
import numpy as np
from functools import partial

def process_data_simple(data):
    """Simple processing function"""
    return sum(x ** 2 for x in data)

def process_data_with_config(data, multiplier):
    """Processing with additional configuration"""
    return sum(x ** 2 for x in data) * multiplier

def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
    """Run a test and measure performance"""
    start_time = time.time()
    with multiprocessing.Pool(processes=pool_size) as pool:
        if kwargs:
            partial_func = partial(func, **kwargs)
            results = pool.map(partial_func, data_chunks)
        else:
            results = pool.map(func, data_chunks)
    end_time = time.time()
    print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
    return results

if __name__ == "__main__":
    # Create test data
    chunk_size = 1000000
    num_chunks = 8
    data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]
    print(f"Created {num_chunks} data chunks of size {chunk_size} each")

    # Test 1: Simple map
    results1 = run_test("Simple map", process_data_simple, data_chunks)

    # Test 2: Using partial
    results2 = run_test("Map with partial", process_data_with_config,
                        data_chunks, multiplier=2)

    # Test 3: Sequential processing for comparison
    start_time = time.time()
    seq_results = [process_data_simple(chunk) for chunk in data_chunks]
    end_time = time.time()
    print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")

    # Validate results
    print("\nFirst result from each test:")
    print(f"Test 1 first result: {results1[0]:.4f}")
    print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
    print(f"Sequential first result: {seq_results[0]:.4f}")
```
Run the performance comparison:

```bash
python3 performance_comparison.py
```

The output will show the performance differences between the approaches, demonstrating the benefits of parallel processing and the overhead of the different argument-passing techniques.
Now that we understand the various techniques for passing arguments in Python multiprocessing, let's apply these concepts to a practical scenario: parallel image processing. This is a common use case where multiprocessing can deliver a significant performance boost.

First, let's install the required packages:

```bash
pip install Pillow numpy
```

Let's create a script that generates some sample images for processing. Create a file named create_images.py:
```python
from PIL import Image
import numpy as np
import os

def create_sample_image(filename, width=800, height=600):
    """Create a sample image with random data"""
    # Create random array data
    data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)

    # Create image from array
    img = Image.fromarray(data, 'RGB')

    # Save the image
    img.save(filename)
    print(f"Created image: {filename}")

if __name__ == "__main__":
    # Create a directory for sample images
    os.makedirs("sample_images", exist_ok=True)

    # Create 5 sample images
    for i in range(5):
        create_sample_image(f"sample_images/sample_{i+1}.png")

    print("All sample images created successfully")
```
Run this script to create the sample images:

```bash
python3 create_images.py
```

First, let's implement a sequential version of our image-processing task. Create a file named sequential_image_processing.py:
```python
from PIL import Image, ImageFilter
import os
import time

def apply_filters(image_path, output_dir):
    """Apply multiple filters to an image and save the results"""
    # Load the image
    image_name = os.path.basename(image_path)
    print(f"Processing image: {image_name}")
    img = Image.open(image_path)

    # Apply filters
    # 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    # 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    # 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    # 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time
    print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_sequentially(image_dir, output_dir):
    """Process all images in a directory sequentially"""
    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]
    print(f"Found {len(image_files)} images to process")

    # Process each image sequentially
    start_time = time.time()
    results = []
    for image_path in image_files:
        result = apply_filters(image_path, output_dir)
        results.append(result)
    end_time = time.time()

    total_time = end_time - start_time
    print("\nSequential processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    # Define directories
    image_dir = "sample_images"
    output_dir = "processed_sequential"

    # Process images
    results = process_images_sequentially(image_dir, output_dir)

    # Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")
```
Run the sequential processing:

```bash
python3 sequential_image_processing.py
```

Now let's implement parallel image processing using the techniques we have learned. Create a file named parallel_image_processing.py:
```python
from PIL import Image, ImageFilter
import multiprocessing
import os
import time

def apply_filters(args):
    """Apply multiple filters to an image and save the results"""
    image_path, output_dir = args

    # Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} processing image: {image_name}")
    img = Image.open(image_path)

    # Apply filters
    # 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    # 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    # 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    # 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time
    print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_in_parallel(image_dir, output_dir, num_processes=None):
    """Process all images in a directory in parallel"""
    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]
    print(f"Found {len(image_files)} images to process")

    # Determine number of processes
    if num_processes is None:
        num_processes = min(multiprocessing.cpu_count(), len(image_files))
    print(f"Using {num_processes} processes for parallel processing")

    # Create arguments for each task
    task_args = [(image_path, output_dir) for image_path in image_files]

    # Process images in parallel
    start_time = time.time()
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(apply_filters, task_args)
    end_time = time.time()

    total_time = end_time - start_time
    print("\nParallel processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    # Define directories
    image_dir = "sample_images"
    output_dir = "processed_parallel"

    # Process images
    results = process_images_in_parallel(image_dir, output_dir)

    # Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")
```
Run the parallel processing:

```bash
python3 parallel_image_processing.py
```

Now let's implement a more sophisticated version that applies different filters to the same image in parallel. Create a file named advanced_parallel_processing.py:
```python
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial

def apply_filter(filter_info, image_path, output_dir):
    """Apply a specific filter to an image"""
    filter_name, filter_obj = filter_info

    # Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} applying {filter_name} to {image_name}")

    # Open the image in each task to avoid sharing PIL objects across processes
    img = Image.open(image_path)

    # Apply filter
    start_time = time.time()
    filtered = img.filter(filter_obj)

    # Save the filtered image
    output_filename = f"{filter_name}_{image_name}"
    output_path = os.path.join(output_dir, output_filename)
    filtered.save(output_path)
    end_time = time.time()

    processing_time = end_time - start_time
    print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
    return filter_name, image_name, processing_time

def process_image_with_multiple_filters(image_path, output_dir, filters):
    """Process an image with multiple filters in parallel"""
    # Create a partial function with fixed image_path and output_dir
    apply_filter_to_image = partial(apply_filter,
                                    image_path=image_path,
                                    output_dir=output_dir)

    # Process the image with multiple filters in parallel
    with multiprocessing.Pool() as pool:
        results = pool.map(apply_filter_to_image, filters)
    return results

def main():
    # Define directories
    image_dir = "sample_images"
    output_dir = "processed_advanced"
    os.makedirs(output_dir, exist_ok=True)

    # Define filters to apply
    filters = [
        ("blur", ImageFilter.GaussianBlur(radius=5)),
        ("edges", ImageFilter.FIND_EDGES),
        ("emboss", ImageFilter.EMBOSS),
        ("sharpen", ImageFilter.SHARPEN),
        ("contour", ImageFilter.CONTOUR),
        ("detail", ImageFilter.DETAIL),
        ("smooth", ImageFilter.SMOOTH),
        ("smoothmore", ImageFilter.SMOOTH_MORE)
    ]

    # Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]
    print(f"Found {len(image_files)} images to process with {len(filters)} filters each")

    # Process each image with all filters
    all_results = []
    overall_start = time.time()
    for image_path in image_files:
        image_name = os.path.basename(image_path)
        print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")
        start_time = time.time()
        results = process_image_with_multiple_filters(image_path, output_dir, filters)
        end_time = time.time()
        image_time = end_time - start_time
        print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")
        all_results.extend(results)
    overall_end = time.time()
    total_time = overall_end - overall_start

    # Print summary
    print("\nProcessing Summary:")
    print(f"Total processing time: {total_time:.2f} seconds")
    print(f"Total operations: {len(all_results)}")

    # Calculate average time per filter (loop variable renamed to avoid
    # shadowing the time module)
    avg_time = sum(t for _, _, t in all_results) / len(all_results)
    print(f"Average processing time per filter: {avg_time:.2f} seconds")

if __name__ == "__main__":
    main()
```
Run the advanced parallel processing:

```bash
python3 advanced_parallel_processing.py
```

Let's create a script to compare the performance of the different approaches (it uses matplotlib for plotting, so install it first with pip install matplotlib). Create a file named compare_performance.py:
```python
import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np

def run_script(script_name):
    """Run a Python script and measure its execution time"""
    print(f"Running {script_name}...")
    start_time = time.time()

    # Run the script
    process = subprocess.run(['python3', script_name],
                             capture_output=True, text=True)
    end_time = time.time()

    execution_time = end_time - start_time
    print(f"Completed {script_name} in {execution_time:.2f} seconds")
    print(f"Output: {process.stdout[-200:]}")  # Show last part of output
    return execution_time

def main():
    # Scripts to compare
    scripts = [
        'sequential_image_processing.py',
        'parallel_image_processing.py',
        'advanced_parallel_processing.py'
    ]

    # Run each script and measure time
    times = []
    for script in scripts:
        if os.path.exists(script):
            times.append(run_script(script))
        else:
            print(f"Script {script} not found!")
            times.append(0)

    # Plot results
    if any(times):
        labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
        x = np.arange(len(labels))

        plt.figure(figsize=(10, 6))
        plt.bar(x, times, color=['red', 'blue', 'green'])
        plt.xlabel('Processing Method')
        plt.ylabel('Execution Time (s)')
        plt.title('Image Processing Performance Comparison')
        plt.xticks(x, labels)

        # Add execution time as text on bars
        for i, v in enumerate(times):
            plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')

        # Save the plot
        plt.tight_layout()
        plt.savefig('performance_comparison.png')
        print("Performance comparison plot saved as 'performance_comparison.png'")
    else:
        print("No valid execution times to plot")

if __name__ == "__main__":
    main()
```
Let's run the comparison:

```bash
python3 compare_performance.py
```

Review the results and inspect the performance_comparison.png image to see the difference in execution time between the approaches.
From this practical example, you can observe several important aspects of multiprocessing:

- Parallel processing can significantly reduce execution time for CPU-bound tasks such as image processing.
- The choice of argument-passing technique affects both code complexity and performance.
- Real-world applications must weigh practical trade-offs, such as process startup overhead and the cost of moving data between processes.

This practical example demonstrates how the multiprocessing concepts covered throughout this lab can be applied to solve real problems efficiently.
In this lab, you learned about Python multiprocessing and the different techniques for passing arguments to parallel processes. Here is a summary of what you accomplished:

1. Multiprocessing basics: you created your first multiprocessing program and learned the differences between processes and threads.
2. Argument-passing techniques: you explored various methods of passing arguments:
   - Process with args for basic argument passing
   - Pool.map() for single-argument functions
   - Pool.starmap() for multi-argument functions
   - apply() and apply_async() for dynamic execution
   - functools.partial for partial function application
3. Practical application: you applied these concepts to a realistic image-processing example, achieving significant performance gains through parallelization.

The skills you learned in this lab are valuable for any CPU-bound Python application, allowing you to take advantage of the full computational power of multi-core systems. As you develop more complex applications, remember to consider the trade-offs between the different argument-passing techniques and choose the most appropriate method for your specific use case.