Practical Applications
Concurrent Web Scraping
```python
import asyncio

import aiohttp
from bs4 import BeautifulSoup


async def fetch_website(url):
    # Fetch the raw HTML for one URL
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


async def extract_title(url):
    try:
        html = await fetch_website(url)
        soup = BeautifulSoup(html, 'html.parser')
        return f"{url}: {soup.title.string}"
    except Exception as e:
        return f"{url}: Error {str(e)}"


async def concurrent_scraping():
    urls = [
        'https://python.org',
        'https://github.com',
        'https://stackoverflow.com'
    ]
    # Schedule all scrapes concurrently and wait for every result
    tasks = [extract_title(url) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(concurrent_scraping())
```
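One refinement worth noting: the code above opens a fresh `ClientSession` for every URL, which defeats connection pooling. A minimal sketch of the shared-session variant (the helper name `fetch_with_session` is introduced here for illustration, not part of the original example):

```python
import asyncio

import aiohttp


async def fetch_with_session(session, url):
    # Reuse the caller's session instead of opening one per request
    async with session.get(url) as response:
        return await response.text()


async def scrape_with_shared_session(urls):
    # One ClientSession for the whole batch keeps connection pooling effective
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_session(session, url) for url in urls]
        return await asyncio.gather(*tasks)
```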
Microservice Communication
```mermaid
graph LR
    A[API Gateway] --> B[Service 1]
    A --> C[Service 2]
    A --> D[Service 3]
    B --> E[Message Queue]
    C --> E
    D --> E
```
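The same topology can be approximated in-process with coroutines: a gateway coroutine fans a request out to several service coroutines, which all publish to a shared queue. This is a minimal sketch of the flow, with placeholder service names, not a real networked deployment:

```python
import asyncio


async def service(name, request, message_queue):
    # Each service handles the request and publishes to the shared queue
    await asyncio.sleep(0.1)  # simulate work
    await message_queue.put(f"{name} handled {request}")


async def api_gateway(request):
    message_queue = asyncio.Queue()
    # Fan the request out to all three services concurrently
    await asyncio.gather(
        *(service(f"service-{i}", request, message_queue) for i in range(1, 4))
    )
    while not message_queue.empty():
        print(await message_queue.get())


asyncio.run(api_gateway("GET /orders"))
```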
Distributed Task Processing
```python
import asyncio
import random


class TaskDistributor:
    def __init__(self, worker_count=3):
        self.task_queue = asyncio.Queue()
        self.result_queue = asyncio.Queue()
        self.worker_count = worker_count

    async def producer(self):
        # Enqueue ten random tasks
        for _ in range(10):
            task = random.randint(1, 100)
            await self.task_queue.put(task)
        # Add one termination signal per worker
        for _ in range(self.worker_count):
            await self.task_queue.put(None)

    async def worker(self, worker_id):
        while True:
            task = await self.task_queue.get()
            if task is None:
                break
            # Simulate processing
            result = task * 2
            await self.result_queue.put((worker_id, result))
            self.task_queue.task_done()

    async def result_collector(self):
        results = []
        for _ in range(10):
            worker_id, result = await self.result_queue.get()
            results.append((worker_id, result))
        return results

    async def run(self):
        producer = asyncio.create_task(self.producer())
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.worker_count)
        ]
        collector = asyncio.create_task(self.result_collector())
        await asyncio.gather(producer, *workers, collector)
        # Awaiting the already-finished collector task retrieves its result
        return await collector


async def main():
    distributor = TaskDistributor()
    results = await distributor.run()
    print("Distributed processing results:", results)

asyncio.run(main())
```
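Note the shutdown design here: the producer enqueues one `None` sentinel per worker (the "poison pill" pattern), so every worker receives exactly one termination signal and exits its loop cleanly. Awaiting `collector` after `gather` is safe because awaiting a finished task simply returns its result.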
Application Domains
| Domain | Coroutine Use Case | Key Advantage |
|--------|-------------------|----------------|
| Web development | Handling multiple requests | High concurrency |
| IoT systems | Device communication | Low overhead |
| Data processing | Parallel data streams | Efficient resource utilization |
| Network services | Connection management | Scalable architecture |
Real-Time Data Streams
```python
import asyncio
import random


async def data_generator(stream_id):
    # Async generator: yield five random values, one per second
    for _ in range(5):
        data = random.random()
        print(f"Stream {stream_id}: generated {data}")
        yield data
        await asyncio.sleep(1)


async def data_processor(stream):
    # Consume values from the async generator as they arrive
    async for value in stream:
        processed = value * 2
        print(f"Processed: {processed}")


async def main():
    streams = [
        data_generator(i) for i in range(3)
    ]
    processors = [data_processor(stream) for stream in streams]
    await asyncio.gather(*processors)

asyncio.run(main())
```
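In this layout each processor owns exactly one stream. To fan several streams into a single consumer, a common pattern is to have each generator push into a shared `asyncio.Queue`. A minimal sketch, assuming the `data_generator` defined above and reusing the per-stream sentinel idea from the task distributor:

```python
import asyncio


async def pump(stream, queue):
    # Drain one async generator into the shared queue
    async for value in stream:
        await queue.put(value)
    await queue.put(None)  # per-stream end-of-stream sentinel


async def fan_in():
    queue = asyncio.Queue()
    pumps = [asyncio.create_task(pump(data_generator(i), queue))
             for i in range(3)]
    finished = 0
    # Consume the merged stream until every generator has signalled completion
    while finished < len(pumps):
        value = await queue.get()
        if value is None:
            finished += 1
            continue
        print(f"Merged value: {value * 2}")

asyncio.run(fan_in())
```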
Performance Optimization Strategies
- Use non-blocking I/O operations
- Implement smart concurrency limits (see the semaphore sketch after this list)
- Make efficient use of asyncio's event loop
- Monitor and profile coroutine performance
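"Smart concurrency limits" usually means bounding how many coroutines touch a resource at once. A minimal sketch using `asyncio.Semaphore`, with a hypothetical limit of five concurrent fetches (the function names here are illustrative):

```python
import asyncio

import aiohttp


async def bounded_fetch(semaphore, session, url):
    # At most `limit` coroutines pass this point at the same time
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()


async def fetch_all(urls, limit=5):
    semaphore = asyncio.Semaphore(limit)
    async with aiohttp.ClientSession() as session:
        tasks = [bounded_fetch(semaphore, session, url) for url in urls]
        return await asyncio.gather(*tasks)
```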
Advanced Considerations
- Handle complex error scenarios
- Implement graceful shutdown mechanisms (a minimal example follows this list)
- Design scalable and resilient architectures
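As one illustration of graceful shutdown, the standard approach is to cancel outstanding tasks and let them run their cleanup before the loop exits. A minimal sketch, assuming a hypothetical long-running worker:

```python
import asyncio


async def long_running_worker():
    try:
        while True:
            await asyncio.sleep(1)  # stand-in for real work
    except asyncio.CancelledError:
        # Cleanup runs here before the task actually exits
        print("Worker shutting down cleanly")
        raise


async def main():
    worker = asyncio.create_task(long_running_worker())
    await asyncio.sleep(3)  # let it run for a while
    worker.cancel()  # request shutdown
    try:
        await worker  # wait for cleanup to finish
    except asyncio.CancelledError:
        pass

asyncio.run(main())
```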
LabEx recommends continuous learning and hands-on experimentation with coroutine-based applications to master these techniques.