Ejemplos Prácticos de Colas
Cola de Procesamiento de Tareas
import queue
import threading
import time
class TaskProcessor:
def __init__(self, max_workers=3):
self.task_queue = queue.Queue(maxsize=10)
self.workers = []
self.max_workers = max_workers
def worker(self):
while True:
try:
task = self.task_queue.get(block=False)
print(f"Procesando tarea: {task}")
time.sleep(1) ## Simular el procesamiento de la tarea
self.task_queue.task_done()
except queue.Empty:
break
def add_task(self, task):
try:
self.task_queue.put(task, block=False)
print(f"Tarea {task} agregada a la cola")
except queue.Full:
print("La cola está llena. No se pueden agregar más tareas")
def process_tasks(self):
for _ in range(self.max_workers):
worker_thread = threading.Thread(target=self.worker)
worker_thread.start()
self.workers.append(worker_thread)
## Esperar a que todas las tareas se completen
self.task_queue.join()
## Uso de ejemplo
processor = TaskProcessor()
for i in range(15):
processor.add_task(f"Tarea-{i}")
processor.process_tasks()
Cola de Límite de Tasa
graph TD
A[Solicitudes Entrantes] --> B{Tamaño de la Cola}
B -->|Dentro del Límite| C[Procesar Solicitud]
B -->|Superado el Límite| D[Rechazar/Demorar Solicitud]
Implementación del Límite de Tasa
import queue
import time
import threading
class RateLimiter:
def __init__(self, max_requests=5, time_window=1):
self.request_queue = queue.Queue(maxsize=max_requests)
self.max_requests = max_requests
self.time_window = time_window
def process_request(self, request):
try:
## Intentar agregar la solicitud a la cola
self.request_queue.put(request, block=False)
print(f"Procesando solicitud: {request}")
## Simular el procesamiento de la solicitud
time.sleep(0.2)
## Quitar la solicitud de la cola
self.request_queue.get()
self.request_queue.task_done()
except queue.Full:
print(f"Se ha excedido el límite de tasa para la solicitud: {request}")
def handle_requests(self, requests):
threads = []
for request in requests:
thread = threading.Thread(target=self.process_request, args=(request,))
thread.start()
threads.append(thread)
## Esperar a que todos los hilos se completen
for thread in threads:
thread.join()
## Uso de ejemplo
limiter = RateLimiter(max_requests=5, time_window=1)
requests = [f"Solicitud-{i}" for i in range(20)]
limiter.handle_requests(requests)
Comparación de Rendimiento de las Colas
Tipo de Cola |
Caso de Uso |
Ventajas |
Desventajas |
queue.Queue |
Aplicaciones con Hilos |
Segura para hilos |
Más lenta para conjuntos de datos grandes |
collections.deque |
Propósito General |
Operaciones rápidas |
No segura para hilos |
multiprocessing.Queue |
Multi-proceso |
Soporte para IPC |
Mayor sobrecarga |
Escenarios del Mundo Real
- Manejo de Solicitudes de Servidor Web
- Procesamiento de Tareas en Segundo Plano
- Brokers de Mensajes
- Procesamiento de Datos en Lote
En LabEx, recomendamos diseñar cuidadosamente los mecanismos de cola para optimizar el rendimiento y la utilización de recursos.