Produzenten, Konsumenten und Pipelines

PythonPythonBeginner
Jetzt üben

This tutorial is from open-source community. Access the source code

💡 Dieser Artikel wurde von AI-Assistenten übersetzt. Um die englische Version anzuzeigen, können Sie hier klicken

Einführung

Generatoren sind ein nützliches Werkzeug für das Einrichten verschiedener Arten von Produzenten-/Verbraucher-Problemen und Datenfluss-Pipelines. In diesem Abschnitt wird darüber diskutiert.

Produzent-Verbraucher-Probleme

Generatoren stehen in enger Beziehung zu verschiedenen Formen von Produzent-Verbraucher-Problemen.

## Produzent
def follow(f):
 ...
    while True:
     ...
        yield line        ## Erzeugt den Wert in `line` unten
     ...

## Verbraucher
for line in follow(f):    ## Verbraucht den Wert von `yield` oben
 ...

yield erzeugt Werte, die for verbraucht.

Generator-Pipelines

Sie können diesen Aspekt von Generatoren nutzen, um Verarbeitungs-Pipelines aufzubauen (ähnlich wie Unix-Pipes).

producerVerarbeitungVerarbeitungVerbraucher

Verarbeitungs-Pipelines haben einen initialen Datenproduzenten, eine Reihe von Zwischenverarbeitungsstufen und einen Endverbraucher.

produzentVerarbeitungVerarbeitungVerbraucher

def producer():
 ...
    yield item
 ...

Der Produzent ist typischerweise ein Generator. Obwohl es auch eine Liste oder eine andere Sequenz sein könnte. yield liefert Daten in die Pipeline.

producerVerarbeitungVerarbeitungVerbraucher

def consumer(s):
    for item in s:
 ...

Der Verbraucher ist eine for-Schleife. Er bekommt Elemente und verarbeitet sie.

producerVerarbeitungVerarbeitungVerbraucher

def processing(s):
    for item in s:
 ...
        yield newitem
 ...

Zwischenverarbeitungsstufen konsumieren und produzieren Elemente gleichzeitig. Sie können den Datenstrom modifizieren. Sie können auch filtern (Elemente ausschließen).

producerVerarbeitungVerarbeitungVerbraucher

def producer():
 ...
    yield item          ## liefert das Element, das von der `Verarbeitung` empfangen wird
 ...

def processing(s):
    for item in s:      ## kommt vom `producer`
 ...
        yield newitem   ## liefert ein neues Element
 ...

def consumer(s):
    for item in s:      ## kommt von der `Verarbeitung`
 ...

Code, um die Pipeline aufzusetzen

a = producer()
b = processing(a)
c = consumer(b)

Sie werden feststellen, dass die Daten sukzessive durch die verschiedenen Funktionen fließen.

Für diese Übung sollte das Programm stocksim.py immer noch im Hintergrund laufen. Sie werden die Funktion follow(), die Sie im vorherigen Abschnitt geschrieben haben, verwenden.

Übung 6.8: Einfache Pipeline aufsetzen

Schauen wir uns die Pipeline-Idee im Einsatz an. Schreiben Sie die folgende Funktion:

>>> def filematch(lines, substr):
        for line in lines:
            if substr in line:
                yield line

>>>

Diese Funktion ist fast genau gleich wie das erste Generator-Beispiel aus der vorherigen Übung, nur dass sie keine Datei mehr öffnet, sondern lediglich auf einer Sequenz von Zeilen operiert, die ihr als Argument übergeben werden. Probieren Sie nun Folgendes aus:

>>> from follow import follow
>>> lines = follow('stocklog.csv')
>>> goog = filematch(lines, 'GOOG')
>>> for line in goog:
        print(line)

... warte auf Ausgabe...

Es kann einiger Zeit dauern, bis die Ausgabe erscheint, aber schließlich sollten Sie einige Zeilen sehen, die Daten für GOOG enthalten.

Hinweis: Diese Übungen müssen gleichzeitig auf zwei separaten Terminals ausgeführt werden.

Übung 6.9: Ein komplexere Pipeline aufsetzen

Verfolgen Sie die Pipeline-Idee ein paar Schritte weiter, indem Sie weitere Aktionen ausführen.

>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
        print(row)

