Practical Thread Usage
Real-World Threading Scenarios
1. Parallel Web Scraping
import threading
import requests
from queue import Queue
def fetch_url(url_queue, results):
    """Drain ``url_queue``, recording the outcome of each request in ``results``.

    Designed to be used as a worker-thread target: several threads may run
    this function against the same queue at the same time.

    Args:
        url_queue: ``queue.Queue`` of URLs to fetch. Every item taken from it
            is acknowledged with ``task_done()``.
        results: Mapping updated in place. ``results[url]`` is the HTTP status
            code on success, or the exception's message string on failure.
    """
    from queue import Empty  # local import: the file only imports Queue

    while True:
        try:
            # Non-blocking get instead of an empty()/get() pair: another
            # worker can take the last item between those two calls, which
            # would leave this thread blocked in get() forever.
            url = url_queue.get_nowait()
        except Empty:
            return

        try:
            response = requests.get(url, timeout=5)
            results[url] = response.status_code
        except Exception as e:
            # Worker boundary: record the failure for this URL instead of
            # letting the exception terminate the thread.
            results[url] = str(e)
        finally:
            # Always acknowledge the item so Queue.join() can return.
            url_queue.task_done()
def parallel_web_scraping(urls, max_threads=5):
    """Fetch all ``urls`` concurrently and return the per-URL results.

    Args:
        urls: Iterable of URLs. It is materialized once, so generators and
            other unsized iterables are accepted.
        max_threads: Upper bound on the number of worker threads. The actual
            number never exceeds the number of URLs.

    Returns:
        The dict filled in by the ``fetch_url`` workers, keyed by URL. Empty
        when ``urls`` is empty.

    Raises:
        ValueError: If there are URLs to fetch but ``max_threads`` is below 1.
    """
    urls = list(urls)
    if not urls:
        return {}
    if max_threads < 1:
        # Without this guard no worker is started and url_queue.join() below
        # would wait forever for items nobody will ever process.
        raise ValueError("max_threads must be at least 1")

    url_queue = Queue()
    for url in urls:
        url_queue.put(url)

    results = {}
    worker_count = min(max_threads, len(urls))
    threads = [
        threading.Thread(target=fetch_url, args=(url_queue, results))
        for _ in range(worker_count)
    ]
    for thread in threads:
        thread.start()

    # Wait until every queued URL has been acknowledged, then reap the workers.
    url_queue.join()
    for thread in threads:
        thread.join()

    return results
2. Background Task Processing
import threading
import time
import queue
class BackgroundTaskProcessor:
    """Run submitted callables on a fixed set of background worker threads.

    Workers are started in the constructor and keep polling the task queue
    until ``shutdown()`` is called.
    """

    def __init__(self, num_workers=3, poll_interval=1):
        """Create the queue and start the workers.

        Args:
            num_workers: Number of worker threads to start.
            poll_interval: Seconds a worker waits for a task before
                re-checking the stop flag. This bounds how long
                ``shutdown()`` may block on an idle worker.
        """
        self.task_queue = queue.Queue()
        self.workers = []
        self.stop_event = threading.Event()
        # Must be set before any worker starts, since workers read it.
        self.poll_interval = poll_interval

        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker)
            worker.start()
            self.workers.append(worker)

    def _worker(self):
        """Worker loop: take tasks and run them until the stop flag is set."""
        import logging  # local import: not among the file's top-level imports

        while not self.stop_event.is_set():
            try:
                task = self.task_queue.get(timeout=self.poll_interval)
            except queue.Empty:
                # Nothing to do right now; loop to re-check the stop flag.
                continue

            # Run the task outside the get() try-block so that a queue.Empty
            # raised *by the task itself* is not mistaken for an idle queue.
            try:
                task()
            except Exception:
                # A failing task must not kill the worker thread.
                logging.getLogger(__name__).exception(
                    "Background task %r failed", task
                )
            finally:
                # Acknowledge even on failure so task_queue.join() can return.
                self.task_queue.task_done()

    def add_task(self, task):
        """Queue ``task`` (a zero-argument callable) for execution."""
        self.task_queue.put(task)

    def shutdown(self):
        """Signal the workers to stop and wait for them to exit.

        Each worker finishes the task it is currently running. Tasks still
        waiting in the queue when the flag is seen are not executed.
        """
        self.stop_event.set()
        for worker in self.workers:
            worker.join()
