Producteurs, Consommateurs et Pipelines

Beginner

This tutorial is from open-source community. Access the source code

Introduction

Les générateurs sont un outil utile pour résoudre différents types de problèmes producteur/consommateur et pour créer des pipelines de flux de données. Cette section en traite.

Problèmes producteur-consommateur

Les générateurs sont étroitement liés à diverses formes de problèmes producteur-consommateur.

## Producteur
def follow(f):
  ...
    while True:
      ...
        yield line        ## Génère la valeur dans `line` ci-dessous
      ...

## Consommateur
for line in follow(f):    ## Consomme la valeur provenant de `yield` ci-dessus
  ...

yield génère des valeurs que for consomme.

Pipelines de générateurs

Vous pouvez utiliser cet aspect des générateurs pour créer des pipelines de traitement (comme les tubes Unix).

producteurtraitementtraitementconsommateur

Les pipelines de traitement ont un producteur de données initial, un certain nombre d'étapes de traitement intermédiaires et un consommateur final.

producteurtraitementtraitementconsommateur

def producteur():
 ...
    yield élément
 ...

Le producteur est généralement un générateur. Bien qu'il puisse également être une liste ou une autre séquence. yield alimente les données dans le pipeline.

producteurtraitementtraitementconsommateur

def consommateur(s):
    for élément in s:
     ...

Le consommateur est une boucle for. Il reçoit les éléments et fait quelque chose avec eux.

producteurtraitementtraitementconsommateur

def traitement(s):
    for élément in s:
     ...
        yield nouvelélément
     ...

Les étapes de traitement intermédiaires consomment et produisent simultanément des éléments. Elles peuvent modifier le flux de données. Elles peuvent également filtrer (en éliminant des éléments).

producteurtraitementtraitementconsommateur

def producteur():
 ...
    yield élément          ## produit l'élément reçu par le `traitement`
 ...

def traitement(s):
    for élément in s:      ## Vient du `producteur`
     ...
        yield nouvelélément   ## produit un nouvel élément
     ...

def consommateur(s):
    for élément in s:      ## Vient du `traitement`
     ...

Code pour configurer le pipeline

a = producteur()
b = traitement(a)
c = consommateur(b)

Vous remarquerez que les données circulent progressivement à travers les différentes fonctions.

Pour cet exercice, le programme stocksim.py devrait toujours être exécuté en arrière-plan. Vous allez utiliser la fonction suivre() que vous avez écrite dans l'exercice précédent.

Exercice 6.8 : Configuration d'un pipeline simple

Voyons cette idée de pipeline en action. Écrivez la fonction suivante :

>>> def filematch(lignes, sous_chaîne):
        for ligne in lignes:
            if sous_chaîne in ligne:
                yield ligne

>>>

Cette fonction est presque exactement la même que le premier exemple de générateur de l'exercice précédent, sauf qu'elle n'ouvre plus un fichier - elle opère simplement sur une séquence de lignes passée en argument. Maintenant, essayez ceci :

>>> from suivre import suivre
>>> lignes = suivre('stocklog.csv')
>>> goog = filematch(lignes, 'GOOG')
>>> for ligne in goog:
        print(ligne)

... attendez la sortie...

Il peut prendre un certain temps pour que la sortie apparaisse, mais finalement vous devriez voir quelques lignes contenant des données pour GOOG.

Note : Ces exercices doivent être exécutés simultanément sur deux terminaux distincts.

Exercice 6.9 : Configuration d'un pipeline plus complexe

Poussez l'idée de pipeline un peu plus loin en effectuant plus d'actions.

>>> from suivre import suivre
>>> import csv
>>> lignes = suivre('stocklog.csv')
>>> lignes_csv = csv.reader(lignes)
>>> for ligne_csv in lignes_csv:
        print(ligne_csv)

['GOOG', '1502.08', '2023-10-01', '09:37.19', '1.83', '1500.25', '1502.08', '1500.25', '731']
['AAPL', '252.33', '2023-10-01', '09:37.19', '1.83', '250.50', '252.33', '250.50', '681']
['GOOG', '1502.09', '2023-10-01', '09:37.21', '1.84', '1500.25', '1502.09', '1500.25', '734']
['AAPL', '252.34', '2023-10-01', '09:37.21', '1.84', '250.50', '252.34', '250.50', '684']
['GOOG', '1502.10', '2023-10-01', '09:37.23', '1.85', '1500.25', '1502.10', '1500.25', '738']
['AAPL', '252.35', '2023-10-01', '09:37.23', '1.85', '250.50', '252.35', '250.50', '688']
...

