Como passar argumentos em multiprocessing Python

PythonBeginner
Pratique Agora

Introdução

O módulo multiprocessing do Python permite a computação paralela, aproveitando múltiplos núcleos de CPU, o que pode melhorar significativamente o desempenho computacional para tarefas intensivas em CPU. Este laboratório explora diferentes técnicas para passar argumentos para processos no módulo multiprocessing, abordando desafios comuns na programação concorrente e demonstrando estratégias práticas para uma paralelização eficaz.

Ao final deste laboratório, você entenderá como usar diferentes métodos para passar dados para processos paralelos e aplicar essas técnicas a cenários do mundo real.

Introdução ao Multiprocessing em Python

Nesta etapa, aprenderemos os fundamentos do multiprocessing em Python e criaremos nosso primeiro programa paralelo.

O que é Multiprocessing?

Multiprocessing é um módulo Python que nos permite executar múltiplos processos em paralelo, utilizando efetivamente múltiplos núcleos de CPU. Isso é diferente de threading (encadeamento), que é limitado pelo Global Interpreter Lock (GIL), que impede a verdadeira execução paralela do código Python.

Aqui está uma comparação simples:

Característica Multiprocessing Threading
Memória Espaços de memória separados Espaço de memória compartilhado
Paralelismo Execução paralela verdadeira Limitado pelo GIL
Caso de uso Tarefas com uso intensivo de CPU Tarefas com uso intensivo de I/O
Sobrecarga Maior (processos separados) Menor

Criando Seu Primeiro Programa Multiprocessing

Vamos criar um exemplo simples de multiprocessing. Abra um novo arquivo no WebIDE e nomeie-o simple_process.py:

import multiprocessing
import time
import os

def worker():
    """Simple function to demonstrate a process"""
    print(f"Worker process id: {os.getpid()}")
    print(f"Worker parent process id: {os.getppid()}")
    time.sleep(1)
    print("Worker process completed")

if __name__ == "__main__":
    ## Print information about the main process
    print(f"Main process id: {os.getpid()}")

    ## Create a process
    process = multiprocessing.Process(target=worker)

    ## Start the process
    print("Starting worker process...")
    process.start()

    ## Wait for the process to complete
    process.join()

    print("Main process completed")

Agora, vamos executar este programa:

python3 simple_process.py

Você deve ver uma saída semelhante a:

Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed

Os IDs de processo exatos serão diferentes no seu sistema. Observe como o processo worker tem um ID de processo diferente do processo principal, confirmando que são processos separados.

Entendendo a Criação de Processos

Quando criamos um Process de multiprocessing e o iniciamos, o Python:

  1. Cria um novo processo
  2. Importa o módulo contendo a função alvo
  3. Executa a função alvo no novo processo
  4. Retorna o controle para o processo pai

A proteção if __name__ == "__main__": é importante ao trabalhar com multiprocessing em Python. Isso impede a criação de processos duplicados quando o módulo é importado.

Múltiplos Processos

Vamos modificar nosso exemplo para criar múltiplos processos. Crie um novo arquivo chamado multiple_processes.py:

import multiprocessing
import time
import os

def worker(worker_id):
    """Worker function that accepts an argument"""
    print(f"Worker {worker_id} (PID: {os.getpid()}) started")
    time.sleep(1)
    print(f"Worker {worker_id} completed")
    return worker_id * 2  ## This return value won't be captured

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    ## Create multiple processes
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    ## Wait for all processes to complete
    for p in processes:
        p.join()

    print("All processes completed")

Execute este código:

python3 multiple_processes.py

Você deve ver uma saída semelhante a:

Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed

Observe como passamos um argumento (worker_id) para cada processo usando o parâmetro args. Este é nosso primeiro exemplo de passagem de argumentos para um processo.

Observe também que os processos podem não ser necessariamente executados em uma ordem previsível, pois estão sendo executados em paralelo.

Passagem Básica de Argumentos com Pool

Na etapa anterior, passamos um argumento simples para um processo. Agora, vamos explorar uma maneira mais eficiente de lidar com múltiplas tarefas usando a classe Pool e aprender sobre diferentes maneiras de passar argumentos.

A Classe Pool

