Python multiprocessing で引数を渡す方法

PythonBeginner
オンラインで実践に進む

はじめに

Python の multiprocessing モジュールは、複数の CPU コアを活用して並列コンピューティングを可能にし、CPU 負荷の高いタスクの計算パフォーマンスを大幅に向上させることができます。この実験(Lab)では、multiprocessing モジュールでプロセスに引数を渡すためのさまざまなテクニックを探求し、並行プログラミングにおける一般的な課題に対処し、効果的な並列化のための実用的な戦略を実証します。

この実験(Lab)の終わりには、並列プロセスにデータを渡すためのさまざまな方法を理解し、これらのテクニックを現実世界のシナリオに適用できるようになります。

Python での Multiprocessing の紹介

このステップでは、Python での multiprocessing の基本を学び、最初の並列プログラムを作成します。

Multiprocessing とは?

Multiprocessing は、複数のプロセスを並列に実行し、複数の CPU コアを効果的に使用できるようにする Python モジュールです。これは、Python コードの真の並列実行を妨げる Global Interpreter Lock (GIL) によって制限されるスレッド処理とは異なります。

簡単な比較を以下に示します。

特徴 Multiprocessing スレッド処理
メモリ 個別のメモリ空間 共有メモリ空間
並列性 真の並列実行 GIL によって制限される
ユースケース CPU バウンドタスク I/O バウンドタスク
オーバーヘッド 高い(個別のプロセス) 低い

最初の Multiprocessing プログラムの作成

簡単な multiprocessing の例を作成しましょう。WebIDE で新しいファイルを開き、simple_process.py という名前を付けます。

import multiprocessing
import time
import os

def worker():
    """Simple function to demonstrate a process"""
    print(f"Worker process id: {os.getpid()}")
    print(f"Worker parent process id: {os.getppid()}")
    time.sleep(1)
    print("Worker process completed")

if __name__ == "__main__":
    ## Print information about the main process
    print(f"Main process id: {os.getpid()}")

    ## Create a process
    process = multiprocessing.Process(target=worker)

    ## Start the process
    print("Starting worker process...")
    process.start()

    ## Wait for the process to complete
    process.join()

    print("Main process completed")

次に、このプログラムを実行します。

python3 simple_process.py

次のような出力が表示されるはずです。

Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed

正確なプロセス ID は、システムによって異なります。ワーカープロセスがメインプロセスとは異なるプロセス ID を持っていることに注目してください。これは、それらが別のプロセスであることを確認しています。

プロセスの作成について

multiprocessing の Process を作成して開始すると、Python は次のことを行います。

  1. 新しいプロセスを作成します
  2. ターゲット関数を含むモジュールをインポートします
  3. 新しいプロセスでターゲット関数を実行します
  4. 親プロセスに制御を返します

if __name__ == "__main__": ガードは、Python で multiprocessing を使用する際に重要です。これにより、モジュールがインポートされたときに重複するプロセスが作成されるのを防ぎます。

複数のプロセス

複数のプロセスを作成するように例を変更しましょう。multiple_processes.py という名前の新しいファイルを作成します。

import multiprocessing
import time
import os

def worker(worker_id):
    """Worker function that accepts an argument"""
    print(f"Worker {worker_id} (PID: {os.getpid()}) started")
    time.sleep(1)
    print(f"Worker {worker_id} completed")
    return worker_id * 2  ## This return value won't be captured

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    ## Create multiple processes
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    ## Wait for all processes to complete
    for p in processes:
        p.join()

    print("All processes completed")

このコードを実行します。

python3 multiple_processes.py

次のような出力が表示されるはずです。

Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed

args パラメータを使用して、各プロセスに引数(worker_id)を渡したことに注目してください。これは、プロセスに引数を渡す最初の例です。

また、プロセスは必ずしも予測可能な順序で実行されるとは限らないことにも注意してください。これらは並列に実行されているためです。

Pool を使用した基本的な引数の受け渡し

前のステップでは、単純な引数をプロセスに渡しました。ここでは、Pool クラスを使用して複数のタスクを処理するより効率的な方法を探り、引数を渡すさまざまな方法について学びます。