Thread Pool Management
flowchart TD
A[Task Queue] --> B{Thread Pool}
B --> C[Worker Thread 1]
B --> D[Worker Thread 2]
B --> E[Worker Thread 3]
C --> F[Complete Task]
D --> F
E --> F
Thread Usage Patterns
| Pattern | Description | Use Case |
| --- | --- | --- |
| Producer-Consumer | Separate task generation and processing | Message queues, work distribution |
| Thread Pool | Reuse a fixed number of threads | Concurrent I/O operations |
| Parallel Processing | Distribute computational tasks | Data processing, scientific computing |
import threading
import time
import psutil
class ThreadPerformanceMonitor:
    """Collect CPU and memory samples keyed by thread identifier.

    NOTE: the samples come from ``psutil.Process()`` with no arguments, i.e.
    they describe the whole current process, not the individual thread; the
    thread identifier is only used as the bookkeeping key.
    """

    def __init__(self):
        # Kept for backward compatibility; nothing in this class populates it.
        self.threads = []
        # thread ident -> {'start_time', 'cpu_usage', 'memory_usage'}
        self.performance_data = {}
        # Created lazily on the first sample and then reused (see monitor()).
        self._process = None

    def start_monitoring(self, thread):
        """Register ``thread`` and start an empty sample record for it.

        Calling this again for the same thread resets its record.

        Raises:
            ValueError: If the thread has not been started, because its
                ``ident`` is still ``None`` and every unstarted thread would
                share that same key.
        """
        thread_id = thread.ident
        if thread_id is None:
            raise ValueError("cannot monitor a thread that has not been started")

        self.performance_data[thread_id] = {
            'start_time': time.time(),
            'cpu_usage': [],
            'memory_usage': []
        }

    def monitor(self, thread):
        """Append one CPU and one memory sample to ``thread``'s record.

        Does nothing when the thread was never registered through
        ``start_monitoring()``. Memory is recorded as resident set size in MiB.
        """
        record = self.performance_data.get(thread.ident)
        if record is None:
            return

        if self._process is None:
            # Reuse a single Process object: non-blocking cpu_percent()
            # measures usage since the previous call on the same object, so a
            # fresh object per sample would report 0.0 every time. The very
            # first sample is still a 0.0 baseline.
            self._process = psutil.Process()

        record['cpu_usage'].append(self._process.cpu_percent())
        record['memory_usage'].append(
            self._process.memory_info().rss / (1024 * 1024)
        )
Advanced Thread Coordination
Thread Event Synchronization
import threading
import time
class CoordinatedTask:
    """Two-phase task whose phases are sequenced with ``threading.Event``s.

    ``ready_event`` is set once preparation has finished and
    ``complete_event`` once execution has finished.
    """

    def __init__(self):
        # Both flags start cleared; each phase sets its own flag when done.
        self.ready_event, self.complete_event = (
            threading.Event(),
            threading.Event(),
        )

    def prepare_task(self):
        """Perform the (simulated) preparation, then signal readiness."""
        print("Preparing task")
        preparation_seconds = 2
        time.sleep(preparation_seconds)
        self.ready_event.set()

    def execute_task(self):
        """Block until preparation is done, run the (simulated) work, then
        signal completion."""
        # No timeout: this waits indefinitely for prepare_task() to finish.
        self.ready_event.wait()
        print("Executing task")
        execution_seconds = 3
        time.sleep(execution_seconds)
        self.complete_event.set()
LabEx Recommendations
At LabEx, we suggest:
- Use threads for I/O-bound tasks
- Avoid using threads for CPU-bound computations; the GIL prevents them from running Python code in parallel
- Leverage multiprocessing for parallel computation
Best Practices
- Limit thread count
- Use thread-safe data structures
- Implement proper error handling
- Monitor and profile thread performance
Common Pitfalls
- Overusing threads
- Neglecting synchronization
- Creating uncontrolled thread growth
- Ignoring thread lifecycle management