A classe Pool fornece uma maneira conveniente de paralelizar a execução de uma função em vários valores de entrada. Ela distribui os dados de entrada entre os processos e coleta os resultados.

Vamos criar um exemplo simples para entender como o Pool funciona. Crie um novo arquivo chamado pool_example.py:

import multiprocessing
import time

def square(x):
    """Function that squares a number and returns the result"""
    print(f"Processing {x}...")
    time.sleep(1)  ## Simulate a time-consuming operation
    return x * x

if __name__ == "__main__":
    ## Create a list of numbers to process
    numbers = [1, 2, 3, 4, 5]

    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        ## Map the square function to the numbers
        start_time = time.time()
        results = pool.map(square, numbers)
        end_time = time.time()

        print(f"Results: {results}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")

    ## Compare with sequential processing
    start_time = time.time()
    sequential_results = [square(n) for n in numbers]
    end_time = time.time()

    print(f"Sequential results: {sequential_results}")
    print(f"Sequential time: {end_time - start_time:.2f} seconds")

Execute o código:

python3 pool_example.py

Você deve ver uma saída semelhante a:

Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds

Observe como o Pool é mais rápido porque processa vários números em paralelo. Com um tamanho de pool de 2, ele pode processar 2 números por vez, enquanto a abordagem sequencial os processa um após o outro.

Passando Argumentos Únicos com Pool.map()

O método map() é perfeito para quando sua função recebe um único argumento. Ele aplica a função a cada item em um iterável, em paralelo.

Vamos explorar isso mais com outro exemplo. Crie um arquivo chamado pool_map.py:

import multiprocessing
import time
import os

def process_item(item):
    """Process a single item"""
    process_id = os.getpid()
    print(f"Process {process_id} processing item {item}")
    time.sleep(1)  ## Simulate work
    return item * 10

if __name__ == "__main__":
    ## Create a list of items to process
    items = list(range(10))

    ## Get the number of CPU cores available
    num_cores = multiprocessing.cpu_count()
    print(f"System has {num_cores} CPU cores")

    ## Create a Pool with the number of available cores
    with multiprocessing.Pool(processes=num_cores) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        ## Process all items in parallel
        results = pool.map(process_item, items)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

Execute o código:

python3 pool_map.py

Sua saída deve mostrar os itens sendo processados em paralelo em vários processos, com um tempo significativamente menor que 10 segundos (que seria o tempo de processamento sequencial para 10 itens com um atraso de 1 segundo cada).

Passando Múltiplos Argumentos com Pool.starmap()

E se sua função precisar de múltiplos argumentos? É aqui que o starmap() entra em ação.

Crie um arquivo chamado pool_starmap.py:

import multiprocessing
import time

def process_data(id, value, multiplier):
    """Process data with multiple arguments"""
    print(f"Processing item {id}: {value} with multiplier {multiplier}")
    time.sleep(1)  ## Simulate work
    return value * multiplier

if __name__ == "__main__":
    ## Create a list of argument tuples
    ## Each tuple contains all arguments for one function call
    arguments = [
        (1, 5, 2),    ## id=1, value=5, multiplier=2
        (2, 10, 3),   ## id=2, value=10, multiplier=3
        (3, 15, 2),   ## id=3, value=15, multiplier=2
        (4, 20, 4),   ## id=4, value=20, multiplier=4
        (5, 25, 5)    ## id=5, value=25, multiplier=5
    ]

    ## Create a Pool with 3 processes
    with multiprocessing.Pool(processes=3) as pool:
        print("Starting parallel processing with multiple arguments...")
        start_time = time.time()

        ## Process all items in parallel using starmap
        results = pool.starmap(process_data, arguments)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

Execute o código:

python3 pool_starmap.py

Você deve ver a saída mostrando cada item sendo processado com seu próprio conjunto de argumentos, e a lista de resultados final contendo todos os valores calculados.

Pontos Chave Sobre Pool

  • Pool lida automaticamente com a distribuição de tarefas entre os processos
  • Ele gerencia uma fila de tarefas e as atribui aos processos de trabalho disponíveis
  • Ele coleta os resultados e os retorna na mesma ordem dos dados de entrada
  • O número de processos pode ser otimizado com base nos núcleos de CPU do seu sistema e na natureza da sua tarefa

