Rust 多线程服务器开发

Beginner

This tutorial is from open-source community. Access the source code

简介

欢迎来到「将我们的单线程服务器转换为多线程服务器」实验。本实验是 Rust 程序设计语言 的一部分。你可以在 LabEx 中练习你的 Rust 技能。

在本实验中,我们将把单线程服务器转换为多线程服务器,以提高其同时处理多个请求的效率。

将我们的单线程服务器转换为多线程服务器

目前,服务器会依次处理每个请求,这意味着在第一个请求处理完成之前,它不会处理第二个连接。如果服务器接收到越来越多的请求,这种串行执行的效率会越来越低。如果服务器接收到一个需要很长时间处理的请求,后续的请求将不得不等待,直到长请求完成,即使新请求可以快速处理。我们需要解决这个问题,但首先我们来看看实际中的问题。

模拟一个耗时请求

我们将看看一个处理缓慢的请求如何影响对当前服务器实现发出的其他请求。清单 20-10 实现了对/sleep的请求处理,它带有一个模拟的缓慢响应,这会导致服务器在响应之前休眠五秒钟。

文件名:src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
--snip--

fn handle_connection(mut stream: TcpStream) {
    --snip--

    let (status_line, filename) = 1 match &request_line[..] {
      2 "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
      3 "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
      4 _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    --snip--
}

清单 20-10:通过休眠五秒钟来模拟一个耗时请求

既然我们有三种情况,我们就从if切换到了match[1]。我们需要显式地匹配request_line的切片,以便与字符串字面量值进行模式匹配;match不像相等方法那样进行自动引用和解引用。

第一个分支[2]与清单 20-9 中的if块相同。第二个分支[3]匹配对/sleep的请求。当接收到该请求时,服务器将休眠五秒钟,然后再渲染成功的 HTML 页面。第三个分支[4]与清单 20-9 中的else块相同。

你可以看到我们的服务器是多么原始:真正的库会以一种简洁得多的方式处理多个请求的识别!

使用cargo run启动服务器。然后打开两个浏览器窗口:一个用于访问http://127.0.0.1:7878,另一个用于访问*http://127.0.0.1:7878/sleep*。如果你像之前一样多次输入`/_` URI,你会看到它响应得很快。但是如果你输入/sleep,然后再加载/_,你会看到/_会一直等到sleep休眠满五秒钟后才加载。

我们可以使用多种技术来避免请求在一个耗时请求之后排队等待;我们将实现的是一个线程池。

使用线程池提高吞吐量

线程池是一组已生成的线程,它们正在等待并准备好处理任务。当程序接收到一个新任务时,它会从线程池中分配一个线程来处理该任务,而该线程将处理这个任务。在线程处理第一个任务时,线程池中的其余线程可用于处理任何其他进来的任务。当第一个线程完成其任务处理后,它会返回到空闲线程池中,准备处理新任务。线程池允许你并发处理连接,从而提高服务器的吞吐量。

我们会将线程池中的线程数量限制为一个较小的数字,以防范拒绝服务(DoS)攻击;如果我们让程序在每个请求进来时都创建一个新线程,那么向我们的服务器发送一千万个请求的人可能会耗尽我们服务器的所有资源,使请求处理陷入停顿,从而造成严重破坏。

因此,我们不会生成无限制的线程,而是让固定数量的线程在线程池中等待。进来的请求会被发送到线程池进行处理。线程池将维护一个传入请求的队列。线程池中的每个线程都会从这个队列中取出一个请求,处理该请求,然后再向队列请求另一个请求。通过这种设计,我们可以并发处理多达 N 个请求,其中 N 是线程的数量。如果每个线程都在响应一个长时间运行的请求,后续请求仍可能在队列中排队,但在达到这一点之前,我们已经增加了可以处理的长时间运行请求的数量。

这种技术只是提高 Web 服务器吞吐量的众多方法之一。你可能会探索的其他选项包括 fork/join 模型、单线程异步 I/O 模型和多线程异步 I/O 模型。如果你对这个主题感兴趣,可以阅读更多关于其他解决方案的内容并尝试实现它们;使用像 Rust 这样的低级语言,所有这些选项都是可行的。

在我们开始实现线程池之前,让我们先讨论一下使用线程池应该是什么样子。当你试图设计代码时,先编写客户端接口可以帮助指导你的设计。编写代码的 API,使其结构符合你想要调用它的方式;然后在该结构内实现功能,而不是先实现功能然后再设计公共 API。

类似于我们在第 12 章的项目中使用测试驱动开发的方式,我们在这里将使用编译器驱动开发。我们将编写调用我们想要的函数的代码,然后查看编译器的错误,以确定接下来我们应该更改什么以使代码能够运行。然而,在我们这样做之前,我们将先探讨一下我们不会使用的技术作为起点。

为每个请求生成一个线程

首先,让我们探讨一下如果代码真的为每个连接都创建一个新线程,代码会是什么样子。如前所述,由于可能会生成无限制数量的线程存在问题,这不是我们的最终方案,但这是首先获得一个可运行的多线程服务器的起点。然后我们将添加线程池作为改进,对比这两种解决方案会更容易。

清单 20-11 展示了对main函数所做的更改,以便在for循环中为每个流生成一个新线程来处理。

文件名:src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

清单 20-11:为每个流生成一个新线程

正如你在第 16 章中学到的,thread::spawn会创建一个新线程,然后在新线程中运行闭包中的代码。如果你运行这段代码,并在浏览器中加载/sleep,然后再在另外两个浏览器标签中加载/_,你确实会看到对/_的请求不必等待/sleep完成。然而,正如我们所提到的,这最终会使系统不堪重负,因为你会无限制地创建新线程。

创建有限数量的线程

我们希望我们的线程池以一种类似且熟悉的方式工作,这样从线程切换到线程池时,使用我们 API 的代码无需进行大量更改。清单 20-12 展示了我们想要使用的ThreadPool结构体的假设接口,以此来替代thread::spawn

文件名:src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  1 let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

      2 pool.execute(|| {
            handle_connection(stream);
        });
    }
}

