Datenverarbeitung mit Coroutinen

PythonPythonBeginner
Jetzt üben

This tutorial is from open-source community. Access the source code

💡 Dieser Artikel wurde von AI-Assistenten übersetzt. Um die englische Version anzuzeigen, können Sie hier klicken

Einführung

In diesem Lab lernen Sie, wie Sie Coroutinen (Korrekturen) verwenden, um Datenverarbeitungspipelines zu erstellen. Coroutinen, eine leistungsstarke Python-Funktion, unterstützen kooperatives Multitasking und ermöglichen es Funktionen, die Ausführung anzuhalten und später fortzusetzen.

Die Ziele dieses Labs sind es, zu verstehen, wie Coroutinen in Python funktionieren, Datenverarbeitungspipelines auf der Grundlage von Coroutinen zu implementieren und Daten durch mehrere Coroutinen-Stufen zu transformieren. Sie werden zwei Dateien erstellen: cofollow.py, ein auf Coroutinen basierter Dateifollower, und coticker.py, eine Aktien-Ticker-Anwendung, die Coroutinen nutzt. Es wird angenommen, dass das Programm stocksim.py aus einer früheren Übung weiterhin im Hintergrund läuft und Aktiendaten in einer Protokolldatei generiert.


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL python(("Python")) -.-> python/FunctionsGroup(["Functions"]) python(("Python")) -.-> python/ObjectOrientedProgrammingGroup(["Object-Oriented Programming"]) python(("Python")) -.-> python/AdvancedTopicsGroup(["Advanced Topics"]) python(("Python")) -.-> python/PythonStandardLibraryGroup(["Python Standard Library"]) python/FunctionsGroup -.-> python/function_definition("Function Definition") python/ObjectOrientedProgrammingGroup -.-> python/classes_objects("Classes and Objects") python/AdvancedTopicsGroup -.-> python/generators("Generators") python/AdvancedTopicsGroup -.-> python/threading_multiprocessing("Multithreading and Multiprocessing") python/PythonStandardLibraryGroup -.-> python/data_collections("Data Collections") subgraph Lab Skills python/function_definition -.-> lab-132524{{"Datenverarbeitung mit Coroutinen"}} python/classes_objects -.-> lab-132524{{"Datenverarbeitung mit Coroutinen"}} python/generators -.-> lab-132524{{"Datenverarbeitung mit Coroutinen"}} python/threading_multiprocessing -.-> lab-132524{{"Datenverarbeitung mit Coroutinen"}} python/data_collections -.-> lab-132524{{"Datenverarbeitung mit Coroutinen"}} end

Das Verständnis von Coroutinen anhand eines Dateifollowers

Beginnen wir damit, zu verstehen, was Coroutinen sind und wie sie in Python funktionieren. Eine Coroutine ist eine spezialisierte Version einer Generatorfunktion. In Python beginnen Funktionen normalerweise jedes Mal von vorne, wenn sie aufgerufen werden. Coroutinen sind jedoch anders. Sie können sowohl Daten verbrauchen als auch produzieren, und sie haben die Fähigkeit, ihre Ausführung anzuhalten und fortzusetzen. Das bedeutet, dass eine Coroutine ihre Operation an einem bestimmten Punkt anhalten und später genau dort fortsetzen kann, wo sie aufgehört hat.

Erstellen eines einfachen Dateifollowers mit Coroutinen

In diesem Schritt erstellen wir einen Dateifollower, der Coroutinen verwendet, um eine Datei auf neue Inhalte zu überwachen und diese zu verarbeiten. Dies ähnelt dem Unix-Befehl tail -f, der kontinuierlich das Ende einer Datei anzeigt und sich aktualisiert, wenn neue Zeilen hinzugefügt werden.

  1. Öffnen Sie den Code-Editor und erstellen Sie eine neue Datei mit dem Namen cofollow.py im Verzeichnis /home/labex/project. Hier werden wir unseren Python-Code schreiben, um den Dateifollower mit Coroutinen zu implementieren.

  2. Kopieren Sie den folgenden Code in die Datei:

## cofollow.py
import os
import time

