实际应用中的消费者模式
1. 日志处理系统
import multiprocessing
import logging
from queue import Queue
import time
class LogConsumer(multiprocessing.Process):
def __init__(self, log_queue):
multiprocessing.Process.__init__(self)
self.log_queue = log_queue
self.logger = logging.getLogger('log_consumer')
self.logger.setLevel(logging.INFO)
def run(self):
while True:
try:
log_entry = self.log_queue.get(timeout=5)
if log_entry is None:
break
self.process_log(log_entry)
except Queue.Empty:
break
def process_log(self, log_entry):
## 模拟日志处理
with open('/var/log/application.log', 'a') as log_file:
log_file.write(f"{log_entry}\n")
def create_log_processing_system():
log_queue = multiprocessing.Queue()
num_consumers = multiprocessing.cpu_count()
consumers = [LogConsumer(log_queue) for _ in range(num_consumers)]
for consumer in consumers:
consumer.start()
return log_queue, consumers
2. 消息队列代理
import redis
import json
import threading
class MessageConsumer(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.redis_client = redis.Redis(host='localhost', port=6379)
self.pubsub = self.redis_client.pubsub()
self.channel = channel
self.daemon = True
def run(self):
self.pubsub.subscribe(self.channel)
for message in self.pubsub.listen():
if message['type'] =='message':
self.process_message(message['data'])
def process_message(self, raw_message):
try:
message = json.loads(raw_message)
## 根据消息类型处理消息
if message['type'] == 'order':
self.handle_order(message)
elif message['type'] == 'notification':
self.handle_notification(message)
except json.JSONDecodeError:
print(f"无效的消息格式: {raw_message}")
def handle_order(self, order):
print(f"处理订单: {order['id']}")
def handle_notification(self, notification):
print(f"发送通知: {notification['message']}")
3. 数据管道消费者
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
class DataPipelineConsumer:
def __init__(self, max_workers=4):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
def process_data_chunk(self, chunk):
## 数据转换和处理
cleaned_chunk = chunk.dropna()
processed_chunk = cleaned_chunk.apply(self.transform_row)
return processed_chunk
def transform_row(self, row):
## 自定义转换逻辑
row['processed'] = row['value'] * 2
return row
def consume_data_stream(self, data_stream):
futures = []
for chunk in data_stream:
future = self.executor.submit(self.process_data_chunk, chunk)
futures.append(future)
results = [future.result() for future in futures]
return pd.concat(results)
消费者模式场景
场景 |
模式 |
关键特性 |
日志处理 |
多进程 |
高吞吐量,并行处理 |
消息代理 |
发布/订阅 |
实时通信 |
数据管道 |
并发处理 |
大规模数据转换 |
架构考量
graph TD
A[实际应用中的消费者] --> B[可扩展性]
A --> C[容错性]
A --> D[性能]
A --> E[错误处理]
最佳实践
- 使用合适的并发模型
- 实现健壮的错误处理
- 设计支持水平扩展
- 监控和记录消费者活动
在 LabEx,我们建议仔细设计消费者模式,以满足特定的应用需求和性能约束。