['GOOG', '1502.08', '2023-10-01', '09:37.19', '1.83', '1500.25', '1502.08', '1500.25', '731']
['AAPL', '252.33', '2023-10-01', '09:37.19', '1.83', '250.50', '252.33', '250.50', '681']
['GOOG', '1502.09', '2023-10-01', '09:37.21', '1.84', '1500.25', '1502.09', '1500.25', '734']
['AAPL', '252.34', '2023-10-01', '09:37.21', '1.84', '250.50', '252.34', '250.50', '684']
['GOOG', '1502.10', '2023-10-01', '09:37.23', '1.85', '1500.25', '1502.10', '1500.25', '738']
['AAPL', '252.35', '2023-10-01', '09:37.23', '1.85', '250.50', '252.35', '250.50', '688']
...

Nun, das ist interessant. Was Sie hier sehen, ist, dass die Ausgabe der follow()-Funktion in die csv.reader()-Funktion geleitet wurde und wir jetzt eine Sequenz von aufgespaltenen Zeilen erhalten.

Übung 6.10: Weitere Pipeline-Komponenten erstellen

Lassen Sie uns die gesamte Idee in eine größere Pipeline erweitern. In einer separaten Datei ticker.py beginnen Sie mit der Erstellung einer Funktion, die eine CSV-Datei wie oben liest:

## ticker.py

from follow import follow
import csv

def parse_stock_data(lines):
    rows = csv.reader(lines)
    return rows

if __name__ == '__main__':
    lines = follow('stocklog.csv')
    rows = parse_stock_data(lines)
    for row in rows:
        print(row)

Schreiben Sie eine neue Funktion, die bestimmte Spalten auswählt:

## ticker.py

...
def select_columns(rows, indices):
for row in rows:
yield [row[index] for index in indices]
...
def parse_stock_data(lines):
rows = csv.reader(lines)
rows = select_columns(rows, [0, 1, 4])
return rows

Führen Sie Ihr Programm erneut aus. Sie sollten eine Ausgabe wie diese sehen:

['GOOG', '1503.06', '2.81']
['AAPL', '253.31', '2.81']
['GOOG', '1503.07', '2.82']
['AAPL', '253.32', '2.82']
['GOOG', '1503.08', '2.83']

...

Schreiben Sie Generatorfunktionen, die die Datentypen umwandeln und Dictionaries erstellen. Beispielsweise:

## ticker.py
...

def convert_types(rows, types):
    for row in rows:
        yield [func(val) for func, val in zip(types, row)]

def make_dicts(rows, headers):
    for row in rows:
        yield dict(zip(headers, row))
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    rows = convert_types(rows, [str, float, float])
    rows = make_dicts(rows, ['name', 'price', 'change'])
    return rows
...

Führen Sie Ihr Programm erneut aus. Sie sollten jetzt einen Strom von Dictionaries wie diesen sehen:

{'name': 'GOOG', 'price': 1503.4, 'change': 3.15}
{'name': 'AAPL', 'price': 253.65, 'change': 3.15}
{'name': 'GOOG', 'price': 1503.41, 'change': 3.16}
{'name': 'AAPL', 'price': 253.66, 'change': 3.16}
{'name': 'GOOG', 'price': 1503.42, 'change': 3.17}
{'name': 'AAPL', 'price': 253.67, 'change': 3.17}

...

Übung 6.11: Daten filtern

Schreiben Sie eine Funktion, die Daten filtert. Beispielsweise:

## ticker.py
...

def filter_symbols(rows, names):
    for row in rows:
        if row['GOOG'] in names:
            yield row

Verwenden Sie dies, um die Aktien auf nur die in Ihrem Portfolio zu filtern:

import report
import ticker
import follow
portfolio = report.read_portfolio('portfolio.csv')
rows = ticker.parse_stock_data(follow.follow('stocklog.csv'))
rows = ticker.filter_symbols(rows, portfolio)
for row in rows:
    print(row)

Diskussion

Einige Lehren, die man ziehen kann: Man kann verschiedene Generatorfunktionen erstellen und sie miteinander verketten, um Verarbeitungen durchzuführen, die Datenfluss-Pipelines betreffen. Darüber hinaus kann man Funktionen erstellen, die eine Reihe von Pipeline-Stufen in einen einzelnen Funktionsaufruf verpacken (z.B. die parse_stock_data()-Funktion).

Zusammenfassung

Herzlichen Glückwunsch! Sie haben das Lab zu Produzenten, Konsumenten und Pipelines abgeschlossen. Sie können in LabEx weitere Labs ausprobieren, um Ihre Fähigkeiten zu verbessern.