はじめに
この実験では、マネージドジェネレーターについて学び、通常とは異なる方法でジェネレーターを駆動する方法を理解します。また、簡単なタスクスケジューラーを構築し、ジェネレーターを使用してネットワークサーバーを作成します。
Python のジェネレーター関数は、外部コードによって実行される必要があります。たとえば、イテレーションジェネレーターは for ループで反復処理されるときにのみ実行され、コルーチンは send() メソッドが呼び出される必要があります。この実験では、高度なアプリケーションでジェネレーターを駆動する実用的な例を探ります。この実験中に作成されるファイルは multitask.py と server.py です。
Python ジェネレーターの理解
まず、Python のジェネレーターが何であるかを復習しましょう。Python では、ジェネレーターは特殊な型の関数です。通常の関数とは異なります。通常の関数を呼び出すと、その関数は最初から最後まで実行され、単一の値を返します。しかし、ジェネレーター関数はイテレーターを返します。イテレーターとは、反復処理できるオブジェクトで、つまり、その値を 1 つずつアクセスできます。
ジェネレーターは yield 文を使用して値を返します。通常の関数のように一度にすべての値を返すのではなく、ジェネレーターは値を 1 つずつ返します。値を生成 (yield) した後、ジェネレーターは実行を一時停止します。次に値を要求すると、中断したところから実行を再開します。
簡単なジェネレーターの作成
では、簡単なジェネレーターを作成しましょう。WebIDE で新しいファイルを作成する必要があります。このファイルには、ジェネレーターのコードが含まれます。ファイル名を generator_demo.py とし、/home/labex/project ディレクトリに配置します。以下は、ファイルに記述する内容です。
## Generator function that counts down from n
def countdown(n):
print(f"Starting countdown from {n}")
while n > 0:
yield n
n -= 1
print("Countdown complete!")
## Create a generator object
counter = countdown(5)
## Drive the generator manually
print(next(counter)) ## 5
print(next(counter)) ## 4
print(next(counter)) ## 3
## Iterate through remaining values
for value in counter:
print(value) ## 2, 1
このコードでは、まず countdown というジェネレーター関数を定義しています。この関数は、数値 n を引数として受け取り、n から 1 までカウントダウンします。関数内では、while ループを使用して n を減少させ、各値を生成 (yield) します。countdown(5) を呼び出すと、counter という名前のジェネレーターオブジェクトが作成されます。
次に、next() 関数を使用して、ジェネレーターから手動で値を取得します。next(counter) を呼び出すたびに、ジェネレーターは中断したところから実行を再開し、次の値を生成 (yield) します。手動で 3 つの値を取得した後、for ループを使用して、ジェネレーター内の残りの値を反復処理します。
このコードを実行するには、ターミナルを開き、次のコマンドを実行します。
python3 /home/labex/project/generator_demo.py
コードを実行すると、次の出力が表示されるはずです。
Starting countdown from 5
5
4
3
2
1
Countdown complete!
ジェネレーター関数の動作を見てみましょう。
- ジェネレーター関数は、最初に
next(counter)を呼び出したときに実行を開始します。それまでは、関数は定義されているだけで、実際のカウントダウンは開始されていません。 - 各
yield文で一時停止します。値を生成 (yield) した後、停止して、次のnext()呼び出しを待ちます。 - 再度
next()を呼び出すと、中断したところから続行します。たとえば、5 を生成 (yield) した後、状態を記憶し、nを減少させて次の値を生成 (yield) し続けます。 - 最後の値が生成 (yield) された後、ジェネレーター関数は実行を完了します。この場合、1 を生成 (yield) した後、「Countdown complete!」と表示されます。
この実行を一時停止して再開する機能が、ジェネレーターを強力なものにしています。タスクスケジューリングや非同期プログラミングなどのタスクでは、他のタスクの実行をブロックすることなく、複数のタスクを効率的に実行する必要があり、この機能は非常に有用です。
ジェネレーターを使ったタスクスケジューラーの作成
プログラミングにおいて、タスクスケジューラーは複数のタスクを効率的に管理し実行するための重要なツールです。このセクションでは、ジェネレーターを使って、複数のジェネレーター関数を同時に実行できる簡単なタスクスケジューラーを構築します。これにより、ジェネレーターを管理して協調的なマルチタスク処理を行う方法を学びます。協調的なマルチタスク処理とは、タスクが順番に実行され、実行時間を共有することを意味します。
まず、新しいファイルを作成する必要があります。/home/labex/project ディレクトリに移動し、multitask.py という名前のファイルを作成します。このファイルには、タスクスケジューラーのコードが含まれます。
## multitask.py
from collections import deque
## Task queue
tasks = deque()
## Simple task scheduler
def run():
while tasks:
task = tasks.popleft() ## Get the next task
try:
task.send(None) ## Resume the task
tasks.append(task) ## Put it back in the queue
except StopIteration:
print('Task done') ## Task is complete
## Example task 1: Countdown
def countdown(n):
while n > 0:
print('T-minus', n)
yield ## Pause execution
n -= 1
## Example task 2: Count up
def countup(n):
x = 0
while x < n:
print('Up we go', x)
yield ## Pause execution
x += 1
では、このタスクスケジューラーがどのように動作するかを解説しましょう。
- ジェネレータータスクを格納するために
deque(両端キュー)を使用しています。dequeは、両端から要素を効率的に追加および削除できるデータ構造です。タスクを末尾に追加し、先頭から削除する必要があるため、タスクキューに最適な選択です。 run()関数はタスクスケジューラーの核心部分です。キューからタスクを 1 つずつ取り出します。send(None)を使用して各タスクを再開します。これは、ジェネレーターに対してnext()を使用するのと似ています。ジェネレーターに中断したところから実行を続けるように指示します。- タスクが値を生成(yield)した後、キューの末尾に戻されます。これにより、タスクは後で再度実行される機会を得ます。
- タスクが完了すると(
StopIterationを発生させる)、キューから削除されます。これは、タスクの実行が終了したことを示します。
- ジェネレータータスク内の各
yield文は一時停止点として機能します。ジェネレーターがyield文に到達すると、実行を一時停止し、制御をスケジューラーに戻します。これにより、他のタスクが実行できるようになります。
このアプローチは協調的なマルチタスク処理を実装しています。各タスクは自発的に制御をスケジューラーに戻し、他のタスクが実行できるようにします。これにより、複数のタスクが実行時間を共有し、同時に実行できるようになります。
タスクスケジューラーのテスト
ここでは、multitask.py ファイルにテストを追加します。このテストの目的は、複数のタスクを同時に実行することです。これは並行実行(concurrent execution)と呼ばれます。並行実行では、単一スレッド環境であっても、異なるタスクが見かけ上同時に進捗を遂げることができます。実際には、タスクは順番に実行されます。
このテストを行うには、multitask.py ファイルの末尾に以下のコードを追加します。
## Test our scheduler
if __name__ == '__main__':
## Add tasks to the queue
tasks.append(countdown(10)) ## Count down from 10
tasks.append(countdown(5)) ## Count down from 5
tasks.append(countup(20)) ## Count up to 20
## Run all tasks
run()
このコードでは、まず if __name__ == '__main__': を使って、スクリプトが直接実行されているかどうかを確認します。次に、3 つの異なるタスクを tasks キューに追加します。countdown タスクは指定された数からカウントダウンし、countup タスクは指定された数までカウントアップします。最後に、run() 関数を呼び出してこれらのタスクの実行を開始します。
コードを追加した後、ターミナルで以下のコマンドを実行して実行します。
python3 /home/labex/project/multitask.py
コードを実行すると、以下のような出力が表示されるはずです(行の正確な順序は異なる場合があります)。
T-minus 10
T-minus 5
Up we go 0
T-minus 9
T-minus 4
Up we go 1
T-minus 8
T-minus 3
Up we go 2
...
異なるタスクからの出力が混在していることに注目してください。これは、スケジューラーが 3 つのタスクを並行して実行していることを明確に示しています。タスクが yield 文に到達するたびに、スケジューラーはそのタスクを一時停止し、別のタスクに切り替えます。これにより、すべてのタスクが時間の経過とともに進捗を遂げることができます。
動作原理
スケジューラーが実行されたときに何が起こるかを詳しく見てみましょう。
- まず、3 つのジェネレータータスク
countdown(10)、countdown(5)、countup(20)をキューに追加します。これらのジェネレータータスクは、yield文で実行を一時停止し、再開できる特殊な関数です。 - 次に、
run()関数が動作を開始します。- キューから最初のタスク
countdown(10)を取り出します。 - このタスクを
yield文に到達するまで実行します。yieldに到達すると、「T-minus 10」と表示されます。 - その後、
countdown(10)タスクをキューに戻し、後で再度実行できるようにします。 - 次に、キューから
countdown(5)タスクを取り出します。 countdown(5)タスクをyield文に到達するまで実行し、「T-minus 5」と表示します。- このプロセスが続きます...
- キューから最初のタスク
このサイクルは、すべてのタスクが完了するまで続きます。各タスクは短時間実行される機会を得ます。これにより、スレッドやコールバックを使用することなく、並行実行のような錯覚を与えます。スレッドは並行性を実現するより複雑な方法であり、コールバックは非同期プログラミングで使用されます。私たちのシンプルなスケジューラーは、ジェネレーターを使って、より簡単な方法で同様の効果を達成しています。
ジェネレーターを使ったネットワークサーバーの構築
このセクションでは、これまで学んだタスクスケジューラーの概念を拡張して、より実用的なもの、つまり簡単なネットワークサーバーを作成します。このサーバーは、ジェネレーターを使って複数のクライアント接続を同時に処理することができます。ジェネレーターは Python の強力な機能で、関数の実行を一時停止し再開することができ、ブロッキングすることなく複数のタスクを処理するのに非常に便利です。
まず、/home/labex/project ディレクトリに server.py という名前の新しいファイルを作成する必要があります。このファイルには、ネットワークサーバーのコードが含まれます。
## server.py
from socket import *
from select import select
from collections import deque
## Task system
tasks = deque()
recv_wait = {} ## Map: socket -> task (for tasks waiting to receive)
send_wait = {} ## Map: socket -> task (for tasks waiting to send)
def run():
while any([tasks, recv_wait, send_wait]):
## If no active tasks, wait for I/O
while not tasks:
## Wait for any socket to become ready for I/O
can_recv, can_send, _ = select(recv_wait, send_wait, [])
## Add tasks waiting on readable sockets back to active queue
for s in can_recv:
tasks.append(recv_wait.pop(s))
## Add tasks waiting on writable sockets back to active queue
for s in can_send:
tasks.append(send_wait.pop(s))
## Get next task to run
task = tasks.popleft()
try:
## Resume the task
reason, resource = task.send(None)
## Handle different yield reasons
if reason == 'recv':
## Task is waiting to receive data
recv_wait[resource] = task
elif reason == 'send':
## Task is waiting to send data
send_wait[resource] = task
else:
raise RuntimeError('Unknown yield reason %r' % reason)
except StopIteration:
print('Task done')
この改良版のスケジューラーは前のものより少し複雑ですが、基本的な考え方は同じです。主な違いを解説しましょう。
- タスクは理由('recv' または 'send')とリソース(ソケット)を生成(yield)することができます。これは、タスクが特定のソケットでデータの受信または送信を待っていることをスケジューラーに伝えることができることを意味します。
- 生成(yield)された理由に応じて、タスクは異なる待機領域に移動します。タスクがデータの受信を待っている場合は、
recv_wait辞書に移動します。データの送信を待っている場合は、send_wait辞書に移動します。 select()関数は、どのソケットが I/O 操作の準備ができているかを判断するために使用されます。この関数は、recv_waitおよびsend_wait辞書内のソケットをチェックし、データの受信または送信の準備ができているソケットを返します。- ソケットが準備できたら、関連するタスクはアクティブキューに戻されます。これにより、タスクは実行を続け、待っていた I/O 操作を実行することができます。
これらの技術を使用することで、タスクは他のタスクの実行をブロッキングすることなく、ネットワーク I/O を効率的に待つことができます。これにより、ネットワークサーバーはより応答性が高く、複数のクライアント接続を同時に処理することができます。
エコーサーバーの実装
ここでは、server.py ファイルにエコーサーバーの実装を追加します。エコーサーバーは、クライアントから受信したデータをそのまま返すタイプのサーバーです。これは、サーバーが受信したデータをどのように処理し、クライアントと通信するかを理解するのに最適な方法です。
server.py ファイルの末尾に以下のコードを追加します。このコードは、エコーサーバーをセットアップし、クライアント接続を処理します。
## TCP Server implementation
def tcp_server(address, handler):
## Create a TCP socket
sock = socket(AF_INET, SOCK_STREAM)
## Set the socket option to reuse the address
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
## Bind the socket to the given address
sock.bind(address)
## Start listening for incoming connections, with a backlog of 5
sock.listen(5)
while True:
## Yield to pause the function until a client connects
yield 'recv', sock ## Wait for a client connection
## Accept a client connection
client, addr = sock.accept()
## Add a new handler task for this client to the tasks list
tasks.append(handler(client, addr)) ## Start a handler task for this client
## Echo handler - echoes back whatever the client sends
def echo_handler(client, address):
print('Connection from', address)
while True:
## Yield to pause the function until the client sends data
yield 'recv', client ## Wait until client sends data
## Receive up to 1000 bytes of data from the client
data = client.recv(1000)
if not data: ## Client closed connection
break
## Yield to pause the function until the client can receive data
yield 'send', client ## Wait until client can receive data
## Send the data back to the client with 'GOT:' prefix
client.send(b'GOT:' + data)
print('Connection closed')
## Close the client connection
client.close()
## Start the server
if __name__ == '__main__':
## Add the tcp_server task to the tasks list
tasks.append(tcp_server(('', 25000), echo_handler))
## Start the scheduler
run()
このコードを段階的に理解しましょう。
tcp_server関数:- まず、着信接続を待ち受けるソケットをセットアップします。ソケットは、2 台のマシン間の通信のエンドポイントです。
- 次に、
yield 'recv', sockを使用して、クライアントが接続するまで関数を一時停止します。これは、非同期アプローチの重要な部分です。 - 最後に、各クライアント接続に対して新しいハンドラータスクを作成します。これにより、サーバーは複数のクライアントを同時に処理することができます。
echo_handler関数:'recv', clientを生成(yield)して、クライアントがデータを送信するのを待ちます。これにより、データが利用可能になるまで関数が一時停止します。'send', clientを生成(yield)して、クライアントにデータを返送できる状態になるまで待ちます。これにより、クライアントがデータを受信できる準備ができていることが保証されます。- クライアントが接続を閉じるまで、クライアントデータを処理します。
サーバーを実行すると、
tcp_serverタスクがキューに追加され、スケジューラーが起動します。スケジューラーは、すべてのタスクを管理し、非同期で実行されるようにする役割を果たします。
サーバーをテストするには、1 つのターミナルで以下のコマンドを実行します。
python3 /home/labex/project/server.py
サーバーが実行中であることを示すメッセージが表示されるはずです。これは、サーバーが着信接続を待ち受けていることを意味します。
別のターミナルを開き、nc(netcat)を使用してサーバーに接続します。Netcat は、サーバーに接続してデータを送信できるシンプルなユーティリティです。
nc localhost 25000
これで、メッセージを入力し、「GOT:」が付加されてエコーバックされるのを確認できます。
Hello
GOT:Hello
World
GOT:World
nc がインストールされていない場合は、Python の組み込みライブラリ telnetlib を使用できます。Telnetlib は、Telnet プロトコルを使用してサーバーに接続できるライブラリです。
python3 -c "import telnetlib; t = telnetlib.Telnet('localhost', 25000); t.interact()"
複数のターミナルウィンドウを開き、複数のクライアントを同時に接続することができます。サーバーは単一スレッドであるにもかかわらず、すべての接続を同時に処理します。これは、ジェネレーターベースのタスクスケジューラーのおかげで、サーバーが必要に応じてタスクを一時停止および再開できるためです。
動作原理
この例は、非同期 I/O のためのジェネレーターの強力なアプリケーションを示しています。
- サーバーは、I/O を待ってブロックする場合に生成(yield)します。これは、データを無期限に待つ代わりに、サーバーが一時停止して他のタスクを実行できることを意味します。
- スケジューラーは、I/O が準備できるまでサーバーを待機領域に移動します。これにより、サーバーが I/O を待ってリソースを浪費しないようになります。
- I/O が完了するのを待っている間、他のタスクを実行できます。これにより、サーバーは複数のタスクを同時に処理することができます。
- I/O が準備できたら、タスクは中断したところから続行します。これは、非同期プログラミングの重要な機能です。
このパターンは、Python 3.4 で Python 標準ライブラリに追加された asyncio のような、現代の非同期 Python フレームワークの基礎を形成しています。
まとめ
この実験(Lab)では、Python の管理されたジェネレーターの概念について学びました。yield 文を使用してジェネレーターを一時停止および再開する方法を調査し、複数のジェネレーターを同時に実行するための簡単なタスクスケジューラーを構築しました。さらに、スケジューラーを拡張してネットワーク I/O を効率的に処理し、複数の接続を同時に処理できるネットワークサーバーを実装しました。
ジェネレーターを協調的マルチタスクに使用するこのパターンは、Python の多くの非同期プログラミングフレームワーク(組み込みの asyncio モジュールなど)の基礎となる強力な技術です。このアプローチには、単純な逐次コード、効率的な非ブロッキング I/O 処理、複数のスレッドを使用しない協調的マルチタスク、およびタスク実行の細かい制御など、いくつかの利点があります。これらの技術は、高性能なネットワークアプリケーションや、並行操作を効率的に処理する必要があるシステムを構築するのに非常に有用です。