## Data source
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)  ## Move to the end of the file
        while True:
            line = f.readline()
            if line != '':
                target.send(line)  ## Send the line to the target coroutine
            else:
                time.sleep(0.1)  ## Sleep briefly if no new content

## Decorator for coroutine functions
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)  ## Prime the coroutine (necessary first step)
        return f
    return start

## Sample coroutine
@consumer
def printer():
    while True:
        item = yield     ## Receive an item sent to me
        print(item)

## Example use
if __name__ == '__main__':
    follow('stocklog.csv', printer())
  1. Lassen Sie uns die wichtigsten Bestandteile dieses Codes verstehen:

    • follow(filename, target): Diese Funktion ist für das Öffnen einer Datei verantwortlich. Sie bewegt zunächst den Dateizeiger an das Ende der Datei mit f.seek(0, os.SEEK_END). Dann tritt sie in eine Endlosschleife ein, in der sie kontinuierlich versucht, neue Zeilen aus der Datei zu lesen. Wenn eine neue Zeile gefunden wird, sendet sie diese Zeile an die Ziel-Coroutine mit der send-Methode. Wenn es keine neuen Inhalte gibt, hält sie kurz (0,1 Sekunden) mit time.sleep(0.1) an, bevor sie erneut prüft.
    • @consumer-Decorator: In Python müssen Coroutinen "initialisiert" werden, bevor sie Daten empfangen können. Dieser Decorator kümmert sich darum. Er sendet automatisch einen Anfangswert None an die Coroutine, was ein notwendiger erster Schritt ist, um die Coroutine für den Empfang echter Daten vorzubereiten.
    • printer()-Coroutine: Dies ist eine einfache Coroutine. Sie hat eine Endlosschleife, in der sie das yield-Schlüsselwort verwendet, um ein an sie gesendetes Element zu empfangen. Sobald sie ein Element empfängt, gibt sie es einfach aus.
  2. Speichern Sie die Datei und führen Sie sie aus dem Terminal aus:

cd /home/labex/project
python3 cofollow.py
  1. Sie sollten sehen, dass das Skript den Inhalt der Aktienprotokolldatei ausgibt und weiterhin neue Zeilen ausgibt, wenn sie zur Datei hinzugefügt werden. Drücken Sie Ctrl+C, um das Programm zu beenden.

Das Schlüsselkonzept hier ist, dass Daten von der follow-Funktion in die printer-Coroutine über die send-Methode fließen. Diese "Schiebung" von Daten steht im Gegensatz zu Generatoren, die Daten durch Iteration "ziehen". In einem Generator verwenden Sie normalerweise eine for-Schleife, um über die von ihm produzierten Werte zu iterieren. In diesem Coroutine-Beispiel werden die Daten jedoch aktiv von einem Teil des Codes an einen anderen gesendet.

✨ Lösung prüfen und üben

Erstellen von Komponenten für Coroutine-Pipelines

In diesem Schritt werden wir spezialisiertere Coroutinen zur Verarbeitung von Aktiendaten erstellen. Eine Coroutine ist eine spezielle Art von Funktion, die ihre Ausführung anhalten und fortsetzen kann, was für das Aufbauen von Datenverarbeitungspipelines sehr nützlich ist. Jede von uns erstellte Coroutine wird in unserer gesamten Verarbeitungspipeline eine bestimmte Aufgabe ausführen.

  1. Zunächst müssen Sie eine neue Datei erstellen. Navigieren Sie zum Verzeichnis /home/labex/project und erstellen Sie eine Datei mit dem Namen coticker.py. In dieser Datei wird der gesamte Code für unsere auf Coroutinen basierte Datenverarbeitung gespeichert.

  2. Jetzt beginnen wir, Code in die Datei coticker.py zu schreiben. Zuerst importieren wir die erforderlichen Module und definieren die Grundstruktur. Module sind vorgefertigte Codebibliotheken, die nützliche Funktionen und Klassen bereitstellen. Der folgende Code macht genau das:

## coticker.py
from structure import Structure

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. Wenn Sie sich den obigen Code ansehen, werden Sie feststellen, dass es Fehler in Bezug auf String(), Float() und Integer() gibt. Dies sind Klassen, die wir importieren müssen. Daher fügen wir die erforderlichen Importe oben in die Datei ein. Auf diese Weise weiß Python, wo es diese Klassen finden kann. Hier ist der aktualisierte Code:
## coticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. Als Nächstes fügen wir die Coroutine-Komponenten hinzu, die unsere Datenverarbeitungspipeline bilden werden. Jede Coroutine hat in der Pipeline eine bestimmte Aufgabe. Hier ist der Code, um diese Coroutinen hinzuzufügen:
@consumer
def to_csv(target):
    def producer():
        while True:
            line = yield

    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def create_ticker(target):
    while True:
        row = yield
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
  1. Lassen Sie uns verstehen, was jede dieser Coroutinen tut:

    • to_csv: Ihre Aufgabe besteht darin, rohe Textzeilen in geparste CSV-Zeilen umzuwandeln. Dies ist wichtig, da unsere Daten zunächst im Textformat vorliegen und wir sie in strukturierte CSV-Daten aufteilen müssen.
    • create_ticker: Diese Coroutine nimmt die CSV-Zeilen und erstellt daraus Ticker-Objekte. Ticker-Objekte repräsentieren die Aktiendaten auf eine organisiertere Weise.
    • negchange: Sie filtert die Ticker-Objekte. Sie leitet nur die Aktien weiter, die negative Preisänderungen aufweisen. Dies hilft uns, uns auf die Aktien zu konzentrieren, die an Wert verlieren.
    • ticker: Diese Coroutine formatiert und zeigt die Ticker-Daten an. Sie verwendet einen Formatter, um die Daten in einer schönen, lesbaren Tabelle darzustellen.
  2. Schließlich müssen wir den Hauptprogrammcode hinzufügen, der alle diese Komponenten miteinander verbindet. Dieser Code wird den Datenfluss durch die Pipeline einrichten. Hier ist der Code:

if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change']

    ## Create the processing pipeline
    t = ticker('text', fields)
    neg_filter = negchange(t)
    tick_creator = create_ticker(neg_filter)
    csv_parser = to_csv(tick_creator)

    ## Connect the pipeline to the data source
    follow('stocklog.csv', csv_parser)
  1. Nachdem Sie den gesamten Code geschrieben haben, speichern Sie die Datei coticker.py. Öffnen Sie dann das Terminal und führen Sie die folgenden Befehle aus. Der cd-Befehl wechselt das Verzeichnis zu dem Ort, an dem sich unsere Datei befindet, und der python3-Befehl führt unser Python-Skript aus:
cd /home/labex/project
python3 coticker.py
  1. Wenn alles gut geht, sollten Sie im Terminal eine formatierte Tabelle sehen. Diese Tabelle zeigt Aktien mit negativen Preisänderungen. Die Ausgabe sieht in etwa so aus:
      name      price     change
---------- ---------- ----------
      MSFT      72.50      -0.25
        AA      35.25      -0.15
       IBM      50.10      -0.15
      GOOG     100.02      -0.01
      AAPL     102.50      -0.06

Denken Sie daran, dass die tatsächlichen Werte in der Tabelle je nach generierten Aktiendaten variieren können.

Das Verständnis des Pipeline-Flusses

Der wichtigste Teil dieses Programms ist, wie Daten durch die Coroutinen fließen. Lassen Sie uns dies Schritt für Schritt aufschlüsseln:

  1. Die follow-Funktion beginnt damit, Zeilen aus der Datei stocklog.csv zu lesen. Dies ist unsere Datenquelle.
  2. Jede gelesene Zeile wird dann an die csv_parser-Coroutine gesendet. Die csv_parser nimmt die rohe Textzeile und parst sie in CSV-Felder.
  3. Die geparsten CSV-Daten werden dann an die tick_creator-Coroutine gesendet. Diese Coroutine erstellt Ticker-Objekte aus den CSV-Zeilen.
  4. Die Ticker-Objekte werden dann an die neg_filter-Coroutine gesendet. Diese Coroutine prüft jedes Ticker-Objekt. Wenn die Aktie eine negative Preisänderung aufweist, leitet sie das Objekt weiter; andernfalls verwirft sie es.
  5. Schließlich werden die gefilterten Ticker-Objekte an die ticker-Coroutine gesendet. Die ticker-Coroutine formatiert die Daten und zeigt sie in einer Tabelle an.

