Wie man Argumente in Python Multiprocessing übergibt

PythonBeginner
Jetzt üben

Einführung

Das multiprocessing-Modul von Python ermöglicht Parallelverarbeitung durch die Nutzung mehrerer CPU-Kerne, was die Rechenleistung für CPU-intensive Aufgaben erheblich verbessern kann. Dieses Lab untersucht verschiedene Techniken zur Übergabe von Argumenten an Prozesse im multiprocessing-Modul, geht auf häufige Herausforderungen in der nebenläufigen Programmierung ein und demonstriert praktische Strategien für eine effektive Parallelisierung.

Am Ende dieses Labs werden Sie verstehen, wie man verschiedene Methoden zur Datenübergabe an parallele Prozesse verwendet und diese Techniken auf reale Szenarien anwendet.

Einführung in Multiprocessing in Python

In diesem Schritt lernen wir die Grundlagen von Multiprocessing in Python und erstellen unser erstes paralleles Programm.

Was ist Multiprocessing?

Multiprocessing ist ein Python-Modul, das es uns ermöglicht, mehrere Prozesse parallel auszuführen und so effektiv mehrere CPU-Kerne zu nutzen. Dies unterscheidet sich von Threading, das durch den Global Interpreter Lock (GIL) eingeschränkt ist, der eine echte parallele Ausführung von Python-Code verhindert.

Hier ist ein einfacher Vergleich:

Merkmal Multiprocessing Threading
Speicher Separate Speicherbereiche Gemeinsamer Speicherbereich
Parallelität Echte parallele Ausführung Begrenzt durch GIL
Anwendungsfall CPU-gebundene Aufgaben I/O-gebundene Aufgaben
Overhead Höher (separate Prozesse) Geringer

Erstellen Ihres ersten Multiprocessing-Programms

Lassen Sie uns ein einfaches Multiprocessing-Beispiel erstellen. Öffnen Sie eine neue Datei in der WebIDE und nennen Sie sie simple_process.py:

import multiprocessing
import time
import os

def worker():
    """Simple function to demonstrate a process"""
    print(f"Worker process id: {os.getpid()}")
    print(f"Worker parent process id: {os.getppid()}")
    time.sleep(1)
    print("Worker process completed")

if __name__ == "__main__":
    ## Print information about the main process
    print(f"Main process id: {os.getpid()}")

    ## Create a process
    process = multiprocessing.Process(target=worker)

    ## Start the process
    print("Starting worker process...")
    process.start()

    ## Wait for the process to complete
    process.join()

    print("Main process completed")

Führen wir nun dieses Programm aus:

python3 simple_process.py

Sie sollten eine Ausgabe ähnlich der folgenden sehen:

Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed

Die genauen Prozess-IDs unterscheiden sich auf Ihrem System. Beachten Sie, dass der Worker-Prozess eine andere Prozess-ID als der Hauptprozess hat, was bestätigt, dass es sich um separate Prozesse handelt.

Verstehen der Prozesserstellung

Wenn wir einen Multiprocessing-Prozess erstellen und ihn starten, führt Python Folgendes aus:

  1. Erstellt einen neuen Prozess
  2. Importiert das Modul, das die Zielfunktion enthält
  3. Führt die Zielfunktion im neuen Prozess aus
  4. Gibt die Kontrolle an den übergeordneten Prozess zurück

Der if __name__ == "__main__": Guard ist wichtig, wenn Sie mit Multiprocessing in Python arbeiten. Dies verhindert die Erstellung doppelter Prozesse, wenn das Modul importiert wird.

Mehrere Prozesse

Ändern wir unser Beispiel, um mehrere Prozesse zu erstellen. Erstellen Sie eine neue Datei mit dem Namen multiple_processes.py:

import multiprocessing
import time
import os

def worker(worker_id):
    """Worker function that accepts an argument"""
    print(f"Worker {worker_id} (PID: {os.getpid()}) started")
    time.sleep(1)
    print(f"Worker {worker_id} completed")
    return worker_id * 2  ## This return value won't be captured

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    ## Create multiple processes
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    ## Wait for all processes to complete
    for p in processes:
        p.join()

    print("All processes completed")

