Comment passer des arguments en multiprocessing Python

PythonBeginner
Pratiquer maintenant

Introduction

Le module multiprocessing de Python permet le calcul parallèle en exploitant plusieurs cœurs de CPU, ce qui peut améliorer considérablement les performances de calcul pour les tâches intensives en CPU. Ce lab explore différentes techniques pour passer des arguments aux processus dans le module multiprocessing, abordant les défis courants de la programmation concurrente et démontrant des stratégies pratiques pour une parallélisation efficace.

À la fin de ce lab, vous comprendrez comment utiliser différentes méthodes pour passer des données aux processus parallèles et appliquer ces techniques à des scénarios réels.

Introduction au Multiprocessing en Python

Dans cette étape, nous allons apprendre les bases du multiprocessing en Python et créer notre premier programme parallèle.

Qu'est-ce que le Multiprocessing ?

Le multiprocessing est un module Python qui nous permet d'exécuter plusieurs processus en parallèle, utilisant efficacement plusieurs cœurs de CPU. Ceci est différent du threading (multithreading), qui est limité par le Global Interpreter Lock (GIL) qui empêche une véritable exécution parallèle du code Python.

Voici une comparaison simple :

Caractéristique Multiprocessing Threading
Mémoire Espaces mémoire séparés Espace mémoire partagé
Parallélisme Véritable exécution parallèle Limité par le GIL
Cas d'utilisation Tâches liées au CPU Tâches liées aux E/S
Surcharge Plus élevée (processus séparés) Plus faible

Création de votre premier programme Multiprocessing

Créons un exemple simple de multiprocessing. Ouvrez un nouveau fichier dans le WebIDE et nommez-le simple_process.py :

import multiprocessing
import time
import os

def worker():
    """Simple function to demonstrate a process"""
    print(f"Worker process id: {os.getpid()}")
    print(f"Worker parent process id: {os.getppid()}")
    time.sleep(1)
    print("Worker process completed")

if __name__ == "__main__":
    ## Print information about the main process
    print(f"Main process id: {os.getpid()}")

    ## Create a process
    process = multiprocessing.Process(target=worker)

    ## Start the process
    print("Starting worker process...")
    process.start()

    ## Wait for the process to complete
    process.join()

    print("Main process completed")

Maintenant, exécutons ce programme :

python3 simple_process.py

Vous devriez voir une sortie similaire à :

Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed

Les ID de processus exacts différeront sur votre système. Remarquez comment le processus worker a un ID de processus différent du processus principal, confirmant qu'il s'agit de processus séparés.

Comprendre la création de processus

Lorsque nous créons un Process de multiprocessing et que nous le démarrons, Python :

  1. Crée un nouveau processus
  2. Importe le module contenant la fonction cible
  3. Exécute la fonction cible dans le nouveau processus
  4. Retourne le contrôle au processus parent

La garde if __name__ == "__main__": est importante lorsque vous travaillez avec le multiprocessing en Python. Cela empêche la création de processus en double lorsque le module est importé.

Processus multiples

Modifions notre exemple pour créer plusieurs processus. Créez un nouveau fichier nommé multiple_processes.py :

import multiprocessing
import time
import os

def worker(worker_id):
    """Worker function that accepts an argument"""
    print(f"Worker {worker_id} (PID: {os.getpid()}) started")
    time.sleep(1)
    print(f"Worker {worker_id} completed")
    return worker_id * 2  ## This return value won't be captured

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    ## Create multiple processes
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    ## Wait for all processes to complete
    for p in processes:
        p.join()

    print("All processes completed")

Exécutez ce code :

python3 multiple_processes.py

Vous devriez voir une sortie similaire à :

Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed

Remarquez comment nous avons passé un argument (worker_id) à chaque processus en utilisant le paramètre args. C'est notre premier exemple de passage d'arguments à un processus.

Remarquez également que les processus peuvent ne pas nécessairement s'exécuter dans un ordre prévisible, car ils s'exécutent en parallèle.

Passage d'arguments de base avec Pool

Dans l'étape précédente, nous avons passé un argument simple à un processus. Maintenant, explorons une manière plus efficace de gérer plusieurs tâches en utilisant la classe Pool et apprenons différentes façons de passer des arguments.

