¿Cómo pasar argumentos en multiprocessing de Python?

PythonBeginner
Practicar Ahora

Introducción

El módulo multiprocessing de Python permite la computación paralela al aprovechar múltiples núcleos de CPU, lo que puede mejorar significativamente el rendimiento computacional para tareas intensivas en CPU. Este laboratorio explora diferentes técnicas para pasar argumentos a los procesos en el módulo multiprocessing, abordando desafíos comunes en la programación concurrente y demostrando estrategias prácticas para una paralelización efectiva.

Al final de este laboratorio, comprenderá cómo usar diferentes métodos para pasar datos a procesos paralelos y aplicará estas técnicas a escenarios del mundo real.

Introducción a Multiprocessing en Python

En este paso, aprenderemos los fundamentos de multiprocessing en Python y crearemos nuestro primer programa paralelo.

¿Qué es Multiprocessing?

Multiprocessing es un módulo de Python que nos permite ejecutar múltiples procesos en paralelo, utilizando eficazmente múltiples núcleos de CPU. Esto es diferente de threading (hilos), que está limitado por el Global Interpreter Lock (GIL) que impide la verdadera ejecución paralela del código Python.

Aquí hay una comparación simple:

Característica Multiprocessing Threading
Memoria Espacios de memoria separados Espacio de memoria compartido
Paralelismo Verdadera ejecución paralela Limitado por GIL
Caso de uso Tareas con uso intensivo de CPU Tareas con uso intensivo de E/S
Sobrecarga Mayor (procesos separados) Menor

Creando tu Primer Programa Multiprocessing

Creemos un ejemplo simple de multiprocessing. Abre un nuevo archivo en el WebIDE y nómbralo simple_process.py:

import multiprocessing
import time
import os

def worker():
    """Simple function to demonstrate a process"""
    print(f"Worker process id: {os.getpid()}")
    print(f"Worker parent process id: {os.getppid()}")
    time.sleep(1)
    print("Worker process completed")

if __name__ == "__main__":
    ## Print information about the main process
    print(f"Main process id: {os.getpid()}")

    ## Create a process
    process = multiprocessing.Process(target=worker)

    ## Start the process
    print("Starting worker process...")
    process.start()

    ## Wait for the process to complete
    process.join()

    print("Main process completed")

Ahora ejecutemos este programa:

python3 simple_process.py

Deberías ver una salida similar a:

Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed

Los IDs de proceso exactos diferirán en tu sistema. Observa cómo el proceso trabajador tiene un ID de proceso diferente al del proceso principal, confirmando que son procesos separados.

Entendiendo la Creación de Procesos

Cuando creamos un Process de multiprocessing y lo iniciamos, Python:

  1. Crea un nuevo proceso
  2. Importa el módulo que contiene la función objetivo
  3. Ejecuta la función objetivo en el nuevo proceso
  4. Retorna el control al proceso padre

La guardia if __name__ == "__main__": es importante cuando se trabaja con multiprocessing en Python. Esto evita la creación de procesos duplicados cuando se importa el módulo.

Múltiples Procesos

Modifiquemos nuestro ejemplo para crear múltiples procesos. Crea un nuevo archivo llamado multiple_processes.py:

import multiprocessing
import time
import os

def worker(worker_id):
    """Worker function that accepts an argument"""
    print(f"Worker {worker_id} (PID: {os.getpid()}) started")
    time.sleep(1)
    print(f"Worker {worker_id} completed")
    return worker_id * 2  ## This return value won't be captured

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    ## Create multiple processes
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    ## Wait for all processes to complete
    for p in processes:
        p.join()

    print("All processes completed")

Ejecuta este código:

python3 multiple_processes.py

Deberías ver una salida similar a:

Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed

Observa cómo pasamos un argumento (worker_id) a cada proceso usando el parámetro args. Este es nuestro primer ejemplo de pasar argumentos a un proceso.

También observa que los procesos podrían no necesariamente ejecutarse en un orden predecible, ya que se están ejecutando en paralelo.

Paso de Argumentos Básico con Pool

En el paso anterior, pasamos un argumento simple a un proceso. Ahora, exploremos una forma más eficiente de manejar múltiples tareas utilizando la clase Pool y aprendamos sobre diferentes formas de pasar argumentos.

