Traitement de données basé sur les coroutines

PythonPythonBeginner
Pratiquer maintenant

This tutorial is from open-source community. Access the source code

💡 Ce tutoriel est traduit par l'IA à partir de la version anglaise. Pour voir la version originale, vous pouvez cliquer ici

Introduction

Dans ce laboratoire, vous apprendrez à utiliser les coroutines pour construire des pipelines de traitement de données. Les coroutines, une fonctionnalité puissante de Python, prennent en charge le multitâche coopératif, permettant aux fonctions de suspendre et de reprendre leur exécution ultérieurement.

Les objectifs de ce laboratoire sont de comprendre le fonctionnement des coroutines en Python, de mettre en œuvre des pipelines de traitement de données basés sur les coroutines et de transformer les données à travers plusieurs étapes de coroutines. Vous allez créer deux fichiers : cofollow.py, un suivi de fichier basé sur les coroutines, et coticker.py, une application de suivi des cours boursiers utilisant les coroutines. On suppose que le programme stocksim.py de l'exercice précédent est toujours en cours d'exécution en arrière-plan, générant des données boursières dans un fichier journal.


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL python(("Python")) -.-> python/ObjectOrientedProgrammingGroup(["Object-Oriented Programming"]) python(("Python")) -.-> python/AdvancedTopicsGroup(["Advanced Topics"]) python(("Python")) -.-> python/PythonStandardLibraryGroup(["Python Standard Library"]) python(("Python")) -.-> python/FunctionsGroup(["Functions"]) python/FunctionsGroup -.-> python/function_definition("Function Definition") python/ObjectOrientedProgrammingGroup -.-> python/classes_objects("Classes and Objects") python/AdvancedTopicsGroup -.-> python/generators("Generators") python/AdvancedTopicsGroup -.-> python/threading_multiprocessing("Multithreading and Multiprocessing") python/PythonStandardLibraryGroup -.-> python/data_collections("Data Collections") subgraph Lab Skills python/function_definition -.-> lab-132524{{"Traitement de données basé sur les coroutines"}} python/classes_objects -.-> lab-132524{{"Traitement de données basé sur les coroutines"}} python/generators -.-> lab-132524{{"Traitement de données basé sur les coroutines"}} python/threading_multiprocessing -.-> lab-132524{{"Traitement de données basé sur les coroutines"}} python/data_collections -.-> lab-132524{{"Traitement de données basé sur les coroutines"}} end

Comprendre les coroutines avec un suivi de fichier

Commençons par comprendre ce que sont les coroutines et comment elles fonctionnent en Python. Une coroutine est une version spécialisée d'une fonction génératrice. En Python, les fonctions commencent généralement au début chaque fois qu'elles sont appelées. Mais les coroutines sont différentes. Elles peuvent à la fois consommer et produire des données, et elles ont la capacité de suspendre et de reprendre leur exécution. Cela signifie qu'une coroutine peut suspendre son opération à un certain moment puis reprendre exactement là où elle s'était arrêtée plus tard.

Création d'un suivi de fichier de base utilisant les coroutines

Dans cette étape, nous allons créer un suivi de fichier qui utilise les coroutines pour surveiller un fichier à la recherche de nouveau contenu et le traiter. Cela est similaire à la commande Unix tail -f, qui affiche en continu la fin d'un fichier et se met à jour à mesure que de nouvelles lignes sont ajoutées.

  1. Ouvrez l'éditeur de code et créez un nouveau fichier nommé cofollow.py dans le répertoire /home/labex/project. C'est là que nous allons écrire notre code Python pour implémenter le suivi de fichier en utilisant les coroutines.

  2. Copiez le code suivant dans le fichier :

## cofollow.py
import os
import time

## Data source
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)  ## Move to the end of the file
        while True:
            line = f.readline()
            if line != '':
                target.send(line)  ## Send the line to the target coroutine
            else:
                time.sleep(0.1)  ## Sleep briefly if no new content

## Decorator for coroutine functions
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)  ## Prime the coroutine (necessary first step)
        return f
    return start

## Sample coroutine
@consumer
def printer():
    while True:
        item = yield     ## Receive an item sent to me
        print(item)

