Real-World Application: Parallel Image Processing
Now that we understand various techniques for passing arguments in Python multiprocessing, let's apply these concepts to a real-world scenario: parallel image processing. This is a common use case where multiprocessing can significantly improve performance.
Setting Up the Environment
First, let's install the required packages:
pip install Pillow numpy
Creating Sample Images
Let's create a script to generate some sample images for our processing. Create a file named create_images.py:
from PIL import Image
import numpy as np
import os
def create_sample_image(filename, width=800, height=600):
    """Create and save a sample RGB image filled with random pixel data.

    Args:
        filename: Path where the image will be saved (format inferred
            from the extension by Pillow).
        width: Image width in pixels (default 800).
        height: Image height in pixels (default 600).
    """
    # Random 8-bit RGB pixel data; PIL expects (height, width, channels).
    data = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
    # Build a PIL image from the array and write it to disk.
    img = Image.fromarray(data, 'RGB')
    img.save(filename)
    # Bug fix: the original printed a literal "(unknown)" placeholder
    # instead of the actual file name.
    print(f"Created image: {filename}")
if __name__ == "__main__":
    # Make sure the target directory exists before writing any images.
    os.makedirs("sample_images", exist_ok=True)

    # Generate five numbered sample images (sample_1 .. sample_5).
    for index in range(1, 6):
        create_sample_image(f"sample_images/sample_{index}.png")

    print("All sample images created successfully")