La Clase Pool

La clase Pool proporciona una forma conveniente de paralelizar la ejecución de una función a través de múltiples valores de entrada. Distribuye los datos de entrada entre los procesos y recopila los resultados.

Creemos un ejemplo simple para entender cómo funciona Pool. Crea un nuevo archivo llamado pool_example.py:

import multiprocessing
import time

def square(x):
    """Function that squares a number and returns the result"""
    print(f"Processing {x}...")
    time.sleep(1)  ## Simulate a time-consuming operation
    return x * x

if __name__ == "__main__":
    ## Create a list of numbers to process
    numbers = [1, 2, 3, 4, 5]

    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        ## Map the square function to the numbers
        start_time = time.time()
        results = pool.map(square, numbers)
        end_time = time.time()

        print(f"Results: {results}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")

    ## Compare with sequential processing
    start_time = time.time()
    sequential_results = [square(n) for n in numbers]
    end_time = time.time()

    print(f"Sequential results: {sequential_results}")
    print(f"Sequential time: {end_time - start_time:.2f} seconds")

Ejecuta el código:

python3 pool_example.py

Deberías ver una salida similar a:

Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds

Observa cómo el Pool es más rápido porque procesa múltiples números en paralelo. Con un tamaño de pool de 2, puede procesar 2 números a la vez, mientras que el enfoque secuencial los procesa uno tras otro.

Pasar Argumentos Únicos con Pool.map()

El método map() es perfecto para cuando tu función toma un solo argumento. Aplica la función a cada elemento en un iterable, en paralelo.

Exploremos esto más con otro ejemplo. Crea un archivo llamado pool_map.py:

import multiprocessing
import time
import os

def process_item(item):
    """Process a single item"""
    process_id = os.getpid()
    print(f"Process {process_id} processing item {item}")
    time.sleep(1)  ## Simulate work
    return item * 10

if __name__ == "__main__":
    ## Create a list of items to process
    items = list(range(10))

    ## Get the number of CPU cores available
    num_cores = multiprocessing.cpu_count()
    print(f"System has {num_cores} CPU cores")

    ## Create a Pool with the number of available cores
    with multiprocessing.Pool(processes=num_cores) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        ## Process all items in parallel
        results = pool.map(process_item, items)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

Ejecuta el código:

python3 pool_map.py

Tu salida debería mostrar los elementos que se procesan en paralelo a través de múltiples procesos, con un tiempo significativamente más corto que 10 segundos (que sería el tiempo de procesamiento secuencial para 10 elementos con un retraso de 1 segundo cada uno).

Pasar Múltiples Argumentos con Pool.starmap()

¿Qué pasa si tu función necesita múltiples argumentos? Aquí es donde entra starmap().

Crea un archivo llamado pool_starmap.py:

import multiprocessing
import time

def process_data(id, value, multiplier):
    """Process data with multiple arguments"""
    print(f"Processing item {id}: {value} with multiplier {multiplier}")
    time.sleep(1)  ## Simulate work
    return value * multiplier

if __name__ == "__main__":
    ## Create a list of argument tuples
    ## Each tuple contains all arguments for one function call
    arguments = [
        (1, 5, 2),    ## id=1, value=5, multiplier=2
        (2, 10, 3),   ## id=2, value=10, multiplier=3
        (3, 15, 2),   ## id=3, value=15, multiplier=2
        (4, 20, 4),   ## id=4, value=20, multiplier=4
        (5, 25, 5)    ## id=5, value=25, multiplier=5
    ]

    ## Create a Pool with 3 processes
    with multiprocessing.Pool(processes=3) as pool:
        print("Starting parallel processing with multiple arguments...")
        start_time = time.time()

        ## Process all items in parallel using starmap
        results = pool.starmap(process_data, arguments)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

Ejecuta el código:

python3 pool_starmap.py

Deberías ver la salida mostrando cada elemento que se procesa con su propio conjunto de argumentos, y la lista de resultados final que contiene todos los valores calculados.

Puntos Clave sobre Pool

  • Pool maneja automáticamente la distribución de tareas a través de los procesos
  • Administra una cola de tareas y las asigna a los procesos de trabajo disponibles
  • Recopila los resultados y los devuelve en el mismo orden que los datos de entrada
  • El número de procesos se puede optimizar en función de los núcleos de CPU de tu sistema y la naturaleza de tu tarea