## Example use
if __name__ == '__main__':
    follow('stocklog.csv', printer())
  1. Comprenons les éléments clés de ce code :

    • follow(filename, target) : Cette fonction est chargée d'ouvrir un fichier. Elle déplace d'abord le pointeur de fichier à la fin du fichier en utilisant f.seek(0, os.SEEK_END). Ensuite, elle entre dans une boucle infinie où elle essaie continuellement de lire de nouvelles lignes dans le fichier. Si une nouvelle ligne est trouvée, elle envoie cette ligne à la coroutine cible en utilisant la méthode send. S'il n'y a pas de nouveau contenu, elle s'arrête brièvement (0,1 seconde) en utilisant time.sleep(0.1) avant de vérifier à nouveau.
    • Décorateur @consumer : En Python, les coroutines doivent être « amorcées » avant de pouvoir commencer à recevoir des données. Ce décorateur s'en charge. Il envoie automatiquement une valeur initiale None à la coroutine, ce qui est une étape nécessaire pour préparer la coroutine à recevoir des données réelles.
    • Coroutine printer() : Il s'agit d'une simple coroutine. Elle a une boucle infinie où elle utilise le mot-clé yield pour recevoir un élément qui lui est envoyé. Une fois qu'elle reçoit un élément, elle le affiche simplement.
  2. Enregistrez le fichier et exécutez-le depuis le terminal :

cd /home/labex/project
python3 cofollow.py
  1. Vous devriez voir le script afficher le contenu du fichier de journal des actions, et il continuera d'afficher les nouvelles lignes à mesure qu'elles sont ajoutées au fichier. Appuyez sur Ctrl+C pour arrêter le programme.

Le concept clé ici est que les données circulent de la fonction follow vers la coroutine printer via la méthode send. Cette « poussée » de données est l'opposé des générateurs, qui « tirent » les données par itération. Dans un générateur, vous utilisez généralement une boucle for pour itérer sur les valeurs qu'il produit. Mais dans cet exemple de coroutine, les données sont activement envoyées d'une partie du code à une autre.

✨ Vérifier la solution et pratiquer

Création de composants de pipeline de coroutines

Dans cette étape, nous allons créer des coroutines plus spécialisées pour traiter les données boursières. Une coroutine est un type spécial de fonction qui peut suspendre et reprendre son exécution, ce qui est très utile pour construire des pipelines de traitement de données. Chaque coroutine que nous allons créer effectuera une tâche spécifique dans notre pipeline de traitement global.

  1. Tout d'abord, vous devez créer un nouveau fichier. Accédez au répertoire /home/labex/project et créez un fichier nommé coticker.py. Ce fichier contiendra tout le code pour notre traitement de données basé sur les coroutines.

  2. Maintenant, commençons à écrire du code dans le fichier coticker.py. Nous allons d'abord importer les modules nécessaires et définir la structure de base. Les modules sont des bibliothèques de code pré - écrites qui fournissent des fonctions et des classes utiles. Le code suivant fait exactement cela :

## coticker.py
from structure import Structure

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. Lorsque vous regardez le code ci - dessus, vous remarquerez qu'il y a des erreurs liées à String(), Float() et Integer(). Ce sont des classes que nous devons importer. Nous allons donc ajouter les importations nécessaires en haut du fichier. De cette façon, Python sait où trouver ces classes. Voici le code mis à jour :
## coticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. Ensuite, nous allons ajouter les composants de coroutine qui formeront notre pipeline de traitement de données. Chaque coroutine a une tâche spécifique dans le pipeline. Voici le code pour ajouter ces coroutines :
@consumer
def to_csv(target):
    def producer():
        while True:
            line = yield

    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def create_ticker(target):
    while True:
        row = yield
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
  1. Comprenons ce que chaque coroutine fait :

    • to_csv : Son rôle est de convertir les lignes de texte brutes en lignes CSV analysées. C'est important car nos données sont initialement au format texte, et nous devons les diviser en données CSV structurées.
    • create_ticker : Cette coroutine prend les lignes CSV et crée des objets Ticker à partir d'elles. Les objets Ticker représentent les données boursières de manière plus organisée.
    • negchange : Elle filtre les objets Ticker. Elle ne transmet que les actions dont le prix a diminué. Cela nous permet de nous concentrer sur les actions qui perdent de la valeur.
    • ticker : Cette coroutine formate et affiche les données des actions. Elle utilise un formateur pour présenter les données dans un tableau agréable et lisible.
  2. Enfin, nous devons ajouter le code du programme principal qui connecte tous ces composants ensemble. Ce code configurera le flux de données à travers le pipeline. Voici le code :

