はじめに
共有状態の並列処理へようこそ。この実験は、Rust Bookの一部です。LabEx で Rust のスキルを練習できます。
この実験では、共有メモリ並列処理の概念と、メッセージパッシングの愛好者がそれに注意を促す理由を探ります。
共有状態の並列処理
メッセージパッシングは並列処理を扱う良い方法ですが、唯一の方法ではありません。別の方法は、複数のスレッドが同じ共有データにアクセスすることです。ここで再び Go 言語のドキュメントのソロガンの一部を見てみましょう:「メモリを共有することでコミュニケーションをとるな」。
メモリを共有することでコミュニケーションをとるとはどのようなものでしょうか。また、なぜメッセージパッシングの愛好者がメモリ共有を使わないように注意を促すのでしょうか。
ある意味で、どんなプログラミング言語におけるチャネルも単一所有権に似ています。なぜなら、チャネルを通じて値を転送すると、その値をもはや使用しないようになるからです。共有メモリ並列処理は複数所有権に似ています。複数のスレッドが同時に同じメモリ位置にアクセスできます。第 15 章で見たように、スマートポインタによって複数所有権が可能になりましたが、複数所有権は異なる所有者を管理する必要があるため、複雑さを増やす可能性があります。Rust の型システムと所有権ルールは、この管理を正しく行うのに大きく役立ちます。例として、共有メモリのより一般的な並列処理プリミティブの 1 つであるミューテックスを見てみましょう。
ミューテックスを使って一度に 1 つのスレッドからのデータアクセスを許可する
「Mutex」は「相互排他」(mutual exclusion)の略で、ミューテックスは任意の時点で 1 つのスレッドだけが特定のデータにアクセスできるようにします。ミューテックス内のデータにアクセスするには、まずスレッドはミューテックスの「ロック」を取得するように要求することでアクセスを希望することを信号で知らせる必要があります。ロックは、現在誰がデータに排他的なアクセスを持っているかを追跡するミューテックスの一部であるデータ構造です。したがって、ミューテックスはロッキングシステムを介して保持するデータを「保護」していると言われています。
ミューテックスは使いにくいと評判が悪いのですが、2 つのルールを覚えておかなければならないからです:
- データを使用する前にロックを取得しようとしなければなりません。
- ミューテックスが保護するデータを使い終えたら、データをアンロックして他のスレッドがロックを取得できるようにしなければなりません。
ミューテックスの現実世界的なメタファーとして、1 つのマイクしかない会議のパネルディスカッションを想像してみてください。パネリストが話す前には、マイクを使いたいことを申し出たり信号を送ったりしなければなりません。マイクを手に入れると、好きなだけ話すことができ、その後、話したいと要求する次のパネリストにマイクを渡すことができます。パネリストが話し終えたらマイクを渡すのを忘れてしまうと、他の誰も話すことができません。共有マイクの管理がうまくいかなければ、パネルは計画通りに機能しません!
ミューテックスの管理は正しく行うのが非常に難しいことがあります。これが、多くの人がチャネルに熱心な理由です。しかし、Rust の型システムと所有権ルールのおかげで、ロックの取得と解放を間違えることはありません。
Mutex<T>の API
ミューテックスを使う方法の例として、まず単一スレッドコンテキストでミューテックスを使ってみましょう。リスト 16-12 に示すようにです。
ファイル名:src/main.rs
use std::sync::Mutex;
fn main() {
1 let m = Mutex::new(5);
{
2 let mut num = m.lock().unwrap();
3 *num = 6;
4 }
5 println!("m = {:?}", m);
}
リスト 16-12: 単一スレッドコンテキストでのMutex<T>の API を単純化するために調べる
多くの型と同様に、関連付けられた関数newを使ってMutex<T>を作成します[1]。ミューテックス内のデータにアクセスするには、lockメソッドを使ってロックを取得します[2]。この呼び出しは現在のスレッドをブロックし、ロックを手に入れるまで何も作業を行えなくなります。
もしロックを保持している別のスレッドがパニックになった場合、lockの呼び出しは失敗します。その場合、誰もロックを取得できなくなりますので、その状況にあるときはunwrapを選択してこのスレッドをパニックにさせます。
ロックを取得した後、この場合numと名付けられた返り値を、内部のデータへの可変参照として扱うことができます。型システムにより、mの値を使う前にロックを取得することが保証されます。mの型はMutex<i32>で、i32ではありませんので、i32の値を使うためには必ずlockを呼び出さなければなりません。忘れることはできません。そうでなければ型システムが内部のi32にアクセスさせません。
おそらく想像できる通り、Mutex<T>はスマートポインタです。より正確には、lockの呼び出しはMutexGuardと呼ばれるスマートポインタを返し、unwrapの呼び出しで処理したLockResultにラップされています。MutexGuardスマートポインタはDerefを実装して内部データを指し、またDrop実装も持ち、MutexGuardがスコープ外になるとき(これは内部スコープの終わりで起こります)自動的にロックを解放します[4]。その結果、ロックを解放することを忘れて他のスレッドがミューテックスを使えなくなるというリスクはありません。なぜならロックの解放は自動的に行われるからです。
ロックを解放した後、ミューテックスの値を表示して、内部のi32を6に変更できたことがわかります[5]。
複数のスレッド間で Mutex<T>を共有する
次に、Mutex<T>を使って複数のスレッド間で値を共有してみましょう。10 個のスレッドを起動し、それぞれがカウンター値を 1 増やすようにします。すると、カウンターは 0 から 10 まで増えます。リスト 16-13 の例ではコンパイルエラーが発生します。そのエラーを使って、Mutex<T>の使い方と Rust がどのように正しく使うのを助けるかについてもっと学びましょう。
ファイル名:src/main.rs
use std::sync::Mutex;
use std::thread;
fn main() {
1 let counter = Mutex::new(0);
let mut handles = vec![];
2 for _ in 0..10 {
3 let handle = thread::spawn(move || {
4 let mut num = counter.lock().unwrap();
5 *num += 1;
});
6 handles.push(handle);
}
for handle in handles {
7 handle.join().unwrap();
}
8 println!("Result: {}", *counter.lock().unwrap());
}
リスト 16-13: 10 個のスレッドで、それぞれが Mutex<T>で保護されたカウンターを 1 増やす
リスト 16-12 と同じように、Mutex<T>内にi32を保持するためのcounter変数を作成します[1]。次に、数値の範囲を反復することで 10 個のスレッドを作成します[2]。thread::spawnを使って、すべてのスレッドに同じクロージャを与えます。それは、カウンターをスレッドに移動し[3]、lockメソッドを呼び出すことでMutex<T>のロックを取得し[4]、そしてミューテックス内の値に 1 を加えるものです[5]。スレッドがそのクロージャの実行を終えると、numはスコープ外になり、ロックが解放されて他のスレッドがそれを取得できるようになります。
メインスレッドでは、すべてのジョインハンドルを収集します[6]。そして、リスト 16-2 でやったように、各ハンドルにjoinを呼び出してすべてのスレッドが終了することを確認します[7]。その時点で、メインスレッドはロックを取得してこのプログラムの結果を表示します[8]。
この例がコンパイルされないことをヒントしました。では、なぜか見てみましょう!
error[E0382]: use of moved value: `counter`
--> src/main.rs:9:36
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `Mutex<i32>`, which
does not implement the `Copy` trait
...
9 | let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here,
in previous iteration of loop
10 | let mut num = counter.lock().unwrap();
| ------- use occurs due to use in closure
エラーメッセージによると、counter値はループの前の反復で移動されました。Rust は、ロックcounterの所有権を複数のスレッドに移動できないことを私たちに伝えています。第 15 章で議論した複数所有権の方法を使って、コンパイラエラーを修正しましょう。
複数のスレッドによる複数所有権
第 15 章では、スマートポインタRc<T>を使って参照カウント付きの値を作成することで、1 つの値を複数の所有者に与えました。ここでも同じことをして、何が起こるか見てみましょう。リスト 16-14 では、Mutex<T>をRc<T>でラップし、所有権をスレッドに移動する前にRc<T>をクローンします。
ファイル名:src/main.rs
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
リスト 16-14: Rc<T>を使って複数のスレッドがMutex<T>を所有できるようにする試み
再びコンパイルすると、違うエラーが出ます!コンパイラがたくさん教えてくれています。
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely 1
--> src/main.rs:11:22
|
11 | let handle = thread::spawn(move || {
| ______________________^^^^^^^^^^^^^_-
| | |
| | `Rc<Mutex<i32>>` cannot be sent between threads
safely
12 | | let mut num = counter.lock().unwrap();
13 | |
14 | | *num += 1;
15 | | });
| |_________- within this `[closure@src/main.rs:11:36: 15:10]`
|
= help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not
implemented for `Rc<Mutex<i32>>` 2
= note: required because it appears within the type
`[closure@src/main.rs:11:36: 15:10]`
note: required by a bound in `spawn`
えらーメッセージがとても長いです!注目すべき重要な部分はここです:「Rc<Mutex<i32>> cannot be sent between threads safely」[1]。コンパイラはまた、その理由も教えてくれています:「the trait Send is not implemented for Rc<Mutex<i32>>」[2]。次のセクションでSendについて話します。これは、スレッドで使う型が並列状況で使うことができるようにするためのトレイトの 1 つです。
残念ながら、Rc<T>はスレッド間で共有するのが安全ではありません。Rc<T>が参照カウントを管理するとき、cloneの各呼び出しでカウントが増え、各クローンが破棄されるときにカウントが減ります。しかし、参照カウントの変更が他のスレッドによって中断されないようにするための並列処理プリミティブを使っていません。これは誤ったカウントにつながる可能性があります。つまり、細かいバグがあり、それがメモリリークにつながったり、使い終わる前に値が破棄されたりする可能性があります。必要なのは、Rc<T>とまったく同じ型でありながら、参照カウントの変更をスレッドセーフな方法で行う型です。
Arc<T>を使った原子的参照カウント
幸いなことに、Arc<T>は並列状況で安全に使えるRc<T>のような型です。「a」は「atomic」を表し、つまりそれは「原子的に参照カウントされる」型です。アトミックは並列処理のプリミティブの一種で、ここでは詳細を説明しません。詳細については、std::sync::atomicの標準ライブラリドキュメントを参照してください。この時点では、アトミックが基本型のように動作するがスレッド間で共有するのが安全であることだけを知っておけば十分です。
それでは、なぜすべての基本型がアトミックでないのか、またなぜ標準ライブラリの型がデフォルトでArc<T>を使って実装されていないのか疑問に思うかもしれません。その理由は、スレッドセーフにすると性能ペナルティが伴うためで、本当に必要な場合にのみ支払いたいものです。単一のスレッド内で値に対して操作を行っているだけの場合、アトミックが提供する保証を強制する必要がなければ、コードが高速に実行できます。
例に戻りましょう。Arc<T>とRc<T>は同じ API を持っているので、use行、newの呼び出し、およびcloneの呼び出しを変更することでプログラムを修正します。リスト 16-15 のコードはついにコンパイルされて実行されます。
ファイル名:src/main.rs
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
リスト 16-15: Arc<T>を使ってMutex<T>をラップし、複数のスレッド間で所有権を共有できるようにする
このコードは次のように表示されます:
Result: 10
成功です!0 から 10 まで数えました。これはあまり印象的ではないかもしれませんが、Mutex<T>とスレッドセーフについてたくさん学ぶことができました。また、このプログラムの構造を使って、カウンターをインクリメントするだけよりも複雑な操作を行うこともできます。この戦略を使えば、計算を独立した部分に分割し、それらの部分をスレッド間で分割し、そして各スレッドがその部分で最終結果を更新するようにMutex<T>を使うことができます。
単純な数値演算を行っている場合、標準ライブラリのstd::sync::atomicモジュールが提供するMutex<T>型よりも単純な型があります。これらの型は、基本型に対する安全な並列アトミックアクセスを提供します。この例では、Mutex<T>と基本型を使って、Mutex<T>の動作方法に集中できるようにしました。
RefCell<T>/Rc<T>と Mutex<T>/Arc<T>の類似点
counterは不変ですが、その中の値に対して可変参照を取得できることに気付いたかもしれません。これは、Mutex<T>がCellファミリと同じように内部可変性を提供することを意味します。第 15 章でRefCell<T>を使ってRc<T>内の内容を変更できるようにしたのと同じように、Mutex<T>を使ってArc<T>内の内容を変更します。
注目すべきもう一つの点は、Mutex<T>を使うとき、Rust はあらゆる種類の論理エラーからあなたを守ってくれないことです。第 15 章で思い出してください。Rc<T>を使うと、2 つのRc<T>値が互いを参照する参照サイクルを作成するリスクがあり、メモリリークにつながります。同様に、Mutex<T>は 死鎖 を作成するリスクがあります。これは、操作が 2 つのリソースをロックする必要があり、2 つのスレッドがそれぞれ 1 つのロックを取得しており、お互いを永遠に待たせる場合に発生します。死鎖に興味がある場合は、死鎖を発生させる Rust プログラムを作成してみてください。その後、任意の言語のミューテックスの死鎖軽減戦略を調べ、Rust でそれらを実装してみてください。Mutex<T>とMutexGuardの標準ライブラリ API ドキュメントには有用な情報があります。
この章は、SendとSyncトレイトと、カスタム型でそれらをどのように使うかについて話すことで締めくくります。
まとめ
おめでとうございます!共有状態の並列処理の実験を完了しました。LabEx でさらに多くの実験を行って、スキルを向上させることができます。