La classe Pool

La classe Pool fournit un moyen pratique de paralléliser l'exécution d'une fonction sur plusieurs valeurs d'entrée. Elle distribue les données d'entrée sur plusieurs processus et collecte les résultats.

Créons un exemple simple pour comprendre comment Pool fonctionne. Créez un nouveau fichier nommé pool_example.py :

import multiprocessing
import time

def square(x):
    """Function that squares a number and returns the result"""
    print(f"Processing {x}...")
    time.sleep(1)  ## Simulate a time-consuming operation
    return x * x

if __name__ == "__main__":
    ## Create a list of numbers to process
    numbers = [1, 2, 3, 4, 5]

    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        ## Map the square function to the numbers
        start_time = time.time()
        results = pool.map(square, numbers)
        end_time = time.time()

        print(f"Results: {results}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")

    ## Compare with sequential processing
    start_time = time.time()
    sequential_results = [square(n) for n in numbers]
    end_time = time.time()

    print(f"Sequential results: {sequential_results}")
    print(f"Sequential time: {end_time - start_time:.2f} seconds")

Exécutez le code :

python3 pool_example.py

Vous devriez voir une sortie similaire à :

Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds

Remarquez comment le Pool est plus rapide car il traite plusieurs nombres en parallèle. Avec une taille de pool de 2, il peut traiter 2 nombres à la fois, alors que l'approche séquentielle les traite un par un.

Passage d'arguments uniques avec Pool.map()

La méthode map() est parfaite lorsque votre fonction prend un seul argument. Elle applique la fonction à chaque élément d'un itérable, en parallèle.

Explorons cela davantage avec un autre exemple. Créez un fichier nommé pool_map.py :

import multiprocessing
import time
import os

def process_item(item):
    """Process a single item"""
    process_id = os.getpid()
    print(f"Process {process_id} processing item {item}")
    time.sleep(1)  ## Simulate work
    return item * 10

if __name__ == "__main__":
    ## Create a list of items to process
    items = list(range(10))

    ## Get the number of CPU cores available
    num_cores = multiprocessing.cpu_count()
    print(f"System has {num_cores} CPU cores")

    ## Create a Pool with the number of available cores
    with multiprocessing.Pool(processes=num_cores) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        ## Process all items in parallel
        results = pool.map(process_item, items)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

Exécutez le code :

python3 pool_map.py