if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change']

    ## Create the processing pipeline
    t = ticker('text', fields)
    neg_filter = negchange(t)
    tick_creator = create_ticker(neg_filter)
    csv_parser = to_csv(tick_creator)

    ## Connect the pipeline to the data source
    follow('stocklog.csv', csv_parser)
  1. Après avoir écrit tout le code, enregistrez le fichier coticker.py. Ensuite, ouvrez le terminal et exécutez les commandes suivantes. La commande cd change le répertoire pour celui où se trouve notre fichier, et la commande python3 exécute notre script Python :
cd /home/labex/project
python3 coticker.py
  1. Si tout se passe bien, vous devriez voir un tableau formaté dans le terminal. Ce tableau montre les actions dont le prix a diminué. La sortie ressemblera à ceci :
      name      price     change
---------- ---------- ----------
      MSFT      72.50      -0.25
        AA      35.25      -0.15
       IBM      50.10      -0.15
      GOOG     100.02      -0.01
      AAPL     102.50      -0.06

Gardez à l'esprit que les valeurs réelles dans le tableau peuvent varier en fonction des données boursières générées.

Comprendre le flux du pipeline

La partie la plus importante de ce programme est la façon dont les données circulent à travers les coroutines. Découpons - la étape par étape :

  1. La fonction follow commence par lire les lignes du fichier stocklog.csv. C'est notre source de données.
  2. Chaque ligne lue est ensuite envoyée à la coroutine csv_parser. Le csv_parser prend la ligne de texte brute et l'analyse en champs CSV.
  3. Les données CSV analysées sont ensuite envoyées à la coroutine tick_creator. Cette coroutine crée des objets Ticker à partir des lignes CSV.
  4. Les objets Ticker sont ensuite envoyés à la coroutine neg_filter. Cette coroutine vérifie chaque objet Ticker. Si le prix de l'action a diminué, elle transmet l'objet ; sinon, elle le rejette.
  5. Enfin, les objets Ticker filtrés sont envoyés à la coroutine ticker. La coroutine ticker formate les données et les affiche dans un tableau.

Cette architecture de pipeline est très utile car elle permet à chaque composant de se concentrer sur une seule tâche. Cela rend le code plus modulaire, ce qui signifie qu'il est plus facile à comprendre, à modifier et à maintenir.

✨ Vérifier la solution et pratiquer

Amélioration du pipeline de coroutines

Maintenant que nous avons un pipeline de base opérationnel, il est temps de le rendre plus flexible. En programmation, la flexibilité est cruciale car elle permet à notre code de s'adapter à différentes exigences. Nous allons y parvenir en modifiant notre programme coticker.py pour prendre en charge diverses options de filtrage et de formatage.

  1. Tout d'abord, ouvrez le fichier coticker.py dans votre éditeur de code. C'est dans l'éditeur de code que vous allez apporter toutes les modifications nécessaires au programme. Il offre un environnement pratique pour visualiser, éditer et enregistrer votre code.

  2. Ensuite, nous allons ajouter une nouvelle coroutine qui filtre les données par nom d'action. Une coroutine est un type spécial de fonction qui peut suspendre et reprendre son exécution. Cela nous permet de créer un pipeline où les données peuvent traverser différentes étapes de traitement. Voici le code de la nouvelle coroutine :

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

Dans ce code, la coroutine filter_by_name prend un nom d'action et une coroutine cible en paramètres. Elle attend continuellement un enregistrement en utilisant le mot - clé yield. Lorsqu'un enregistrement arrive, elle vérifie si le nom de l'enregistrement correspond au nom spécifié. Si c'est le cas, elle envoie l'enregistrement à la coroutine cible.

  1. Maintenant, ajoutons une autre coroutine qui filtre en fonction de seuils de prix. Cette coroutine nous aidera à sélectionner les actions dans une plage de prix spécifique. Voici le code :
@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

De même que la coroutine précédente, la coroutine price_threshold attend un enregistrement. Elle vérifie ensuite si le prix de l'enregistrement se trouve dans la plage de prix minimale et maximale spécifiée. Si c'est le cas, elle envoie l'enregistrement à la coroutine cible.

  1. Après avoir ajouté les nouvelles coroutines, nous devons mettre à jour le programme principal pour démontrer ces filtres supplémentaires. Le programme principal est le point d'entrée de notre application, où nous configurons les pipelines de traitement et démarrons le flux de données. Voici le code mis à jour :
