Procesamiento de datos impulsado por corrutinas

PythonPythonBeginner
Practicar Ahora

This tutorial is from open-source community. Access the source code

💡 Este tutorial está traducido por IA desde la versión en inglés. Para ver la versión original, puedes hacer clic aquí

Introducción

En este laboratorio, aprenderás cómo utilizar corrutinas para construir tuberías de procesamiento de datos. Las corrutinas, una poderosa característica de Python, admiten la multitarea cooperativa, lo que permite a las funciones pausar y reanudar la ejecución en un momento posterior.

Los objetivos de este laboratorio son comprender cómo funcionan las corrutinas en Python, implementar tuberías de procesamiento de datos basadas en corrutinas y transformar datos a través de múltiples etapas de corrutinas. Crearás dos archivos: cofollow.py, un seguidor de archivos basado en corrutinas, y coticker.py, una aplicación de cotizaciones de acciones que utiliza corrutinas. Se asume que el programa stocksim.py del ejercicio anterior sigue ejecutándose en segundo plano, generando datos de acciones en un archivo de registro.


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL python(("Python")) -.-> python/AdvancedTopicsGroup(["Advanced Topics"]) python(("Python")) -.-> python/PythonStandardLibraryGroup(["Python Standard Library"]) python(("Python")) -.-> python/FunctionsGroup(["Functions"]) python(("Python")) -.-> python/ObjectOrientedProgrammingGroup(["Object-Oriented Programming"]) python/FunctionsGroup -.-> python/function_definition("Function Definition") python/ObjectOrientedProgrammingGroup -.-> python/classes_objects("Classes and Objects") python/AdvancedTopicsGroup -.-> python/generators("Generators") python/AdvancedTopicsGroup -.-> python/threading_multiprocessing("Multithreading and Multiprocessing") python/PythonStandardLibraryGroup -.-> python/data_collections("Data Collections") subgraph Lab Skills python/function_definition -.-> lab-132524{{"Procesamiento de datos impulsado por corrutinas"}} python/classes_objects -.-> lab-132524{{"Procesamiento de datos impulsado por corrutinas"}} python/generators -.-> lab-132524{{"Procesamiento de datos impulsado por corrutinas"}} python/threading_multiprocessing -.-> lab-132524{{"Procesamiento de datos impulsado por corrutinas"}} python/data_collections -.-> lab-132524{{"Procesamiento de datos impulsado por corrutinas"}} end

Comprender las corrutinas con un seguidor de archivos

Comencemos por entender qué son las corrutinas y cómo funcionan en Python. Una corrutina es una versión especializada de una función generadora. En Python, las funciones generalmente comienzan desde el principio cada vez que se llaman. Pero las corrutinas son diferentes. Pueden consumir y producir datos, y tienen la capacidad de suspender y reanudar su ejecución. Esto significa que una corrutina puede pausar su operación en un cierto punto y luego retomar justo donde lo dejó más tarde.

Crear un seguidor de archivos de corrutina básico

En este paso, crearemos un seguidor de archivos que utiliza corrutinas para monitorear un archivo en busca de nuevo contenido y procesarlo. Esto es similar al comando Unix tail -f, que muestra continuamente el final de un archivo y se actualiza a medida que se agregan nuevas líneas.

  1. Abre el editor de código y crea un nuevo archivo llamado cofollow.py en el directorio /home/labex/project. Aquí es donde escribiremos nuestro código Python para implementar el seguidor de archivos utilizando corrutinas.

  2. Copia el siguiente código en el archivo:

## cofollow.py
import os
import time

## Data source
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)  ## Move to the end of the file
        while True:
            line = f.readline()
            if line != '':
                target.send(line)  ## Send the line to the target coroutine
            else:
                time.sleep(0.1)  ## Sleep briefly if no new content

## Decorator for coroutine functions
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)  ## Prime the coroutine (necessary first step)
        return f
    return start

## Sample coroutine
@consumer
def printer():
    while True:
        item = yield     ## Receive an item sent to me
        print(item)