Técnicas Avanzadas de Paso de Argumentos

En los pasos anteriores, aprendimos sobre el paso de argumentos básico con objetos Process y la clase Pool. Ahora, exploremos técnicas más avanzadas para pasar argumentos en escenarios de multiprocessing.

Pasar Datos Complejos con apply() y apply_async()

A veces, necesitas más flexibilidad de la que proporcionan map() y starmap(). Los métodos apply() y apply_async() te permiten ejecutar una función con argumentos específicos y manejar los resultados de forma más dinámica.

Crea un archivo llamado pool_apply.py:

import multiprocessing
import time
import random

def process_task(task_id, duration, data):
    """Process a task with multiple arguments including complex data"""
    print(f"Starting task {task_id} with duration {duration}s and data: {data}")
    time.sleep(duration)  ## Simulate variable work time
    result = sum(data) * task_id
    print(f"Task {task_id} completed")
    return result

if __name__ == "__main__":
    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        print("Starting tasks with apply()...")

        ## Execute tasks with apply() - blocking
        result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
        print(f"Result 1: {result1}")

        result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
        print(f"Result 2: {result2}")

        print("\nStarting tasks with apply_async()...")

        ## Execute tasks with apply_async() - non-blocking
        async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
        async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))

        ## Do other work while tasks are running
        print("Tasks are running in the background...")
        time.sleep(1)
        print("Main process is doing other work...")

        ## Get results when needed (will wait if not yet available)
        print(f"Result 3: {async_result1.get()}")
        print(f"Result 4: {async_result2.get()}")

Ejecuta el código:

python3 pool_apply.py

La salida mostrará cómo apply() es bloqueante (secuencial) mientras que apply_async() permite que las tareas se ejecuten en paralelo.

Usando functools.partial para la Aplicación Parcial de Funciones

Cuando tienes una función que toma múltiples argumentos pero quieres fijar algunos de ellos, functools.partial es muy útil.

Crea un archivo llamado partial_example.py:

import multiprocessing
from functools import partial
import time

def process_with_config(data, config_a, config_b):
    """Process data with specific configuration parameters"""
    print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
    time.sleep(1)
    return data * config_a + config_b

if __name__ == "__main__":
    ## Data to process
    data_items = [1, 2, 3, 4, 5]

    ## Configuration values
    config_a = 10
    config_b = 5

    ## Create a partially applied function with fixed config values
    partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)

    ## Create a Pool
    with multiprocessing.Pool(processes=3) as pool:
        ## Use map with the partially applied function
        print("Starting processing with partial function...")
        results = pool.map(partial_process, data_items)
        print(f"Results: {results}")

Ejecuta el código:

python3 partial_example.py

La salida mostrará cómo cada elemento de datos se procesa con los mismos valores de configuración que se fijaron con partial().

Pasar Argumentos a través de Memoria Compartida

Para datos grandes que no deberían copiarse para cada proceso, la memoria compartida puede ser más eficiente. Exploremos esta técnica.

Crea un archivo llamado shared_memory_example.py:

import multiprocessing
import numpy as np
import time

def process_array_chunk(array, start_idx, end_idx):
    """Process a chunk of a shared array"""
    print(f"Processing chunk from index {start_idx} to {end_idx}")

    ## Simulate processing each element
    for i in range(start_idx, end_idx):
        array[i] = array[i] ** 2

    time.sleep(1)  ## Simulate additional work
    return start_idx, end_idx

if __name__ == "__main__":
    ## Create a shared array
    array_size = 1000000
    shared_array = multiprocessing.Array('d', array_size)  ## 'd' for double precision

    ## Fill the array with initial values
    temp_array = np.frombuffer(shared_array.get_obj())
    temp_array[:] = np.arange(array_size)

    print(f"Array initialized with size {array_size}")
    print(f"First 5 elements: {temp_array[:5]}")

    ## Define chunk size and create tasks
    num_processes = 4
    chunk_size = array_size // num_processes
    tasks = []

    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else array_size
        tasks.append((start, end))

    ## Process chunks in parallel
    with multiprocessing.Pool(processes=num_processes) as pool:
        ## Create arguments for each task
        args = [(shared_array, start, end) for start, end in tasks]

        print("Starting parallel processing...")
        start_time = time.time()

        ## Using starmap to pass multiple arguments
        results = pool.starmap(process_array_chunk, args)

        end_time = time.time()
        print(f"Processing completed in {end_time - start_time:.2f} seconds")

    ## Verify results
    print(f"First 5 elements after processing: {temp_array[:5]}")
    print(f"Results summary: {results}")