Führen Sie diesen Code aus:

python3 multiple_processes.py

Sie sollten eine Ausgabe ähnlich der folgenden sehen:

Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed

Beachten Sie, wie wir jedem Prozess ein Argument (worker_id) mit dem Parameter args übergeben haben. Dies ist unser erstes Beispiel für die Übergabe von Argumenten an einen Prozess.

Beachten Sie auch, dass die Prozesse möglicherweise nicht unbedingt in einer vorhersagbaren Reihenfolge ausgeführt werden, da sie parallel laufen.

Grundlegende Argumentübergabe mit Pool

Im vorherigen Schritt haben wir ein einfaches Argument an einen Prozess übergeben. Lassen Sie uns nun einen effizienteren Weg zur Handhabung mehrerer Aufgaben mit der Pool-Klasse untersuchen und verschiedene Möglichkeiten zur Übergabe von Argumenten kennenlernen.

Die Pool-Klasse

Die Pool-Klasse bietet eine praktische Möglichkeit, die Ausführung einer Funktion über mehrere Eingabewerte zu parallelisieren. Sie verteilt die Eingabedaten auf mehrere Prozesse und sammelt Ergebnisse.

Lassen Sie uns ein einfaches Beispiel erstellen, um zu verstehen, wie Pool funktioniert. Erstellen Sie eine neue Datei mit dem Namen pool_example.py:

import multiprocessing
import time

def square(x):
    """Function that squares a number and returns the result"""
    print(f"Processing {x}...")
    time.sleep(1)  ## Simulate a time-consuming operation
    return x * x

if __name__ == "__main__":
    ## Create a list of numbers to process
    numbers = [1, 2, 3, 4, 5]

    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        ## Map the square function to the numbers
        start_time = time.time()
        results = pool.map(square, numbers)
        end_time = time.time()

        print(f"Results: {results}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")

    ## Compare with sequential processing
    start_time = time.time()
    sequential_results = [square(n) for n in numbers]
    end_time = time.time()

    print(f"Sequential results: {sequential_results}")
    print(f"Sequential time: {end_time - start_time:.2f} seconds")

Führen Sie den Code aus:

python3 pool_example.py

Sie sollten eine Ausgabe ähnlich der folgenden sehen:

Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds

Beachten Sie, wie der Pool schneller ist, da er mehrere Zahlen parallel verarbeitet. Mit einer Poolgröße von 2 kann er 2 Zahlen gleichzeitig verarbeiten, während der sequenzielle Ansatz sie nacheinander verarbeitet.

Übergabe einzelner Argumente mit Pool.map()

Die map()-Methode ist perfekt, wenn Ihre Funktion ein einzelnes Argument entgegennimmt. Sie wendet die Funktion parallel auf jedes Element in einem Iterable an.

Lassen Sie uns dies mit einem weiteren Beispiel genauer untersuchen. Erstellen Sie eine Datei mit dem Namen pool_map.py:

import multiprocessing
import time
import os

def process_item(item):
    """Process a single item"""
    process_id = os.getpid()
    print(f"Process {process_id} processing item {item}")
    time.sleep(1)  ## Simulate work
    return item * 10

if __name__ == "__main__":
    ## Create a list of items to process
    items = list(range(10))

    ## Get the number of CPU cores available
    num_cores = multiprocessing.cpu_count()
    print(f"System has {num_cores} CPU cores")

    ## Create a Pool with the number of available cores
    with multiprocessing.Pool(processes=num_cores) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        ## Process all items in parallel
        results = pool.map(process_item, items)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

Führen Sie den Code aus:

python3 pool_map.py

Ihre Ausgabe sollte zeigen, dass die Elemente parallel über mehrere Prozesse verarbeitet werden, mit einer Zeit, die deutlich kürzer ist als 10 Sekunden (was die sequenzielle Verarbeitungszeit für 10 Elemente mit einer Verzögerung von 1 Sekunde wäre).

