Совместное состояние в многопоточном программировании на Rust

RustRustBeginner
Практиковаться сейчас

This tutorial is from open-source community. Access the source code

💡 Этот учебник переведен с английского с помощью ИИ. Чтобы просмотреть оригинал, вы можете перейти на английский оригинал

Введение

Добро пожаловать в Shared-State Concurrency. Этот лаба является частью Rust Book. Вы можете практиковать свои навыки Rust в LabEx.

В этом лабе мы исследуем концепцию совместной памяти в многопоточности и почему сторонники передачи сообщений предупреждают против этого.


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL rust(("Rust")) -.-> rust/BasicConceptsGroup(["Basic Concepts"]) rust(("Rust")) -.-> rust/ControlStructuresGroup(["Control Structures"]) rust(("Rust")) -.-> rust/FunctionsandClosuresGroup(["Functions and Closures"]) rust(("Rust")) -.-> rust/DataStructuresandEnumsGroup(["Data Structures and Enums"]) rust/BasicConceptsGroup -.-> rust/variable_declarations("Variable Declarations") rust/BasicConceptsGroup -.-> rust/mutable_variables("Mutable Variables") rust/ControlStructuresGroup -.-> rust/for_loop("for Loop") rust/FunctionsandClosuresGroup -.-> rust/function_syntax("Function Syntax") rust/FunctionsandClosuresGroup -.-> rust/expressions_statements("Expressions and Statements") rust/DataStructuresandEnumsGroup -.-> rust/method_syntax("Method Syntax") subgraph Lab Skills rust/variable_declarations -.-> lab-100439{{"Совместное состояние в многопоточном программировании на Rust"}} rust/mutable_variables -.-> lab-100439{{"Совместное состояние в многопоточном программировании на Rust"}} rust/for_loop -.-> lab-100439{{"Совместное состояние в многопоточном программировании на Rust"}} rust/function_syntax -.-> lab-100439{{"Совместное состояние в многопоточном программировании на Rust"}} rust/expressions_statements -.-> lab-100439{{"Совместное состояние в многопоточном программировании на Rust"}} rust/method_syntax -.-> lab-100439{{"Совместное состояние в многопоточном программировании на Rust"}} end

Shared-State Concurrency

Передача сообщений - хороший способ обрабатывать многопоточность, но не единственный. Другой метод - это доступ нескольких потоков к одной и той же общей памяти. Возьмите снова этот кусок слогана из документации по Go: "Не используйте разделение памяти для коммуникации".

Что может быть коммуникацией с использованием разделения памяти? Кроме того, почему сторонники передачи сообщений рекомендуют не использовать разделение памяти?

По некоторым причинам каналы в любом языке программирования похожи на единственную собственность, потому что, как только вы передаете значение по каналу, вы не должны больше использовать это значение. Совместная память в многопоточности похожа на множественную собственность: несколько потоков могут одновременно обращаться к одному и тому же участку памяти. Как вы видели в главе 15, где умные указатели позволили обеспечить множественную собственность, множественная собственность может добавить сложности, так как эти разные владельцы требуют управления. Типовая система и правила собственности Rust значительно помогают правильно организовать это управление. В качестве примера рассмотрим мьютексы, один из наиболее распространенных примитивов многопоточности для общей памяти.

Использование мьютексов для доступа к данным из одного потока за раз

Mutex - сокращение от mutual exclusion (взаимное исключение), то есть мьютекс позволяет только одному потоку получать доступ к некоторым данным в любое конкретное время. Чтобы получить доступ к данным в мьютексе, поток должен сначала запросить получение блокировки мьютекса, тем самым сигнализируя, что хочет получить доступ. Блокировка - это структура данных, являющаяся частью мьютекса, которая отслеживает, кто в настоящее время имеет эксклюзивный доступ к данным. Таким образом, мьютекс считается защищающим данные, которые он хранит, с помощью системы блокировки.

Мьютексы имеют репутацию быть сложными в использовании, потому что нужно запомнить две правила:

  1. Перед использованием данных необходимо попытаться получить блокировку.
  2. Когда вы закончите с данными, защищаемыми мьютексом, необходимо разблокировать их, чтобы другие потоки могли получить блокировку.

