Python Multiprocessing 에서 인수를 전달하는 방법

PythonBeginner
지금 연습하기

소개

Python 의 multiprocessing 모듈은 여러 CPU 코어를 활용하여 병렬 컴퓨팅을 가능하게 하며, 이는 CPU 집약적인 작업의 계산 성능을 크게 향상시킬 수 있습니다. 이 랩에서는 multiprocessing 모듈에서 프로세스에 인수를 전달하는 다양한 기술을 살펴보고, 동시 프로그래밍의 일반적인 문제점을 해결하며, 효과적인 병렬화를 위한 실용적인 전략을 시연합니다.

이 랩을 마치면 병렬 프로세스에 데이터를 전달하는 다양한 방법을 이해하고 이러한 기술을 실제 시나리오에 적용하는 방법을 알게 될 것입니다.

Python 의 Multiprocessing 소개

이 단계에서는 Python 에서 multiprocessing 의 기본 사항을 배우고 첫 번째 병렬 프로그램을 만들 것입니다.

Multiprocessing 이란 무엇인가요?

Multiprocessing 은 여러 프로세스를 병렬로 실행하여 여러 CPU 코어를 효과적으로 사용할 수 있게 해주는 Python 모듈입니다. 이는 Python 코드의 진정한 병렬 실행을 방지하는 Global Interpreter Lock (GIL) 에 의해 제한되는 threading 과는 다릅니다.

다음은 간단한 비교입니다.

기능 Multiprocessing Threading
메모리 별도의 메모리 공간 공유 메모리 공간
병렬성 진정한 병렬 실행 GIL 에 의해 제한됨
사용 사례 CPU 집약적 작업 I/O 집약적 작업
오버헤드 더 높음 (별도의 프로세스) 더 낮음

첫 번째 Multiprocessing 프로그램 만들기

간단한 multiprocessing 예제를 만들어 보겠습니다. WebIDE 에서 새 파일을 열고 이름을 simple_process.py로 지정합니다.

import multiprocessing
import time
import os

def worker():
    """Simple function to demonstrate a process"""
    print(f"Worker process id: {os.getpid()}")
    print(f"Worker parent process id: {os.getppid()}")
    time.sleep(1)
    print("Worker process completed")

if __name__ == "__main__":
    ## Print information about the main process
    print(f"Main process id: {os.getpid()}")

    ## Create a process
    process = multiprocessing.Process(target=worker)

    ## Start the process
    print("Starting worker process...")
    process.start()

    ## Wait for the process to complete
    process.join()

    print("Main process completed")

이제 이 프로그램을 실행해 보겠습니다.

python3 simple_process.py

다음과 유사한 출력을 볼 수 있습니다.

Main process id: 12345
Starting worker process...
Worker process id: 12346
Worker parent process id: 12345
Worker process completed
Main process completed

정확한 프로세스 ID 는 시스템에 따라 다릅니다. 작업자 프로세스가 메인 프로세스와 다른 프로세스 ID 를 가지고 있음을 확인하여 별도의 프로세스임을 확인합니다.

프로세스 생성 이해

multiprocessing Process 를 생성하고 시작하면 Python 은 다음을 수행합니다.

  1. 새 프로세스를 생성합니다.
  2. 대상 함수를 포함하는 모듈을 가져옵니다.
  3. 새 프로세스에서 대상 함수를 실행합니다.
  4. 상위 프로세스로 제어를 반환합니다.

if __name__ == "__main__": 가드는 Python 에서 multiprocessing 을 사용할 때 중요합니다. 이는 모듈을 가져올 때 중복 프로세스가 생성되는 것을 방지합니다.

여러 프로세스

여러 프로세스를 생성하도록 예제를 수정해 보겠습니다. multiple_processes.py라는 새 파일을 만듭니다.

import multiprocessing
import time
import os

def worker(worker_id):
    """Worker function that accepts an argument"""
    print(f"Worker {worker_id} (PID: {os.getpid()}) started")
    time.sleep(1)
    print(f"Worker {worker_id} completed")
    return worker_id * 2  ## This return value won't be captured

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    ## Create multiple processes
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    ## Wait for all processes to complete
    for p in processes:
        p.join()

    print("All processes completed")

