はじめに
スレッド間でデータを転送するためのメッセージパッシングの使用方法へようこそ。この実験は、Rust Book の一部です。LabEx で Rust のスキルを練習することができます。
この実験では、Rust の標準ライブラリにあるチャネルを使ってスレッド間でデータを送受信することで、安全な並列処理のアプローチとしてのメッセージパッシングを検討します。
スレッド間でデータを転送するためのメッセージパッシングの使用方法
安全な並列処理を確保するための、ますます人気のあるアプローチの 1 つは、「メッセージパッシング」です。これは、スレッドやアクターがデータを含むメッセージを相互に送信することで通信する方法です。ここに、https://golang.org/doc/effective_go.html#concurrency にある Go 言語のドキュメントのスローガンによる考え方を示します。「メモリを共有することではなく、通信によってメモリを共有する」。
メッセージ送信による並列処理を実現するために、Rust の標準ライブラリは「チャネル」の実装を提供しています。チャネルは、一般的なプログラミング概念であり、データを 1 つのスレッドから別のスレッドに送信します。
プログラミングにおけるチャネルを、川や小川のような方向性のある水路に例えることができます。もしゴムのアヒルのような何かを川に入れると、それは水路の終わりまで下流に流れていきます。
チャネルには 2 つの部分があります。送信側と受信側です。送信側は、ゴムのアヒルを川に入れる上流の場所であり、受信側はゴムのアヒルが下流に到着する場所です。コードの一部は送信したいデータを使って送信側のメソッドを呼び出し、もう一部は到着したメッセージを受信側で確認します。送信側または受信側のいずれかが破棄されると、チャネルは「クローズ」されたと言われます。
ここでは、1 つのスレッドが値を生成してチャネルを通じて送信し、もう 1 つのスレッドがその値を受信して出力するプログラムを作成します。チャネルを使ってスレッド間で単純な値を送信して、この機能を説明します。この技術に慣れたら、チャットシステムや、多数のスレッドが計算の一部を実行して結果をまとめる 1 つのスレッドにそれらの部分を送信するシステムのような、相互に通信する必要のある任意のスレッドにチャネルを使うことができます。
まず、リスト 16-6 では、チャネルを作成しますが、何もしません。これはまだコンパイルされません。なぜなら、Rust はチャネルを通じて送信する値の型を判断できないからです。
ファイル名:src/main.rs
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
リスト 16-6: チャネルを作成し、2 つの部分を tx と rx に割り当てる
mpsc::channel 関数を使って新しいチャネルを作成します。mpsc は「複数の生成元、単一の消費者」を意味します。簡単に言えば、Rust の標準ライブラリがチャネルを実装する方法は、チャネルが値を生成する複数の「送信」端を持つことができる一方で、それらの値を消費する「受信」端は 1 つだけであることを意味します。想像してみてください。複数の小川が 1 本の大きな川に合流しています。どの小川からでも流されるすべてのものは、最後に 1 本の川に集まります。今は単一の生成元から始めますが、この例が動作するようになったら、複数の生成元を追加します。
mpsc::channel 関数はタプルを返します。その最初の要素は送信端、つまり送信機であり、2 番目の要素は受信端、つまり受信機です。略称の tx と rx は、それぞれ「送信機」と「受信機」の意味で、多くの分野で伝統的に使われています。そのため、それぞれの端を示すように、変数をこのように命名します。let ステートメントを使ってタプルを分解するパターンを使っています。18 章では、let ステートメントにおけるパターンの使い方と分解について説明します。今のところ、このように let ステートメントを使うことは、mpsc::channel が返すタプルの要素を抽出する便利な方法であることを知っておいてください。
送信端を生成されたスレッドに移動させ、1 つの文字列を送信させましょう。これにより、生成されたスレッドがメインスレッドと通信します。これは、川の上流にゴムのアヒルを入れること、または 1 つのスレッドから別のスレッドにチャットメッセージを送信することに似ています。
ファイル名:src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
}
リスト 16-7: tx を生成されたスレッドに移動させ、"hi" を送信する
再び、thread::spawn を使って新しいスレッドを作成し、その後 move を使って tx をクロージャに移動させます。これにより、生成されたスレッドが tx を所有するようになります。生成されたスレッドは、チャネルを通じてメッセージを送信するために送信機を所有する必要があります。
送信機には、送信したい値を受け取る send メソッドがあります。send メソッドは Result<T, E> 型を返します。したがって、受信機が既に破棄されており、値を送信する場所がない場合、送信操作はエラーを返します。この例では、エラーが発生した場合にパニックするように unwrap を呼んでいます。しかし、本番のアプリケーションでは、適切にエラーを処理します。エラーの適切な処理の戦略を確認するには、9 章に戻ってください。
リスト 16-8 では、メインスレッドで受信機から値を取得します。これは、川の終わりで水からゴムのアヒルを取り出すこと、またはチャットメッセージを受信することに似ています。
ファイル名:src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
リスト 16-8: メインスレッドで "hi" の値を受信して出力する
受信機には 2 つの便利なメソッドがあります。recv と try_recv です。ここでは recv を使っています。recv は「受信」の略で、メインスレッドの実行をブロックし、チャネルを通じて値が送信されるのを待ちます。値が送信されると、recv は Result<T, E> でそれを返します。送信機がクローズされると、recv はエラーを返して、これ以上値が届かないことを示します。
try_recv メソッドはブロックしません。代わりに、直ちに Result<T, E> を返します。利用可能なメッセージがある場合は Ok 値で保持され、今回はメッセージがない場合は Err 値です。このスレッドがメッセージを待っている間に他の作業をする場合、try_recv を使うと便利です。たとえば、一定間隔で try_recv を呼び出し、利用可能なメッセージがあれば処理し、そうでなければ少し間他の作業をしてから再度確認するループを書くことができます。
この例では単純さのために recv を使っています。メッセージを待つ以外にメインスレッドがする作業がないため、メインスレッドをブロックするのが適切です。
リスト 16-8 のコードを実行すると、メインスレッドから値が出力されるのが見えます。
Got: hi
完璧です!
チャネルと所有権の移転
所有権のルールはメッセージ送信において重要な役割を果たします。なぜなら、それが安全で並列処理可能なコードを書くのに役立つからです。並列プログラミングにおけるエラーを防ぐことは、Rust プログラム全体で所有権について考えることの利点です。チャネルと所有権がどのように協働して問題を防ぐかを示すために、実験を行いましょう。チャネルを通じて値を送信した後、生成されたスレッドで val 値を使用しようとします。リスト 16-9 のコードをコンパイルして、なぜこのコードが許されないかを確認してみましょう。
ファイル名:src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
リスト 16-9: チャネルを通じて値を送信した後に val を使用しようとする
ここでは、tx.send を通じてチャネルに値を送信した後に val を出力しようとしています。これを許すのは良い考えではありません。値が別のスレッドに送信されると、そのスレッドが値を再利用しようとする前に、そのスレッドが値を変更または破棄する可能性があります。おそらく、他のスレッドによる変更が、データの不整合または存在しないデータにより、エラーや予期しない結果を引き起こす可能性があります。しかし、Rust はリスト 16-9 のコードをコンパイルしようとするとエラーを表示します。
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:31
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does
not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^ value borrowed here after move
私たちの並列処理上のミスが、コンパイル時のエラーを引き起こしました。send 関数はそのパラメータの所有権を取得し、値が移動すると、受信側がその所有権を取得します。これにより、値を送信した後に偶然再利用することが防がれます。所有権システムがすべてが正常であることを確認します。
複数の値を送信し、受信側が待機する様子を見る
リスト 16-8 のコードはコンパイルされて実行されましたが、2 つの別々のスレッドがチャネルを通じて相互に通信していることを明確に示していません。リスト 16-10 では、いくつかの修正を加えて、リスト 16-8 のコードが並列実行されていることを証明します。生成されたスレッドは、今回は複数のメッセージを送信し、各メッセージの間に 1 秒間停止します。
ファイル名:src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
リスト 16-10: 複数のメッセージを送信し、各メッセージの間に停止する
今回、生成されたスレッドには、メインスレッドに送信したい文字列のベクトルがあります。それらを反復処理して個別に送信し、各送信の間に 1 秒間の Duration 値を持つ thread::sleep 関数を呼び出すことで、各送信の間に停止します。
メインスレッドでは、もはや明示的に recv 関数を呼び出していません。代わりに、rx をイテレータとして扱っています。受信した各値に対して、それを出力します。チャネルがクローズされると、反復処理は終了します。
リスト 16-10 のコードを実行すると、各行の間に 1 秒間の停止があり、以下の出力が表示されるはずです。
Got: hi
Got: from
Got: the
Got: thread
メインスレッドの for ループには、停止または遅延するコードがないため、メインスレッドが生成されたスレッドから値を受信するのを待っていることがわかります。
送信機をクローン化して複数の生成元を作成する
先ほど、mpsc は「複数の生成元、単一の消費者」の略語であると述べました。では、mpsc を使って、リスト 16-10 のコードを拡張して、すべてが同じ受信機に値を送信する複数のスレッドを作成してみましょう。送信機をクローン化することで、これを行うことができます。リスト 16-11 を参照してください。
ファイル名:src/main.rs
--snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
--snip--
リスト 16-11: 複数の生成元から複数のメッセージを送信する
今回は、最初の生成されたスレッドを作成する前に、送信機に対して clone を呼び出します。これにより、最初の生成されたスレッドに渡すことができる新しい送信機が得られます。元の送信機を 2 番目の生成されたスレッドに渡します。これにより、2 つのスレッドが得られ、それぞれが異なるメッセージを 1 つの受信機に送信します。
コードを実行すると、出力は以下のようになるはずです。
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
システムによっては、値が別の順序で表示される場合があります。これが、並列処理を興味深く、また難しくするところです。thread::sleep を試してみて、異なるスレッドでさまざまな値を与えると、各実行はより非決定的になり、毎回異なる出力が生成されます。
ここまでチャネルの仕組みを見てきましたので、別の並列処理の方法を見てみましょう。
まとめ
おめでとうございます!「スレッド間でデータを転送するためのメッセージパッシングの使用方法」の実験を完了しました。あなたの技術を向上させるために、LabEx でさらに多くの実験を行って練習してください。