Stocksim パイプラインでジェネレータを活用する

Beginner

This tutorial is from open-source community. Access the source code

はじめに

この実験では、Python のジェネレータ (generator) を活用して効率的なデータ処理パイプラインを構築する方法を学びます。ジェネレータは Python の強力な機能で、必要に応じてデータを生成することができ、すべてのデータを同時にメモリに格納する必要をなくします。Unix パイプに似たデータ処理ワークフローを作成するためにジェネレータを接続する方法を学びます。

この実験の目的は、ジェネレータベースの処理パイプラインの基本を理解し、Python のジェネレータを使用してデータ処理ワークフローを作成し、リアルタイムデータストリームをフィルタリングおよび整形することです。この実験中に ticker.py ファイルが作成されます。この演習では、stocksim.py プログラムがバックグラウンドで実行されている必要があり、前の演習で使用した follow() 関数を使用します。

CSV データを使用した基本的なジェネレータパイプライン

このステップでは、ジェネレータ (generator) を使用して基本的な処理パイプラインを作成する方法を学びます。まずは、ジェネレータが何であるかを理解しましょう。ジェネレータは Python の特殊な種類のイテレータ (iterator) です。一度にすべてのデータをメモリにロードする通常のイテレータとは異なり、ジェネレータは必要に応じて値を生成します。これは大規模なデータストリームを扱う際に非常に有用で、メモリを節約します。ジェネレータは、データセット全体をメモリに格納する代わりに、必要なときに値を 1 つずつ生成します。

ジェネレータの理解

ジェネレータは本質的にイテレータを返す関数です。このイテレータを反復処理すると、一連の値が生成されます。ジェネレータ関数の書き方は通常の関数に似ていますが、重要な違いがあります。通常の関数では return 文を使用しますが、ジェネレータ関数では yield 文を使用します。yield 文は独特な動作をします。関数を一時停止し、現在の状態を保存します。次の値が要求されると、関数は中断したところから再開します。これにより、ジェネレータは毎回最初から始める必要なく、値を段階的に生成することができます。

follow() 関数の使用

先ほど作成した follow() 関数は、Unix の tail -f コマンドと同様の動作をします。tail -f コマンドはファイルの新しい内容を継続的に監視し、follow() 関数も同じです。では、これを使用して簡単な処理パイプラインを作成しましょう。

ステップ 1: 新しいターミナルウィンドウを開く

まず、WebIDE で新しいターミナルウィンドウを開きます。Terminal → New Terminal から開くことができます。この新しいターミナルで Python コマンドを実行します。

ステップ 2: Python インタラクティブシェルを起動する

新しいターミナルが開いたら、Python インタラクティブシェルを起動します。ターミナルに以下のコマンドを入力することで起動できます。

python3

Python インタラクティブシェルを使用すると、Python コードを 1 行ずつ実行し、すぐに結果を確認することができます。

ステップ 3: follow 関数をインポートし、パイプラインを設定する

次に、follow 関数をインポートし、株式データを読み取る基本的なパイプラインを設定します。Python インタラクティブシェルで以下のコードを入力します。

>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
...     print(row)
...

各行の動作は以下の通りです。

  • from follow import follow: follow モジュールから follow 関数をインポートします。
  • import csv: Python で CSV ファイルを読み書きするための csv モジュールをインポートします。
  • lines = follow('stocklog.csv'): stocklog.csv というファイル名を引数に follow 関数を呼び出します。follow 関数は、ファイルに新しい行が追加されるたびにその行を生成するジェネレータを返します。
  • rows = csv.reader(lines): csv.reader() 関数は、follow 関数によって生成された行を受け取り、それらを CSV データの行に解析します。
  • for ループはこれらの行を反復処理し、各行を出力します。

ステップ 4: 出力を確認する

コードを実行すると、以下のような出力が表示されます(データは異なる場合があります)。

['BA', '98.35', '6/11/2007', '09:41.07', '0.16', '98.25', '98.35', '98.31', '158148']
['AA', '39.63', '6/11/2007', '09:41.07', '-0.03', '39.67', '39.63', '39.31', '270224']
['XOM', '82.45', '6/11/2007', '09:41.07', '-0.23', '82.68', '82.64', '82.41', '748062']
['PG', '62.95', '6/11/2007', '09:41.08', '-0.12', '62.80', '62.97', '62.61', '454327']
...