Pool クラス

Pool クラスは、複数の入力値に対して関数の実行を並列化するための便利な方法を提供します。入力データを複数のプロセスに分散し、結果を収集します。

Pool の仕組みを理解するために、簡単な例を作成しましょう。pool_example.py という名前の新しいファイルを作成します。

import multiprocessing
import time

def square(x):
    """Function that squares a number and returns the result"""
    print(f"Processing {x}...")
    time.sleep(1)  ## Simulate a time-consuming operation
    return x * x

if __name__ == "__main__":
    ## Create a list of numbers to process
    numbers = [1, 2, 3, 4, 5]

    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        ## Map the square function to the numbers
        start_time = time.time()
        results = pool.map(square, numbers)
        end_time = time.time()

        print(f"Results: {results}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")

    ## Compare with sequential processing
    start_time = time.time()
    sequential_results = [square(n) for n in numbers]
    end_time = time.time()

    print(f"Sequential results: {sequential_results}")
    print(f"Sequential time: {end_time - start_time:.2f} seconds")

コードを実行します。

python3 pool_example.py

次のような出力が表示されるはずです。

Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds

Pool が、複数の数値を並列に処理するため、より高速であることに注目してください。プールサイズが 2 の場合、一度に 2 つの数値を処理できますが、シーケンシャルなアプローチでは、それらを 1 つずつ処理します。

Pool.map() を使用した単一引数の受け渡し

map() メソッドは、関数が単一の引数を受け取る場合に最適です。反復可能オブジェクト内の各項目に関数を並列に適用します。

別の例で、これをさらに詳しく見てみましょう。pool_map.py という名前のファイルを作成します。

import multiprocessing
import time
import os

def process_item(item):
    """Process a single item"""
    process_id = os.getpid()
    print(f"Process {process_id} processing item {item}")
    time.sleep(1)  ## Simulate work
    return item * 10

if __name__ == "__main__":
    ## Create a list of items to process
    items = list(range(10))

    ## Get the number of CPU cores available
    num_cores = multiprocessing.cpu_count()
    print(f"System has {num_cores} CPU cores")

    ## Create a Pool with the number of available cores
    with multiprocessing.Pool(processes=num_cores) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        ## Process all items in parallel
        results = pool.map(process_item, items)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

コードを実行します。

python3 pool_map.py

出力には、複数のプロセスで並列に処理されている項目が表示され、時間は 10 秒より大幅に短くなります(10 個の項目をそれぞれ 1 秒遅延でシーケンシャルに処理した場合)。

Pool.starmap() を使用した複数の引数の受け渡し

関数が複数の引数を必要とする場合はどうでしょうか?ここで starmap() が登場します。

pool_starmap.py という名前のファイルを作成します。

import multiprocessing
import time

def process_data(id, value, multiplier):
    """Process data with multiple arguments"""
    print(f"Processing item {id}: {value} with multiplier {multiplier}")
    time.sleep(1)  ## Simulate work
    return value * multiplier

if __name__ == "__main__":
    ## Create a list of argument tuples
    ## Each tuple contains all arguments for one function call
    arguments = [
        (1, 5, 2),    ## id=1, value=5, multiplier=2
        (2, 10, 3),   ## id=2, value=10, multiplier=3
        (3, 15, 2),   ## id=3, value=15, multiplier=2
        (4, 20, 4),   ## id=4, value=20, multiplier=4
        (5, 25, 5)    ## id=5, value=25, multiplier=5
    ]

    ## Create a Pool with 3 processes
    with multiprocessing.Pool(processes=3) as pool:
        print("Starting parallel processing with multiple arguments...")
        start_time = time.time()

        ## Process all items in parallel using starmap
        results = pool.starmap(process_data, arguments)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

コードを実行します。

python3 pool_starmap.py

出力には、各項目が独自の引数セットで処理され、最終的な結果リストにはすべての計算値が含まれていることが示されます。

