Produtores, Consumidores e Pipelines

Beginner

This tutorial is from open-source community. Access the source code

Introdução

Geradores são uma ferramenta útil para configurar vários tipos de problemas produtor/consumidor e pipelines de fluxo de dados (dataflow pipelines). Esta seção discute isso.

Problemas Produtor-Consumidor (Producer-Consumer Problems)

Geradores estão intimamente relacionados a várias formas de problemas produtor-consumidor (producer-consumer).

## Produtor (Producer)
def follow(f):
    ...
    while True:
        ...
        yield line        ## Produz valor em `line` abaixo (Produces value in `line` below)
        ...

## Consumidor (Consumer)
for line in follow(f):    ## Consome valor de `yield` acima (Consumes value from `yield` above)
    ...

yield produz valores que o for consome.

Pipelines de Geradores (Generator Pipelines)

Você pode usar este aspecto dos geradores para configurar pipelines de processamento (como pipes do Unix).

produtorprocessamentoprocessamentoconsumidor

Pipes de processamento têm um produtor de dados inicial, um conjunto de estágios de processamento intermediários e um consumidor final.

produtorprocessamentoprocessamentoconsumidor

def producer():
    ...
    yield item
    ...

O produtor é tipicamente um gerador. Embora também possa ser uma lista ou alguma outra sequência. yield alimenta dados no pipeline.

produtorprocessamentoprocessamentoconsumidor

def consumer(s):
    for item in s:
        ...

O consumidor é um loop for. Ele recebe itens e faz algo com eles.

produtorprocessamentoprocessamentoconsumidor

def processing(s):
    for item in s:
        ...
        yield newitem
        ...

Estágios de processamento intermediários consomem e produzem itens simultaneamente. Eles podem modificar o fluxo de dados. Eles também podem filtrar (descartando itens).

produtorprocessamentoprocessamentoconsumidor

def producer():
    ...
    yield item          ## yields the item that is received by the `processing`
    ...

def processing(s):
    for item in s:      ## Comes from the `producer`
        ...
        yield newitem   ## yields a new item
        ...

def consumer(s):
    for item in s:      ## Comes from the `processing`
        ...

Código para configurar o pipeline

a = producer()
b = processing(a)
c = consumer(b)

Você notará que os dados fluem incrementalmente através das diferentes funções.

Para este exercício, o programa stocksim.py ainda deve estar rodando em segundo plano. Você vai usar a função follow() que você escreveu no exercício anterior.

Exercício 6.8: Configurando um pipeline simples

Vamos ver a ideia de pipelining em ação. Escreva a seguinte função:

>>> def filematch(lines, substr):
        for line in lines:
            if substr in line:
                yield line

>>>

Esta função é quase exatamente a mesma do primeiro exemplo de gerador no exercício anterior, exceto que ela não está mais abrindo um arquivo - ela simplesmente opera em uma sequência de linhas fornecidas como argumento. Agora, tente isto:

>>> from follow import follow
>>> lines = follow('stocklog.csv')
>>> goog = filematch(lines, 'GOOG')
>>> for line in goog:
        print(line)

... aguarde a saída ...

Pode levar um tempo para a saída aparecer, mas eventualmente você deverá ver algumas linhas contendo dados para GOOG.

Nota: Estes exercícios devem ser realizados simultaneamente em dois terminais separados.

Exercício 6.9: Configurando um pipeline mais complexo

Leve a ideia de pipelining alguns passos adiante, realizando mais ações.

>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
        print(row)

['GOOG', '1502.08', '2023-10-01', '09:37.19', '1.83', '1500.25', '1502.08', '1500.25', '731']
['AAPL', '252.33', '2023-10-01', '09:37.19', '1.83', '250.50', '252.33', '250.50', '681']
['GOOG', '1502.09', '2023-10-01', '09:37.21', '1.84', '1500.25', '1502.09', '1500.25', '734']
['AAPL', '252.34', '2023-10-01', '09:37.21', '1.84', '250.50', '252.34', '250.50', '684']
['GOOG', '1502.10', '2023-10-01', '09:37.23', '1.85', '1500.25', '1502.10', '1500.25', '738']
['AAPL', '252.35', '2023-10-01', '09:37.23', '1.85', '250.50', '252.35', '250.50', '688']
...

Bem, isso é interessante. O que você está vendo aqui é que a saída da função follow() foi canalizada (piped) para a função csv.reader() e agora estamos obtendo uma sequência de linhas divididas.

Exercício 6.10: Criando mais componentes de pipeline

Vamos estender toda a ideia para um pipeline maior. Em um arquivo separado ticker.py, comece criando uma função que lê um arquivo CSV como você fez acima:

## ticker.py

from follow import follow
import csv

def parse_stock_data(lines):
    rows = csv.reader(lines)
    return rows

if __name__ == '__main__':
    lines = follow('stocklog.csv')
    rows = parse_stock_data(lines)
    for row in rows:
        print(row)

Escreva uma nova função que selecione colunas específicas:

## ticker.py
...
def select_columns(rows, indices):
    for row in rows:
        yield [row[index] for index in indices]
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    return rows

Execute seu programa novamente. Você deve ver a saída reduzida assim:

['GOOG', '1503.06', '2.81']
['AAPL', '253.31', '2.81']
['GOOG', '1503.07', '2.82']
['AAPL', '253.32', '2.82']
['GOOG', '1503.08', '2.83']
...

Escreva funções geradoras que convertem tipos de dados e constroem dicionários. Por exemplo:

## ticker.py
...

def convert_types(rows, types):
    for row in rows:
        yield [func(val) for func, val in zip(types, row)]

def make_dicts(rows, headers):
    for row in rows:
        yield dict(zip(headers, row))
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    rows = convert_types(rows, [str, float, float])
    rows = make_dicts(rows, ['name', 'price', 'change'])
    return rows
...

Execute seu programa novamente. Você deve agora ter um fluxo de dicionários como este:

{'name': 'GOOG', 'price': 1503.4, 'change': 3.15}
{'name': 'AAPL', 'price': 253.65, 'change': 3.15}
{'name': 'GOOG', 'price': 1503.41, 'change': 3.16}
{'name': 'AAPL', 'price': 253.66, 'change': 3.16}
{'name': 'GOOG', 'price': 1503.42, 'change': 3.17}
{'name': 'AAPL', 'price': 253.67, 'change': 3.17}
...

Exercício 6.11: Filtrando dados

Escreva uma função que filtre dados. Por exemplo:

## ticker.py
...

def filter_symbols(rows, names):
    for row in rows:
        if row['GOOG'] in names:
            yield row

Use isso para filtrar ações apenas para aquelas em seu portfólio:

import report
import ticker
import follow
portfolio = report.read_portfolio('portfolio.csv')
rows = ticker.parse_stock_data(follow.follow('stocklog.csv'))
rows = ticker.filter_symbols(rows, portfolio)
for row in rows:
    print(row)

Discussão

Algumas lições aprendidas: Você pode criar várias funções geradoras e encadeá-las para realizar o processamento envolvendo pipelines de fluxo de dados. Além disso, você pode criar funções que empacotam uma série de estágios de pipeline em uma única chamada de função (por exemplo, a função parse_stock_data()).

Resumo

Parabéns! Você concluiu o laboratório de Produtores, Consumidores e Pipelines. Você pode praticar mais laboratórios no LabEx para aprimorar suas habilidades.