Processamento de Dados com Corrotinas

Beginner

This tutorial is from open-source community. Access the source code

Introdução

Neste laboratório, você aprenderá como usar corrotinas para construir pipelines de processamento de dados. Corrotinas, um recurso poderoso do Python, suportam multitarefa cooperativa, permitindo que funções pausem e retomem a execução em um ponto posterior.

Os objetivos deste laboratório são entender como as corrotinas funcionam em Python, implementar pipelines de processamento de dados baseados em corrotinas e transformar dados através de múltiplos estágios de corrotinas. Você criará dois arquivos: cofollow.py, um seguidor de arquivo baseado em corrotinas, e coticker.py, uma aplicação de ticker de ações usando corrotinas. Assume-se que o programa stocksim.py de um exercício anterior ainda está sendo executado em segundo plano, gerando dados de ações em um arquivo de log.

Entendendo Corrotinas com um Seguidor de Arquivo

Vamos começar entendendo o que são corrotinas e como elas funcionam em Python. Uma corrotina é uma versão especializada de uma função geradora (generator function). Em Python, as funções geralmente começam do início toda vez que são chamadas. Mas as corrotinas são diferentes. Elas podem consumir e produzir dados, e têm a capacidade de suspender e retomar sua execução. Isso significa que uma corrotina pode pausar sua operação em um determinado ponto e, em seguida, retomar exatamente de onde parou mais tarde.

Criando um Seguidor de Arquivo de Corrotina Básico

Nesta etapa, criaremos um seguidor de arquivo que usa corrotinas para monitorar um arquivo em busca de novo conteúdo e processá-lo. Isso é semelhante ao comando Unix tail -f, que mostra continuamente o final de um arquivo e atualiza à medida que novas linhas são adicionadas.

  1. Abra o editor de código e crie um novo arquivo chamado cofollow.py no diretório /home/labex/project. É aqui que escreveremos nosso código Python para implementar o seguidor de arquivo usando corrotinas.

  2. Copie o seguinte código para o arquivo:

## cofollow.py
import os
import time

## Data source
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)  ## Move to the end of the file
        while True:
            line = f.readline()
            if line != '':
                target.send(line)  ## Send the line to the target coroutine
            else:
                time.sleep(0.1)  ## Sleep briefly if no new content

## Decorator for coroutine functions
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)  ## Prime the coroutine (necessary first step)
        return f
    return start

## Sample coroutine
@consumer
def printer():
    while True:
        item = yield     ## Receive an item sent to me
        print(item)

## Example use
if __name__ == '__main__':
    follow('stocklog.csv', printer())
  1. Vamos entender os componentes-chave deste código:

    • follow(filename, target): Esta função é responsável por abrir um arquivo. Primeiro, ela move o ponteiro do arquivo para o final do arquivo usando f.seek(0, os.SEEK_END). Em seguida, ela entra em um loop infinito onde tenta continuamente ler novas linhas do arquivo. Se uma nova linha for encontrada, ela envia essa linha para a corrotina de destino usando o método send. Se não houver novo conteúdo, ela pausa por um curto período de tempo (0,1 segundos) usando time.sleep(0.1) antes de verificar novamente.
    • @consumer decorator: Em Python, as corrotinas precisam ser "preparadas" (primed) antes que possam começar a receber dados. Este decorador cuida disso. Ele envia automaticamente um valor inicial None para a corrotina, que é um primeiro passo necessário para preparar a corrotina para receber dados reais.
    • printer() corrotina: Esta é uma corrotina simples. Ela tem um loop infinito onde usa a palavra-chave yield para receber um item enviado a ela. Depois de receber um item, ela simplesmente o imprime.
  2. Salve o arquivo e execute-o a partir do terminal:

cd /home/labex/project
python3 cofollow.py
  1. Você deve ver o script imprimindo o conteúdo do arquivo de log de ações, e ele continuará a imprimir novas linhas à medida que forem adicionadas ao arquivo. Pressione Ctrl+C para parar o programa.

O conceito-chave aqui é que os dados fluem da função follow para a corrotina printer através do método send. Este "empurrar" (pushing) de dados é o oposto dos geradores, que "puxam" (pull) dados através da iteração. Em um gerador, você normalmente usa um loop for para iterar sobre os valores que ele produz. Mas neste exemplo de corrotina, os dados são enviados ativamente de uma parte do código para outra.

Criando Componentes de Pipeline de Corrotina

Nesta etapa, vamos criar corrotinas mais especializadas para processar dados de ações. Uma corrotina é um tipo especial de função que pode pausar e retomar sua execução, o que é muito útil para construir pipelines de processamento de dados. Cada corrotina que criamos executará uma tarefa específica em nosso pipeline geral de processamento.

  1. Primeiro, você precisa criar um novo arquivo. Navegue até o diretório /home/labex/project e crie um arquivo chamado coticker.py. Este arquivo conterá todo o código para nosso processamento de dados baseado em corrotinas.

  2. Agora, vamos começar a escrever código no arquivo coticker.py. Primeiro, importaremos os módulos necessários e definiremos a estrutura básica. Módulos são bibliotecas de código pré-escritas que fornecem funções e classes úteis. O seguinte código faz exatamente isso:

## coticker.py
from structure import Structure

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. Ao olhar o código acima, você notará que há erros relacionados a String(), Float() e Integer(). Estas são classes que precisamos importar. Portanto, adicionaremos as importações necessárias no topo do arquivo. Desta forma, o Python sabe onde encontrar essas classes. Aqui está o código atualizado:
## coticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. Em seguida, adicionaremos os componentes de corrotina que formarão nosso pipeline de processamento de dados. Cada corrotina tem um trabalho específico no pipeline. Aqui está o código para adicionar essas corrotinas:
@consumer
def to_csv(target):
    def producer():
        while True:
            line = yield

    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def create_ticker(target):
    while True:
        row = yield
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
  1. Vamos entender o que cada uma dessas corrotinas faz:

    • to_csv: Seu trabalho é converter linhas de texto bruto em linhas CSV analisadas. Isso é importante porque nossos dados estão inicialmente em formato de texto e precisamos dividi-los em dados CSV estruturados.
    • create_ticker: Esta corrotina pega as linhas CSV e cria objetos Ticker a partir delas. Os objetos Ticker representam os dados de ações de uma forma mais organizada.
    • negchange: Ele filtra os objetos Ticker. Ele só passa as ações que têm mudanças de preço negativas. Isso nos ajuda a nos concentrar nas ações que estão perdendo valor.
    • ticker: Esta corrotina formata e exibe os dados do ticker. Ele usa um formatador para apresentar os dados em uma tabela agradável e legível.
  2. Finalmente, precisamos adicionar o código do programa principal que conecta todos esses componentes. Este código configurará o fluxo de dados através do pipeline. Aqui está o código:

if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change']

    ## Create the processing pipeline
    t = ticker('text', fields)
    neg_filter = negchange(t)
    tick_creator = create_ticker(neg_filter)
    csv_parser = to_csv(tick_creator)

    ## Connect the pipeline to the data source
    follow('stocklog.csv', csv_parser)
  1. Depois de escrever todo o código, salve o arquivo coticker.py. Em seguida, abra o terminal e execute os seguintes comandos. O comando cd altera o diretório para onde nosso arquivo está localizado, e o comando python3 executa nosso script Python:
cd /home/labex/project
python3 coticker.py
  1. Se tudo correr bem, você deverá ver uma tabela formatada no terminal. Esta tabela mostra ações com mudanças de preço negativas. A saída será algo parecido com isto:
      name      price     change
---------- ---------- ----------
      MSFT      72.50      -0.25
        AA      35.25      -0.15
       IBM      50.10      -0.15
      GOOG     100.02      -0.01
      AAPL     102.50      -0.06

Tenha em mente que os valores reais na tabela podem variar dependendo dos dados de ações gerados.

Entendendo o Fluxo do Pipeline

A parte mais importante deste programa é como os dados fluem através das corrotinas. Vamos dividi-lo passo a passo:

  1. A função follow começa lendo linhas do arquivo stocklog.csv. Esta é nossa fonte de dados.
  2. Cada linha que é lida é então enviada para a corrotina csv_parser. O csv_parser pega a linha de texto bruto e a analisa em campos CSV.
  3. Os dados CSV analisados são então enviados para a corrotina tick_creator. Esta corrotina cria objetos Ticker a partir das linhas CSV.
  4. Os objetos Ticker são então enviados para a corrotina neg_filter. Esta corrotina verifica cada objeto Ticker. Se a ação tiver uma mudança de preço negativa, ela passa o objeto; caso contrário, ela o descarta.
  5. Finalmente, os objetos Ticker filtrados são enviados para a corrotina ticker. A corrotina ticker formata os dados e os exibe em uma tabela.

Esta arquitetura de pipeline é muito útil porque permite que cada componente se concentre em uma única tarefa. Isso torna o código mais modular, o que significa que é mais fácil de entender, modificar e manter.

Aprimorando o Pipeline de Corrotina

Agora que temos um pipeline básico em funcionamento, é hora de torná-lo mais flexível. Em programação, a flexibilidade é crucial, pois permite que nosso código se adapte a diferentes requisitos. Conseguiremos isso modificando nosso programa coticker.py para suportar várias opções de filtragem e formatação.

  1. Primeiro, abra o arquivo coticker.py em seu editor de código. O editor de código é onde você fará todas as alterações necessárias no programa. Ele fornece um ambiente conveniente para visualizar, editar e salvar seu código.

  2. Em seguida, adicionaremos uma nova corrotina que filtra dados por nome da ação. Uma corrotina é um tipo especial de função que pode pausar e retomar sua execução. Isso nos permite criar um pipeline onde os dados podem fluir por diferentes etapas de processamento. Aqui está o código para a nova corrotina:

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