Técnicas Avançadas de Passagem de Argumentos

Nas etapas anteriores, aprendemos sobre a passagem básica de argumentos com objetos Process e a classe Pool. Agora, vamos explorar técnicas mais avançadas para passar argumentos em cenários de multiprocessing.

Passando Dados Complexos com apply() e apply_async()

Às vezes, você precisa de mais flexibilidade do que map() e starmap() fornecem. Os métodos apply() e apply_async() permitem que você execute uma função com argumentos específicos e lide com os resultados de forma mais dinâmica.

Crie um arquivo chamado pool_apply.py:

import multiprocessing
import time
import random

def process_task(task_id, duration, data):
    """Process a task with multiple arguments including complex data"""
    print(f"Starting task {task_id} with duration {duration}s and data: {data}")
    time.sleep(duration)  ## Simulate variable work time
    result = sum(data) * task_id
    print(f"Task {task_id} completed")
    return result

if __name__ == "__main__":
    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        print("Starting tasks with apply()...")

        ## Execute tasks with apply() - blocking
        result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
        print(f"Result 1: {result1}")

        result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
        print(f"Result 2: {result2}")

        print("\nStarting tasks with apply_async()...")

        ## Execute tasks with apply_async() - non-blocking
        async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
        async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))

        ## Do other work while tasks are running
        print("Tasks are running in the background...")
        time.sleep(1)
        print("Main process is doing other work...")

        ## Get results when needed (will wait if not yet available)
        print(f"Result 3: {async_result1.get()}")
        print(f"Result 4: {async_result2.get()}")

Execute o código:

python3 pool_apply.py

A saída mostrará como apply() é bloqueante (sequencial), enquanto apply_async() permite que as tarefas sejam executadas em paralelo.

Usando functools.partial para Aplicação Parcial de Funções

Quando você tem uma função que recebe vários argumentos, mas deseja fixar alguns deles, functools.partial é muito útil.

Crie um arquivo chamado partial_example.py:

import multiprocessing
from functools import partial
import time

def process_with_config(data, config_a, config_b):
    """Process data with specific configuration parameters"""
    print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
    time.sleep(1)
    return data * config_a + config_b

if __name__ == "__main__":
    ## Data to process
    data_items = [1, 2, 3, 4, 5]

    ## Configuration values
    config_a = 10
    config_b = 5

    ## Create a partially applied function with fixed config values
    partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)

    ## Create a Pool
    with multiprocessing.Pool(processes=3) as pool:
        ## Use map with the partially applied function
        print("Starting processing with partial function...")
        results = pool.map(partial_process, data_items)
        print(f"Results: {results}")

Execute o código:

python3 partial_example.py

A saída mostrará como cada item de dados é processado com os mesmos valores de configuração que foram fixados com partial().

Passando Argumentos via Memória Compartilhada

Para grandes dados que não devem ser copiados para cada processo, a memória compartilhada pode ser mais eficiente. Vamos explorar essa técnica.

Crie um arquivo chamado shared_memory_example.py:

import multiprocessing
import numpy as np
import time

def process_array_chunk(array, start_idx, end_idx):
    """Process a chunk of a shared array"""
    print(f"Processing chunk from index {start_idx} to {end_idx}")

    ## Simulate processing each element
    for i in range(start_idx, end_idx):
        array[i] = array[i] ** 2

    time.sleep(1)  ## Simulate additional work
    return start_idx, end_idx

if __name__ == "__main__":
    ## Create a shared array
    array_size = 1000000
    shared_array = multiprocessing.Array('d', array_size)  ## 'd' for double precision

    ## Fill the array with initial values
    temp_array = np.frombuffer(shared_array.get_obj())
    temp_array[:] = np.arange(array_size)

    print(f"Array initialized with size {array_size}")
    print(f"First 5 elements: {temp_array[:5]}")

    ## Define chunk size and create tasks
    num_processes = 4
    chunk_size = array_size // num_processes
    tasks = []

    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else array_size
        tasks.append((start, end))

    ## Process chunks in parallel
    with multiprocessing.Pool(processes=num_processes) as pool:
        ## Create arguments for each task
        args = [(shared_array, start, end) for start, end in tasks]

        print("Starting parallel processing...")
        start_time = time.time()

        ## Using starmap to pass multiple arguments
        results = pool.starmap(process_array_chunk, args)

        end_time = time.time()
        print(f"Processing completed in {end_time - start_time:.2f} seconds")

    ## Verify results
    print(f"First 5 elements after processing: {temp_array[:5]}")
    print(f"Results summary: {results}")