Übergabe mehrerer Argumente mit Pool.starmap()

Was ist, wenn Ihre Funktion mehrere Argumente benötigt? Hier kommt starmap() ins Spiel.

Erstellen Sie eine Datei mit dem Namen pool_starmap.py:

import multiprocessing
import time

def process_data(id, value, multiplier):
    """Process data with multiple arguments"""
    print(f"Processing item {id}: {value} with multiplier {multiplier}")
    time.sleep(1)  ## Simulate work
    return value * multiplier

if __name__ == "__main__":
    ## Create a list of argument tuples
    ## Each tuple contains all arguments for one function call
    arguments = [
        (1, 5, 2),    ## id=1, value=5, multiplier=2
        (2, 10, 3),   ## id=2, value=10, multiplier=3
        (3, 15, 2),   ## id=3, value=15, multiplier=2
        (4, 20, 4),   ## id=4, value=20, multiplier=4
        (5, 25, 5)    ## id=5, value=25, multiplier=5
    ]

    ## Create a Pool with 3 processes
    with multiprocessing.Pool(processes=3) as pool:
        print("Starting parallel processing with multiple arguments...")
        start_time = time.time()

        ## Process all items in parallel using starmap
        results = pool.starmap(process_data, arguments)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

Führen Sie den Code aus:

python3 pool_starmap.py

Sie sollten eine Ausgabe sehen, die zeigt, dass jedes Element mit seinem eigenen Satz von Argumenten verarbeitet wird, und die endgültige Ergebnisliste, die alle berechneten Werte enthält.

Wichtige Punkte zu Pool

  • Pool kümmert sich automatisch um die Verteilung von Aufgaben auf Prozesse
  • Es verwaltet eine Warteschlange von Aufgaben und weist sie verfügbaren Worker-Prozessen zu
  • Es sammelt Ergebnisse und gibt sie in der gleichen Reihenfolge wie die Eingabedaten zurück
  • Die Anzahl der Prozesse kann basierend auf den CPU-Kernen Ihres Systems und der Art Ihrer Aufgabe optimiert werden

Erweiterte Argumentübergabetechniken

In den vorherigen Schritten haben wir die grundlegende Argumentübergabe mit Process-Objekten und der Pool-Klasse kennengelernt. Lassen Sie uns nun fortgeschrittenere Techniken zur Übergabe von Argumenten in Multiprocessing-Szenarien untersuchen.

Übergabe komplexer Daten mit apply() und apply_async()

Manchmal benötigen Sie mehr Flexibilität als map() und starmap() bieten. Mit den Methoden apply() und apply_async() können Sie eine Funktion mit bestimmten Argumenten ausführen und Ergebnisse dynamischer verarbeiten.

Erstellen Sie eine Datei mit dem Namen pool_apply.py:

import multiprocessing
import time
import random

def process_task(task_id, duration, data):
    """Process a task with multiple arguments including complex data"""
    print(f"Starting task {task_id} with duration {duration}s and data: {data}")
    time.sleep(duration)  ## Simulate variable work time
    result = sum(data) * task_id
    print(f"Task {task_id} completed")
    return result

if __name__ == "__main__":
    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        print("Starting tasks with apply()...")

        ## Execute tasks with apply() - blocking
        result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
        print(f"Result 1: {result1}")

        result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
        print(f"Result 2: {result2}")

        print("\nStarting tasks with apply_async()...")

        ## Execute tasks with apply_async() - non-blocking
        async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
        async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))

        ## Do other work while tasks are running
        print("Tasks are running in the background...")
        time.sleep(1)
        print("Main process is doing other work...")

        ## Get results when needed (will wait if not yet available)
        print(f"Result 3: {async_result1.get()}")
        print(f"Result 4: {async_result2.get()}")

Führen Sie den Code aus:

python3 pool_apply.py

Die Ausgabe zeigt, wie apply() blockiert (sequenziell), während apply_async() es ermöglicht, dass Aufgaben parallel ausgeführt werden.