Run this script to create sample images:
python3 create_images.py
Sequential Image Processing
Let's first implement a sequential version of our image processing tasks. Create a file named sequential_image_processing.py:
from PIL import Image, ImageFilter
import os
import time
def apply_filters(image_path, output_dir):
    """Apply four filters (blur, edges, emboss, sharpen) to one image.

    Each filtered result is saved to output_dir with a prefix naming the
    filter (e.g. "blur_<name>").

    Args:
        image_path: Path to the source image file.
        output_dir: Directory where the filtered copies are written.

    Returns:
        Tuple of (image file name, filter processing time in seconds).
    """
    image_name = os.path.basename(image_path)
    print(f"Processing image: {image_name}")

    # Fix: use a context manager so the image file handle is always
    # released, even if a filter or save step raises.
    with Image.open(image_path) as img:
        start_time = time.time()

        # Each filter is applied to the original image independently.
        blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
        blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

        edges = img.filter(ImageFilter.FIND_EDGES)
        edges.save(os.path.join(output_dir, f"edges_{image_name}"))

        emboss = img.filter(ImageFilter.EMBOSS)
        emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

        sharpen = img.filter(ImageFilter.SHARPEN)
        sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

        end_time = time.time()

    processing_time = end_time - start_time
    print(f"Completed processing {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time
def process_images_sequentially(image_dir, output_dir):
    """Apply the filter pipeline to every image in a directory, one at a time.

    Args:
        image_dir: Directory containing the source images.
        output_dir: Directory for filtered output (created if missing).

    Returns:
        List of (image_name, processing_time) tuples, one per image.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Fix: case-insensitive extension match so .PNG/.JPG files are not
    # skipped; sorted() makes processing order deterministic (os.listdir
    # order is platform-dependent).
    image_files = sorted(
        os.path.join(image_dir, f)
        for f in os.listdir(image_dir)
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    )
    print(f"Found {len(image_files)} images to process")

    # Process each image one after another, timing the whole batch.
    start_time = time.time()
    results = [apply_filters(image_path, output_dir) for image_path in image_files]
    end_time = time.time()

    total_time = end_time - start_time
    print("\nSequential processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results
if __name__ == "__main__":
    # Source images and destination for the filtered copies.
    source_dir = "sample_images"
    destination_dir = "processed_sequential"

    # Run the whole pipeline one image at a time.
    timing_results = process_images_sequentially(source_dir, destination_dir)

    # Report the per-image timings collected above.
    print("\nProcessing summary:")
    for name, seconds in timing_results:
        print(f" {name}: {seconds:.2f} seconds")
Run the sequential processing:
python3 sequential_image_processing.py
Parallel Image Processing
Now, let's implement parallel image processing using the techniques we've learned. Create a file named parallel_image_processing.py:
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
def apply_filters(args):
    """Apply four filters (blur, edges, emboss, sharpen) to one image.

    Designed for multiprocessing.Pool.map, which passes a single
    argument, so the two real parameters arrive packed in a tuple.

    Args:
        args: Tuple of (image_path, output_dir).

    Returns:
        Tuple of (image file name, filter processing time in seconds).
    """
    image_path, output_dir = args

    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} processing image: {image_name}")

    # Fix: the context manager guarantees the image file handle is
    # released in the worker process even if a filter or save raises.
    with Image.open(image_path) as img:
        start_time = time.time()

        # Each filter is applied to the original image independently.
        blurred = img.filter(ImageFilter.GaussianBlur(radius=5))
        blurred.save(os.path.join(output_dir, f"blur_{image_name}"))

        edges = img.filter(ImageFilter.FIND_EDGES)
        edges.save(os.path.join(output_dir, f"edges_{image_name}"))

        emboss = img.filter(ImageFilter.EMBOSS)
        emboss.save(os.path.join(output_dir, f"emboss_{image_name}"))

        sharpen = img.filter(ImageFilter.SHARPEN)
        sharpen.save(os.path.join(output_dir, f"sharpen_{image_name}"))

        end_time = time.time()

    processing_time = end_time - start_time
    print(f"Process {os.getpid()} completed {image_name} in {processing_time:.2f} seconds")
    return image_name, processing_time
def process_images_in_parallel(image_dir, output_dir, num_processes=None):
    """Apply the filter pipeline to every image in a directory in parallel.

    Args:
        image_dir: Directory containing the source images.
        output_dir: Directory for filtered output (created if missing).
        num_processes: Worker count; defaults to
            min(cpu_count, number of images).

    Returns:
        List of (image_name, processing_time) tuples, one per image.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Fix: case-insensitive extension match so .PNG/.JPG files are not
    # skipped; sorted() makes the task order deterministic.
    image_files = sorted(
        os.path.join(image_dir, f)
        for f in os.listdir(image_dir)
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    )
    print(f"Found {len(image_files)} images to process")

    # Bug fix: with no images, min(cpu_count, 0) == 0 and
    # multiprocessing.Pool(processes=0) raises ValueError.
    if not image_files:
        print("No images found; nothing to do")
        return []

    if num_processes is None:
        num_processes = min(multiprocessing.cpu_count(), len(image_files))
    print(f"Using {num_processes} processes for parallel processing")

    # Pool.map passes one argument per task, so pack the two parameters
    # into a tuple for each image.
    task_args = [(image_path, output_dir) for image_path in image_files]

    # Fan the tasks out across the pool and time the whole batch.
    start_time = time.time()
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(apply_filters, task_args)
    end_time = time.time()

    total_time = end_time - start_time
    print("\nParallel processing completed")
    print(f"Total processing time: {total_time:.2f} seconds")
    return results
if __name__ == "__main__":
    # Source images and destination for the filtered copies.
    source_dir = "sample_images"
    destination_dir = "processed_parallel"

    # Fan the work out across a process pool.
    timing_results = process_images_in_parallel(source_dir, destination_dir)

    # Report the per-image timings collected from the workers.
    print("\nProcessing summary:")
    for name, seconds in timing_results:
        print(f" {name}: {seconds:.2f} seconds")
Run the parallel processing:
python3 parallel_image_processing.py
Advanced Parallel Processing with Different Filters
Now, let's implement a more complex version where we apply different filters to the same image in parallel. Create a file named advanced_parallel_processing.py:
from PIL import Image, ImageFilter
import multiprocessing
import os
import time
from functools import partial
def apply_filter(filter_info, image_path, output_dir):
    """Apply one named filter to an image and save the result.

    Args:
        filter_info: Tuple of (filter_name, PIL ImageFilter object).
        image_path: Path to the source image file.
        output_dir: Directory where the filtered image is written.

    Returns:
        Tuple of (filter_name, image_name, processing_time_seconds).
    """
    filter_name, filter_obj = filter_info

    image_name = os.path.basename(image_path)
    print(f"Process {os.getpid()} applying {filter_name} to {image_name}")

    # Open the image in each worker rather than sharing PIL objects
    # across processes; fix: the context manager also guarantees the
    # file handle is released even if filtering or saving raises.
    with Image.open(image_path) as img:
        start_time = time.time()
        filtered = img.filter(filter_obj)

        # Prefix the output file with the filter name, e.g. "blur_x.png".
        output_filename = f"{filter_name}_{image_name}"
        output_path = os.path.join(output_dir, output_filename)
        filtered.save(output_path)
        end_time = time.time()

    processing_time = end_time - start_time
    print(f"Process {os.getpid()} completed {filter_name} on {image_name} in {processing_time:.2f} seconds")
    return filter_name, image_name, processing_time
def process_image_with_multiple_filters(image_path, output_dir, filters):
    """Run every (name, filter) pair in *filters* against one image in parallel.

    Returns a list of (filter_name, image_name, processing_time) tuples,
    one per filter.
    """
    # partial pins the image path and output directory so the pool only
    # has to supply the varying filter argument — and, unlike a lambda,
    # the resulting callable can be pickled for the worker processes.
    worker = partial(apply_filter,
                     image_path=image_path,
                     output_dir=output_dir)

    # One pool task per filter; results come back in filter order.
    with multiprocessing.Pool() as worker_pool:
        return worker_pool.map(worker, filters)
def main():
    """Apply a suite of filters to every sample image, parallel per image."""
    # Input and output locations.
    image_dir = "sample_images"
    output_dir = "processed_advanced"
    os.makedirs(output_dir, exist_ok=True)

    # (name, filter) pairs; the name becomes the output filename prefix.
    filters = [
        ("blur", ImageFilter.GaussianBlur(radius=5)),
        ("edges", ImageFilter.FIND_EDGES),
        ("emboss", ImageFilter.EMBOSS),
        ("sharpen", ImageFilter.SHARPEN),
        ("contour", ImageFilter.CONTOUR),
        ("detail", ImageFilter.DETAIL),
        ("smooth", ImageFilter.SMOOTH),
        ("smoothmore", ImageFilter.SMOOTH_MORE)
    ]

    # Fix: case-insensitive match so .PNG/.JPG files are included;
    # sorted() makes the processing order deterministic.
    image_files = sorted(
        os.path.join(image_dir, f)
        for f in os.listdir(image_dir)
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    )
    print(f"Found {len(image_files)} images to process with {len(filters)} filters each")

    # For each image, run all filters in parallel; accumulate results.
    all_results = []
    overall_start = time.time()
    for image_path in image_files:
        image_name = os.path.basename(image_path)
        print(f"\nProcessing {image_name} with {len(filters)} filters in parallel")
        start_time = time.time()
        results = process_image_with_multiple_filters(image_path, output_dir, filters)
        end_time = time.time()
        image_time = end_time - start_time
        print(f"Completed all filters for {image_name} in {image_time:.2f} seconds")
        all_results.extend(results)
    overall_end = time.time()
    total_time = overall_end - overall_start

    print("\nProcessing Summary:")
    print(f"Total processing time: {total_time:.2f} seconds")
    print(f"Total operations: {len(all_results)}")

    # Bug fix: guard against ZeroDivisionError when no images were found.
    # Also renamed the generator variable so it no longer shadows the
    # `time` module name inside the expression.
    if all_results:
        avg_time = sum(t for _, _, t in all_results) / len(all_results)
        print(f"Average processing time per filter: {avg_time:.2f} seconds")
    else:
        print("No images were processed")

if __name__ == "__main__":
    main()
Run the advanced parallel processing:
python3 advanced_parallel_processing.py
Let's create a script to compare the performance of the different approaches. Create a file named compare_performance.py:
import os
import time
import subprocess
import matplotlib.pyplot as plt
import numpy as np
def run_script(script_name):
    """Run a Python script in a subprocess and measure its wall-clock time.

    Args:
        script_name: Path to the script to execute.

    Returns:
        Execution time in seconds (float).
    """
    import sys  # local import keeps this function self-contained

    print(f"Running {script_name}...")
    start_time = time.time()
    # Fix: sys.executable runs the script with the same interpreter as
    # this one, instead of assuming a `python3` binary is on PATH.
    process = subprocess.run([sys.executable, script_name],
                             capture_output=True, text=True)
    end_time = time.time()

    execution_time = end_time - start_time
    print(f"Completed {script_name} in {execution_time:.2f} seconds")
    print(f"Output: {process.stdout[-200:]}")  # Show last part of output
    # Fix: surface failures instead of silently reporting only a timing.
    if process.returncode != 0:
        print(f"Warning: {script_name} exited with code {process.returncode}")
        print(f"Errors: {process.stderr[-200:]}")
    return execution_time
def main():
    """Benchmark each processing script and chart the measured times."""
    # Scripts to benchmark, in display order.
    scripts = [
        'sequential_image_processing.py',
        'parallel_image_processing.py',
        'advanced_parallel_processing.py'
    ]

    # Time each script; a missing script contributes a zero entry so the
    # bar positions still line up with the labels.
    durations = []
    for script_path in scripts:
        if os.path.exists(script_path):
            durations.append(run_script(script_path))
        else:
            print(f"Script {script_path} not found!")
            durations.append(0)

    # Guard clause: nothing measured, nothing to draw.
    if not any(durations):
        print("No valid execution times to plot")
        return

    # Build the comparison bar chart.
    labels = ['Sequential', 'Basic Parallel', 'Advanced Parallel']
    positions = np.arange(len(labels))
    plt.figure(figsize=(10, 6))
    plt.bar(positions, durations, color=['red', 'blue', 'green'])
    plt.xlabel('Processing Method')
    plt.ylabel('Execution Time (s)')
    plt.title('Image Processing Performance Comparison')
    plt.xticks(positions, labels)

    # Annotate each bar with its measured time.
    for bar_index, seconds in enumerate(durations):
        plt.text(bar_index, seconds + 0.1, f"{seconds:.2f}s", ha='center')

    # Write the chart to disk.
    plt.tight_layout()
    plt.savefig('performance_comparison.png')
    print("Performance comparison plot saved as 'performance_comparison.png'")

if __name__ == "__main__":
    main()
Let's run the comparison:
python3 compare_performance.py
Review the results and examine the performance_comparison.png image to see the differences in execution time between the different approaches.
Key Takeaways
From this real-world example, you can observe several important aspects of multiprocessing:
1. Parallel processing can significantly reduce execution time when dealing with CPU-intensive tasks like image processing.
2. The method of passing arguments affects both code complexity and performance:
   - Simple map() for straightforward single-argument functions
   - Partial functions for fixing certain parameters
   - Advanced techniques for complex workflows
3. Considerations for real-world applications:
   - Resource management (CPU, memory)
   - Task distribution strategies
   - Error handling in parallel environments
   - Coordination between processes
This practical example demonstrates how to apply the multiprocessing concepts we've learned throughout this lab to solve real-world problems efficiently.