Pool に関する重要なポイント

  • Pool は、タスクのプロセス間での分散を自動的に処理します
  • タスクのキューを管理し、利用可能なワーカープロセスに割り当てます
  • 結果を収集し、入力データと同じ順序で返します
  • プロセスの数は、システムの CPU コアとタスクの性質に基づいて最適化できます

高度な引数の受け渡しテクニック

前のステップでは、Process オブジェクトと Pool クラスを使用した基本的な引数の受け渡しについて学びました。ここでは、multiprocessing シナリオで引数を渡すための、より高度なテクニックを探ります。

apply() と apply_async() を使用した複雑なデータの受け渡し

map()starmap() が提供するものよりも、より柔軟性が必要になる場合があります。apply() および apply_async() メソッドを使用すると、特定の引数で関数を実行し、結果をより動的に処理できます。

pool_apply.py という名前のファイルを作成します。

import multiprocessing
import time
import random

def process_task(task_id, duration, data):
    """Process a task with multiple arguments including complex data"""
    print(f"Starting task {task_id} with duration {duration}s and data: {data}")
    time.sleep(duration)  ## Simulate variable work time
    result = sum(data) * task_id
    print(f"Task {task_id} completed")
    return result

if __name__ == "__main__":
    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        print("Starting tasks with apply()...")

        ## Execute tasks with apply() - blocking
        result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
        print(f"Result 1: {result1}")

        result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
        print(f"Result 2: {result2}")

        print("\nStarting tasks with apply_async()...")

        ## Execute tasks with apply_async() - non-blocking
        async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
        async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))

        ## Do other work while tasks are running
        print("Tasks are running in the background...")
        time.sleep(1)
        print("Main process is doing other work...")

        ## Get results when needed (will wait if not yet available)
        print(f"Result 3: {async_result1.get()}")
        print(f"Result 4: {async_result2.get()}")

コードを実行します。

python3 pool_apply.py

出力には、apply() がブロッキング(シーケンシャル)である一方、apply_async() がタスクを並列に実行できることが示されます。

functools.partial を使用した部分的な関数適用

複数の引数を受け取る関数があり、そのうちのいくつかを固定したい場合は、functools.partial が非常に役立ちます。

partial_example.py という名前のファイルを作成します。

import multiprocessing
from functools import partial
import time

def process_with_config(data, config_a, config_b):
    """Process data with specific configuration parameters"""
    print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
    time.sleep(1)
    return data * config_a + config_b

if __name__ == "__main__":
    ## Data to process
    data_items = [1, 2, 3, 4, 5]

    ## Configuration values
    config_a = 10
    config_b = 5

    ## Create a partially applied function with fixed config values
    partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)

    ## Create a Pool
    with multiprocessing.Pool(processes=3) as pool:
        ## Use map with the partially applied function
        print("Starting processing with partial function...")
        results = pool.map(partial_process, data_items)
        print(f"Results: {results}")

コードを実行します。

python3 partial_example.py

出力には、各データ項目が、partial() で固定された同じ構成値で処理される様子が示されます。

共有メモリを介した引数の受け渡し

各プロセスでコピーする必要がない大きなデータの場合、共有メモリの方が効率的です。このテクニックを探ってみましょう。

shared_memory_example.py という名前のファイルを作成します。

import multiprocessing
import numpy as np
import time

def process_array_chunk(array, start_idx, end_idx):
    """Process a chunk of a shared array"""
    print(f"Processing chunk from index {start_idx} to {end_idx}")

    ## Simulate processing each element
    for i in range(start_idx, end_idx):
        array[i] = array[i] ** 2

    time.sleep(1)  ## Simulate additional work
    return start_idx, end_idx

