使いやすいスレッド処理

PythonBeginner
オンラインで実践に進む

はじめに

このチュートリアルでは、Python のスレッドモジュールを使って、複数の実行スレッドを同時に実行する方法を学びます。

Python のスレッドモジュールは、Python プログラムでスレッドを作成および管理するための簡単な方法を提供します。スレッドは、プログラム内の別個の実行フローです。複数のスレッドを同時に実行することで、マルチコア CPU を活用して、プログラムのパフォーマンスを向上させることができます。

スレッドモジュールは、スレッドの作成および管理に 2 つのクラスを提供します。

  1. Thread クラス:このクラスは、1 つの実行スレッドを表します。
  2. Lock クラス:このクラスは、スレッド間で共有リソースへのアクセスを同期させることができます。

スレッドの作成

Python で新しいスレッドを作成するには、Thread クラスの新しいインスタンスを作成し、実行する関数を渡す必要があります。

WebIDE で create_thread.py という名前のプロジェクトを作成し、次の内容を入力します。

import threading

## スレッドで実行する関数を定義する
def my_function():
    print("Hello from thread")

## 新しいスレッドを作成する
thread = threading.Thread(target=my_function)

## スレッドを開始する
thread.start()

## スレッドが終了するのを待つ
thread.join()

## プログラムが終了したことを示すために "Done" を出力する
print("Done")

この例では、メッセージを出力する my_function 関数を定義しています。その後、Thread クラスの新しいインスタンスを作成し、対象関数として my_function を渡します。最後に、start メソッドを使ってスレッドを開始し、join メソッドを使ってスレッドが終了するのを待ちます。

次のコマンドを使ってスクリプトを実行します。

python create_thread.py

同期化

複数のスレッドが同じ共有リソース(たとえば、変数やファイル)にアクセスする場合、競合状態を回避するために、そのリソースへのアクセスを同期化する必要があります。Python のスレッドモジュールは、この目的のために Lock クラスを提供しています。

WebIDE で sync.py という名前のプロジェクトを作成し、次の内容を入力することで、Lock を使用する例を示します。

import threading

## 共有リソースへのアクセスを保護するためのロックを作成する
lock = threading.Lock()

## 複数のスレッドによって変更される共有リソース
counter = 0

## 各スレッドが実行する関数を定義する
def my_function():
    global counter

    ## 共有リソースにアクセスする前にロックを取得する
    lock.acquire()
    try:
        ## 共有リソースにアクセスする
        counter += 1
    finally:
        ## 共有リソースを変更した後にロックを解放する
        lock.release()

## 共有リソースにアクセスするための複数のスレッドを作成する
threads = []

## 同じ関数を実行する 10 個のスレッドを作成して開始する
for i in range(10):
    thread = threading.Thread(target=my_function)
    threads.append(thread)
    thread.start()

## すべてのスレッドが実行を完了するのを待つ
for thread in threads:
    thread.join()

## カウンタの最終値を出力する
print(counter) ## 出力:10

この例では、Lock オブジェクトと共有リソース counter を作成しています。my_function 関数は、acquire メソッドを使ってロックを取得し、release メソッドを使ってロックを解放することで共有リソースにアクセスします。複数のスレッドを作成して開始し、join メソッドを使ってそれらが終了するのを待ちます。最後に、カウンタの最終値を出力します。

次のコマンドを使ってスクリプトを実行します。

python sync.py

引数付きのスレッド

Python では、新しいスレッドを作成する際に args パラメータを使用するか、Thread クラスをサブクラス化して引数を受け取るコンストラクタを定義することで、スレッドに引数を渡すことができます。以下は両方のアプローチの例です。

1; Thread クラスのサブクラスを作成し、run メソッドをオーバーライドしてスレッドの動作を定義します。WebIDE で thread_subclass.py という名前のプロジェクトを作成し、次の内容を入力します。

import threading

## Thread クラスを拡張するカスタム Thread クラスを定義する
class MyThread(threading.Thread):
    ## スレッドの動作を実装するために run() メソッドをオーバーライドする
    def run(self):
        print("Hello from thread")

## カスタムスレッドクラスのインスタンスを作成する
thread = MyThread()

## スレッドを開始する
thread.start()

## スレッドが実行を終了するのを待つ
thread.join()

次のコマンドを使ってスクリプトを実行します。

python thread_subclass.py

2; スレッドを作成し、args パラメータを使って対象関数に引数を渡します。WebIDE で thread_with_args.py という名前のプロジェクトを作成し、次の内容を入力します。

import threading

## パラメータを持つ関数を定義する
def my_function(name):
    print("Hello from", name)

## 対象関数と引数を持つ新しいスレッドを作成する
thread = threading.Thread(target=my_function, args=("Thread 1",))

## スレッドを開始する
thread.start()

## スレッドが実行を終了するのを待つ
thread.join()

次のコマンドを使ってスクリプトを実行します。

python thread_with_args.py

スレッドプール

Python では、事前に定義されたスレッドのセットを使ってタスクを同時に実行するために、スレッドプールを使用できます。スレッドプールを使用する利点は、各タスクに対してスレッドを作成および破棄するオーバーヘッドを回避できることで、パフォーマンスを向上させることができます。