Для примера из реальной жизни можно представить обсуждение на конференции с использованием только одной микрофоны. Перед тем, как участник панели сможет发言, он должен попросить или сигнализировать, что хочет использовать микрофон. Когда он получает микрофон, он может говорить столько, сколько хочет, а затем передать микрофон следующему участнику панели, запрашивающему发言. Если участник панели забывает передать микрофон после окончания речи, никто другой не сможет发言. Если управление общей микрофоной будет неправильным, дискуссия не пройдет по плану!

Управление мьютексами может быть难以置信 сложным в правильном использовании, и именно поэтому многие люди предпочитают каналы. Однако, благодаря типовой системе и правилам собственности Rust, вы не сможете ошибиться при блокировке и разблокировке.

API Mutex<T>{=html}

В качестве примера использования мьютекса давайте начнем с его использования в однопоточном контексте, как показано в Listing 16-12.

Filename: src/main.rs

use std::sync::Mutex;

fn main() {
  1 let m = Mutex::new(5);

    {
      2 let mut num = m.lock().unwrap();
      3 *num = 6;
  4 }

  5 println!("m = {:?}", m);
}

Listing 16-12: Исследование API Mutex<T> в однопоточном контексте для простоты

Как и в случае с многими типами, мы создаем Mutex<T> с использованием ассоциированной функции new [1]. Чтобы получить доступ к данным внутри мьютекса, мы используем метод lock для получения блокировки [2]. Этот вызов заблокирует текущий поток, так что он не сможет выполнять никаких операций, пока не будет轮到 его получить блокировку.

Вызов lock завершится неудачей, если другой поток, удерживающий блокировку, вызовет панику. В этом случае никто никогда не сможет получить блокировку, поэтому мы решили использовать unwrap и заставить этот поток вызвать панику, если мы попадаем в такую ситуацию.

После того, как мы получили блокировку, мы можем рассматривать возвращаемое значение, названное num в этом случае, как изменяемую ссылку на данные внутри. Типовая система гарантирует, что мы получаем блокировку перед использованием значения в m. Тип m - Mutex<i32>, а не i32, поэтому мы должны вызвать lock, чтобы иметь возможность использовать значение i32. Мы не можем забыть; типовая система не позволит нам получить доступ к внутреннему i32 иначе.

Как вы, вероятно, догадываетесь, Mutex<T> - это умный указатель. Более точно, вызов lock возвращает умный указатель, называемый MutexGuard, обернутый в LockResult, который мы обработали вызовом unwrap. Умный указатель MutexGuard реализует Deref, чтобы указывать на наши внутренние данные; умный указатель также имеет реализацию Drop, которая автоматически освобождает блокировку, когда MutexGuard выходит из области видимости, что происходит в конце внутренней области видимости [4]. Таким образом, мы не рискуем забыть освободить блокировку и заблокировать мьютекс для использования другими потоками, потому что освобождение блокировки происходит автоматически.

После освобождения блокировки мы можем вывести значение мьютекса и убедиться, что мы смогли изменить внутреннее i32 на 6 [5].

Обмен Mutex<T>{=html} между несколькими потоками

Теперь попробуем поделиться значением между несколькими потоками с использованием Mutex<T>. Мы запустим 10 потоков и попросим каждый из них увеличить значение счетчика на 1, так что счетчик изменится от 0 до 10. Пример в Listing 16-13 будет иметь ошибку компиляции, и мы будем использовать эту ошибку, чтобы лучше понять, как использовать Mutex<T> и как Rust помогает нам использовать его правильно.

Filename: src/main.rs

use std::sync::Mutex;
use std::thread;

fn main() {
  1 let counter = Mutex::new(0);
    let mut handles = vec![];

  2 for _ in 0..10 {
      3 let handle = thread::spawn(move || {
          4 let mut num = counter.lock().unwrap();

          5 *num += 1;
        });
      6 handles.push(handle);
    }

    for handle in handles {
      7 handle.join().unwrap();
    }

  8 println!("Result: {}", *counter.lock().unwrap());
}