if __name__ == "__main__":
    ## Create a shared array
    array_size = 1000000
    shared_array = multiprocessing.Array('d', array_size)  ## 'd' for double precision

    ## Fill the array with initial values
    temp_array = np.frombuffer(shared_array.get_obj())
    temp_array[:] = np.arange(array_size)

    print(f"Array initialized with size {array_size}")
    print(f"First 5 elements: {temp_array[:5]}")

    ## Define chunk size and create tasks
    num_processes = 4
    chunk_size = array_size // num_processes
    tasks = []

    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else array_size
        tasks.append((start, end))

    ## Process chunks in parallel
    with multiprocessing.Pool(processes=num_processes) as pool:
        ## Create arguments for each task
        args = [(shared_array, start, end) for start, end in tasks]

        print("Starting parallel processing...")
        start_time = time.time()

        ## Using starmap to pass multiple arguments
        results = pool.starmap(process_array_chunk, args)

        end_time = time.time()
        print(f"Processing completed in {end_time - start_time:.2f} seconds")

    ## Verify results
    print(f"First 5 elements after processing: {temp_array[:5]}")
    print(f"Results summary: {results}")

コードを実行します。

python3 shared_memory_example.py

出力には、大きな配列が異なるプロセスによってチャンク単位で処理され、すべてのプロセスが同じ共有メモリにアクセスできることが示されます。

パフォーマンス比較

これらのさまざまなテクニックを実際のタスクで比較してみましょう。

performance_comparison.py という名前のファイルを作成します。

import multiprocessing
import time
import numpy as np
from functools import partial

def process_data_simple(data):
    """Simple processing function"""
    return sum(x ** 2 for x in data)

def process_data_with_config(data, multiplier):
    """Processing with additional configuration"""
    return sum(x ** 2 for x in data) * multiplier

def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
    """Run a test and measure performance"""
    start_time = time.time()

    with multiprocessing.Pool(processes=pool_size) as pool:
        if kwargs:
            partial_func = partial(func, **kwargs)
            results = pool.map(partial_func, data_chunks)
        else:
            results = pool.map(func, data_chunks)

    end_time = time.time()
    print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
    return results

if __name__ == "__main__":
    ## Create test data
    chunk_size = 1000000
    num_chunks = 8
    data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]

    print(f"Created {num_chunks} data chunks of size {chunk_size} each")

    ## Test 1: Simple map
    results1 = run_test("Simple map", process_data_simple, data_chunks)

    ## Test 2: Using partial
    results2 = run_test("Map with partial", process_data_with_config,
                        data_chunks, multiplier=2)

    ## Test 3: Sequential processing for comparison
    start_time = time.time()
    seq_results = [process_data_simple(chunk) for chunk in data_chunks]
    end_time = time.time()
    print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")

    ## Validate results
    print(f"\nFirst result from each test:")
    print(f"Test 1 first result: {results1[0]:.4f}")
    print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
    print(f"Sequential first result: {seq_results[0]:.4f}")

パフォーマンス比較を実行します。

python3 performance_comparison.py

出力には、アプローチ間のパフォーマンスの違いが示され、並列処理の利点と、さまざまな引数受け渡しテクニックのオーバーヘッドが示されます。

実際のアプリケーション:並列画像処理

Python の multiprocessing で引数を渡すさまざまなテクニックを理解したので、これらの概念を実際のシナリオ、つまり並列画像処理に適用してみましょう。これは、multiprocessing がパフォーマンスを大幅に向上させることができる一般的なユースケースです。

環境のセットアップ

まず、必要なパッケージをインストールしましょう。

pip install Pillow numpy

サンプル画像の作成

処理用のサンプル画像をいくつか生成するスクリプトを作成しましょう。create_images.py という名前のファイルを作成します。

from PIL import Image
import numpy as np
import os

def create_sample_image(filename, width=800, height=600):
    """Create a sample image with random data"""
    ## Create random array data
    data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)

    ## Create image from array
    img = Image.fromarray(data, 'RGB')

    ## Save the image
    img.save(filename)
    print(f"Created image: {filename}")

if __name__ == "__main__":
    ## Create a directory for sample images
    os.makedirs("sample_images", exist_ok=True)

    ## Create 5 sample images
    for i in range(5):
        create_sample_image(f"sample_images/sample_{i+1}.png")

    print("All sample images created successfully")

このスクリプトを実行して、サンプル画像を作成します。

python3 create_images.py

シーケンシャル画像処理

まず、画像処理タスクのシーケンシャルバージョンを実装しましょう。sequential_image_processing.py という名前のファイルを作成します。

from PIL import Image, ImageFilter
import os
import time