이 코드를 실행합니다.

python3 multiple_processes.py

다음과 유사한 출력을 볼 수 있습니다.

Main process PID: 12345
Worker 0 (PID: 12346) started
Worker 1 (PID: 12347) started
Worker 2 (PID: 12348) started
Worker 0 completed
Worker 1 completed
Worker 2 completed
All processes completed

args 매개변수를 사용하여 각 프로세스에 인수 (worker_id) 를 전달했음을 확인하십시오. 이것이 프로세스에 인수를 전달하는 첫 번째 예입니다.

또한 프로세스가 병렬로 실행되므로 반드시 예측 가능한 순서로 실행되지 않을 수 있습니다.

Pool 을 사용한 기본 인수 전달

이전 단계에서는 간단한 인수를 프로세스에 전달했습니다. 이제 Pool 클래스를 사용하여 여러 작업을 처리하는 보다 효율적인 방법을 살펴보고 인수를 전달하는 다양한 방법에 대해 알아보겠습니다.

Pool 클래스

Pool 클래스는 여러 입력 값에 걸쳐 함수의 실행을 병렬화하는 편리한 방법을 제공합니다. 입력 데이터를 프로세스에 분산시키고 결과를 수집합니다.

Pool이 어떻게 작동하는지 이해하기 위해 간단한 예제를 만들어 보겠습니다. pool_example.py라는 새 파일을 만듭니다.

import multiprocessing
import time

def square(x):
    """Function that squares a number and returns the result"""
    print(f"Processing {x}...")
    time.sleep(1)  ## Simulate a time-consuming operation
    return x * x

if __name__ == "__main__":
    ## Create a list of numbers to process
    numbers = [1, 2, 3, 4, 5]

    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        ## Map the square function to the numbers
        start_time = time.time()
        results = pool.map(square, numbers)
        end_time = time.time()

        print(f"Results: {results}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")

    ## Compare with sequential processing
    start_time = time.time()
    sequential_results = [square(n) for n in numbers]
    end_time = time.time()

    print(f"Sequential results: {sequential_results}")
    print(f"Sequential time: {end_time - start_time:.2f} seconds")

코드를 실행합니다.

python3 pool_example.py

다음과 유사한 출력을 볼 수 있습니다.

Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Results: [1, 4, 9, 16, 25]
Time taken: 3.01 seconds
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Processing 5...
Sequential results: [1, 4, 9, 16, 25]
Sequential time: 5.01 seconds

Pool이 여러 숫자를 병렬로 처리하기 때문에 더 빠르다는 것을 알 수 있습니다. 풀 크기가 2 이면 한 번에 2 개의 숫자를 처리할 수 있는 반면, 순차적 접근 방식은 숫자를 하나씩 처리합니다.

Pool.map() 을 사용한 단일 인수 전달

map() 메서드는 함수가 단일 인수를 사용하는 경우에 적합합니다. 반복 가능한 객체의 각 항목에 함수를 병렬로 적용합니다.

다른 예제를 통해 자세히 살펴보겠습니다. pool_map.py라는 파일을 만듭니다.

import multiprocessing
import time
import os

def process_item(item):
    """Process a single item"""
    process_id = os.getpid()
    print(f"Process {process_id} processing item {item}")
    time.sleep(1)  ## Simulate work
    return item * 10

if __name__ == "__main__":
    ## Create a list of items to process
    items = list(range(10))

    ## Get the number of CPU cores available
    num_cores = multiprocessing.cpu_count()
    print(f"System has {num_cores} CPU cores")

    ## Create a Pool with the number of available cores
    with multiprocessing.Pool(processes=num_cores) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        ## Process all items in parallel
        results = pool.map(process_item, items)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

코드를 실행합니다.

python3 pool_map.py

출력은 여러 프로세스에서 병렬로 처리되는 항목을 표시하며, 시간은 10 초보다 훨씬 짧습니다 (각 항목에 1 초 지연이 있는 10 개의 항목에 대한 순차적 처리 시간).

Pool.starmap() 을 사용한 여러 인수 전달

함수에 여러 인수가 필요한 경우 어떻게 해야 할까요? 이럴 때 starmap()이 사용됩니다.