Listing 16-13: Десять потоков, каждый из которых увеличивает счетчик, защищенный Mutex<T>

Мы создаем переменную counter, которая хранит i32 внутри Mutex<T> [1], как мы это делали в Listing 16-12. Затем мы создаем 10 потоков, перебирая диапазон чисел [2]. Мы используем thread::spawn и даем всем потокам ту же замыкание: замыкание, которое перемещает счетчик в поток [3], получает блокировку на Mutex<T>, вызвав метод lock [4], а затем увеличивает на 1 значение в мьютексе [5]. Когда поток закончит выполнять свое замыкание, num выйдет из области видимости и освободит блокировку, чтобы другой поток мог ее получить.

В главном потоке мы собираем все дескрипторы присоединения [6]. Затем, как мы это делали в Listing 16-2, мы вызываем join для каждого дескриптора, чтобы убедиться, что все потоки завершились [7]. В этот момент главный поток получит блокировку и выведет результат этой программы [8].

Мы暗示или, что этот пример не скомпилируется. Теперь выясним, почему!

error[E0382]: use of moved value: `counter`
  --> src/main.rs:9:36
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which
does not implement the `Copy` trait
...
9  |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^ value moved into closure here,
in previous iteration of loop
10 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure

Сообщение об ошибке говорит, что значение counter было перемещено на предыдущей итерации цикла. Rust сообщает нам, что мы не можем передать владение блокировкой counter в несколько потоков. Исправим ошибку компиляции с использованием метода множественной собственности, который мы обсуждали в главе 15.

Множественная собственность с несколькими потоками

В главе 15 мы передали значение нескольким владельцам, создав счетчик ссылок с использованием умного указателя Rc<T>. Давайте поступим так же здесь и посмотрим, что произойдет. Мы обернем Mutex<T> в Rc<T> в Listing 16-14 и склонируем Rc<T>, прежде чем передать владение в поток.

Filename: src/main.rs

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Listing 16-14: Попытка использовать Rc<T> для того, чтобы несколько потоков могли владеть Mutex<T>

Опять же, мы компилируем и получаем... разные ошибки! Компилятор учит нас много вещей.

error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely 1
   --> src/main.rs:11:22
    |
11  |           let handle = thread::spawn(move || {
    |  ______________________^^^^^^^^^^^^^_-
    | |                      |
    | |                      `Rc<Mutex<i32>>` cannot be sent between threads
safely
12  | |             let mut num = counter.lock().unwrap();
13  | |
14  | |             *num += 1;
15  | |         });
    | |_________- within this `[closure@src/main.rs:11:36: 15:10]`
    |
= help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not
implemented for `Rc<Mutex<i32>>` 2
    = note: required because it appears within the type
`[closure@src/main.rs:11:36: 15:10]`
note: required by a bound in `spawn`

Вау, это сообщение об ошибке очень длинное! Вот что важно заметить: Rc<Mutex<i32>>` cannot be sent between threads safely` [1]. Компилятор также объясняет причину: `the trait `Send` is not implemented for `Rc<Mutex<i32>> [2]. Мы поговорим о Send в следующем разделе: это один из трейтов, которые гарантируют, что типы, которые мы используем с потоками, предназначены для использования в конкурентных ситуациях.

К сожалению, Rc<T> не безопасен для обмена между потоками. Когда Rc<T> управляет счетчиком ссылок, он увеличивает счетчик для каждого вызова clone и уменьшает счетчик, когда каждый клон удаляется. Но он не использует никаких примитивов многопоточности, чтобы убедиться, что изменения счетчика не могут быть прерваны другим потоком. Это может привести к неправильным счетчикам - неочевидным ошибкам, которые, в свою очередь, могут привести к утечкам памяти или тому, что значение будет удалено, пока мы его не закончим использовать. Что нам нужно, это тип, точно такой же, как Rc<T>, но способный изменять счетчик ссылок в потокобезопасном режиме.

Атомарное подсчет ссылок с помощью Arc<T>{=html}