def apply_filters(image_path, output_dir):
    """Apply multiple filters to an image and save the results"""
    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_sequentially(image_dir, output_dir):
    """Process all images in a directory sequentially"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Process each image sequentially
    start_time = time.time()
    results = []

    for image_path in image_files:
        result = apply_filters(image_path, output_dir)
        results.append(result)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nSequential processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_sequential"

    ## Process images
    results = process_images_sequentially(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

シーケンシャル処理を実行します。

python3 sequential_image_processing.py

並列画像処理

次に、これまで学習したテクニックを使用して、並列画像処理を実装しましょう。parallel_image_processing.py という名前のファイルを作成します。

from PIL import Image, ImageFilter
import multiprocessing
import os
import time

def apply_filters(args):
    """Apply multiple filters to an image and save the results"""
    image_path, output_dir = args

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_in_parallel(image_dir, output_dir, num_processes=None):
    """Process all images in a directory in parallel"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Determine number of processes
    if num_processes is None:
        num_processes = min(multiprocessing.cpu_count(), len(image_files))

    print(f"Using {num_processes} processes for parallel processing")

    ## Create arguments for each task
    task_args = [(image_path, output_dir) for image_path in image_files]

    ## Process images in parallel
    start_time = time.time()

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(apply_filters, task_args)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nParallel processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_parallel"

    ## Process images
    results = process_images_in_parallel(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

並列処理を実行します。

python3 parallel_image_processing.py

さまざまなフィルターを使用した高度な並列処理

次に、同じ画像に異なるフィルターを並列に適用する、より複雑なバージョンを実装しましょう。advanced_parallel_processing.py という名前のファイルを作成します。

from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial

def apply_filter(filter_info, image_path, output_dir):
    """Apply a specific filter to an image"""
    filter_name, filter_obj = filter_info

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} applying {filter_name} to {image_name}")

    ## Open image each time to avoid potential issues with sharing PIL objects
    img = Image.open(image_path)

    ## Apply filter
    start_time = time.time()
    filtered = img.filter(filter_obj)

    ## Save the filtered image
    output_filename = f"{filter_name}_{image_name}"
    output_path = os.path.join(output_dir, output_filename)
    filtered.save(output_path)

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
    return filter_name, image_name, processing_time

def process_image_with_multiple_filters(image_path, output_dir, filters):
    """Process an image with multiple filters in parallel"""
    ## Create a partial function with fixed image_path and output_dir
    apply_filter_to_image = partial(apply_filter,
                                   image_path=image_path,
                                   output_dir=output_dir)

    ## Process the image with multiple filters in parallel
    with multiprocessing.Pool() as pool:
        results = pool.map(apply_filter_to_image, filters)

    return results

def main():
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_advanced"
    os.makedirs(output_dir, exist_ok=True)

    ## Define filters to apply
    filters = [
        ("blur", ImageFilter.GaussianBlur(radius=5)),
        ("edges", ImageFilter.FIND_EDGES),
        ("emboss", ImageFilter.EMBOSS),
        ("sharpen", ImageFilter.SHARPEN),
        ("contour", ImageFilter.CONTOUR),
        ("detail", ImageFilter.DETAIL),
        ("smooth", ImageFilter.SMOOTH),
        ("smoothmore", ImageFilter.SMOOTH_MORE)
    ]

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process with {len(filters)} filters each")

    ## Process each image with all filters
    all_results = []
    overall_start = time.time()

    for image_path in image_files:
        image_name = os.path.basename(image_path)
        print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")

        start_time = time.time()
        results = process_image_with_multiple_filters(image_path, output_dir, filters)
        end_time = time.time()

        image_time = end_time - start_time
        print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")

        all_results.extend(results)

    overall_end = time.time()
    total_time = overall_end - overall_start

    ## Print summary
    print("\nProcessing Summary:")
    print(f"Total processing time: {total_time:.2f} seconds")
    print(f"Total operations: {len(all_results)}")

    ## Calculate average time per filter
    avg_time = sum(time for _, _, time in all_results) / len(all_results)
    print(f"Average processing time per filter: {avg_time:.2f} seconds")