pool_starmap.py라는 파일을 만듭니다.

import multiprocessing
import time

def process_data(id, value, multiplier):
    """Process data with multiple arguments"""
    print(f"Processing item {id}: {value} with multiplier {multiplier}")
    time.sleep(1)  ## Simulate work
    return value * multiplier

if __name__ == "__main__":
    ## Create a list of argument tuples
    ## Each tuple contains all arguments for one function call
    arguments = [
        (1, 5, 2),    ## id=1, value=5, multiplier=2
        (2, 10, 3),   ## id=2, value=10, multiplier=3
        (3, 15, 2),   ## id=3, value=15, multiplier=2
        (4, 20, 4),   ## id=4, value=20, multiplier=4
        (5, 25, 5)    ## id=5, value=25, multiplier=5
    ]

    ## Create a Pool with 3 processes
    with multiprocessing.Pool(processes=3) as pool:
        print("Starting parallel processing with multiple arguments...")
        start_time = time.time()

        ## Process all items in parallel using starmap
        results = pool.starmap(process_data, arguments)

        end_time = time.time()
        print(f"All processing completed in {end_time - start_time:.2f} seconds")
        print(f"Results: {results}")

코드를 실행합니다.

python3 pool_starmap.py

각 항목이 자체 인수 집합으로 처리되고 최종 결과 목록에 모든 계산된 값이 포함되어 있음을 보여주는 출력을 볼 수 있습니다.

Pool 에 대한 주요 사항

  • Pool은 자동으로 프로세스 간에 작업 분배를 처리합니다.
  • 작업 대기열을 관리하고 사용 가능한 작업자 프로세스에 할당합니다.
  • 결과를 수집하고 입력 데이터와 동일한 순서로 반환합니다.
  • 프로세스 수는 시스템의 CPU 코어 및 작업의 특성에 따라 최적화할 수 있습니다.

고급 인수 전달 기술

이전 단계에서는 Process 객체와 Pool 클래스를 사용한 기본 인수 전달에 대해 배웠습니다. 이제 multiprocessing 시나리오에서 인수를 전달하는 더 고급 기술을 살펴보겠습니다.

apply() 및 apply_async() 를 사용한 복잡한 데이터 전달

때로는 map()starmap()이 제공하는 것보다 더 많은 유연성이 필요합니다. apply()apply_async() 메서드를 사용하면 특정 인수로 함수를 실행하고 결과를 보다 동적으로 처리할 수 있습니다.

pool_apply.py라는 파일을 만듭니다.

import multiprocessing
import time
import random

def process_task(task_id, duration, data):
    """Process a task with multiple arguments including complex data"""
    print(f"Starting task {task_id} with duration {duration}s and data: {data}")
    time.sleep(duration)  ## Simulate variable work time
    result = sum(data) * task_id
    print(f"Task {task_id} completed")
    return result

if __name__ == "__main__":
    ## Create a Pool with 2 processes
    with multiprocessing.Pool(processes=2) as pool:
        print("Starting tasks with apply()...")

        ## Execute tasks with apply() - blocking
        result1 = pool.apply(process_task, args=(1, 2, [1, 2, 3]))
        print(f"Result 1: {result1}")

        result2 = pool.apply(process_task, args=(2, 1, [4, 5, 6]))
        print(f"Result 2: {result2}")

        print("\nStarting tasks with apply_async()...")

        ## Execute tasks with apply_async() - non-blocking
        async_result1 = pool.apply_async(process_task, args=(3, 3, [7, 8, 9]))
        async_result2 = pool.apply_async(process_task, args=(4, 2, [10, 11, 12]))

        ## Do other work while tasks are running
        print("Tasks are running in the background...")
        time.sleep(1)
        print("Main process is doing other work...")

        ## Get results when needed (will wait if not yet available)
        print(f"Result 3: {async_result1.get()}")
        print(f"Result 4: {async_result2.get()}")

코드를 실행합니다.

python3 pool_apply.py

출력은 apply()가 블로킹 (순차적) 인 반면 apply_async()가 작업을 병렬로 실행할 수 있도록 하는 방식을 보여줍니다.

functools.partial 을 사용한 부분 함수 적용