Python の concurrent.futures モジュールは、スレッドプールを作成してタスクを提出できる ThreadPoolExecutor クラスを提供しています。以下は例です。

WebIDE で thread_pool_range.py という名前のプロジェクトを作成し、次の内容を入力します。

import concurrent.futures

## 2 つの引数を持つ複数のスレッドで実行する関数を定義する
def my_func(arg1, arg2):
    ## ここでスレッドによって実行されるタスクを定義する
    print(f"Hello from my thread with args {arg1} and {arg2}")

## 最大 5 つのワーカースレッドを持つ ThreadPoolExecutor オブジェクトを作成する
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    ## 各タスク(引数付きの関数呼び出し)をエグゼキュータに提出して、別のスレッドで処理させる
    ## submit() メソッドは、非同期計算の結果を表す Future オブジェクトを返す
    for i in range(10):
        executor.submit(my_func, i, i+1)

この例では、2 つの引数を持つ my_func 関数を定義しています。最大 5 つのワーカースレッドを持つ ThreadPoolExecutor を作成します。その後、数値の範囲をループして、executor.submit() メソッドを使ってタスクをスレッドプールに提出します。各提出されたタスクは、利用可能なワーカースレッドの 1 つで実行されます。

ヒント:ThreadPoolExecutor オブジェクトはコンテキストマネージャとして使用されます。これにより、with ブロック内のコードが完了したときにすべてのスレッドが適切にクリーンアップされることが保証されます。

次のコマンドを使ってスクリプトを実行します。

python thread_pool_range.py

submit() メソッドは、すぐに Future オブジェクトを返し、提出されたタスクの結果を表します。Future オブジェクトの result() メソッドを使って、タスクの戻り値を取得できます。タスクが例外を発生させた場合、result() を呼び出すとその例外が発生します。

また、ThreadPoolExecutor クラスの map() メソッドを使って、同じ関数をアイテムのコレクションに適用することもできます。たとえば、WebIDE で thread_pool_map.py という名前のプロジェクトを作成し、次の内容を入力します。

import concurrent.futures

## 複数のスレッドで実行する関数を定義する
def my_func(item):
    ## ここでスレッドによって実行されるタスクを定義する
    print(f"Hello from my thread with arg {item}")

## スレッドによって処理されるアイテムのリストを作成する
items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

## 最大 5 つのワーカースレッドを持つ ThreadPoolExecutor オブジェクトを作成する
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    ## 各アイテムをエグゼキュータに提出して、別のスレッドで処理させる
    ## map() メソッドは自動的に結果を順序通りに返す
    executor.map(my_func, items)

この例では、1 つの引数を持つ my_func 関数を定義しています。アイテムのリストを作成し、executor.map() メソッドを使ってそれらをスレッドプールに提出します。リスト内の各アイテムは、引数として my_func に渡され、各アイテムは利用可能なワーカースレッドの 1 つで実行されます。

次のコマンドを使ってスクリプトを実行します。

python thread_pool_map.py

thread_pool_range.pythread_pool_map.py から得られる結果は同じです。

デーモンスレッド

Python において、デーモンスレッドはバックグラウンドで実行され、プログラムの終了を妨げないタイプのスレッドです。すべての非デーモンスレッドが完了すると、Python インタプリタは、デーモンスレッドがまだ実行中であっても、自動的に終了します。

WebIDE で daemon_thread_with_args.py という名前のプロジェクトを作成し、次の内容を入力します。

import threading
import time

## 無限ループで実行され、一定の間隔でメッセージを出力する関数を定義する
def my_function():
    while True:
        print("Hello from thread")
        time.sleep(1)

## 対象関数を実行するデーモンスレッドを作成する
thread = threading.Thread(target=my_function, daemon=True)

## スレッドを開始する
thread.start()

## メインプログラムは続けて実行され、メッセージを出力する
print("Main program")

## プログラムを終了する前に数秒待つ
time.sleep(5)

## プログラムが終了し、デーモンスレッドは自動的に終了する
print("Main thread exiting...")

この例では、無限ループを実行し、time.sleep() 関数を使って 1 秒ごとにメッセージを出力するスレッドを作成します。daemon パラメータを使って、メインプログラムが自動的に終了するときに終了するように、スレッドをデーモンとしてマークします。メインプログラムは続けて実行され、メッセージを出力します。数秒待った後、プログラムが終了し、デーモンスレッドが終了します。

次に、次のコマンドを使ってスクリプトを実行します。

python daemon_thread_with_args.py

もちろん、スレッドインスタンスに対して setDaemon(True) メソッドを呼び出すことで、スレッドをデーモンとして設定することもできます。たとえば、WebIDE で daemon_thread_with_func.py という名前のプロジェクトを作成し、次の内容を入力します。

import threading
import time

## 無限ループで実行され、一定の間隔でメッセージを出力する関数を定義する
def my_function():
    while True:
        print("Hello from thread")
        time.sleep(1)

