简介
欢迎来到「优雅关闭与清理」实验。本实验是 Rust 程序设计语言 的一部分。你可以在 LabEx 中练习 Rust 技能。
在本实验中,我们将通过利用 Drop
特性并提供一种让线程停止接受新请求并关闭的方式,在代码中实现优雅关闭与清理机制。
This tutorial is from open-source community. Access the source code
💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版
欢迎来到「优雅关闭与清理」实验。本实验是 Rust 程序设计语言 的一部分。你可以在 LabEx 中练习 Rust 技能。
在本实验中,我们将通过利用 Drop
特性并提供一种让线程停止接受新请求并关闭的方式,在代码中实现优雅关闭与清理机制。
如我们所愿,清单20-20中的代码通过使用线程池异步响应请求。我们收到了一些关于workers
、id
和thread
字段的警告,这些字段我们没有直接使用,这提醒我们没有清理任何东西。当我们使用不太优雅的ctrl-C方法来停止主线程时,所有其他线程也会立即停止,即使它们正在处理请求的过程中。
接下来,我们将实现Drop
特性,以便对线程池中的每个线程调用join
,这样它们就可以在关闭之前完成正在处理的请求。然后,我们将实现一种方法来告诉线程它们应该停止接受新请求并关闭。为了看到这段代码的实际运行情况,我们将修改我们的服务器,使其在优雅地关闭线程池之前只接受两个请求。
让我们从在线程池上实现 Drop
特性开始。当线程池被丢弃时,我们的线程应该全部调用 join
以确保它们完成工作。清单20-22展示了对 Drop
实现的首次尝试;这段代码目前还不能正常工作。
文件名:src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
1 for worker in &mut self.workers {
2 println!("Shutting down worker {}", worker.id);
3 worker.thread.join().unwrap();
}
}
}
清单20-22:当线程池超出作用域时,让每个线程调用 join
首先,我们遍历线程池中的每个 worker
[1]。这里我们使用 &mut
,因为 self
是一个可变引用,并且我们还需要能够修改 worker
。对于每个 worker
,我们打印一条消息,表明这个特定的 Worker
实例正在关闭 [2],然后我们在该 Worker
实例的线程上调用 join
[3]。如果对 join
的调用失败,我们使用 unwrap
使 Rust 发生恐慌并进入非优雅关闭状态。
当我们编译这段代码时,会得到以下错误:
error[E0507]: cannot move out of `worker.thread` which is behind a mutable
reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this
method call
| |
| move occurs because `worker.thread` has type
`JoinHandle<()>`, which does not implement the `Copy` trait
|
note: this function takes ownership of the receiver `self`, which moves
`worker.thread`
错误提示我们不能调用 join
,因为我们对每个 worker
只有一个可变借用,而 join
会获取其参数的所有权。为了解决这个问题,我们需要将线程从拥有 thread
的 Worker
实例中移出,这样 join
才能消耗该线程。我们在清单17-15中就是这么做的:如果 Worker
持有一个 Option<thread::JoinHandle<()>>
,而不是其他类型,我们就可以在 Option
上调用 take
方法,将值从 Some
变体中移出,并在其位置留下一个 None
变体。换句话说,正在运行的 Worker
在 thread
中会有一个 Some
变体,当我们想要清理一个 Worker
时,我们会用 None
替换 Some
,这样 Worker
就没有线程可运行了。
所以我们知道需要像这样更新 Worker
的定义:
文件名:src/lib.rs
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
现在让我们依靠编译器来找到其他需要更改的地方。检查这段代码时,我们得到两个错误:
error[E0599]: no method named `join` found for enum `Option` in the current
scope
--> src/lib.rs:52:27
|
52 | worker.thread.join().unwrap();
| ^^^^ method not found in
`Option<JoinHandle<()>>`
error[E0308]: mismatched types
--> src/lib.rs:72:22
|
72 | Worker { id, thread }
| ^^^^^^ expected enum `Option`, found struct
`JoinHandle`
|
= note: expected enum `Option<JoinHandle<()>>`
found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
|
72 | Worker { id, thread: Some(thread) }
| +++++++++++++ +
让我们先解决第二个错误,它指向 Worker::new
末尾的代码;当我们创建一个新的 Worker
时,需要将 thread
值包装在 Some
中。进行以下更改以修复此错误:
文件名:src/lib.rs
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
--snip--
Worker {
id,
thread: Some(thread),
}
}
}
第一个错误出现在我们的 Drop
实现中。我们之前提到过,我们打算在 Option
值上调用 take
以将 thread
从 worker
中移出。以下更改将实现这一点:
文件名:src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
1 if let Some(thread) = worker.thread.take() {
2 thread.join().unwrap();
}
}
}
}
如第17章所述,Option
上的 take
方法会取出 Some
变体并在其位置留下 None
。我们使用 if let
来解构 Some
并获取线程 [1];然后我们在线程上调用 join
[2]。如果一个 Worker
实例的线程已经是 None
,我们知道该 Worker
的线程已经被清理,所以在这种情况下什么也不会发生。
经过我们所做的所有更改,代码可以编译通过且没有任何警告。然而,坏消息是这段代码目前的运行方式并非我们期望的那样。关键在于 Worker
实例的线程所运行的闭包中的逻辑:目前,我们调用了 join
,但这并不会关闭线程,因为它们会永远在 loop
中寻找任务。如果我们尝试使用当前的 drop
实现来丢弃我们的 ThreadPool
,主线程将会永远阻塞,等待第一个线程完成。
为了解决这个问题,我们需要对 ThreadPool
的 drop
实现进行更改,然后再更改 Worker
中的循环。
首先,我们将更改 ThreadPool
的 drop
实现,以便在等待线程完成之前显式地丢弃 sender
。清单20-23展示了对 ThreadPool
进行的更改,以显式地丢弃 sender
。我们使用与处理线程时相同的 Option
和 take
技术,以便能够将 sender
从 ThreadPool
中移出。
文件名:src/lib.rs
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
--snip--
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
--snip--
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender
.as_ref()
.unwrap()
.send(job)
.unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
1 drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
清单20-23:在加入 Worker
线程之前显式丢弃 sender
丢弃 sender
[1] 会关闭通道,这表明不会再发送更多消息。当这种情况发生时,Worker
实例在无限循环中对 recv
的所有调用都将返回一个错误。在清单20-24中,我们更改了 Worker
中的循环,以便在这种情况下优雅地退出循环,这意味着当 ThreadPool
的 drop
实现对它们调用 join
时,线程将会完成。
文件名:src/lib.rs
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!(
"Worker {id} got a job; executing."
);
job();
}
Err(_) => {
println!(
"Worker {id} shutting down."
);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
清单20-24:当 recv
返回错误时显式跳出循环
为了看到这段代码的实际运行情况,让我们修改 main
函数,使其在优雅地关闭服务器之前只接受两个请求,如清单20-25所示。
文件名:src/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
清单20-25:通过退出循环在处理两个请求后关闭服务器
在实际的Web服务器中,你肯定不希望它只处理两个请求就关闭。这段代码只是为了演示优雅关闭和清理功能是否正常工作。
take
方法是在 Iterator
特性中定义的,它最多将迭代限制为前两个项。ThreadPool
将在 main
函数结束时超出作用域,然后 drop
实现将会运行。
使用 cargo run
启动服务器,并发送三个请求。第三个请求应该会出错,在你的终端中你应该会看到类似这样的输出:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 1.0s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
你可能会看到不同的 Worker
ID 和打印消息的顺序。从这些消息中我们可以看出这段代码的工作方式:Worker
实例0和3收到了前两个请求。在第二个连接之后,服务器停止接受连接,并且 ThreadPool
上的 Drop
实现甚至在 Worker
3开始其任务之前就开始执行。丢弃 sender
会断开所有 Worker
实例的连接,并告诉它们关闭。每个 Worker
实例在断开连接时都会打印一条消息,然后线程池调用 join
来等待每个 Worker
线程完成。
注意这个特定执行过程中的一个有趣方面:ThreadPool
丢弃了 sender
,并且在任何 Worker
收到错误之前,我们尝试加入 Worker
0。Worker
0还没有从 recv
收到错误,所以主线程阻塞,等待 Worker
0完成。与此同时,Worker
3收到了一个任务,然后所有线程都收到了一个错误。当 Worker
0完成时,主线程等待其余的 Worker
实例完成。此时,它们都已经退出了循环并停止了。
恭喜!我们现在已经完成了我们的项目;我们有一个基本的Web服务器,它使用线程池来异步响应。我们能够对服务器进行优雅关闭,这会清理线程池中的所有线程。请访问 https://www.nostarch.com/Rust2021 下载本章的完整代码以供参考。
我们还可以做更多的事情!如果你想继续改进这个项目,这里有一些想法:
ThreadPool
及其公共方法添加更多文档。unwrap
的调用更改为更健壮的错误处理。ThreadPool
来执行除服务Web请求之外的其他任务。恭喜你!你已经完成了「优雅关闭与清理」实验。你可以在 LabEx 中练习更多实验来提升你的技能。