Rust 中的共享状态并发

Beginner

This tutorial is from open-source community. Access the source code

简介

欢迎来到共享状态并发。本实验是《Rust 程序设计语言》的一部分。你可以在 LabEx 中练习你的 Rust 技能。

在本实验中,我们将探讨共享内存并发的概念,以及为什么热衷于消息传递的人对它持谨慎态度。

共享状态并发

消息传递是处理并发的一种不错的方式,但它不是唯一的方式。另一种方法是让多个线程访问相同的共享数据。再看看 Go 语言文档中的这句口号:“不要通过共享内存来进行通信。”

通过共享内存进行通信会是什么样子呢?此外,为什么热衷于消息传递的人会告诫不要使用内存共享呢?

在某种程度上,任何编程语言中的通道都类似于单一所有权,因为一旦你将一个值传递到通道中,就不应该再使用该值。共享内存并发类似于多重所有权:多个线程可以同时访问相同的内存位置。正如你在第 15 章中看到的,智能指针使得多重所有权成为可能,多重所有权会增加复杂性,因为这些不同的所有者需要管理。Rust 的类型系统和所有权规则极大地有助于正确地进行这种管理。举个例子,让我们看看互斥锁,它是共享内存中更常见的并发原语之一。

使用互斥锁一次只允许一个线程访问数据

“Mutex”是“mutual exclusion”(互斥)的缩写,也就是说互斥锁在任何给定时间只允许一个线程访问某些数据。要访问互斥锁中的数据,线程必须首先通过请求获取互斥锁的“锁”来表明它想要访问。锁是互斥锁的数据结构的一部分,用于跟踪当前谁对数据拥有独占访问权。因此,互斥锁被描述为通过锁定系统“保护”它所保存的数据。

互斥锁因难以使用而声名狼藉,因为你必须记住两条规则:

  1. 在使用数据之前,你必须尝试获取锁。
  2. 当你使用完互斥锁保护的数据后,必须解锁数据,以便其他线程可以获取锁。

用一个现实世界的比喻来说明互斥锁,想象一下在一个会议上只有一个麦克风的小组讨论。在小组成员发言之前,他们必须请求或示意他们想要使用麦克风。当他们拿到麦克风后,他们可以想说多久就说多久,然后把麦克风交给下一个请求发言的小组成员。如果一个小组成员说完后忘记把麦克风交出去,其他人就无法发言。如果共享麦克风的管理出了问题,小组讨论就无法按计划进行!

正确管理互斥锁可能非常棘手,这就是为什么有那么多人热衷于通道。不过,多亏了 Rust 的类型系统和所有权规则,你不会在锁定和解锁上出错。

Mutex<T> 的 API

作为如何使用互斥锁的一个示例,让我们先在单线程环境中使用互斥锁,如清单 16 - 12 所示。

文件名:src/main.rs

use std::sync::Mutex;

fn main() {
  1 let m = Mutex::new(5);

    {
      2 let mut num = m.lock().unwrap();
      3 *num = 6;
  4 }

  5 println!("m = {:?}", m);
}

清单 16 - 12:为简单起见,在单线程环境中探索 Mutex<T> 的 API

和许多类型一样,我们使用关联函数 new 创建一个 Mutex<T>[1]。要访问互斥锁内部的数据,我们使用 lock 方法来获取锁[2]。这个调用会阻塞当前线程,这样在轮到我们获取锁之前它就不能做任何工作。

如果持有锁的另一个线程恐慌(panic),对 lock 的调用将会失败。在那种情况下,将永远没有人能够获取到锁,所以我们选择调用 unwrap,如果处于那种情况就让这个线程恐慌。

在我们获取锁之后,我们可以将返回值(在这种情况下名为 num)当作对内部数据的可变引用。类型系统确保我们在使用 m 中的值之前获取锁。m 的类型是 Mutex<i32>,而不是 i32,所以我们 必须 调用 lock 才能使用 i32 值。我们不会忘记;否则类型系统不会让我们访问内部的 i32