Diese Pipeline-Architektur ist sehr nützlich, da sie es jeder Komponente ermöglicht, sich auf eine einzelne Aufgabe zu konzentrieren. Dies macht den Code modularer, was bedeutet, dass er leichter zu verstehen, zu ändern und zu warten ist.

✨ Lösung prüfen und üben

Verbessern der Coroutine-Pipeline

Nachdem wir nun eine grundlegende Pipeline eingerichtet und in Betrieb haben, ist es an der Zeit, sie flexibler zu gestalten. In der Programmierung ist Flexibilität von entscheidender Bedeutung, da sie unseren Code in die Lage versetzt, sich an verschiedene Anforderungen anzupassen. Wir werden dies erreichen, indem wir unser coticker.py-Programm so ändern, dass es verschiedene Filter- und Formatierungsoptionen unterstützt.

  1. Öffnen Sie zunächst die Datei coticker.py in Ihrem Code-Editor. Der Code-Editor ist der Ort, an dem Sie alle erforderlichen Änderungen am Programm vornehmen. Er bietet eine bequeme Umgebung, um Ihren Code anzuzeigen, zu bearbeiten und zu speichern.

  2. Als Nächstes fügen wir eine neue Coroutine hinzu, die Daten nach Aktienname filtert. Eine Coroutine ist eine spezielle Art von Funktion, die ihre Ausführung anhalten und fortsetzen kann. Dies ermöglicht es uns, eine Pipeline zu erstellen, in der Daten durch verschiedene Verarbeitungsschritte fließen können. Hier ist der Code für die neue Coroutine:

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

In diesem Code nimmt die filter_by_name-Coroutine einen Aktiennamen und eine Ziel-Coroutine als Parameter. Sie wartet kontinuierlich auf einen Datensatz mithilfe des yield-Schlüsselworts. Wenn ein Datensatz ankommt, prüft sie, ob der Name des Datensatzes mit dem angegebenen Namen übereinstimmt. Wenn dies der Fall ist, sendet sie den Datensatz an die Ziel-Coroutine.

  1. Jetzt fügen wir eine weitere Coroutine hinzu, die basierend auf Preis-Schwellenwerten filtert. Diese Coroutine wird uns helfen, Aktien in einem bestimmten Preisbereich auszuwählen. Hier ist der Code:
@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

Ähnlich wie die vorherige Coroutine wartet die price_threshold-Coroutine auf einen Datensatz. Sie prüft dann, ob der Preis des Datensatzes innerhalb des angegebenen Mindest- und Höchstpreises liegt. Wenn dies der Fall ist, sendet sie den Datensatz an die Ziel-Coroutine.

  1. Nachdem wir die neuen Coroutinen hinzugefügt haben, müssen wir das Hauptprogramm aktualisieren, um diese zusätzlichen Filter zu demonstrieren. Das Hauptprogramm ist der Einstiegspunkt unserer Anwendung, in dem wir die Verarbeitungspipelines einrichten und den Datenfluss starten. Hier ist der aktualisierte Code:
if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change', 'high', 'low']

    ## Create the processing pipeline with multiple outputs

    ## Pipeline 1: Show all negative changes (same as before)
    print("Stocks with negative changes:")
    t1 = ticker('text', fields)
    neg_filter = negchange(t1)
    tick_creator1 = create_ticker(neg_filter)
    csv_parser1 = to_csv(tick_creator1)

    ## Start following the file with the first pipeline
    import threading
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()

    ## Wait a moment to see some results
    import time
    time.sleep(5)

    ## Pipeline 2: Filter by name (AAPL)
    print("\nApple stock updates:")
    t2 = ticker('text', fields)
    name_filter = filter_by_name('AAPL', t2)
    tick_creator2 = create_ticker(name_filter)
    csv_parser2 = to_csv(tick_creator2)

    ## Follow the file with the second pipeline
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()

    ## Wait a moment to see some results
    time.sleep(5)

    ## Pipeline 3: Filter by price range
    print("\nStocks priced between 50 and 75:")
    t3 = ticker('text', fields)
    price_filter = price_threshold(50, 75, t3)
    tick_creator3 = create_ticker(price_filter)
    csv_parser3 = to_csv(tick_creator3)

    ## Follow with the third pipeline
    follow('stocklog.csv', csv_parser3)

