简介
欢迎来到「优雅关闭与清理」实验。本实验是 Rust 程序设计语言 的一部分。你可以在 LabEx 中练习 Rust 技能。
在本实验中,我们将通过利用 Drop 特性并提供一种让线程停止接受新请求并关闭的方式,在代码中实现优雅关闭与清理机制。
优雅关闭与清理
如我们所愿,清单 20-20 中的代码通过使用线程池异步响应请求。我们收到了一些关于workers、id和thread字段的警告,这些字段我们没有直接使用,这提醒我们没有清理任何东西。当我们使用不太优雅的 ctrl-C 方法来停止主线程时,所有其他线程也会立即停止,即使它们正在处理请求的过程中。
接下来,我们将实现Drop特性,以便对线程池中的每个线程调用join,这样它们就可以在关闭之前完成正在处理的请求。然后,我们将实现一种方法来告诉线程它们应该停止接受新请求并关闭。为了看到这段代码的实际运行情况,我们将修改我们的服务器,使其在优雅地关闭线程池之前只接受两个请求。
在线程池上实现 Drop 特性
让我们从在线程池上实现 Drop 特性开始。当线程池被丢弃时,我们的线程应该全部调用 join 以确保它们完成工作。清单 20-22 展示了对 Drop 实现的首次尝试;这段代码目前还不能正常工作。
文件名:src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
1 for worker in &mut self.workers {
2 println!("Shutting down worker {}", worker.id);
3 worker.thread.join().unwrap();
}
}
}
清单 20-22:当线程池超出作用域时,让每个线程调用 join
首先,我们遍历线程池中的每个 worker [1]。这里我们使用 &mut,因为 self 是一个可变引用,并且我们还需要能够修改 worker。对于每个 worker,我们打印一条消息,表明这个特定的 Worker 实例正在关闭 [2],然后我们在该 Worker 实例的线程上调用 join [3]。如果对 join 的调用失败,我们使用 unwrap 使 Rust 发生恐慌并进入非优雅关闭状态。
当我们编译这段代码时,会得到以下错误:
error[E0507]: cannot move out of `worker.thread` which is behind a mutable
reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this
method call
| |
| move occurs because `worker.thread` has type
`JoinHandle<()>`, which does not implement the `Copy` trait
|
note: this function takes ownership of the receiver `self`, which moves
`worker.thread`
错误提示我们不能调用 join,因为我们对每个 worker 只有一个可变借用,而 join 会获取其参数的所有权。为了解决这个问题,我们需要将线程从拥有 thread 的 Worker 实例中移出,这样 join 才能消耗该线程。我们在清单 17-15 中就是这么做的:如果 Worker 持有一个 Option<thread::JoinHandle<()>>,而不是其他类型,我们就可以在 Option 上调用 take 方法,将值从 Some 变体中移出,并在其位置留下一个 None 变体。换句话说,正在运行的 Worker 在 thread 中会有一个 Some 变体,当我们想要清理一个 Worker 时,我们会用 None 替换 Some,这样 Worker 就没有线程可运行了。
所以我们知道需要像这样更新 Worker 的定义:
文件名:src/lib.rs
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
现在让我们依靠编译器来找到其他需要更改的地方。检查这段代码时,我们得到两个错误:
error[E0599]: no method named `join` found for enum `Option` in the current
scope
--> src/lib.rs:52:27
|
52 | worker.thread.join().unwrap();
| ^^^^ method not found in
`Option<JoinHandle<()>>`
error[E0308]: mismatched types
--> src/lib.rs:72:22
|
72 | Worker { id, thread }
| ^^^^^^ expected enum `Option`, found struct
`JoinHandle`
|
= note: expected enum `Option<JoinHandle<()>>`
found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
|
72 | Worker { id, thread: Some(thread) }
| +++++++++++++ +
让我们先解决第二个错误,它指向 Worker::new 末尾的代码;当我们创建一个新的 Worker 时,需要将 thread 值包装在 Some 中。进行以下更改以修复此错误:
文件名:src/lib.rs
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
--snip--
Worker {
id,
thread: Some(thread),
}
}
}
第一个错误出现在我们的 Drop 实现中。我们之前提到过,我们打算在 Option 值上调用 take 以将 thread 从 worker 中移出。以下更改将实现这一点:
文件名:src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
1 if let Some(thread) = worker.thread.take() {
2 thread.join().unwrap();
}
}
}
}
如第 17 章所述,Option 上的 take 方法会取出 Some 变体并在其位置留下 None。我们使用 if let 来解构 Some 并获取线程 [1];然后我们在线程上调用 join [2]。如果一个 Worker 实例的线程已经是 None,我们知道该 Worker 的线程已经被清理,所以在这种情况下什么也不会发生。
向线程发送信号以停止监听任务
经过我们所做的所有更改,代码可以编译通过且没有任何警告。然而,坏消息是这段代码目前的运行方式并非我们期望的那样。关键在于 Worker 实例的线程所运行的闭包中的逻辑:目前,我们调用了 join,但这并不会关闭线程,因为它们会永远在 loop 中寻找任务。如果我们尝试使用当前的 drop 实现来丢弃我们的 ThreadPool,主线程将会永远阻塞,等待第一个线程完成。
为了解决这个问题,我们需要对 ThreadPool 的 drop 实现进行更改,然后再更改 Worker 中的循环。
首先,我们将更改 ThreadPool 的 drop 实现,以便在等待线程完成之前显式地丢弃 sender。清单 20-23 展示了对 ThreadPool 进行的更改,以显式地丢弃 sender。我们使用与处理线程时相同的 Option 和 take 技术,以便能够将 sender 从 ThreadPool 中移出。
文件名:src/lib.rs
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
--snip--
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
--snip--
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender
.as_ref()
.unwrap()
.send(job)
.unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
1 drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
清单 20-23:在加入 Worker 线程之前显式丢弃 sender
丢弃 sender [1] 会关闭通道,这表明不会再发送更多消息。当这种情况发生时,Worker 实例在无限循环中对 recv 的所有调用都将返回一个错误。在清单 20-24 中,我们更改了 Worker 中的循环,以便在这种情况下优雅地退出循环,这意味着当 ThreadPool 的 drop 实现对它们调用 join 时,线程将会完成。
文件名:src/lib.rs
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!(
"Worker {id} got a job; executing."
);
job();
}
Err(_) => {
println!(
"Worker {id} shutting down."
);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
清单 20-24:当 recv 返回错误时显式跳出循环
为了看到这段代码的实际运行情况,让我们修改 main 函数,使其在优雅地关闭服务器之前只接受两个请求,如清单 20-25 所示。
文件名:src/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
清单 20-25:通过退出循环在处理两个请求后关闭服务器
在实际的 Web 服务器中,你肯定不希望它只处理两个请求就关闭。这段代码只是为了演示优雅关闭和清理功能是否正常工作。
take 方法是在 Iterator 特性中定义的,它最多将迭代限制为前两个项。ThreadPool 将在 main 函数结束时超出作用域,然后 drop 实现将会运行。
使用 cargo run 启动服务器,并发送三个请求。第三个请求应该会出错,在你的终端中你应该会看到类似这样的输出:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 1.0s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
你可能会看到不同的 Worker ID 和打印消息的顺序。从这些消息中我们可以看出这段代码的工作方式:Worker 实例 0 和 3 收到了前两个请求。在第二个连接之后,服务器停止接受连接,并且 ThreadPool 上的 Drop 实现甚至在 Worker 3 开始其任务之前就开始执行。丢弃 sender 会断开所有 Worker 实例的连接,并告诉它们关闭。每个 Worker 实例在断开连接时都会打印一条消息,然后线程池调用 join 来等待每个 Worker 线程完成。
注意这个特定执行过程中的一个有趣方面:ThreadPool 丢弃了 sender,并且在任何 Worker 收到错误之前,我们尝试加入 Worker 0。Worker 0 还没有从 recv 收到错误,所以主线程阻塞,等待 Worker 0 完成。与此同时,Worker 3 收到了一个任务,然后所有线程都收到了一个错误。当 Worker 0 完成时,主线程等待其余的 Worker 实例完成。此时,它们都已经退出了循环并停止了。
恭喜!我们现在已经完成了我们的项目;我们有一个基本的 Web 服务器,它使用线程池来异步响应。我们能够对服务器进行优雅关闭,这会清理线程池中的所有线程。请访问 https://www.nostarch.com/Rust2021 下载本章的完整代码以供参考。
我们还可以做更多的事情!如果你想继续改进这个项目,这里有一些想法:
- 为
ThreadPool及其公共方法添加更多文档。 - 添加对库功能的测试。
- 将对
unwrap的调用更改为更健壮的错误处理。 - 使用
ThreadPool来执行除服务 Web 请求之外的其他任务。 - 在 https://crates.io 上找到一个线程池 crate,并使用该 crate 实现一个类似的 Web 服务器。然后将其 API 和健壮性与我们实现的线程池进行比较。
总结
恭喜你!你已经完成了「优雅关闭与清理」实验。你可以在 LabEx 中练习更多实验来提升你的技能。