Eh bien, c'est intéressant. Ce que vous voyez ici est que la sortie de la fonction suivre() a été acheminée vers la fonction csv.reader() et que nous obtenons maintenant une séquence de lignes divisées.

Exercice 6.10 : Création de plus de composants de pipeline

Étendons l'idée globale en un pipeline plus important. Dans un fichier séparé ticker.py, commençons par créer une fonction qui lit un fichier CSV comme vous l'avez fait ci-dessus :

## ticker.py

from suivre import suivre
import csv

def parser_données_boursières(lignes):
    lignes_csv = csv.reader(lignes)
    return lignes_csv

if __name__ == '__main__':
    lignes = suivre('stocklog.csv')
    lignes_csv = parser_données_boursières(lignes)
    for ligne_csv in lignes_csv:
        print(ligne_csv)

Écrivez une nouvelle fonction qui sélectionne des colonnes spécifiques :

## ticker.py

... def sélectionner_colonnes(lignes_csv, indices): for ligne_csv in lignes_csv: yield [ligne_csv[index] for index in indices] ... def parser_données_boursières(lignes): lignes_csv = csv.reader(lignes) lignes_csv = sélectionner_colonnes(lignes_csv, [0, 1, 4]) return lignes_csv

Exécutez votre programme à nouveau. Vous devriez voir une sortie réduite comme ceci :

['GOOG', '1503.06', '2.81']
['AAPL', '253.31', '2.81']
['GOOG', '1503.07', '2.82']
['AAPL', '253.32', '2.82']
['GOOG', '1503.08', '2.83']

...

Écrivez des fonctions génératrices qui convertissent les types de données et construisent des dictionnaires. Par exemple :

## ticker.py
...

def convertir_types(lignes_csv, types):
    for ligne_csv in lignes_csv:
        yield [func(val) for func, val in zip(types, ligne_csv)]

def créer_dictionnaires(lignes_csv, en-têtes):
    for ligne_csv in lignes_csv:
        yield dict(zip(en-têtes, ligne_csv))
...
def parser_données_boursières(lignes):
    lignes_csv = csv.reader(lignes)
    lignes_csv = sélectionner_colonnes(lignes_csv, [0, 1, 4])
    lignes_csv = convertir_types(lignes_csv, [str, float, float])
    lignes_csv = créer_dictionnaires(lignes_csv, ['nom', 'prix', 'variation'])
    return lignes_csv
...

Exécutez votre programme à nouveau. Vous devriez maintenant avoir un flux de dictionnaires comme ceci :

{'nom': 'GOOG', 'prix': 1503.4, 'variation': 3.15}
{'nom': 'AAPL', 'prix': 253.65, 'variation': 3.15}
{'nom': 'GOOG', 'prix': 1503.41, 'variation': 3.16}
{'nom': 'AAPL', 'prix': 253.66, 'variation': 3.16}
{'nom': 'GOOG', 'prix': 1503.42, 'variation': 3.17}
{'nom': 'AAPL', 'prix': 253.67, 'variation': 3.17}

...

Exercice 6.11 : Filtrer des données

Écrivez une fonction qui filtre des données. Par exemple :

## ticker.py
...

def filtrer_symboles(lignes, noms):
    for ligne in lignes:
        if ligne['GOOG'] in noms:
            yield ligne

Utilisez cela pour filtrer les actions uniquement celles de votre portefeuille :

import rapport
import cotation
import suivre
portefeuille = rapport.lire_portefeuille('portefeuille.csv')
lignes = cotation.parser_données_boursières(suivre.suivre('stocklog.csv'))
lignes = cotation.filtrer_symboles(lignes, portefeuille)
for ligne in lignes:
    print(ligne)

Discussion

Certains enseignements tirés : Vous pouvez créer diverses fonctions génératrices et les chaîner ensemble pour effectuer un traitement impliquant des pipelines de flux de données. De plus, vous pouvez créer des fonctions qui emballent une série d'étapes de pipeline dans un seul appel de fonction (par exemple, la fonction parser_données_boursières()).

Sommaire

Félicitations ! Vous avez terminé le laboratoire sur les producteurs, les consommateurs et les pipelines. Vous pouvez pratiquer d'autres laboratoires sur LabEx pour améliorer vos compétences.