Передача данных в параллельном режиме с использованием каналов Rust

RustRustBeginner
Практиковаться сейчас

This tutorial is from open-source community. Access the source code

💡 Этот учебник переведен с английского с помощью ИИ. Чтобы просмотреть оригинал, вы можете перейти на английский оригинал

Введение

Добро пожаловать в Использование передачи сообщений для передачи данных между потоками. Эта лабораторная работа является частью Rust Book. Вы можете практиковать свои навыки Rust в LabEx.

В этой лабораторной работе мы исследуем передачу сообщений как безопасный подход к параллелизации, используя каналы из стандартной библиотеки Rust для отправки и приема данных между потоками.


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL rust(("Rust")) -.-> rust/FunctionsandClosuresGroup(["Functions and Closures"]) rust(("Rust")) -.-> rust/DataStructuresandEnumsGroup(["Data Structures and Enums"]) rust(("Rust")) -.-> rust/PerformanceandConcurrencyGroup(["Performance and Concurrency"]) rust(("Rust")) -.-> rust/BasicConceptsGroup(["Basic Concepts"]) rust(("Rust")) -.-> rust/DataTypesGroup(["Data Types"]) rust(("Rust")) -.-> rust/ControlStructuresGroup(["Control Structures"]) rust/BasicConceptsGroup -.-> rust/variable_declarations("Variable Declarations") rust/DataTypesGroup -.-> rust/string_type("String Type") rust/ControlStructuresGroup -.-> rust/for_loop("for Loop") rust/FunctionsandClosuresGroup -.-> rust/function_syntax("Function Syntax") rust/FunctionsandClosuresGroup -.-> rust/expressions_statements("Expressions and Statements") rust/DataStructuresandEnumsGroup -.-> rust/method_syntax("Method Syntax") rust/PerformanceandConcurrencyGroup -.-> rust/message_passing("Message Passing") subgraph Lab Skills rust/variable_declarations -.-> lab-100438{{"Передача данных в параллельном режиме с использованием каналов Rust"}} rust/string_type -.-> lab-100438{{"Передача данных в параллельном режиме с использованием каналов Rust"}} rust/for_loop -.-> lab-100438{{"Передача данных в параллельном режиме с использованием каналов Rust"}} rust/function_syntax -.-> lab-100438{{"Передача данных в параллельном режиме с использованием каналов Rust"}} rust/expressions_statements -.-> lab-100438{{"Передача данных в параллельном режиме с использованием каналов Rust"}} rust/method_syntax -.-> lab-100438{{"Передача данных в параллельном режиме с использованием каналов Rust"}} rust/message_passing -.-> lab-100438{{"Передача данных в параллельном режиме с использованием каналов Rust"}} end

Использование передачи сообщений для передачи данных между потоками

Все более популярным подходом к обеспечению безопасной параллелизации является передача сообщений, при которой потоки или актёры общаются, отправляя друг другу сообщения, содержащие данные. Вот идея в слогане из документации по Go-языку на https://golang.org/doc/effective_go.html#concurrency: "Не общайтесь, делясь памятью; вместо этого делитесь памятью, обмениваясь сообщениями".

Для реализации параллелизации с передачей сообщений стандартная библиотека Rust предоставляет реализацию каналов. Канал - это общее концепция программирования, с помощью которой данные передаются из одного потока в другой.

Вы можете представить канал в программировании как направленный канал воды, такой как ручей или река. Если вы положите что-то, типа резиновой утки, в реку, она будет двигаться вниз по течению до конца водоёма.

Канал имеет две половины: отправитель и получатель. Часть отправителя - это то место вверх по течению, где вы кладёте резиновую утку в реку, а часть получателя - это то место, где резиновая утка оканчивается внизу по течению. Одна часть вашего кода вызывает методы на отправителе с данными, которые вы хотите отправить, а другая часть проверяет приемник на наличие прибывших сообщений. Канал считается закрытым, если либо отправитель, либо получатель удаляется.

Здесь мы напишем программу, которая имеет один поток для генерации значений и отправки их по каналу, и другой поток, который будет получать значения и выводить их на печать. Мы будем отправлять простые значения между потоками с помощью канала, чтобы продемонстрировать эту функцию. Когда вы будете знакомы с этой техникой, вы можете использовать каналы для любых потоков, которые должны взаимодействовать друг с другом, например, для чат-системы или системы, где несколько потоков выполняют части вычисления и отправляют части одному потоку, который агрегирует результаты.

