Introdução
Geradores são uma ferramenta útil para configurar vários tipos de problemas produtor/consumidor e pipelines de fluxo de dados (dataflow pipelines). Esta seção discute isso.
Problemas Produtor-Consumidor (Producer-Consumer Problems)
Geradores estão intimamente relacionados a várias formas de problemas produtor-consumidor (producer-consumer).
## Produtor (Producer)
def follow(f):
...
while True:
...
yield line ## Produz valor em `line` abaixo (Produces value in `line` below)
...
## Consumidor (Consumer)
for line in follow(f): ## Consome valor de `yield` acima (Consumes value from `yield` above)
...
yield produz valores que o for consome.
Pipelines de Geradores (Generator Pipelines)
Você pode usar este aspecto dos geradores para configurar pipelines de processamento (como pipes do Unix).
produtor → processamento → processamento → consumidor
Pipes de processamento têm um produtor de dados inicial, um conjunto de estágios de processamento intermediários e um consumidor final.
produtor → processamento → processamento → consumidor
def producer():
...
yield item
...
O produtor é tipicamente um gerador. Embora também possa ser uma lista ou alguma outra sequência. yield alimenta dados no pipeline.
produtor → processamento → processamento → consumidor
def consumer(s):
for item in s:
...
O consumidor é um loop for. Ele recebe itens e faz algo com eles.
produtor → processamento → processamento → consumidor
def processing(s):
for item in s:
...
yield newitem
...
Estágios de processamento intermediários consomem e produzem itens simultaneamente. Eles podem modificar o fluxo de dados. Eles também podem filtrar (descartando itens).
produtor → processamento → processamento → consumidor
def producer():
...
yield item ## yields the item that is received by the `processing`
...
def processing(s):
for item in s: ## Comes from the `producer`
...
yield newitem ## yields a new item
...
def consumer(s):
for item in s: ## Comes from the `processing`
...
Código para configurar o pipeline
a = producer()
b = processing(a)
c = consumer(b)
Você notará que os dados fluem incrementalmente através das diferentes funções.
Para este exercício, o programa stocksim.py ainda deve estar rodando em segundo plano. Você vai usar a função follow() que você escreveu no exercício anterior.
Exercício 6.8: Configurando um pipeline simples
Vamos ver a ideia de pipelining em ação. Escreva a seguinte função:
>>> def filematch(lines, substr):
for line in lines:
if substr in line:
yield line
>>>
Esta função é quase exatamente a mesma do primeiro exemplo de gerador no exercício anterior, exceto que ela não está mais abrindo um arquivo - ela simplesmente opera em uma sequência de linhas fornecidas como argumento. Agora, tente isto:
>>> from follow import follow
>>> lines = follow('stocklog.csv')
>>> goog = filematch(lines, 'GOOG')
>>> for line in goog:
print(line)
... aguarde a saída ...
Pode levar um tempo para a saída aparecer, mas eventualmente você deverá ver algumas linhas contendo dados para GOOG.
Nota: Estes exercícios devem ser realizados simultaneamente em dois terminais separados.
Exercício 6.9: Configurando um pipeline mais complexo
Leve a ideia de pipelining alguns passos adiante, realizando mais ações.
>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
print(row)
['GOOG', '1502.08', '2023-10-01', '09:37.19', '1.83', '1500.25', '1502.08', '1500.25', '731']
['AAPL', '252.33', '2023-10-01', '09:37.19', '1.83', '250.50', '252.33', '250.50', '681']
['GOOG', '1502.09', '2023-10-01', '09:37.21', '1.84', '1500.25', '1502.09', '1500.25', '734']
['AAPL', '252.34', '2023-10-01', '09:37.21', '1.84', '250.50', '252.34', '250.50', '684']
['GOOG', '1502.10', '2023-10-01', '09:37.23', '1.85', '1500.25', '1502.10', '1500.25', '738']
['AAPL', '252.35', '2023-10-01', '09:37.23', '1.85', '250.50', '252.35', '250.50', '688']
...
Bem, isso é interessante. O que você está vendo aqui é que a saída da função follow() foi canalizada (piped) para a função csv.reader() e agora estamos obtendo uma sequência de linhas divididas.
Exercício 6.10: Criando mais componentes de pipeline
Vamos estender toda a ideia para um pipeline maior. Em um arquivo separado ticker.py, comece criando uma função que lê um arquivo CSV como você fez acima:
## ticker.py
from follow import follow
import csv
def parse_stock_data(lines):
rows = csv.reader(lines)
return rows
if __name__ == '__main__':
lines = follow('stocklog.csv')
rows = parse_stock_data(lines)
for row in rows:
print(row)
Escreva uma nova função que selecione colunas específicas:
## ticker.py
...
def select_columns(rows, indices):
for row in rows:
yield [row[index] for index in indices]
...
def parse_stock_data(lines):
rows = csv.reader(lines)
rows = select_columns(rows, [0, 1, 4])
return rows
Execute seu programa novamente. Você deve ver a saída reduzida assim:
['GOOG', '1503.06', '2.81']
['AAPL', '253.31', '2.81']
['GOOG', '1503.07', '2.82']
['AAPL', '253.32', '2.82']
['GOOG', '1503.08', '2.83']
...
Escreva funções geradoras que convertem tipos de dados e constroem dicionários. Por exemplo:
## ticker.py
...
def convert_types(rows, types):
for row in rows:
yield [func(val) for func, val in zip(types, row)]
def make_dicts(rows, headers):
for row in rows:
yield dict(zip(headers, row))
...
def parse_stock_data(lines):
rows = csv.reader(lines)
rows = select_columns(rows, [0, 1, 4])
rows = convert_types(rows, [str, float, float])
rows = make_dicts(rows, ['name', 'price', 'change'])
return rows
...
Execute seu programa novamente. Você deve agora ter um fluxo de dicionários como este:
{'name': 'GOOG', 'price': 1503.4, 'change': 3.15}
{'name': 'AAPL', 'price': 253.65, 'change': 3.15}
{'name': 'GOOG', 'price': 1503.41, 'change': 3.16}
{'name': 'AAPL', 'price': 253.66, 'change': 3.16}
{'name': 'GOOG', 'price': 1503.42, 'change': 3.17}
{'name': 'AAPL', 'price': 253.67, 'change': 3.17}
...
Exercício 6.11: Filtrando dados
Escreva uma função que filtre dados. Por exemplo:
## ticker.py
...
def filter_symbols(rows, names):
for row in rows:
if row['GOOG'] in names:
yield row
Use isso para filtrar ações apenas para aquelas em seu portfólio:
import report
import ticker
import follow
portfolio = report.read_portfolio('portfolio.csv')
rows = ticker.parse_stock_data(follow.follow('stocklog.csv'))
rows = ticker.filter_symbols(rows, portfolio)
for row in rows:
print(row)
Discussão
Algumas lições aprendidas: Você pode criar várias funções geradoras e encadeá-las para realizar o processamento envolvendo pipelines de fluxo de dados. Além disso, você pode criar funções que empacotam uma série de estágios de pipeline em uma única chamada de função (por exemplo, a função parse_stock_data()).
Resumo
Parabéns! Você concluiu o laboratório de Produtores, Consumidores e Pipelines. Você pode praticar mais laboratórios no LabEx para aprimorar suas habilidades.