## Practical Implementation

### Real-world Background Computation Scenarios

#### Web Scraping with Concurrent Processing
```python
import concurrent.futures
import requests
from bs4 import BeautifulSoup

def fetch_website_data(url):
    """Fetch a single page and return its title and size."""
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()  # surface HTTP errors as exceptions
        soup = BeautifulSoup(response.text, 'html.parser')
        return {
            'url': url,
            'title': soup.title.string if soup.title else 'No Title',
            'length': len(response.text)
        }
    except Exception as e:
        return {'url': url, 'error': str(e)}

def concurrent_web_scraping(urls):
    # map() preserves the input order of the URLs
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch_website_data, urls))
    return results

# Example usage
urls = [
    'https://python.org',
    'https://github.com',
    'https://stackoverflow.com'
]
scraped_data = concurrent_web_scraping(urls)
```
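A note on the design choice: `ThreadPoolExecutor` fits this workload because each request spends most of its time waiting on the network, and the GIL is released during I/O, so the threads overlap well. For CPU-bound processing of the downloaded pages, `concurrent.futures.ProcessPoolExecutor` is the usual swap-in.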
### Background Task Queue System
```mermaid
graph TD
    A[Task Queue] --> B[Worker Processes]
    B --> C[Task Execution]
    B --> D[Result Storage]
    A --> E[Task Prioritization]
```
#### Robust Task Queue Implementation
```python
import multiprocessing

class BackgroundTaskManager:
    def __init__(self, num_workers=4):
        self.task_queue = multiprocessing.Queue()
        self.result_queue = multiprocessing.Queue()
        self.workers = []
        self.num_workers = num_workers

    def worker(self):
        # Consume tasks until a None sentinel is received
        while True:
            task = self.task_queue.get()
            if task is None:
                break
            try:
                result = task()
                self.result_queue.put(result)
            except Exception as e:
                self.result_queue.put(e)

    def start_workers(self):
        # Note: using a bound method as the target relies on the 'fork'
        # start method (the Linux default); on spawn-based platforms,
        # pass the queues to a module-level worker function instead
        for _ in range(self.num_workers):
            p = multiprocessing.Process(target=self.worker)
            p.start()
            self.workers.append(p)

    def add_task(self, task):
        # Tasks must be picklable callables, since they cross a process boundary
        self.task_queue.put(task)

    def get_results(self):
        # Drain whatever results are currently available;
        # Queue.empty() is only a best-effort check across processes
        results = []
        while not self.result_queue.empty():
            results.append(self.result_queue.get())
        return results

    def shutdown(self):
        # One sentinel per worker, then wait for all of them to exit
        for _ in range(self.num_workers):
            self.task_queue.put(None)
        for w in self.workers:
            w.join()
```
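A short usage sketch of the manager above, assuming the fork start method. The `square` function is a hypothetical stand-in for real work; `functools.partial` turns it into a picklable zero-argument task.

```python
from functools import partial

def square(x):
    return x * x

if __name__ == '__main__':
    manager = BackgroundTaskManager(num_workers=2)
    manager.start_workers()

    # Queue a few independent tasks
    for n in range(5):
        manager.add_task(partial(square, n))

    manager.shutdown()            # send sentinels and wait for workers
    print(manager.get_results())  # e.g. [0, 1, 4, 9, 16], in arbitrary order
```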
### Performance Monitoring

| Metric             | Measurement Technique   | Tool              |
| ------------------ | ----------------------- | ----------------- |
| CPU Usage          | Multiprocessing monitor | `psutil`          |
| Memory Consumption | Memory profiler         | `memory_profiler` |
| Execution Time     | Timing decorators       | `timeit`          |
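A small sketch of how such tools fit together: a timing decorator built on `time.perf_counter`, plus a `psutil` snapshot of CPU and memory for the current process. The `heavy_computation` function is just a placeholder workload.

```python
import functools
import time
import psutil

def timed(func):
    """Decorator that reports wall-clock execution time."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.3f}s")
        return result
    return wrapper

@timed
def heavy_computation(n):
    return sum(i * i for i in range(n))

heavy_computation(1_000_000)

# Snapshot resource usage of the current process
proc = psutil.Process()
print(f"CPU: {proc.cpu_percent(interval=0.1)}%  "
      f"RSS memory: {proc.memory_info().rss / 1024**2:.1f} MiB")
```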
### Asynchronous File Processing
```python
import asyncio
import aiofiles

async def process_large_file(filename):
    async with aiofiles.open(filename, mode='r') as file:
        content = await file.read()

    # Perform complex processing
    processed_data = content.upper()

    async with aiofiles.open(f'processed_{filename}', mode='w') as outfile:
        await outfile.write(processed_data)

async def batch_file_processing(files):
    # Run all file jobs concurrently and wait for them to finish
    tasks = [process_large_file(file) for file in files]
    await asyncio.gather(*tasks)

# Usage in LabEx environment
files = ['data1.txt', 'data2.txt', 'data3.txt']
asyncio.run(batch_file_processing(files))
```
### Error Handling and Resilience

#### Key Considerations

- Implement robust error handling
- Use timeout mechanisms
- Create retry strategies
- Log exceptions comprehensively

These considerations are combined in the sketch below.
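A minimal sketch, assuming a hypothetical `flaky_operation` that may fail transiently: a retry decorator with exponential backoff, a per-call timeout enforced through `concurrent.futures`, and logging of every failure.

```python
import concurrent.futures
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("background")

def retry(max_attempts=3, delay=1.0, backoff=2.0):
    """Retry a callable on exception, doubling the wait between attempts."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    logger.exception("Attempt %d/%d of %s failed",
                                     attempt, max_attempts, func.__name__)
                    if attempt == max_attempts:
                        raise
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator

@retry(max_attempts=3)
def flaky_operation():
    # Placeholder for work that can fail transiently (network call, etc.)
    return "ok"

# Enforce a hard timeout by running the call in a worker thread
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(flaky_operation)
    try:
        print(future.result(timeout=10))
    except concurrent.futures.TimeoutError:
        logger.error("flaky_operation timed out")
```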
### Best Practices for Background Computation

- Choose appropriate concurrency model
- Minimize shared state
- Use thread-safe data structures
- Implement proper resource management
- Monitor and profile performance

The short sketch after this list illustrates the thread-safety and resource-management points.
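A minimal sketch of two of these practices, assuming a trivial `compute_and_record` task: `queue.Queue` is thread-safe, so producers and consumers need no explicit locks, and using the executor as a context manager guarantees the worker threads are joined and released.

```python
import concurrent.futures
import queue

results = queue.Queue()  # thread-safe: no explicit locking needed

def compute_and_record(n):
    # Workers only touch local variables plus the thread-safe queue,
    # keeping shared state to a minimum
    results.put((n, n * n))

# The context manager waits for all tasks and cleans up the threads
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    executor.map(compute_and_record, range(10))

# Drain the queue after all workers have finished
collected = []
while not results.empty():
    collected.append(results.get())
print(sorted(collected))
```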
By mastering these practical implementation techniques, developers can create efficient, scalable background computation systems in their LabEx Python projects.