## 対象関数を持つ新しいスレッドを作成する
thread = threading.Thread(target=my_function)

## デーモンフラグを True に設定して、スレッドをバックグラウンドで実行し、メインプログラムが終了するときに終了させる
thread.setDaemon(True)

## スレッドを開始する
thread.start()

## メインプログラムは続けて実行され、メッセージを出力する
print("Main program")

## プログラムを終了する前に数秒待つ
time.sleep(5)

## プログラムが終了し、デーモンスレッドは自動的に終了する
print("Main thread exiting...")

次のコマンドでスクリプトを実行すると、上記の例と同じ結果が得られます。

python daemon_thread_with_func.py

イベントオブジェクト

Python では、threading.Event オブジェクトを使って、スレッドが特定のイベントが発生するまで待機するようにできます。イベントオブジェクトは、1 つのスレッドがイベントが発生したことを通知する手段を提供し、他のスレッドはその信号を待つことができます。

WebIDE で event_object.py という名前のプロジェクトを作成し、次の内容を入力します。

import threading

## イベントオブジェクトを作成する
event = threading.Event()

## イベントがセットされるのを待つ関数を定義する
def my_function():
    print("Waiting for event")
    ## イベントがセットされるのを待つ
    event.wait()
    print("Event received")

## 対象関数を持つ新しいスレッドを作成する
thread = threading.Thread(target=my_function)

## スレッドを開始する
thread.start()

## 数秒後にイベントを通知する
## 対象関数内の wait() 呼び出しは、これで返り値を返して実行を続ける
event.set()

## スレッドが実行を終了するのを待つ
thread.join()

この例では、Event クラスを使って Event オブジェクトを作成します。wait メソッドを使ってイベントが通知されるのを待ち、その後メッセージを出力する関数を定義します。新しいスレッドを作成して開始します。数秒後、set メソッドを使ってイベントを通知します。スレッドはイベントを受け取り、メッセージを出力します。最後に、join メソッドを使ってスレッドが終了するのを待ちます。

次に、次のコマンドを使ってスクリプトを実行します。

python event_object.py

タイマーオブジェクト

Python では、threading.Timer オブジェクトを使って、特定の時間が経過した後に関数を実行する予定を設定できます。タイマーオブジェクトは、関数を実行する前に指定された時間間隔を待つ新しいスレッドを作成します。

WebIDE で timer_object.py という名前のプロジェクトを作成し、次の内容を入力します。

import threading

## 5 秒後にタイマーによって実行される関数を定義する
def my_function():
    print("Hello from timer")

## 5 秒後に対象関数を実行するタイマーを作成する
timer = threading.Timer(5, my_function)

## タイマーを開始する
timer.start()

## タイマーが終了するのを待つ
timer.join()

この例では、Timer クラスを使って Timer オブジェクトを作成し、秒単位の時間遅延と実行する関数を渡します。start メソッドを使ってタイマーを開始し、join メソッドを使ってそれが終了するのを待ちます。5 秒後、関数が実行され、メッセージが出力されます。

次に、次のコマンドを使ってスクリプトを実行します。

python timer_object.py

ヒント:タイマースレッドは別々に実行されるため、メインスレッドと同期していない場合があります。関数がいくつかの共有状態やリソースに依存する場合は、それらへのアクセスを適切に同期する必要があります。また、すべての他の非デーモンスレッドが完了したときにまだ実行中である場合、タイマースレッドはプログラムの終了を阻止しません。

バリアオブジェクト

Python では、threading.Barrier オブジェクトを使って、事前に定義された同期ポイントで複数のスレッドを同期させることができます。バリアオブジェクトは、一連のスレッドが互いに待ち合わせて、実行の特定のポイントに到達した後に続けるための方法を提供します。

WebIDE で barrier_object.py という名前のプロジェクトを作成し、次の内容を入力します。

import threading

## 3 つのスレッド用のバリアオブジェクトを作成する
barrier = threading.Barrier(3)

## バリアで待つ関数を定義する
def my_function():
    print("Before barrier")
    ## すべての 3 つのスレッドがバリアに到達するのを待つ
    barrier.wait()
    print("After barrier")

## ループを使って 3 つのスレッドを作成して起動する
threads = []
for i in range(3):
    thread = threading.Thread(target=my_function)
    threads.append(thread)
    thread.start()

## すべてのスレッドが実行を終了するのを待つ
for thread in threads:
    thread.join()

この例では、Barrier クラスを使って Barrier オブジェクトを作成し、待機するスレッドの数を渡します。wait メソッドを使って、バリアで待ち、メッセージを出力する関数を定義します。3 つのスレッドを作成して起動します。各スレッドはバリアで待つため、すべてのスレッドがバリアに到達するまですべてが待機します。最後に、join メソッドを使ってすべてのスレッドが終了するのを待ちます。

次に、次のコマンドを使ってスクリプトを実行します。

python barrier_object.py

まとめ

以上です!これで、コードで Python の threading モジュールを使う方法がわかりました。これにより、並列プログラミングの基本原理と技術を深く理解し、効率的な並列アプリケーションをより良く開発できるようになります。