Использование потоков для параллельного выполнения кода

Beginner

This tutorial is from open-source community. Access the source code

Введение

Добро пожаловать в Использование потоков для параллельного выполнения кода. Эта лабораторная работа является частью Rust Book. Вы можете практиковать свои навыки Rust в LabEx.

В этой лабораторной работе мы изучим концепцию потоков в программировании и то, как их можно использовать для параллельного выполнения кода, улучшая производительность, но добавляя сложность и потенциальные проблемы, такие как гонки данных, взаимоблокировки и ошибки, которые трудно воспроизвести.

Использование потоков для параллельного выполнения кода

В большинстве современных операционных систем код выполняемой программы запускается в процессе, и операционная система может одновременно управлять несколькими процессами. В пределах программы вы также можете иметь независимые части, которые запускаются одновременно. Функции, которые запускают эти независимые части, называются потоками. Например, веб-сервер может иметь несколько потоков, чтобы он мог одновременно реагировать на несколько запросов.

Разделение вычислений в вашей программе на несколько потоков для параллельного выполнения нескольких задач может повысить производительность, но это также добавляет сложность. Поскольку потоки могут выполняться одновременно, не гарантируется никакой определенной последовательности выполнения частей вашего кода на разных потоках. Это может привести к таким проблемам, как:

  • Гонки данных, когда потоки обращаются к данным или ресурсам в несовместимом порядке
  • Взаимоблокировки, когда два потока ждут друг друга, не позволяя обоим потокам продолжить работу
  • Ошибки, которые возникают только в определенных ситуациях и трудно воспроизводить и исправить надежно

Rust пытается минимизировать негативные последствия использования потоков, но программирование в многопоточном контексте по-прежнему требует тщательного мышления и требует структуры кода, которая отличается от программы, работающей в одном потоке.

Программирования языки реализуют потоки несколькими разными способами, и многие операционные системы предоставляют API, которое может вызываться языком для создания новых потоков. Стандартная библиотека Rust использует 1:1 модель реализации потоков, при которой программа использует один операционной системный поток на каждый поток языка. Существуют пакеты, которые реализуют другие модели многопоточности, которые делают разные компромиссы по сравнению с моделью 1:1.

Создание нового потока с помощью spawn

Для создания нового потока мы вызываем функцию thread::spawn и передаем ей замыкание (мы говорили о замыканиях в главе 13), содержащее код, который мы хотим запустить в новом потоке. Пример в листинге 16-1 выводит некоторый текст из главного потока и другой текст из нового потока.

Имя файла: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {i} from the main thread!");
        thread::sleep(Duration::from_millis(1));
    }
}

Листинг 16-1: Создание нового потока для вывода одной вещи, в то время как главный поток выводит что-то другое

Обратите внимание, что когда главный поток Rust программы завершается, все созданные потоки завершаются, независимо от того, закончили ли они выполняться. Вывод из этой программы может быть немного разным каждый раз, но будет похож на следующий:

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!

Вызовы thread::sleep заставляют поток остановить свое выполнение на короткое время, позволяя другому потоку запуститься. Вероятно, потоки будут чередоваться, но это не гарантируется: зависит от того, как ваша операционная система планирует потоки. В этом запуске главный поток вывел первый, хотя инструкция вывода из созданного потока появляется первой в коде. И хотя мы сказали созданному потоку выводить до тех пор, пока i не станет равным 9, он успел вывести только до 5, прежде чем главный поток завершился.

Если вы запустите этот код и увидите только вывод из главного потока или не увидите никакого перекрытия, попробуйте увеличить числа в диапазонах, чтобы создать больше возможностей для операционной системы переключаться между потоками.

Ожидание завершения всех потоков с использованием join-объектов

Код в листинге 16-1 не только преждевременно останавливает созданный поток в большинстве случаев из-за завершения главного потока, но и потому, что не гарантируется порядок выполнения потоков, мы также не можем гарантировать, что созданный поток вообще запустится!

Мы можем исправить проблему с тем, что созданный поток не запускается или преждевременно завершается, сохранив возвращаемое значение thread::spawn в переменную. Тип возврата thread::spawn - это JoinHandle<T>. JoinHandle<T> - это владение значением, которое, когда мы вызываем метод join для него, будет ожидать завершения своего потока. Листинг 16-2 показывает, как использовать JoinHandle<T> нашего потока, созданного в листинге 16-1, и вызвать join, чтобы убедиться, что созданный поток завершится, прежде чем main выйдет.

Имя файла: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {i} from the main thread!");
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

Листинг 16-2: Сохранение JoinHandle<T> из thread::spawn, чтобы гарантировать, что поток будет выполнен до конца

Вызов join для объекта блокирует текущий выполняющийся поток до тех пор, пока поток, представленный объектом, не завершится. Блокировка потока означает, что этот поток не может выполнять работу или выходить. Поскольку мы поместили вызов join после цикла for главного потока, запуск листинга 16-2 должен произвести вывод, похожий на этот:

hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!

Два потока продолжают чередоваться, но главный поток ждет из-за вызова handle.join() и не завершается, пока созданный поток не закончит работу.

Но давайте посмотрим, что произойдет, если мы переместить handle.join() перед циклом for в main, вот так:

Имя файла: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {i} from the main thread!");
        thread::sleep(Duration::from_millis(1));
    }
}

Главный поток будет ждать завершения созданного потока и затем выполнять свой цикл for, поэтому вывод больше не будет чередоваться, как показано здесь:

hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!

Маленькие детали, такие как то, где вызывается join, могут повлиять на то, будут ли ваши потоки выполняться одновременно.

Использование move-замыканий с потоками

Мы часто используем ключевое слово move с замыканиями, передаваемыми в thread::spawn, потому что в этом случае замыкание будет владеть значениями, которые оно использует из окружающей среды, тем самым передавая владение этими значениями из одного потока в другой. В разделе "Захват окружающей среды с помощью замыканий" мы обсуждали move в контексте замыканий. Теперь мы сосредоточимся больше на взаимодействии между move и thread::spawn.

Обратите внимание в листинге 16-1, что замыкание, которое мы передаем в thread::spawn, не принимает аргументов: мы не используем никаких данных из главного потока в коде созданного потока. Чтобы использовать данные из главного потока в созданном потоке, замыкание созданного потока должно захватить значения, которые ему нужны. Листинг 16-3 показывает попытку создать вектор в главном потоке и использовать его в созданном потоке. Однако это еще не будет работать, как вы вскоре увидите.

Имя файла: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Листинг 16-3: Попытка использовать вектор, созданный главным потоком, в другом потоке

Замыкание использует v, поэтому оно будет захватывать v и делать его частью окружения замыкания. Поскольку thread::spawn запускает это замыкание в новом потоке, мы должны быть able to access v внутри этого нового потока. Но когда мы компилируем этот пример, мы получаем следующую ошибку:

error[E0373]: closure may outlive the current function, but it borrows `v`,
which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {:?}", v);
  |                                           - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:6:18
  |
6 |       let handle = thread::spawn(|| {
  |  __________________^
7 | |         println!("Here's a vector: {:?}", v);
8 | |     });
  | |______^
help: to force the closure to take ownership of `v` (and any other referenced
variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

Rust интерпретирует, как захватывать v, и потому что println! только нуждается в ссылке на v, замыкание пытается взять заимствовать v. Однако есть проблема: Rust не может сказать, как долго будет работать созданный поток, поэтому он не знает, будет ли ссылка на v всегда действительной.

Листинг 16-4 представляет сценарий, в котором более вероятно, что ссылка на v не будет действительной.

Имя файла: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    drop(v); // ой нет!

    handle.join().unwrap();
}

Листинг 16-4: Поток с замыканием, которое пытается захватить ссылку на v из главного потока, который удаляет v

Если Rust позволил бы нам запустить этот код, есть вероятность, что созданный поток сразу будет помещен в фон и не запустится совсем. В созданном потоке есть ссылка на v внутри, но главный поток сразу удаляет v, используя функцию drop, которую мы обсуждали в главе 15. Затем, когда созданный поток начинает выполняться, v уже не действителен, поэтому ссылка на него также становится недействительной. Ой нет!

Чтобы исправить ошибку компиляции в листинге 16-3, мы можем использовать совет из сообщения об ошибке:

help: to force the closure to take ownership of `v` (and any other referenced
variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

Добавив ключевое слово move перед замыканием, мы заставляем замыкание владеть значениями, которые оно использует, вместо того, чтобы позволить Rustу推断,что оно должно взять заимствовать значения. Модификация листинга 16-3, показанная в листинге 16-5, будет компилироваться и работать, как мы предполагаем.

Имя файла: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Листинг 16-5: Использование ключевого слова move, чтобы заставить замыкание владеть значениями, которые оно использует

Мы, возможно, будем склонны попробовать то же самое, чтобы исправить код в листинге 16-4, где главный поток вызывал drop, используя move-замыкание. Однако этот метод исправления не сработает, потому что то, что пытается сделать листинг 16-4, запрещено по другому причине. Если мы добавим move в замыкание, мы переместите v в окружение замыкания, и мы больше не сможем вызвать drop для него в главном потоке. Вместо этого мы получим следующую ошибку компиляции:

error[E0382]: use of moved value: `v`
  --> src/main.rs:10:10
   |
4  |     let v = vec![1, 2, 3];
   |         - move occurs because `v` has type `Vec<i32>`, which does not
implement the `Copy` trait
5  |
6  |     let handle = thread::spawn(move || {
   |                                ------- value moved into closure here
7  |         println!("Here's a vector: {:?}", v);
   |                                           - variable moved due to use in
closure
...
10 |     drop(v); // oh no!
   |          ^ value used here after move

Правила владения Rust снова спасли нас! Мы получили ошибку из кода в листинге 16-3, потому что Rust был консервативным и только взял заимствовать v для потока, что означает, что главный поток теоретически мог бы сделать ссылку созданного потока недействительной. Передав Rustу указание на то, чтобы передать владение v созданному потоку, мы гарантируем Rustу, что главный поток больше не будет использовать v. Если мы изменим листинг 16-4 так же, мы нарушаем правила владения, когда пытаемся использовать v в главном потоке. Ключевое слово move переопределяет консервативный стандарт Rustа по взятию в заим; оно не позволяет нам нарушать правила владения.

Теперь, когда мы рассмотрели, что такое потоки и методы, предоставляемые API потоков, давайте посмотрим на некоторые ситуации, в которых мы можем использовать потоки.

Резюме

Поздравляем! Вы завершили лабораторную работу по использованию потоков для параллельного выполнения кода. Вы можете практиковаться в более многих лабораторных работах в LabEx, чтобы улучшить свои навыки.