К счастью, Arc<T> - это такой же тип, как Rc<T>, который безопасен для использования в конкурентных ситуациях. Буква a означает атомарность, то есть это атомарно подсчитываемый по ссылкам тип. Атомарные типы - это дополнительный вид примитивов многопоточности, о которых мы здесь не будем подробно останавливаться: для получения более подробной информации обратитесь к документации по стандартной библиотеке для std::sync::atomic. В этом случае вам просто нужно знать, что атомарные типы работают аналогично примитивным типам, но безопасны для обмена между потоками.

Затем вы, возможно, спросите себя, почему все примитивные типы не являются атомарными и почему типы стандартной библиотеки по умолчанию не реализованы для использования Arc<T>. Причина заключается в том, что обеспечение безопасности потоков связано с потерей производительности, которую вы хотите платить только тогда, когда это действительно необходимо. Если вы просто выполняете операции с значениями внутри одного потока, ваш код может работать быстрее, если не нужно обеспечивать гарантии, которые предоставляют атомарные типы.

Вернемся к нашему примеру: Arc<T> и Rc<T> имеют одинаковый API, поэтому мы исправляем нашу программу, изменив строку use, вызов new и вызов clone. Код в Listing 16-15 наконец-то скомпилируется и запустится.

Filename: src/main.rs

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Listing 16-15: Использование Arc<T> для оборачивания Mutex<T>, чтобы иметь возможность разделить владение между несколькими потоками

Этот код выведет следующее:

Result: 10

Мы сделали это! Мы посчитали от 0 до 10, что, может быть, не кажется очень впечатляющим, но это действительно научило нас много о Mutex<T> и безопасности потоков. Вы также можете использовать структуру этой программы для выполнения более сложных операций, чем просто увеличение счетчика. Используя эту стратегию, вы можете разделить вычисление на независимые части, разбить эти части между потоками, а затем использовать Mutex<T>, чтобы каждый поток обновлял окончательный результат своей частью.

Обратите внимание, что если вы выполняете простые числовые операции, существуют типы, проще Mutex<T>, предоставляемые модулем std::sync::atomic стандартной библиотеки. Эти типы обеспечивают безопасный, конкурентный, атомарный доступ к примитивным типам. Мы выбрали использовать Mutex<T> с примитивным типом для этого примера, чтобы сосредоточиться на том, как работает Mutex<T>.

Похожие черты между RefCell<T>{=html}/Rc<T>{=html} и Mutex<T>{=html}/Arc<T>{=html}

Вы, возможно, заметили, что counter является неизменяемым, но мы можем получить изменяемую ссылку на значение внутри него; это означает, что Mutex<T> обеспечивает внутреннюю изменяемость, как это делает семейство Cell. Таким же образом, как мы использовали RefCell<T> в главе 15, чтобы иметь возможность изменять содержимое внутри Rc<T>, мы используем Mutex<T> для изменения содержимого внутри Arc<T>.

Еще один важный момент - Rust не может защитить вас от всех типов ошибок в логике, когда вы используете Mutex<T>. Помните, что в главе 15 использование Rc<T> было связано с риском создания циклов ссылок, когда два значения Rc<T> ссылаются друг на друга, что приводит к утечке памяти. Аналогично, Mutex<T> имеет риск создания зависающих блокировок. Это происходит, когда операция требует блокировки двух ресурсов, и два потока каждый получили одну из блокировок, заставляя их ждать друг друга навсегда. Если вы заинтересованы в зависаниях, попробуйте создать программу на Rust, которая имеет зависание; затем изучите стратегии минимизации зависаний для мьютексов на любом языке и попробуйте реализовать их на Rust. API-документация стандартной библиотеки для Mutex<T> и MutexGuard содержит полезную информацию.

Мы завершим эту главу, поговорив о трейтах Send и Sync и том, как мы можем их использовать с пользовательскими типами.

Резюме

Поздравляем! Вы завершили лабораторную работу по разделу "Совместное состояние в многопоточном программировании". Вы можете выполнить больше лабораторных работ в LabEx, чтобы улучшить свои навыки.