여러 인수를 사용하지만 그 중 일부를 고정하려는 경우 functools.partial이 매우 유용합니다.

partial_example.py라는 파일을 만듭니다.

import multiprocessing
from functools import partial
import time

def process_with_config(data, config_a, config_b):
    """Process data with specific configuration parameters"""
    print(f"Processing {data} with config_a={config_a}, config_b={config_b}")
    time.sleep(1)
    return data * config_a + config_b

if __name__ == "__main__":
    ## Data to process
    data_items = [1, 2, 3, 4, 5]

    ## Configuration values
    config_a = 10
    config_b = 5

    ## Create a partially applied function with fixed config values
    partial_process = partial(process_with_config, config_a=config_a, config_b=config_b)

    ## Create a Pool
    with multiprocessing.Pool(processes=3) as pool:
        ## Use map with the partially applied function
        print("Starting processing with partial function...")
        results = pool.map(partial_process, data_items)
        print(f"Results: {results}")

코드를 실행합니다.

python3 partial_example.py

출력은 각 데이터 항목이 partial()로 고정된 동일한 구성 값으로 처리되는 방식을 보여줍니다.

공유 메모리를 통한 인수 전달

각 프로세스에 대해 복사해서는 안 되는 대용량 데이터의 경우 공유 메모리가 더 효율적일 수 있습니다. 이 기술을 살펴보겠습니다.

shared_memory_example.py라는 파일을 만듭니다.

import multiprocessing
import numpy as np
import time

def process_array_chunk(array, start_idx, end_idx):
    """Process a chunk of a shared array"""
    print(f"Processing chunk from index {start_idx} to {end_idx}")

    ## Simulate processing each element
    for i in range(start_idx, end_idx):
        array[i] = array[i] ** 2

    time.sleep(1)  ## Simulate additional work
    return start_idx, end_idx

if __name__ == "__main__":
    ## Create a shared array
    array_size = 1000000
    shared_array = multiprocessing.Array('d', array_size)  ## 'd' for double precision

    ## Fill the array with initial values
    temp_array = np.frombuffer(shared_array.get_obj())
    temp_array[:] = np.arange(array_size)

    print(f"Array initialized with size {array_size}")
    print(f"First 5 elements: {temp_array[:5]}")

    ## Define chunk size and create tasks
    num_processes = 4
    chunk_size = array_size // num_processes
    tasks = []

    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else array_size
        tasks.append((start, end))

    ## Process chunks in parallel
    with multiprocessing.Pool(processes=num_processes) as pool:
        ## Create arguments for each task
        args = [(shared_array, start, end) for start, end in tasks]

        print("Starting parallel processing...")
        start_time = time.time()

        ## Using starmap to pass multiple arguments
        results = pool.starmap(process_array_chunk, args)

        end_time = time.time()
        print(f"Processing completed in {end_time - start_time:.2f} seconds")

    ## Verify results
    print(f"First 5 elements after processing: {temp_array[:5]}")
    print(f"Results summary: {results}")

코드를 실행합니다.

python3 shared_memory_example.py

출력은 대형 배열이 서로 다른 프로세스에 의해 청크로 처리되는 방식을 보여주며, 모든 프로세스가 동일한 공유 메모리에 액세스할 수 있습니다.

성능 비교

실제 작업에 대해 이러한 다양한 기술을 비교해 보겠습니다.

performance_comparison.py라는 파일을 만듭니다.

import multiprocessing
import time
import numpy as np
from functools import partial

def process_data_simple(data):
    """Simple processing function"""
    return sum(x ** 2 for x in data)

def process_data_with_config(data, multiplier):
    """Processing with additional configuration"""
    return sum(x ** 2 for x in data) * multiplier

def run_test(test_name, func, data_chunks, pool_size=4, **kwargs):
    """Run a test and measure performance"""
    start_time = time.time()

    with multiprocessing.Pool(processes=pool_size) as pool:
        if kwargs:
            partial_func = partial(func, **kwargs)
            results = pool.map(partial_func, data_chunks)
        else:
            results = pool.map(func, data_chunks)

    end_time = time.time()
    print(f"{test_name} completed in {end_time - start_time:.4f} seconds")
    return results