Votre sortie devrait montrer les éléments étant traités en parallèle sur plusieurs processus, avec un temps significativement plus court que 10 secondes (ce qui serait le temps de traitement séquentiel pour 10 éléments avec un délai d'une seconde chacun).

Passage de plusieurs arguments avec Pool.starmap()

Et si votre fonction a besoin de plusieurs arguments ? C'est là que starmap() entre en jeu.

Créez un fichier nommé pool_starmap.py :

import multiprocessing
import time

def process_data(id, value, multiplier):
    """Process data with multiple arguments"""
    print(f"Processing item {id}: {value} with multiplier {multiplier}")
    time.sleep(1)  ## Simulate work
    return value * multiplier

if __name__ == "__main__":
    ## Create a list of argument tuples
    ## Each tuple contains all arguments for one function call
    arguments = [
        (1, 5, 2),    ## id=1, value=5, multiplier=2
        (2, 10, 3),   ## id=2, value=10, multiplier=3
        (3, 15, 2),   ## id=3, value=15, multiplier=2
        (4, 20, 4),   ## id=4, value=20, multiplier=4
        (5, 25, 5)    ## id=5, value=25, multiplier=5
    ]

    ## Create a Pool with 3 processes
    with multiprocessing.Pool(processes=3) as pool:
        print("Starting parallel processing with multiple arguments...")
        start_time = time.time()

        ## Process all items in parallel using starmap
        results = pool.starmap(process_data, arguments)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

Exécutez le code :

python3 pool_starmap.py

Vous devriez voir une sortie montrant chaque élément étant traité avec son propre ensemble d'arguments, et la liste des résultats finale contenant toutes les valeurs calculées.

Points clés concernant Pool

  • Pool gère automatiquement la distribution des tâches sur plusieurs processus
  • Il gère une file d'attente de tâches et les affecte aux processus de travail disponibles
  • Il collecte les résultats et les renvoie dans le même ordre que les données d'entrée
  • Le nombre de processus peut être optimisé en fonction des cœurs de CPU de votre système et de la nature de votre tâche

Techniques avancées de passage d'arguments

Dans les étapes précédentes, nous avons appris le passage d'arguments de base avec les objets Process et la classe Pool. Maintenant, explorons des techniques plus avancées pour le passage d'arguments dans les scénarios de multiprocessing.

Passage de données complexes avec apply() et apply_async()

Parfois, vous avez besoin de plus de flexibilité que ce que map() et starmap() fournissent. Les méthodes apply() et apply_async() vous permettent d'exécuter une fonction avec des arguments spécifiques et de gérer les résultats de manière plus dynamique.

Créez un fichier nommé pool_apply.py :

import multiprocessing
import time
import random

def process_task(task_id, duration, data):
    """Process a task with multiple arguments including complex data"""
    print(f"Starting task {task_id} with duration {duration}s and data: {data}")
    time.sleep(duration)  ## Simulate variable work time
    result = sum(data) * task_id
    print(f"Task {task_id} completed")
    return result

if __name__ == "__main__":
    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        print("Starting tasks with apply()...")

        ## Execute tasks with apply() - blocking
        result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
        print(f"Result 1: {result1}")

        result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
        print(f"Result 2: {result2}")

        print("\nStarting tasks with apply_async()...")

        ## Execute tasks with apply_async() - non-blocking
        async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
        async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))

        ## Do other work while tasks are running
        print("Tasks are running in the background...")
        time.sleep(1)
        print("Main process is doing other work...")

        ## Get results when needed (will wait if not yet available)
        print(f"Result 3: {async_result1.get()}")
        print(f"Result 4: {async_result2.get()}")

Exécutez le code :

python3 pool_apply.py

La sortie montrera comment apply() est bloquant (séquentiel) tandis que apply_async() permet aux tâches de s'exécuter en parallèle.

Utilisation de functools.partial pour l'application partielle de fonction

Lorsque vous avez une fonction qui prend plusieurs arguments mais que vous souhaitez en fixer certains, functools.partial est très utile.

Créez un fichier nommé partial_example.py :

import multiprocessing
from functools import partial
import time

def process_with_config(data, config_a, config_b):
    """Process data with specific configuration parameters"""
    print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
    time.sleep(1)
    return data * config_a + config_b

if __name__ == "__main__":
    ## Data to process
    data_items = [1, 2, 3, 4, 5]

    ## Configuration values
    config_a = 10
    config_b = 5

    ## Create a partially applied function with fixed config values
    partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)

    ## Create a Pool
    with multiprocessing.Pool(processes=3) as pool:
        ## Use map with the partially applied function
        print("Starting processing with partial function...")
        results = pool.map(partial_process, data_items)
        print(f"Results: {results}")

Exécutez le code :

python3 partial_example.py

La sortie montrera comment chaque élément de données est traité avec les mêmes valeurs de configuration qui ont été fixées avec partial().

Passage d'arguments via la mémoire partagée

Pour les grandes données qui ne doivent pas être copiées pour chaque processus, la mémoire partagée peut être plus efficace. Explorons cette technique.

Créez un fichier nommé shared_memory_example.py :

import multiprocessing
import numpy as np
import time

def process_array_chunk(array, start_idx, end_idx):
    """Process a chunk of a shared array"""
    print(f"Processing chunk from index {start_idx} to {end_idx}")

    ## Simulate processing each element
    for i in range(start_idx, end_idx):
        array[i] = array[i] ** 2

    time.sleep(1)  ## Simulate additional work
    return start_idx, end_idx