Сначала, в Listing 16-6, мы создадим канал, но ничего с ним не сделаем. Обратите внимание, что это не скомпилируется, потому что Rust не может определить, какой тип значений мы хотим отправить по каналу.

Filename: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Listing 16-6: Создание канала и присвоение двух половин tx и rx

Мы создаём новый канал с помощью функции mpsc::channel; mpsc означает множественные производители, одиночный потребитель. Короче говоря, то, как стандартная библиотека Rust реализует каналы, означает, что канал может иметь несколько отправляющих концов, которые генерируют значения, но только один принимающий конец, который потребляет эти значения. Представьте, что несколько ручьёв объединяются в одну большую реку: все, что отправляется по любому из ручьёв, в конце концов окажется в одной реке. Мы начнём с одного производителя на данный момент, но добавим несколько производителей, когда этот пример заработает.

Функция mpsc::channel возвращает кортеж, первый элемент которого - это отправляющий конец - отправитель, а второй элемент - это принимающий конец - получатель. Аббревиатуры tx и rx традиционно используются в многих областях для отправителя и получателя соответственно, поэтому мы именуем наши переменные так, чтобы показать каждый конец. Мы используем let-оператор с шаблоном, который разбирает кортежи; мы обсудим использование шаблонов в let-операторах и разбор кортежей в главе 18. На данный момент просто запомните, что использование let-оператора таким образом - это удобный способ извлечь части кортежа, возвращаемого функцией mpsc::channel.

Переместим отправляющий конец в созданный поток и попросим его отправить одну строку, чтобы созданный поток общался с главным потоком, как показано в Listing 16-7. Это похоже на то, что вы кладёте резиновую утку в реку вверх по течению или отправляете сообщение в чат из одного потока в другой.

Filename: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Listing 16-7: Перемещение tx в созданный поток и отправка "hi"

Опять же, мы используем thread::spawn для создания нового потока, а затем используем move, чтобы переместить tx в замыкание, чтобы созданный поток владел tx. Созданный поток должен владеть отправителем, чтобы иметь возможность отправлять сообщения по каналу.

Отправитель имеет метод send, который принимает значение, которое мы хотим отправить. Метод send возвращает тип Result<T, E>, поэтому, если получатель уже был удалён и无处 отправить значение, операция отправки вернёт ошибку. В этом примере мы вызываем unwrap, чтобы прервать выполнение программы в случае ошибки. Но в настоящем приложении мы бы обработали это правильно: обратитесь к главе 9, чтобы ознакомиться с стратегиями правильной обработки ошибок.

В Listing 16-8 мы получим значение из получателя в главном потоке. Это похоже на то, что вы извлекаете резиновую утку из воды в конце реки или получаете сообщение в чате.

Filename: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Listing 16-8: Получение значения "hi" в главном потоке и вывод его на печать

Получатель имеет два полезных метода: recv и try_recv. Мы используем recv, сокращение от receive, который блокирует выполнение главного потока и ждёт, пока значение не будет отправлено по каналу. Как только значение отправлено, recv вернёт его в Result<T, E>. Когда отправитель закрывается, recv вернёт ошибку, чтобы сигнализировать, что больше значений не будут приходить.

Метод try_recv не блокирует, а сразу же возвращает Result<T, E>: значение Ok, содержащее сообщение, если оно доступно, и значение Err, если в этот раз нет сообщений. Использование try_recv полезно, если этот поток имеет другие задачи, пока ждёт сообщений: мы могли бы написать цикл, который часто вызывает try_recv, обрабатывает сообщение, если оно доступно, и в противном случае делает другую работу на некоторое время, пока не проверит снова.

Мы использовали recv в этом примере для простоты; у главного потока нет других задач, кроме ожидания сообщений, поэтому блокировка главного потока вполне допустима.

Когда мы запустим код из Listing 16-8, мы увидим значение, выведенное из главного потока:

Got: hi

Великолепно!

Каналы и передача владения