## Example use
if __name__ == '__main__':
    follow('stocklog.csv', printer())
  1. Comprendamos los componentes clave de este código:

    • follow(filename, target): Esta función es responsable de abrir un archivo. Primero mueve el puntero del archivo al final del archivo utilizando f.seek(0, os.SEEK_END). Luego, entra en un bucle infinito donde intenta leer continuamente nuevas líneas del archivo. Si se encuentra una nueva línea, la envía a la corrutina objetivo utilizando el método send. Si no hay nuevo contenido, se pausa durante un corto tiempo (0,1 segundos) utilizando time.sleep(0.1) antes de volver a comprobar.
    • Decorador @consumer: En Python, las corrutinas deben ser "inicializadas" antes de que puedan comenzar a recibir datos. Este decorador se encarga de eso. Envía automáticamente un valor inicial None a la corrutina, que es un paso necesario para preparar la corrutina para recibir datos reales.
    • Corrutina printer(): Esta es una corrutina simple. Tiene un bucle infinito donde utiliza la palabra clave yield para recibir un elemento enviado a ella. Una vez que recibe un elemento, simplemente lo imprime.
  2. Guarda el archivo y ejecútalo desde la terminal:

cd /home/labex/project
python3 cofollow.py
  1. Deberías ver que el script imprime el contenido del archivo de registro de acciones, y continuará imprimiendo nuevas líneas a medida que se agreguen al archivo. Presiona Ctrl+C para detener el programa.

El concepto clave aquí es que los datos fluyen desde la función follow hacia la corrutina printer a través del método send. Este "empuje" de datos es lo contrario de los generadores, que "tiran" datos a través de la iteración. En un generador, normalmente se utiliza un bucle for para iterar sobre los valores que produce. Pero en este ejemplo de corrutina, los datos se envían activamente de una parte del código a otra.

✨ Revisar Solución y Practicar

Creación de componentes de tubería de corrutinas

En este paso, vamos a crear corrutinas más especializadas para procesar datos de acciones. Una corrutina es un tipo especial de función que puede pausar y reanudar su ejecución, lo cual es muy útil para construir tuberías de procesamiento de datos. Cada corrutina que creemos realizará una tarea específica en nuestra tubería de procesamiento general.

  1. Primero, necesitas crear un nuevo archivo. Navega hasta el directorio /home/labex/project y crea un archivo llamado coticker.py. Este archivo contendrá todo el código para nuestro procesamiento de datos basado en corrutinas.

  2. Ahora, comencemos a escribir código en el archivo coticker.py. Primero importaremos los módulos necesarios y definiremos la estructura básica. Los módulos son bibliotecas de código preescritas que proporcionan funciones y clases útiles. El siguiente código hace precisamente eso:

## coticker.py
from structure import Structure

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. Cuando mires el código anterior, notarás que hay errores relacionados con String(), Float() e Integer(). Estas son clases que necesitamos importar. Entonces, agregaremos las importaciones necesarias en la parte superior del archivo. De esta manera, Python sabe dónde encontrar estas clases. Aquí está el código actualizado:
## coticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. A continuación, agregaremos los componentes de corrutina que formarán nuestra tubería de procesamiento de datos. Cada corrutina tiene un trabajo específico en la tubería. Aquí está el código para agregar estas corrutinas:
@consumer
def to_csv(target):
    def producer():
        while True:
            line = yield

    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def create_ticker(target):
    while True:
        row = yield
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
  1. Comprendamos lo que hace cada una de estas corrutinas:

    • to_csv: Su trabajo es convertir líneas de texto sin procesar en filas CSV analizadas. Esto es importante porque nuestros datos inicialmente están en formato de texto y necesitamos dividirlos en datos CSV estructurados.
    • create_ticker: Esta corrutina toma las filas CSV y crea objetos Ticker a partir de ellas. Los objetos Ticker representan los datos de acciones de una manera más organizada.
    • negchange: Filtra los objetos Ticker. Solo pasa las acciones que tienen cambios de precio negativos. Esto nos ayuda a centrarnos en las acciones que están perdiendo valor.
    • ticker: Esta corrutina formatea y muestra los datos de cotización. Utiliza un formateador para presentar los datos en una tabla agradable y legible.
  2. Finalmente, necesitamos agregar el código del programa principal que conecta todos estos componentes. Este código configurará el flujo de datos a través de la tubería. Aquí está el código:

if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change']

    ## Create the processing pipeline
    t = ticker('text', fields)
    neg_filter = negchange(t)
    tick_creator = create_ticker(neg_filter)
    csv_parser = to_csv(tick_creator)

    ## Connect the pipeline to the data source
    follow('stocklog.csv', csv_parser)
  1. Después de escribir todo el código, guarda el archivo coticker.py. Luego, abre la terminal y ejecuta los siguientes comandos. El comando cd cambia el directorio al lugar donde se encuentra nuestro archivo, y el comando python3 ejecuta nuestro script de Python:
cd /home/labex/project
python3 coticker.py
  1. Si todo sale bien, deberías ver una tabla formateada en la terminal. Esta tabla muestra las acciones con cambios de precio negativos. La salida se verá algo así:
      name      price     change
---------- ---------- ----------
      MSFT      72.50      -0.25
        AA      35.25      -0.15
       IBM      50.10      -0.15
      GOOG     100.02      -0.01
      AAPL     102.50      -0.06

Ten en cuenta que los valores reales en la tabla pueden variar dependiendo de los datos de acciones generados.

Comprender el flujo de la tubería

La parte más importante de este programa es cómo fluyen los datos a través de las corrutinas. Analicémoslo paso a paso:

  1. La función follow comienza leyendo líneas del archivo stocklog.csv. Este es nuestro origen de datos.
  2. Cada línea que se lee se envía luego a la corrutina csv_parser. El csv_parser toma la línea de texto sin procesar y la analiza en campos CSV.
  3. Los datos CSV analizados se envían luego a la corrutina tick_creator. Esta corrutina crea objetos Ticker a partir de las filas CSV.
  4. Los objetos Ticker se envían luego a la corrutina neg_filter. Esta corrutina comprueba cada objeto Ticker. Si la acción tiene un cambio de precio negativo, pasa el objeto; de lo contrario, lo descarta.
  5. Finalmente, los objetos Ticker filtrados se envían a la corrutina ticker. La corrutina ticker formatea los datos y los muestra en una tabla.

Esta arquitectura de tubería es muy útil porque permite que cada componente se centre en una sola tarea. Esto hace que el código sea más modular, lo que significa que es más fácil de entender, modificar y mantener.

✨ Revisar Solución y Practicar

Mejorando la tubería de corrutinas

Ahora que tenemos una tubería básica funcionando, es hora de hacerla más flexible. En programación, la flexibilidad es crucial ya que permite que nuestro código se adapte a diferentes requisitos. Lo lograremos modificando nuestro programa coticker.py para soportar diversas opciones de filtrado y formato.

  1. Primero, abre el archivo coticker.py en tu editor de código. El editor de código es donde realizarás todos los cambios necesarios en el programa. Proporciona un entorno conveniente para ver, editar y guardar tu código.

  2. A continuación, agregaremos una nueva corrutina que filtra los datos por el nombre de la acción. Una corrutina es un tipo especial de función que puede pausar y reanudar su ejecución. Esto nos permite crear una tubería donde los datos pueden fluir a través de diferentes pasos de procesamiento. Aquí está el código de la nueva corrutina:

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

En este código, la corrutina filter_by_name toma un nombre de acción y una corrutina objetivo como parámetros. Espera continuamente un registro utilizando la palabra clave yield. Cuando llega un registro, comprueba si el nombre del registro coincide con el nombre especificado. Si coincide, envía el registro a la corrutina objetivo.

  1. Ahora, agreguemos otra corrutina que filtre en base a umbrales de precio. Esta corrutina nos ayudará a seleccionar acciones dentro de un rango de precios específico. Aquí está el código:
@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

Similar a la corrutina anterior, la corrutina price_threshold espera un registro. Luego comprueba si el precio del registro está dentro del rango de precio mínimo y máximo especificado. Si lo está, envía el registro a la corrutina objetivo.

  1. Después de agregar las nuevas corrutinas, necesitamos actualizar el programa principal para demostrar estos filtros adicionales. El programa principal es el punto de entrada de nuestra aplicación, donde configuramos las tuberías de procesamiento y comenzamos el flujo de datos. Aquí está el código actualizado:
if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change', 'high', 'low']

    ## Create the processing pipeline with multiple outputs

    ## Pipeline 1: Show all negative changes (same as before)
    print("Stocks with negative changes:")
    t1 = ticker('text', fields)
    neg_filter = negchange(t1)
    tick_creator1 = create_ticker(neg_filter)
    csv_parser1 = to_csv(tick_creator1)

    ## Start following the file with the first pipeline
    import threading
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()

    ## Wait a moment to see some results
    import time
    time.sleep(5)

    ## Pipeline 2: Filter by name (AAPL)
    print("\nApple stock updates:")
    t2 = ticker('text', fields)
    name_filter = filter_by_name('AAPL', t2)
    tick_creator2 = create_ticker(name_filter)
    csv_parser2 = to_csv(tick_creator2)

    ## Follow the file with the second pipeline
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()

    ## Wait a moment to see some results
    time.sleep(5)

    ## Pipeline 3: Filter by price range
    print("\nStocks priced between 50 and 75:")
    t3 = ticker('text', fields)
    price_filter = price_threshold(50, 75, t3)
    tick_creator3 = create_ticker(price_filter)
    csv_parser3 = to_csv(tick_creator3)

    ## Follow with the third pipeline
    follow('stocklog.csv', csv_parser3)

En este código actualizado, creamos tres tuberías de procesamiento diferentes. La primera tubería muestra las acciones con cambios negativos, la segunda tubería filtra las acciones por el nombre 'AAPL' y la tercera tubería filtra las acciones en base a un rango de precios entre 50 y 75. Usamos hilos para ejecutar las dos primeras tuberías de forma concurrente, lo que nos permite procesar los datos de manera más eficiente.

  1. Una vez que hayas realizado todos los cambios, guarda el archivo. Guardar el archivo asegura que todas tus modificaciones se conserven. Luego, ejecuta el programa actualizado utilizando los siguientes comandos en tu terminal:
cd /home/labex/project
python3 coticker.py

El comando cd cambia el directorio actual al directorio del proyecto, y el comando python3 coticker.py ejecuta el programa de Python.

  1. Después de ejecutar el programa, deberías ver tres salidas diferentes:
    • Primero, verás las acciones con cambios negativos.
    • Luego, verás todas las actualizaciones de las acciones de AAPL.
    • Finalmente, verás todas las acciones con precios entre 50 y 75.

Comprendiendo la tubería mejorada

El programa mejorado demuestra varios conceptos importantes:

  1. Múltiples tuberías: Podemos crear múltiples tuberías de procesamiento a partir de la misma fuente de datos. Esto nos permite realizar diferentes tipos de análisis en los mismos datos simultáneamente.
  2. Filtros especializados: Podemos crear diferentes corrutinas para tareas de filtrado específicas. Estos filtros nos ayudan a seleccionar solo los datos que cumplen con nuestros criterios específicos.
  3. Procesamiento concurrente: Usando hilos, podemos ejecutar múltiples tuberías de forma concurrente. Esto mejora la eficiencia de nuestro programa al permitirle procesar datos en paralelo.
  4. Composición de tuberías: Las corrutinas se pueden combinar de diferentes maneras para alcanzar diferentes objetivos de procesamiento de datos. Esto nos da la flexibilidad de personalizar nuestras tuberías de procesamiento de datos según nuestras necesidades.

Este enfoque proporciona una forma flexible y modular de procesar datos en streaming. Permite agregar o modificar pasos de procesamiento sin cambiar la arquitectura general del programa.

✨ Revisar Solución y Practicar

Resumen

En este laboratorio (lab), has aprendido cómo utilizar corrutinas para construir tuberías de procesamiento de datos en Python. Los conceptos clave incluyen comprender los fundamentos de las corrutinas, como cómo funcionan, la necesidad de inicializarlas (priming) y el uso de decoradores para la inicialización. También has explorado el flujo de datos, enviando datos a través de una tubería mediante el método send(), diferente del modelo de "extracción" (pull) de los generadores.

Además, has creado corrutinas especializadas para tareas como analizar datos CSV, filtrar registros y formatear la salida. Has aprendido a componer tuberías conectando múltiples corrutinas e implementado operaciones de filtrado y transformación. Las corrutinas ofrecen un enfoque poderoso para el procesamiento de datos en streaming, permitiendo una clara separación de responsabilidades y una fácil modificación de etapas individuales.