实际线程示例
使用并发线程进行网页抓取
import threading
import requests
from queue import Queue
class WebScraper:
def __init__(self, urls):
self.urls = urls
self.results = {}
self.queue = Queue()
self.lock = threading.Lock()
def fetch_url(self):
while not self.queue.empty():
url = self.queue.get()
try:
response = requests.get(url, timeout=5)
with self.lock:
self.results[url] = len(response.text)
except Exception as e:
with self.lock:
self.results[url] = str(e)
finally:
self.queue.task_done()
def scrape(self, max_threads=5):
for url in self.urls:
self.queue.put(url)
threads = []
for _ in range(max_threads):
t = threading.Thread(target=self.fetch_url)
t.start()
threads.append(t)
self.queue.join()
return self.results
## 示例用法
urls = [
'https://www.example.com',
'https://www.python.org',
'https://www.github.com'
]
scraper = WebScraper(urls)
results = scraper.scrape()
print(results)
并行文件处理
import os
import threading
from concurrent.futures import ThreadPoolExecutor
class FileProcessor:
def __init__(self, directory):
self.directory = directory
self.processed_files = []
self.lock = threading.Lock()
def process_file(self, filename):
file_path = os.path.join(self.directory, filename)
try:
with open(file_path, 'r') as f:
content = f.read()
processed_content = content.upper()
with self.lock:
self.processed_files.append({
'filename': filename,
'size': len(processed_content)
})
except Exception as e:
print(f"处理 {filename} 时出错: {e}")
def process_files(self, max_workers=4):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
files = [f for f in os.listdir(self.directory) if os.path.isfile(os.path.join(self.directory, f))]
executor.map(self.process_file, files)
return self.processed_files
使用事件进行线程通信
import threading
import time
class TrafficLight:
def __init__(self):
self.green_light = threading.Event()
self.red_light = threading.Event()
def traffic_controller(self):
while True:
## 绿灯
self.green_light.set()
self.red_light.clear()
print("绿灯 - 交通通行")
time.sleep(5)
## 红灯
self.green_light.clear()
self.red_light.set()
print("红灯 - 交通停止")
time.sleep(3)
def vehicle(self, name):
while True:
if self.green_light.is_set():
print(f"{name} 正在通过")
else:
print(f"{name} 正在等待")
time.sleep(1)
## 示例用法
traffic = TrafficLight()
controller = threading.Thread(target=traffic.traffic_controller)
controller.daemon = True
controller.start()
vehicles = []
for i in range(3):
v = threading.Thread(target=traffic.vehicle, args=(f"车辆-{i}",))
v.daemon = True
v.start()
vehicles.append(v)
## 保持主线程运行
for v in vehicles:
v.join()
线程性能比较
场景 |
线程 |
多进程 |
异步 |
I/O密集型 |
优秀 |
良好 |
优秀 |
CPU密集型 |
有限 |
优秀 |
良好 |
复杂度 |
低 |
中等 |
高 |
线程生命周期可视化
stateDiagram-v2
[*] --> 创建
创建 --> 可运行
可运行 --> 运行
运行 --> 等待
等待 --> 可运行
运行 --> 终止
终止 --> [*]
高级线程模式
- 生产者-消费者模式
- 线程池
- 异步任务执行
LabEx提示
在LabEx的交互式Python环境中探索这些实际的线程示例,以获得并发编程技术的实践经验。
性能注意事项
- 对I/O密集型任务使用线程
- 对CPU密集型任务考虑使用多进程
- 注意全局解释器锁(GIL)
- 进行性能分析和测量
线程中的错误处理
- 使用try-except块
- 记录异常
- 实现优雅的错误恢复
- 考虑使用线程安全的日志记录机制