Praktische Warteschlangenbeispiele
Warteschlange für die Task-Verarbeitung
import queue
import threading
import time
class TaskProcessor:
def __init__(self, max_workers=3):
self.task_queue = queue.Queue(maxsize=10)
self.workers = []
self.max_workers = max_workers
def worker(self):
while True:
try:
task = self.task_queue.get(block=False)
print(f"Verarbeite Aufgabe: {task}")
time.sleep(1) ## Simuliere die Task-Verarbeitung
self.task_queue.task_done()
except queue.Empty:
break
def add_task(self, task):
try:
self.task_queue.put(task, block=False)
print(f"Aufgabe {task} zur Warteschlange hinzugefügt")
except queue.Full:
print("Warteschlange ist voll. Es können keine weiteren Aufgaben hinzugefügt werden")
def process_tasks(self):
for _ in range(self.max_workers):
worker_thread = threading.Thread(target=self.worker)
worker_thread.start()
self.workers.append(worker_thread)
## Warte, bis alle Aufgaben abgeschlossen sind
self.task_queue.join()
## Beispielverwendung
processor = TaskProcessor()
for i in range(15):
processor.add_task(f"Aufgabe-{i}")
processor.process_tasks()
Rate Limiting Warteschlange
graph TD
A[Eingehende Anfragen] --> B{Warteschlangengröße}
B -->|Innerhalb der Grenze| C[Verarbeite Anfrage]
B -->|Grenze überschritten| D[Lehne/Verzögere Anfrage ab]
Implementierung des Rate Limiters
import queue
import time
import threading
class RateLimiter:
def __init__(self, max_requests=5, time_window=1):
self.request_queue = queue.Queue(maxsize=max_requests)
self.max_requests = max_requests
self.time_window = time_window
def process_request(self, request):
try:
## Versuche, Anfrage zur Warteschlange hinzuzufügen
self.request_queue.put(request, block=False)
print(f"Verarbeite Anfrage: {request}")
## Simuliere die Anfrageverarbeitung
time.sleep(0.2)
## Entferne Anfrage aus der Warteschlange
self.request_queue.get()
self.request_queue.task_done()
except queue.Full:
print(f"Rate Limit überschritten für Anfrage: {request}")
def handle_requests(self, requests):
threads = []
for request in requests:
thread = threading.Thread(target=self.process_request, args=(request,))
thread.start()
threads.append(thread)
## Warte, bis alle Threads abgeschlossen sind
for thread in threads:
thread.join()
## Beispielverwendung
limiter = RateLimiter(max_requests=5, time_window=1)
requests = [f"Anfrage-{i}" for i in range(20)]
limiter.handle_requests(requests)
Vergleich der Warteschlangenleistung
Warteschlangentyp |
Anwendungsfall |
Vorteile |
Nachteile |
queue.Queue |
Threadierte Anwendungen |
Thread-sicher |
Langsam für große Datensätze |
collections.deque |
Allgemeiner Zweck |
Schnelle Operationen |
Nicht thread-sicher |
multiprocessing.Queue |
Mehrprozess |
IPC-Unterstützung |
Höhere Overhead |
Echte Welt-Szenarien
- Webserver-Anfragebehandlung
- Hintergrundjob-Verarbeitung
- Nachrichtenbroker
- Batch-Datenverarbeitung
Beim LabEx empfehlen wir, die Warteschlangenmechanismen sorgfältig zu entwerfen, um die Leistung und die Ressourcenutilisierung zu optimieren.