Einführung
Das multiprocessing-Modul von Python ermöglicht Parallelverarbeitung durch die Nutzung mehrerer CPU-Kerne, was die Rechenleistung für CPU-intensive Aufgaben erheblich verbessern kann. Dieses Lab untersucht verschiedene Techniken zur Übergabe von Argumenten an Prozesse im multiprocessing-Modul, geht auf häufige Herausforderungen in der nebenläufigen Programmierung ein und demonstriert praktische Strategien für eine effektive Parallelisierung.
Am Ende dieses Labs werden Sie verstehen, wie man verschiedene Methoden zur Datenübergabe an parallele Prozesse verwendet und diese Techniken auf reale Szenarien anwendet.
Einführung in Multiprocessing in Python
In diesem Schritt lernen wir die Grundlagen von Multiprocessing in Python und erstellen unser erstes paralleles Programm.
Was ist Multiprocessing?
Multiprocessing ist ein Python-Modul, das es uns ermöglicht, mehrere Prozesse parallel auszuführen und so effektiv mehrere CPU-Kerne zu nutzen. Dies unterscheidet sich von Threading, das durch den Global Interpreter Lock (GIL) eingeschränkt ist, der eine echte parallele Ausführung von Python-Code verhindert.
Hier ist ein einfacher Vergleich:
| Merkmal | Multiprocessing | Threading |
|---|---|---|
| Speicher | Separate Speicherbereiche | Gemeinsamer Speicherbereich |
| Parallelität | Echte parallele Ausführung | Begrenzt durch GIL |
| Anwendungsfall | CPU-gebundene Aufgaben | I/O-gebundene Aufgaben |
| Overhead | Höher (separate Prozesse) | Geringer |
Erstellen Ihres ersten Multiprocessing-Programms
Lassen Sie uns ein einfaches Multiprocessing-Beispiel erstellen. Öffnen Sie eine neue Datei in der WebIDE und nennen Sie sie simple_process.py:
import multiprocessing
import time
import os
def worker():
"""Simple function to demonstrate a process"""
print(f"Worker process id: {os.getpid()}")
print(f"Worker parent process id: {os.getppid()}")
time.sleep(1)
print("Worker process completed")
if __name__ == "__main__":
## Print information about the main process
print(f"Main process id: {os.getpid()}")
## Create a process
process = multiprocessing.Process(target=worker)
## Start the process
print("Starting worker process...")
process.start()
## Wait for the process to complete
process.join()
print("Main process completed")
Führen wir nun dieses Programm aus:
python3 simple_process.py
Sie sollten eine Ausgabe ähnlich der folgenden sehen:
Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed
Die genauen Prozess-IDs unterscheiden sich auf Ihrem System. Beachten Sie, dass der Worker-Prozess eine andere Prozess-ID als der Hauptprozess hat, was bestätigt, dass es sich um separate Prozesse handelt.
Verstehen der Prozesserstellung
Wenn wir einen Multiprocessing-Prozess erstellen und ihn starten, führt Python Folgendes aus:
- Erstellt einen neuen Prozess
- Importiert das Modul, das die Zielfunktion enthält
- Führt die Zielfunktion im neuen Prozess aus
- Gibt die Kontrolle an den übergeordneten Prozess zurück
Der if __name__ == "__main__": Guard ist wichtig, wenn Sie mit Multiprocessing in Python arbeiten. Dies verhindert die Erstellung doppelter Prozesse, wenn das Modul importiert wird.
Mehrere Prozesse
Ändern wir unser Beispiel, um mehrere Prozesse zu erstellen. Erstellen Sie eine neue Datei mit dem Namen multiple_processes.py:
import multiprocessing
import time
import os
def worker(worker_id):
"""Worker function that accepts an argument"""
print(f"Worker {worker_id} (PID: {os.getpid()}) started")
time.sleep(1)
print(f"Worker {worker_id} completed")
return worker_id * 2 ## This return value won't be captured
if __name__ == "__main__":
print(f"Main process PID: {os.getpid()}")
## Create multiple processes
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
## Wait for all processes to complete
for p in processes:
p.join()
print("All processes completed")
Führen Sie diesen Code aus:
python3 multiple_processes.py
Sie sollten eine Ausgabe ähnlich der folgenden sehen:
Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed
Beachten Sie, wie wir jedem Prozess ein Argument (worker_id) mit dem Parameter args übergeben haben. Dies ist unser erstes Beispiel für die Übergabe von Argumenten an einen Prozess.
Beachten Sie auch, dass die Prozesse möglicherweise nicht unbedingt in einer vorhersagbaren Reihenfolge ausgeführt werden, da sie parallel laufen.
Grundlegende Argumentübergabe mit Pool
Im vorherigen Schritt haben wir ein einfaches Argument an einen Prozess übergeben. Lassen Sie uns nun einen effizienteren Weg zur Handhabung mehrerer Aufgaben mit der Pool-Klasse untersuchen und verschiedene Möglichkeiten zur Übergabe von Argumenten kennenlernen.
Die Pool-Klasse
Die Pool-Klasse bietet eine praktische Möglichkeit, die Ausführung einer Funktion über mehrere Eingabewerte zu parallelisieren. Sie verteilt die Eingabedaten auf mehrere Prozesse und sammelt Ergebnisse.
Lassen Sie uns ein einfaches Beispiel erstellen, um zu verstehen, wie Pool funktioniert. Erstellen Sie eine neue Datei mit dem Namen pool_example.py:
import multiprocessing
import time
def square(x):
"""Function that squares a number and returns the result"""
print(f"Processing {x}...")
time.sleep(1) ## Simulate a time-consuming operation
return x * x
if __name__ == "__main__":
## Create a list of numbers to process
numbers = [1, 2, 3, 4, 5]
## Create a Pool with 2 processes
with multiprocessing.Pool(processes=2) as pool:
## Map the square function to the numbers
start_time = time.time()
results = pool.map(square, numbers)
end_time = time.time()
print(f"Results: {results}")
print(f"Time taken: {end_time - start_time:.2f} seconds")
## Compare with sequential processing
start_time = time.time()
sequential_results = [square(n) for n in numbers]
end_time = time.time()
print(f"Sequential results: {sequential_results}")
print(f"Sequential time: {end_time - start_time:.2f} seconds")
Führen Sie den Code aus:
python3 pool_example.py
Sie sollten eine Ausgabe ähnlich der folgenden sehen:
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds
Beachten Sie, wie der Pool schneller ist, da er mehrere Zahlen parallel verarbeitet. Mit einer Poolgröße von 2 kann er 2 Zahlen gleichzeitig verarbeiten, während der sequenzielle Ansatz sie nacheinander verarbeitet.
Übergabe einzelner Argumente mit Pool.map()
Die map()-Methode ist perfekt, wenn Ihre Funktion ein einzelnes Argument entgegennimmt. Sie wendet die Funktion parallel auf jedes Element in einem Iterable an.
Lassen Sie uns dies mit einem weiteren Beispiel genauer untersuchen. Erstellen Sie eine Datei mit dem Namen pool_map.py:
import multiprocessing
import time
import os
def process_item(item):
"""Process a single item"""
process_id = os.getpid()
print(f"Process {process_id} processing item {item}")
time.sleep(1) ## Simulate work
return item * 10
if __name__ == "__main__":
## Create a list of items to process
items = list(range(10))
## Get the number of CPU cores available
num_cores = multiprocessing.cpu_count()
print(f"System has {num_cores} CPU cores")
## Create a Pool with the number of available cores
with multiprocessing.Pool(processes=num_cores) as pool:
print("Starting parallel processing...")
start_time = time.time()
## Process all items in parallel
results = pool.map(process_item, items)
end_time = time.time()
print(f"All processing completed in {end_time - start_time:.2f} seconds")
print(f"Results: {results}")
Führen Sie den Code aus:
python3 pool_map.py
Ihre Ausgabe sollte zeigen, dass die Elemente parallel über mehrere Prozesse verarbeitet werden, mit einer Zeit, die deutlich kürzer ist als 10 Sekunden (was die sequenzielle Verarbeitungszeit für 10 Elemente mit einer Verzögerung von 1 Sekunde wäre).
Übergabe mehrerer Argumente mit Pool.starmap()
Was ist, wenn Ihre Funktion mehrere Argumente benötigt? Hier kommt starmap() ins Spiel.
Erstellen Sie eine Datei mit dem Namen pool_starmap.py:
import multiprocessing
import time
def process_data(id, value, multiplier):
"""Process data with multiple arguments"""
print(f"Processing item {id}: {value} with multiplier {multiplier}")
time.sleep(1) ## Simulate work
return value * multiplier
if __name__ == "__main__":
## Create a list of argument tuples
## Each tuple contains all arguments for one function call
arguments = [
(1, 5, 2), ## id=1, value=5, multiplier=2
(2, 10, 3), ## id=2, value=10, multiplier=3
(3, 15, 2), ## id=3, value=15, multiplier=2
(4, 20, 4), ## id=4, value=20, multiplier=4
(5, 25, 5) ## id=5, value=25, multiplier=5
]
## Create a Pool with 3 processes
with multiprocessing.Pool(processes=3) as pool:
print("Starting parallel processing with multiple arguments...")
start_time = time.time()
## Process all items in parallel using starmap
results = pool.starmap(process_data, arguments)
end_time = time.time()
print(f"All processing completed in {end_time - start_time:.2f} seconds")
print(f"Results: {results}")
Führen Sie den Code aus:
python3 pool_starmap.py
Sie sollten eine Ausgabe sehen, die zeigt, dass jedes Element mit seinem eigenen Satz von Argumenten verarbeitet wird, und die endgültige Ergebnisliste, die alle berechneten Werte enthält.
Wichtige Punkte zu Pool
Poolkümmert sich automatisch um die Verteilung von Aufgaben auf Prozesse- Es verwaltet eine Warteschlange von Aufgaben und weist sie verfügbaren Worker-Prozessen zu
- Es sammelt Ergebnisse und gibt sie in der gleichen Reihenfolge wie die Eingabedaten zurück
- Die Anzahl der Prozesse kann basierend auf den CPU-Kernen Ihres Systems und der Art Ihrer Aufgabe optimiert werden
Erweiterte Argumentübergabetechniken
In den vorherigen Schritten haben wir die grundlegende Argumentübergabe mit Process-Objekten und der Pool-Klasse kennengelernt. Lassen Sie uns nun fortgeschrittenere Techniken zur Übergabe von Argumenten in Multiprocessing-Szenarien untersuchen.
Übergabe komplexer Daten mit apply() und apply_async()
Manchmal benötigen Sie mehr Flexibilität als map() und starmap() bieten. Mit den Methoden apply() und apply_async() können Sie eine Funktion mit bestimmten Argumenten ausführen und Ergebnisse dynamischer verarbeiten.
Erstellen Sie eine Datei mit dem Namen pool_apply.py:
import multiprocessing
import time
import random
def process_task(task_id, duration, data):
"""Process a task with multiple arguments including complex data"""
print(f"Starting task {task_id} with duration {duration}s and data: {data}")
time.sleep(duration) ## Simulate variable work time
result = sum(data) * task_id
print(f"Task {task_id} completed")
return result
if __name__ == "__main__":
## Create a Pool with 2 processes
with multiprocessing.Pool(processes=2) as pool:
print("Starting tasks with apply()...")
## Execute tasks with apply() - blocking
result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
print(f"Result 1: {result1}")
result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
print(f"Result 2: {result2}")
print("\nStarting tasks with apply_async()...")
## Execute tasks with apply_async() - non-blocking
async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))
## Do other work while tasks are running
print("Tasks are running in the background...")
time.sleep(1)
print("Main process is doing other work...")
## Get results when needed (will wait if not yet available)
print(f"Result 3: {async_result1.get()}")
print(f"Result 4: {async_result2.get()}")
Führen Sie den Code aus:
python3 pool_apply.py
Die Ausgabe zeigt, wie apply() blockiert (sequenziell), während apply_async() es ermöglicht, dass Aufgaben parallel ausgeführt werden.
Verwendung von functools.partial für partielle Funktionsanwendung
Wenn Sie eine Funktion haben, die mehrere Argumente entgegennimmt, aber einige davon festlegen möchten, ist functools.partial sehr nützlich.
Erstellen Sie eine Datei mit dem Namen partial_example.py:
import multiprocessing
from functools import partial
import time
def process_with_config(data, config_a, config_b):
"""Process data with specific configuration parameters"""
print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
time.sleep(1)
return data * config_a + config_b
if __name__ == "__main__":
## Data to process
data_items = [1, 2, 3, 4, 5]
## Configuration values
config_a = 10
config_b = 5
## Create a partially applied function with fixed config values
partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)
## Create a Pool
with multiprocessing.Pool(processes=3) as pool:
## Use map with the partially applied function
print("Starting processing with partial function...")
results = pool.map(partial_process, data_items)
print(f"Results: {results}")
Führen Sie den Code aus:
python3 partial_example.py
Die Ausgabe zeigt, wie jedes Datenelement mit denselben Konfigurationswerten verarbeitet wird, die mit partial() festgelegt wurden.
Übergabe von Argumenten über Shared Memory
Für große Daten, die nicht für jeden Prozess kopiert werden sollen, kann Shared Memory effizienter sein. Lassen Sie uns diese Technik untersuchen.
Erstellen Sie eine Datei mit dem Namen shared_memory_example.py:
import multiprocessing
import numpy as np
import time
def process_array_chunk(array, start_idx, end_idx):
"""Process a chunk of a shared array"""
print(f"Processing chunk from index {start_idx} to {end_idx}")
## Simulate processing each element
for i in range(start_idx, end_idx):
array[i] = array[i] ** 2
time.sleep(1) ## Simulate additional work
return start_idx, end_idx
if __name__ == "__main__":
## Create a shared array
array_size = 1000000
shared_array = multiprocessing.Array('d', array_size) ## 'd' for double precision
## Fill the array with initial values
temp_array = np.frombuffer(shared_array.get_obj())
temp_array[:] = np.arange(array_size)
print(f"Array initialized with size {array_size}")
print(f"First 5 elements: {temp_array[:5]}")
## Define chunk size and create tasks
num_processes = 4
chunk_size = array_size // num_processes
tasks = []
for i in range(num_processes):
start = i * chunk_size
end = start + chunk_size if i < num_processes - 1 else array_size
tasks.append((start, end))
## Process chunks in parallel
with multiprocessing.Pool(processes=num_processes) as pool:
## Create arguments for each task
args = [(shared_array, start, end) for start, end in tasks]
print("Starting parallel processing...")
start_time = time.time()
## Using starmap to pass multiple arguments
results = pool.starmap(process_array_chunk, args)
end_time = time.time()
print(f"Processing completed in {end_time - start_time:.2f} seconds")
## Verify results
print(f"First 5 elements after processing: {temp_array[:5]}")
print(f"Results summary: {results}")
Führen Sie den Code aus:
python3 shared_memory_example.py
Die Ausgabe zeigt, wie ein großes Array in Chunks von verschiedenen Prozessen verarbeitet wird, wobei alle Prozesse Zugriff auf denselben Shared Memory haben.
Leistungsvergleich
Lassen Sie uns diese verschiedenen Techniken für eine reale Aufgabe vergleichen.
Erstellen Sie eine Datei mit dem Namen performance_comparison.py:
import multiprocessing
import time
import numpy as np
from functools import partial
def process_data_simple(data):
"""Simple processing function"""
return sum(x ** 2 for x in data)
def process_data_with_config(data, multiplier):
"""Processing with additional configuration"""
return sum(x ** 2 for x in data) * multiplier
def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
"""Run a test and measure performance"""
start_time = time.time()
with multiprocessing.Pool(processes=pool_size) as pool:
if kwargs:
partial_func = partial(func, **kwargs)
results = pool.map(partial_func, data_chunks)
else:
results = pool.map(func, data_chunks)
end_time = time.time()
print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
return results
if __name__ == "__main__":
## Create test data
chunk_size = 1000000
num_chunks = 8
data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]
print(f"Created {num_chunks} data chunks of size {chunk_size} each")
## Test 1: Simple map
results1 = run_test("Simple map", process_data_simple, data_chunks)
## Test 2: Using partial
results2 = run_test("Map with partial", process_data_with_config,
data_chunks, multiplier=2)
## Test 3: Sequential processing for comparison
start_time = time.time()
seq_results = [process_data_simple(chunk) for chunk in data_chunks]
end_time = time.time()
print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")
## Validate results
print(f"\nFirst result from each test:")
print(f"Test 1 first result: {results1[0]:.4f}")
print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
print(f"Sequential first result: {seq_results[0]:.4f}")
Führen Sie den Leistungsvergleich aus:
python3 performance_comparison.py
Die Ausgabe zeigt die Leistungsunterschiede zwischen den Ansätzen und demonstriert die Vorteile der parallelen Verarbeitung und den Overhead verschiedener Argumentübergabetechniken.
Anwendung in der Praxis: Parallele Bildverarbeitung
Nachdem wir verschiedene Techniken zur Übergabe von Argumenten in der Python-Multiprocessing-Umgebung verstanden haben, wollen wir diese Konzepte auf ein reales Szenario anwenden: die parallele Bildverarbeitung. Dies ist ein häufiger Anwendungsfall, bei dem Multiprocessing die Leistung erheblich verbessern kann.
Einrichten der Umgebung
Installieren wir zunächst die erforderlichen Pakete:
pip install Pillow numpy
Erstellen von Beispielbildern
Erstellen wir ein Skript, um einige Beispielbilder für unsere Verarbeitung zu generieren. Erstellen Sie eine Datei mit dem Namen create_images.py:
from PIL import Image
import numpy as np
import os
def create_sample_image(filename, width=800, height=600):
"""Create a sample image with random data"""
## Create random array data
data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
## Create image from array
img = Image.fromarray(data, 'RGB')
## Save the image
img.save(filename)
print(f"Created image: {filename}")
if __name__ == "__main__":
## Create a directory for sample images
os.makedirs("sample_images", exist_ok=True)
## Create 5 sample images
for i in range(5):
create_sample_image(f"sample_images/sample_{i+1}.png")
print("All sample images created successfully")
Führen Sie dieses Skript aus, um Beispielbilder zu erstellen:
python3 create_images.py
Sequenzielle Bildverarbeitung
Implementieren wir zunächst eine sequenzielle Version unserer Bildverarbeitungsaufgaben. Erstellen Sie eine Datei mit dem Namen sequential_image_processing.py:
from PIL import Image, ImageFilter
import os
import time
def apply_filters(image_path, output_dir):
"""Apply multiple filters to an image and save the results"""
## Load the image
image_name = os.path.basename(image_path)
print(f"Processing image: {image_name}")
img = Image.open(image_path)
## Apply filters
## 1. Blur filter
start_time = time.time()
blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
blurred.save(os.path.join(output_dir, f"blur_{image_name}"))
## 2. Edge detection filter
edges = img.filter(ImageFilter.FIND_EDGES)
edges.save(os.path.join(output_dir, f"edges_{image_name}"))
## 3. Emboss filter
emboss = img.filter(ImageFilter.EMBOSS)
emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))
## 4. Sharpen filter
sharpen = img.filter(ImageFilter.SHARPEN)
sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))
end_time = time.time()
processing_time = end_time - start_time
print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
return image_name, processing_time
def process_images_sequentially(image_dir, output_dir):
"""Process all images in a directory sequentially"""
## Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process")
## Process each image sequentially
start_time = time.time()
results = []
for image_path in image_files:
result = apply_filters(image_path, output_dir)
results.append(result)
end_time = time.time()
total_time = end_time - start_time
print(f"\nSequential processing completed")
print(f"Total processing time: {total_time:.2f} seconds")
return results
if __name__ == "__main__":
## Define directories
image_dir = "sample_images"
output_dir = "processed_sequential"
## Process images
results = process_images_sequentially(image_dir, output_dir)
## Print summary
print("\nProcessing summary:")
for image_name, proc_time in results:
print(f" {image_name}: {proc_time:.2f} seconds")
Führen Sie die sequenzielle Verarbeitung aus:
python3 sequential_image_processing.py
Parallele Bildverarbeitung
Implementieren wir nun die parallele Bildverarbeitung mit den Techniken, die wir gelernt haben. Erstellen Sie eine Datei mit dem Namen parallel_image_processing.py:
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
def apply_filters(args):
"""Apply multiple filters to an image and save the results"""
image_path, output_dir = args
## Load the image
image_name = os.path.basename(image_path)
print(f"Process {os.getpid()} processing image: {image_name}")
img = Image.open(image_path)
## Apply filters
## 1. Blur filter
start_time = time.time()
blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
blurred.save(os.path.join(output_dir, f"blur_{image_name}"))
## 2. Edge detection filter
edges = img.filter(ImageFilter.FIND_EDGES)
edges.save(os.path.join(output_dir, f"edges_{image_name}"))
## 3. Emboss filter
emboss = img.filter(ImageFilter.EMBOSS)
emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))
## 4. Sharpen filter
sharpen = img.filter(ImageFilter.SHARPEN)
sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))
end_time = time.time()
processing_time = end_time - start_time
print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
return image_name, processing_time
def process_images_in_parallel(image_dir, output_dir, num_processes=None):
"""Process all images in a directory in parallel"""
## Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process")
## Determine number of processes
if num_processes is None:
num_processes = min(multiprocessing.cpu_count(), len(image_files))
print(f"Using {num_processes} processes for parallel processing")
## Create arguments for each task
task_args = [(image_path, output_dir) for image_path in image_files]
## Process images in parallel
start_time = time.time()
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.map(apply_filters, task_args)
end_time = time.time()
total_time = end_time - start_time
print(f"\nParallel processing completed")
print(f"Total processing time: {total_time:.2f} seconds")
return results
if __name__ == "__main__":
## Define directories
image_dir = "sample_images"
output_dir = "processed_parallel"
## Process images
results = process_images_in_parallel(image_dir, output_dir)
## Print summary
print("\nProcessing summary:")
for image_name, proc_time in results:
print(f" {image_name}: {proc_time:.2f} seconds")
Führen Sie die parallele Verarbeitung aus:
python3 parallel_image_processing.py
Erweiterte parallele Verarbeitung mit verschiedenen Filtern
Implementieren wir nun eine komplexere Version, in der wir verschiedene Filter parallel auf dasselbe Bild anwenden. Erstellen Sie eine Datei mit dem Namen advanced_parallel_processing.py:
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial
def apply_filter(filter_info, image_path, output_dir):
"""Apply a specific filter to an image"""
filter_name, filter_obj = filter_info
## Load the image
image_name = os.path.basename(image_path)
print(f"Process {os.getpid()} applying {filter_name} to {image_name}")
## Open image each time to avoid potential issues with sharing PIL objects
img = Image.open(image_path)
## Apply filter
start_time = time.time()
filtered = img.filter(filter_obj)
## Save the filtered image
output_filename = f"{filter_name}_{image_name}"
output_path = os.path.join(output_dir, output_filename)
filtered.save(output_path)
end_time = time.time()
processing_time = end_time - start_time
print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
return filter_name, image_name, processing_time
def process_image_with_multiple_filters(image_path, output_dir, filters):
"""Process an image with multiple filters in parallel"""
## Create a partial function with fixed image_path and output_dir
apply_filter_to_image = partial(apply_filter,
image_path=image_path,
output_dir=output_dir)
## Process the image with multiple filters in parallel
with multiprocessing.Pool() as pool:
results = pool.map(apply_filter_to_image, filters)
return results
def main():
## Define directories
image_dir = "sample_images"
output_dir = "processed_advanced"
os.makedirs(output_dir, exist_ok=True)
## Define filters to apply
filters = [
("blur", ImageFilter.GaussianBlur(radius=5)),
("edges", ImageFilter.FIND_EDGES),
("emboss", ImageFilter.EMBOSS),
("sharpen", ImageFilter.SHARPEN),
("contour", ImageFilter.CONTOUR),
("detail", ImageFilter.DETAIL),
("smooth", ImageFilter.SMOOTH),
("smoothmore", ImageFilter.SMOOTH_MORE)
]
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process with {len(filters)} filters each")
## Process each image with all filters
all_results = []
overall_start = time.time()
for image_path in image_files:
image_name = os.path.basename(image_path)
print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")
start_time = time.time()
results = process_image_with_multiple_filters(image_path, output_dir, filters)
end_time = time.time()
image_time = end_time - start_time
print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")
all_results.extend(results)
overall_end = time.time()
total_time = overall_end - overall_start
## Print summary
print("\nProcessing Summary:")
print(f"Total processing time: {total_time:.2f} seconds")
print(f"Total operations: {len(all_results)}")
## Calculate average time per filter
avg_time = sum(time for _, _, time in all_results) / len(all_results)
print(f"Average processing time per filter: {avg_time:.2f} seconds")
if __name__ == "__main__":
main()
Führen Sie die erweiterte parallele Verarbeitung aus:
python3 advanced_parallel_processing.py
Leistungsvergleich
Erstellen wir ein Skript, um die Leistung der verschiedenen Ansätze zu vergleichen. Erstellen Sie eine Datei mit dem Namen compare_performance.py:
import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np
def run_script(script_name):
"""Run a Python script and measure its execution time"""
print(f"Running {script_name}...")
start_time = time.time()
## Run the script
process = subprocess.run(['python3', script_name],
capture_output=True, text=True)
end_time = time.time()
execution_time = end_time - start_time
print(f"Completed {script_name} in {execution_time:.2f} seconds")
print(f"Output: {process.stdout[-200:]}") ## Show last part of output
return execution_time
def main():
## Scripts to compare
scripts = [
'sequential_image_processing.py',
'parallel_image_processing.py',
'advanced_parallel_processing.py'
]
## Run each script and measure time
times = []
for script in scripts:
if os.path.exists(script):
times.append(run_script(script))
else:
print(f"Script {script} not found!")
times.append(0)
## Plot results
if any(times):
labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
x = np.arange(len(labels))
plt.figure(figsize=(10, 6))
plt.bar(x, times, color=['red', 'blue', 'green'])
plt.xlabel('Processing Method')
plt.ylabel('Execution Time (s)')
plt.title('Image Processing Performance Comparison')
plt.xticks(x, labels)
## Add execution time as text on bars
for i, v in enumerate(times):
plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')
## Save the plot
plt.tight_layout()
plt.savefig('performance_comparison.png')
print("Performance comparison plot saved as 'performance_comparison.png'")
else:
print("No valid execution times to plot")
if __name__ == "__main__":
main()
Führen wir den Vergleich aus:
python3 compare_performance.py
Überprüfen Sie die Ergebnisse und untersuchen Sie das Bild performance_comparison.png, um die Unterschiede in der Ausführungszeit zwischen den verschiedenen Ansätzen zu sehen.
Wichtige Erkenntnisse
An diesem realen Beispiel können Sie mehrere wichtige Aspekte des Multiprocessing beobachten:
Parallele Verarbeitung kann die Ausführungszeit bei CPU-intensiven Aufgaben wie der Bildverarbeitung erheblich reduzieren.
Die Methode zur Übergabe von Argumenten beeinflusst sowohl die Codekomplexität als auch die Leistung:
- Einfaches map() für unkomplizierte Funktionen mit einem Argument
- Partielle Funktionen zum Festlegen bestimmter Parameter
- Erweiterte Techniken für komplexe Workflows
Überlegungen für reale Anwendungen:
- Ressourcenverwaltung (CPU, Speicher)
- Strategien zur Aufgabenverteilung
- Fehlerbehandlung in parallelen Umgebungen
- Koordination zwischen Prozessen
Dieses praktische Beispiel zeigt, wie Sie die Multiprocessing-Konzepte, die wir in diesem Lab gelernt haben, anwenden können, um reale Probleme effizient zu lösen.
Zusammenfassung
In diesem Lab haben Sie Python Multiprocessing und verschiedene Techniken zur Übergabe von Argumenten an parallele Prozesse kennengelernt. Hier ist eine Zusammenfassung dessen, was Sie erreicht haben:
Grundlagen des Multiprocessing: Sie haben Ihr erstes Multiprocessing-Programm erstellt und den Unterschied zwischen Prozessen und Threads verstanden.
Argumentübergabetechniken: Sie haben verschiedene Methoden zur Übergabe von Argumenten untersucht:
- Grundlegende Argumentübergabe mit
Process.args - Übergabe eines einzelnen Arguments mit
Pool.map() - Mehrere Argumente mit
Pool.starmap() - Dynamische Ausführung mit
apply()undapply_async() - Verwendung von
functools.partialfür die partielle Funktionsanwendung - Shared Memory für effizientes Teilen von Daten
- Grundlegende Argumentübergabe mit
Anwendung in der Praxis: Sie haben diese Konzepte auf ein praktisches Beispiel der Bildverarbeitung angewendet und erhebliche Leistungsverbesserungen durch Parallelisierung demonstriert.
Die Fähigkeiten, die Sie in diesem Lab erlernt haben, sind wertvoll für jede CPU-intensive Python-Anwendung und ermöglichen es Ihnen, die volle Rechenleistung von Multi-Core-Systemen zu nutzen. Denken Sie bei der Entwicklung komplexerer Anwendungen daran, die Vor- und Nachteile verschiedener Argumentübergabetechniken zu berücksichtigen und die am besten geeignete Methode für Ihren spezifischen Anwendungsfall zu wählen.