Ejecuta el código:

python3 shared_memory_example.py

La salida mostrará cómo un gran array se procesa en trozos por diferentes procesos, con todos los procesos teniendo acceso a la misma memoria compartida.

Comparación de Rendimiento

Comparemos estas diferentes técnicas para una tarea real.

Crea un archivo llamado performance_comparison.py:

import multiprocessing
import time
import numpy as np
from functools import partial

def process_data_simple(data):
    """Simple processing function"""
    return sum(x ** 2 for x in data)

def process_data_with_config(data, multiplier):
    """Processing with additional configuration"""
    return sum(x ** 2 for x in data) * multiplier

def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
    """Run a test and measure performance"""
    start_time = time.time()

    with multiprocessing.Pool(processes=pool_size) as pool:
        if kwargs:
            partial_func = partial(func, **kwargs)
            results = pool.map(partial_func, data_chunks)
        else:
            results = pool.map(func, data_chunks)

    end_time = time.time()
    print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
    return results

if __name__ == "__main__":
    ## Create test data
    chunk_size = 1000000
    num_chunks = 8
    data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]

    print(f"Created {num_chunks} data chunks of size {chunk_size} each")

    ## Test 1: Simple map
    results1 = run_test("Simple map", process_data_simple, data_chunks)

    ## Test 2: Using partial
    results2 = run_test("Map with partial", process_data_with_config,
                        data_chunks, multiplier=2)

    ## Test 3: Sequential processing for comparison
    start_time = time.time()
    seq_results = [process_data_simple(chunk) for chunk in data_chunks]
    end_time = time.time()
    print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")

    ## Validate results
    print(f"\nFirst result from each test:")
    print(f"Test 1 first result: {results1[0]:.4f}")
    print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
    print(f"Sequential first result: {seq_results[0]:.4f}")

Ejecuta la comparación de rendimiento:

python3 performance_comparison.py

La salida mostrará las diferencias de rendimiento entre los enfoques, demostrando los beneficios del procesamiento paralelo y la sobrecarga de diferentes técnicas de paso de argumentos.

Aplicación en el Mundo Real: Procesamiento de Imágenes en Paralelo

Ahora que entendemos varias técnicas para pasar argumentos en el multiprocessing de Python, apliquemos estos conceptos a un escenario del mundo real: el procesamiento de imágenes en paralelo. Este es un caso de uso común donde el multiprocessing puede mejorar significativamente el rendimiento.

Configuración del Entorno

Primero, instalemos los paquetes requeridos:

pip install Pillow numpy

Creación de Imágenes de Muestra

Creemos un script para generar algunas imágenes de muestra para nuestro procesamiento. Crea un archivo llamado create_images.py:

from PIL import Image
import numpy as np
import os

def create_sample_image(filename, width=800, height=600):
    """Create a sample image with random data"""
    ## Create random array data
    data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)

    ## Create image from array
    img = Image.fromarray(data, 'RGB')

    ## Save the image
    img.save(filename)
    print(f"Created image: {filename}")

if __name__ == "__main__":
    ## Create a directory for sample images
    os.makedirs("sample_images", exist_ok=True)

    ## Create 5 sample images
    for i in range(5):
        create_sample_image(f"sample_images/sample_{i+1}.png")

    print("All sample images created successfully")

Ejecuta este script para crear imágenes de muestra:

python3 create_images.py

Procesamiento Secuencial de Imágenes

Primero, implementemos una versión secuencial de nuestras tareas de procesamiento de imágenes. Crea un archivo llamado sequential_image_processing.py:

from PIL import Image, ImageFilter
import os
import time