if __name__ == "__main__":
    ## Create test data
    chunk_size = 1000000
    num_chunks = 8
    data_chunks = [np.random.rand(chunk_size) for _ in range(num_chunks)]

    print(f"Created {num_chunks} data chunks of size {chunk_size} each")

    ## Test 1: Simple map
    results1 = run_test("Simple map", process_data_simple, data_chunks)

    ## Test 2: Using partial
    results2 = run_test("Map with partial", process_data_with_config,
                        data_chunks, multiplier=2)

    ## Test 3: Sequential processing for comparison
    start_time = time.time()
    seq_results = [process_data_simple(chunk) for chunk in data_chunks]
    end_time = time.time()
    print(f"Sequential processing completed in {end_time - start_time:.4f} seconds")

    ## Validate results
    print(f"\nFirst result from each test:")
    print(f"Test 1 first result: {results1[0]:.4f}")
    print(f"Test 2 first result: {results2[0]:.4f} (should be double of Test 1)")
    print(f"Sequential first result: {seq_results[0]:.4f}")

성능 비교를 실행합니다.

python3 performance_comparison.py

출력은 접근 방식 간의 성능 차이를 보여주며, 병렬 처리의 이점과 다양한 인수 전달 기술의 오버헤드를 보여줍니다.

실제 적용: 병렬 이미지 처리

이제 Python multiprocessing 에서 인수를 전달하는 다양한 기술을 이해했으므로 이러한 개념을 실제 시나리오, 즉 병렬 이미지 처리에 적용해 보겠습니다. 이는 multiprocessing 이 성능을 크게 향상시킬 수 있는 일반적인 사용 사례입니다.

환경 설정

먼저 필요한 패키지를 설치해 보겠습니다.

pip install Pillow numpy

샘플 이미지 생성

처리할 샘플 이미지를 생성하는 스크립트를 만들어 보겠습니다. create_images.py라는 파일을 만듭니다.

from PIL import Image
import numpy as np
import os

def create_sample_image(filename, width=800, height=600):
    """Create a sample image with random data"""
    ## Create random array data
    data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)

    ## Create image from array
    img = Image.fromarray(data, 'RGB')

    ## Save the image
    img.save(filename)
    print(f"Created image: {filename}")

if __name__ == "__main__":
    ## Create a directory for sample images
    os.makedirs("sample_images", exist_ok=True)

    ## Create 5 sample images
    for i in range(5):
        create_sample_image(f"sample_images/sample_{i+1}.png")

    print("All sample images created successfully")

이 스크립트를 실행하여 샘플 이미지를 생성합니다.

python3 create_images.py

순차적 이미지 처리

먼저 이미지 처리 작업의 순차적 버전을 구현해 보겠습니다. sequential_image_processing.py라는 파일을 만듭니다.

from PIL import Image, ImageFilter
import os
import time