正如你可能猜测的那样,Mutex<T> 是一个智能指针。更准确地说,对 lock 的调用 返回 一个名为 MutexGuard 的智能指针,它被包装在一个 LockResult 中,我们通过调用 unwrap 来处理它。MutexGuard 智能指针实现了 Deref 来指向我们的内部数据;这个智能指针也有一个 Drop 实现,当 MutexGuard 超出作用域时(在内部作用域结束时发生)会自动释放锁[4]。因此,我们不会有忘记释放锁并阻止其他线程使用互斥锁的风险,因为锁的释放是自动发生的。

在释放锁之后,我们可以打印互斥锁的值,并看到我们能够将内部的 i32 改为 6[5]。

在多个线程之间共享 Mutex<T>

现在让我们尝试使用 Mutex<T> 在多个线程之间共享一个值。我们将启动 10 个线程,让它们每个都将一个计数器值加 1,这样计数器就从 0 增加到 10。清单 16 - 13 中的示例会有一个编译错误,我们将利用这个错误来更多地了解使用 Mutex<T> 以及 Rust 如何帮助我们正确地使用它。

文件名:src/main.rs

use std::sync::Mutex;
use std::thread;

fn main() {
  1 let counter = Mutex::new(0);
    let mut handles = vec![];

  2 for _ in 0..10 {
      3 let handle = thread::spawn(move || {
          4 let mut num = counter.lock().unwrap();

          5 *num += 1;
        });
      6 handles.push(handle);
    }

    for handle in handles {
      7 handle.join().unwrap();
    }

  8 println!("Result: {}", *counter.lock().unwrap());
}

清单 16 - 13:10 个线程,每个线程都对由 Mutex<T> 保护的计数器加 1

和在清单 16 - 12 中一样,我们创建一个 counter 变量来在 Mutex<T> 中保存一个 i32[1]。接下来,我们通过遍历一个数字范围来创建 10 个线程[2]。我们使用 thread::spawn 并给所有线程相同的闭包:一个将计数器移动到线程中的闭包[3],通过调用 lock 方法获取 Mutex<T> 上的锁[4],然后将互斥锁中的值加 1[5]。当一个线程完成运行其闭包时,num 将超出作用域并释放锁,以便另一个线程可以获取它。

在主线程中,我们收集所有的连接句柄[6]。然后,就像在清单 16 - 2 中一样,我们对每个句柄调用 join 以确保所有线程都完成[7]。在那个时候,主线程将获取锁并打印这个程序的结果[8]。

我们暗示过这个示例不会编译。现在让我们找出原因!

error[E0382]: use of moved value: `counter`
  --> src/main.rs:9:36
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which
does not implement the `Copy` trait
...
9  |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^ value moved into closure here,
in previous iteration of loop
10 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure

错误信息指出 counter 值在循环的上一次迭代中被移动了。Rust 告诉我们不能将锁 counter 的所有权移动到多个线程中。让我们使用第 15 章中讨论的多重所有权方法来修复编译错误。

多线程中的多重所有权

在第 15 章中,我们通过使用智能指针 Rc<T> 创建一个引用计数的值,将一个值赋予多个所有者。让我们在这里做同样的事情,看看会发生什么。在清单 16 - 14 中,我们将 Mutex<T> 包装在 Rc<T> 中,并在将所有权转移到线程之前克隆 Rc<T>

文件名:src/main.rs

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

清单 16 - 14:尝试使用 Rc<T> 让多个线程拥有 Mutex<T>

再一次,我们编译后得到了……不同的错误!编译器教会了我们很多。

error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely 1
   --> src/main.rs:11:22
    |
11  |           let handle = thread::spawn(move || {
    |  ______________________^^^^^^^^^^^^^_-
    | |                      |
    | |                      `Rc<Mutex<i32>>` cannot be sent between threads
safely
12  | |             let mut num = counter.lock().unwrap();
13  | |
14  | |             *num += 1;
15  | |         });
    | |_________- within this `[closure@src/main.rs:11:36: 15:10]`
    |
= help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not
implemented for `Rc<Mutex<i32>>` 2
    = note: required because it appears within the type
`[closure@src/main.rs:11:36: 15:10]`
note: required by a bound in `spawn`