def apply_filters(image_path, output_dir):
    """Apply multiple filters to an image and save the results"""
    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_sequentially(image_dir, output_dir):
    """Process all images in a directory sequentially"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Process each image sequentially
    start_time = time.time()
    results = []

    for image_path in image_files:
        result = apply_filters(image_path, output_dir)
        results.append(result)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nSequential processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_sequential"

    ## Process images
    results = process_images_sequentially(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

Ejecuta el procesamiento secuencial:

python3 sequential_image_processing.py

Procesamiento Paralelo de Imágenes

Ahora, implementemos el procesamiento paralelo de imágenes utilizando las técnicas que hemos aprendido. Crea un archivo llamado parallel_image_processing.py:

from PIL import Image, ImageFilter
import multiprocessing
import os
import time

def apply_filters(args):
    """Apply multiple filters to an image and save the results"""
    image_path, output_dir = args

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_in_parallel(image_dir, output_dir, num_processes=None):
    """Process all images in a directory in parallel"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Determine number of processes
    if num_processes is None:
        num_processes = min(multiprocessing.cpu_count(), len(image_files))

    print(f"Using {num_processes} processes for parallel processing")

    ## Create arguments for each task
    task_args = [(image_path, output_dir) for image_path in image_files]

    ## Process images in parallel
    start_time = time.time()

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(apply_filters, task_args)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nParallel processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_parallel"

    ## Process images
    results = process_images_in_parallel(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

Ejecuta el procesamiento paralelo:

python3 parallel_image_processing.py

Procesamiento Paralelo Avanzado con Diferentes Filtros

Ahora, implementemos una versión más compleja donde aplicamos diferentes filtros a la misma imagen en paralelo. Crea un archivo llamado advanced_parallel_processing.py:

from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial

def apply_filter(filter_info, image_path, output_dir):
    """Apply a specific filter to an image"""
    filter_name, filter_obj = filter_info

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} applying {filter_name} to {image_name}")

    ## Open image each time to avoid potential issues with sharing PIL objects
    img = Image.open(image_path)

    ## Apply filter
    start_time = time.time()
    filtered = img.filter(filter_obj)

    ## Save the filtered image
    output_filename = f"{filter_name}_{image_name}"
    output_path = os.path.join(output_dir, output_filename)
    filtered.save(output_path)

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
    return filter_name, image_name, processing_time

def process_image_with_multiple_filters(image_path, output_dir, filters):
    """Process an image with multiple filters in parallel"""
    ## Create a partial function with fixed image_path and output_dir
    apply_filter_to_image = partial(apply_filter,
                                   image_path=image_path,
                                   output_dir=output_dir)

    ## Process the image with multiple filters in parallel
    with multiprocessing.Pool() as pool:
        results = pool.map(apply_filter_to_image, filters)

    return results

def main():
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_advanced"
    os.makedirs(output_dir, exist_ok=True)

    ## Define filters to apply
    filters = [
        ("blur", ImageFilter.GaussianBlur(radius=5)),
        ("edges", ImageFilter.FIND_EDGES),
        ("emboss", ImageFilter.EMBOSS),
        ("sharpen", ImageFilter.SHARPEN),
        ("contour", ImageFilter.CONTOUR),
        ("detail", ImageFilter.DETAIL),
        ("smooth", ImageFilter.SMOOTH),
        ("smoothmore", ImageFilter.SMOOTH_MORE)
    ]

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process with {len(filters)} filters each")

    ## Process each image with all filters
    all_results = []
    overall_start = time.time()

    for image_path in image_files:
        image_name = os.path.basename(image_path)
        print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")

        start_time = time.time()
        results = process_image_with_multiple_filters(image_path, output_dir, filters)
        end_time = time.time()

        image_time = end_time - start_time
        print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")

        all_results.extend(results)

    overall_end = time.time()
    total_time = overall_end - overall_start

    ## Print summary
    print("\nProcessing Summary:")
    print(f"Total processing time: {total_time:.2f} seconds")
    print(f"Total operations: {len(all_results)}")

    ## Calculate average time per filter
    avg_time = sum(time for _, _, time in all_results) / len(all_results)
    print(f"Average processing time per filter: {avg_time:.2f} seconds")

if __name__ == "__main__":
    main()

Ejecuta el procesamiento paralelo avanzado:

python3 advanced_parallel_processing.py

Comparación de Rendimiento

Creemos un script para comparar el rendimiento de los diferentes enfoques. Crea un archivo llamado compare_performance.py:

import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np

def run_script(script_name):
    """Run a Python script and measure its execution time"""
    print(f"Running {script_name}...")
    start_time = time.time()

    ## Run the script
    process = subprocess.run(['python3', script_name],
                            capture_output=True, text=True)

    end_time = time.time()
    execution_time = end_time - start_time

    print(f"Completed {script_name} in {execution_time:.2f} seconds")
    print(f"Output: {process.stdout[-200:]}") ## Show last part of output

    return execution_time

def main():
    ## Scripts to compare
    scripts = [
        'sequential_image_processing.py',
        'parallel_image_processing.py',
        'advanced_parallel_processing.py'
    ]

    ## Run each script and measure time
    times = []
    for script in scripts:
        if os.path.exists(script):
            times.append(run_script(script))
        else:
            print(f"Script {script} not found!")
            times.append(0)

    ## Plot results
    if any(times):
        labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
        x = np.arange(len(labels))

        plt.figure(figsize=(10, 6))
        plt.bar(x, times, color=['red', 'blue', 'green'])
        plt.xlabel('Processing Method')
        plt.ylabel('Execution Time (s)')
        plt.title('Image Processing Performance Comparison')
        plt.xticks(x, labels)

        ## Add execution time as text on bars
        for i, v in enumerate(times):
            plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')

        ## Save the plot
        plt.tight_layout()
        plt.savefig('performance_comparison.png')
        print("Performance comparison plot saved as 'performance_comparison.png'")
    else:
        print("No valid execution times to plot")

if __name__ == "__main__":
    main()

Ejecutemos la comparación:

python3 compare_performance.py

Revisa los resultados y examina la imagen performance_comparison.png para ver las diferencias en el tiempo de ejecución entre los diferentes enfoques.

Conclusiones Clave

De este ejemplo del mundo real, puedes observar varios aspectos importantes del multiprocessing:

  1. El procesamiento paralelo puede reducir significativamente el tiempo de ejecución cuando se trata de tareas intensivas en CPU como el procesamiento de imágenes.

  2. El método de paso de argumentos afecta tanto la complejidad del código como el rendimiento:

    • map() simple para funciones de un solo argumento directas
    • Funciones parciales para fijar ciertos parámetros
    • Técnicas avanzadas para flujos de trabajo complejos
  3. Consideraciones para aplicaciones del mundo real:

    • Gestión de recursos (CPU, memoria)
    • Estrategias de distribución de tareas
    • Manejo de errores en entornos paralelos
    • Coordinación entre procesos

Este ejemplo práctico demuestra cómo aplicar los conceptos de multiprocessing que hemos aprendido a lo largo de este laboratorio para resolver problemas del mundo real de manera eficiente.

Resumen

En este laboratorio, aprendiste sobre el multiprocessing de Python y diferentes técnicas para pasar argumentos a procesos paralelos. Aquí hay un resumen de lo que has logrado:

  1. Fundamentos del Multiprocessing: Creaste tu primer programa de multiprocessing y comprendiste la diferencia entre procesos e hilos.

  2. Técnicas de Paso de Argumentos: Exploraste varios métodos para pasar argumentos:

    • Paso de argumentos básico con Process.args
    • Paso de un solo argumento con Pool.map()
    • Múltiples argumentos con Pool.starmap()
    • Ejecución dinámica con apply() y apply_async()
    • Uso de functools.partial para la aplicación parcial de funciones
    • Memoria compartida para el intercambio eficiente de datos
  3. Aplicación en el Mundo Real: Aplicaste estos conceptos a un ejemplo práctico de procesamiento de imágenes, demostrando mejoras significativas en el rendimiento a través de la paralelización.

Las habilidades que has aprendido en este laboratorio son valiosas para cualquier aplicación de Python intensiva en CPU, lo que te permite aprovechar toda la potencia computacional de los sistemas de múltiples núcleos. A medida que desarrolles aplicaciones más complejas, recuerda considerar las compensaciones entre las diferentes técnicas de paso de argumentos y elegir el método más apropiado para tu caso de uso específico.