この出力は、データパイプラインが正常に作成されたことを示しています。follow() 関数がファイルから行を生成し、これらの行は csv.reader() 関数に渡され、データの行に解析されます。

出力を十分に確認したら、Ctrl+C を押して実行を停止できます。

何が起こっているのか?

このパイプラインで何が起こっているかを分解してみましょう。

  1. follow('stocklog.csv') はジェネレータを作成します。このジェネレータは stocklog.csv ファイルを追跡し、ファイルに新しい行が追加されるたびにその行を生成します。
  2. csv.reader(lines) は、follow 関数によって生成された行を受け取り、それらを CSV の行データに解析します。この関数は CSV ファイルの構造を理解し、行を個々の値に分割します。
  3. for ループはこれらの行を反復処理し、各行を出力します。これにより、データを読みやすい形式で確認することができます。

これは、ジェネレータを使用したデータ処理パイプラインの簡単な例です。次のステップでは、より複雑で有用なパイプラインを構築します。

Ticker クラスの作成

データ処理において、生データを扱うことはかなり難しい場合があります。株式データの処理をより整理されたものにし、効率的に行うために、株式相場を表す適切なクラスを定義します。このクラスは株式データの設計図として機能し、データ処理パイプラインをより堅牢で管理しやすいものにします。

ticker.py ファイルの作成

  1. まず、WebIDE で新しいファイルを作成する必要があります。「New File」アイコンをクリックするか、ファイルエクスプローラーで右クリックして「New File」を選択することができます。このファイルに ticker.py という名前を付けます。このファイルには Ticker クラスのコードが記述されます。

  2. 次に、新しく作成した ticker.py ファイルに以下のコードを追加します。このコードは Ticker クラスを定義し、それをテストするための簡単な処理パイプラインを設定します。

## ticker.py

from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

if __name__ == '__main__':
    from follow import follow
    import csv
    lines = follow('stocklog.csv')
    rows = csv.reader(lines)
    records = (Ticker.from_row(row) for row in rows)
    for record in records:
        print(record)
  1. コードを追加したら、ファイルを保存します。Ctrl+S を押すか、メニューから「File」→「Save」を選択することができます。ファイルを保存することで、変更内容が保存され、後で実行することができます。

コードの理解

このコードがどのように動作するかを詳しく見ていきましょう。

  1. コードの最初では、structure.py モジュールから Structure とフィールドタイプをインポートしています。このモジュールはすでに設定されています。これらのインポートは、Ticker クラスの構成要素を提供するために不可欠です。Structure クラスは Ticker クラスの基底クラスとなり、StringFloatInteger などのフィールドタイプは株式データのフィールドのデータ型を定義します。

  2. 次に、Structure を継承する Ticker クラスを定義します。このクラスには、株式データのさまざまな側面を表すいくつかのフィールドがあります。

    • name: このフィールドは株式シンボル(例えば「IBM」や「AAPL」)を格納します。どの会社の株式を扱っているかを識別するのに役立ちます。
    • price: このフィールドは株式の現在の価格を保持します。これは投資家にとって重要な情報です。
    • datetime: これらのフィールドは株式相場が生成された日時を示します。日時を知ることは、株価のトレンドを経時的に分析するために重要です。
    • change: このフィールドは株価の変動を表します。前の時点と比較して株価が上がったか下がったかを示します。
    • openhighlow: これらのフィールドは、ある期間内の株式の始値、最高値、最低値を表します。株価の範囲を把握するのに役立ちます。
    • volume: このフィールドは取引された株式数を格納します。高い取引量は、特定の株式に対する強い市場関心を示すことがあります。
  3. if __name__ == '__main__': ブロックでは、処理パイプラインを設定しています。このコードブロックは、ticker.py ファイルを直接実行したときに実行されます。

    • follow('stocklog.csv') は、stocklog.csv ファイルから行を生成する関数です。ファイルを 1 行ずつ読み取ることができます。
    • csv.reader(lines) はこれらの行を受け取り、行データに解析します。CSV(Comma - Separated Values)は表形式のデータを格納するための一般的なファイル形式で、この関数は各行からデータを抽出するのに役立ちます。
    • (Ticker.from_row(row) for row in rows) はジェネレータ式です。各行のデータを取り、それを Ticker オブジェクトに変換します。これにより、生の CSV データを操作しやすい構造化されたオブジェクトに変換することができます。
    • for ループはこれらの Ticker オブジェクトを反復処理し、それぞれを出力します。これにより、構造化されたデータが実際に動作する様子を確認することができます。

