使用 Rust 通道进行并发数据传输

Beginner

This tutorial is from open-source community. Access the source code

简介

欢迎来到「使用消息传递在线程间传输数据」实验。本实验是 Rust 程序设计语言 的一部分。你可以在 LabEx 中练习你的 Rust 技能。

在本实验中,我们将探索消息传递作为一种安全的并发方法,使用 Rust 标准库中的通道在线程之间发送和接收数据。

使用消息传递在线程间传输数据

一种越来越流行的确保安全并发的方法是「消息传递」,即线程或参与者通过相互发送包含数据的消息来进行通信。这里有一个来自 Go 语言文档(*https://golang.org/doc/effective_go.html#concurrency*)的口号来解释这个概念:“不要通过共享内存来通信;相反,通过通信来共享内存。”

为了实现消息发送并发,Rust 的标准库提供了「通道」的实现。通道是一种通用的编程概念,通过它可以将数据从一个线程发送到另一个线程。

你可以将编程中的通道想象成一个定向的水流通道,比如溪流或河流。如果你把像橡皮鸭这样的东西放入河中,它会顺流而下到达水道的尽头。

通道有两个部分:发送端和接收端。发送端是你将橡皮鸭放入河流的上游位置,接收端是橡皮鸭最终到达的下游位置。你的代码的一部分在发送端调用方法并传入你想要发送的数据,另一部分则在接收端检查到达的消息。如果发送端或接收端被丢弃,通道就被称为「关闭」。

在这里,我们将逐步构建一个程序,该程序有一个线程来生成值并通过通道发送它们,另一个线程将接收这些值并打印出来。我们将使用通道在线程之间发送简单的值来说明这个功能。一旦你熟悉了这个技术,你就可以将通道用于任何需要相互通信的线程,比如聊天系统或一个由多个线程执行计算的部分并将这些部分发送到一个汇总结果的线程的系统。

首先,在清单 16 - 6 中,我们将创建一个通道,但不对其进行任何操作。请注意,这还不能编译,因为 Rust 不知道我们想要通过通道发送什么类型的值。

文件名:src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

清单 16 - 6:创建一个通道并将两个部分分配给 txrx

我们使用 mpsc::channel 函数创建一个新通道;mpsc 代表「多个生产者,单个消费者」。简而言之,Rust 标准库实现通道的方式意味着一个通道可以有多个「发送」端来生成值,但只有一个「接收」端来消费这些值。想象一下多条溪流汇聚成一条大河:从任何一条溪流中向下游发送的所有东西最终都会在大河的尽头汇集到一起。目前我们先从单个生产者开始,但当这个示例运行起来后,我们会添加多个生产者。

mpsc::channel 函数返回一个元组,其第一个元素是发送端——发射器,第二个元素是接收端——接收器。在许多领域中,传统上分别使用缩写 txrx 来表示「发射器」和「接收器」,所以我们这样命名我们的变量以表示每个端。我们使用带有模式的 let 语句来解构元组;我们将在第 18 章讨论 let 语句中模式的使用和解构。目前,只需知道以这种方式使用 let 语句是提取 mpsc::channel 返回的元组各部分的一种便捷方法。

让我们将发送端移动到一个新创建的线程中,并让它发送一个字符串,这样新创建的线程就可以与主线程进行通信,如清单 16 - 7 所示。这就好比把橡皮鸭放入上游的河中,或者从一个线程向另一个线程发送聊天消息。

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

清单 16 - 7:将 tx 移动到新创建的线程并发送 "hi"

同样,我们使用 thread::spawn 创建一个新线程,然后使用 movetx 移动到闭包中,这样新创建的线程就拥有了 tx。新创建的线程需要拥有发射器才能通过通道发送消息。

发射器有一个 send 方法,它接受我们想要发送的值。send 方法返回一个 Result<T, E> 类型,所以如果接收器已经被丢弃,并且无处发送值,发送操作将返回一个错误。在这个示例中,我们调用 unwrap 在发生错误时使程序恐慌。但在实际应用中,我们会正确地处理它:回到第 9 章复习正确处理错误的策略。

在清单 16 - 8 中,我们将在主线程中从接收器获取值。这就好比从河流尽头的水中取回橡皮鸭,或者接收聊天消息。

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

清单 16 - 8:在主线程中接收值 "hi" 并打印它

接收器有两个有用的方法:recvtry_recv。我们使用的是 recv,即「接收」的缩写,它会阻塞主线程的执行并等待,直到有值通过通道发送过来。一旦有值发送过来,recv 将在 Result<T, E> 中返回它。当发射器关闭时,recv 将返回一个错误以表示不会再有值到来。

try_recv 方法不会阻塞,而是会立即返回一个 Result<T, E>:如果有可用消息,Ok 值中会包含该消息;如果这次没有消息,返回 Err 值。如果这个线程在等待消息时还有其他工作要做,使用 try_recv 会很有用:我们可以编写一个循环,每隔一段时间调用 try_recv,如果有可用消息就处理它,否则在再次检查之前做一些其他工作。

为了简单起见,我们在这个示例中使用了 recv;除了等待消息之外,主线程没有其他工作要做,所以阻塞主线程是合适的。

当我们运行清单 16 - 8 中的代码时,我们会看到主线程打印出的值:

Got: hi

完美!

通道与所有权转移

所有权规则在消息发送中起着至关重要的作用,因为它们有助于你编写安全的并发代码。在整个 Rust 程序中考虑所有权是防止并发编程错误的优势。让我们做一个实验来展示通道和所有权是如何协同工作以防止问题的:我们将尝试在通过通道发送 val 值之后,在新创建的线程中使用它。尝试编译清单 16 - 9 中的代码,看看为什么这段代码不被允许。

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

清单 16 - 9:在通过通道发送 val 之后尝试使用它

在这里,我们在通过 tx.sendval 发送到通道之后,尝试打印 val。允许这样做会是个坏主意:一旦值被发送到另一个线程,在我们再次尝试使用该值之前,那个线程可能会修改或丢弃它。潜在地,其他线程的修改可能会由于数据不一致或不存在而导致错误或意外结果。然而,如果我们尝试编译清单 16 - 9 中的代码,Rust 会给我们一个错误:

error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does
not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move

我们的并发错误导致了一个编译时错误。send 函数获取其参数的所有权,并且当值被移动时,接收者获取它的所有权。这阻止了我们在发送值之后意外地再次使用它;所有权系统会检查一切是否正常。

发送多个值并观察接收者的等待状态

清单 16 - 8 中的代码能够编译并运行,但它并没有清晰地向我们展示两个独立的线程是如何通过通道进行通信的。在清单 16 - 10 中,我们做了一些修改,以证明清单 16 - 8 中的代码是在并发运行:新创建的线程现在将发送多个消息,并且在每条消息之间暂停一秒。

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

清单 16 - 10:发送多个消息并在每条消息之间暂停

这一次,新创建的线程有一个字符串向量,我们想要将其发送到主线程。我们遍历这些字符串,逐个发送它们,并通过调用 thread::sleep 函数,传入一秒的 Duration 值,在每条消息之间暂停。

在主线程中,我们不再显式调用 recv 函数:相反,我们将 rx 当作一个迭代器来处理。对于接收到的每个值,我们都将其打印出来。当通道关闭时,迭代将会结束。

当运行清单 16 - 10 中的代码时,你应该会看到以下输出,并且每行之间会有一秒的停顿:

Got: hi
Got: from
Got: the
Got: thread

由于在主线程的 for 循环中我们没有任何暂停或延迟的代码,所以我们可以判断主线程正在等待从新创建的线程接收值。

通过克隆发送端创建多个生产者

之前我们提到过 mpsc 是「多个生产者,单个消费者」的首字母缩写。现在让我们使用 mpsc 并扩展清单 16 - 10 中的代码,来创建多个线程,这些线程都向同一个接收端发送值。我们可以通过克隆发送端来实现这一点,如清单 16 - 11 所示。

文件名:src/main.rs

--snip--

let (tx, rx) = mpsc::channel();

let tx1 = tx.clone();
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {received}");
}

--snip--

清单 16 - 11:从多个生产者发送多个消息

这一次,在我们创建第一个新线程之前,我们对发送端调用 clone。这将给我们一个新的发送端,我们可以将其传递给第一个新线程。我们将原始发送端传递给第二个新线程。这样我们就有了两个线程,每个线程都向同一个接收端发送不同的消息。

当你运行这段代码时,输出可能如下所示:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

根据你的系统,你可能会看到值以另一种顺序出现。这就是并发既有趣又困难的地方。如果你尝试使用 thread::sleep,在不同线程中给它设置不同的值,那么每次运行都会更加不可预测,并且每次都会产生不同的输出。

既然我们已经了解了通道的工作原理,接下来让我们看看另一种并发方法。

总结

恭喜你!你已经完成了「使用消息传递在线程间传输数据」实验。你可以在 LabEx 中练习更多实验来提升你的技能。