if __name__ == "__main__":
    ## Create a shared array
    array_size = 1000000
    shared_array = multiprocessing.Array('d', array_size)  ## 'd' for double precision

    ## Fill the array with initial values
    temp_array = np.frombuffer(shared_array.get_obj())
    temp_array[:] = np.arange(array_size)

    print(f"Array initialized with size {array_size}")
    print(f"First 5 elements: {temp_array[:5]}")

    ## Define chunk size and create tasks
    num_processes = 4
    chunk_size = array_size // num_processes
    tasks = []

    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else array_size
        tasks.append((start, end))

    ## Process chunks in parallel
    with multiprocessing.Pool(processes=num_processes) as pool:
        ## Create arguments for each task
        args = [(shared_array, start, end) for start, end in tasks]

        print("Starting parallel processing...")
        start_time = time.time()

        ## Using starmap to pass multiple arguments
        results = pool.starmap(process_array_chunk, args)

        end_time = time.time()
        print(f"Processing completed in {end_time - start_time:.2f} seconds")

    ## Verify results
    print(f"First 5 elements after processing: {temp_array[:5]}")
    print(f"Results summary: {results}")

Exécutez le code :

python3 shared_memory_example.py

La sortie montrera comment un grand tableau est traité par morceaux par différents processus, tous les processus ayant accès à la même mémoire partagée.

Comparaison des performances

Comparons ces différentes techniques pour une tâche réelle.

Créez un fichier nommé performance_comparison.py :

import multiprocessing
import time
import numpy as np
from functools import partial

def process_data_simple(data):
    """Simple processing function"""
    return sum(x ** 2 for x in data)

def process_data_with_config(data, multiplier):
    """Processing with additional configuration"""
    return sum(x ** 2 for x in data) * multiplier

def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
    """Run a test and measure performance"""
    start_time = time.time()

    with multiprocessing.Pool(processes=pool_size) as pool:
        if kwargs:
            partial_func = partial(func, **kwargs)
            results = pool.map(partial_func, data_chunks)
        else:
            results = pool.map(func, data_chunks)

    end_time = time.time()
    print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
    return results

if __name__ == "__main__":
    ## Create test data
    chunk_size = 1000000
    num_chunks = 8
    data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]

    print(f"Created {num_chunks} data chunks of size {chunk_size} each")

    ## Test 1: Simple map
    results1 = run_test("Simple map", process_data_simple, data_chunks)

    ## Test 2: Using partial
    results2 = run_test("Map with partial", process_data_with_config,
                        data_chunks, multiplier=2)

    ## Test 3: Sequential processing for comparison
    start_time = time.time()
    seq_results = [process_data_simple(chunk) for chunk in data_chunks]
    end_time = time.time()
    print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")

    ## Validate results
    print(f"\nFirst result from each test:")
    print(f"Test 1 first result: {results1[0]:.4f}")
    print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
    print(f"Sequential first result: {seq_results[0]:.4f}")

Exécutez la comparaison des performances :

python3 performance_comparison.py

La sortie montrera les différences de performances entre les approches, démontrant les avantages du traitement parallèle et la surcharge des différentes techniques de passage d'arguments.

Application concrète : Traitement d'images en parallèle

Maintenant que nous comprenons diverses techniques pour passer des arguments en multiprocessing Python, appliquons ces concepts à un scénario concret : le traitement d'images en parallèle. Il s'agit d'un cas d'utilisation courant où le multiprocessing peut améliorer considérablement les performances.

Configuration de l'environnement

Tout d'abord, installons les packages requis :

pip install Pillow numpy

Création d'images d'exemple

Créons un script pour générer des images d'exemple pour notre traitement. Créez un fichier nommé create_images.py :

from PIL import Image
import numpy as np
import os

def create_sample_image(filename, width=800, height=600):
    """Create a sample image with random data"""
    ## Create random array data
    data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)

    ## Create image from array
    img = Image.fromarray(data, 'RGB')

    ## Save the image
    img.save(filename)
    print(f"Created image: {filename}")

if __name__ == "__main__":
    ## Create a directory for sample images
    os.makedirs("sample_images", exist_ok=True)

    ## Create 5 sample images
    for i in range(5):
        create_sample_image(f"sample_images/sample_{i+1}.png")

    print("All sample images created successfully")

Exécutez ce script pour créer des images d'exemple :

python3 create_images.py

Traitement d'images séquentiel

Implémentons d'abord une version séquentielle de nos tâches de traitement d'images. Créez un fichier nommé sequential_image_processing.py :

from PIL import Image, ImageFilter
import os
import time