コードの実行

コードがどのように動作するかを確認するために、実行してみましょう。

  1. まず、ターミナルでプロジェクトディレクトリにいることを確認する必要があります。まだそこにいない場合は、以下のコマンドを使用して移動します。

    cd /home/labex/project
    
  2. 正しいディレクトリにいることを確認したら、以下のコマンドを使用して ticker.py スクリプトを実行します。

    python3 ticker.py
    
  3. スクリプトを実行した後、以下のような出力が表示されるはずです(データは異なる場合があります)。

    Ticker(IBM, 103.53, 6/11/2007, 09:53.59, 0.46, 102.87, 103.53, 102.77, 541633)
    Ticker(MSFT, 30.21, 6/11/2007, 09:54.01, 0.16, 30.05, 30.21, 29.95, 7562516)
    Ticker(AA, 40.01, 6/11/2007, 09:54.01, 0.35, 39.67, 40.15, 39.31, 576619)
    Ticker(T, 40.1, 6/11/2007, 09:54.08, -0.16, 40.2, 40.19, 39.87, 1312959)
    

出力を十分に確認したら、Ctrl+C を押してスクリプトの実行を停止することができます。

生の CSV データが構造化された Ticker オブジェクトに変換されたことに注目してください。この変換により、Ticker クラスで定義されたフィールドを使用して株式データにアクセスし、操作することができるため、処理パイプラインでのデータの操作がはるかに容易になります。

より複雑なデータパイプラインの構築

ここでは、データパイプラインにフィルタリング機能を追加し、データの表示を改善することで、パイプラインを次のレベルに引き上げます。これにより、扱っている情報の分析と理解が容易になります。ticker.py スクリプトに変更を加えます。データをフィルタリングすることで、関心のある特定の情報に焦点を当てることができ、整形された表形式で表示することで、データの読みやすさが向上します。

ticker.py ファイルの更新

  1. まず、WebIDE で ticker.py ファイルを開きます。WebIDE は、ブラウザ内で直接コードを記述および編集できるツールです。Python スクリプトに変更を加えるのに便利な環境を提供します。

  2. 次に、ticker.py ファイル内の if __name__ == '__main__': ブロックを以下のコードに置き換えます。このコードブロックはスクリプトのエントリポイントであり、置き換えることで、スクリプトがデータを処理および表示する方法が変わります。

if __name__ == '__main__':
    from follow import follow
    import csv
    from tableformat import create_formatter, print_table

    formatter = create_formatter('text')

    lines = follow('stocklog.csv')
    rows = csv.reader(lines)
    records = (Ticker.from_row(row) for row in rows)
    negative = (rec for rec in records if rec.change < 0)
    print_table(negative, ['name', 'price', 'change'], formatter)
  1. これらの変更を加えた後、ファイルを保存します。キーボードで Ctrl+S を押すか、メニューから「File」→「Save」を選択することができます。ファイルを保存することで、変更内容が保存され、後で実行することができます。

強化されたパイプラインの理解

この強化されたパイプラインが何を行っているかを詳しく見ていきましょう。各ステップを理解することで、コードの異なる部分がどのように連携してデータを処理および表示するかがわかります。

  1. まず、tableformat モジュールから create_formatterprint_table をインポートします。このモジュールはすでに設定されており、データをきれいな表形式で整形して表示するための関数を提供します。

  2. 次に、create_formatter('text') を使用してテキストフォーマッタを作成します。このフォーマッタは、データを読みやすい形式に整形するために使用されます。

  3. では、パイプラインをステップごとに分解してみましょう。

    • follow('stocklog.csv') は、stocklog.csv ファイルから行を生成する関数です。ファイルを継続的に監視し、新しいデータが追加されると、行を 1 つずつ提供します。
    • csv.reader(lines) は、follow 関数によって生成された行を受け取り、行データに解析します。CSV ファイル内のデータはテキスト形式であるため、操作可能な構造化された形式に変換する必要があります。
    • (Ticker.from_row(row) for row in rows) は、各行のデータを Ticker オブジェクトに変換するジェネレータ式です。Ticker オブジェクトは株式を表し、株式の名前、価格、変動などの情報を含みます。
    • (rec for rec in records if rec.change < 0) は、Ticker オブジェクトをフィルタリングする別のジェネレータ式です。株価の変動が負のオブジェクトのみを保持します。これにより、価格が下落した株式に焦点を当てることができます。
    • print_table(negative, ['name', 'price', 'change'], formatter) は、フィルタリングされた Ticker オブジェクトを、先に作成したフォーマッタを使用して表形式に整形し、コンソールに表示します。

