コルーチンによるデータ処理

Beginner

This tutorial is from open-source community. Access the source code

はじめに

この実験では、コルーチンを使用してデータ処理パイプラインを構築する方法を学びます。コルーチンは Python の強力な機能で、協調的マルチタスクをサポートし、関数が一時停止して後で実行を再開できるようにします。

この実験の目的は、Python でコルーチンがどのように動作するかを理解し、コルーチンに基づくデータ処理パイプラインを実装し、複数のコルーチンステージを通じてデータを変換することです。2 つのファイルを作成します。cofollow.py(コルーチンベースのファイルフォロワー)とcoticker.py(コルーチンを使用した株式ティッカーアプリケーション)です。前の演習のstocksim.pyプログラムがまだバックグラウンドで実行され、ログファイルに株式データを生成していることを前提としています。

ファイルフォロワーを使ったコルーチンの理解

まずは、コルーチンとは何か、そして Python でどのように動作するかを理解しましょう。コルーチンはジェネレータ関数の特殊なバージョンです。Python では、関数は通常、呼び出されるたびに最初から実行されます。しかし、コルーチンは異なります。コルーチンはデータを消費することも生成することもでき、実行を一時停止して再開する機能を持っています。つまり、コルーチンはある時点で操作を一時停止し、後で中断したところから再開することができます。

基本的なコルーチンファイルフォロワーの作成

このステップでは、コルーチンを使用してファイルの新しい内容を監視し、それを処理するファイルフォロワーを作成します。これは Unix のtail -fコマンドに似ており、ファイルの末尾を継続的に表示し、新しい行が追加されると更新されます。

  1. コードエディタを開き、/home/labex/projectディレクトリにcofollow.pyという名前の新しいファイルを作成します。ここで、コルーチンを使用したファイルフォロワーを実装する Python コードを記述します。

  2. 以下のコードをファイルにコピーします。

## cofollow.py
import os
import time

## Data source
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)  ## Move to the end of the file
        while True:
            line = f.readline()
            if line != '':
                target.send(line)  ## Send the line to the target coroutine
            else:
                time.sleep(0.1)  ## Sleep briefly if no new content

## Decorator for coroutine functions
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)  ## Prime the coroutine (necessary first step)
        return f
    return start

## Sample coroutine
@consumer
def printer():
    while True:
        item = yield     ## Receive an item sent to me
        print(item)

## Example use
if __name__ == '__main__':
    follow('stocklog.csv', printer())
  1. このコードの主要なコンポーネントを理解しましょう。

    • follow(filename, target): この関数はファイルを開く役割を担っています。まず、f.seek(0, os.SEEK_END)を使用してファイルポインタをファイルの末尾に移動させます。その後、無限ループに入り、ファイルから新しい行を継続的に読み取ろうとします。新しい行が見つかった場合、sendメソッドを使用してその行をターゲットのコルーチンに送信します。新しい内容がない場合は、time.sleep(0.1)を使用して短時間(0.1 秒)一時停止してから再度チェックします。
    • @consumerデコレータ:Python では、コルーチンはデータを受け取り始める前に「初期化」する必要があります。このデコレータはその役割を担っています。自動的にコルーチンに初期のNone値を送信します。これは、コルーチンが実際のデータを受け取る準備をするために必要な最初のステップです。
    • printer()コルーチン:これは単純なコルーチンです。無限ループを持ち、yieldキーワードを使用して送信されたアイテムを受け取ります。アイテムを受け取ると、単にそれを印刷します。
  2. ファイルを保存し、ターミナルから実行します。

cd /home/labex/project
python3 cofollow.py
  1. スクリプトが株式ログファイルの内容を印刷し、ファイルに新しい行が追加されるとそれを継続的に印刷するのが見えるはずです。Ctrl+Cを押してプログラムを停止します。

ここでの重要な概念は、データがfollow関数からsendメソッドを通じてprinterコルーチンに流れるということです。このデータの「押し出し」は、イテレーションを通じてデータを「引き出す」ジェネレータとは逆です。ジェネレータでは、通常、生成する値を反復処理するためにforループを使用します。しかし、このコルーチンの例では、データはコードの一部から別の部分に能動的に送信されます。

コルーチンパイプラインコンポーネントの作成

このステップでは、株式データを処理するためのより特殊なコルーチンを作成します。コルーチンは、実行を一時停止して再開できる特殊なタイプの関数であり、データ処理パイプラインの構築に非常に役立ちます。作成する各コルーチンは、全体の処理パイプラインの中で特定のタスクを実行します。

  1. まず、新しいファイルを作成する必要があります。/home/labex/projectディレクトリに移動し、coticker.pyという名前のファイルを作成します。このファイルには、コルーチンベースのデータ処理に関するすべてのコードが含まれます。

  2. 次に、coticker.pyファイルにコードを書き始めましょう。まず、必要なモジュールをインポートし、基本構造を定義します。モジュールは、便利な関数やクラスを提供する事前に書かれたコードライブラリです。以下のコードがその役割を果たします。