Verwendung von functools.partial für partielle Funktionsanwendung

Wenn Sie eine Funktion haben, die mehrere Argumente entgegennimmt, aber einige davon festlegen möchten, ist functools.partial sehr nützlich.

Erstellen Sie eine Datei mit dem Namen partial_example.py:

import multiprocessing
from functools import partial
import time

def process_with_config(data, config_a, config_b):
    """Process data with specific configuration parameters"""
    print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
    time.sleep(1)
    return data * config_a + config_b

if __name__ == "__main__":
    ## Data to process
    data_items = [1, 2, 3, 4, 5]

    ## Configuration values
    config_a = 10
    config_b = 5

    ## Create a partially applied function with fixed config values
    partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)

    ## Create a Pool
    with multiprocessing.Pool(processes=3) as pool:
        ## Use map with the partially applied function
        print("Starting processing with partial function...")
        results = pool.map(partial_process, data_items)
        print(f"Results: {results}")

Führen Sie den Code aus:

python3 partial_example.py

Die Ausgabe zeigt, wie jedes Datenelement mit denselben Konfigurationswerten verarbeitet wird, die mit partial() festgelegt wurden.

Übergabe von Argumenten über Shared Memory

Für große Daten, die nicht für jeden Prozess kopiert werden sollen, kann Shared Memory effizienter sein. Lassen Sie uns diese Technik untersuchen.

Erstellen Sie eine Datei mit dem Namen shared_memory_example.py:

import multiprocessing
import numpy as np
import time

def process_array_chunk(array, start_idx, end_idx):
    """Process a chunk of a shared array"""
    print(f"Processing chunk from index {start_idx} to {end_idx}")

    ## Simulate processing each element
    for i in range(start_idx, end_idx):
        array[i] = array[i] ** 2

    time.sleep(1)  ## Simulate additional work
    return start_idx, end_idx

if __name__ == "__main__":
    ## Create a shared array
    array_size = 1000000
    shared_array = multiprocessing.Array('d', array_size)  ## 'd' for double precision

    ## Fill the array with initial values
    temp_array = np.frombuffer(shared_array.get_obj())
    temp_array[:] = np.arange(array_size)

    print(f"Array initialized with size {array_size}")
    print(f"First 5 elements: {temp_array[:5]}")

    ## Define chunk size and create tasks
    num_processes = 4
    chunk_size = array_size // num_processes
    tasks = []

    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else array_size
        tasks.append((start, end))

    ## Process chunks in parallel
    with multiprocessing.Pool(processes=num_processes) as pool:
        ## Create arguments for each task
        args = [(shared_array, start, end) for start, end in tasks]

        print("Starting parallel processing...")
        start_time = time.time()

        ## Using starmap to pass multiple arguments
        results = pool.starmap(process_array_chunk, args)

        end_time = time.time()
        print(f"Processing completed in {end_time - start_time:.2f} seconds")

    ## Verify results
    print(f"First 5 elements after processing: {temp_array[:5]}")
    print(f"Results summary: {results}")

Führen Sie den Code aus:

python3 shared_memory_example.py

Die Ausgabe zeigt, wie ein großes Array in Chunks von verschiedenen Prozessen verarbeitet wird, wobei alle Prozesse Zugriff auf denselben Shared Memory haben.

Leistungsvergleich

Lassen Sie uns diese verschiedenen Techniken für eine reale Aufgabe vergleichen.

Erstellen Sie eine Datei mit dem Namen performance_comparison.py:

import multiprocessing
import time
import numpy as np
from functools import partial

def process_data_simple(data):
    """Simple processing function"""
    return sum(x ** 2 for x in data)

def process_data_with_config(data, multiplier):
    """Processing with additional configuration"""
    return sum(x ** 2 for x in data) * multiplier

def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
    """Run a test and measure performance"""
    start_time = time.time()

    with multiprocessing.Pool(processes=pool_size) as pool:
        if kwargs:
            partial_func = partial(func, **kwargs)
            results = pool.map(partial_func, data_chunks)
        else:
            results = pool.map(func, data_chunks)

    end_time = time.time()
    print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
    return results

