Introduction
Generators are a useful tool for setting up various kinds of producer/consumer problems and dataflow pipelines. This section discusses that.
Generators are closely related to various forms of producer-consumer problems.
## Producer
def follow(f):
    ...
    while True:
        ...
        yield line          ## Produces value in `line` below
        ...

## Consumer
for line in follow(f):      ## Consumes value from `yield` above
    ...
The yield statement produces values that the for loop consumes.
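The same handoff can be seen in a minimal, self-contained sketch (the countdown() generator here is illustrative, not part of the exercise files):

```python
# A producer (countdown) and a consumer (consume) connected by yield.
def countdown(n):
    while n > 0:
        yield n            # produce a value
        n -= 1

def consume(gen):
    results = []
    for value in gen:      # consume each value as it is produced
        results.append(value)
    return results

print(consume(countdown(3)))   # [3, 2, 1]
```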
You can use this aspect of generators to set up processing pipelines (like Unix pipes).
Processing pipelines have an initial data producer, some set of intermediate processing stages, and a final consumer.

producer → processing → processing → consumer
def producer():
    ...
    yield item
    ...
The producer is typically a generator, although it could also be a list or some other sequence. The yield statement feeds data into the pipeline.
producer → processing → processing → consumer
def consumer(s):
    for item in s:
        ...
The consumer is a for-loop. It gets items and does something with them.
producer → processing → processing → consumer
def processing(s):
    for item in s:
        ...
        yield newitem
        ...
Intermediate processing stages simultaneously consume and produce items. They might modify the data stream. They can also filter (discarding items).
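For instance, a filtering stage consumes items and only yields the ones that pass a test (the predicate and names below are illustrative, not part of the exercise):

```python
# A filtering stage: discards items that fail a test.
def keep_positive(s):
    for item in s:
        if item > 0:       # non-positive items are silently dropped
            yield item

print(list(keep_positive([3, -1, 0, 7])))   # [3, 7]
```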
producer → processing → processing → consumer
def producer():
    ...
    yield item              ## yields the item that is received by the `processing`
    ...

def processing(s):
    for item in s:          ## Comes from the `producer`
        ...
        yield newitem       ## yields a new item
        ...

def consumer(s):
    for item in s:          ## Comes from the `processing`
        ...
Code to set up the pipeline:
a = producer()
b = processing(a)
c = consumer(b)
You will notice that data incrementally flows through the different functions.
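A small self-contained sketch (with made-up data) makes the incremental flow visible: each item travels all the way through the pipeline before the producer yields the next one.

```python
# Tracing the pipeline shows items flowing one at a time, end to end.
def producer():
    for item in [1, 2, 3]:
        trace.append(('produced', item))
        yield item

def processing(s):
    for item in s:
        yield item * 10

def consumer(s):
    for item in s:
        trace.append(('consumed', item))

trace = []
consumer(processing(producer()))
print(trace)
# [('produced', 1), ('consumed', 10), ('produced', 2),
#  ('consumed', 20), ('produced', 3), ('consumed', 30)]
```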
For this exercise the stocksim.py
program should still be running in the background. You're going to use the follow()
function you wrote in the previous exercise.
Let's see the pipelining idea in action. Write the following function:
>>> def filematch(lines, substr):
        for line in lines:
            if substr in line:
                yield line

>>>
This function is almost exactly the same as the first generator example in the previous exercise, except that it's no longer opening a file; it merely operates on a sequence of lines given to it as an argument. Now, try this:
>>> from follow import follow
>>> lines = follow('stocklog.csv')
>>> goog = filematch(lines, 'GOOG')
>>> for line in goog:
        print(line)
... wait for output ...
It might take a while for output to appear, but eventually you should see some lines containing data for GOOG.
Note: These exercises must be carried out simultaneously on two separate terminals.
Take the pipelining idea a few steps further by performing more actions.
>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
print(row)
['GOOG', '1502.08', '2023-10-01', '09:37.19', '1.83', '1500.25', '1502.08', '1500.25', '731']
['AAPL', '252.33', '2023-10-01', '09:37.19', '1.83', '250.50', '252.33', '250.50', '681']
['GOOG', '1502.09', '2023-10-01', '09:37.21', '1.84', '1500.25', '1502.09', '1500.25', '734']
['AAPL', '252.34', '2023-10-01', '09:37.21', '1.84', '250.50', '252.34', '250.50', '684']
['GOOG', '1502.10', '2023-10-01', '09:37.23', '1.85', '1500.25', '1502.10', '1500.25', '738']
['AAPL', '252.35', '2023-10-01', '09:37.23', '1.85', '250.50', '252.35', '250.50', '688']
...
Well, that's interesting. What you're seeing here is that the output of the follow()
function has been piped into the csv.reader()
function and we're now getting a sequence of split rows.
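This works because csv.reader() accepts any iterable of strings, not just an open file. A quick sketch with made-up lines shows the same behavior:

```python
# csv.reader() consumes any iterable of strings, so a generator (or a
# plain list) of lines works just like a file object. Sample data only.
import csv

lines = ['GOOG,1502.08,1.83', 'AAPL,252.33,1.83']
rows = csv.reader(lines)
print(next(rows))   # ['GOOG', '1502.08', '1.83']
```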
Let's extend the whole idea into a larger pipeline. In a separate file ticker.py
, start by creating a function that reads a CSV file as you did above:
## ticker.py
from follow import follow
import csv

def parse_stock_data(lines):
    rows = csv.reader(lines)
    return rows

if __name__ == '__main__':
    lines = follow('stocklog.csv')
    rows = parse_stock_data(lines)
    for row in rows:
        print(row)
Write a new function that selects specific columns:
## ticker.py
...
def select_columns(rows, indices):
    for row in rows:
        yield [row[index] for index in indices]
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    return rows
Run your program again. You should see output narrowed down like this:
['GOOG', '1503.06', '2.81']
['AAPL', '253.31', '2.81']
['GOOG', '1503.07', '2.82']
['AAPL', '253.32', '2.82']
['GOOG', '1503.08', '2.83']
...
Write generator functions that convert data types and build dictionaries. For example:
## ticker.py
...
def convert_types(rows, types):
    for row in rows:
        yield [func(val) for func, val in zip(types, row)]

def make_dicts(rows, headers):
    for row in rows:
        yield dict(zip(headers, row))
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    rows = convert_types(rows, [str, float, float])
    rows = make_dicts(rows, ['name', 'price', 'change'])
    return rows
...
Run your program again. You should now see a stream of dictionaries like this:
{'name': 'GOOG', 'price': 1503.4, 'change': 3.15}
{'name': 'AAPL', 'price': 253.65, 'change': 3.15}
{'name': 'GOOG', 'price': 1503.41, 'change': 3.16}
{'name': 'AAPL', 'price': 253.66, 'change': 3.16}
{'name': 'GOOG', 'price': 1503.42, 'change': 3.17}
{'name': 'AAPL', 'price': 253.67, 'change': 3.17}
...
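Because each stage only needs an iterable, the conversion stages can also be checked on static rows without the live feed running (the sample values below are made up):

```python
# The same convert_types()/make_dicts() stages applied to a static
# list of rows instead of the live stocklog.csv feed.
def convert_types(rows, types):
    for row in rows:
        yield [func(val) for func, val in zip(types, row)]

def make_dicts(rows, headers):
    for row in rows:
        yield dict(zip(headers, row))

rows = [['GOOG', '1502.08', '1.83']]
rows = convert_types(rows, [str, float, float])
rows = make_dicts(rows, ['name', 'price', 'change'])
print(list(rows))   # [{'name': 'GOOG', 'price': 1502.08, 'change': 1.83}]
```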
Write a function that filters data. For example:
## ticker.py
...
def filter_symbols(rows, names):
    for row in rows:
        if row['name'] in names:
            yield row
Use this to filter stocks to just those in your portfolio:
import report
import ticker
import follow
portfolio = report.read_portfolio('portfolio.csv')
rows = ticker.parse_stock_data(follow.follow('stocklog.csv'))
rows = ticker.filter_symbols(rows, portfolio)
for row in rows:
    print(row)
Some lessons learned: You can create various generator functions and chain them together to perform processing involving data-flow pipelines. In addition, you can create functions that package a series of pipeline stages into a single function call (for example, the parse_stock_data()
function).
Congratulations! You have completed the Producers, Consumers and Pipelines lab. You can practice more labs in LabEx to improve your skills.