このパイプラインは、ジェネレータの強力さを示しています。ファイルからすべてのデータを一度にメモリにロードするのではなく、複数の操作(読み取り、解析、変換、フィルタリング)を連鎖させ、データを 1 つずつ処理します。これにより、メモリが節約され、コードがより効率的になります。

強化されたパイプラインの実行

更新されたコードを実行して結果を確認しましょう。

  1. まず、ターミナルでプロジェクトディレクトリにいることを確認します。まだそこにいない場合は、以下のコマンドを使用して移動できます。

    cd /home/labex/project
    
  2. プロジェクトディレクトリにいることを確認したら、以下のコマンドを使用して ticker.py スクリプトを実行します。

    python3 ticker.py
    
  3. スクリプトを実行した後、ターミナルにきれいに整形された表が表示されるはずです。この表は、価格が下落した株式のみを表示します。

           name      price     change
     ---------- ---------- ----------
              C      53.12      -0.21
            UTX      70.04      -0.19
            AXP      62.86      -0.18
            MMM      85.72      -0.22
            MCD      51.38      -0.03
            WMT      49.85      -0.23
             KO       51.6      -0.07
            AIG      71.39      -0.14
             PG      63.05      -0.02
             HD      37.76      -0.19
    

出力を十分に確認し、スクリプトの実行を停止したい場合は、キーボードで Ctrl+C を押すことができます。

ジェネレータパイプラインの強力さ

ここで構築したのは、強力なデータ処理パイプラインです。その機能をまとめてみましょう。

  1. stocklog.csv ファイルを継続的に監視し、新しいデータを検出します。つまり、ファイルに新しいデータが追加されると、パイプラインが自動的にそれを処理します。
  2. ファイルからの CSV データを構造化された Ticker オブジェクトに解析します。これにより、データの操作が容易になります。
  3. 特定の条件(この場合は価格の下落)に基づいてデータをフィルタリングします。これにより、価値が低下している株式に焦点を当てることができます。
  4. フィルタリングされたデータを読みやすい表形式で整形して表示します。これにより、データの分析と結論の導出が容易になります。

このパイプラインでジェネレータを使用する主な利点の 1 つは、最小限のメモリしか使用しないことです。ジェネレータは必要に応じて値を生成するため、一度にすべてのデータをメモリに格納する必要がありません。これは、各コンポーネントがデータを処理して次のコンポーネントに渡す Unix パイプに似ています。

ジェネレータはレゴブロックのようなものだと考えることができます。レゴブロックを積み重ねてさまざまな構造物を作るように、ジェネレータを組み合わせて強力なデータ処理ワークフローを作成することができます。このモジュール方式により、単純で再利用可能なコンポーネントから複雑なシステムを構築することができます。

まとめ

この実験では、Python のジェネレータを使用して効率的なデータ処理パイプラインを構築する方法を学びました。follow() 関数を使用してファイルの新しいデータを監視する、株式相場を表す Ticker クラスを作成する、CSV データを読み取り、解析し、フィルタリングし、結果を整形して表示する多段階の処理パイプラインを構築するなど、いくつかの重要なタスクを完了しました。

ジェネレータベースのアプローチには、データが必要に応じて処理されるためのメモリ効率、パイプラインコンポーネントの簡単な組み合わせと再利用を可能にするモジュール性、複雑なデータフローを簡単に表現できるといった複数の利点があります。これらの概念は、特に大規模なデータセットやストリーミングデータの実世界のデータ処理で一般的に適用されます。