Practical Thread Examples
Web Scraping with Concurrent Threads
import threading
from queue import Empty, Queue

import requests
class WebScraper:
    """Fetch a collection of URLs concurrently with worker threads.

    Every URL is downloaded with ``requests.get`` using a 5-second timeout.
    Outcomes are collected in ``results``, keyed by URL: the length of the
    response text on success, or the exception message on failure.
    """

    def __init__(self, urls):
        """Remember the URLs and create the shared queue, lock and results.

        Args:
            urls: Iterable of URL strings to download.
        """
        self.urls = urls
        self.results = {}
        self.queue = Queue()
        # Serializes writes to ``results`` from the worker threads.
        self.lock = threading.Lock()

    def fetch_url(self):
        """Worker loop: take URLs off the queue until it is drained."""
        while True:
            # Checking ``empty()`` and then calling a blocking ``get()`` is
            # racy: another worker can take the last item in between and
            # leave this thread blocked forever. ``get_nowait()`` makes the
            # check-and-take a single step.
            try:
                url = self.queue.get_nowait()
            except Empty:
                return
            try:
                try:
                    outcome = len(requests.get(url, timeout=5).text)
                except Exception as e:
                    # Broad on purpose: any failure is recorded per URL
                    # instead of killing the worker.
                    outcome = str(e)
                with self.lock:
                    self.results[url] = outcome
            finally:
                self.queue.task_done()

    def scrape(self, max_threads=5):
        """Download all URLs and return the collected results.

        Args:
            max_threads: Number of worker threads to start (at least 1).

        Returns:
            The ``results`` dict mapping each URL to the response length
            (int) or to an error message (str).

        Raises:
            ValueError: If ``max_threads`` is less than 1; with no workers
                the queued URLs could never be processed.
        """
        if max_threads < 1:
            raise ValueError("max_threads must be at least 1")

        for url in self.urls:
            self.queue.put(url)

        workers = [
            threading.Thread(target=self.fetch_url)
            for _ in range(max_threads)
        ]
        for worker in workers:
            worker.start()
        # Workers exit once the queue is drained, so joining them both waits
        # for all the work and ensures no thread outlives this call.
        for worker in workers:
            worker.join()
        return self.results
# Example usage: scrape three sites concurrently and print the result map.
# NOTE: this performs real HTTP requests, so it needs network access.
urls = [
'https://www.example.com',
'https://www.python.org',
'https://www.github.com'
]
scraper = WebScraper(urls)
# scrape() is called with its default number of worker threads.
results = scraper.scrape()
# Each URL maps to the response text length, or to an error message.
print(results)
Parallel File Processing
import os
import threading
from concurrent.futures import ThreadPoolExecutor
class FileProcessor:
    """Process every regular file in a directory using a thread pool.

    "Processing" means reading the file as text and upper-casing it; for
    each file that succeeds, a ``{'filename', 'size'}`` record is appended
    to ``processed_files``.
    """

    def __init__(self, directory):
        """Set the target directory and the shared, lock-guarded result list.

        Args:
            directory: Path of the directory whose files are processed.
        """
        self.directory = directory
        self.processed_files = []
        self.lock = threading.Lock()

    def process_file(self, filename):
        """Upper-case one file's text and record its name and resulting size.

        Any exception is reported on stdout and otherwise ignored, so one
        unreadable file does not stop the remaining work.

        Args:
            filename: Name of the file, relative to ``self.directory``.
        """
        full_path = os.path.join(self.directory, filename)
        try:
            with open(full_path, 'r') as handle:
                upper_text = handle.read().upper()
            record = {'filename': filename, 'size': len(upper_text)}
            # Appends come from several pool threads at once.
            with self.lock:
                self.processed_files.append(record)
        except Exception as e:
            print(f"Error processing {filename}: {e}")

    def process_files(self, max_workers=4):
        """Run ``process_file`` on every regular file in the directory.

        Args:
            max_workers: Size of the thread pool.

        Returns:
            The ``processed_files`` list (records are in completion order).
        """
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            regular_files = [
                name
                for name in os.listdir(self.directory)
                if os.path.isfile(os.path.join(self.directory, name))
            ]
            for name in regular_files:
                pool.submit(self.process_file, name)
        # Leaving the ``with`` block waits for every submitted task.
        return self.processed_files
Thread Communication with Event
import threading
import time
class TrafficLight:
    """Traffic-light simulation coordinated through ``threading.Event``.

    One controller thread alternates the light between green and red while
    vehicle threads poll the green event once a second.
    """

    def __init__(self):
        """Create the green and red events, both initially unset."""
        self.green_light = threading.Event()
        self.red_light = threading.Event()

    def traffic_controller(self):
        """Cycle forever: green for 5 seconds, then red for 3 seconds."""
        # Each phase: (green-event action, red-event action, message, seconds).
        phases = (
            (self.green_light.set, self.red_light.clear,
             "Green Light - Traffic Flows", 5),
            (self.green_light.clear, self.red_light.set,
             "Red Light - Traffic Stops", 3),
        )
        while True:
            for update_green, update_red, announcement, duration in phases:
                update_green()
                update_red()
                print(announcement)
                time.sleep(duration)

    def vehicle(self, name):
        """Report once a second, forever, whether the vehicle may pass.

        Args:
            name: Label used in the printed status lines.
        """
        while True:
            print(
                f"{name} is passing"
                if self.green_light.is_set()
                else f"{name} is waiting"
            )
            time.sleep(1)
# Example usage: one controller thread plus three vehicle threads.
traffic = TrafficLight()
# Daemon threads do not keep the interpreter alive on their own.
controller = threading.Thread(target=traffic.traffic_controller, daemon=True)
controller.start()

vehicles = []
for i in range(3):
    v = threading.Thread(
        target=traffic.vehicle,
        args=(f"Vehicle-{i}",),
        daemon=True,
    )
    v.start()
    vehicles.append(v)

# Keep the main thread running: the vehicle loops never finish, so these
# joins block until the program is interrupted.
for v in vehicles:
    v.join()
| Scenario   | Threading | Multiprocessing | Async     |
|------------|-----------|-----------------|-----------|
| I/O Bound  | Excellent | Good            | Excellent |
| CPU Bound  | Limited   | Excellent       | Good      |
| Complexity | Low       | Medium          | High      |
Thread Lifecycle Visualization
stateDiagram-v2
[*] --> Created
Created --> Runnable
Runnable --> Running
Running --> Waiting
Waiting --> Runnable
Running --> Terminated
Terminated --> [*]
Advanced Thread Patterns
- Producer-Consumer Pattern
- Thread Pool
- Asynchronous Task Execution
LabEx Tip
Explore these practical threading examples in LabEx's interactive Python environments to gain hands-on experience with concurrent programming techniques.
Performance Considerations
- Use threading for I/O-bound tasks
- Consider multiprocessing for CPU-bound tasks
- Be mindful of the Global Interpreter Lock (GIL)
- Profile and measure performance
Error Handling in Threads
- Use try-except blocks
- Log exceptions
- Implement graceful error recovery
- Consider using thread-safe logging mechanisms