Exemples pratiques de files d'attente
File d'attente de traitement de tâches
import queue
import threading
import time
class TraiteurDeTaches:
def __init__(self, max_travailleurs=3):
self.file_dattente_de_taches = queue.Queue(maxsize=10)
self.travailleurs = []
self.max_travailleurs = max_travailleurs
def travailleur(self):
while True:
try:
tache = self.file_dattente_de_taches.get(block=False)
print(f"Traitement de la tâche : {tache}")
time.sleep(1) ## Simuler le traitement de la tâche
self.file_dattente_de_taches.task_done()
except queue.Empty:
break
def ajouter_tache(self, tache):
try:
self.file_dattente_de_taches.put(tache, block=False)
print(f"La tâche {tache} a été ajoutée à la file d'attente")
except queue.Full:
print("La file d'attente est pleine. Impossible d'ajouter plus de tâches")
def traiter_taches(self):
for _ in range(self.max_travailleurs):
thread_travailleur = threading.Thread(target=self.travailleur)
thread_travailleur.start()
self.travailleurs.append(thread_travailleur)
## Attendre que toutes les tâches soient terminées
self.file_dattente_de_taches.join()
## Utilisation exemple
processeur = TraiteurDeTaches()
for i in range(15):
processeur.add_task(f"Tâche-{i}")
processeur.process_tasks()
File d'attente de limitation de débit
graph TD
A[Requêtes entrantes] --> B{Nombre d'éléments dans la file d'attente}
B -->|Dans les limites| C[Traiter la requête]
B -->|Dépassement des limites| D[Refuser/différer la requête]
Implémentation du limiteur de débit
import queue
import time
import threading
class LimiteurDeDebit:
def __init__(self, max_requetes=5, fenetre_de_temps=1):
self.file_dattente_de_requetes = queue.Queue(maxsize=max_requetes)
self.max_requetes = max_requetes
self.fenetre_de_temps = fenetre_de_temps
def traiter_requete(self, requete):
try:
## Essayer d'ajouter la requête à la file d'attente
self.file_dattente_de_requetes.put(requete, block=False)
print(f"Traitement de la requête : {requete}")
## Simuler le traitement de la requête
time.sleep(0.2)
## Retirer la requête de la file d'attente
self.file_dattente_de_requetes.get()
self.file_dattente_de_requetes.task_done()
except queue.Full:
print(f"Limite de débit dépassée pour la requête : {requete}")
def gerer_requetes(self, requetes):
threads = []
for requete in requetes:
thread = threading.Thread(target=self.process_request, args=(requete,))
thread.start()
threads.append(thread)
## Attendre que tous les threads soient terminés
for thread in threads:
thread.join()
## Utilisation exemple
limiteur = LimiteurDeDebit(max_requetes=5, fenetre_de_temps=1)
requetes = [f"Requête-{i}" for i in range(20)]
limiteur.handle_requests(requests)
Type de file d'attente |
Cas d'utilisation |
Avantages |
Inconvénients |
queue.Queue |
Applications multithreadées |
Thread-safe |
Plus lent pour de grands jeux de données |
collections.deque |
Utilisation générale |
Opérations rapides |
Non thread-safe |
multiprocessing.Queue |
Multi-processus |
Prise en charge de la communication inter-processus |
Plus grande surcharge |
Scénarios du monde réel
- Gestion des requêtes d'un serveur web
- Traitement des tâches en arrière-plan
- Brokers de messages
- Traitement de données par lots
Au LabEx, nous recommandons de concevoir soigneusement les mécanismes de file d'attente pour optimiser les performances et l'utilisation des ressources.