はじめに
Python の multiprocessing モジュールは、複数の CPU コアを活用して並列コンピューティングを可能にし、CPU 負荷の高いタスクの計算パフォーマンスを大幅に向上させることができます。この実験(Lab)では、multiprocessing モジュールでプロセスに引数を渡すためのさまざまなテクニックを探求し、並行プログラミングにおける一般的な課題に対処し、効果的な並列化のための実用的な戦略を実証します。
この実験(Lab)の終わりには、並列プロセスにデータを渡すためのさまざまな方法を理解し、これらのテクニックを現実世界のシナリオに適用できるようになります。
Python での Multiprocessing の紹介
このステップでは、Python での multiprocessing の基本を学び、最初の並列プログラムを作成します。
Multiprocessing とは?
Multiprocessing は、複数のプロセスを並列に実行し、複数の CPU コアを効果的に使用できるようにする Python モジュールです。これは、Python コードの真の並列実行を妨げる Global Interpreter Lock (GIL) によって制限されるスレッド処理とは異なります。
簡単な比較を以下に示します。
| 特徴 | Multiprocessing | スレッド処理 |
|---|---|---|
| メモリ | 個別のメモリ空間 | 共有メモリ空間 |
| 並列性 | 真の並列実行 | GIL によって制限される |
| ユースケース | CPU バウンドタスク | I/O バウンドタスク |
| オーバーヘッド | 高い(個別のプロセス) | 低い |
最初の Multiprocessing プログラムの作成
簡単な multiprocessing の例を作成しましょう。WebIDE で新しいファイルを開き、simple_process.py という名前を付けます。
import multiprocessing
import time
import os
def worker():
"""Simple function to demonstrate a process"""
print(f"Worker process id: {os.getpid()}")
print(f"Worker parent process id: {os.getppid()}")
time.sleep(1)
print("Worker process completed")
if __name__ == "__main__":
## Print information about the main process
print(f"Main process id: {os.getpid()}")
## Create a process
process = multiprocessing.Process(target=worker)
## Start the process
print("Starting worker process...")
process.start()
## Wait for the process to complete
process.join()
print("Main process completed")
次に、このプログラムを実行します。
python3 simple_process.py
次のような出力が表示されるはずです。
Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed
正確なプロセス ID は、システムによって異なります。ワーカープロセスがメインプロセスとは異なるプロセス ID を持っていることに注目してください。これは、それらが別のプロセスであることを確認しています。
プロセスの作成について
multiprocessing の Process を作成して開始すると、Python は次のことを行います。
- 新しいプロセスを作成します
- ターゲット関数を含むモジュールをインポートします
- 新しいプロセスでターゲット関数を実行します
- 親プロセスに制御を返します
if __name__ == "__main__": ガードは、Python で multiprocessing を使用する際に重要です。これにより、モジュールがインポートされたときに重複するプロセスが作成されるのを防ぎます。
複数のプロセス
複数のプロセスを作成するように例を変更しましょう。multiple_processes.py という名前の新しいファイルを作成します。
import multiprocessing
import time
import os
def worker(worker_id):
"""Worker function that accepts an argument"""
print(f"Worker {worker_id} (PID: {os.getpid()}) started")
time.sleep(1)
print(f"Worker {worker_id} completed")
return worker_id * 2 ## This return value won't be captured
if __name__ == "__main__":
print(f"Main process PID: {os.getpid()}")
## Create multiple processes
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
## Wait for all processes to complete
for p in processes:
p.join()
print("All processes completed")
このコードを実行します。
python3 multiple_processes.py
次のような出力が表示されるはずです。
Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed
args パラメータを使用して、各プロセスに引数(worker_id)を渡したことに注目してください。これは、プロセスに引数を渡す最初の例です。
また、プロセスは必ずしも予測可能な順序で実行されるとは限らないことにも注意してください。これらは並列に実行されているためです。
Pool を使用した基本的な引数の受け渡し
前のステップでは、単純な引数をプロセスに渡しました。ここでは、Pool クラスを使用して複数のタスクを処理するより効率的な方法を探り、引数を渡すさまざまな方法について学びます。
Pool クラス
Pool クラスは、複数の入力値に対して関数の実行を並列化するための便利な方法を提供します。入力データを複数のプロセスに分散し、結果を収集します。
Pool の仕組みを理解するために、簡単な例を作成しましょう。pool_example.py という名前の新しいファイルを作成します。
import multiprocessing
import time
def square(x):
"""Function that squares a number and returns the result"""
print(f"Processing {x}...")
time.sleep(1) ## Simulate a time-consuming operation
return x * x
if __name__ == "__main__":
## Create a list of numbers to process
numbers = [1, 2, 3, 4, 5]
## Create a Pool with 2 processes
with multiprocessing.Pool(processes=2) as pool:
## Map the square function to the numbers
start_time = time.time()
results = pool.map(square, numbers)
end_time = time.time()
print(f"Results: {results}")
print(f"Time taken: {end_time - start_time:.2f} seconds")
## Compare with sequential processing
start_time = time.time()
sequential_results = [square(n) for n in numbers]
end_time = time.time()
print(f"Sequential results: {sequential_results}")
print(f"Sequential time: {end_time - start_time:.2f} seconds")
コードを実行します。
python3 pool_example.py
次のような出力が表示されるはずです。
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds
Pool が、複数の数値を並列に処理するため、より高速であることに注目してください。プールサイズが 2 の場合、一度に 2 つの数値を処理できますが、シーケンシャルなアプローチでは、それらを 1 つずつ処理します。
Pool.map() を使用した単一引数の受け渡し
map() メソッドは、関数が単一の引数を受け取る場合に最適です。反復可能オブジェクト内の各項目に関数を並列に適用します。
別の例で、これをさらに詳しく見てみましょう。pool_map.py という名前のファイルを作成します。
import multiprocessing
import time
import os
def process_item(item):
"""Process a single item"""
process_id = os.getpid()
print(f"Process {process_id} processing item {item}")
time.sleep(1) ## Simulate work
return item * 10
if __name__ == "__main__":
## Create a list of items to process
items = list(range(10))
## Get the number of CPU cores available
num_cores = multiprocessing.cpu_count()
print(f"System has {num_cores} CPU cores")
## Create a Pool with the number of available cores
with multiprocessing.Pool(processes=num_cores) as pool:
print("Starting parallel processing...")
start_time = time.time()
## Process all items in parallel
results = pool.map(process_item, items)
end_time = time.time()
print(f"All processing completed in {end_time - start_time:.2f} seconds")
print(f"Results: {results}")
コードを実行します。
python3 pool_map.py
出力には、複数のプロセスで並列に処理されている項目が表示され、時間は 10 秒より大幅に短くなります(10 個の項目をそれぞれ 1 秒遅延でシーケンシャルに処理した場合)。
Pool.starmap() を使用した複数の引数の受け渡し
関数が複数の引数を必要とする場合はどうでしょうか?ここで starmap() が登場します。
pool_starmap.py という名前のファイルを作成します。
import multiprocessing
import time
def process_data(id, value, multiplier):
"""Process data with multiple arguments"""
print(f"Processing item {id}: {value} with multiplier {multiplier}")
time.sleep(1) ## Simulate work
return value * multiplier
if __name__ == "__main__":
## Create a list of argument tuples
## Each tuple contains all arguments for one function call
arguments = [
(1, 5, 2), ## id=1, value=5, multiplier=2
(2, 10, 3), ## id=2, value=10, multiplier=3
(3, 15, 2), ## id=3, value=15, multiplier=2
(4, 20, 4), ## id=4, value=20, multiplier=4
(5, 25, 5) ## id=5, value=25, multiplier=5
]
## Create a Pool with 3 processes
with multiprocessing.Pool(processes=3) as pool:
print("Starting parallel processing with multiple arguments...")
start_time = time.time()
## Process all items in parallel using starmap
results = pool.starmap(process_data, arguments)
end_time = time.time()
print(f"All processing completed in {end_time - start_time:.2f} seconds")
print(f"Results: {results}")
コードを実行します。
python3 pool_starmap.py
出力には、各項目が独自の引数セットで処理され、最終的な結果リストにはすべての計算値が含まれていることが示されます。
Pool に関する重要なポイント
Poolは、タスクのプロセス間での分散を自動的に処理します- タスクのキューを管理し、利用可能なワーカープロセスに割り当てます
- 結果を収集し、入力データと同じ順序で返します
- プロセスの数は、システムの CPU コアとタスクの性質に基づいて最適化できます
高度な引数の受け渡しテクニック
前のステップでは、Process オブジェクトと Pool クラスを使用した基本的な引数の受け渡しについて学びました。ここでは、multiprocessing シナリオで引数を渡すための、より高度なテクニックを探ります。
apply() と apply_async() を使用した複雑なデータの受け渡し
map() と starmap() が提供するものよりも、より柔軟性が必要になる場合があります。apply() および apply_async() メソッドを使用すると、特定の引数で関数を実行し、結果をより動的に処理できます。
pool_apply.py という名前のファイルを作成します。
import multiprocessing
import time
import random
def process_task(task_id, duration, data):
"""Process a task with multiple arguments including complex data"""
print(f"Starting task {task_id} with duration {duration}s and data: {data}")
time.sleep(duration) ## Simulate variable work time
result = sum(data) * task_id
print(f"Task {task_id} completed")
return result
if __name__ == "__main__":
## Create a Pool with 2 processes
with multiprocessing.Pool(processes=2) as pool:
print("Starting tasks with apply()...")
## Execute tasks with apply() - blocking
result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
print(f"Result 1: {result1}")
result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
print(f"Result 2: {result2}")
print("\nStarting tasks with apply_async()...")
## Execute tasks with apply_async() - non-blocking
async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))
## Do other work while tasks are running
print("Tasks are running in the background...")
time.sleep(1)
print("Main process is doing other work...")
## Get results when needed (will wait if not yet available)
print(f"Result 3: {async_result1.get()}")
print(f"Result 4: {async_result2.get()}")
コードを実行します。
python3 pool_apply.py
出力には、apply() がブロッキング(シーケンシャル)である一方、apply_async() がタスクを並列に実行できることが示されます。
functools.partial を使用した部分的な関数適用
複数の引数を受け取る関数があり、そのうちのいくつかを固定したい場合は、functools.partial が非常に役立ちます。
partial_example.py という名前のファイルを作成します。
import multiprocessing
from functools import partial
import time
def process_with_config(data, config_a, config_b):
"""Process data with specific configuration parameters"""
print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
time.sleep(1)
return data * config_a + config_b
if __name__ == "__main__":
## Data to process
data_items = [1, 2, 3, 4, 5]
## Configuration values
config_a = 10
config_b = 5
## Create a partially applied function with fixed config values
partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)
## Create a Pool
with multiprocessing.Pool(processes=3) as pool:
## Use map with the partially applied function
print("Starting processing with partial function...")
results = pool.map(partial_process, data_items)
print(f"Results: {results}")
コードを実行します。
python3 partial_example.py
出力には、各データ項目が、partial() で固定された同じ構成値で処理される様子が示されます。
共有メモリを介した引数の受け渡し
各プロセスでコピーする必要がない大きなデータの場合、共有メモリの方が効率的です。このテクニックを探ってみましょう。
shared_memory_example.py という名前のファイルを作成します。
import multiprocessing
import numpy as np
import time
def process_array_chunk(array, start_idx, end_idx):
"""Process a chunk of a shared array"""
print(f"Processing chunk from index {start_idx} to {end_idx}")
## Simulate processing each element
for i in range(start_idx, end_idx):
array[i] = array[i] ** 2
time.sleep(1) ## Simulate additional work
return start_idx, end_idx
if __name__ == "__main__":
## Create a shared array
array_size = 1000000
shared_array = multiprocessing.Array('d', array_size) ## 'd' for double precision
## Fill the array with initial values
temp_array = np.frombuffer(shared_array.get_obj())
temp_array[:] = np.arange(array_size)
print(f"Array initialized with size {array_size}")
print(f"First 5 elements: {temp_array[:5]}")
## Define chunk size and create tasks
num_processes = 4
chunk_size = array_size // num_processes
tasks = []
for i in range(num_processes):
start = i * chunk_size
end = start + chunk_size if i < num_processes - 1 else array_size
tasks.append((start, end))
## Process chunks in parallel
with multiprocessing.Pool(processes=num_processes) as pool:
## Create arguments for each task
args = [(shared_array, start, end) for start, end in tasks]
print("Starting parallel processing...")
start_time = time.time()
## Using starmap to pass multiple arguments
results = pool.starmap(process_array_chunk, args)
end_time = time.time()
print(f"Processing completed in {end_time - start_time:.2f} seconds")
## Verify results
print(f"First 5 elements after processing: {temp_array[:5]}")
print(f"Results summary: {results}")
コードを実行します。
python3 shared_memory_example.py
出力には、大きな配列が異なるプロセスによってチャンク単位で処理され、すべてのプロセスが同じ共有メモリにアクセスできることが示されます。
パフォーマンス比較
これらのさまざまなテクニックを実際のタスクで比較してみましょう。
performance_comparison.py という名前のファイルを作成します。
import multiprocessing
import time
import numpy as np
from functools import partial
def process_data_simple(data):
"""Simple processing function"""
return sum(x ** 2 for x in data)
def process_data_with_config(data, multiplier):
"""Processing with additional configuration"""
return sum(x ** 2 for x in data) * multiplier
def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
"""Run a test and measure performance"""
start_time = time.time()
with multiprocessing.Pool(processes=pool_size) as pool:
if kwargs:
partial_func = partial(func, **kwargs)
results = pool.map(partial_func, data_chunks)
else:
results = pool.map(func, data_chunks)
end_time = time.time()
print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
return results
if __name__ == "__main__":
## Create test data
chunk_size = 1000000
num_chunks = 8
data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]
print(f"Created {num_chunks} data chunks of size {chunk_size} each")
## Test 1: Simple map
results1 = run_test("Simple map", process_data_simple, data_chunks)
## Test 2: Using partial
results2 = run_test("Map with partial", process_data_with_config,
data_chunks, multiplier=2)
## Test 3: Sequential processing for comparison
start_time = time.time()
seq_results = [process_data_simple(chunk) for chunk in data_chunks]
end_time = time.time()
print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")
## Validate results
print(f"\nFirst result from each test:")
print(f"Test 1 first result: {results1[0]:.4f}")
print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
print(f"Sequential first result: {seq_results[0]:.4f}")
パフォーマンス比較を実行します。
python3 performance_comparison.py
出力には、アプローチ間のパフォーマンスの違いが示され、並列処理の利点と、さまざまな引数受け渡しテクニックのオーバーヘッドが示されます。
実際のアプリケーション:並列画像処理
Python の multiprocessing で引数を渡すさまざまなテクニックを理解したので、これらの概念を実際のシナリオ、つまり並列画像処理に適用してみましょう。これは、multiprocessing がパフォーマンスを大幅に向上させることができる一般的なユースケースです。
環境のセットアップ
まず、必要なパッケージをインストールしましょう。
pip install Pillow numpy
サンプル画像の作成
処理用のサンプル画像をいくつか生成するスクリプトを作成しましょう。create_images.py という名前のファイルを作成します。
from PIL import Image
import numpy as np
import os
def create_sample_image(filename, width=800, height=600):
"""Create a sample image with random data"""
## Create random array data
data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
## Create image from array
img = Image.fromarray(data, 'RGB')
## Save the image
img.save(filename)
print(f"Created image: {filename}")
if __name__ == "__main__":
## Create a directory for sample images
os.makedirs("sample_images", exist_ok=True)
## Create 5 sample images
for i in range(5):
create_sample_image(f"sample_images/sample_{i+1}.png")
print("All sample images created successfully")
このスクリプトを実行して、サンプル画像を作成します。
python3 create_images.py
シーケンシャル画像処理
まず、画像処理タスクのシーケンシャルバージョンを実装しましょう。sequential_image_processing.py という名前のファイルを作成します。
from PIL import Image, ImageFilter
import os
import time
def apply_filters(image_path, output_dir):
"""Apply multiple filters to an image and save the results"""
## Load the image
image_name = os.path.basename(image_path)
print(f"Processing image: {image_name}")
img = Image.open(image_path)
## Apply filters
## 1. Blur filter
start_time = time.time()
blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
blurred.save(os.path.join(output_dir, f"blur_{image_name}"))
## 2. Edge detection filter
edges = img.filter(ImageFilter.FIND_EDGES)
edges.save(os.path.join(output_dir, f"edges_{image_name}"))
## 3. Emboss filter
emboss = img.filter(ImageFilter.EMBOSS)
emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))
## 4. Sharpen filter
sharpen = img.filter(ImageFilter.SHARPEN)
sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))
end_time = time.time()
processing_time = end_time - start_time
print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
return image_name, processing_time
def process_images_sequentially(image_dir, output_dir):
"""Process all images in a directory sequentially"""
## Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process")
## Process each image sequentially
start_time = time.time()
results = []
for image_path in image_files:
result = apply_filters(image_path, output_dir)
results.append(result)
end_time = time.time()
total_time = end_time - start_time
print(f"\nSequential processing completed")
print(f"Total processing time: {total_time:.2f} seconds")
return results
if __name__ == "__main__":
## Define directories
image_dir = "sample_images"
output_dir = "processed_sequential"
## Process images
results = process_images_sequentially(image_dir, output_dir)
## Print summary
print("\nProcessing summary:")
for image_name, proc_time in results:
print(f" {image_name}: {proc_time:.2f} seconds")
シーケンシャル処理を実行します。
python3 sequential_image_processing.py
並列画像処理
次に、これまで学習したテクニックを使用して、並列画像処理を実装しましょう。parallel_image_processing.py という名前のファイルを作成します。
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
def apply_filters(args):
"""Apply multiple filters to an image and save the results"""
image_path, output_dir = args
## Load the image
image_name = os.path.basename(image_path)
print(f"Process {os.getpid()} processing image: {image_name}")
img = Image.open(image_path)
## Apply filters
## 1. Blur filter
start_time = time.time()
blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
blurred.save(os.path.join(output_dir, f"blur_{image_name}"))
## 2. Edge detection filter
edges = img.filter(ImageFilter.FIND_EDGES)
edges.save(os.path.join(output_dir, f"edges_{image_name}"))
## 3. Emboss filter
emboss = img.filter(ImageFilter.EMBOSS)
emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))
## 4. Sharpen filter
sharpen = img.filter(ImageFilter.SHARPEN)
sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))
end_time = time.time()
processing_time = end_time - start_time
print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
return image_name, processing_time
def process_images_in_parallel(image_dir, output_dir, num_processes=None):
"""Process all images in a directory in parallel"""
## Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process")
## Determine number of processes
if num_processes is None:
num_processes = min(multiprocessing.cpu_count(), len(image_files))
print(f"Using {num_processes} processes for parallel processing")
## Create arguments for each task
task_args = [(image_path, output_dir) for image_path in image_files]
## Process images in parallel
start_time = time.time()
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.map(apply_filters, task_args)
end_time = time.time()
total_time = end_time - start_time
print(f"\nParallel processing completed")
print(f"Total processing time: {total_time:.2f} seconds")
return results
if __name__ == "__main__":
## Define directories
image_dir = "sample_images"
output_dir = "processed_parallel"
## Process images
results = process_images_in_parallel(image_dir, output_dir)
## Print summary
print("\nProcessing summary:")
for image_name, proc_time in results:
print(f" {image_name}: {proc_time:.2f} seconds")
並列処理を実行します。
python3 parallel_image_processing.py
さまざまなフィルターを使用した高度な並列処理
次に、同じ画像に異なるフィルターを並列に適用する、より複雑なバージョンを実装しましょう。advanced_parallel_processing.py という名前のファイルを作成します。
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial
def apply_filter(filter_info, image_path, output_dir):
"""Apply a specific filter to an image"""
filter_name, filter_obj = filter_info
## Load the image
image_name = os.path.basename(image_path)
print(f"Process {os.getpid()} applying {filter_name} to {image_name}")
## Open image each time to avoid potential issues with sharing PIL objects
img = Image.open(image_path)
## Apply filter
start_time = time.time()
filtered = img.filter(filter_obj)
## Save the filtered image
output_filename = f"{filter_name}_{image_name}"
output_path = os.path.join(output_dir, output_filename)
filtered.save(output_path)
end_time = time.time()
processing_time = end_time - start_time
print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
return filter_name, image_name, processing_time
def process_image_with_multiple_filters(image_path, output_dir, filters):
"""Process an image with multiple filters in parallel"""
## Create a partial function with fixed image_path and output_dir
apply_filter_to_image = partial(apply_filter,
image_path=image_path,
output_dir=output_dir)
## Process the image with multiple filters in parallel
with multiprocessing.Pool() as pool:
results = pool.map(apply_filter_to_image, filters)
return results
def main():
## Define directories
image_dir = "sample_images"
output_dir = "processed_advanced"
os.makedirs(output_dir, exist_ok=True)
## Define filters to apply
filters = [
("blur", ImageFilter.GaussianBlur(radius=5)),
("edges", ImageFilter.FIND_EDGES),
("emboss", ImageFilter.EMBOSS),
("sharpen", ImageFilter.SHARPEN),
("contour", ImageFilter.CONTOUR),
("detail", ImageFilter.DETAIL),
("smooth", ImageFilter.SMOOTH),
("smoothmore", ImageFilter.SMOOTH_MORE)
]
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process with {len(filters)} filters each")
## Process each image with all filters
all_results = []
overall_start = time.time()
for image_path in image_files:
image_name = os.path.basename(image_path)
print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")
start_time = time.time()
results = process_image_with_multiple_filters(image_path, output_dir, filters)
end_time = time.time()
image_time = end_time - start_time
print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")
all_results.extend(results)
overall_end = time.time()
total_time = overall_end - overall_start
## Print summary
print("\nProcessing Summary:")
print(f"Total processing time: {total_time:.2f} seconds")
print(f"Total operations: {len(all_results)}")
## Calculate average time per filter
avg_time = sum(time for _, _, time in all_results) / len(all_results)
print(f"Average processing time per filter: {avg_time:.2f} seconds")
if __name__ == "__main__":
main()
高度な並列処理を実行します。
python3 advanced_parallel_processing.py
パフォーマンスの比較
さまざまなアプローチのパフォーマンスを比較するスクリプトを作成しましょう。compare_performance.py という名前のファイルを作成します。
import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np
def run_script(script_name):
"""Run a Python script and measure its execution time"""
print(f"Running {script_name}...")
start_time = time.time()
## Run the script
process = subprocess.run(['python3', script_name],
capture_output=True, text=True)
end_time = time.time()
execution_time = end_time - start_time
print(f"Completed {script_name} in {execution_time:.2f} seconds")
print(f"Output: {process.stdout[-200:]}") ## Show last part of output
return execution_time
def main():
## Scripts to compare
scripts = [
'sequential_image_processing.py',
'parallel_image_processing.py',
'advanced_parallel_processing.py'
]
## Run each script and measure time
times = []
for script in scripts:
if os.path.exists(script):
times.append(run_script(script))
else:
print(f"Script {script} not found!")
times.append(0)
## Plot results
if any(times):
labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
x = np.arange(len(labels))
plt.figure(figsize=(10, 6))
plt.bar(x, times, color=['red', 'blue', 'green'])
plt.xlabel('Processing Method')
plt.ylabel('Execution Time (s)')
plt.title('Image Processing Performance Comparison')
plt.xticks(x, labels)
## Add execution time as text on bars
for i, v in enumerate(times):
plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')
## Save the plot
plt.tight_layout()
plt.savefig('performance_comparison.png')
print("Performance comparison plot saved as 'performance_comparison.png'")
else:
print("No valid execution times to plot")
if __name__ == "__main__":
main()
比較を実行しましょう。
python3 compare_performance.py
結果を確認し、performance_comparison.png 画像を調べて、さまざまなアプローチ間の実行時間の違いを確認します。
主なポイント
この実際の例から、multiprocessing のいくつかの重要な側面を観察できます。
並列処理は、画像処理などの CPU 集約型タスクを処理する場合、実行時間を大幅に短縮できます。
引数を渡す方法は、コードの複雑さとパフォーマンスの両方に影響します。
- 単純な単一引数関数には、単純な map()
- 特定のパラメーターを修正するための部分関数
- 複雑なワークフローのための高度なテクニック
実際のアプリケーションに関する考慮事項:
- リソース管理(CPU、メモリ)
- タスク分散戦略
- 並列環境でのエラー処理
- プロセス間の調整
この実践的な例は、このラボ全体で学習した multiprocessing の概念を適用して、実際の問題を効率的に解決する方法を示しています。
まとめ
この Lab では、Python の multiprocessing と、並列プロセスに引数を渡すためのさまざまなテクニックについて学習しました。以下は、これまでに達成したことの概要です。
Multiprocessing の基礎: 最初の multiprocessing プログラムを作成し、プロセスとスレッドの違いを理解しました。
引数の受け渡しテクニック: さまざまな引数の受け渡し方法を探求しました。
Process.argsを使用した基本的な引数の受け渡しPool.map()を使用した単一引数の受け渡しPool.starmap()を使用した複数の引数apply()とapply_async()を使用した動的な実行- 部分的な関数適用に
functools.partialを使用 - 効率的なデータ共有のための共有メモリ
実際のアプリケーション: これらの概念を実践的な画像処理の例に適用し、並列化による大幅なパフォーマンス向上を示しました。
この Lab で学習したスキルは、CPU を集中的に使用する Python アプリケーションにとって貴重であり、マルチコアシステムのすべての計算能力を活用できます。より複雑なアプリケーションを開発する際には、さまざまな引数の受け渡しテクニック間のトレードオフを考慮し、特定のユースケースに最適な方法を選択することを忘れないでください。



