Introduction
Le module multiprocessing de Python permet le calcul parallèle en exploitant plusieurs cœurs de CPU, ce qui peut améliorer considérablement les performances de calcul pour les tâches intensives en CPU. Ce lab explore différentes techniques pour passer des arguments aux processus dans le module multiprocessing, abordant les défis courants de la programmation concurrente et démontrant des stratégies pratiques pour une parallélisation efficace.
À la fin de ce lab, vous comprendrez comment utiliser différentes méthodes pour passer des données aux processus parallèles et appliquer ces techniques à des scénarios réels.
Introduction au Multiprocessing en Python
Dans cette étape, nous allons apprendre les bases du multiprocessing en Python et créer notre premier programme parallèle.
Qu'est-ce que le Multiprocessing ?
Le multiprocessing est un module Python qui nous permet d'exécuter plusieurs processus en parallèle, utilisant efficacement plusieurs cœurs de CPU. Ceci est différent du threading (multithreading), qui est limité par le Global Interpreter Lock (GIL) qui empêche une véritable exécution parallèle du code Python.
Voici une comparaison simple :
| Caractéristique | Multiprocessing | Threading |
|---|---|---|
| Mémoire | Espaces mémoire séparés | Espace mémoire partagé |
| Parallélisme | Véritable exécution parallèle | Limité par le GIL |
| Cas d'utilisation | Tâches liées au CPU | Tâches liées aux E/S |
| Surcharge | Plus élevée (processus séparés) | Plus faible |
Création de votre premier programme Multiprocessing
Créons un exemple simple de multiprocessing. Ouvrez un nouveau fichier dans le WebIDE et nommez-le simple_process.py :
import multiprocessing
import time
import os
def worker():
"""Simple function to demonstrate a process"""
print(f"Worker process id: {os.getpid()}")
print(f"Worker parent process id: {os.getppid()}")
time.sleep(1)
print("Worker process completed")
if __name__ == "__main__":
## Print information about the main process
print(f"Main process id: {os.getpid()}")
## Create a process
process = multiprocessing.Process(target=worker)
## Start the process
print("Starting worker process...")
process.start()
## Wait for the process to complete
process.join()
print("Main process completed")
Maintenant, exécutons ce programme :
python3 simple_process.py
Vous devriez voir une sortie similaire à :
Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed
Les ID de processus exacts différeront sur votre système. Remarquez comment le processus worker a un ID de processus différent du processus principal, confirmant qu'il s'agit de processus séparés.
Comprendre la création de processus
Lorsque nous créons un Process de multiprocessing et que nous le démarrons, Python :
- Crée un nouveau processus
- Importe le module contenant la fonction cible
- Exécute la fonction cible dans le nouveau processus
- Retourne le contrôle au processus parent
La garde if __name__ == "__main__": est importante lorsque vous travaillez avec le multiprocessing en Python. Cela empêche la création de processus en double lorsque le module est importé.
Processus multiples
Modifions notre exemple pour créer plusieurs processus. Créez un nouveau fichier nommé multiple_processes.py :
import multiprocessing
import time
import os
def worker(worker_id):
"""Worker function that accepts an argument"""
print(f"Worker {worker_id} (PID: {os.getpid()}) started")
time.sleep(1)
print(f"Worker {worker_id} completed")
return worker_id * 2 ## This return value won't be captured
if __name__ == "__main__":
print(f"Main process PID: {os.getpid()}")
## Create multiple processes
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
## Wait for all processes to complete
for p in processes:
p.join()
print("All processes completed")
Exécutez ce code :
python3 multiple_processes.py
Vous devriez voir une sortie similaire à :
Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed
Remarquez comment nous avons passé un argument (worker_id) à chaque processus en utilisant le paramètre args. C'est notre premier exemple de passage d'arguments à un processus.
Remarquez également que les processus peuvent ne pas nécessairement s'exécuter dans un ordre prévisible, car ils s'exécutent en parallèle.
Passage d'arguments de base avec Pool
Dans l'étape précédente, nous avons passé un argument simple à un processus. Maintenant, explorons une manière plus efficace de gérer plusieurs tâches en utilisant la classe Pool et apprenons différentes façons de passer des arguments.
La classe Pool
La classe Pool fournit un moyen pratique de paralléliser l'exécution d'une fonction sur plusieurs valeurs d'entrée. Elle distribue les données d'entrée sur plusieurs processus et collecte les résultats.
Créons un exemple simple pour comprendre comment Pool fonctionne. Créez un nouveau fichier nommé pool_example.py :
import multiprocessing
import time
def square(x):
"""Function that squares a number and returns the result"""
print(f"Processing {x}...")
time.sleep(1) ## Simulate a time-consuming operation
return x * x
if __name__ == "__main__":
## Create a list of numbers to process
numbers = [1, 2, 3, 4, 5]
## Create a Pool with 2 processes
with multiprocessing.Pool(processes=2) as pool:
## Map the square function to the numbers
start_time = time.time()
results = pool.map(square, numbers)
end_time = time.time()
print(f"Results: {results}")
print(f"Time taken: {end_time - start_time:.2f} seconds")
## Compare with sequential processing
start_time = time.time()
sequential_results = [square(n) for n in numbers]
end_time = time.time()
print(f"Sequential results: {sequential_results}")
print(f"Sequential time: {end_time - start_time:.2f} seconds")
Exécutez le code :
python3 pool_example.py
Vous devriez voir une sortie similaire à :
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds
Remarquez comment le Pool est plus rapide car il traite plusieurs nombres en parallèle. Avec une taille de pool de 2, il peut traiter 2 nombres à la fois, alors que l'approche séquentielle les traite un par un.
Passage d'arguments uniques avec Pool.map()
La méthode map() est parfaite lorsque votre fonction prend un seul argument. Elle applique la fonction à chaque élément d'un itérable, en parallèle.
Explorons cela davantage avec un autre exemple. Créez un fichier nommé pool_map.py :
import multiprocessing
import time
import os
def process_item(item):
"""Process a single item"""
process_id = os.getpid()
print(f"Process {process_id} processing item {item}")
time.sleep(1) ## Simulate work
return item * 10
if __name__ == "__main__":
## Create a list of items to process
items = list(range(10))
## Get the number of CPU cores available
num_cores = multiprocessing.cpu_count()
print(f"System has {num_cores} CPU cores")
## Create a Pool with the number of available cores
with multiprocessing.Pool(processes=num_cores) as pool:
print("Starting parallel processing...")
start_time = time.time()
## Process all items in parallel
results = pool.map(process_item, items)
end_time = time.time()
print(f"All processing completed in {end_time - start_time:.2f} seconds")
print(f"Results: {results}")
Exécutez le code :
python3 pool_map.py
Votre sortie devrait montrer les éléments étant traités en parallèle sur plusieurs processus, avec un temps significativement plus court que 10 secondes (ce qui serait le temps de traitement séquentiel pour 10 éléments avec un délai d'une seconde chacun).
Passage de plusieurs arguments avec Pool.starmap()
Et si votre fonction a besoin de plusieurs arguments ? C'est là que starmap() entre en jeu.
Créez un fichier nommé pool_starmap.py :
import multiprocessing
import time
def process_data(id, value, multiplier):
"""Process data with multiple arguments"""
print(f"Processing item {id}: {value} with multiplier {multiplier}")
time.sleep(1) ## Simulate work
return value * multiplier
if __name__ == "__main__":
## Create a list of argument tuples
## Each tuple contains all arguments for one function call
arguments = [
(1, 5, 2), ## id=1, value=5, multiplier=2
(2, 10, 3), ## id=2, value=10, multiplier=3
(3, 15, 2), ## id=3, value=15, multiplier=2
(4, 20, 4), ## id=4, value=20, multiplier=4
(5, 25, 5) ## id=5, value=25, multiplier=5
]
## Create a Pool with 3 processes
with multiprocessing.Pool(processes=3) as pool:
print("Starting parallel processing with multiple arguments...")
start_time = time.time()
## Process all items in parallel using starmap
results = pool.starmap(process_data, arguments)
end_time = time.time()
print(f"All processing completed in {end_time - start_time:.2f} seconds")
print(f"Results: {results}")
Exécutez le code :
python3 pool_starmap.py
Vous devriez voir une sortie montrant chaque élément étant traité avec son propre ensemble d'arguments, et la liste des résultats finale contenant toutes les valeurs calculées.
Points clés concernant Pool
Poolgère automatiquement la distribution des tâches sur plusieurs processus- Il gère une file d'attente de tâches et les affecte aux processus de travail disponibles
- Il collecte les résultats et les renvoie dans le même ordre que les données d'entrée
- Le nombre de processus peut être optimisé en fonction des cœurs de CPU de votre système et de la nature de votre tâche
Techniques avancées de passage d'arguments
Dans les étapes précédentes, nous avons appris le passage d'arguments de base avec les objets Process et la classe Pool. Maintenant, explorons des techniques plus avancées pour le passage d'arguments dans les scénarios de multiprocessing.
Passage de données complexes avec apply() et apply_async()
Parfois, vous avez besoin de plus de flexibilité que ce que map() et starmap() fournissent. Les méthodes apply() et apply_async() vous permettent d'exécuter une fonction avec des arguments spécifiques et de gérer les résultats de manière plus dynamique.
Créez un fichier nommé pool_apply.py :
import multiprocessing
import time
import random
def process_task(task_id, duration, data):
"""Process a task with multiple arguments including complex data"""
print(f"Starting task {task_id} with duration {duration}s and data: {data}")
time.sleep(duration) ## Simulate variable work time
result = sum(data) * task_id
print(f"Task {task_id} completed")
return result
if __name__ == "__main__":
## Create a Pool with 2 processes
with multiprocessing.Pool(processes=2) as pool:
print("Starting tasks with apply()...")
## Execute tasks with apply() - blocking
result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
print(f"Result 1: {result1}")
result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
print(f"Result 2: {result2}")
print("\nStarting tasks with apply_async()...")
## Execute tasks with apply_async() - non-blocking
async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))
## Do other work while tasks are running
print("Tasks are running in the background...")
time.sleep(1)
print("Main process is doing other work...")
## Get results when needed (will wait if not yet available)
print(f"Result 3: {async_result1.get()}")
print(f"Result 4: {async_result2.get()}")
Exécutez le code :
python3 pool_apply.py
La sortie montrera comment apply() est bloquant (séquentiel) tandis que apply_async() permet aux tâches de s'exécuter en parallèle.
Utilisation de functools.partial pour l'application partielle de fonction
Lorsque vous avez une fonction qui prend plusieurs arguments mais que vous souhaitez en fixer certains, functools.partial est très utile.
Créez un fichier nommé partial_example.py :
import multiprocessing
from functools import partial
import time
def process_with_config(data, config_a, config_b):
"""Process data with specific configuration parameters"""
print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
time.sleep(1)
return data * config_a + config_b
if __name__ == "__main__":
## Data to process
data_items = [1, 2, 3, 4, 5]
## Configuration values
config_a = 10
config_b = 5
## Create a partially applied function with fixed config values
partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)
## Create a Pool
with multiprocessing.Pool(processes=3) as pool:
## Use map with the partially applied function
print("Starting processing with partial function...")
results = pool.map(partial_process, data_items)
print(f"Results: {results}")
Exécutez le code :
python3 partial_example.py
La sortie montrera comment chaque élément de données est traité avec les mêmes valeurs de configuration qui ont été fixées avec partial().
Passage d'arguments via la mémoire partagée
Pour les grandes données qui ne doivent pas être copiées pour chaque processus, la mémoire partagée peut être plus efficace. Explorons cette technique.
Créez un fichier nommé shared_memory_example.py :
import multiprocessing
import numpy as np
import time
def process_array_chunk(array, start_idx, end_idx):
"""Process a chunk of a shared array"""
print(f"Processing chunk from index {start_idx} to {end_idx}")
## Simulate processing each element
for i in range(start_idx, end_idx):
array[i] = array[i] ** 2
time.sleep(1) ## Simulate additional work
return start_idx, end_idx
if __name__ == "__main__":
## Create a shared array
array_size = 1000000
shared_array = multiprocessing.Array('d', array_size) ## 'd' for double precision
## Fill the array with initial values
temp_array = np.frombuffer(shared_array.get_obj())
temp_array[:] = np.arange(array_size)
print(f"Array initialized with size {array_size}")
print(f"First 5 elements: {temp_array[:5]}")
## Define chunk size and create tasks
num_processes = 4
chunk_size = array_size // num_processes
tasks = []
for i in range(num_processes):
start = i * chunk_size
end = start + chunk_size if i < num_processes - 1 else array_size
tasks.append((start, end))
## Process chunks in parallel
with multiprocessing.Pool(processes=num_processes) as pool:
## Create arguments for each task
args = [(shared_array, start, end) for start, end in tasks]
print("Starting parallel processing...")
start_time = time.time()
## Using starmap to pass multiple arguments
results = pool.starmap(process_array_chunk, args)
end_time = time.time()
print(f"Processing completed in {end_time - start_time:.2f} seconds")
## Verify results
print(f"First 5 elements after processing: {temp_array[:5]}")
print(f"Results summary: {results}")
Exécutez le code :
python3 shared_memory_example.py
La sortie montrera comment un grand tableau est traité par morceaux par différents processus, tous les processus ayant accès à la même mémoire partagée.
Comparaison des performances
Comparons ces différentes techniques pour une tâche réelle.
Créez un fichier nommé performance_comparison.py :
import multiprocessing
import time
import numpy as np
from functools import partial
def process_data_simple(data):
"""Simple processing function"""
return sum(x ** 2 for x in data)
def process_data_with_config(data, multiplier):
"""Processing with additional configuration"""
return sum(x ** 2 for x in data) * multiplier
def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
"""Run a test and measure performance"""
start_time = time.time()
with multiprocessing.Pool(processes=pool_size) as pool:
if kwargs:
partial_func = partial(func, **kwargs)
results = pool.map(partial_func, data_chunks)
else:
results = pool.map(func, data_chunks)
end_time = time.time()
print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
return results
if __name__ == "__main__":
## Create test data
chunk_size = 1000000
num_chunks = 8
data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]
print(f"Created {num_chunks} data chunks of size {chunk_size} each")
## Test 1: Simple map
results1 = run_test("Simple map", process_data_simple, data_chunks)
## Test 2: Using partial
results2 = run_test("Map with partial", process_data_with_config,
data_chunks, multiplier=2)
## Test 3: Sequential processing for comparison
start_time = time.time()
seq_results = [process_data_simple(chunk) for chunk in data_chunks]
end_time = time.time()
print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")
## Validate results
print(f"\nFirst result from each test:")
print(f"Test 1 first result: {results1[0]:.4f}")
print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
print(f"Sequential first result: {seq_results[0]:.4f}")
Exécutez la comparaison des performances :
python3 performance_comparison.py
La sortie montrera les différences de performances entre les approches, démontrant les avantages du traitement parallèle et la surcharge des différentes techniques de passage d'arguments.
Application concrète : Traitement d'images en parallèle
Maintenant que nous comprenons diverses techniques pour passer des arguments en multiprocessing Python, appliquons ces concepts à un scénario concret : le traitement d'images en parallèle. Il s'agit d'un cas d'utilisation courant où le multiprocessing peut améliorer considérablement les performances.
Configuration de l'environnement
Tout d'abord, installons les packages requis :
pip install Pillow numpy
Création d'images d'exemple
Créons un script pour générer des images d'exemple pour notre traitement. Créez un fichier nommé create_images.py :
from PIL import Image
import numpy as np
import os
def create_sample_image(filename, width=800, height=600):
"""Create a sample image with random data"""
## Create random array data
data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
## Create image from array
img = Image.fromarray(data, 'RGB')
## Save the image
img.save(filename)
print(f"Created image: {filename}")
if __name__ == "__main__":
## Create a directory for sample images
os.makedirs("sample_images", exist_ok=True)
## Create 5 sample images
for i in range(5):
create_sample_image(f"sample_images/sample_{i+1}.png")
print("All sample images created successfully")
Exécutez ce script pour créer des images d'exemple :
python3 create_images.py
Traitement d'images séquentiel
Implémentons d'abord une version séquentielle de nos tâches de traitement d'images. Créez un fichier nommé sequential_image_processing.py :
from PIL import Image, ImageFilter
import os
import time
def apply_filters(image_path, output_dir):
"""Apply multiple filters to an image and save the results"""
## Load the image
image_name = os.path.basename(image_path)
print(f"Processing image: {image_name}")
img = Image.open(image_path)
## Apply filters
## 1. Blur filter
start_time = time.time()
blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
blurred.save(os.path.join(output_dir, f"blur_{image_name}"))
## 2. Edge detection filter
edges = img.filter(ImageFilter.FIND_EDGES)
edges.save(os.path.join(output_dir, f"edges_{image_name}"))
## 3. Emboss filter
emboss = img.filter(ImageFilter.EMBOSS)
emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))
## 4. Sharpen filter
sharpen = img.filter(ImageFilter.SHARPEN)
sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))
end_time = time.time()
processing_time = end_time - start_time
print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
return image_name, processing_time
def process_images_sequentially(image_dir, output_dir):
"""Process all images in a directory sequentially"""
## Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process")
## Process each image sequentially
start_time = time.time()
results = []
for image_path in image_files:
result = apply_filters(image_path, output_dir)
results.append(result)
end_time = time.time()
total_time = end_time - start_time
print(f"\nSequential processing completed")
print(f"Total processing time: {total_time:.2f} seconds")
return results
if __name__ == "__main__":
## Define directories
image_dir = "sample_images"
output_dir = "processed_sequential"
## Process images
results = process_images_sequentially(image_dir, output_dir)
## Print summary
print("\nProcessing summary:")
for image_name, proc_time in results:
print(f" {image_name}: {proc_time:.2f} seconds")
Exécutez le traitement séquentiel :
python3 sequential_image_processing.py
Traitement d'images en parallèle
Maintenant, implémentons le traitement d'images en parallèle en utilisant les techniques que nous avons apprises. Créez un fichier nommé parallel_image_processing.py :
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
def apply_filters(args):
"""Apply multiple filters to an image and save the results"""
image_path, output_dir = args
## Load the image
image_name = os.path.basename(image_path)
print(f"Process {os.getpid()} processing image: {image_name}")
img = Image.open(image_path)
## Apply filters
## 1. Blur filter
start_time = time.time()
blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
blurred.save(os.path.join(output_dir, f"blur_{image_name}"))
## 2. Edge detection filter
edges = img.filter(ImageFilter.FIND_EDGES)
edges.save(os.path.join(output_dir, f"edges_{image_name}"))
## 3. Emboss filter
emboss = img.filter(ImageFilter.EMBOSS)
emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))
## 4. Sharpen filter
sharpen = img.filter(ImageFilter.SHARPEN)
sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))
end_time = time.time()
processing_time = end_time - start_time
print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
return image_name, processing_time
def process_images_in_parallel(image_dir, output_dir, num_processes=None):
"""Process all images in a directory in parallel"""
## Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process")
## Determine number of processes
if num_processes is None:
num_processes = min(multiprocessing.cpu_count(), len(image_files))
print(f"Using {num_processes} processes for parallel processing")
## Create arguments for each task
task_args = [(image_path, output_dir) for image_path in image_files]
## Process images in parallel
start_time = time.time()
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.map(apply_filters, task_args)
end_time = time.time()
total_time = end_time - start_time
print(f"\nParallel processing completed")
print(f"Total processing time: {total_time:.2f} seconds")
return results
if __name__ == "__main__":
## Define directories
image_dir = "sample_images"
output_dir = "processed_parallel"
## Process images
results = process_images_in_parallel(image_dir, output_dir)
## Print summary
print("\nProcessing summary:")
for image_name, proc_time in results:
print(f" {image_name}: {proc_time:.2f} seconds")
Exécutez le traitement en parallèle :
python3 parallel_image_processing.py
Traitement parallèle avancé avec différents filtres
Maintenant, implémentons une version plus complexe où nous appliquons différents filtres à la même image en parallèle. Créez un fichier nommé advanced_parallel_processing.py :
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial
def apply_filter(filter_info, image_path, output_dir):
"""Apply a specific filter to an image"""
filter_name, filter_obj = filter_info
## Load the image
image_name = os.path.basename(image_path)
print(f"Process {os.getpid()} applying {filter_name} to {image_name}")
## Open image each time to avoid potential issues with sharing PIL objects
img = Image.open(image_path)
## Apply filter
start_time = time.time()
filtered = img.filter(filter_obj)
## Save the filtered image
output_filename = f"{filter_name}_{image_name}"
output_path = os.path.join(output_dir, output_filename)
filtered.save(output_path)
end_time = time.time()
processing_time = end_time - start_time
print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
return filter_name, image_name, processing_time
def process_image_with_multiple_filters(image_path, output_dir, filters):
"""Process an image with multiple filters in parallel"""
## Create a partial function with fixed image_path and output_dir
apply_filter_to_image = partial(apply_filter,
image_path=image_path,
output_dir=output_dir)
## Process the image with multiple filters in parallel
with multiprocessing.Pool() as pool:
results = pool.map(apply_filter_to_image, filters)
return results
def main():
## Define directories
image_dir = "sample_images"
output_dir = "processed_advanced"
os.makedirs(output_dir, exist_ok=True)
## Define filters to apply
filters = [
("blur", ImageFilter.GaussianBlur(radius=5)),
("edges", ImageFilter.FIND_EDGES),
("emboss", ImageFilter.EMBOSS),
("sharpen", ImageFilter.SHARPEN),
("contour", ImageFilter.CONTOUR),
("detail", ImageFilter.DETAIL),
("smooth", ImageFilter.SMOOTH),
("smoothmore", ImageFilter.SMOOTH_MORE)
]
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process with {len(filters)} filters each")
## Process each image with all filters
all_results = []
overall_start = time.time()
for image_path in image_files:
image_name = os.path.basename(image_path)
print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")
start_time = time.time()
results = process_image_with_multiple_filters(image_path, output_dir, filters)
end_time = time.time()
image_time = end_time - start_time
print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")
all_results.extend(results)
overall_end = time.time()
total_time = overall_end - overall_start
## Print summary
print("\nProcessing Summary:")
print(f"Total processing time: {total_time:.2f} seconds")
print(f"Total operations: {len(all_results)}")
## Calculate average time per filter
avg_time = sum(time for _, _, time in all_results) / len(all_results)
print(f"Average processing time per filter: {avg_time:.2f} seconds")
if __name__ == "__main__":
main()
Exécutez le traitement parallèle avancé :
python3 advanced_parallel_processing.py
Comparaison des performances
Créons un script pour comparer les performances des différentes approches. Créez un fichier nommé compare_performance.py :
import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np
def run_script(script_name):
"""Run a Python script and measure its execution time"""
print(f"Running {script_name}...")
start_time = time.time()
## Run the script
process = subprocess.run(['python3', script_name],
capture_output=True, text=True)
end_time = time.time()
execution_time = end_time - start_time
print(f"Completed {script_name} in {execution_time:.2f} seconds")
print(f"Output: {process.stdout[-200:]}") ## Show last part of output
return execution_time
def main():
## Scripts to compare
scripts = [
'sequential_image_processing.py',
'parallel_image_processing.py',
'advanced_parallel_processing.py'
]
## Run each script and measure time
times = []
for script in scripts:
if os.path.exists(script):
times.append(run_script(script))
else:
print(f"Script {script} not found!")
times.append(0)
## Plot results
if any(times):
labels = ['Séquentiel', 'Parallèle de base', 'Parallèle avancé']
x = np.arange(len(labels))
plt.figure(figsize=(10, 6))
plt.bar(x, times, color=['red', 'blue', 'green'])
plt.xlabel('Méthode de traitement')
plt.ylabel('Temps d\'exécution (s)')
plt.title('Comparaison des performances de traitement d\'images')
plt.xticks(x, labels)
## Add execution time as text on bars
for i, v in enumerate(times):
plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')
## Save the plot
plt.tight_layout()
plt.savefig('performance_comparison.png')
print("Performance comparison plot saved as 'performance_comparison.png'")
else:
print("No valid execution times to plot")
if __name__ == "__main__":
main()
Exécutons la comparaison :
python3 compare_performance.py
Examinez les résultats et examinez l'image performance_comparison.png pour voir les différences de temps d'exécution entre les différentes approches.
Principaux points à retenir
De cet exemple concret, vous pouvez observer plusieurs aspects importants du multiprocessing :
Le traitement parallèle peut réduire considérablement le temps d'exécution lorsque vous traitez des tâches gourmandes en CPU comme le traitement d'images.
La méthode de passage des arguments affecte à la fois la complexité du code et les performances :
map()simple pour les fonctions à un seul argument simples- Fonctions partielles pour fixer certains paramètres
- Techniques avancées pour les flux de travail complexes
Considérations pour les applications concrètes :
- Gestion des ressources (CPU, mémoire)
- Stratégies de distribution des tâches
- Gestion des erreurs dans les environnements parallèles
- Coordination entre les processus
Cet exemple pratique démontre comment appliquer les concepts de multiprocessing que nous avons appris tout au long de ce lab pour résoudre efficacement des problèmes concrets.
Résumé
Dans ce lab, vous avez appris le multiprocessing Python et différentes techniques pour passer des arguments aux processus parallèles. Voici un résumé de ce que vous avez accompli :
Principes de base du Multiprocessing : Vous avez créé votre premier programme de multiprocessing et compris la différence entre les processus et les threads.
Techniques de passage d'arguments : Vous avez exploré diverses méthodes pour passer des arguments :
- Passage d'arguments de base avec
Process.args - Passage d'un seul argument avec
Pool.map() - Arguments multiples avec
Pool.starmap() - Exécution dynamique avec
apply()etapply_async() - Utilisation de
functools.partialpour l'application partielle de fonction - Mémoire partagée pour un partage efficace des données
- Passage d'arguments de base avec
Application concrète : Vous avez appliqué ces concepts à un exemple pratique de traitement d'images, démontrant des améliorations significatives des performances grâce à la parallélisation.
Les compétences que vous avez acquises dans ce lab sont précieuses pour toute application Python gourmande en CPU, vous permettant de tirer parti de toute la puissance de calcul des systèmes multi-cœurs. Au fur et à mesure que vous développerez des applications plus complexes, n'oubliez pas de prendre en compte les compromis entre les différentes techniques de passage d'arguments et de choisir la méthode la plus appropriée pour votre cas d'utilisation spécifique.



