Einführung
In diesem Lab lernen Sie über verwaltete Generatoren (managed generators) kennen und verstehen, wie Sie diese auf ungewöhnliche Weise ansteuern können. Sie werden auch einen einfachen Task-Scheduler (Aufgabenplaner) erstellen und einen Netzwerkserver mithilfe von Generatoren entwickeln.
Eine Generatorfunktion in Python erfordert externen Code zur Ausführung. Beispielsweise wird ein Iterationsgenerator nur ausgeführt, wenn er in einer for-Schleife iteriert wird, und Coroutinen (Korrekturen) müssen über ihre send()-Methode aufgerufen werden. In diesem Lab werden wir praktische Beispiele für die Ansteuerung von Generatoren in fortgeschrittenen Anwendungen untersuchen. Die während dieses Labs erstellten Dateien sind multitask.py und server.py.
Grundlagen zu Python-Generatoren
Beginnen wir damit, uns zu vergegenwärtigen, was Generatoren in Python sind. In Python sind Generatoren eine besondere Art von Funktion. Sie unterscheiden sich von normalen Funktionen. Wenn Sie eine normale Funktion aufrufen, wird diese von Anfang bis Ende ausgeführt und gibt einen einzelnen Wert zurück. Im Gegensatz dazu gibt eine Generatorfunktion einen Iterator zurück, ein Objekt, über das wir iterieren können, d. h., wir können seine Werte nacheinander abrufen.
Generatoren verwenden die yield-Anweisung, um Werte zurückzugeben. Im Gegensatz zu einer normalen Funktion, die alle Werte auf einmal zurückgibt, gibt ein Generator die Werte einzeln zurück. Nachdem ein Generator einen Wert zurückgegeben hat, wird seine Ausführung angehalten. Wenn wir das nächste Mal einen Wert anfordern, wird die Ausführung dort fortgesetzt, wo sie aufgehört hat.
Erstellen eines einfachen Generators
Jetzt erstellen wir einen einfachen Generator. Im WebIDE müssen Sie eine neue Datei erstellen. Diese Datei wird den Code für unseren Generator enthalten. Benennen Sie die Datei generator_demo.py und legen Sie sie im Verzeichnis /home/labex/project ab. Hier ist der Inhalt, den Sie in die Datei einfügen sollten:
## Generator function that counts down from n
def countdown(n):
print(f"Starting countdown from {n}")
while n > 0:
yield n
n -= 1
print("Countdown complete!")
## Create a generator object
counter = countdown(5)
## Drive the generator manually
print(next(counter)) ## 5
print(next(counter)) ## 4
print(next(counter)) ## 3
## Iterate through remaining values
for value in counter:
print(value) ## 2, 1
In diesem Code definieren wir zunächst eine Generatorfunktion namens countdown. Diese Funktion nimmt eine Zahl n als Argument und zählt von n bis 1 runter. Innerhalb der Funktion verwenden wir eine while-Schleife, um n zu dekrementieren und jeden Wert zurückzugeben. Wenn wir countdown(5) aufrufen, wird ein Generatorobjekt namens counter erstellt.
Anschließend verwenden wir die next()-Funktion, um manuell Werte aus dem Generator zu erhalten. Jedes Mal, wenn wir next(counter) aufrufen, wird die Ausführung des Generators dort fortgesetzt, wo sie aufgehört hat, und der nächste Wert wird zurückgegeben. Nachdem wir drei Werte manuell abgerufen haben, verwenden wir eine for-Schleife, um die verbleibenden Werte im Generator zu durchlaufen.
Um diesen Code auszuführen, öffnen Sie das Terminal und führen Sie den folgenden Befehl aus:
python3 /home/labex/project/generator_demo.py
Wenn Sie den Code ausführen, sollten Sie die folgende Ausgabe sehen:
Starting countdown from 5
5
4
3
2
1
Countdown complete!
Beachten wir, wie sich die Generatorfunktion verhält:
- Die Generatorfunktion beginnt ihre Ausführung, wenn wir erstmals
next(counter)aufrufen. Davor ist die Funktion nur definiert, und es hat noch kein wirkliches Herunterzählen begonnen. - Sie pausiert bei jeder
yield-Anweisung. Nachdem ein Wert zurückgegeben wurde, stoppt sie und wartet auf den nächsten Aufruf vonnext(). - Wenn wir
next()erneut aufrufen, wird die Ausführung dort fortgesetzt, wo sie aufgehört hat. Beispielsweise erinnert sie sich nach dem Zurückgeben von 5 an den Zustand und setzt das Dekrementieren vonnund das Zurückgeben des nächsten Werts fort. - Die Generatorfunktion beendet ihre Ausführung, nachdem der letzte Wert zurückgegeben wurde. In unserem Fall gibt sie nach dem Zurückgeben von 1 "Countdown complete!" aus.
Dieses Vermögen, die Ausführung anzuhalten und fortzusetzen, macht Generatoren so leistungsstark. Es ist sehr nützlich für Aufgaben wie die Task-Scheduling (Aufgabenplanung) und die asynchrone Programmierung, bei denen wir mehrere Aufgaben effizient ausführen müssen, ohne die Ausführung anderer Aufgaben zu blockieren.
Erstellen eines Task-Schedulers mit Generatoren
In der Programmierung ist ein Task-Scheduler (Aufgabenplaner) ein wichtiges Werkzeug, das hilft, mehrere Aufgaben effizient zu verwalten und auszuführen. In diesem Abschnitt verwenden wir Generatoren, um einen einfachen Task-Scheduler zu erstellen, der mehrere Generatorfunktionen gleichzeitig ausführen kann. Dies zeigt Ihnen, wie Generatoren verwaltet werden können, um kooperatives Multitasking (gemeinsames Mehrfachverarbeitung) durchzuführen, was bedeutet, dass Aufgaben abwechselnd ausgeführt werden und die Ausführungszeit teilen.
Zunächst müssen Sie eine neue Datei erstellen. Navigieren Sie zum Verzeichnis /home/labex/project und erstellen Sie eine Datei namens multitask.py. Diese Datei wird den Code für unseren Task-Scheduler enthalten.
## multitask.py
from collections import deque
## Task queue
tasks = deque()
## Simple task scheduler
def run():
while tasks:
task = tasks.popleft() ## Get the next task
try:
task.send(None) ## Resume the task
tasks.append(task) ## Put it back in the queue
except StopIteration:
print('Task done') ## Task is complete
## Example task 1: Countdown
def countdown(n):
while n > 0:
print('T-minus', n)
yield ## Pause execution
n -= 1
## Example task 2: Count up
def countup(n):
x = 0
while x < n:
print('Up we go', x)
yield ## Pause execution
x += 1
Jetzt zerlegen wir, wie dieser Task-Scheduler funktioniert:
- Wir verwenden eine
deque(Doppel-Ende-Warteschlange), um unsere Generator-Aufgaben zu speichern. Einedequeist eine Datenstruktur, die es Ihnen ermöglicht, Elemente effizient von beiden Enden hinzuzufügen und zu entfernen. Sie ist eine gute Wahl für unsere Aufgabenwarteschlange, da wir Aufgaben am Ende hinzufügen und am Anfang entfernen müssen. - Die
run()-Funktion ist das Herzstück unseres Task-Schedulers. Sie nimmt die Aufgaben nacheinander aus der Warteschlange:- Sie setzt jede Aufgabe mit
send(None)fort. Dies ist ähnlich wie das Verwenden vonnext()auf einem Generator. Es sagt dem Generator, die Ausführung dort fortzusetzen, wo sie aufgehört hat. - Nachdem die Aufgabe einen Wert zurückgegeben hat, wird sie wieder an das Ende der Warteschlange hinzugefügt. Auf diese Weise hat die Aufgabe später wieder die Möglichkeit, ausgeführt zu werden.
- Wenn eine Aufgabe abgeschlossen ist (wenn
StopIterationausgelöst wird), wird sie aus der Warteschlange entfernt. Dies zeigt an, dass die Aufgabe ihre Ausführung beendet hat.
- Sie setzt jede Aufgabe mit
- Jede
yield-Anweisung in unseren Generator-Aufgaben fungiert als Pausepunkt. Wenn ein Generator eineyield-Anweisung erreicht, wird seine Ausführung angehalten und die Kontrolle an den Scheduler zurückgegeben. Dies ermöglicht es anderen Aufgaben, ausgeführt zu werden.
Dieser Ansatz implementiert kooperatives Multitasking. Jede Aufgabe gibt die Kontrolle freiwillig an den Scheduler zurück, wodurch andere Aufgaben ausgeführt werden können. Auf diese Weise können mehrere Aufgaben die Ausführungszeit teilen und gleichzeitig ausgeführt werden.
Testen unseres Task-Schedulers
Jetzt werden wir einen Test zu unserer multitask.py-Datei hinzufügen. Der Zweck dieses Tests besteht darin, mehrere Aufgaben gleichzeitig auszuführen, was als gleichzeitige Ausführung (concurrent execution) bekannt ist. Die gleichzeitige Ausführung ermöglicht es verschiedenen Aufgaben, scheinbar gleichzeitig Fortschritte zu machen, obwohl in einer single-threaded-Umgebung (eingleitige Umgebung) die Aufgaben tatsächlich abwechselnd ausgeführt werden.
Um diesen Test durchzuführen, fügen Sie den folgenden Code am Ende der multitask.py-Datei hinzu:
## Test our scheduler
if __name__ == '__main__':
## Add tasks to the queue
tasks.append(countdown(10)) ## Count down from 10
tasks.append(countdown(5)) ## Count down from 5
tasks.append(countup(20)) ## Count up to 20
## Run all tasks
run()
In diesem Code überprüfen wir zunächst, ob das Skript direkt ausgeführt wird, indem wir if __name__ == '__main__': verwenden. Dann fügen wir drei verschiedene Aufgaben zur tasks-Warteschlange hinzu. Die countdown-Aufgaben zählen von den angegebenen Zahlen runter, und die countup-Aufgabe zählt bis zur angegebenen Zahl hoch. Schließlich rufen wir die run()-Funktion auf, um die Ausführung dieser Aufgaben zu starten.
Nachdem Sie den Code hinzugefügt haben, führen Sie ihn mit dem folgenden Befehl im Terminal aus:
python3 /home/labex/project/multitask.py
Wenn Sie den Code ausführen, sollten Sie eine Ausgabe ähnlich der folgenden sehen (die genaue Reihenfolge der Zeilen kann variieren):
T-minus 10
T-minus 5
Up we go 0
T-minus 9
T-minus 4
Up we go 1
T-minus 8
T-minus 3
Up we go 2
...
Beachten Sie, wie die Ausgaben der verschiedenen Aufgaben miteinander vermischt sind. Dies ist ein eindeutiges Zeichen dafür, dass unser Scheduler alle drei Aufgaben gleichzeitig ausführt. Jedes Mal, wenn eine Aufgabe eine yield-Anweisung erreicht, pausiert der Scheduler diese Aufgabe und wechselt zu einer anderen, sodass alle Aufgaben im Laufe der Zeit Fortschritte machen können.
Wie es funktioniert
Schauen wir uns genauer an, was passiert, wenn unser Scheduler läuft:
- Zunächst fügen wir drei Generator-Aufgaben zur Warteschlange hinzu:
countdown(10),countdown(5)undcountup(20). Diese Generator-Aufgaben sind spezielle Funktionen, die ihre Ausführung anyield-Anweisungen anhalten und fortsetzen können. - Dann beginnt die
run()-Funktion ihre Arbeit:- Sie nimmt die erste Aufgabe,
countdown(10), aus der Warteschlange. - Sie führt diese Aufgabe aus, bis sie eine
yield-Anweisung erreicht. Wenn sie auf dieyield-Anweisung trifft, gibt sie "T-minus 10" aus. - Danach fügt sie die
countdown(10)-Aufgabe wieder zur Warteschlange hinzu, damit sie später erneut ausgeführt werden kann. - Als nächstes nimmt sie die
countdown(5)-Aufgabe aus der Warteschlange. - Sie führt die
countdown(5)-Aufgabe aus, bis sie auf eineyield-Anweisung trifft und gibt "T-minus 5" aus. - Und dieser Prozess setzt sich fort...
- Sie nimmt die erste Aufgabe,
Dieser Zyklus setzt sich so lange fort, bis alle Aufgaben abgeschlossen sind. Jede Aufgabe bekommt die Chance, für eine kurze Zeit ausgeführt zu werden, was den Eindruck einer gleichzeitigen Ausführung erweckt, ohne dass Threads oder Callbacks verwendet werden müssen. Threads sind eine komplexere Methode, um Parallelität zu erreichen, und Callbacks werden in der asynchronen Programmierung verwendet. Unser einfacher Scheduler verwendet Generatoren, um einen ähnlichen Effekt auf eine einfachere Weise zu erzielen.
Aufbau eines Netzwerkservers mit Generatoren
In diesem Abschnitt nehmen wir das Konzept eines Task-Schedulers, das wir gelernt haben, und erweitern es, um etwas Praktikableres zu erstellen: einen einfachen Netzwerkserver. Dieser Server kann mithilfe von Generatoren mehrere Client-Verbindungen gleichzeitig verwalten. Generatoren sind eine leistungsstarke Python-Funktion, die es Funktionen ermöglicht, ihre Ausführung anzuhalten und fortzusetzen, was sehr nützlich ist, um mehrere Aufgaben ohne Blockierung zu verwalten.
Zunächst müssen Sie eine neue Datei namens server.py im Verzeichnis /home/labex/project erstellen. Diese Datei wird den Code für unseren Netzwerkserver enthalten.
## server.py
from socket import *
from select import select
from collections import deque
## Task system
tasks = deque()
recv_wait = {} ## Map: socket -> task (for tasks waiting to receive)
send_wait = {} ## Map: socket -> task (for tasks waiting to send)
def run():
while any([tasks, recv_wait, send_wait]):
## If no active tasks, wait for I/O
while not tasks:
## Wait for any socket to become ready for I/O
can_recv, can_send, _ = select(recv_wait, send_wait, [])
## Add tasks waiting on readable sockets back to active queue
for s in can_recv:
tasks.append(recv_wait.pop(s))
## Add tasks waiting on writable sockets back to active queue
for s in can_send:
tasks.append(send_wait.pop(s))
## Get next task to run
task = tasks.popleft()
try:
## Resume the task
reason, resource = task.send(None)
## Handle different yield reasons
if reason == 'recv':
## Task is waiting to receive data
recv_wait[resource] = task
elif reason == 'send':
## Task is waiting to send data
send_wait[resource] = task
else:
raise RuntimeError('Unknown yield reason %r' % reason)
except StopIteration:
print('Task done')
Dieser verbesserte Scheduler ist etwas komplizierter als der vorherige, aber er folgt denselben grundlegenden Ideen. Lassen Sie uns die Hauptunterschiede aufschlüsseln:
- Aufgaben können einen Grund ('recv' oder 'send') und eine Ressource (einen Socket) zurückgeben. Dies bedeutet, dass eine Aufgabe dem Scheduler mitteilen kann, dass sie entweder auf das Empfangen oder Senden von Daten über einen bestimmten Socket wartet.
- Je nach Grund wird die Aufgabe in einen anderen Wartebereich verschoben. Wenn eine Aufgabe auf das Empfangen von Daten wartet, geht sie in das
recv_wait-Wörterbuch. Wenn sie auf das Senden von Daten wartet, geht sie in dassend_wait-Wörterbuch. - Die
select()-Funktion wird verwendet, um herauszufinden, welche Sockets für E/A-Operationen bereit sind. Diese Funktion überprüft die Sockets in denrecv_wait- undsend_wait-Wörterbüchern und gibt diejenigen zurück, die entweder auf das Empfangen oder Senden von Daten bereit sind. - Wenn ein Socket bereit ist, wird die zugehörige Aufgabe zurück in die aktive Warteschlange verschoben. Dies ermöglicht es der Aufgabe, ihre Ausführung fortzusetzen und die E/A-Operation auszuführen, auf die sie gewartet hat.
Durch die Verwendung dieser Techniken können unsere Aufgaben effizient auf Netzwerk-E/A warten, ohne die Ausführung anderer Aufgaben zu blockieren. Dies macht unseren Netzwerkserver reaktionsfähiger und in der Lage, mehrere Client-Verbindungen gleichzeitig zu verwalten.
Implementierung eines Echo-Servers
Jetzt werden wir die Implementierung eines Echo-Servers zu unserer server.py-Datei hinzufügen. Ein Echo-Server ist eine Art Server, der einfach alle Daten, die er von einem Client erhält, zurücksendet. Dies ist eine großartige Möglichkeit, zu verstehen, wie Server eingehende Daten verarbeiten und mit Clients kommunizieren.
Fügen Sie den folgenden Code am Ende der server.py-Datei hinzu. Dieser Code wird unseren Echo-Server einrichten und Client-Verbindungen verwalten.
## TCP Server implementation
def tcp_server(address, handler):
## Create a TCP socket
sock = socket(AF_INET, SOCK_STREAM)
## Set the socket option to reuse the address
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
## Bind the socket to the given address
sock.bind(address)
## Start listening for incoming connections, with a backlog of 5
sock.listen(5)
while True:
## Yield to pause the function until a client connects
yield 'recv', sock ## Wait for a client connection
## Accept a client connection
client, addr = sock.accept()
## Add a new handler task for this client to the tasks list
tasks.append(handler(client, addr)) ## Start a handler task for this client
## Echo handler - echoes back whatever the client sends
def echo_handler(client, address):
print('Connection from', address)
while True:
## Yield to pause the function until the client sends data
yield 'recv', client ## Wait until client sends data
## Receive up to 1000 bytes of data from the client
data = client.recv(1000)
if not data: ## Client closed connection
break
## Yield to pause the function until the client can receive data
yield 'send', client ## Wait until client can receive data
## Send the data back to the client with 'GOT:' prefix
client.send(b'GOT:' + data)
print('Connection closed')
## Close the client connection
client.close()
## Start the server
if __name__ == '__main__':
## Add the tcp_server task to the tasks list
tasks.append(tcp_server(('', 25000), echo_handler))
## Start the scheduler
run()
Lassen Sie uns diesen Code Schritt für Schritt verstehen:
Die
tcp_server-Funktion:- Zunächst richtet sie einen Socket ein, um auf eingehende Verbindungen zu warten. Ein Socket ist ein Endpunkt für die Kommunikation zwischen zwei Maschinen.
- Dann verwendet sie
yield 'recv', sock, um die Funktion anzuhalten, bis ein Client eine Verbindung herstellt. Dies ist ein Schlüsselteil unseres asynchronen Ansatzes. - Schließlich erstellt sie für jede Client-Verbindung eine neue Handler-Aufgabe. Dies ermöglicht es dem Server, mehrere Clients gleichzeitig zu verwalten.
Die
echo_handler-Funktion:- Sie gibt
'recv', clientzurück, um auf die Daten des Clients zu warten. Dies hält die Funktion an, bis Daten verfügbar sind. - Sie gibt
'send', clientzurück, um zu warten, bis sie Daten an den Client senden kann. Dies stellt sicher, dass der Client bereit ist, die Daten zu empfangen. - Sie verarbeitet die Client-Daten, bis die Verbindung vom Client geschlossen wird.
- Sie gibt
Wenn wir den Server ausführen, fügen wir die
tcp_server-Aufgabe zur Warteschlange hinzu und starten den Scheduler. Der Scheduler ist für die Verwaltung aller Aufgaben verantwortlich und stellt sicher, dass sie asynchron ausgeführt werden.
Um den Server zu testen, führen Sie ihn in einem Terminal aus:
python3 /home/labex/project/server.py
Sie sollten eine Nachricht sehen, die darauf hinweist, dass der Server läuft. Dies bedeutet, dass der Server jetzt auf eingehende Verbindungen wartet.
Öffnen Sie ein anderes Terminal und verbinden Sie sich mit dem Server mithilfe von nc (Netcat). Netcat ist ein einfaches Werkzeug, das es Ihnen ermöglicht, sich mit einem Server zu verbinden und Daten zu senden.
nc localhost 25000
Jetzt können Sie Nachrichten eingeben und sehen, wie sie mit "GOT:" vorangestellt zurückgesendet werden:
Hello
GOT:Hello
World
GOT:World
Wenn Sie nc nicht installiert haben, können Sie die integrierte telnetlib von Python verwenden. Telnetlib ist eine Bibliothek, die es Ihnen ermöglicht, sich mit einem Server über das Telnet-Protokoll zu verbinden.
python3 -c "import telnetlib; t = telnetlib.Telnet('localhost', 25000); t.interact()"
Sie können mehrere Terminalfenster öffnen und mehrere Clients gleichzeitig verbinden. Der Server wird alle Verbindungen gleichzeitig verwalten, obwohl er single-threaded (eingleitig) ist. Dies ist dank unseres auf Generatoren basierenden Task-Schedulers möglich, der es dem Server ermöglicht, Aufgaben bei Bedarf anzuhalten und fortzusetzen.
Wie es funktioniert
Dieses Beispiel zeigt eine leistungsstarke Anwendung von Generatoren für asynchrone E/A:
- Der Server gibt die Kontrolle ab, wenn er normalerweise blockieren würde, während er auf E/A wartet. Dies bedeutet, dass der Server anstatt unendlich lange auf Daten zu warten, anhalten und andere Aufgaben ausführen lassen kann.
- Der Scheduler verschiebt ihn in einen Wartebereich, bis die E/A bereit ist. Dies stellt sicher, dass der Server keine Ressourcen verschwendet, indem er auf E/A wartet.
- Andere Aufgaben können ausgeführt werden, während auf die E/A-Vervollständigung gewartet wird. Dies ermöglicht es dem Server, mehrere Aufgaben gleichzeitig zu verwalten.
- Wenn die E/A bereit ist, wird die Aufgabe dort fortgesetzt, wo sie aufgehört hat. Dies ist ein Schlüsselmerkmal der asynchronen Programmierung.
Dieses Muster bildet die Grundlage moderner asynchroner Python-Frameworks wie asyncio, das in Python 3.4 zur Python-Standardbibliothek hinzugefügt wurde.
Zusammenfassung
In diesem Lab haben Sie das Konzept von verwalteten Generatoren in Python kennengelernt. Sie haben untersucht, wie man Generatoren mit der yield-Anweisung anhalten und fortsetzen kann, und einen einfachen Task-Scheduler erstellt, um mehrere Generatoren gleichzeitig auszuführen. Darüber hinaus haben Sie den Scheduler erweitert, um Netzwerk-E/A effizient zu verwalten, und einen Netzwerkserver implementiert, der mehrere Verbindungen gleichzeitig verarbeiten kann.
Dieses Muster der Verwendung von Generatoren für kooperatives Multitasking ist eine leistungsstarke Technik, die vielen asynchronen Programmierframeworks in Python zugrunde liegt, wie beispielsweise dem integrierten asyncio-Modul. Dieser Ansatz bietet mehrere Vorteile, darunter einfacher sequenzieller Code, effiziente nicht-blockierende E/A-Verarbeitung, kooperatives Multitasking ohne mehrere Threads und feingranulare Kontrolle über die Aufgabenausführung. Diese Techniken sind wertvoll für das Bauen von leistungsstarken Netzwerkapplikationen und Systemen, die eine effiziente Verwaltung von gleichzeitigen Operationen erfordern.