def apply_filters(image_path, output_dir):
    """Apply multiple filters to an image and save the results"""
    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_sequentially(image_dir, output_dir):
    """Process all images in a directory sequentially"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Process each image sequentially
    start_time = time.time()
    results = []

    for image_path in image_files:
        result = apply_filters(image_path, output_dir)
        results.append(result)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nSequential processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_sequential"

    ## Process images
    results = process_images_sequentially(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

Exécutez le traitement séquentiel :

python3 sequential_image_processing.py

Traitement d'images en parallèle

Maintenant, implémentons le traitement d'images en parallèle en utilisant les techniques que nous avons apprises. Créez un fichier nommé parallel_image_processing.py :

from PIL import Image, ImageFilter
import multiprocessing
import os
import time

def apply_filters(args):
    """Apply multiple filters to an image and save the results"""
    image_path, output_dir = args

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_in_parallel(image_dir, output_dir, num_processes=None):
    """Process all images in a directory in parallel"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Determine number of processes
    if num_processes is None:
        num_processes = min(multiprocessing.cpu_count(), len(image_files))

    print(f"Using {num_processes} processes for parallel processing")

    ## Create arguments for each task
    task_args = [(image_path, output_dir) for image_path in image_files]

    ## Process images in parallel
    start_time = time.time()

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(apply_filters, task_args)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nParallel processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_parallel"

    ## Process images
    results = process_images_in_parallel(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

Exécutez le traitement en parallèle :

python3 parallel_image_processing.py

Traitement parallèle avancé avec différents filtres

Maintenant, implémentons une version plus complexe où nous appliquons différents filtres à la même image en parallèle. Créez un fichier nommé advanced_parallel_processing.py :

from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial

def apply_filter(filter_info, image_path, output_dir):
    """Apply a specific filter to an image"""
    filter_name, filter_obj = filter_info

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} applying {filter_name} to {image_name}")

    ## Open image each time to avoid potential issues with sharing PIL objects
    img = Image.open(image_path)

    ## Apply filter
    start_time = time.time()
    filtered = img.filter(filter_obj)

    ## Save the filtered image
    output_filename = f"{filter_name}_{image_name}"
    output_path = os.path.join(output_dir, output_filename)
    filtered.save(output_path)

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
    return filter_name, image_name, processing_time

def process_image_with_multiple_filters(image_path, output_dir, filters):
    """Process an image with multiple filters in parallel"""
    ## Create a partial function with fixed image_path and output_dir
    apply_filter_to_image = partial(apply_filter,
                                   image_path=image_path,
                                   output_dir=output_dir)

    ## Process the image with multiple filters in parallel
    with multiprocessing.Pool() as pool:
        results = pool.map(apply_filter_to_image, filters)

    return results

def main():
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_advanced"
    os.makedirs(output_dir, exist_ok=True)

    ## Define filters to apply
    filters = [
        ("blur", ImageFilter.GaussianBlur(radius=5)),
        ("edges", ImageFilter.FIND_EDGES),
        ("emboss", ImageFilter.EMBOSS),
        ("sharpen", ImageFilter.SHARPEN),
        ("contour", ImageFilter.CONTOUR),
        ("detail", ImageFilter.DETAIL),
        ("smooth", ImageFilter.SMOOTH),
        ("smoothmore", ImageFilter.SMOOTH_MORE)
    ]

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process with {len(filters)} filters each")

    ## Process each image with all filters
    all_results = []
    overall_start = time.time()

    for image_path in image_files:
        image_name = os.path.basename(image_path)
        print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")

        start_time = time.time()
        results = process_image_with_multiple_filters(image_path, output_dir, filters)
        end_time = time.time()

        image_time = end_time - start_time
        print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")

        all_results.extend(results)

    overall_end = time.time()
    total_time = overall_end - overall_start

    ## Print summary
    print("\nProcessing Summary:")
    print(f"Total processing time: {total_time:.2f} seconds")
    print(f"Total operations: {len(all_results)}")

    ## Calculate average time per filter
    avg_time = sum(time for _, _, time in all_results) / len(all_results)
    print(f"Average processing time per filter: {avg_time:.2f} seconds")

if __name__ == "__main__":
    main()

Exécutez le traitement parallèle avancé :

python3 advanced_parallel_processing.py