if __name__ == "__main__":
    ## Create test data
    chunk_size = 1000000
    num_chunks = 8
    data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]

    print(f"Created {num_chunks} data chunks of size {chunk_size} each")

    ## Test 1: Simple map
    results1 = run_test("Simple map", process_data_simple, data_chunks)

    ## Test 2: Using partial
    results2 = run_test("Map with partial", process_data_with_config,
                        data_chunks, multiplier=2)

    ## Test 3: Sequential processing for comparison
    start_time = time.time()
    seq_results = [process_data_simple(chunk) for chunk in data_chunks]
    end_time = time.time()
    print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")

    ## Validate results
    print(f"\nFirst result from each test:")
    print(f"Test 1 first result: {results1[0]:.4f}")
    print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
    print(f"Sequential first result: {seq_results[0]:.4f}")

Führen Sie den Leistungsvergleich aus:

python3 performance_comparison.py

Die Ausgabe zeigt die Leistungsunterschiede zwischen den Ansätzen und demonstriert die Vorteile der parallelen Verarbeitung und den Overhead verschiedener Argumentübergabetechniken.

Anwendung in der Praxis: Parallele Bildverarbeitung

Nachdem wir verschiedene Techniken zur Übergabe von Argumenten in der Python-Multiprocessing-Umgebung verstanden haben, wollen wir diese Konzepte auf ein reales Szenario anwenden: die parallele Bildverarbeitung. Dies ist ein häufiger Anwendungsfall, bei dem Multiprocessing die Leistung erheblich verbessern kann.

Einrichten der Umgebung

Installieren wir zunächst die erforderlichen Pakete:

pip install Pillow numpy

Erstellen von Beispielbildern

Erstellen wir ein Skript, um einige Beispielbilder für unsere Verarbeitung zu generieren. Erstellen Sie eine Datei mit dem Namen create_images.py:

from PIL import Image
import numpy as np
import os

def create_sample_image(filename, width=800, height=600):
    """Create a sample image with random data"""
    ## Create random array data
    data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)

    ## Create image from array
    img = Image.fromarray(data, 'RGB')

    ## Save the image
    img.save(filename)
    print(f"Created image: {filename}")

if __name__ == "__main__":
    ## Create a directory for sample images
    os.makedirs("sample_images", exist_ok=True)

    ## Create 5 sample images
    for i in range(5):
        create_sample_image(f"sample_images/sample_{i+1}.png")

    print("All sample images created successfully")

Führen Sie dieses Skript aus, um Beispielbilder zu erstellen:

python3 create_images.py

Sequenzielle Bildverarbeitung

Implementieren wir zunächst eine sequenzielle Version unserer Bildverarbeitungsaufgaben. Erstellen Sie eine Datei mit dem Namen sequential_image_processing.py:

from PIL import Image, ImageFilter
import os
import time