if __name__ == "__main__":
    main()

高度な並列処理を実行します。

python3 advanced_parallel_processing.py

パフォーマンスの比較

さまざまなアプローチのパフォーマンスを比較するスクリプトを作成しましょう。compare_performance.py という名前のファイルを作成します。

import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np

def run_script(script_name):
    """Run a Python script and measure its execution time"""
    print(f"Running {script_name}...")
    start_time = time.time()

    ## Run the script
    process = subprocess.run(['python3', script_name],
                            capture_output=True, text=True)

    end_time = time.time()
    execution_time = end_time - start_time

    print(f"Completed {script_name} in {execution_time:.2f} seconds")
    print(f"Output: {process.stdout[-200:]}") ## Show last part of output

    return execution_time

def main():
    ## Scripts to compare
    scripts = [
        'sequential_image_processing.py',
        'parallel_image_processing.py',
        'advanced_parallel_processing.py'
    ]

    ## Run each script and measure time
    times = []
    for script in scripts:
        if os.path.exists(script):
            times.append(run_script(script))
        else:
            print(f"Script {script} not found!")
            times.append(0)

    ## Plot results
    if any(times):
        labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
        x = np.arange(len(labels))

        plt.figure(figsize=(10, 6))
        plt.bar(x, times, color=['red', 'blue', 'green'])
        plt.xlabel('Processing Method')
        plt.ylabel('Execution Time (s)')
        plt.title('Image Processing Performance Comparison')
        plt.xticks(x, labels)

        ## Add execution time as text on bars
        for i, v in enumerate(times):
            plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')

        ## Save the plot
        plt.tight_layout()
        plt.savefig('performance_comparison.png')
        print("Performance comparison plot saved as 'performance_comparison.png'")
    else:
        print("No valid execution times to plot")

if __name__ == "__main__":
    main()

比較を実行しましょう。

python3 compare_performance.py

結果を確認し、performance_comparison.png 画像を調べて、さまざまなアプローチ間の実行時間の違いを確認します。

主なポイント

この実際の例から、multiprocessing のいくつかの重要な側面を観察できます。

  1. 並列処理は、画像処理などの CPU 集約型タスクを処理する場合、実行時間を大幅に短縮できます。

  2. 引数を渡す方法は、コードの複雑さとパフォーマンスの両方に影響します。

    • 単純な単一引数関数には、単純な map()
    • 特定のパラメーターを修正するための部分関数
    • 複雑なワークフローのための高度なテクニック
  3. 実際のアプリケーションに関する考慮事項:

    • リソース管理(CPU、メモリ)
    • タスク分散戦略
    • 並列環境でのエラー処理
    • プロセス間の調整

この実践的な例は、このラボ全体で学習した multiprocessing の概念を適用して、実際の問題を効率的に解決する方法を示しています。

まとめ

この Lab では、Python の multiprocessing と、並列プロセスに引数を渡すためのさまざまなテクニックについて学習しました。以下は、これまでに達成したことの概要です。

  1. Multiprocessing の基礎: 最初の multiprocessing プログラムを作成し、プロセスとスレッドの違いを理解しました。

  2. 引数の受け渡しテクニック: さまざまな引数の受け渡し方法を探求しました。

    • Process.args を使用した基本的な引数の受け渡し
    • Pool.map() を使用した単一引数の受け渡し
    • Pool.starmap() を使用した複数の引数
    • apply()apply_async() を使用した動的な実行
    • 部分的な関数適用に functools.partial を使用
    • 効率的なデータ共有のための共有メモリ
  3. 実際のアプリケーション: これらの概念を実践的な画像処理の例に適用し、並列化による大幅なパフォーマンス向上を示しました。

この Lab で学習したスキルは、CPU を集中的に使用する Python アプリケーションにとって貴重であり、マルチコアシステムのすべての計算能力を活用できます。より複雑なアプリケーションを開発する際には、さまざまな引数の受け渡しテクニック間のトレードオフを考慮し、特定のユースケースに最適な方法を選択することを忘れないでください。