生産者、消費者、およびパイプライン

Beginner

This tutorial is from open-source community. Access the source code

はじめに

ジェネレータは、さまざまな生産者/消費者問題やデータフローパイプラインを設定するための便利なツールです。このセクションではそれについて説明します。

生産者 - 消費者問題

ジェネレータは、さまざまな形式の「生産者 - 消費者」問題と密接に関連しています。

## 生産者
def follow(f):
 ...
    while True:
     ...
        yield line        ## 以下の `line` の値を生成する
     ...

## 消費者
for line in follow(f):    ## 上の `yield` から値を消費する
 ...

yield は、for が消費する値を生成します。

ジェネレータパイプライン

ジェネレータのこの側面を使って、処理パイプライン(Unix パイプのようなもの)を設定することができます。

生産者処理処理消費者

処理パイプには、初期のデータ生産者と、いくつかの中間処理段階と、最終的な消費者があります。

生産者処理処理消費者

def producer():
 ...
    yield item
 ...

生産者は通常、ジェネレータです。ただし、他のシーケンスのリストであってもかまいません。yield によってデータがパイプラインに供給されます。

生産者処理処理消費者

def consumer(s):
    for item in s:
     ...

消費者は for ループです。項目を取得して、それらを使って何かを行います。

生産者処理処理消費者

def processing(s):
    for item in s:
     ...
        yield newitem
     ...

中間処理段階は、同時に項目を消費して生成します。データストリームを変更する場合があります。また、フィルタリング(項目を破棄する)もできます。

生産者処理処理消費者

def producer():
 ...
    yield item          ## `processing` によって受け取られる項目を生成する
 ...

def processing(s):
    for item in s:      ## `producer` から来る
     ...
        yield newitem   ## 新しい項目を生成する
     ...

def consumer(s):
    for item in s:      ## `processing` から来る
     ...

パイプラインを設定するコード

a = producer()
b = processing(a)
c = consumer(b)

データが異なる関数を通じて徐々に流れることに気付くでしょう。

このエクササイズでは、stocksim.py プログラムがまだバックグラウンドで実行されている必要があります。前のエクササイズで書いた follow() 関数を使います。

演習 6.8: 単純なパイプラインの設定

パイプラインの考え方を実際に見てみましょう。次の関数を書きましょう。

>>> def filematch(lines, substr):
        for line in lines:
            if substr in line:
                yield line

>>>

この関数は、前の演習の最初のジェネレータの例とほぼ同じですが、もはやファイルを開いていません。引数として与えられた行のシーケンスのみを操作します。では、これを試してみましょう。

>>> from follow import follow
>>> lines = follow('stocklog.csv')
>>> goog = filematch(lines, 'GOOG')
>>> for line in goog:
        print(line)

... 出力が表示されるのを待つ...

出力が表示されるまでに少し時間がかかる場合がありますが、最終的には GOOG に関するデータを含むいくつかの行が表示されるはずです。

注:これらの演習は、2 つの別々のターミナルで同時に行う必要があります。

演習 6.9: より複雑なパイプラインの設定

パイプラインの考え方をさらに進めて、より多くの操作を行いましょう。

>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
        print(row)

['GOOG', '1502.08', '2023-10-01', '09:37.19', '1.83', '1500.25', '1502.08', '1500.25', '731']
['AAPL', '252.33', '2023-10-01', '09:37.19', '1.83', '250.50', '252.33', '250.50', '681']
['GOOG', '1502.09', '2023-10-01', '09:37.21', '1.84', '1500.25', '1502.09', '1500.25', '734']
['AAPL', '252.34', '2023-10-01', '09:37.21', '1.84', '250.50', '252.34', '250.50', '684']
['GOOG', '1502.10', '2023-10-01', '09:37.23', '1.85', '1500.25', '1502.10', '1500.25', '738']
['AAPL', '252.35', '2023-10-01', '09:37.23', '1.85', '250.50', '252.35', '250.50', '688']
...

さて、これは興味深いです。ここで見ているのは、follow() 関数の出力が csv.reader() 関数にパイプされ、現在は分割された行のシーケンスを取得していることです。

演習 6.10: さらに多くのパイプラインコンポーネントを作成する

全体的な考え方をさらに大きなパイプラインに拡張しましょう。別のファイル ticker.py で、上で行ったように CSV ファイルを読み取る関数を作成します。

## ticker.py

from follow import follow
import csv

def parse_stock_data(lines):
    rows = csv.reader(lines)
    return rows

if __name__ == '__main__':
    lines = follow('stocklog.csv')
    rows = parse_stock_data(lines)
    for row in rows:
        print(row)

特定の列を選択する新しい関数を書きます。

## ticker.py

... def select_columns(rows, indices): for row in rows: yield [row[index] for index in indices] ... def parse_stock_data(lines): rows = csv.reader(lines) rows = select_columns(rows, [0, 1, 4]) return rows

再度プログラムを実行します。このように出力が絞られたものが表示されるはずです。

['GOOG', '1503.06', '2.81']
['AAPL', '253.31', '2.81']
['GOOG', '1503.07', '2.82']
['AAPL', '253.32', '2.82']
['GOOG', '1503.08', '2.83']

...

データ型を変換し、辞書を作成するジェネレータ関数を書きます。たとえば:

## ticker.py
...

def convert_types(rows, types):
    for row in rows:
        yield [func(val) for func, val in zip(types, row)]

def make_dicts(rows, headers):
    for row in rows:
        yield dict(zip(headers, row))
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    rows = convert_types(rows, [str, float, float])
    rows = make_dicts(rows, ['name', 'price', 'change'])
    return rows
...

再度プログラムを実行します。このような辞書のストリームが表示されるはずです。

{'name': 'GOOG', 'price': 1503.4, 'change': 3.15}
{'name': 'AAPL', 'price': 253.65, 'change': 3.15}
{'name': 'GOOG', 'price': 1503.41, 'change': 3.16}
{'name': 'AAPL', 'price': 253.66, 'change': 3.16}
{'name': 'GOOG', 'price': 1503.42, 'change': 3.17}
{'name': 'AAPL', 'price': 253.67, 'change': 3.17}

...

演習 6.11: データのフィルタリング

データをフィルタリングする関数を書きます。たとえば:

## ticker.py
...

def filter_symbols(rows, names):
    for row in rows:
        if row['GOOG'] in names:
            yield row

これを使って、ポートフォリオに含まれる銘柄のみにデータをフィルタリングします。

import report
import ticker
import follow
portfolio = report.read_portfolio('portfolio.csv')
rows = ticker.parse_stock_data(follow.follow('stocklog.csv'))
rows = ticker.filter_symbols(rows, portfolio)
for row in rows:
    print(row)

考察

学んだこと:様々なジェネレータ関数を作成し、データフローパイプラインに関する処理を行うためにそれらをチェーン化することができます。また、一連のパイプライン段階を単一の関数呼び出しにまとめる関数を作成することもできます (たとえば、parse_stock_data() 関数)。

まとめ

おめでとうございます!あなたは、生産者、消費者、およびパイプラインの実験を完了しました。あなたのスキルを向上させるために、LabEx でさらに多くの実験を行って練習することができます。