Introduction
Dans ce laboratoire, vous allez apprendre à connaître les générateurs gérés (managed generators) et comprendre comment les piloter de manière inhabituelle. Vous allez également construire un simple planificateur de tâches (task scheduler) et créer un serveur réseau en utilisant des générateurs.
Une fonction générateur en Python nécessite un code externe pour s'exécuter. Par exemple, un générateur d'itération ne fonctionne que lorsqu'il est itéré avec une boucle for, et les coroutines ont besoin que leur méthode send() soit appelée. Dans ce laboratoire, nous allons explorer des exemples pratiques de pilotage de générateurs dans des applications avancées. Les fichiers créés lors de ce laboratoire sont multitask.py et server.py.
Comprendre les générateurs Python
Commençons par revoir ce que sont les générateurs en Python. En Python, les générateurs sont un type spécial de fonction. Ils sont différents des fonctions ordinaires. Lorsque vous appelez une fonction ordinaire, elle s'exécute du début à la fin et retourne une seule valeur. Cependant, une fonction générateur retourne un itérateur, qui est un objet à travers lequel nous pouvons itérer, ce qui signifie que nous pouvons accéder à ses valeurs une par une.
Les générateurs utilisent l'instruction yield pour retourner des valeurs. Au lieu de retourner toutes les valeurs d'un coup comme une fonction ordinaire, un générateur retourne les valeurs une à une. Après avoir rendu une valeur, le générateur suspend son exécution. La prochaine fois que nous demandons une valeur, il reprend l'exécution là où il s'était arrêté.
Créer un générateur simple
Maintenant, créons un générateur simple. Dans l'IDE Web, vous devez créer un nouveau fichier. Ce fichier contiendra le code de notre générateur. Nommez le fichier generator_demo.py et placez - le dans le répertoire /home/labex/project. Voici le contenu que vous devriez mettre dans le fichier :
## Generator function that counts down from n
def countdown(n):
print(f"Starting countdown from {n}")
while n > 0:
yield n
n -= 1
print("Countdown complete!")
## Create a generator object
counter = countdown(5)
## Drive the generator manually
print(next(counter)) ## 5
print(next(counter)) ## 4
print(next(counter)) ## 3
## Iterate through remaining values
for value in counter:
print(value) ## 2, 1
Dans ce code, nous définissons d'abord une fonction générateur appelée countdown. Cette fonction prend un nombre n comme argument et compte à rebours de n à 1. À l'intérieur de la fonction, nous utilisons une boucle while pour décrémenter n et rendre chaque valeur. Lorsque nous appelons countdown(5), cela crée un objet générateur nommé counter.
Nous utilisons ensuite la fonction next() pour obtenir manuellement des valeurs du générateur. Chaque fois que nous appelons next(counter), le générateur reprend l'exécution là où il s'était arrêté et rend la valeur suivante. Après avoir obtenu manuellement trois valeurs, nous utilisons une boucle for pour itérer à travers les valeurs restantes dans le générateur.
Pour exécuter ce code, ouvrez le terminal et exécutez la commande suivante :
python3 /home/labex/project/generator_demo.py
Lorsque vous exécutez le code, vous devriez voir la sortie suivante :
Starting countdown from 5
5
4
3
2
1
Countdown complete!
Remarquons le comportement de la fonction générateur :
- La fonction générateur commence son exécution lorsque nous appelons
next(counter)pour la première fois. Avant cela, la fonction est simplement définie et aucun décompte réel n'a commencé. - Elle se met en pause à chaque instruction
yield. Après avoir rendu une valeur, elle s'arrête et attend le prochain appel ànext(). - Lorsque nous appelons
next()à nouveau, elle continue là où elle s'était arrêtée. Par exemple, après avoir rendu 5, elle se souvient de l'état et continue de décrémenternet de rendre la valeur suivante. - La fonction générateur termine son exécution après que la dernière valeur a été rendue. Dans notre cas, après avoir rendu 1, elle affiche "Countdown complete!".
Cette capacité à mettre en pause et reprendre l'exécution est ce qui rend les générateurs puissants. Elle est très utile pour des tâches telles que la planification de tâches (task scheduling) et la programmation asynchrone, où nous devons effectuer plusieurs tâches de manière efficace sans bloquer l'exécution des autres tâches.
Créer un planificateur de tâches (task scheduler) avec des générateurs
En programmation, un planificateur de tâches est un outil essentiel qui aide à gérer et exécuter efficacement plusieurs tâches. Dans cette section, nous allons utiliser des générateurs pour construire un simple planificateur de tâches capable d'exécuter plusieurs fonctions générateur de manière concurrente. Cela vous montrera comment les générateurs peuvent être gérés pour effectuer une multitâche coopérative, ce qui signifie que les tâches s'exécutent tour à tour et partagent le temps d'exécution.
Tout d'abord, vous devez créer un nouveau fichier. Accédez au répertoire /home/labex/project et créez un fichier nommé multitask.py. Ce fichier contiendra le code de notre planificateur de tâches.
## multitask.py
from collections import deque
## Task queue
tasks = deque()
## Simple task scheduler
def run():
while tasks:
task = tasks.popleft() ## Get the next task
try:
task.send(None) ## Resume the task
tasks.append(task) ## Put it back in the queue
except StopIteration:
print('Task done') ## Task is complete
## Example task 1: Countdown
def countdown(n):
while n > 0:
print('T-minus', n)
yield ## Pause execution
n -= 1
## Example task 2: Count up
def countup(n):
x = 0
while x < n:
print('Up we go', x)
yield ## Pause execution
x += 1
Maintenant, décomposons le fonctionnement de ce planificateur de tâches :
- Nous utilisons un
deque(file doublement chaînée) pour stocker nos tâches générateur. Undequeest une structure de données qui permet d'ajouter et de supprimer des éléments efficacement des deux extrémités. C'est un excellent choix pour notre file de tâches car nous devons ajouter des tâches à la fin et les supprimer du début. - La fonction
run()est le cœur de notre planificateur de tâches. Elle prend les tâches une par une dans la file :- Elle reprend chaque tâche en utilisant
send(None). Cela est similaire à l'utilisation denext()sur un générateur. Cela indique au générateur de reprendre l'exécution là où il s'était arrêté. - Après que la tâche ait cédé le contrôle (yield), elle est ajoutée à nouveau à la fin de la file. De cette façon, la tâche aura une autre chance de s'exécuter plus tard.
- Lorsqu'une tâche est terminée (lève l'exception
StopIteration), elle est supprimée de la file. Cela indique que la tâche a terminé son exécution.
- Elle reprend chaque tâche en utilisant
- Chaque instruction
yielddans nos tâches générateur agit comme un point d'arrêt. Lorsqu'un générateur atteint une instructionyield, il met en pause son exécution et rend le contrôle au planificateur. Cela permet aux autres tâches de s'exécuter.
Cette approche met en œuvre une multitâche coopérative. Chaque tâche cède volontairement le contrôle au planificateur, permettant aux autres tâches de s'exécuter. De cette façon, plusieurs tâches peuvent partager le temps d'exécution et s'exécuter de manière concurrente.
Tester notre planificateur de tâches (task scheduler)
Maintenant, nous allons ajouter un test à notre fichier multitask.py. Le but de ce test est d'exécuter plusieurs tâches en même temps, ce qui est appelé une exécution concurrente. L'exécution concurrente permet à différentes tâches de progresser apparemment en même temps, même si, dans un environnement mono - thread, les tâches s'exécutent en réalité tour à tour.
Pour effectuer ce test, ajoutez le code suivant à la fin du fichier multitask.py :
## Test our scheduler
if __name__ == '__main__':
## Add tasks to the queue
tasks.append(countdown(10)) ## Count down from 10
tasks.append(countdown(5)) ## Count down from 5
tasks.append(countup(20)) ## Count up to 20
## Run all tasks
run()
Dans ce code, nous vérifions d'abord si le script est exécuté directement en utilisant if __name__ == '__main__':. Ensuite, nous ajoutons trois tâches différentes à la file tasks. Les tâches countdown compteront à rebours à partir des nombres donnés, et la tâche countup comptera jusqu'au nombre spécifié. Enfin, nous appelons la fonction run() pour commencer l'exécution de ces tâches.
Après avoir ajouté le code, exécutez - le avec la commande suivante dans le terminal :
python3 /home/labex/project/multitask.py
Lorsque vous exécutez le code, vous devriez voir une sortie similaire à celle - ci (l'ordre exact des lignes peut varier) :
T-minus 10
T-minus 5
Up we go 0
T-minus 9
T-minus 4
Up we go 1
T-minus 8
T-minus 3
Up we go 2
...
Remarquez comment les sorties des différentes tâches sont mélangées. Cela est une indication claire que notre planificateur exécute les trois tâches de manière concurrente. Chaque fois qu'une tâche atteint une instruction yield, le planificateur met en pause cette tâche et passe à une autre, permettant à toutes les tâches de progresser au fil du temps.
Comment cela fonctionne
Regardons de plus près ce qui se passe lorsque notre planificateur s'exécute :
- Tout d'abord, nous ajoutons trois tâches générateur à la file :
countdown(10),countdown(5)etcountup(20). Ces tâches générateur sont des fonctions spéciales qui peuvent mettre en pause et reprendre leur exécution aux instructionsyield. - Ensuite, la fonction
run()commence son travail :- Elle prend la première tâche,
countdown(10), dans la file. - Elle exécute cette tâche jusqu'à ce qu'elle atteigne une instruction
yield. Lorsqu'elle atteint leyield, elle affiche "T-minus 10". - Après cela, elle ajoute la tâche
countdown(10)de nouveau à la file afin qu'elle puisse être exécutée plus tard. - Ensuite, elle prend la tâche
countdown(5)dans la file. - Elle exécute la tâche
countdown(5)jusqu'à ce qu'elle atteigne une instructionyield, affichant "T-minus 5". - Et ce processus se poursuit...
- Elle prend la première tâche,
Ce cycle se poursuit jusqu'à ce que toutes les tâches soient terminées. Chaque tâche a l'occasion de s'exécuter pendant un court instant, ce qui donne l'illusion d'une exécution concurrente sans avoir besoin d'utiliser des threads ou des callbacks. Les threads sont un moyen plus complexe d'obtenir de la concurrence, et les callbacks sont utilisés en programmation asynchrone. Notre simple planificateur utilise des générateurs pour obtenir un effet similaire de manière plus simple.
Construire un serveur réseau avec des générateurs
Dans cette section, nous allons prendre le concept de planificateur de tâches que nous avons appris et l'étendre pour créer quelque chose de plus pratique : un simple serveur réseau. Ce serveur peut gérer plusieurs connexions clientes en même temps en utilisant des générateurs. Les générateurs sont une fonctionnalité puissante de Python qui permet aux fonctions de mettre en pause et de reprendre leur exécution, ce qui est très utile pour gérer plusieurs tâches sans bloquer.
Tout d'abord, vous devez créer un nouveau fichier nommé server.py dans le répertoire /home/labex/project. Ce fichier contiendra le code de notre serveur réseau.
## server.py
from socket import *
from select import select
from collections import deque
## Task system
tasks = deque()
recv_wait = {} ## Map: socket -> task (for tasks waiting to receive)
send_wait = {} ## Map: socket -> task (for tasks waiting to send)
def run():
while any([tasks, recv_wait, send_wait]):
## If no active tasks, wait for I/O
while not tasks:
## Wait for any socket to become ready for I/O
can_recv, can_send, _ = select(recv_wait, send_wait, [])
## Add tasks waiting on readable sockets back to active queue
for s in can_recv:
tasks.append(recv_wait.pop(s))
## Add tasks waiting on writable sockets back to active queue
for s in can_send:
tasks.append(send_wait.pop(s))
## Get next task to run
task = tasks.popleft()
try:
## Resume the task
reason, resource = task.send(None)
## Handle different yield reasons
if reason == 'recv':
## Task is waiting to receive data
recv_wait[resource] = task
elif reason == 'send':
## Task is waiting to send data
send_wait[resource] = task
else:
raise RuntimeError('Unknown yield reason %r' % reason)
except StopIteration:
print('Task done')
Ce planificateur amélioré est un peu plus compliqué que le précédent, mais il suit les mêmes idées fondamentales. Analysons les principales différences :
- Les tâches peuvent céder (yield) une raison ('recv' ou 'send') et une ressource (un socket). Cela signifie qu'une tâche peut indiquer au planificateur qu'elle attend soit de recevoir, soit d'envoyer des données sur un socket spécifique.
- Selon la raison du cession (yield), la tâche est déplacée dans une zone d'attente différente. Si une tâche attend de recevoir des données, elle est placée dans le dictionnaire
recv_wait. Si elle attend d'envoyer des données, elle est placée dans le dictionnairesend_wait. - La fonction
select()est utilisée pour déterminer quels sockets sont prêts pour des opérations d'E/S. Cette fonction vérifie les sockets dans les dictionnairesrecv_waitetsend_waitet renvoie ceux qui sont prêts à recevoir ou à envoyer des données. - Lorsqu'un socket est prêt, la tâche associée est déplacée de nouveau dans la file d'attente active. Cela permet à la tâche de reprendre son exécution et d'effectuer l'opération d'E/S pour laquelle elle attendait.
En utilisant ces techniques, nos tâches peuvent attendre efficacement les opérations d'E/S réseau sans bloquer l'exécution des autres tâches. Cela rend notre serveur réseau plus réactif et capable de gérer plusieurs connexions clientes de manière concurrente.
Implémentation d'un serveur d'écho (echo server)
Maintenant, nous allons ajouter l'implémentation d'un serveur d'écho à notre fichier server.py. Un serveur d'écho est un type de serveur qui renvoie simplement les données qu'il reçoit d'un client. C'est un excellent moyen de comprendre comment les serveurs gèrent les données entrantes et communiquent avec les clients.
Ajoutez le code suivant à la fin du fichier server.py. Ce code configurera notre serveur d'écho et gérera les connexions clientes.
## TCP Server implementation
def tcp_server(address, handler):
## Create a TCP socket
sock = socket(AF_INET, SOCK_STREAM)
## Set the socket option to reuse the address
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
## Bind the socket to the given address
sock.bind(address)
## Start listening for incoming connections, with a backlog of 5
sock.listen(5)
while True:
## Yield to pause the function until a client connects
yield 'recv', sock ## Wait for a client connection
## Accept a client connection
client, addr = sock.accept()
## Add a new handler task for this client to the tasks list
tasks.append(handler(client, addr)) ## Start a handler task for this client
## Echo handler - echoes back whatever the client sends
def echo_handler(client, address):
print('Connection from', address)
while True:
## Yield to pause the function until the client sends data
yield 'recv', client ## Wait until client sends data
## Receive up to 1000 bytes of data from the client
data = client.recv(1000)
if not data: ## Client closed connection
break
## Yield to pause the function until the client can receive data
yield 'send', client ## Wait until client can receive data
## Send the data back to the client with 'GOT:' prefix
client.send(b'GOT:' + data)
print('Connection closed')
## Close the client connection
client.close()
## Start the server
if __name__ == '__main__':
## Add the tcp_server task to the tasks list
tasks.append(tcp_server(('', 25000), echo_handler))
## Start the scheduler
run()
Comprenons ce code étape par étape :
La fonction
tcp_server:- Tout d'abord, elle configure un socket pour écouter les connexions entrantes. Un socket est un point final de communication entre deux machines.
- Ensuite, elle utilise
yield 'recv', sockpour mettre en pause la fonction jusqu'à ce qu'un client se connecte. C'est une partie essentielle de notre approche asynchrone. - Enfin, elle crée une nouvelle tâche de gestion pour chaque connexion client. Cela permet au serveur de gérer plusieurs clients de manière concurrente.
La fonction
echo_handler:- Elle cède (yield)
'recv', clientpour attendre que le client envoie des données. Cela met en pause la fonction jusqu'à ce que des données soient disponibles. - Elle cède
'send', clientpour attendre qu'elle puisse renvoyer des données au client. Cela garantit que le client est prêt à recevoir les données. - Elle traite les données du client jusqu'à ce que la connexion soit fermée par le client.
- Elle cède (yield)
Lorsque nous exécutons le serveur, nous ajoutons la tâche
tcp_serverà la file d'attente et nous démarrons le planificateur. Le planificateur est responsable de la gestion de toutes les tâches et s'assure qu'elles s'exécutent de manière asynchrone.
Pour tester le serveur, exécutez - le dans un terminal :
python3 /home/labex/project/server.py
Vous devriez voir un message indiquant que le serveur est en cours d'exécution. Cela signifie que le serveur écoute maintenant les connexions entrantes.
Ouvrez un autre terminal et connectez - vous au serveur en utilisant nc (netcat). Netcat est un utilitaire simple qui vous permet de vous connecter à un serveur et d'envoyer des données.
nc localhost 25000
Maintenant, vous pouvez taper des messages et les voir renvoyés avec le préfixe "GOT:" :
Hello
GOT:Hello
World
GOT:World
Si vous n'avez pas nc installé, vous pouvez utiliser la bibliothèque intégrée telnetlib de Python. Telnetlib est une bibliothèque qui vous permet de vous connecter à un serveur en utilisant le protocole Telnet.
python3 -c "import telnetlib; t = telnetlib.Telnet('localhost', 25000); t.interact()"
Vous pouvez ouvrir plusieurs fenêtres de terminal et connecter plusieurs clients simultanément. Le serveur gérera toutes les connexions de manière concurrente, même s'il est mono - thread. Cela est dû à notre planificateur de tâches basé sur les générateurs, qui permet au serveur de mettre en pause et de reprendre les tâches selon les besoins.
Comment cela fonctionne
Cet exemple démontre une application puissante des générateurs pour les E/S asynchrones :
- Le serveur cède (yield) lorsqu'il devrait normalement bloquer en attendant des E/S. Cela signifie que, au lieu d'attendre indéfiniment des données, le serveur peut se mettre en pause et laisser d'autres tâches s'exécuter.
- Le planificateur le déplace dans une zone d'attente jusqu'à ce que les E/S soient prêtes. Cela garantit que le serveur ne gaspille pas de ressources en attendant des E/S.
- D'autres tâches peuvent s'exécuter pendant que l'on attend que les E/S se terminent. Cela permet au serveur de gérer plusieurs tâches de manière concurrente.
- Lorsque les E/S sont prêtes, la tâche reprend là où elle s'était arrêtée. C'est une caractéristique clé de la programmation asynchrone.
Ce modèle forme la base des frameworks Python asynchrones modernes comme asyncio, qui a été ajouté à la bibliothèque standard de Python en Python 3.4.
Résumé
Dans ce laboratoire, vous avez appris le concept de générateurs gérés en Python. Vous avez exploré comment mettre en pause et reprendre des générateurs à l'aide de l'instruction yield, et vous avez construit un simple planificateur de tâches pour exécuter plusieurs générateurs de manière concurrente. De plus, vous avez étendu le planificateur pour gérer efficacement les E/S réseau et avez implémenté un serveur réseau capable de gérer plusieurs connexions simultanément.
Ce modèle d'utilisation des générateurs pour la multitâche coopérative est une technique puissante qui sous-tend de nombreux frameworks de programmation asynchrone en Python, comme le module intégré asyncio. Cette approche offre plusieurs avantages, notamment un code séquentiel simple, une gestion efficace des E/S non bloquantes, une multitâche coopérative sans plusieurs threads et un contrôle précis de l'exécution des tâches. Ces techniques sont précieuses pour la construction d'applications réseau haute performance et de systèmes nécessitant une gestion efficace des opérations concurrentes.