def apply_filters(image_path, output_dir):
    """Apply multiple filters to an image and save the results"""
    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_sequentially(image_dir, output_dir):
    """Process all images in a directory sequentially"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Process each image sequentially
    start_time = time.time()
    results = []

    for image_path in image_files:
        result = apply_filters(image_path, output_dir)
        results.append(result)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nSequential processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_sequential"

    ## Process images
    results = process_images_sequentially(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

Führen Sie die sequenzielle Verarbeitung aus:

python3 sequential_image_processing.py

Parallele Bildverarbeitung

Implementieren wir nun die parallele Bildverarbeitung mit den Techniken, die wir gelernt haben. Erstellen Sie eine Datei mit dem Namen parallel_image_processing.py:

from PIL import Image, ImageFilter
import multiprocessing
import os
import time

def apply_filters(args):
    """Apply multiple filters to an image and save the results"""
    image_path, output_dir = args

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_in_parallel(image_dir, output_dir, num_processes=None):
    """Process all images in a directory in parallel"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Determine number of processes
    if num_processes is None:
        num_processes = min(multiprocessing.cpu_count(), len(image_files))

    print(f"Using {num_processes} processes for parallel processing")

    ## Create arguments for each task
    task_args = [(image_path, output_dir) for image_path in image_files]

    ## Process images in parallel
    start_time = time.time()

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(apply_filters, task_args)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nParallel processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_parallel"

    ## Process images
    results = process_images_in_parallel(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

Führen Sie die parallele Verarbeitung aus:

python3 parallel_image_processing.py

Erweiterte parallele Verarbeitung mit verschiedenen Filtern

Implementieren wir nun eine komplexere Version, in der wir verschiedene Filter parallel auf dasselbe Bild anwenden. Erstellen Sie eine Datei mit dem Namen advanced_parallel_processing.py:

from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial

def apply_filter(filter_info, image_path, output_dir):
    """Apply a specific filter to an image"""
    filter_name, filter_obj = filter_info

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} applying {filter_name} to {image_name}")

    ## Open image each time to avoid potential issues with sharing PIL objects
    img = Image.open(image_path)

    ## Apply filter
    start_time = time.time()
    filtered = img.filter(filter_obj)

    ## Save the filtered image
    output_filename = f"{filter_name}_{image_name}"
    output_path = os.path.join(output_dir, output_filename)
    filtered.save(output_path)

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
    return filter_name, image_name, processing_time

def process_image_with_multiple_filters(image_path, output_dir, filters):
    """Process an image with multiple filters in parallel"""
    ## Create a partial function with fixed image_path and output_dir
    apply_filter_to_image = partial(apply_filter,
                                   image_path=image_path,
                                   output_dir=output_dir)

    ## Process the image with multiple filters in parallel
    with multiprocessing.Pool() as pool:
        results = pool.map(apply_filter_to_image, filters)

    return results

def main():
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_advanced"
    os.makedirs(output_dir, exist_ok=True)

    ## Define filters to apply
    filters = [
        ("blur", ImageFilter.GaussianBlur(radius=5)),
        ("edges", ImageFilter.FIND_EDGES),
        ("emboss", ImageFilter.EMBOSS),
        ("sharpen", ImageFilter.SHARPEN),
        ("contour", ImageFilter.CONTOUR),
        ("detail", ImageFilter.DETAIL),
        ("smooth", ImageFilter.SMOOTH),
        ("smoothmore", ImageFilter.SMOOTH_MORE)
    ]

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process with {len(filters)} filters each")

    ## Process each image with all filters
    all_results = []
    overall_start = time.time()

    for image_path in image_files:
        image_name = os.path.basename(image_path)
        print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")

        start_time = time.time()
        results = process_image_with_multiple_filters(image_path, output_dir, filters)
        end_time = time.time()

        image_time = end_time - start_time
        print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")

        all_results.extend(results)

    overall_end = time.time()
    total_time = overall_end - overall_start

    ## Print summary
    print("\nProcessing Summary:")
    print(f"Total processing time: {total_time:.2f} seconds")
    print(f"Total operations: {len(all_results)}")

    ## Calculate average time per filter
    avg_time = sum(time for _, _, time in all_results) / len(all_results)
    print(f"Average processing time per filter: {avg_time:.2f} seconds")

if __name__ == "__main__":
    main()

Führen Sie die erweiterte parallele Verarbeitung aus:

python3 advanced_parallel_processing.py

Leistungsvergleich

Erstellen wir ein Skript, um die Leistung der verschiedenen Ansätze zu vergleichen. Erstellen Sie eine Datei mit dem Namen compare_performance.py:

import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np

def run_script(script_name):
    """Run a Python script and measure its execution time"""
    print(f"Running {script_name}...")
    start_time = time.time()

    ## Run the script
    process = subprocess.run(['python3', script_name],
                            capture_output=True, text=True)

    end_time = time.time()
    execution_time = end_time - start_time

    print(f"Completed {script_name} in {execution_time:.2f} seconds")
    print(f"Output: {process.stdout[-200:]}") ## Show last part of output

    return execution_time