哇,那个错误信息好长啊!这里需要关注的重要部分是:“Rc<Mutex<i32>> 不能在线程之间安全地传递”[1]。编译器也告诉了我们原因:“Rc<Mutex<i32>> 没有实现 Send 特性”[2]。我们将在下一节讨论 Send:它是确保我们在线程中使用的类型适用于并发情况的特性之一。

不幸的是,Rc<T> 在线程间共享是不安全的。当 Rc<T> 管理引用计数时,每次调用 clone 它都会增加计数,每次克隆被丢弃时都会减少计数。但它没有使用任何并发原语来确保对计数的更改不会被另一个线程中断。这可能会导致错误的计数——这些微妙的错误可能会进而导致内存泄漏或在我们用完一个值之前它就被丢弃。我们需要的是一种与 Rc<T> 完全一样的类型,但它能以线程安全的方式更改引用计数。

使用 Arc<T> 进行原子引用计数

幸运的是,Arc<T> 是一种类似于 Rc<T> 的类型,在并发情况下使用是安全的。这里的“a”代表“atomic”(原子的),意思是它是一种“原子引用计数”类型。原子类型是另一种并发原语,我们在这里不会详细介绍:有关更多详细信息,请参阅 std::sync::atomic 的标准库文档。此时,你只需要知道原子类型的工作方式类似于基本类型,但在线程间共享是安全的。

然后你可能会想,为什么所有基本类型都不是原子的,以及为什么标准库类型默认不实现为使用 Arc<T>。原因是线程安全会带来性能开销,只有在真正需要时才值得付出这个代价。如果你只是在单个线程内对值执行操作,那么如果不必强制执行原子类型提供的保证,你的代码可以运行得更快。

让我们回到我们的示例:Arc<T>Rc<T> 具有相同的 API,所以我们通过更改 use 行、对 new 的调用以及对 clone 的调用,来修复我们的程序。清单 16 - 15 中的代码最终将编译并运行。

文件名:src/main.rs

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

清单 16 - 15:使用 Arc<T> 包装 Mutex<T> 以便能够在多个线程之间共享所有权

这段代码将输出以下内容:

Result: 10

我们做到了!我们从 0 数到了 10,这可能看起来不是很了不起,但它确实让我们对 Mutex<T> 和线程安全有了很多了解。你也可以使用这个程序的结构来执行比仅仅增加计数器更复杂的操作。使用这种策略,你可以将一个计算分成独立的部分,在线程间拆分这些部分,然后使用 Mutex<T> 让每个线程用其部分更新最终结果。

请注意,如果你正在进行简单的数值操作,标准库的 std::sync::atomic 模块提供了比 Mutex<T> 类型更简单的类型。这些类型提供对基本类型的安全、并发、原子访问。我们在这个示例中选择将 Mutex<T> 与基本类型一起使用,以便我们可以专注于 Mutex<T> 的工作方式。

RefCell<T>/Rc<T> 与 Mutex<T>/Arc<T> 之间的相似之处

你可能已经注意到,counter 是不可变的,但我们可以获取到其内部值的可变引用;这意味着 Mutex<T>Cell 家族一样提供了内部可变性。就像我们在第 15 章中使用 RefCell<T> 来允许我们修改 Rc<T> 内部的内容一样,我们使用 Mutex<T> 来修改 Arc<T> 内部的内容。

另一个需要注意的细节是,当你使用 Mutex<T> 时,Rust 无法保护你免受所有类型的逻辑错误。回想一下第 15 章,使用 Rc<T> 存在创建引用循环的风险,即两个 Rc<T> 值相互引用,从而导致内存泄漏。类似地,Mutex<T> 存在产生 死锁 的风险。当一个操作需要锁定两个资源,而两个线程各自获取了其中一个锁,导致它们永远相互等待时,就会发生死锁。如果你对死锁感兴趣,可以尝试创建一个有死锁的 Rust 程序;然后研究任何语言中互斥锁的死锁缓解策略,并尝试在 Rust 中实现它们。Mutex<T>MutexGuard 的标准库 API 文档提供了有用的信息。

我们将通过讨论 SendSync 特性以及如何将它们与自定义类型一起使用来结束本章。

总结

恭喜你!你已经完成了共享状态并发实验。你可以在 LabEx 中练习更多实验来提升你的技能。