## coticker.py
from structure import Structure

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. 上記のコードを見ると、String()Float()Integer()に関連するエラーがあることに気づくでしょう。これらはインポートする必要があるクラスです。そのため、ファイルの先頭に必要なインポートを追加します。これにより、Python はこれらのクラスの場所を知ることができます。以下は更新後のコードです。
## coticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. 次に、データ処理パイプラインを形成するコルーチンコンポーネントを追加します。各コルーチンは、パイプライン内で特定の役割を持っています。以下はこれらのコルーチンを追加するコードです。
@consumer
def to_csv(target):
    def producer():
        while True:
            line = yield

    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def create_ticker(target):
    while True:
        row = yield
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
  1. これらの各コルーチンが何をするかを理解しましょう。

    • to_csv: このコルーチンの役割は、生のテキスト行を解析された CSV 行に変換することです。これは、データが最初はテキスト形式であり、構造化された CSV データに分割する必要があるため重要です。
    • create_ticker: このコルーチンは、CSV 行を受け取り、それらからTickerオブジェクトを作成します。Tickerオブジェクトは、株式データをより整理された方法で表します。
    • negchange: このコルーチンは、Tickerオブジェクトをフィルタリングします。価格が下落している株式のみを通過させます。これにより、価値が低下している株式に焦点を当てることができます。
    • ticker: このコルーチンは、ティッカーデータを整形して表示します。フォーマッタを使用して、データを見やすい表形式で表示します。
  2. 最後に、これらすべてのコンポーネントを接続するメインプログラムコードを追加する必要があります。このコードは、パイプラインを通じたデータの流れを設定します。以下はそのコードです。

if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change']

    ## Create the processing pipeline
    t = ticker('text', fields)
    neg_filter = negchange(t)
    tick_creator = create_ticker(neg_filter)
    csv_parser = to_csv(tick_creator)

    ## Connect the pipeline to the data source
    follow('stocklog.csv', csv_parser)
  1. すべてのコードを書いた後、coticker.pyファイルを保存します。次に、ターミナルを開き、以下のコマンドを実行します。cdコマンドは、ファイルが保存されているディレクトリに移動し、python3コマンドは Python スクリプトを実行します。
cd /home/labex/project
python3 coticker.py
  1. すべてがうまくいけば、ターミナルに整形された表が表示されるはずです。この表は、価格が下落している株式を示しています。出力は次のようになります。
      name      price     change
---------- ---------- ----------
      MSFT      72.50      -0.25
        AA      35.25      -0.15
       IBM      50.10      -0.15
      GOOG     100.02      -0.01
      AAPL     102.50      -0.06

表の実際の値は、生成される株式データによって異なることに注意してください。

パイプラインの流れの理解

このプログラムの最も重要な部分は、データがコルーチンを通じてどのように流れるかです。以下に、その流れを段階的に説明します。

  1. follow関数は、stocklog.csvファイルから行を読み取り始めます。これがデータソースです。
  2. 読み取られた各行は、csv_parserコルーチンに送信されます。csv_parserは、生のテキスト行を受け取り、それを CSV フィールドに解析します。
  3. 解析された CSV データは、tick_creatorコルーチンに送信されます。このコルーチンは、CSV 行からTickerオブジェクトを作成します。
  4. Tickerオブジェクトは、neg_filterコルーチンに送信されます。このコルーチンは、各Tickerオブジェクトをチェックします。株式の価格が下落している場合、オブジェクトを通過させます。そうでない場合は、破棄します。
  5. 最後に、フィルタリングされたTickerオブジェクトは、tickerコルーチンに送信されます。tickerコルーチンは、データを整形し、表形式で表示します。

このパイプラインアーキテクチャは、各コンポーネントが単一のタスクに集中できるため非常に便利です。これにより、コードがよりモジュール化され、理解、変更、および保守が容易になります。

コルーチンパイプラインの拡張

基本的なパイプラインが稼働したので、これをより柔軟にする時が来ました。プログラミングにおいて、柔軟性はコードが様々な要件に適応できるため、非常に重要です。これを実現するために、coticker.pyプログラムを修正して、様々なフィルタリングとフォーマットオプションをサポートするようにします。

  1. まず、コードエディタでcoticker.pyファイルを開きます。コードエディタは、プログラムに必要なすべての変更を加える場所です。コードを表示、編集、保存する便利な環境を提供します。

  2. 次に、株式名でデータをフィルタリングする新しいコルーチンを追加します。コルーチンは、実行を一時停止して再開できる特殊なタイプの関数です。これにより、データがさまざまな処理ステップを流れるパイプラインを作成することができます。以下は新しいコルーチンのコードです。

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

このコードでは、filter_by_nameコルーチンは株式名とターゲットコルーチンをパラメータとして受け取ります。yieldキーワードを使用してレコードを待ち続けます。レコードが到着すると、レコードの名前が指定された名前と一致するかどうかをチェックします。一致する場合は、レコードをターゲットコルーチンに送信します。

  1. 次に、価格の閾値に基づいてフィルタリングする別のコルーチンを追加しましょう。このコルーチンは、特定の価格範囲内の株式を選択するのに役立ちます。以下はコードです。