In diesem aktualisierten Code erstellen wir drei verschiedene Verarbeitungspipelines. Die erste Pipeline zeigt Aktien mit negativen Änderungen, die zweite Pipeline filtert Aktien nach dem Namen 'AAPL' und die dritte Pipeline filtert Aktien basierend auf einem Preisbereich zwischen 50 und 75. Wir verwenden Threads, um die ersten beiden Pipelines gleichzeitig auszuführen, was es uns ermöglicht, Daten effizienter zu verarbeiten.

  1. Nachdem Sie alle Änderungen vorgenommen haben, speichern Sie die Datei. Das Speichern der Datei stellt sicher, dass alle Ihre Änderungen beibehalten werden. Führen Sie dann das aktualisierte Programm mit den folgenden Befehlen in Ihrem Terminal aus:
cd /home/labex/project
python3 coticker.py

Der cd-Befehl wechselt das aktuelle Verzeichnis in das Projektverzeichnis, und der Befehl python3 coticker.py führt das Python-Programm aus.

  1. Nach dem Ausführen des Programms sollten Sie drei verschiedene Ausgaben sehen:
    • Zunächst werden Sie Aktien mit negativen Änderungen sehen.
    • Dann werden Sie alle Aktienaktualisierungen von AAPL sehen.
    • Schließlich werden Sie alle Aktien sehen, deren Preis zwischen 50 und 75 liegt.

Das Verständnis der verbesserten Pipeline

Das verbesserte Programm demonstriert mehrere wichtige Konzepte:

  1. Mehrere Pipelines: Wir können mehrere Verarbeitungspipelines aus derselben Datenquelle erstellen. Dies ermöglicht es uns, verschiedene Arten von Analysen an denselben Daten gleichzeitig durchzuführen.
  2. Spezialisierte Filter: Wir können verschiedene Coroutinen für bestimmte Filteraufgaben erstellen. Diese Filter helfen uns, nur die Daten auszuwählen, die unseren spezifischen Kriterien entsprechen.
  3. Gleichzeitige Verarbeitung: Mit Hilfe von Threads können wir mehrere Pipelines gleichzeitig ausführen. Dies verbessert die Effizienz unseres Programms, indem es uns ermöglicht, Daten parallel zu verarbeiten.
  4. Pipeline-Zusammensetzung: Coroutinen können auf verschiedene Weise kombiniert werden, um verschiedene Datenverarbeitungsziele zu erreichen. Dies gibt uns die Flexibilität, unsere Datenverarbeitungspipelines nach unseren Bedürfnissen anzupassen.

Dieser Ansatz bietet eine flexible und modulare Möglichkeit, Streaming-Daten zu verarbeiten. Er ermöglicht es Ihnen, Verarbeitungsschritte hinzuzufügen oder zu ändern, ohne die Gesamtarchitektur des Programms zu ändern.

✨ Lösung prüfen und üben

Zusammenfassung

In diesem Lab haben Sie gelernt, wie Sie Coroutinen in Python verwenden, um Datenverarbeitungspipelines aufzubauen. Zu den Schlüsselkonzepten gehört das Verständnis der Grundlagen von Coroutinen, wie beispielsweise wie sie funktionieren, warum sie initialisiert werden müssen und wie Sie Decorators zur Initialisierung nutzen. Sie haben auch den Datenfluss untersucht und gelernt, wie Sie Daten über die send()-Methode durch eine Pipeline schieben, was sich vom "Pull"-Modell von Generatoren unterscheidet.

Darüber hinaus haben Sie spezialisierte Coroutinen für Aufgaben wie das Parsen von CSV-Daten, das Filtern von Datensätzen und das Formatieren der Ausgabe erstellt. Sie haben gelernt, Pipelines zu erstellen, indem Sie mehrere Coroutinen verbinden, und Filter- und Transformationsoperationen implementiert. Coroutinen bieten einen leistungsstarken Ansatz für die Verarbeitung von Streaming-Daten, der eine klare Trennung von Aufgaben ermöglicht und die einfache Modifikation einzelner Stufen erlaubt.