def apply_filters(image_path, output_dir):
    """Apply multiple filters to an image and save the results"""
    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_sequentially(image_dir, output_dir):
    """Process all images in a directory sequentially"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Process each image sequentially
    start_time = time.time()
    results = []

    for image_path in image_files:
        result = apply_filters(image_path, output_dir)
        results.append(result)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nSequential processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_sequential"

    ## Process images
    results = process_images_sequentially(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

순차적 처리를 실행합니다.

python3 sequential_image_processing.py

병렬 이미지 처리

이제 배운 기술을 사용하여 병렬 이미지 처리를 구현해 보겠습니다. parallel_image_processing.py라는 파일을 만듭니다.

from PIL import Image, ImageFilter
import multiprocessing
import os
import time

def apply_filters(args):
    """Apply multiple filters to an image and save the results"""
    image_path, output_dir = args

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} processing image: {image_name}")
    img = Image.open(image_path)

    ## Apply filters
    ## 1. Blur filter
    start_time = time.time()
    blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
    blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

    ## 2. Edge detection filter
    edges = img.filter(ImageFilter.FIND_EDGES)
    edges.save(os.path.join(output_dir, f"edges_{image_name}"))

    ## 3. Emboss filter
    emboss = img.filter(ImageFilter.EMBOSS)
    emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

    ## 4. Sharpen filter
    sharpen = img.filter(ImageFilter.SHARPEN)
    sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time

def process_images_in_parallel(image_dir, output_dir, num_processes=None):
    """Process all images in a directory in parallel"""
    ## Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process")

    ## Determine number of processes
    if num_processes is None:
        num_processes = min(multiprocessing.cpu_count(), len(image_files))

    print(f"Using {num_processes} processes for parallel processing")

    ## Create arguments for each task
    task_args = [(image_path, output_dir) for image_path in image_files]

    ## Process images in parallel
    start_time = time.time()

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(apply_filters, task_args)

    end_time = time.time()
    total_time = end_time - start_time

    print(f"\nParallel processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results

if __name__ == "__main__":
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_parallel"

    ## Process images
    results = process_images_in_parallel(image_dir, output_dir)

    ## Print summary
    print("\nProcessing summary:")
    for image_name, proc_time in results:
        print(f"  {image_name}: {proc_time:.2f} seconds")

병렬 처리를 실행합니다.

python3 parallel_image_processing.py

다양한 필터를 사용한 고급 병렬 처리

이제 동일한 이미지에 서로 다른 필터를 병렬로 적용하는 보다 복잡한 버전을 구현해 보겠습니다. advanced_parallel_processing.py라는 파일을 만듭니다.

from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial

def apply_filter(filter_info, image_path, output_dir):
    """Apply a specific filter to an image"""
    filter_name, filter_obj = filter_info

    ## Load the image
    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} applying {filter_name} to {image_name}")

    ## Open image each time to avoid potential issues with sharing PIL objects
    img = Image.open(image_path)

    ## Apply filter
    start_time = time.time()
    filtered = img.filter(filter_obj)

    ## Save the filtered image
    output_filename = f"{filter_name}_{image_name}"
    output_path = os.path.join(output_dir, output_filename)
    filtered.save(output_path)

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
    return filter_name, image_name, processing_time

def process_image_with_multiple_filters(image_path, output_dir, filters):
    """Process an image with multiple filters in parallel"""
    ## Create a partial function with fixed image_path and output_dir
    apply_filter_to_image = partial(apply_filter,
                                   image_path=image_path,
                                   output_dir=output_dir)

    ## Process the image with multiple filters in parallel
    with multiprocessing.Pool() as pool:
        results = pool.map(apply_filter_to_image, filters)

    return results

def main():
    ## Define directories
    image_dir = "sample_images"
    output_dir = "processed_advanced"
    os.makedirs(output_dir, exist_ok=True)

    ## Define filters to apply
    filters = [
        ("blur", ImageFilter.GaussianBlur(radius=5)),
        ("edges", ImageFilter.FIND_EDGES),
        ("emboss", ImageFilter.EMBOSS),
        ("sharpen", ImageFilter.SHARPEN),
        ("contour", ImageFilter.CONTOUR),
        ("detail", ImageFilter.DETAIL),
        ("smooth", ImageFilter.SMOOTH),
        ("smoothmore", ImageFilter.SMOOTH_MORE)
    ]

    ## Get all image files
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)
                   if f.endswith(('.png', '.jpg', '.jpeg'))]

    print(f"Found {len(image_files)} images to process with {len(filters)} filters each")

    ## Process each image with all filters
    all_results = []
    overall_start = time.time()

    for image_path in image_files:
        image_name = os.path.basename(image_path)
        print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")

        start_time = time.time()
        results = process_image_with_multiple_filters(image_path, output_dir, filters)
        end_time = time.time()

        image_time = end_time - start_time
        print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")

        all_results.extend(results)

    overall_end = time.time()
    total_time = overall_end - overall_start

    ## Print summary
    print("\nProcessing Summary:")
    print(f"Total processing time: {total_time:.2f} seconds")
    print(f"Total operations: {len(all_results)}")

    ## Calculate average time per filter
    avg_time = sum(time for _, _, time in all_results) / len(all_results)
    print(f"Average processing time per filter: {avg_time:.2f} seconds")

if __name__ == "__main__":
    main()

고급 병렬 처리를 실행합니다.

python3 advanced_parallel_processing.py

성능 비교

다양한 접근 방식의 성능을 비교하는 스크립트를 만들어 보겠습니다. compare_performance.py라는 파일을 만듭니다.

import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np

def run_script(script_name):
    """Run a Python script and measure its execution time"""
    print(f"Running {script_name}...")
    start_time = time.time()

    ## Run the script
    process = subprocess.run(['python3', script_name],
                            capture_output=True, text=True)

    end_time = time.time()
    execution_time = end_time - start_time

    print(f"Completed {script_name} in {execution_time:.2f} seconds")
    print(f"Output: {process.stdout[-200:]}") ## Show last part of output

    return execution_time

def main():
    ## Scripts to compare
    scripts = [
        'sequential_image_processing.py',
        'parallel_image_processing.py',
        'advanced_parallel_processing.py'
    ]

    ## Run each script and measure time
    times = []
    for script in scripts:
        if os.path.exists(script):
            times.append(run_script(script))
        else:
            print(f"Script {script} not found!")
            times.append(0)

    ## Plot results
    if any(times):
        labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
        x = np.arange(len(labels))

        plt.figure(figsize=(10, 6))
        plt.bar(x, times, color=['red', 'blue', 'green'])
        plt.xlabel('Processing Method')
        plt.ylabel('Execution Time (s)')
        plt.title('Image Processing Performance Comparison')
        plt.xticks(x, labels)

        ## Add execution time as text on bars
        for i, v in enumerate(times):
            plt.text(i, v + 0.1, f"{v:.2f}s", ha='center')

        ## Save the plot
        plt.tight_layout()
        plt.savefig('performance_comparison.png')
        print("Performance comparison plot saved as 'performance_comparison.png'")
    else:
        print("No valid execution times to plot")

if __name__ == "__main__":
    main()

비교를 실행해 보겠습니다.

python3 compare_performance.py

결과를 검토하고 performance_comparison.png 이미지를 검토하여 다양한 접근 방식 간의 실행 시간 차이를 확인합니다.

주요 내용

이 실제 예제에서 multiprocessing 의 몇 가지 중요한 측면을 관찰할 수 있습니다.

  1. 병렬 처리는 이미지 처리와 같이 CPU 사용량이 많은 작업을 처리할 때 실행 시간을 크게 줄일 수 있습니다.

  2. 인수를 전달하는 방법은 코드 복잡성과 성능 모두에 영향을 미칩니다.

    • 간단한 단일 인수 함수에 대한 Simple map()
    • 특정 매개변수를 고정하기 위한 부분 함수
    • 복잡한 워크플로우를 위한 고급 기술
  3. 실제 응용 프로그램에 대한 고려 사항:

    • 리소스 관리 (CPU, 메모리)
    • 작업 분배 전략
    • 병렬 환경에서의 오류 처리
    • 프로세스 간의 조정

이 실용적인 예제는 이 랩에서 배운 multiprocessing 개념을 실제 문제를 효율적으로 해결하는 방법에 대한 데모입니다.

요약

이 랩에서는 Python multiprocessing 과 병렬 프로세스에 인수를 전달하는 다양한 기술에 대해 배웠습니다. 다음은 수행한 작업에 대한 요약입니다.

  1. Multiprocessing 의 기본: 첫 번째 multiprocessing 프로그램을 만들고 프로세스와 스레드의 차이점을 이해했습니다.

  2. 인수 전달 기술: 다양한 인수 전달 방법을 탐색했습니다.

    • Process.args를 사용한 기본 인수 전달
    • Pool.map()을 사용한 단일 인수 전달
    • Pool.starmap()을 사용한 다중 인수 전달
    • apply()apply_async()를 사용한 동적 실행
    • 부분 함수 적용을 위한 functools.partial 사용
    • 효율적인 데이터 공유를 위한 공유 메모리
  3. 실제 적용: 이러한 개념을 실제 이미지 처리 예제에 적용하여 병렬화를 통해 상당한 성능 향상을 보여주었습니다.

이 랩에서 배운 기술은 모든 CPU 집약적인 Python 애플리케이션에 유용하며, 다중 코어 시스템의 전체 계산 능력을 활용할 수 있습니다. 더 복잡한 애플리케이션을 개발할 때 다양한 인수 전달 기술 간의 트레이드 오프를 고려하고 특정 사용 사례에 가장 적합한 방법을 선택하는 것을 잊지 마십시오.