@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

前のコルーチンと同様に、price_thresholdコルーチンはレコードを待ちます。そして、レコードの価格が指定された最小価格と最大価格の範囲内にあるかどうかをチェックします。範囲内にある場合は、レコードをターゲットコルーチンに送信します。

  1. 新しいコルーチンを追加した後、これらの追加フィルタを実証するためにメインプログラムを更新する必要があります。メインプログラムはアプリケーションのエントリポイントで、処理パイプラインを設定し、データの流れを開始します。以下は更新後のコードです。
if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change', 'high', 'low']

    ## Create the processing pipeline with multiple outputs

    ## Pipeline 1: Show all negative changes (same as before)
    print("Stocks with negative changes:")
    t1 = ticker('text', fields)
    neg_filter = negchange(t1)
    tick_creator1 = create_ticker(neg_filter)
    csv_parser1 = to_csv(tick_creator1)

    ## Start following the file with the first pipeline
    import threading
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()

    ## Wait a moment to see some results
    import time
    time.sleep(5)

    ## Pipeline 2: Filter by name (AAPL)
    print("\nApple stock updates:")
    t2 = ticker('text', fields)
    name_filter = filter_by_name('AAPL', t2)
    tick_creator2 = create_ticker(name_filter)
    csv_parser2 = to_csv(tick_creator2)

    ## Follow the file with the second pipeline
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()

    ## Wait a moment to see some results
    time.sleep(5)

    ## Pipeline 3: Filter by price range
    print("\nStocks priced between 50 and 75:")
    t3 = ticker('text', fields)
    price_filter = price_threshold(50, 75, t3)
    tick_creator3 = create_ticker(price_filter)
    csv_parser3 = to_csv(tick_creator3)

    ## Follow with the third pipeline
    follow('stocklog.csv', csv_parser3)

この更新されたコードでは、3 つの異なる処理パイプラインを作成しています。最初のパイプラインは価格が下落している株式を表示し、2 番目のパイプラインは名前が'AAPL'の株式をフィルタリングし、3 番目のパイプラインは価格が 50 から 75 の間の株式をフィルタリングします。スレッドを使用して最初の 2 つのパイプラインを同時に実行し、データをより効率的に処理できるようにしています。

  1. すべての変更を加えたら、ファイルを保存します。ファイルを保存することで、すべての変更が保存されます。次に、ターミナルで以下のコマンドを使用して更新されたプログラムを実行します。
cd /home/labex/project
python3 coticker.py

cdコマンドは現在のディレクトリをプロジェクトディレクトリに変更し、python3 coticker.pyコマンドは Python プログラムを実行します。

  1. プログラムを実行した後、3 つの異なる出力が表示されるはずです。
    • まず、価格が下落している株式が表示されます。
    • 次に、すべての AAPL 株式の更新情報が表示されます。
    • 最後に、価格が 50 から 75 の間のすべての株式が表示されます。

拡張されたパイプラインの理解

拡張されたプログラムはいくつかの重要な概念を示しています。

  1. 複数のパイプライン:同じデータソースから複数の処理パイプラインを作成することができます。これにより、同じデータに対して異なるタイプの分析を同時に行うことができます。
  2. 特殊なフィルタ:特定のフィルタリングタスクのために異なるコルーチンを作成することができます。これらのフィルタは、特定の基準を満たすデータのみを選択するのに役立ちます。
  3. 並行処理:スレッドを使用して、複数のパイプラインを同時に実行することができます。これにより、データを並列に処理できるため、プログラムの効率が向上します。
  4. パイプラインの構成:コルーチンをさまざまな方法で組み合わせて、異なるデータ処理の目標を達成することができます。これにより、必要に応じてデータ処理パイプラインをカスタマイズする柔軟性が得られます。

このアプローチは、ストリーミングデータを柔軟かつモジュール化された方法で処理する手段を提供します。プログラムの全体的なアーキテクチャを変更することなく、処理ステップを追加または変更することができます。

まとめ

この実験では、Python でコルーチンを使用してデータ処理パイプラインを構築する方法を学びました。重要な概念には、コルーチンの基本、例えば動作原理、初期化の必要性、デコレータを使った初期化などの理解が含まれます。また、データの流れについても調べ、ジェネレータの「プル」モデルとは異なり、send()メソッドを介してデータをパイプラインに流す方法を学びました。

さらに、CSV データの解析、レコードのフィルタリング、出力の整形などのタスク用に特殊なコルーチンを作成しました。複数のコルーチンを接続してパイプラインを構築し、フィルタリングや変換操作を実装する方法も学びました。コルーチンは、ストリーミングデータ処理に強力なアプローチを提供し、関心事の明確な分離と個々の段階の容易な変更を可能にします。