Real-world Consumer Patterns
1. Log Processing System
import logging
import multiprocessing
import time
from queue import Empty, Queue
class LogConsumer(multiprocessing.Process):
    """Worker process that drains log entries from a shared queue to a file.

    Consumes entries from *log_queue* until it receives a ``None`` sentinel
    or the queue stays empty past the 5-second poll timeout.
    """

    def __init__(self, log_queue, log_path='/var/log/application.log'):
        """
        Args:
            log_queue: multiprocessing.Queue the log entries arrive on.
            log_path: destination file for entries; defaults to the original
                hard-coded path so existing callers are unaffected.
        """
        multiprocessing.Process.__init__(self)
        self.log_queue = log_queue
        self.log_path = log_path
        self.logger = logging.getLogger('log_consumer')
        self.logger.setLevel(logging.INFO)

    def run(self):
        # Poll until a None sentinel arrives or the queue stays idle for 5s.
        while True:
            try:
                log_entry = self.log_queue.get(timeout=5)
                if log_entry is None:
                    break
                self.process_log(log_entry)
            except Empty:
                # Bug fix: the original caught ``Queue.Empty``, but the Queue
                # *class* has no ``Empty`` attribute in Python 3 -- the get()
                # timeout raises ``queue.Empty``, so the old except clause
                # itself raised AttributeError.
                break

    def process_log(self, log_entry):
        # Append each entry on its own line; reopening per entry keeps the
        # file handle short-lived, which matters with multiple processes
        # writing to the same file.
        with open(self.log_path, 'a') as log_file:
            log_file.write(f"{log_entry}\n")
def create_log_processing_system():
    """Spin up one LogConsumer per CPU core, all sharing a single queue.

    Returns:
        A ``(queue, consumers)`` pair: the shared queue to feed log entries
        into, and the list of already-started consumer processes.
    """
    shared_queue = multiprocessing.Queue()
    workers = []
    for _ in range(multiprocessing.cpu_count()):
        worker = LogConsumer(shared_queue)
        worker.start()
        workers.append(worker)
    return shared_queue, workers
2. Message Queue Broker
import redis
import json
import threading
class MessageConsumer(threading.Thread):
    """Daemon thread that consumes JSON messages from a Redis pub/sub channel."""

    def __init__(self, channel):
        threading.Thread.__init__(self)
        # NOTE(review): connection details are hard-coded; assumes a local
        # default Redis instance -- confirm for other deployments.
        self.redis_client = redis.Redis(host='localhost', port=6379)
        self.pubsub = self.redis_client.pubsub()
        self.channel = channel
        self.daemon = True  # don't block interpreter shutdown

    def run(self):
        self.pubsub.subscribe(self.channel)
        for message in self.pubsub.listen():
            # listen() also yields subscribe/unsubscribe control events;
            # only real payloads are dispatched.
            if message['type'] == 'message':
                self.process_message(message['data'])

    def process_message(self, raw_message):
        """Parse one raw payload and dispatch on its 'type' field.

        Malformed payloads are reported and skipped rather than allowed to
        kill the consumer thread.
        """
        try:
            message = json.loads(raw_message)
            if message['type'] == 'order':
                self.handle_order(message)
            elif message['type'] == 'notification':
                self.handle_notification(message)
        except json.JSONDecodeError:
            print(f"Invalid message format: {raw_message}")
        except KeyError as missing:
            # Bug fix: a well-formed JSON object without the expected keys
            # ('type', or 'id'/'message' in the handlers) previously raised
            # an uncaught KeyError and crashed the thread.
            print(f"Message missing required field {missing}: {raw_message}")

    def handle_order(self, order):
        print(f"Processing order: {order['id']}")

    def handle_notification(self, notification):
        print(f"Sending notification: {notification['message']}")
3. Data Pipeline Consumer
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
class DataPipelineConsumer:
    """Fans DataFrame chunks out to a thread pool and concatenates results."""

    def __init__(self, max_workers=4):
        # Pool is reused across consume_data_stream calls for the consumer's
        # lifetime; it is never explicitly shut down by design here.
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def process_data_chunk(self, chunk):
        """Drop NaN rows from *chunk*, then apply the per-row transform.

        Bug fix: the original called ``chunk.apply(self.transform_row)``
        without ``axis=1``. DataFrame.apply defaults to axis=0 (column-wise),
        so ``row['value']`` looked up the label 'value' in a *column* Series
        and raised KeyError. ``axis=1`` passes one row at a time, as
        transform_row expects.
        """
        cleaned_chunk = chunk.dropna()
        processed_chunk = cleaned_chunk.apply(self.transform_row, axis=1)
        return processed_chunk

    def transform_row(self, row):
        # Custom transformation logic: derive 'processed' from 'value'.
        row['processed'] = row['value'] * 2
        return row

    def consume_data_stream(self, data_stream):
        """Submit every chunk to the pool and return the combined frame.

        Returns an empty DataFrame when the stream yields no chunks --
        ``pd.concat([])`` would otherwise raise ValueError.
        """
        futures = [self.executor.submit(self.process_data_chunk, chunk)
                   for chunk in data_stream]
        if not futures:
            return pd.DataFrame()
        results = [future.result() for future in futures]
        return pd.concat(results)
Consumer Pattern Scenarios
| Scenario | Pattern | Key Characteristics |
|---|---|---|
| Log Processing | Multiprocessing | High throughput, parallel processing |
| Message Broker | Pub/Sub | Real-time communication |
| Data Pipeline | Concurrent Processing | Large-scale data transformation |
Architectural Considerations
graph TD
A[Real-world Consumer] --> B[Scalability]
A --> C[Fault Tolerance]
A --> D[Performance]
A --> E[Error Handling]
Best Practices
- Use appropriate concurrency models
- Implement robust error handling
- Design for horizontal scaling
- Monitor and log consumer activities
At LabEx, we recommend carefully designing consumer patterns to meet specific application requirements and performance constraints.