清单 20-12:我们理想中的ThreadPool接口

我们使用ThreadPool::new来创建一个新的线程池,其线程数量可配置,在这种情况下是四个[1]。然后,在for循环中,pool.execute具有与thread::spawn类似的接口,即它接受一个闭包,线程池会为每个流运行该闭包[2]。我们需要实现pool.execute,使其接受闭包并将其交给线程池中的一个线程来运行。这段代码目前还无法编译,但我们会尝试一下,以便编译器能指导我们如何修复它。

使用编译器驱动开发构建线程池

按照清单 20-12 对src/main.rs进行更改,然后让我们利用cargo check产生的编译器错误来推动我们的开发。这是我们得到的第一个错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

太棒了!这个错误告诉我们需要一个ThreadPool类型或模块,所以我们现在就来构建一个。我们对ThreadPool的实现将独立于我们的 Web 服务器所做的工作类型。所以让我们把hello crate 从一个二进制 crate 切换为一个库 crate,来存放我们对ThreadPool的实现。在我们切换到库 crate 之后,我们也可以将这个单独的线程池库用于任何我们想使用线程池来做的工作,而不仅仅是用于提供 Web 请求服务。

创建一个src/lib.rs文件,其中包含以下内容,这是我们目前能有的ThreadPool结构体的最简单定义:

文件名:src/lib.rs

pub struct ThreadPool;

然后编辑main.rs文件,通过在src/main.rs的顶部添加以下代码,将ThreadPool从库 crate 引入作用域:

文件名:src/main.rs

use hello::ThreadPool;