Comparaison des performances

Créons un script pour comparer les performances des différentes approches. Créez un fichier nommé compare_performance.py :

import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np

def run_script(script_name):
    """Run a Python script and measure its execution time"""
    print(f"Running {script_name}...")
    start_time = time.time()

    ## Run the script
    process = subprocess.run(['python3', script_name],
                            capture_output=True, text=True)

    end_time = time.time()
    execution_time = end_time - start_time

    print(f"Completed {script_name} in {execution_time:.2f} seconds")
    print(f"Output: {process.stdout[-200:]}") ## Show last part of output

    return execution_time

def main():
    ## Scripts to compare
    scripts = [
        'sequential_image_processing.py',
        'parallel_image_processing.py',
        'advanced_parallel_processing.py'
    ]

    ## Run each script and measure time
    times = []
    for script in scripts:
        if os.path.exists(script):
            times.append(run_script(script))
        else:
            print(f"Script {script} not found!")
            times.append(0)

    ## Plot results
    if any(times):
        labels = ['Séquentiel', 'Parallèle de base', 'Parallèle avancé']
        x = np.arange(len(labels))

        plt.figure(figsize=(10, 6))
        plt.bar(x, times, color=['red', 'blue', 'green'])
        plt.xlabel('Méthode de traitement')
        plt.ylabel('Temps d\'exécution (s)')
        plt.title('Comparaison des performances de traitement d\'images')
        plt.xticks(x, labels)

        ## Add execution time as text on bars
        for i, v in enumerate(times):
            plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')

        ## Save the plot
        plt.tight_layout()
        plt.savefig('performance_comparison.png')
        print("Performance comparison plot saved as 'performance_comparison.png'")
    else:
        print("No valid execution times to plot")

if __name__ == "__main__":
    main()

Exécutons la comparaison :

python3 compare_performance.py

Examinez les résultats et examinez l'image performance_comparison.png pour voir les différences de temps d'exécution entre les différentes approches.

Principaux points à retenir

De cet exemple concret, vous pouvez observer plusieurs aspects importants du multiprocessing :

  1. Le traitement parallèle peut réduire considérablement le temps d'exécution lorsque vous traitez des tâches gourmandes en CPU comme le traitement d'images.

  2. La méthode de passage des arguments affecte à la fois la complexité du code et les performances :

    • map() simple pour les fonctions à un seul argument simples
    • Fonctions partielles pour fixer certains paramètres
    • Techniques avancées pour les flux de travail complexes
  3. Considérations pour les applications concrètes :

    • Gestion des ressources (CPU, mémoire)
    • Stratégies de distribution des tâches
    • Gestion des erreurs dans les environnements parallèles
    • Coordination entre les processus

Cet exemple pratique démontre comment appliquer les concepts de multiprocessing que nous avons appris tout au long de ce lab pour résoudre efficacement des problèmes concrets.

Résumé

Dans ce lab, vous avez appris le multiprocessing Python et différentes techniques pour passer des arguments aux processus parallèles. Voici un résumé de ce que vous avez accompli :

  1. Principes de base du Multiprocessing : Vous avez créé votre premier programme de multiprocessing et compris la différence entre les processus et les threads.

  2. Techniques de passage d'arguments : Vous avez exploré diverses méthodes pour passer des arguments :

    • Passage d'arguments de base avec Process.args
    • Passage d'un seul argument avec Pool.map()
    • Arguments multiples avec Pool.starmap()
    • Exécution dynamique avec apply() et apply_async()
    • Utilisation de functools.partial pour l'application partielle de fonction
    • Mémoire partagée pour un partage efficace des données
  3. Application concrète : Vous avez appliqué ces concepts à un exemple pratique de traitement d'images, démontrant des améliorations significatives des performances grâce à la parallélisation.

Les compétences que vous avez acquises dans ce lab sont précieuses pour toute application Python gourmande en CPU, vous permettant de tirer parti de toute la puissance de calcul des systèmes multi-cœurs. Au fur et à mesure que vous développerez des applications plus complexes, n'oubliez pas de prendre en compte les compromis entre les différentes techniques de passage d'arguments et de choisir la méthode la plus appropriée pour votre cas d'utilisation spécifique.