Execute o código:

python3 shared_memory_example.py

A saída mostrará como um grande array é processado em pedaços por diferentes processos, com todos os processos tendo acesso à mesma memória compartilhada.

Comparação de Desempenho

Vamos comparar essas diferentes técnicas para uma tarefa real.

Crie um arquivo chamado performance_comparison.py:

import multiprocessing
import time
import numpy as np
from functools import partial

def process_data_simple(data):
    """Simple processing function"""
    return sum(x ** 2 for x in data)

def process_data_with_config(data, multiplier):
    """Processing with additional configuration"""
    return sum(x ** 2 for x in data) * multiplier

def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
    """Run a test and measure performance"""
    start_time = time.time()

    with multiprocessing.Pool(processes=pool_size) as pool:
        if kwargs:
            partial_func = partial(func, **kwargs)
            results = pool.map(partial_func, data_chunks)
        else:
            results = pool.map(func, data_chunks)

    end_time = time.time()
    print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
    return results

if __name__ == "__main__":
    ## Create test data
    chunk_size = 1000000
    num_chunks = 8
    data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]

    print(f"Created {num_chunks} data chunks of size {chunk_size} each")

    ## Test 1: Simple map
    results1 = run_test("Simple map", process_data_simple, data_chunks)

    ## Test 2: Using partial
    results2 = run_test("Map with partial", process_data_with_config,
                        data_chunks, multiplier=2)

    ## Test 3: Sequential processing for comparison
    start_time = time.time()
    seq_results = [process_data_simple(chunk) for chunk in data_chunks]
    end_time = time.time()
    print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")

    ## Validate results
    print(f"\nFirst result from each test:")
    print(f"Test 1 first result: {results1[0]:.4f}")
    print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
    print(f"Sequential first result: {seq_results[0]:.4f}")

Execute a comparação de desempenho:

python3 performance_comparison.py

A saída mostrará as diferenças de desempenho entre as abordagens, demonstrando os benefícios do processamento paralelo e a sobrecarga de diferentes técnicas de passagem de argumentos.

Aplicação no Mundo Real: Processamento de Imagens Paralelo

Agora que entendemos várias técnicas para passar argumentos em multiprocessing Python, vamos aplicar esses conceitos a um cenário do mundo real: processamento de imagens paralelo. Este é um caso de uso comum onde o multiprocessing pode melhorar significativamente o desempenho.

Configurando o Ambiente

Primeiro, vamos instalar os pacotes necessários:

pip install Pillow numpy

Criando Imagens de Amostra

Vamos criar um script para gerar algumas imagens de amostra para nosso processamento. Crie um arquivo chamado create_images.py:

from PIL import Image
import numpy as np
import os

def create_sample_image(filename, width=800, height=600):
    """Create a sample image with random data"""
    ## Create random array data
    data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)

    ## Create image from array
    img = Image.fromarray(data, 'RGB')

    ## Save the image
    img.save(filename)
    print(f"Created image: {filename}")

if __name__ == "__main__":
    ## Create a directory for sample images
    os.makedirs("sample_images", exist_ok=True)

    ## Create 5 sample images
    for i in range(5):
        create_sample_image(f"sample_images/sample_{i+1}.png")

    print("All sample images created successfully")

Execute este script para criar imagens de amostra:

python3 create_images.py

Processamento de Imagens Sequencial

Vamos primeiro implementar uma versão sequencial de nossas tarefas de processamento de imagens. Crie um arquivo chamado sequential_image_processing.py:

from PIL import Image, ImageFilter
import os
import time