这段代码仍然无法工作,但让我们再次检查它,以得到下一个我们需要解决的错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct
`ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in
`ThreadPool`

这个错误表明接下来我们需要为ThreadPool创建一个名为new的关联函数。我们也知道new需要有一个能接受4作为参数的参数,并且应该返回一个ThreadPool实例。让我们实现一个具有这些特性的最简单的new函数:

文件名:src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

我们选择usize作为size参数的类型,因为我们知道负数个线程是没有意义的。我们也知道我们会将这个4用作线程集合中的元素数量,正如在“整数类型”中所讨论的,这就是usize类型的用途。

让我们再次检查代码:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the
current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^ method not found in `ThreadPool`

现在出现错误是因为我们在ThreadPool上没有execute方法。回想一下“创建有限数量的线程”,我们决定我们的线程池应该有一个类似于thread::spawn的接口。此外,我们将实现execute函数,使其接受给定的闭包,并将其交给线程池中的一个空闲线程来运行。

我们将在ThreadPool上定义execute方法,使其接受一个闭包作为参数。回想一下“将捕获的值移出闭包和 Fn 特质”,我们可以使用三种不同的特质将闭包作为参数:FnFnMutFnOnce。我们需要决定在这里使用哪种类型的闭包。我们知道我们最终会做一些类似于标准库thread::spawn实现的事情,所以我们可以看看thread::spawn的签名对其参数有哪些约束。文档向我们展示了以下内容:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

这里我们关心的是F类型参数;T类型参数与返回值有关,我们不关心那个。我们可以看到spawn使用FnOnce作为对F的特质约束。这可能也是我们想要的,因为我们最终会将在execute中得到的参数传递给spawn。我们可以更确定FnOnce是我们想要使用的特质,因为运行请求的线程只会执行该请求的闭包一次,这与FnOnce中的Once相匹配。

F类型参数还有特质约束Send和生命周期约束'static,这在我们的情况下很有用:我们需要Send来将闭包从一个线程转移到另一个线程,并且需要'static,因为我们不知道线程执行需要多长时间。让我们在ThreadPool上创建一个execute方法,它将接受一个具有这些约束的泛型参数F

文件名:src/lib.rs

impl ThreadPool {
    --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() 1 + Send + 'static,
    {
    }
}

我们仍然在FnOnce之后使用()[1],因为这个FnOnce表示一个不接受参数并返回单元类型()的闭包。就像函数定义一样,签名中可以省略返回类型,但即使我们没有参数,我们仍然需要括号。

同样,这是execute方法的最简单实现:它什么都不做,但我们只是试图让我们的代码编译通过。让我们再次检查它:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

它编译通过了!但请注意,如果你尝试cargo run并在浏览器中发出请求,你会在浏览器中看到我们在本章开头看到的错误。我们的库实际上还没有调用传递给execute的闭包!

注意:你可能会听到关于像 Haskell 和 Rust 这样具有严格编译器的语言的一种说法,即“如果代码编译通过,它就可以工作”。但这种说法并不总是正确的。我们的项目编译通过了,但它实际上什么都没做!如果我们正在构建一个真实、完整的项目,这将是一个开始编写单元测试的好时机,以检查代码是否编译通过 并且 具有我们想要的行为。

验证 new 函数中线程数量的有效性

我们目前没有对newexecute函数的参数进行任何处理。让我们按照我们期望的行为来实现这些函数的主体。首先,来考虑一下new函数。之前我们为size参数选择了无符号类型,因为线程数量为负数的线程池是没有意义的。然而,线程数量为零的线程池同样没有意义,不过零是一个完全有效的usize值。我们将添加代码来检查size是否大于零,然后在返回ThreadPool实例之前,如果接收到零值就让程序恐慌,这可以通过使用assert!宏来实现,如清单 20-13 所示。

文件名:src/lib.rs

impl ThreadPool {
    /// 创建一个新的线程池。
    ///
    /// 大小是线程池中线程的数量。
    ///
  1 /// ## 恐慌
    ///
    /// 如果大小为零,`new` 函数将恐慌。
    pub fn new(size: usize) -> ThreadPool {
      2 assert!(size > 0);

        ThreadPool
    }

    --snip--
}

清单 20-13:实现ThreadPool::new,使其在size为零时恐慌

我们还使用文档注释为ThreadPool添加了一些文档。注意,我们遵循了良好的文档编写规范,添加了一个部分来指出我们的函数可能恐慌的情况[1],正如第 14 章所讨论的那样。尝试运行cargo doc --open并点击ThreadPool结构体,看看为new生成的文档是什么样子的!

我们也可以不像这里这样添加assert!宏[2],而是将new改为build,并像我们在清单 12-9 的 I/O 项目中对Config::build所做的那样返回一个Result。但在这种情况下,我们决定尝试创建一个没有任何线程的线程池应该是一个不可恢复的错误。如果你有雄心壮志,可以尝试编写一个具有以下签名的名为build的函数,与new函数进行比较:

pub fn build(
    size: usize
) -> Result<ThreadPool, PoolCreationError> {

创建用于存储线程的空间

既然我们已经有了一种方法来确定我们有有效的线程数量可以存储在池中,那么我们可以创建这些线程,并在返回结构体之前将它们存储在ThreadPool结构体中。但是我们如何“存储”一个线程呢?让我们再看一下thread::spawn的签名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

spawn函数返回一个JoinHandle<T>,其中T是闭包返回的类型。让我们也尝试使用JoinHandle,看看会发生什么。在我们的例子中,我们传递给线程池的闭包将处理连接并且不返回任何东西,所以T将是单元类型()

清单 20-14 中的代码将编译通过,但还没有创建任何线程。我们已经更改了ThreadPool的定义,使其持有一个thread::JoinHandle<()>实例的向量,用容量为size初始化该向量,设置了一个for循环,该循环将运行一些代码来创建线程,并返回一个包含这些线程的ThreadPool实例。

文件名:src/lib.rs

1 use std::thread;

pub struct ThreadPool {
  2 threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      3 let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // 创建一些线程并将它们存储在向量中
        }

        ThreadPool { threads }
    }
    --snip--
}

清单 20-14:为ThreadPool创建一个向量来存储线程

我们已经在库 crate 中引入了std::thread到作用域[1],因为我们在ThreadPool中使用thread::JoinHandle作为向量中元素的类型[2]。

一旦接收到有效的大小,我们的ThreadPool会创建一个新的向量,该向量可以容纳size个元素[3]。with_capacity函数执行与Vec::new相同的任务,但有一个重要的区别:它预先在向量中分配空间。因为我们知道我们需要在向量中存储size个元素,所以预先进行这种分配比使用Vec::new稍微高效一些,Vec::new会在插入元素时自行调整大小。

当你再次运行cargo check时,它应该会成功。

从线程池向线程发送代码

我们在清单 20-14 的for循环中留下了一条关于创建线程的注释。在这里,我们将看看如何实际创建线程。标准库提供了thread::spawn作为创建线程的一种方式,并且thread::spawn期望在创建线程时立即获得线程应该运行的一些代码。然而,在我们的例子中,我们想要创建线程并让它们等待我们稍后发送的代码。标准库中线程的实现没有包含任何这样做的方法;我们必须手动实现它。

我们将通过在线程池和线程之间引入一个新的数据结构来管理这种新行为,从而实现这种行为。我们将把这个数据结构称为工作线程(Worker),这在池化实现中是一个常用术语。工作线程获取需要运行的代码并在其线程中运行该代码。

想象一下在餐厅厨房工作的人:工人们等待顾客下单,然后他们负责接收这些订单并完成它们。

我们将在线程池中存储Worker结构体的实例,而不是存储JoinHandle<()>实例的向量。每个工作线程将存储一个JoinHandle<()>实例。然后我们将在Worker上实现一个方法,该方法将接受要运行的代码闭包并将其发送到已经在运行的线程进行执行。我们还将为每个工作线程赋予一个id,以便在日志记录或调试时我们可以区分线程池中的不同工作线程实例。

以下是创建线程池时将发生的新过程。在我们以这种方式设置好工作线程之后,我们将实现将闭包发送到线程的代码:

  1. 定义一个持有idJoinHandle<()>Worker结构体。
  2. 更改ThreadPool以持有Worker实例的向量。
  3. 定义一个Worker::new函数,该函数接受一个id编号并返回一个持有该id和使用空闭包创建的线程的Worker实例。
  4. ThreadPool::new中,使用for循环计数器生成一个id,使用该id创建一个新的工作线程,并将该工作线程存储在向量中。

如果你愿意接受挑战,可以在查看清单 20-15 中的代码之前自己尝试实现这些更改。

准备好了吗?以下是清单 20-15,其中展示了进行上述修改的一种方法。

文件名:src/lib.rs

use std::thread;

pub struct ThreadPool {
  1 workers: Vec<Worker>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

      2 for id in 0..size {
          3 workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    --snip--
}

4 struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
  5 fn new(id: usize) -> Worker {
      6 let thread = thread::spawn(|| {});

        Worker { 7 id, 8 thread }
    }
}

清单 20-15:修改ThreadPool以持有Worker实例而不是直接持有线程

我们将ThreadPool上的字段名称从threads改为workers,因为现在它持有Worker实例而不是JoinHandle<()>实例[1]。我们在for循环中使用计数器[2]作为参数传递给Worker::new,并将每个新的工作线程存储在名为workers的向量中[3]。

外部代码(比如我们在src/main.rs中的服务器)不需要知道在线程池中使用Worker结构体的实现细节,所以我们将Worker结构体[4]及其new函数[5]设为私有。Worker::new函数使用我们给它的id[7],并存储一个通过使用空闭包创建新线程而得到的JoinHandle<()>实例[8],该闭包为[6]。

注意:如果操作系统因为没有足够的系统资源而无法创建线程,thread::spawn将会恐慌。这将导致我们整个服务器恐慌,即使某些线程的创建可能会成功。为了简单起见,这种行为是可以的,但在生产环境中的线程池实现中,你可能希望使用std::thread::Builder及其返回Resultspawn方法。

这段代码将编译通过,并将存储我们作为参数传递给ThreadPool::new的工作线程实例的数量。但我们仍然没有处理在execute中得到的闭包。接下来让我们看看如何处理它。

通过通道向线程发送请求

我们接下来要解决的问题是,传递给thread::spawn的闭包什么都不做。目前,我们在execute方法中得到了想要执行的闭包。但是我们需要在创建线程池期间创建每个Worker时,给thread::spawn一个要运行的闭包。

我们希望刚刚创建的Worker结构体从线程池中持有的队列中获取要运行的代码,并将该代码发送到其线程中运行。

我们在第 16 章中学到的通道——一种在两个线程之间进行通信的简单方式——非常适合这个用例。我们将使用一个通道作为任务队列,并且execute将从线程池向Worker实例发送一个任务,Worker实例再将任务发送到其线程。具体计划如下:

  1. 线程池将创建一个通道并持有发送端。
  2. 每个Worker将持有接收端。
  3. 我们将创建一个新的Job结构体,用于持有我们想要通过通道发送的闭包。
  4. execute方法将通过发送端发送它想要执行的任务。
  5. 在其线程中,Worker将遍历其接收端并执行它接收到的任何任务的闭包。

让我们首先在ThreadPool::new中创建一个通道,并将发送端保存在ThreadPool实例中,如清单 20-16 所示。目前Job结构体不持有任何内容,但它将是我们通过通道发送的项的类型。

文件名:src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      1 let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, 2 sender }
    }
    --snip--
}

清单 20-16:修改ThreadPool以存储用于传输Job实例的通道的发送端

ThreadPool::new中,我们创建了新的通道[1],并让线程池持有发送端[2]。这将成功编译。

让我们尝试在线程池创建通道时,将通道的接收端传递给每个Worker。我们知道我们希望在Worker实例创建的线程中使用接收端,所以我们将在闭包中引用receiver参数。清单 20-17 中的代码还不能完全编译通过。

文件名:src/lib.rs

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
          1 workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    --snip--
}

--snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
          2 receiver;
        });

        Worker { id, thread }
    }
}

清单 20-17:将接收端传递给每个Worker

我们做了一些小而直接的更改:我们将接收端传递给Worker::new[1],然后在闭包中使用它[2]。

当我们尝试检查这段代码时,会得到以下错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in
previous iteration of loop

代码试图将receiver传递给多个Worker实例。这行不通,正如你从第 16 章中回忆的那样:Rust 提供的通道实现是多个生产者,单个消费者。这意味着我们不能仅仅克隆通道的消费端来修复这段代码。我们也不希望向多个消费者多次发送消息;我们希望有一个包含多个Worker实例的消息列表,这样每个消息只被处理一次。

此外,从通道队列中取出一个任务涉及到对receiver进行变异,所以线程需要一种安全的方式来共享和修改receiver;否则,我们可能会遇到竞争条件(如第 16 章所述)。

回忆一下第 16 章中讨论的线程安全智能指针:为了在多个线程之间共享所有权并允许线程变异值,我们需要使用Arc<Mutex<T>>Arc类型将允许多个Worker实例拥有接收端,而Mutex将确保一次只有一个Worker从接收端获取任务。清单 20-18 展示了我们需要做的更改。

文件名:src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
--snip--

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

      1 let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(
                Worker::new(id, Arc::clone(& 2 receiver))
            );
        }

        ThreadPool { workers, sender }
    }

    --snip--
}

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--
    }
}

清单 20-18:使用ArcMutexWorker实例之间共享接收端

ThreadPool::new中,我们将接收端放入ArcMutex中[1]。对于每个新的Worker,我们克隆Arc以增加引用计数,这样Worker实例就可以共享接收端的所有权[2]。

通过这些更改,代码编译通过了!我们正在逐步实现目标!

实现 execute 方法

现在让我们最终在 ThreadPool 上实现 execute 方法。我们还将把 Job 从一个结构体改为一个类型别名,用于表示持有 execute 接收的闭包类型的 trait 对象。正如在“使用类型别名创建类型同义词”中所讨论的,类型别名使我们能够将长类型缩短,以便于使用。请看清单 20-19。

文件名:src/lib.rs

--snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
      1 let job = Box::new(f);

      2 self.sender.send(job).unwrap();
    }
}

--snip--

清单 20-19:为持有每个闭包的 Box 创建一个 Job 类型别名,然后通过通道发送任务

在使用 execute 中得到的闭包创建一个新的 Job 实例之后[1],我们将该任务通过通道的发送端发送出去[2]。对于 send 操作,我们调用 unwrap 来处理发送失败的情况。例如,如果我们停止所有线程的执行,这意味着接收端已经停止接收新消息,那么发送就可能会失败。目前,我们无法停止线程的执行:只要线程池存在,我们的线程就会继续执行。我们使用 unwrap 的原因是我们知道失败情况不会发生,但编译器并不知道这一点。

但我们还没有完全完成!在 Worker 中,传递给 thread::spawn 的闭包仍然只是引用通道的接收端。相反,我们需要闭包永远循环,向通道的接收端请求任务,并在收到任务时运行它。让我们对 Worker::new 进行清单 20-20 所示的更改。

文件名:src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver
              1.lock()
              2.unwrap()
              3.recv()
              4.unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

清单 20-20:在 Worker 实例的线程中接收并执行任务

在这里,我们首先对 receiver 调用 lock 来获取互斥锁[1],然后调用 unwrap 以在出现任何错误时使程序恐慌[2]。如果互斥锁处于中毒状态,获取锁可能会失败,当其他线程在持有锁时恐慌而不是释放锁时,就会发生这种情况。在这种情况下,调用 unwrap 使这个线程恐慌是正确的操作。你可以随意将这个 unwrap 改为带有对你有意义的错误消息的 expect

如果我们获得了互斥锁,我们调用 recv 从通道接收一个 Job[3]。最后一个 unwrap 也会处理这里可能出现的任何错误[4],如果持有发送端的线程已经关闭,就可能会出现错误,类似于如果接收端关闭,send 方法会返回 Err 的情况。

recv 的调用会阻塞,所以如果还没有任务,当前线程将等待,直到有任务可用。Mutex<T> 确保一次只有一个 Worker 线程试图请求任务。

我们的线程池现在处于工作状态!运行 cargo run 并进行一些请求:

[object Object]

成功!我们现在有了一个可以异步执行连接的线程池。创建的线程永远不会超过四个,所以如果服务器收到大量请求,我们的系统不会过载。如果我们向 /sleep 发出请求,服务器将能够通过让另一个线程运行其他请求来处理它们。

注意:如果你同时在多个浏览器窗口中打开 /sleep,它们可能会以五秒的间隔依次加载。由于缓存原因,一些网页浏览器会顺序执行同一请求的多个实例。这个限制不是由我们的 Web 服务器造成的。

在学习了第 18 章中的 while let 循环之后,你可能想知道为什么我们没有像清单 20-21 那样编写 Worker 线程代码。

文件名:src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

清单 20-21:使用 while letWorker::new 的替代实现

这段代码可以编译并运行,但不会产生期望的线程行为:一个缓慢的请求仍然会导致其他请求等待处理。原因有点微妙:Mutex 结构体没有公共的 unlock 方法,因为锁的所有权基于 lock 方法返回的 LockResult<MutexGuard<T>> 中的 MutexGuard<T> 的生命周期。在编译时,借用检查器可以强制执行这样的规则:除非我们持有锁,否则不能访问由 Mutex 保护的资源。然而,如果我们不注意 MutexGuard<T> 的生命周期,这种实现也可能导致锁被持有比预期更长的时间。

清单 20-20 中使用 let job = receiver.lock().unwrap().recv().unwrap(); 的代码之所以有效,是因为使用 let 时,等号右侧表达式中使用的任何临时值在 let 语句结束时会立即被丢弃。但是,while let(以及 if letmatch)直到相关块结束才会丢弃临时值。在清单 20-21 中,锁在调用 job() 的整个过程中都被持有,这意味着其他 Worker 实例无法接收任务。

总结

恭喜你!你已经完成了将我们的单线程服务器转换为多线程服务器的实验。你可以在 LabEx 中练习更多实验来提升你的技能。