Rust 中的并行数据处理

Beginner

This tutorial is from open-source community. Access the source code

简介

在本实验中,我们将探索 Rust 编程语言使用 map-reduce 算法并行化数据处理的能力。示例代码通过将数据分成多个段,并在单独的线程中处理每个段,来计算一组数字中所有数字的总和。Rust 标准库提供了防止数据竞争并保证线程安全的线程原语。该程序还展示了 Rust 对跨线程边界传递只读引用的理解。此外,代码展示了如何使用闭包、迭代器和 join() 方法将每个线程的中间结果组合成最终的总和。为了确保效率,可以修改程序,将数据分块成有限数量的段,而不是依赖于可能导致过多线程的用户输入数据。

注意:如果实验未指定文件名,你可以使用任何你想要的文件名。例如,你可以使用 main.rs,并通过 rustc main.rs &&./main 进行编译和运行。

测试用例:map-reduce

Rust 让并行化数据处理变得非常容易,不会带来传统上进行此类尝试时遇到的诸多麻烦。

标准库开箱即用,提供了出色的线程原语。这些原语与 Rust 的所有权概念和别名规则相结合,能自动防止数据竞争。

别名规则(一个可写引用异或多个可读引用)会自动防止你操作其他线程可见的状态。(在需要同步的地方,有像 MutexChannel 这样的同步原语。)

在这个示例中,我们将计算一组数字中所有数字的总和。我们会把这组数字分成多个块,每个块由不同的线程处理。每个线程计算其小块数字的总和,随后我们将各个线程产生的中间和相加。

请注意,尽管我们在跨线程边界传递引用,但 Rust 明白我们传递的只是只读引用,因此不会出现不安全情况或数据竞争。而且由于我们传递的引用具有 'static 生命周期,Rust 知道在这些线程仍在运行时,我们的数据不会被销毁。(当你需要在线程之间共享非 static 数据时,可以使用像 Arc 这样的智能指针来保持数据存活,并避免非 static 生命周期。)

use std::thread;

// 这是 `main` 线程
fn main() {

    // 这是我们要处理的数据。
    // 我们将通过一个多线程的 map-reduce 算法来计算所有数字的总和。
    // 每个由空格分隔的块将在不同的线程中处理。
    //
    // 注意:如果你插入空格,看看输出会怎样!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // 创建一个向量来保存我们要创建的子线程。
    let mut children = vec![];

    /*************************************************************************
     * “Map”阶段
     *
     * 将我们的数据分成多个段,并进行初始处理
     ************************************************************************/

    // 将我们的数据分成多个段进行单独计算
    // 每个块将是对实际数据的一个引用(&str)
    let chunked_data = data.split_whitespace();

    // 遍历数据段。
    //.enumerate() 会将当前循环索引添加到迭代的任何内容上
    // 然后,得到的元组“(索引,元素)”会立即
    // 通过“解构赋值”被“解构”成两个变量,“i”和“data_segment”
    for (i, data_segment) in chunked_data.enumerate() {
        println!("数据段 {} 是 \"{}\"", i, data_segment);

        // 在一个单独的线程中处理每个数据段
        //
        // spawn() 返回一个指向新线程的句柄,
        // 我们必须保留这个句柄才能访问返回值
        //
        //'move || -> u32' 是一个闭包的语法,它:
        // * 不接受参数 ('||')
        // * 拥有其捕获变量的所有权 ('move') 并且
        // * 返回一个无符号 32 位整数 ('-> u32')
        //
        // Rust 足够智能,能够从闭包本身推断出 '-> u32',所以我们可以省略它。
        //
        // 注意:尝试去掉'move',看看会发生什么
        children.push(thread::spawn(move || -> u32 {
            // 计算这个段的中间和:
            let result = data_segment
                        // 遍历我们段中的字符..
                      .chars()
                        //.. 将文本字符转换为它们的数字值..
                      .map(|c| c.to_digit(10).expect("应该是一个数字"))
                        //.. 并对得到的数字迭代器求和
                      .sum();

            // println! 会锁定标准输出,所以不会出现文本交错
            println!("处理后的段 {}, 结果={}", i, result);

            // 不需要“return”,因为 Rust 是一种“表达式语言”,
            // 每个块中最后计算的表达式会自动成为其值。
            result

        }));
    }


    /*************************************************************************
     * “Reduce”阶段
     *
     * 收集我们的中间结果,并将它们组合成一个最终结果
     ************************************************************************/

    // 将每个线程的中间结果组合成一个最终的总和。
    //
    // 我们使用“turbofish” ::<> 为 sum() 提供一个类型提示。
    //
    // 注意:尝试不使用 turbofish,而是显式
    // 指定 final_result 的类型
    let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();

    println!("最终总和结果:{}", final_result);
}

作业

让线程数量依赖于用户输入的数据是不明智的。如果用户决定插入大量空格会怎样?我们真的要创建 2000 个线程吗?修改程序,使数据总是被分成有限数量的块,这个数量由程序开头的一个静态常量定义。

总结

恭喜你!你已经完成了“测试用例:Map-Reduce”实验。你可以在 LabEx 中练习更多实验来提升你的技能。