Параллельная обработка данных в Rust

Beginner

This tutorial is from open-source community. Access the source code

Введение

В этом лабораторном задании исследуется язык программирования Rust в связи с его способностью параллелизировать обработку данных с использованием алгоритма map-reduce. Примерный код вычисляет сумму всех цифр в блоке чисел, разделяя данные на сегменты и обрабатывая каждый сегмент в отдельном потоке. Стандартная библиотека Rust предоставляет примитивы для потоков, которые предотвращают гонку данных и гарантируют безопасность потоков. Программа также демонстрирует, как Rust обрабатывает передачу только для чтения ссылок через границы потоков. Кроме того, код показывает использование замыканий, итераторов и метода join() для объединения промежуточных результатов каждого потока в окончательную сумму. Чтобы обеспечить эффективность, программу можно модифицировать так, чтобы разбивать данные на ограниченное количество сегментов, а не полагаться на пользовательский ввод данных, который может привести к чрезмерному количеству потоков.

Примечание: Если в лабораторном задании не указано имя файла, вы можете использовать любое имя файла, которое хотите. Например, вы можете использовать main.rs, скомпилировать и запустить его с помощью rustc main.rs &&./main.

Тестовый случай: map-reduce

Rust делает очень простым параллелизацию обработки данных, не вызывая многих головных болей, которые традиционно ассоциируются с таким подходом.

Стандартная библиотека предоставляет отличные примитивы для потоков сразу в коробке. Эти примитивы, в сочетании с концепцией владения и правилами по алиасам Rust, автоматически предотвращают гонку данных.

Правила по алиасам (одна изменяемая ссылка XOR несколько только для чтения ссылок) автоматически предотвращают вас от манипуляций с состоянием, доступным другим потокам. (В тех случаях, когда требуется синхронизация, есть примитивы синхронизации, такие как Mutex или Channel.)

В этом примере мы вычислим сумму всех цифр в блоке чисел. Мы будем это делать, разделяя блоки на разные потоки. Каждый поток просуммирует свой маленький блок цифр, а затем мы просуммируем промежуточные суммы, полученные каждым потоком.

Обратите внимание, что, хотя мы передаем ссылки через границы потоков, Rust понимает, что мы передаем только для чтения ссылки, и поэтому не могут возникнуть ошибки безопасности или гонки данных. Также потому, что ссылки, которые мы передаем, имеют 'static время жизни, Rust понимает, что наша data не будет уничтожена, пока эти потоки будут выполняться. (Когда вам нужно разделить не static данные между потоками, вы можете использовать умный указатель, такой как Arc, чтобы сохранить данные в живых и избежать не static времени жизни.)

use std::thread;

// Это главный поток
fn main() {

    // Это наши данные для обработки.
    // Мы вычислим сумму всех цифр с помощью многопоточного алгоритма map-reduce.
    // Каждый разделенный пробелом кусок будет обрабатываться в отдельном потоке.
    //
    // TODO: посмотрите, что произойдет с выводом, если вы вставите пробелы!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // Создаем вектор, чтобы хранить дочерние потоки, которые мы создадим.
    let mut children = vec![];

    /*************************************************************************
     * "Map" фаза
     *
     * Разделяем наши данные на сегменты и применяем начальную обработку
     ************************************************************************/

    // Разделяем наши данные на сегменты для отдельного вычисления
    // каждый кусок будет ссылкой (&str) на фактические данные
    let chunked_data = data.split_whitespace();

    // Перебираем сегменты данных.
    //.enumerate() добавляет текущий индекс цикла к тому, что перебирается
    // результирующий кортеж "(индекс, элемент)" затем сразу
    // "раскладывается" на две переменные, "i" и "data_segment" с помощью
    // "раскладывающего присваивания"
    for (i, data_segment) in chunked_data.enumerate() {
        println!("data segment {} is \"{}\"", i, data_segment);

        // Обрабатываем каждый сегмент данных в отдельном потоке
        //
        // spawn() возвращает ссылку на новый поток,
        // которую мы ДОЛЖНЫ сохранить, чтобы получить возвращаемое значение
        //
        // 'move || -> u32' - это синтаксис для замыкания, которое:
        // * не принимает аргументов ('||')
        // * принимает владение своими захваченными переменными ('move') и
        // * возвращает беззнаковое целое 32-разрядное число ('-> u32')
        //
        // Rust достаточно умный, чтобы вывести '-> u32' из
        // самого замыкания, поэтому мы могли бы опустить это.
        //
        // TODO: попробуйте удалить'move' и посмотрите, что произойдет
        children.push(thread::spawn(move || -> u32 {
            // Вычисляем промежуточную сумму этого сегмента:
            let result = data_segment
                        // перебираем символы нашего сегмента..
                       .chars()
                        //.. преобразуем текстовые символы в их числовое значение..
                       .map(|c| c.to_digit(10).expect("should be a digit"))
                        //.. и суммируем результирующий итератор чисел
                       .sum();

            // println! блокирует stdout, поэтому не происходит перепутывания текста
            println!("processed segment {}, result={}", i, result);

            // "return" не нужен, потому что Rust - это "язык выражений",
            // последнее вычисленное выражение в каждом блоке автоматически является его значением.
            result

        }));
    }


    /*************************************************************************
     * "Reduce" фаза
     *
     * Собираем наши промежуточные результаты и объединяем их в окончательный результат
     ************************************************************************/

    // Объединяем промежуточные результаты каждого потока в единую окончательную сумму.
    //
    // мы используем "turbofish" ::<> для предоставления sum() с подсказкой типа.
    //
    // TODO: попробуйте без turbofish, явно указав
    // тип final_result
    let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();

    println!("Final sum result: {}", final_result);
}

Задания

Не имеет смысла позволить количеству наших потоков зависеть от данных, введенных пользователем. Что если пользователь решит вставить много пробелов? Мы действительно хотим создавать 2000 потоков? Измените программу так, чтобы данные всегда были разбиты на ограниченное количество кусочков, определенное статической константой в начале программы.

Резюме

Поздравляем! Вы завершили лабораторный тест Map-Reduce. Вы можете практиковаться в других лабораторных работах в LabEx, чтобы улучшить свои навыки.