Neste código, a corrotina filter_by_name recebe um nome de ação e uma corrotina de destino como parâmetros. Ele espera continuamente por um registro usando a palavra-chave yield. Quando um registro chega, ele verifica se o nome do registro corresponde ao nome especificado. Se corresponder, ele envia o registro para a corrotina de destino.

  1. Agora, vamos adicionar outra corrotina que filtra com base em limites de preço. Esta corrotina nos ajudará a selecionar ações dentro de uma faixa de preço específica. Aqui está o código:
@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

Semelhante à corrotina anterior, a corrotina price_threshold espera por um registro. Em seguida, ele verifica se o preço do registro está dentro da faixa de preço mínima e máxima especificada. Se estiver, ele envia o registro para a corrotina de destino.

  1. Depois de adicionar as novas corrotinas, precisamos atualizar o programa principal para demonstrar esses filtros adicionais. O programa principal é o ponto de entrada de nossa aplicação, onde configuramos os pipelines de processamento e iniciamos o fluxo de dados. Aqui está o código atualizado:
if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change', 'high', 'low']

    ## Create the processing pipeline with multiple outputs

    ## Pipeline 1: Show all negative changes (same as before)
    print("Stocks with negative changes:")
    t1 = ticker('text', fields)
    neg_filter = negchange(t1)
    tick_creator1 = create_ticker(neg_filter)
    csv_parser1 = to_csv(tick_creator1)

    ## Start following the file with the first pipeline
    import threading
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()

    ## Wait a moment to see some results
    import time
    time.sleep(5)

    ## Pipeline 2: Filter by name (AAPL)
    print("\nApple stock updates:")
    t2 = ticker('text', fields)
    name_filter = filter_by_name('AAPL', t2)
    tick_creator2 = create_ticker(name_filter)
    csv_parser2 = to_csv(tick_creator2)

    ## Follow the file with the second pipeline
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()

    ## Wait a moment to see some results
    time.sleep(5)

    ## Pipeline 3: Filter by price range
    print("\nStocks priced between 50 and 75:")
    t3 = ticker('text', fields)
    price_filter = price_threshold(50, 75, t3)
    tick_creator3 = create_ticker(price_filter)
    csv_parser3 = to_csv(tick_creator3)

    ## Follow with the third pipeline
    follow('stocklog.csv', csv_parser3)

Neste código atualizado, criamos três pipelines de processamento diferentes. O primeiro pipeline mostra ações com mudanças negativas, o segundo pipeline filtra ações pelo nome 'AAPL' e o terceiro pipeline filtra ações com base em uma faixa de preço entre 50 e 75. Usamos threads para executar os dois primeiros pipelines simultaneamente, o que nos permite processar dados de forma mais eficiente.

  1. Depois de fazer todas as alterações, salve o arquivo. Salvar o arquivo garante que todas as suas modificações sejam preservadas. Em seguida, execute o programa atualizado usando os seguintes comandos em seu terminal:
cd /home/labex/project
python3 coticker.py

O comando cd altera o diretório atual para o diretório do projeto, e o comando python3 coticker.py executa o programa Python.

  1. Após executar o programa, você deverá ver três saídas diferentes:
    • Primeiro, você verá ações com mudanças negativas.
    • Em seguida, você verá todas as atualizações de ações da AAPL.
    • Finalmente, você verá todas as ações com preços entre 50 e 75.

Entendendo o Pipeline Aprimorado

O programa aprimorado demonstra vários conceitos importantes:

  1. Múltiplos Pipelines: Podemos criar múltiplos pipelines de processamento a partir da mesma fonte de dados. Isso nos permite realizar diferentes tipos de análise nos mesmos dados simultaneamente.
  2. Filtros Especializados: Podemos criar diferentes corrotinas para tarefas de filtragem específicas. Esses filtros nos ajudam a selecionar apenas os dados que atendem aos nossos critérios específicos.
  3. Processamento Concorrente: Usando threads, podemos executar múltiplos pipelines simultaneamente. Isso melhora a eficiência do nosso programa, permitindo que ele processe dados em paralelo.
  4. Composição de Pipeline: Corrotinas podem ser combinadas de diferentes maneiras para atingir diferentes objetivos de processamento de dados. Isso nos dá a flexibilidade de personalizar nossos pipelines de processamento de dados de acordo com nossas necessidades.

Essa abordagem fornece uma maneira flexível e modular de processar dados de streaming. Ele permite que você adicione ou modifique etapas de processamento sem alterar a arquitetura geral do programa.

Resumo

Neste laboratório, você aprendeu como usar corrotinas para construir pipelines de processamento de dados em Python. Os principais conceitos incluem a compreensão dos fundamentos das corrotinas, como elas operam, a necessidade de priming (inicialização) e o uso de decoradores para inicialização. Você também explorou o fluxo de dados, empurrando dados por meio de um pipeline via o método send(), diferente do modelo "pull" (puxar) do gerador.

Além disso, você criou corrotinas especializadas para tarefas como análise de dados CSV, filtragem de registros e formatação de saída. Você aprendeu a compor pipelines conectando múltiplas corrotinas e implementou operações de filtragem e transformação. As corrotinas oferecem uma abordagem poderosa para o processamento de dados em streaming, permitindo uma separação limpa de preocupações e fácil modificação de estágios individuais.