优雅关闭与清理

RustRustBeginner
立即练习

This tutorial is from open-source community. Access the source code

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

简介

欢迎来到「优雅关闭与清理」实验。本实验是 Rust 程序设计语言 的一部分。你可以在 LabEx 中练习 Rust 技能。

在本实验中,我们将通过利用 Drop 特性并提供一种让线程停止接受新请求并关闭的方式,在代码中实现优雅关闭与清理机制。

优雅关闭与清理

如我们所愿,清单20-20中的代码通过使用线程池异步响应请求。我们收到了一些关于workersidthread字段的警告,这些字段我们没有直接使用,这提醒我们没有清理任何东西。当我们使用不太优雅的ctrl-C方法来停止主线程时,所有其他线程也会立即停止,即使它们正在处理请求的过程中。

接下来,我们将实现Drop特性,以便对线程池中的每个线程调用join,这样它们就可以在关闭之前完成正在处理的请求。然后,我们将实现一种方法来告诉线程它们应该停止接受新请求并关闭。为了看到这段代码的实际运行情况,我们将修改我们的服务器,使其在优雅地关闭线程池之前只接受两个请求。

在线程池上实现 Drop 特性

让我们从在线程池上实现 Drop 特性开始。当线程池被丢弃时,我们的线程应该全部调用 join 以确保它们完成工作。清单20-22展示了对 Drop 实现的首次尝试;这段代码目前还不能正常工作。

文件名:src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 for worker in &mut self.workers {
          2 println!("Shutting down worker {}", worker.id);

          3 worker.thread.join().unwrap();
        }
    }
}

清单20-22:当线程池超出作用域时,让每个线程调用 join

首先,我们遍历线程池中的每个 worker [1]。这里我们使用 &mut,因为 self 是一个可变引用,并且我们还需要能够修改 worker。对于每个 worker,我们打印一条消息,表明这个特定的 Worker 实例正在关闭 [2],然后我们在该 Worker 实例的线程上调用 join [3]。如果对 join 的调用失败,我们使用 unwrap 使 Rust 发生恐慌并进入非优雅关闭状态。

当我们编译这段代码时,会得到以下错误:

error[E0507]: cannot move out of `worker.thread` which is behind a mutable
reference
    --> src/lib.rs:52:13
     |
52   |             worker.thread.join().unwrap();
     |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this
method call
     |             |
     |             move occurs because `worker.thread` has type
`JoinHandle<()>`, which does not implement the `Copy` trait
     |
note: this function takes ownership of the receiver `self`, which moves
`worker.thread`

错误提示我们不能调用 join,因为我们对每个 worker 只有一个可变借用,而 join 会获取其参数的所有权。为了解决这个问题,我们需要将线程从拥有 threadWorker 实例中移出,这样 join 才能消耗该线程。我们在清单17-15中就是这么做的:如果 Worker 持有一个 Option<thread::JoinHandle<()>>,而不是其他类型,我们就可以在 Option 上调用 take 方法,将值从 Some 变体中移出,并在其位置留下一个 None 变体。换句话说,正在运行的 Workerthread 中会有一个 Some 变体,当我们想要清理一个 Worker 时,我们会用 None 替换 Some,这样 Worker 就没有线程可运行了。

所以我们知道需要像这样更新 Worker 的定义:

文件名:src/lib.rs

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

现在让我们依靠编译器来找到其他需要更改的地方。检查这段代码时,我们得到两个错误:

error[E0599]: no method named `join` found for enum `Option` in the current
scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in
`Option<JoinHandle<()>>`

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct
`JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Worker { id, thread: Some(thread) }
   |                      +++++++++++++      +

让我们先解决第二个错误,它指向 Worker::new 末尾的代码;当我们创建一个新的 Worker 时,需要将 thread 值包装在 Some 中。进行以下更改以修复此错误:

文件名:src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

第一个错误出现在我们的 Drop 实现中。我们之前提到过,我们打算在 Option 值上调用 take 以将 threadworker 中移出。以下更改将实现这一点:

文件名:src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

          1 if let Some(thread) = worker.thread.take() {
              2 thread.join().unwrap();
            }
        }
    }
}

如第17章所述,Option 上的 take 方法会取出 Some 变体并在其位置留下 None。我们使用 if let 来解构 Some 并获取线程 [1];然后我们在线程上调用 join [2]。如果一个 Worker 实例的线程已经是 None,我们知道该 Worker 的线程已经被清理,所以在这种情况下什么也不会发生。

向线程发送信号以停止监听任务

经过我们所做的所有更改,代码可以编译通过且没有任何警告。然而,坏消息是这段代码目前的运行方式并非我们期望的那样。关键在于 Worker 实例的线程所运行的闭包中的逻辑:目前,我们调用了 join,但这并不会关闭线程,因为它们会永远在 loop 中寻找任务。如果我们尝试使用当前的 drop 实现来丢弃我们的 ThreadPool,主线程将会永远阻塞,等待第一个线程完成。

为了解决这个问题,我们需要对 ThreadPooldrop 实现进行更改,然后再更改 Worker 中的循环。

首先,我们将更改 ThreadPooldrop 实现,以便在等待线程完成之前显式地丢弃 sender。清单20-23展示了对 ThreadPool 进行的更改,以显式地丢弃 sender。我们使用与处理线程时相同的 Optiontake 技术,以便能够将 senderThreadPool 中移出。

文件名:src/lib.rs

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
--snip--
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        --snip--

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender
           .as_ref()
           .unwrap()
           .send(job)
           .unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

清单20-23:在加入 Worker 线程之前显式丢弃 sender

丢弃 sender [1] 会关闭通道,这表明不会再发送更多消息。当这种情况发生时,Worker 实例在无限循环中对 recv 的所有调用都将返回一个错误。在清单20-24中,我们更改了 Worker 中的循环,以便在这种情况下优雅地退出循环,这意味着当 ThreadPooldrop 实现对它们调用 join 时,线程将会完成。

文件名:src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!(
                        "Worker {id} got a job; executing."
                    );

                    job();
                }
                Err(_) => {
                    println!(
                        "Worker {id} shutting down."
                    );
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

清单20-24:当 recv 返回错误时显式跳出循环

为了看到这段代码的实际运行情况,让我们修改 main 函数,使其在优雅地关闭服务器之前只接受两个请求,如清单20-25所示。

文件名:src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

清单20-25:通过退出循环在处理两个请求后关闭服务器

在实际的Web服务器中,你肯定不希望它只处理两个请求就关闭。这段代码只是为了演示优雅关闭和清理功能是否正常工作。

take 方法是在 Iterator 特性中定义的,它最多将迭代限制为前两个项。ThreadPool 将在 main 函数结束时超出作用域,然后 drop 实现将会运行。

使用 cargo run 启动服务器,并发送三个请求。第三个请求应该会出错,在你的终端中你应该会看到类似这样的输出:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

你可能会看到不同的 Worker ID 和打印消息的顺序。从这些消息中我们可以看出这段代码的工作方式:Worker 实例0和3收到了前两个请求。在第二个连接之后,服务器停止接受连接,并且 ThreadPool 上的 Drop 实现甚至在 Worker 3开始其任务之前就开始执行。丢弃 sender 会断开所有 Worker 实例的连接,并告诉它们关闭。每个 Worker 实例在断开连接时都会打印一条消息,然后线程池调用 join 来等待每个 Worker 线程完成。

注意这个特定执行过程中的一个有趣方面:ThreadPool 丢弃了 sender,并且在任何 Worker 收到错误之前,我们尝试加入 Worker 0。Worker 0还没有从 recv 收到错误,所以主线程阻塞,等待 Worker 0完成。与此同时,Worker 3收到了一个任务,然后所有线程都收到了一个错误。当 Worker 0完成时,主线程等待其余的 Worker 实例完成。此时,它们都已经退出了循环并停止了。

恭喜!我们现在已经完成了我们的项目;我们有一个基本的Web服务器,它使用线程池来异步响应。我们能够对服务器进行优雅关闭,这会清理线程池中的所有线程。请访问 https://www.nostarch.com/Rust2021 下载本章的完整代码以供参考。

我们还可以做更多的事情!如果你想继续改进这个项目,这里有一些想法:

  • ThreadPool 及其公共方法添加更多文档。
  • 添加对库功能的测试。
  • 将对 unwrap 的调用更改为更健壮的错误处理。
  • 使用 ThreadPool 来执行除服务Web请求之外的其他任务。
  • https://crates.io 上找到一个线程池 crate,并使用该 crate 实现一个类似的Web服务器。然后将其API和健壮性与我们实现的线程池进行比较。

总结

恭喜你!你已经完成了「优雅关闭与清理」实验。你可以在 LabEx 中练习更多实验来提升你的技能。