测试用例:map-reduce
Rust 让并行化数据处理变得非常容易,不会带来传统上进行此类尝试时遇到的诸多麻烦。
标准库开箱即用,提供了出色的线程原语。这些原语与 Rust 的所有权概念和别名规则相结合,能自动防止数据竞争。
别名规则(一个可写引用异或多个可读引用)会自动防止你操作其他线程可见的状态。(在需要同步的地方,有像 Mutex 或 Channel 这样的同步原语。)
在这个示例中,我们将计算一组数字中所有数字的总和。我们会把这组数字分成多个块,每个块由不同的线程处理。每个线程计算其小块数字的总和,随后我们将各个线程产生的中间和相加。
请注意,尽管我们在跨线程边界传递引用,但 Rust 明白我们传递的只是只读引用,因此不会出现不安全情况或数据竞争。而且由于我们传递的引用具有 'static 生命周期,Rust 知道在这些线程仍在运行时,我们的数据不会被销毁。(当你需要在线程之间共享非 static 数据时,可以使用像 Arc 这样的智能指针来保持数据存活,并避免非 static 生命周期。)
use std::thread;
// 这是 `main` 线程
fn main() {
// 这是我们要处理的数据。
// 我们将通过一个多线程的 map-reduce 算法来计算所有数字的总和。
// 每个由空格分隔的块将在不同的线程中处理。
//
// 注意:如果你插入空格,看看输出会怎样!
let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";
// 创建一个向量来保存我们要创建的子线程。
let mut children = vec![];
/*************************************************************************
* “Map”阶段
*
* 将我们的数据分成多个段,并进行初始处理
************************************************************************/
// 将我们的数据分成多个段进行单独计算
// 每个块将是对实际数据的一个引用(&str)
let chunked_data = data.split_whitespace();
// 遍历数据段。
//.enumerate() 会将当前循环索引添加到迭代的任何内容上
// 然后,得到的元组“(索引,元素)”会立即
// 通过“解构赋值”被“解构”成两个变量,“i”和“data_segment”
for (i, data_segment) in chunked_data.enumerate() {
println!("数据段 {} 是 \"{}\"", i, data_segment);
// 在一个单独的线程中处理每个数据段
//
// spawn() 返回一个指向新线程的句柄,
// 我们必须保留这个句柄才能访问返回值
//
//'move || -> u32' 是一个闭包的语法,它:
// * 不接受参数 ('||')
// * 拥有其捕获变量的所有权 ('move') 并且
// * 返回一个无符号 32 位整数 ('-> u32')
//
// Rust 足够智能,能够从闭包本身推断出 '-> u32',所以我们可以省略它。
//
// 注意:尝试去掉'move',看看会发生什么
children.push(thread::spawn(move || -> u32 {
// 计算这个段的中间和:
let result = data_segment
// 遍历我们段中的字符..
.chars()
//.. 将文本字符转换为它们的数字值..
.map(|c| c.to_digit(10).expect("应该是一个数字"))
//.. 并对得到的数字迭代器求和
.sum();
// println! 会锁定标准输出,所以不会出现文本交错
println!("处理后的段 {}, 结果={}", i, result);
// 不需要“return”,因为 Rust 是一种“表达式语言”,
// 每个块中最后计算的表达式会自动成为其值。
result
}));
}
/*************************************************************************
* “Reduce”阶段
*
* 收集我们的中间结果,并将它们组合成一个最终结果
************************************************************************/
// 将每个线程的中间结果组合成一个最终的总和。
//
// 我们使用“turbofish” ::<> 为 sum() 提供一个类型提示。
//
// 注意:尝试不使用 turbofish,而是显式
// 指定 final_result 的类型
let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();
println!("最终总和结果:{}", final_result);
}
作业
让线程数量依赖于用户输入的数据是不明智的。如果用户决定插入大量空格会怎样?我们真的要创建 2000 个线程吗?修改程序,使数据总是被分成有限数量的块,这个数量由程序开头的一个静态常量定义。