Practical Queue Examples
Task Processing Queue
import queue
import threading
import time
class TaskProcessor:
    """Drain a bounded FIFO queue of tasks with a small pool of worker threads.

    Tasks are queued with ``add_task`` and later consumed by
    ``process_tasks``, which starts ``max_workers`` threads that each pull
    tasks until the queue is empty.

    Args:
        max_workers: Number of worker threads started by ``process_tasks``.
        task_duration: Seconds each worker sleeps to simulate handling one
            task. Defaults to 1, the value previously hard-coded.
    """

    def __init__(self, max_workers=3, task_duration=1):
        ## At most 10 tasks may be pending; add_task rejects anything beyond.
        self.task_queue = queue.Queue(maxsize=10)
        ## Threads started by the most recent process_tasks() call.
        self.workers = []
        self.max_workers = max_workers
        self.task_duration = task_duration

    def worker(self):
        """Process queued tasks until the queue is empty, then return."""
        while True:
            ## Only the non-blocking get can raise queue.Empty, so it is the
            ## only statement guarded by that handler.
            try:
                task = self.task_queue.get(block=False)
            except queue.Empty:
                break
            try:
                print(f"Processing task: {task}")
                time.sleep(self.task_duration)  ## Simulate task processing
            finally:
                ## Always balance the get(); otherwise a failure while
                ## processing would leave task_queue.join() waiting forever.
                self.task_queue.task_done()

    def add_task(self, task):
        """Queue ``task`` without blocking.

        Returns:
            True if the task was queued, False if the queue was full and the
            task was dropped.
        """
        try:
            self.task_queue.put(task, block=False)
        except queue.Full:
            print("Queue is full. Cannot add more tasks")
            return False
        print(f"Task {task} added to queue")
        return True

    def process_tasks(self):
        """Start the worker threads and block until every queued task is done.

        ``self.workers`` is replaced with the threads started by this call, so
        repeated calls do not accumulate finished threads.
        """
        started = []
        for _ in range(self.max_workers):
            worker_thread = threading.Thread(target=self.worker)
            worker_thread.start()
            started.append(worker_thread)
        self.workers = started
        ## Wait for all tasks to complete
        self.task_queue.join()
        ## Then wait for the workers themselves so no thread outlives the call.
        for worker_thread in started:
            worker_thread.join()
## Example usage: offer 15 tasks to the processor, then run the workers
## over whatever was accepted into the queue.
processor = TaskProcessor()
task_names = [f"Task-{i}" for i in range(15)]
for task_name in task_names:
    processor.add_task(task_name)
processor.process_tasks()
Rate Limiting Queue
graph TD
A[Incoming Requests] --> B{Queue Size}
B -->|Within Limit| C[Process Request]
B -->|Exceeded Limit| D[Reject/Delay Request]
Rate Limiter Implementation
import queue
import time
import threading
class RateLimiter:
    """Reject requests once ``max_requests`` are already being processed.

    A bounded queue acts as a slot counter: each accepted request occupies one
    slot while it is processed, and a request that arrives while every slot is
    taken is rejected immediately instead of waiting.

    Args:
        max_requests: Maximum number of requests in flight at once. It is
            passed to ``queue.Queue(maxsize=...)``, where a value of zero or
            less means an unbounded queue, i.e. no limiting.
        time_window: Stored on the instance only; the limiting logic in this
            class is purely slot-based and never reads it.
        processing_time: Seconds slept to simulate handling one request.
            Defaults to 0.2, the value previously hard-coded.
    """

    def __init__(self, max_requests=5, time_window=1, processing_time=0.2):
        self.request_queue = queue.Queue(maxsize=max_requests)
        self.max_requests = max_requests
        self.time_window = time_window
        self.processing_time = processing_time

    def process_request(self, request):
        """Process ``request`` if a slot is free, otherwise reject it.

        Returns:
            True if the request was accepted and processed, False if it was
            rejected because the limit was reached.
        """
        try:
            ## Try to add request to queue
            self.request_queue.put(request, block=False)
        except queue.Full:
            print(f"Rate limit exceeded for request: {request}")
            return False
        try:
            print(f"Processing request: {request}")
            ## Simulate request processing
            time.sleep(self.processing_time)
        finally:
            ## Remove request from queue. Done in finally so a failure while
            ## processing cannot leak the slot and shrink capacity for good.
            self.request_queue.get()
            self.request_queue.task_done()
        return True

    def handle_requests(self, requests):
        """Run ``process_request`` for each request on its own thread.

        Blocks until every thread has finished.
        """
        threads = []
        for request in requests:
            thread = threading.Thread(target=self.process_request, args=(request,))
            thread.start()
            threads.append(thread)
        ## Wait for all threads to complete
        for thread in threads:
            thread.join()
## Example usage: submit 20 requests at once to a limiter that allows 5.
requests = [f"Request-{i}" for i in range(20)]
limiter = RateLimiter(max_requests=5, time_window=1)
limiter.handle_requests(requests)
| Queue Type | Use Case | Pros | Cons |
| --- | --- | --- | --- |
| queue.Queue | Threaded Applications | Thread-safe | Slower for large datasets |
| collections.deque | General Purpose | Fast operations | No blocking get/put; only append/pop are thread-safe |
| multiprocessing.Queue | Multi-process | IPC Support | Higher overhead |
Real-world Scenarios
- Web Server Request Handling
- Background Job Processing
- Message Brokers
- Batch Data Processing
At LabEx, we recommend carefully designing queue mechanisms to optimize performance and resource utilization.