def main():
    ## Scripts to compare
    scripts = [
        'sequential_image_processing.py',
        'parallel_image_processing.py',
        'advanced_parallel_processing.py'
    ]

    ## Run each script and measure time
    times = []
    for script in scripts:
        if os.path.exists(script):
            times.append(run_script(script))
        else:
            print(f"Script {script} not found!")
            times.append(0)

    ## Plot results
    if any(times):
        labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
        x = np.arange(len(labels))

        plt.figure(figsize=(10, 6))
        plt.bar(x, times, color=['red', 'blue', 'green'])
        plt.xlabel('Processing Method')
        plt.ylabel('Execution Time (s)')
        plt.title('Image Processing Performance Comparison')
        plt.xticks(x, labels)

        ## Add execution time as text on bars
        for i, v in enumerate(times):
            plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')

        ## Save the plot
        plt.tight_layout()
        plt.savefig('performance_comparison.png')
        print("Performance comparison plot saved as 'performance_comparison.png'")
    else:
        print("No valid execution times to plot")

if __name__ == "__main__":
    main()

Führen wir den Vergleich aus:

python3 compare_performance.py

Überprüfen Sie die Ergebnisse und untersuchen Sie das Bild performance_comparison.png, um die Unterschiede in der Ausführungszeit zwischen den verschiedenen Ansätzen zu sehen.

Wichtige Erkenntnisse

An diesem realen Beispiel können Sie mehrere wichtige Aspekte des Multiprocessing beobachten:

  1. Parallele Verarbeitung kann die Ausführungszeit bei CPU-intensiven Aufgaben wie der Bildverarbeitung erheblich reduzieren.

  2. Die Methode zur Übergabe von Argumenten beeinflusst sowohl die Codekomplexität als auch die Leistung:

    • Einfaches map() für unkomplizierte Funktionen mit einem Argument
    • Partielle Funktionen zum Festlegen bestimmter Parameter
    • Erweiterte Techniken für komplexe Workflows
  3. Überlegungen für reale Anwendungen:

    • Ressourcenverwaltung (CPU, Speicher)
    • Strategien zur Aufgabenverteilung
    • Fehlerbehandlung in parallelen Umgebungen
    • Koordination zwischen Prozessen

Dieses praktische Beispiel zeigt, wie Sie die Multiprocessing-Konzepte, die wir in diesem Lab gelernt haben, anwenden können, um reale Probleme effizient zu lösen.

Zusammenfassung

In diesem Lab haben Sie Python Multiprocessing und verschiedene Techniken zur Übergabe von Argumenten an parallele Prozesse kennengelernt. Hier ist eine Zusammenfassung dessen, was Sie erreicht haben:

  1. Grundlagen des Multiprocessing: Sie haben Ihr erstes Multiprocessing-Programm erstellt und den Unterschied zwischen Prozessen und Threads verstanden.

  2. Argumentübergabetechniken: Sie haben verschiedene Methoden zur Übergabe von Argumenten untersucht:

    • Grundlegende Argumentübergabe mit Process.args
    • Übergabe eines einzelnen Arguments mit Pool.map()
    • Mehrere Argumente mit Pool.starmap()
    • Dynamische Ausführung mit apply() und apply_async()
    • Verwendung von functools.partial für die partielle Funktionsanwendung
    • Shared Memory für effizientes Teilen von Daten
  3. Anwendung in der Praxis: Sie haben diese Konzepte auf ein praktisches Beispiel der Bildverarbeitung angewendet und erhebliche Leistungsverbesserungen durch Parallelisierung demonstriert.

Die Fähigkeiten, die Sie in diesem Lab erlernt haben, sind wertvoll für jede CPU-intensive Python-Anwendung und ermöglichen es Ihnen, die volle Rechenleistung von Multi-Core-Systemen zu nutzen. Denken Sie bei der Entwicklung komplexerer Anwendungen daran, die Vor- und Nachteile verschiedener Argumentübergabetechniken zu berücksichtigen und die am besten geeignete Methode für Ihren spezifischen Anwendungsfall zu wählen.