if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change', 'high', 'low']

    ## Create the processing pipeline with multiple outputs

    ## Pipeline 1: Show all negative changes (same as before)
    print("Stocks with negative changes:")
    t1 = ticker('text', fields)
    neg_filter = negchange(t1)
    tick_creator1 = create_ticker(neg_filter)
    csv_parser1 = to_csv(tick_creator1)

    ## Start following the file with the first pipeline
    import threading
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()

    ## Wait a moment to see some results
    import time
    time.sleep(5)

    ## Pipeline 2: Filter by name (AAPL)
    print("\nApple stock updates:")
    t2 = ticker('text', fields)
    name_filter = filter_by_name('AAPL', t2)
    tick_creator2 = create_ticker(name_filter)
    csv_parser2 = to_csv(tick_creator2)

    ## Follow the file with the second pipeline
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()

    ## Wait a moment to see some results
    time.sleep(5)

    ## Pipeline 3: Filter by price range
    print("\nStocks priced between 50 and 75:")
    t3 = ticker('text', fields)
    price_filter = price_threshold(50, 75, t3)
    tick_creator3 = create_ticker(price_filter)
    csv_parser3 = to_csv(tick_creator3)

    ## Follow with the third pipeline
    follow('stocklog.csv', csv_parser3)

Dans ce code mis à jour, nous créons trois pipelines de traitement différents. Le premier pipeline affiche les actions avec des variations négatives, le deuxième filtre les actions par le nom 'AAPL', et le troisième filtre les actions en fonction d'une plage de prix entre 50 et 75. Nous utilisons des threads pour exécuter les deux premiers pipelines de manière concurrente, ce qui nous permet de traiter les données plus efficacement.

  1. Une fois que vous avez apporté toutes les modifications, enregistrez le fichier. Enregistrer le fichier garantit que toutes vos modifications sont conservées. Ensuite, exécutez le programme mis à jour en utilisant les commandes suivantes dans votre terminal :
cd /home/labex/project
python3 coticker.py

La commande cd change le répertoire actuel pour le répertoire du projet, et la commande python3 coticker.py exécute le programme Python.

  1. Après avoir exécuté le programme, vous devriez voir trois sorties différentes :
    • Tout d'abord, vous verrez les actions avec des variations négatives.
    • Ensuite, vous verrez toutes les mises à jour des actions AAPL.
    • Enfin, vous verrez toutes les actions dont le prix est compris entre 50 et 75.

Comprendre le pipeline amélioré

Le programme amélioré démontre plusieurs concepts importants :

  1. Plusieurs pipelines : Nous pouvons créer plusieurs pipelines de traitement à partir de la même source de données. Cela nous permet d'effectuer différents types d'analyses sur les mêmes données simultanément.
  2. Filtres spécialisés : Nous pouvons créer différentes coroutines pour des tâches de filtrage spécifiques. Ces filtres nous aident à sélectionner uniquement les données qui répondent à nos critères spécifiques.
  3. Traitement concurrent : En utilisant des threads, nous pouvons exécuter plusieurs pipelines de manière concurrente. Cela améliore l'efficacité de notre programme en lui permettant de traiter les données en parallèle.
  4. Composition de pipelines : Les coroutines peuvent être combinées de différentes manières pour atteindre différents objectifs de traitement de données. Cela nous donne la flexibilité de personnaliser nos pipelines de traitement de données selon nos besoins.

Cette approche offre une façon flexible et modulaire de traiter les données en continu. Elle vous permet d'ajouter ou de modifier des étapes de traitement sans changer l'architecture globale du programme.

✨ Vérifier la solution et pratiquer

Résumé

Dans ce laboratoire (lab), vous avez appris à utiliser les coroutines pour construire des pipelines de traitement de données en Python. Les concepts clés incluent la compréhension des bases des coroutines, telles que leur fonctionnement, la nécessité de les initialiser (priming) et l'utilisation de décorateurs pour l'initialisation. Vous avez également exploré le flux de données, en poussant les données à travers un pipeline via la méthode send(), ce qui diffère du modèle « pull » des générateurs.

De plus, vous avez créé des coroutines spécialisées pour des tâches telles que l'analyse de données CSV, le filtrage d'enregistrements et la mise en forme de la sortie. Vous avez appris à composer des pipelines en connectant plusieurs coroutines et à implémenter des opérations de filtrage et de transformation. Les coroutines offrent une approche puissante pour le traitement de données en continu, permettant une séparation claire des préoccupations et une modification facile des étapes individuelles.