def apply_filters(image_path, output_dir):
    """Apply multiple filters to an image and save the results"""
    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_sequentially(image_dir, output_dir):
    """Process all images in a directory sequentially"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Process each image sequentially
    start_time = time.time()
    results = []

    for image_path in image_files:
        result = apply_filters(image_path, output_dir)
        results.append(result)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nSequential processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_sequential"

    ## Process images
    results = process_images_sequentially(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

Execute o processamento sequencial:

python3 sequential_image_processing.py

Processamento de Imagens Paralelo

Agora, vamos implementar o processamento de imagens paralelo usando as técnicas que aprendemos. Crie um arquivo chamado parallel_image_processing.py:

from PIL import Image, ImageFilter
import multiprocessing
import os
import time

def apply_filters(args):
    """Apply multiple filters to an image and save the results"""
    image_path, output_dir = args

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_in_parallel(image_dir, output_dir, num_processes=None):
    """Process all images in a directory in parallel"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Determine number of processes
    if num_processes is None:
        num_processes = min(multiprocessing.cpu_count(), len(image_files))

    print(f"Using {num_processes} processes for parallel processing")

    ## Create arguments for each task
    task_args = [(image_path, output_dir) for image_path in image_files]

    ## Process images in parallel
    start_time = time.time()

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(apply_filters, task_args)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nParallel processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_parallel"

    ## Process images
    results = process_images_in_parallel(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

Execute o processamento paralelo:

python3 parallel_image_processing.py

Processamento Paralelo Avançado com Diferentes Filtros

Agora, vamos implementar uma versão mais complexa onde aplicamos diferentes filtros à mesma imagem em paralelo. Crie um arquivo chamado advanced_parallel_processing.py:

from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial

def apply_filter(filter_info, image_path, output_dir):
    """Apply a specific filter to an image"""
    filter_name, filter_obj = filter_info

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} applying {filter_name} to {image_name}")

    ## Open image each time to avoid potential issues with sharing PIL objects
    img = Image.open(image_path)

    ## Apply filter
    start_time = time.time()
    filtered = img.filter(filter_obj)

    ## Save the filtered image
    output_filename = f"{filter_name}_{image_name}"
    output_path = os.path.join(output_dir, output_filename)
    filtered.save(output_path)

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
    return filter_name, image_name, processing_time

def process_image_with_multiple_filters(image_path, output_dir, filters):
    """Process an image with multiple filters in parallel"""
    ## Create a partial function with fixed image_path and output_dir
    apply_filter_to_image = partial(apply_filter,
                                   image_path=image_path,
                                   output_dir=output_dir)

    ## Process the image with multiple filters in parallel
    with multiprocessing.Pool() as pool:
        results = pool.map(apply_filter_to_image, filters)

    return results

def main():
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_advanced"
    os.makedirs(output_dir, exist_ok=True)

    ## Define filters to apply
    filters = [
        ("blur", ImageFilter.GaussianBlur(radius=5)),
        ("edges", ImageFilter.FIND_EDGES),
        ("emboss", ImageFilter.EMBOSS),
        ("sharpen", ImageFilter.SHARPEN),
        ("contour", ImageFilter.CONTOUR),
        ("detail", ImageFilter.DETAIL),
        ("smooth", ImageFilter.SMOOTH),
        ("smoothmore", ImageFilter.SMOOTH_MORE)
    ]

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process with {len(filters)} filters each")

    ## Process each image with all filters
    all_results = []
    overall_start = time.time()

    for image_path in image_files:
        image_name = os.path.basename(image_path)
        print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")

        start_time = time.time()
        results = process_image_with_multiple_filters(image_path, output_dir, filters)
        end_time = time.time()

        image_time = end_time - start_time
        print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")

        all_results.extend(results)

    overall_end = time.time()
    total_time = overall_end - overall_start

    ## Print summary
    print("\nProcessing Summary:")
    print(f"Total processing time: {total_time:.2f} seconds")
    print(f"Total operations: {len(all_results)}")

    ## Calculate average time per filter
    avg_time = sum(time for _, _, time in all_results) / len(all_results)
    print(f"Average processing time per filter: {avg_time:.2f} seconds")

if __name__ == "__main__":
    main()

Execute o processamento paralelo avançado:

python3 advanced_parallel_processing.py

Comparando o Desempenho

