Использование передачи сообщений для передачи данных между потоками
Все более популярным подходом к обеспечению безопасной параллелизации является передача сообщений, при которой потоки или актёры общаются, отправляя друг другу сообщения, содержащие данные. Вот идея в слогане из документации по Go-языку на https://golang.org/doc/effective_go.html#concurrency: "Не общайтесь, делясь памятью; вместо этого делитесь памятью, обмениваясь сообщениями".
Для реализации параллелизации с передачей сообщений стандартная библиотека Rust предоставляет реализацию каналов. Канал - это общее концепция программирования, с помощью которой данные передаются из одного потока в другой.
Вы можете представить канал в программировании как направленный канал воды, такой как ручей или река. Если вы положите что-то, типа резиновой утки, в реку, она будет двигаться вниз по течению до конца водоёма.
Канал имеет две половины: отправитель и получатель. Часть отправителя - это то место вверх по течению, где вы кладёте резиновую утку в реку, а часть получателя - это то место, где резиновая утка оканчивается внизу по течению. Одна часть вашего кода вызывает методы на отправителе с данными, которые вы хотите отправить, а другая часть проверяет приемник на наличие прибывших сообщений. Канал считается закрытым, если либо отправитель, либо получатель удаляется.
Здесь мы напишем программу, которая имеет один поток для генерации значений и отправки их по каналу, и другой поток, который будет получать значения и выводить их на печать. Мы будем отправлять простые значения между потоками с помощью канала, чтобы продемонстрировать эту функцию. Когда вы будете знакомы с этой техникой, вы можете использовать каналы для любых потоков, которые должны взаимодействовать друг с другом, например, для чат-системы или системы, где несколько потоков выполняют части вычисления и отправляют части одному потоку, который агрегирует результаты.
Сначала, в Listing 16-6, мы создадим канал, но ничего с ним не сделаем. Обратите внимание, что это не скомпилируется, потому что Rust не может определить, какой тип значений мы хотим отправить по каналу.
Filename: src/main.rs
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
Listing 16-6: Создание канала и присвоение двух половин tx
и rx
Мы создаём новый канал с помощью функции mpsc::channel
; mpsc
означает множественные производители, одиночный потребитель. Короче говоря, то, как стандартная библиотека Rust реализует каналы, означает, что канал может иметь несколько отправляющих концов, которые генерируют значения, но только один принимающий конец, который потребляет эти значения. Представьте, что несколько ручьёв объединяются в одну большую реку: все, что отправляется по любому из ручьёв, в конце концов окажется в одной реке. Мы начнём с одного производителя на данный момент, но добавим несколько производителей, когда этот пример заработает.
Функция mpsc::channel
возвращает кортеж, первый элемент которого - это отправляющий конец - отправитель, а второй элемент - это принимающий конец - получатель. Аббревиатуры tx
и rx
традиционно используются в многих областях для отправителя и получателя соответственно, поэтому мы именуем наши переменные так, чтобы показать каждый конец. Мы используем let
-оператор с шаблоном, который разбирает кортежи; мы обсудим использование шаблонов в let
-операторах и разбор кортежей в главе 18. На данный момент просто запомните, что использование let
-оператора таким образом - это удобный способ извлечь части кортежа, возвращаемого функцией mpsc::channel
.
Переместим отправляющий конец в созданный поток и попросим его отправить одну строку, чтобы созданный поток общался с главным потоком, как показано в Listing 16-7. Это похоже на то, что вы кладёте резиновую утку в реку вверх по течению или отправляете сообщение в чат из одного потока в другой.
Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
}
Listing 16-7: Перемещение tx
в созданный поток и отправка "hi"
Опять же, мы используем thread::spawn
для создания нового потока, а затем используем move
, чтобы переместить tx
в замыкание, чтобы созданный поток владел tx
. Созданный поток должен владеть отправителем, чтобы иметь возможность отправлять сообщения по каналу.
Отправитель имеет метод send
, который принимает значение, которое мы хотим отправить. Метод send
возвращает тип Result<T, E>
, поэтому, если получатель уже был удалён и无处 отправить значение, операция отправки вернёт ошибку. В этом примере мы вызываем unwrap
, чтобы прервать выполнение программы в случае ошибки. Но в настоящем приложении мы бы обработали это правильно: обратитесь к главе 9, чтобы ознакомиться с стратегиями правильной обработки ошибок.
В Listing 16-8 мы получим значение из получателя в главном потоке. Это похоже на то, что вы извлекаете резиновую утку из воды в конце реки или получаете сообщение в чате.
Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
Listing 16-8: Получение значения "hi"
в главном потоке и вывод его на печать
Получатель имеет два полезных метода: recv
и try_recv
. Мы используем recv
, сокращение от receive, который блокирует выполнение главного потока и ждёт, пока значение не будет отправлено по каналу. Как только значение отправлено, recv
вернёт его в Result<T, E>
. Когда отправитель закрывается, recv
вернёт ошибку, чтобы сигнализировать, что больше значений не будут приходить.
Метод try_recv
не блокирует, а сразу же возвращает Result<T, E>
: значение Ok
, содержащее сообщение, если оно доступно, и значение Err
, если в этот раз нет сообщений. Использование try_recv
полезно, если этот поток имеет другие задачи, пока ждёт сообщений: мы могли бы написать цикл, который часто вызывает try_recv
, обрабатывает сообщение, если оно доступно, и в противном случае делает другую работу на некоторое время, пока не проверит снова.
Мы использовали recv
в этом примере для простоты; у главного потока нет других задач, кроме ожидания сообщений, поэтому блокировка главного потока вполне допустима.
Когда мы запустим код из Listing 16-8, мы увидим значение, выведенное из главного потока:
Got: hi
Великолепно!