Introducción
El módulo multiprocessing de Python permite la computación paralela al aprovechar múltiples núcleos de CPU, lo que puede mejorar significativamente el rendimiento computacional para tareas intensivas en CPU. Este laboratorio explora diferentes técnicas para pasar argumentos a los procesos en el módulo multiprocessing, abordando desafíos comunes en la programación concurrente y demostrando estrategias prácticas para una paralelización efectiva.
Al final de este laboratorio, comprenderá cómo usar diferentes métodos para pasar datos a procesos paralelos y aplicará estas técnicas a escenarios del mundo real.
Introducción a Multiprocessing en Python
En este paso, aprenderemos los fundamentos de multiprocessing en Python y crearemos nuestro primer programa paralelo.
¿Qué es Multiprocessing?
Multiprocessing es un módulo de Python que nos permite ejecutar múltiples procesos en paralelo, utilizando eficazmente múltiples núcleos de CPU. Esto es diferente de threading (hilos), que está limitado por el Global Interpreter Lock (GIL) que impide la verdadera ejecución paralela del código Python.
Aquí hay una comparación simple:
| Característica | Multiprocessing | Threading |
|---|---|---|
| Memoria | Espacios de memoria separados | Espacio de memoria compartido |
| Paralelismo | Verdadera ejecución paralela | Limitado por GIL |
| Caso de uso | Tareas con uso intensivo de CPU | Tareas con uso intensivo de E/S |
| Sobrecarga | Mayor (procesos separados) | Menor |
Creando tu Primer Programa Multiprocessing
Creemos un ejemplo simple de multiprocessing. Abre un nuevo archivo en el WebIDE y nómbralo simple_process.py:
import multiprocessing
import time
import os
def worker():
"""Simple function to demonstrate a process"""
print(f"Worker process id: {os.getpid()}")
print(f"Worker parent process id: {os.getppid()}")
time.sleep(1)
print("Worker process completed")
if __name__ == "__main__":
## Print information about the main process
print(f"Main process id: {os.getpid()}")
## Create a process
process = multiprocessing.Process(target=worker)
## Start the process
print("Starting worker process...")
process.start()
## Wait for the process to complete
process.join()
print("Main process completed")
Ahora ejecutemos este programa:
python3 simple_process.py
Deberías ver una salida similar a:
Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed
Los IDs de proceso exactos diferirán en tu sistema. Observa cómo el proceso trabajador tiene un ID de proceso diferente al del proceso principal, confirmando que son procesos separados.
Entendiendo la Creación de Procesos
Cuando creamos un Process de multiprocessing y lo iniciamos, Python:
- Crea un nuevo proceso
- Importa el módulo que contiene la función objetivo
- Ejecuta la función objetivo en el nuevo proceso
- Retorna el control al proceso padre
La guardia if __name__ == "__main__": es importante cuando se trabaja con multiprocessing en Python. Esto evita la creación de procesos duplicados cuando se importa el módulo.
Múltiples Procesos
Modifiquemos nuestro ejemplo para crear múltiples procesos. Crea un nuevo archivo llamado multiple_processes.py:
import multiprocessing
import time
import os
def worker(worker_id):
"""Worker function that accepts an argument"""
print(f"Worker {worker_id} (PID: {os.getpid()}) started")
time.sleep(1)
print(f"Worker {worker_id} completed")
return worker_id * 2 ## This return value won't be captured
if __name__ == "__main__":
print(f"Main process PID: {os.getpid()}")
## Create multiple processes
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
## Wait for all processes to complete
for p in processes:
p.join()
print("All processes completed")
Ejecuta este código:
python3 multiple_processes.py
Deberías ver una salida similar a:
Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed
Observa cómo pasamos un argumento (worker_id) a cada proceso usando el parámetro args. Este es nuestro primer ejemplo de pasar argumentos a un proceso.
También observa que los procesos podrían no necesariamente ejecutarse en un orden predecible, ya que se están ejecutando en paralelo.
Paso de Argumentos Básico con Pool
En el paso anterior, pasamos un argumento simple a un proceso. Ahora, exploremos una forma más eficiente de manejar múltiples tareas utilizando la clase Pool y aprendamos sobre diferentes formas de pasar argumentos.
La Clase Pool
La clase Pool proporciona una forma conveniente de paralelizar la ejecución de una función a través de múltiples valores de entrada. Distribuye los datos de entrada entre los procesos y recopila los resultados.
Creemos un ejemplo simple para entender cómo funciona Pool. Crea un nuevo archivo llamado pool_example.py:
import multiprocessing
import time
def square(x):
"""Function that squares a number and returns the result"""
print(f"Processing {x}...")
time.sleep(1) ## Simulate a time-consuming operation
return x * x
if __name__ == "__main__":
## Create a list of numbers to process
numbers = [1, 2, 3, 4, 5]
## Create a Pool with 2 processes
with multiprocessing.Pool(processes=2) as pool:
## Map the square function to the numbers
start_time = time.time()
results = pool.map(square, numbers)
end_time = time.time()
print(f"Results: {results}")
print(f"Time taken: {end_time - start_time:.2f} seconds")
## Compare with sequential processing
start_time = time.time()
sequential_results = [square(n) for n in numbers]
end_time = time.time()
print(f"Sequential results: {sequential_results}")
print(f"Sequential time: {end_time - start_time:.2f} seconds")
Ejecuta el código:
python3 pool_example.py
Deberías ver una salida similar a:
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds
Observa cómo el Pool es más rápido porque procesa múltiples números en paralelo. Con un tamaño de pool de 2, puede procesar 2 números a la vez, mientras que el enfoque secuencial los procesa uno tras otro.
Pasar Argumentos Únicos con Pool.map()
El método map() es perfecto para cuando tu función toma un solo argumento. Aplica la función a cada elemento en un iterable, en paralelo.
Exploremos esto más con otro ejemplo. Crea un archivo llamado pool_map.py:
import multiprocessing
import time
import os
def process_item(item):
"""Process a single item"""
process_id = os.getpid()
print(f"Process {process_id} processing item {item}")
time.sleep(1) ## Simulate work
return item * 10
if __name__ == "__main__":
## Create a list of items to process
items = list(range(10))
## Get the number of CPU cores available
num_cores = multiprocessing.cpu_count()
print(f"System has {num_cores} CPU cores")
## Create a Pool with the number of available cores
with multiprocessing.Pool(processes=num_cores) as pool:
print("Starting parallel processing...")
start_time = time.time()
## Process all items in parallel
results = pool.map(process_item, items)
end_time = time.time()
print(f"All processing completed in {end_time - start_time:.2f} seconds")
print(f"Results: {results}")
Ejecuta el código:
python3 pool_map.py
Tu salida debería mostrar los elementos que se procesan en paralelo a través de múltiples procesos, con un tiempo significativamente más corto que 10 segundos (que sería el tiempo de procesamiento secuencial para 10 elementos con un retraso de 1 segundo cada uno).
Pasar Múltiples Argumentos con Pool.starmap()
¿Qué pasa si tu función necesita múltiples argumentos? Aquí es donde entra starmap().
Crea un archivo llamado pool_starmap.py:
import multiprocessing
import time
def process_data(id, value, multiplier):
"""Process data with multiple arguments"""
print(f"Processing item {id}: {value} with multiplier {multiplier}")
time.sleep(1) ## Simulate work
return value * multiplier
if __name__ == "__main__":
## Create a list of argument tuples
## Each tuple contains all arguments for one function call
arguments = [
(1, 5, 2), ## id=1, value=5, multiplier=2
(2, 10, 3), ## id=2, value=10, multiplier=3
(3, 15, 2), ## id=3, value=15, multiplier=2
(4, 20, 4), ## id=4, value=20, multiplier=4
(5, 25, 5) ## id=5, value=25, multiplier=5
]
## Create a Pool with 3 processes
with multiprocessing.Pool(processes=3) as pool:
print("Starting parallel processing with multiple arguments...")
start_time = time.time()
## Process all items in parallel using starmap
results = pool.starmap(process_data, arguments)
end_time = time.time()
print(f"All processing completed in {end_time - start_time:.2f} seconds")
print(f"Results: {results}")
Ejecuta el código:
python3 pool_starmap.py
Deberías ver la salida mostrando cada elemento que se procesa con su propio conjunto de argumentos, y la lista de resultados final que contiene todos los valores calculados.
Puntos Clave sobre Pool
Poolmaneja automáticamente la distribución de tareas a través de los procesos- Administra una cola de tareas y las asigna a los procesos de trabajo disponibles
- Recopila los resultados y los devuelve en el mismo orden que los datos de entrada
- El número de procesos se puede optimizar en función de los núcleos de CPU de tu sistema y la naturaleza de tu tarea
Técnicas Avanzadas de Paso de Argumentos
En los pasos anteriores, aprendimos sobre el paso de argumentos básico con objetos Process y la clase Pool. Ahora, exploremos técnicas más avanzadas para pasar argumentos en escenarios de multiprocessing.
Pasar Datos Complejos con apply() y apply_async()
A veces, necesitas más flexibilidad de la que proporcionan map() y starmap(). Los métodos apply() y apply_async() te permiten ejecutar una función con argumentos específicos y manejar los resultados de forma más dinámica.
Crea un archivo llamado pool_apply.py:
import multiprocessing
import time
import random
def process_task(task_id, duration, data):
"""Process a task with multiple arguments including complex data"""
print(f"Starting task {task_id} with duration {duration}s and data: {data}")
time.sleep(duration) ## Simulate variable work time
result = sum(data) * task_id
print(f"Task {task_id} completed")
return result
if __name__ == "__main__":
## Create a Pool with 2 processes
with multiprocessing.Pool(processes=2) as pool:
print("Starting tasks with apply()...")
## Execute tasks with apply() - blocking
result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
print(f"Result 1: {result1}")
result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
print(f"Result 2: {result2}")
print("\nStarting tasks with apply_async()...")
## Execute tasks with apply_async() - non-blocking
async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))
## Do other work while tasks are running
print("Tasks are running in the background...")
time.sleep(1)
print("Main process is doing other work...")
## Get results when needed (will wait if not yet available)
print(f"Result 3: {async_result1.get()}")
print(f"Result 4: {async_result2.get()}")
Ejecuta el código:
python3 pool_apply.py
La salida mostrará cómo apply() es bloqueante (secuencial) mientras que apply_async() permite que las tareas se ejecuten en paralelo.
Usando functools.partial para la Aplicación Parcial de Funciones
Cuando tienes una función que toma múltiples argumentos pero quieres fijar algunos de ellos, functools.partial es muy útil.
Crea un archivo llamado partial_example.py:
import multiprocessing
from functools import partial
import time
def process_with_config(data, config_a, config_b):
"""Process data with specific configuration parameters"""
print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
time.sleep(1)
return data * config_a + config_b
if __name__ == "__main__":
## Data to process
data_items = [1, 2, 3, 4, 5]
## Configuration values
config_a = 10
config_b = 5
## Create a partially applied function with fixed config values
partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)
## Create a Pool
with multiprocessing.Pool(processes=3) as pool:
## Use map with the partially applied function
print("Starting processing with partial function...")
results = pool.map(partial_process, data_items)
print(f"Results: {results}")
Ejecuta el código:
python3 partial_example.py
La salida mostrará cómo cada elemento de datos se procesa con los mismos valores de configuración que se fijaron con partial().
Pasar Argumentos a través de Memoria Compartida
Para datos grandes que no deberían copiarse para cada proceso, la memoria compartida puede ser más eficiente. Exploremos esta técnica.
Crea un archivo llamado shared_memory_example.py:
import multiprocessing
import numpy as np
import time
def process_array_chunk(array, start_idx, end_idx):
"""Process a chunk of a shared array"""
print(f"Processing chunk from index {start_idx} to {end_idx}")
## Simulate processing each element
for i in range(start_idx, end_idx):
array[i] = array[i] ** 2
time.sleep(1) ## Simulate additional work
return start_idx, end_idx
if __name__ == "__main__":
## Create a shared array
array_size = 1000000
shared_array = multiprocessing.Array('d', array_size) ## 'd' for double precision
## Fill the array with initial values
temp_array = np.frombuffer(shared_array.get_obj())
temp_array[:] = np.arange(array_size)
print(f"Array initialized with size {array_size}")
print(f"First 5 elements: {temp_array[:5]}")
## Define chunk size and create tasks
num_processes = 4
chunk_size = array_size // num_processes
tasks = []
for i in range(num_processes):
start = i * chunk_size
end = start + chunk_size if i < num_processes - 1 else array_size
tasks.append((start, end))
## Process chunks in parallel
with multiprocessing.Pool(processes=num_processes) as pool:
## Create arguments for each task
args = [(shared_array, start, end) for start, end in tasks]
print("Starting parallel processing...")
start_time = time.time()
## Using starmap to pass multiple arguments
results = pool.starmap(process_array_chunk, args)
end_time = time.time()
print(f"Processing completed in {end_time - start_time:.2f} seconds")
## Verify results
print(f"First 5 elements after processing: {temp_array[:5]}")
print(f"Results summary: {results}")
Ejecuta el código:
python3 shared_memory_example.py
La salida mostrará cómo un gran array se procesa en trozos por diferentes procesos, con todos los procesos teniendo acceso a la misma memoria compartida.
Comparación de Rendimiento
Comparemos estas diferentes técnicas para una tarea real.
Crea un archivo llamado performance_comparison.py:
import multiprocessing
import time
import numpy as np
from functools import partial
def process_data_simple(data):
"""Simple processing function"""
return sum(x ** 2 for x in data)
def process_data_with_config(data, multiplier):
"""Processing with additional configuration"""
return sum(x ** 2 for x in data) * multiplier
def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
"""Run a test and measure performance"""
start_time = time.time()
with multiprocessing.Pool(processes=pool_size) as pool:
if kwargs:
partial_func = partial(func, **kwargs)
results = pool.map(partial_func, data_chunks)
else:
results = pool.map(func, data_chunks)
end_time = time.time()
print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
return results
if __name__ == "__main__":
## Create test data
chunk_size = 1000000
num_chunks = 8
data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]
print(f"Created {num_chunks} data chunks of size {chunk_size} each")
## Test 1: Simple map
results1 = run_test("Simple map", process_data_simple, data_chunks)
## Test 2: Using partial
results2 = run_test("Map with partial", process_data_with_config,
data_chunks, multiplier=2)
## Test 3: Sequential processing for comparison
start_time = time.time()
seq_results = [process_data_simple(chunk) for chunk in data_chunks]
end_time = time.time()
print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")
## Validate results
print(f"\nFirst result from each test:")
print(f"Test 1 first result: {results1[0]:.4f}")
print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
print(f"Sequential first result: {seq_results[0]:.4f}")
Ejecuta la comparación de rendimiento:
python3 performance_comparison.py
La salida mostrará las diferencias de rendimiento entre los enfoques, demostrando los beneficios del procesamiento paralelo y la sobrecarga de diferentes técnicas de paso de argumentos.
Aplicación en el Mundo Real: Procesamiento de Imágenes en Paralelo
Ahora que entendemos varias técnicas para pasar argumentos en el multiprocessing de Python, apliquemos estos conceptos a un escenario del mundo real: el procesamiento de imágenes en paralelo. Este es un caso de uso común donde el multiprocessing puede mejorar significativamente el rendimiento.
Configuración del Entorno
Primero, instalemos los paquetes requeridos:
pip install Pillow numpy
Creación de Imágenes de Muestra
Creemos un script para generar algunas imágenes de muestra para nuestro procesamiento. Crea un archivo llamado create_images.py:
from PIL import Image
import numpy as np
import os
def create_sample_image(filename, width=800, height=600):
"""Create a sample image with random data"""
## Create random array data
data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
## Create image from array
img = Image.fromarray(data, 'RGB')
## Save the image
img.save(filename)
print(f"Created image: {filename}")
if __name__ == "__main__":
## Create a directory for sample images
os.makedirs("sample_images", exist_ok=True)
## Create 5 sample images
for i in range(5):
create_sample_image(f"sample_images/sample_{i+1}.png")
print("All sample images created successfully")
Ejecuta este script para crear imágenes de muestra:
python3 create_images.py
Procesamiento Secuencial de Imágenes
Primero, implementemos una versión secuencial de nuestras tareas de procesamiento de imágenes. Crea un archivo llamado sequential_image_processing.py:
from PIL import Image, ImageFilter
import os
import time
def apply_filters(image_path, output_dir):
"""Apply multiple filters to an image and save the results"""
## Load the image
image_name = os.path.basename(image_path)
print(f"Processing image: {image_name}")
img = Image.open(image_path)
## Apply filters
## 1. Blur filter
start_time = time.time()
blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
blurred.save(os.path.join(output_dir, f"blur_{image_name}"))
## 2. Edge detection filter
edges = img.filter(ImageFilter.FIND_EDGES)
edges.save(os.path.join(output_dir, f"edges_{image_name}"))
## 3. Emboss filter
emboss = img.filter(ImageFilter.EMBOSS)
emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))
## 4. Sharpen filter
sharpen = img.filter(ImageFilter.SHARPEN)
sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))
end_time = time.time()
processing_time = end_time - start_time
print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
return image_name, processing_time
def process_images_sequentially(image_dir, output_dir):
"""Process all images in a directory sequentially"""
## Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process")
## Process each image sequentially
start_time = time.time()
results = []
for image_path in image_files:
result = apply_filters(image_path, output_dir)
results.append(result)
end_time = time.time()
total_time = end_time - start_time
print(f"\nSequential processing completed")
print(f"Total processing time: {total_time:.2f} seconds")
return results
if __name__ == "__main__":
## Define directories
image_dir = "sample_images"
output_dir = "processed_sequential"
## Process images
results = process_images_sequentially(image_dir, output_dir)
## Print summary
print("\nProcessing summary:")
for image_name, proc_time in results:
print(f" {image_name}: {proc_time:.2f} seconds")
Ejecuta el procesamiento secuencial:
python3 sequential_image_processing.py
Procesamiento Paralelo de Imágenes
Ahora, implementemos el procesamiento paralelo de imágenes utilizando las técnicas que hemos aprendido. Crea un archivo llamado parallel_image_processing.py:
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
def apply_filters(args):
"""Apply multiple filters to an image and save the results"""
image_path, output_dir = args
## Load the image
image_name = os.path.basename(image_path)
print(f"Process {os.getpid()} processing image: {image_name}")
img = Image.open(image_path)
## Apply filters
## 1. Blur filter
start_time = time.time()
blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
blurred.save(os.path.join(output_dir, f"blur_{image_name}"))
## 2. Edge detection filter
edges = img.filter(ImageFilter.FIND_EDGES)
edges.save(os.path.join(output_dir, f"edges_{image_name}"))
## 3. Emboss filter
emboss = img.filter(ImageFilter.EMBOSS)
emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))
## 4. Sharpen filter
sharpen = img.filter(ImageFilter.SHARPEN)
sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))
end_time = time.time()
processing_time = end_time - start_time
print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
return image_name, processing_time
def process_images_in_parallel(image_dir, output_dir, num_processes=None):
"""Process all images in a directory in parallel"""
## Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process")
## Determine number of processes
if num_processes is None:
num_processes = min(multiprocessing.cpu_count(), len(image_files))
print(f"Using {num_processes} processes for parallel processing")
## Create arguments for each task
task_args = [(image_path, output_dir) for image_path in image_files]
## Process images in parallel
start_time = time.time()
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.map(apply_filters, task_args)
end_time = time.time()
total_time = end_time - start_time
print(f"\nParallel processing completed")
print(f"Total processing time: {total_time:.2f} seconds")
return results
if __name__ == "__main__":
## Define directories
image_dir = "sample_images"
output_dir = "processed_parallel"
## Process images
results = process_images_in_parallel(image_dir, output_dir)
## Print summary
print("\nProcessing summary:")
for image_name, proc_time in results:
print(f" {image_name}: {proc_time:.2f} seconds")
Ejecuta el procesamiento paralelo:
python3 parallel_image_processing.py
Procesamiento Paralelo Avanzado con Diferentes Filtros
Ahora, implementemos una versión más compleja donde aplicamos diferentes filtros a la misma imagen en paralelo. Crea un archivo llamado advanced_parallel_processing.py:
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial
def apply_filter(filter_info, image_path, output_dir):
"""Apply a specific filter to an image"""
filter_name, filter_obj = filter_info
## Load the image
image_name = os.path.basename(image_path)
print(f"Process {os.getpid()} applying {filter_name} to {image_name}")
## Open image each time to avoid potential issues with sharing PIL objects
img = Image.open(image_path)
## Apply filter
start_time = time.time()
filtered = img.filter(filter_obj)
## Save the filtered image
output_filename = f"{filter_name}_{image_name}"
output_path = os.path.join(output_dir, output_filename)
filtered.save(output_path)
end_time = time.time()
processing_time = end_time - start_time
print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
return filter_name, image_name, processing_time
def process_image_with_multiple_filters(image_path, output_dir, filters):
"""Process an image with multiple filters in parallel"""
## Create a partial function with fixed image_path and output_dir
apply_filter_to_image = partial(apply_filter,
image_path=image_path,
output_dir=output_dir)
## Process the image with multiple filters in parallel
with multiprocessing.Pool() as pool:
results = pool.map(apply_filter_to_image, filters)
return results
def main():
## Define directories
image_dir = "sample_images"
output_dir = "processed_advanced"
os.makedirs(output_dir, exist_ok=True)
## Define filters to apply
filters = [
("blur", ImageFilter.GaussianBlur(radius=5)),
("edges", ImageFilter.FIND_EDGES),
("emboss", ImageFilter.EMBOSS),
("sharpen", ImageFilter.SHARPEN),
("contour", ImageFilter.CONTOUR),
("detail", ImageFilter.DETAIL),
("smooth", ImageFilter.SMOOTH),
("smoothmore", ImageFilter.SMOOTH_MORE)
]
## Get all image files
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
if f.endswith(('.png', '.jpg', '.jpeg'))]
print(f"Found {len(image_files)} images to process with {len(filters)} filters each")
## Process each image with all filters
all_results = []
overall_start = time.time()
for image_path in image_files:
image_name = os.path.basename(image_path)
print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")
start_time = time.time()
results = process_image_with_multiple_filters(image_path, output_dir, filters)
end_time = time.time()
image_time = end_time - start_time
print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")
all_results.extend(results)
overall_end = time.time()
total_time = overall_end - overall_start
## Print summary
print("\nProcessing Summary:")
print(f"Total processing time: {total_time:.2f} seconds")
print(f"Total operations: {len(all_results)}")
## Calculate average time per filter
avg_time = sum(time for _, _, time in all_results) / len(all_results)
print(f"Average processing time per filter: {avg_time:.2f} seconds")
if __name__ == "__main__":
main()
Ejecuta el procesamiento paralelo avanzado:
python3 advanced_parallel_processing.py
Comparación de Rendimiento
Creemos un script para comparar el rendimiento de los diferentes enfoques. Crea un archivo llamado compare_performance.py:
import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np
def run_script(script_name):
"""Run a Python script and measure its execution time"""
print(f"Running {script_name}...")
start_time = time.time()
## Run the script
process = subprocess.run(['python3', script_name],
capture_output=True, text=True)
end_time = time.time()
execution_time = end_time - start_time
print(f"Completed {script_name} in {execution_time:.2f} seconds")
print(f"Output: {process.stdout[-200:]}") ## Show last part of output
return execution_time
def main():
## Scripts to compare
scripts = [
'sequential_image_processing.py',
'parallel_image_processing.py',
'advanced_parallel_processing.py'
]
## Run each script and measure time
times = []
for script in scripts:
if os.path.exists(script):
times.append(run_script(script))
else:
print(f"Script {script} not found!")
times.append(0)
## Plot results
if any(times):
labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
x = np.arange(len(labels))
plt.figure(figsize=(10, 6))
plt.bar(x, times, color=['red', 'blue', 'green'])
plt.xlabel('Processing Method')
plt.ylabel('Execution Time (s)')
plt.title('Image Processing Performance Comparison')
plt.xticks(x, labels)
## Add execution time as text on bars
for i, v in enumerate(times):
plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')
## Save the plot
plt.tight_layout()
plt.savefig('performance_comparison.png')
print("Performance comparison plot saved as 'performance_comparison.png'")
else:
print("No valid execution times to plot")
if __name__ == "__main__":
main()
Ejecutemos la comparación:
python3 compare_performance.py
Revisa los resultados y examina la imagen performance_comparison.png para ver las diferencias en el tiempo de ejecución entre los diferentes enfoques.
Conclusiones Clave
De este ejemplo del mundo real, puedes observar varios aspectos importantes del multiprocessing:
El procesamiento paralelo puede reducir significativamente el tiempo de ejecución cuando se trata de tareas intensivas en CPU como el procesamiento de imágenes.
El método de paso de argumentos afecta tanto la complejidad del código como el rendimiento:
map()simple para funciones de un solo argumento directas- Funciones parciales para fijar ciertos parámetros
- Técnicas avanzadas para flujos de trabajo complejos
Consideraciones para aplicaciones del mundo real:
- Gestión de recursos (CPU, memoria)
- Estrategias de distribución de tareas
- Manejo de errores en entornos paralelos
- Coordinación entre procesos
Este ejemplo práctico demuestra cómo aplicar los conceptos de multiprocessing que hemos aprendido a lo largo de este laboratorio para resolver problemas del mundo real de manera eficiente.
Resumen
En este laboratorio, aprendiste sobre el multiprocessing de Python y diferentes técnicas para pasar argumentos a procesos paralelos. Aquí hay un resumen de lo que has logrado:
Fundamentos del Multiprocessing: Creaste tu primer programa de
multiprocessingy comprendiste la diferencia entre procesos e hilos.Técnicas de Paso de Argumentos: Exploraste varios métodos para pasar argumentos:
- Paso de argumentos básico con
Process.args - Paso de un solo argumento con
Pool.map() - Múltiples argumentos con
Pool.starmap() - Ejecución dinámica con
apply()yapply_async() - Uso de
functools.partialpara la aplicación parcial de funciones - Memoria compartida para el intercambio eficiente de datos
- Paso de argumentos básico con
Aplicación en el Mundo Real: Aplicaste estos conceptos a un ejemplo práctico de procesamiento de imágenes, demostrando mejoras significativas en el rendimiento a través de la paralelización.
Las habilidades que has aprendido en este laboratorio son valiosas para cualquier aplicación de Python intensiva en CPU, lo que te permite aprovechar toda la potencia computacional de los sistemas de múltiples núcleos. A medida que desarrolles aplicaciones más complejas, recuerda considerar las compensaciones entre las diferentes técnicas de paso de argumentos y elegir el método más apropiado para tu caso de uso específico.