Vamos criar um script para comparar o desempenho das diferentes abordagens. Crie um arquivo chamado compare_performance.py:

import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np

def run_script(script_name):
    """Run a Python script and measure its execution time"""
    print(f"Running {script_name}...")
    start_time = time.time()

    ## Run the script
    process = subprocess.run(['python3', script_name],
                            capture_output=True, text=True)

    end_time = time.time()
    execution_time = end_time - start_time

    print(f"Completed {script_name} in {execution_time:.2f} seconds")
    print(f"Output: {process.stdout[-200:]}") ## Show last part of output

    return execution_time

def main():
    ## Scripts to compare
    scripts = [
        'sequential_image_processing.py',
        'parallel_image_processing.py',
        'advanced_parallel_processing.py'
    ]

    ## Run each script and measure time
    times = []
    for script in scripts:
        if os.path.exists(script):
            times.append(run_script(script))
        else:
            print(f"Script {script} not found!")
            times.append(0)

    ## Plot results
    if any(times):
        labels = ['Sequencial', 'Paralelo Básico', 'Paralelo Avançado']
        x = np.arange(len(labels))

        plt.figure(figsize=(10, 6))
        plt.bar(x, times, color=['red', 'blue', 'green'])
        plt.xlabel('Método de Processamento')
        plt.ylabel('Tempo de Execução (s)')
        plt.title('Comparação de Desempenho de Processamento de Imagens')
        plt.xticks(x, labels)

        ## Add execution time as text on bars
        for i, v in enumerate(times):
            plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')

        ## Save the plot
        plt.tight_layout()
        plt.savefig('performance_comparison.png')
        print("Gráfico de comparação de desempenho salvo como 'performance_comparison.png'")
    else:
        print("Nenhum tempo de execução válido para plotar")

if __name__ == "__main__":
    main()

Vamos executar a comparação:

python3 compare_performance.py

Revise os resultados e examine a imagem performance_comparison.png para ver as diferenças no tempo de execução entre as diferentes abordagens.

Principais Conclusões

Deste exemplo do mundo real, você pode observar vários aspectos importantes do multiprocessing:

  1. O processamento paralelo pode reduzir significativamente o tempo de execução ao lidar com tarefas intensivas em CPU, como o processamento de imagens.

  2. O método de passagem de argumentos afeta a complexidade do código e o desempenho:

    • map() simples para funções de argumento único diretas
    • Funções parciais para fixar certos parâmetros
    • Técnicas avançadas para fluxos de trabalho complexos
  3. Considerações para aplicações do mundo real:

    • Gerenciamento de recursos (CPU, memória)
    • Estratégias de distribuição de tarefas
    • Tratamento de erros em ambientes paralelos
    • Coordenação entre processos

Este exemplo prático demonstra como aplicar os conceitos de multiprocessing que aprendemos ao longo deste laboratório para resolver problemas do mundo real de forma eficiente.

Resumo

Neste laboratório, você aprendeu sobre multiprocessing em Python e diferentes técnicas para passar argumentos para processos paralelos. Aqui está um resumo do que você realizou:

  1. Fundamentos de Multiprocessing: Você criou seu primeiro programa de multiprocessing e entendeu a diferença entre processos e threads.

  2. Técnicas de Passagem de Argumentos: Você explorou vários métodos para passar argumentos:

    • Passagem básica de argumentos com Process.args
    • Passagem de argumento único com Pool.map()
    • Múltiplos argumentos com Pool.starmap()
    • Execução dinâmica com apply() e apply_async()
    • Usando functools.partial para aplicação parcial de funções
    • Memória compartilhada para compartilhamento eficiente de dados
  3. Aplicação no Mundo Real: Você aplicou esses conceitos a um exemplo prático de processamento de imagens, demonstrando melhorias significativas de desempenho por meio da paralelização.

As habilidades que você aprendeu neste laboratório são valiosas para qualquer aplicação Python intensiva em CPU, permitindo que você aproveite todo o poder computacional de sistemas multi-core. Ao desenvolver aplicações mais complexas, lembre-se de considerar as compensações entre diferentes técnicas de passagem de argumentos e escolher o método mais apropriado para seu caso de uso específico.