Правила владения играют важную роль при отправке сообщений, так как они помогают вам писать безопасный параллельный код. Предотвращение ошибок при параллельном программировании - это преимущество того, что нужно думать о владении на протяжении всего вашего Rust-программа. Давайте проведём эксперимент, чтобы показать, как каналы и владение работают вместе, чтобы предотвратить проблемы: мы попытаемся использовать значение val в созданном потоке после, как мы отправили его по каналу. Попробуйте скомпилировать код из Listing 16-9, чтобы понять, почему этот код не допускается.

Filename: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Listing 16-9: Попытка использовать val после отправки его по каналу

Здесь мы пытаемся вывести val на печать после того, как отправили его по каналу с помощью tx.send. Разрешить это было бы плохой идеей: как только значение было отправлено в другой поток, этот поток мог бы изменить его или удалить, прежде чем мы попытаемся снова использовать значение. Возможно, изменения другого потока могут привести к ошибкам или неожиданным результатам из-за несовместимых или несуществующих данных. Однако, Rust выдаёт ошибку, если мы пытаемся скомпилировать код из Listing 16-9:

error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does
not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move

Наша ошибка при параллелизации вызвала ошибку компиляции. Функция send получает владение над своим параметром, и когда значение перемещается, получатель получает за ним владение. Это предотвращает случайное использование значения снова после отправки; система владения проверяет, все ли в порядке.

Отправка нескольких значений и наблюдение за ожиданием получателя

Код из Listing 16-8 скомпилировался и запустился, но он не показал нам явно, что два отдельных потока общаются друг с другом по каналу. В Listing 16-10 мы внесли некоторые изменения, которые будут доказывать, что код из Listing 16-8 выполняется параллельно: созданный поток теперь будет отправлять несколько сообщений и останавливаться на секунду между каждым сообщением.

Filename: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

Listing 16-10: Отправка нескольких сообщений и пауза между каждым

На этот раз созданный поток имеет вектор строк, которые мы хотим отправить в главный поток. Мы итерируемся по ним, отправляя каждый отдельно, и останавливаемся между каждым вызовом функции thread::sleep с значением Duration в одну секунду.

В главном потоке мы不再显式调用recv函数:相反,我们将rx视为一个迭代器。对于接收到的每个值,我们都将其打印出来。当通道关闭时,迭代将结束。

当运行Listing 16-10中的代码时,你应该会看到以下输出,每行之间有一秒的停顿:

Got: hi
Got: from
Got: the
Got: thread

因为我们在主线程的for循环中没有任何暂停或延迟的代码,所以我们可以看出主线程在等待从生成的线程接收值。

Создание нескольких производителей путём клонирования отправителя

Ранее мы упоминали, что mpsc - это сокращение от multiple producer, single consumer (множественные производители, одиночный потребитель). Давайте применим mpsc и расширим код из Listing 16-10, чтобы создать несколько потоков, которые все будут отправлять значения в один и тот же получатель. Мы можем это сделать, клонируя отправителя, как показано в Listing 16-11.

Filename: src/main.rs

--snip--

let (tx, rx) = mpsc::channel();

let tx1 = tx.clone();
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {received}");
}

--snip--

Listing 16-11: Отправка нескольких сообщений от нескольких производителей

На этот раз, перед созданием первого созданного потока, мы вызываем clone на отправителе. Это даст нам новый отправитель, который мы можем передать первому созданному потоку. Мы передаём исходный отправитель второму созданному потоку. Таким образом, у нас есть два потока, каждый из которых отправляет разные сообщения в одного получателя.

Когда вы запускаете код, вывод может выглядеть примерно так:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Возможно, вы увидите значения в другом порядке, в зависимости от вашей системы. Именно это делает параллелизм интересным, а также сложным. Если вы экспериментируете с thread::sleep, задавая ей разные значения в разных потоках, каждый запуск будет более неопределённым и создавать разные выводы каждый раз.

Теперь, когда мы рассмотрели, как работают каналы, давайте посмотрим на другой метод параллелизации.

Резюме

Поздравляем! Вы завершили лабораторную работу по использованию передачи сообщений для передачи данных между потоками. Вы можете практиковаться в других лабораторных работах в LabEx, чтобы улучшить свои навыки.