Rust における並列データ処理

RustRustBeginner
今すぐ練習

This tutorial is from open-source community. Access the source code

💡 このチュートリアルは英語版からAIによって翻訳されています。原文を確認するには、 ここをクリックしてください

はじめに

この実験では、マップリデュースアルゴリズムを使ってデータ処理を並列化する能力について、Rust プログラミング言語を調べます。サンプルコードは、データをセグメントに分割し、各セグメントを別のスレッドで処理することで、数字のブロック内のすべての桁の合計を計算します。Rust 標準ライブラリは、データ競合を防止し、スレッドセーフを保証するスレッドプリミティブを提供します。このプログラムはまた、スレッド境界を越えて読み取り専用参照を渡す Rust の理解を示しています。さらに、コードはクロージャ、反復子、および join() メソッドの使用を示しており、各スレッドの中間結果を最終的な合計に結合しています。効率を保証するために、プログラムは、過度の数のスレッドにつながる可能性のあるユーザー入力データに依存する代わりに、データを制限された数のセグメントにチャンク化するように修正できます。

注: 実験でファイル名が指定されていない場合、好きなファイル名を使うことができます。たとえば、main.rs を使って、rustc main.rs &&./main でコンパイルして実行することができます。

テストケース: マップリデュース

Rust は、データ処理を並列化することを非常に簡単にします。このような試みに関連する伝統的な問題の多くを伴わずにです。

標準ライブラリは、すぐに使える素晴らしいスレッドプリミティブを提供します。これらは、Rust の所有権とエイリアシングルールの概念と組み合わされることで、自動的にデータ競合を防止します。

エイリアシングルール (1 つの書き込み可能参照 XOR 複数の読み取り可能参照) は、自動的に他のスレッドに対して可視な状態を操作することを防いでくれます。(同期が必要な場合、MutexChannel などの同期プリミティブがあります。)

この例では、数字のブロック内のすべての桁の合計を計算します。これを行うには、ブロックのチャンクを異なるスレッドに分割します。各スレッドはその小さな桁のブロックの合計を計算し、その後、各スレッドが生成した中間合計を合計します。

注意点として、スレッド境界を越えて参照を渡していますが、Rust は、読み取り専用参照のみを渡していることを理解しており、したがって、不安全性やデータ競合は発生しません。また、渡している参照が 'static の寿命を持っているため、Rust は、これらのスレッドがまだ実行中の間にデータが破棄されないことを理解しています。(スレッド間で非 static データを共有する必要がある場合、Arc のようなスマートポインタを使ってデータを生存させ、非 static の寿命を回避することができます。)

use std::thread;

// これは `main` スレッドです
fn main() {

    // これは処理するデータです。
    // スレッド付きのマップリデュースアルゴリズムを使って、すべての桁の合計を計算します。
    // 各空白区切りのチャンクは別のスレッドで処理されます。
    //
    // TODO: 空白を挿入した場合の出力を確認してみてください!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // 生成する子スレッドを保持するためのベクタを作成します。
    let mut children = vec![];

    /*************************************************************************
     * "Map" フェーズ
     *
     * データをセグメントに分割し、初期処理を適用します
     ************************************************************************/

    // 個別の計算のためにデータをセグメントに分割します
    // 各チャンクは実際のデータへの参照 (&str) になります
    let chunked_data = data.split_whitespace();

    // データセグメントを反復処理します。
    //.enumerate() は、反復処理するものに対して現在のループインデックスを追加します
    // 結果のタプル "(index, element)" は、その後すぐに
    // 2 つの変数 "i" と "data_segment" に "構文分解代入" で "構文分解" されます
    for (i, data_segment) in chunked_data.enumerate() {
        println!("data segment {} is \"{}\"", i, data_segment);

        // 各データセグメントを別のスレッドで処理します
        //
        // spawn() は新しいスレッドへのハンドルを返します
        // 返された値にアクセスするためには、これを必ず保持する必要があります
        //
        //'move || -> u32' は、次のようなクロージャの構文です:
        // * 引数を取らない ('||')
        // * キャプチャした変数の所有権を取得する ('move') と
        // * 符号なし 32 ビット整数を返す ('-> u32')
        //
        // Rust は十分に賢く、クロージャ自体から '-> u32' を推論することができるので、これを省略することができました。
        //
        // TODO: 'move' を削除してみて、何が起こるか確認してください
        children.push(thread::spawn(move || -> u32 {
            // このセグメントの中間合計を計算します:
            let result = data_segment
                        // セグメントの文字を反復処理します..
                       .chars()
                        //.. テキスト文字をその数値に変換します..
                       .map(|c| c.to_digit(10).expect("should be a digit"))
                        //.. そして生成された数値の反復子を合計します
                       .sum();

            // println! は標準出力をロックするので、テキストの交差表示は発生しません
            println!("processed segment {}, result={}", i, result);

            // "return" は必要ありません。Rust は "式言語" なので、
            // 各ブロックの最後に評価された式が自動的にその値になります。
            result

        }));
    }


    /*************************************************************************
     * "Reduce" フェーズ
     *
     * 中間結果を収集し、最終結果に結合します
     ************************************************************************/

    // 各スレッドの中間結果を単一の最終合計に結合します。
    //
    // 型ヒントを sum() に与えるために "ターボフィッシュ" ::<> を使います。
    //
    // TODO: ターボフィッシュを使わずに、代わりに明示的に
    // final_result の型を指定してみてください
    let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();

    println!("Final sum result: {}", final_result);
}

課題

スレッド数をユーザ入力データに依存させるのは賢明ではありません。ユーザがたくさんの空白を挿入した場合どうなるでしょうか?本当に 2,000 個のスレッドを生成したいのでしょうか?プログラムを修正して、データを常にプログラムの最初に定義された静的定数によって定義される制限された数のチャンクにチャンク化するようにしましょう。

まとめ

おめでとうございます!あなたはテストケース: マップリデュースの実験を完了しました。あなたのスキルを向上させるために、LabEx でさらに多くの実験を練習することができます。