テストケース: マップリデュース
Rust は、データ処理を並列化することを非常に簡単にします。このような試みに関連する伝統的な問題の多くを伴わずにです。
標準ライブラリは、すぐに使える素晴らしいスレッドプリミティブを提供します。これらは、Rust の所有権とエイリアシングルールの概念と組み合わされることで、自動的にデータ競合を防止します。
エイリアシングルール (1 つの書き込み可能参照 XOR 複数の読み取り可能参照) は、自動的に他のスレッドに対して可視な状態を操作することを防いでくれます。(同期が必要な場合、Mutex
や Channel
などの同期プリミティブがあります。)
この例では、数字のブロック内のすべての桁の合計を計算します。これを行うには、ブロックのチャンクを異なるスレッドに分割します。各スレッドはその小さな桁のブロックの合計を計算し、その後、各スレッドが生成した中間合計を合計します。
注意点として、スレッド境界を越えて参照を渡していますが、Rust は、読み取り専用参照のみを渡していることを理解しており、したがって、不安全性やデータ競合は発生しません。また、渡している参照が 'static
の寿命を持っているため、Rust は、これらのスレッドがまだ実行中の間にデータが破棄されないことを理解しています。(スレッド間で非 static
データを共有する必要がある場合、Arc
のようなスマートポインタを使ってデータを生存させ、非 static
の寿命を回避することができます。)
use std::thread;
// これは `main` スレッドです
fn main() {
// これは処理するデータです。
// スレッド付きのマップリデュースアルゴリズムを使って、すべての桁の合計を計算します。
// 各空白区切りのチャンクは別のスレッドで処理されます。
//
// TODO: 空白を挿入した場合の出力を確認してみてください!
let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";
// 生成する子スレッドを保持するためのベクタを作成します。
let mut children = vec![];
/*************************************************************************
* "Map" フェーズ
*
* データをセグメントに分割し、初期処理を適用します
************************************************************************/
// 個別の計算のためにデータをセグメントに分割します
// 各チャンクは実際のデータへの参照 (&str) になります
let chunked_data = data.split_whitespace();
// データセグメントを反復処理します。
//.enumerate() は、反復処理するものに対して現在のループインデックスを追加します
// 結果のタプル "(index, element)" は、その後すぐに
// 2 つの変数 "i" と "data_segment" に "構文分解代入" で "構文分解" されます
for (i, data_segment) in chunked_data.enumerate() {
println!("data segment {} is \"{}\"", i, data_segment);
// 各データセグメントを別のスレッドで処理します
//
// spawn() は新しいスレッドへのハンドルを返します
// 返された値にアクセスするためには、これを必ず保持する必要があります
//
//'move || -> u32' は、次のようなクロージャの構文です:
// * 引数を取らない ('||')
// * キャプチャした変数の所有権を取得する ('move') と
// * 符号なし 32 ビット整数を返す ('-> u32')
//
// Rust は十分に賢く、クロージャ自体から '-> u32' を推論することができるので、これを省略することができました。
//
// TODO: 'move' を削除してみて、何が起こるか確認してください
children.push(thread::spawn(move || -> u32 {
// このセグメントの中間合計を計算します:
let result = data_segment
// セグメントの文字を反復処理します..
.chars()
//.. テキスト文字をその数値に変換します..
.map(|c| c.to_digit(10).expect("should be a digit"))
//.. そして生成された数値の反復子を合計します
.sum();
// println! は標準出力をロックするので、テキストの交差表示は発生しません
println!("processed segment {}, result={}", i, result);
// "return" は必要ありません。Rust は "式言語" なので、
// 各ブロックの最後に評価された式が自動的にその値になります。
result
}));
}
/*************************************************************************
* "Reduce" フェーズ
*
* 中間結果を収集し、最終結果に結合します
************************************************************************/
// 各スレッドの中間結果を単一の最終合計に結合します。
//
// 型ヒントを sum() に与えるために "ターボフィッシュ" ::<> を使います。
//
// TODO: ターボフィッシュを使わずに、代わりに明示的に
// final_result の型を指定してみてください
let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();
println!("Final sum result: {}", final_result);
}
課題
スレッド数をユーザ入力データに依存させるのは賢明ではありません。ユーザがたくさんの空白を挿入した場合どうなるでしょうか?本当に 2,000 個のスレッドを生成したいのでしょうか?プログラムを修正して、データを常にプログラムの最初に定義された静的定数によって定義される制限された数のチャンクにチャンク化するようにしましょう。