Transferência Concorrente de Dados com Canais Rust

Beginner

This tutorial is from open-source community. Access the source code

Introdução

Bem-vindo ao Usando Passagem de Mensagens para Transferir Dados Entre Threads. Este laboratório faz parte do Livro Rust. Você pode praticar suas habilidades em Rust no LabEx.

Neste laboratório, exploramos a passagem de mensagens como uma abordagem segura de concorrência, usando canais na biblioteca padrão do Rust para enviar e receber dados entre threads.

Usando Passagem de Mensagens para Transferir Dados Entre Threads

Uma abordagem cada vez mais popular para garantir a concorrência segura é a passagem de mensagens, onde threads ou atores se comunicam enviando mensagens uns aos outros contendo dados. Aqui está a ideia em um slogan da documentação da linguagem Go em https://golang.org/doc/effective_go.html#concurrency: "Não se comunique compartilhando memória; em vez disso, compartilhe memória comunicando."

Para realizar a concorrência de envio de mensagens, a biblioteca padrão do Rust fornece uma implementação de canais. Um canal é um conceito geral de programação pelo qual os dados são enviados de uma thread para outra.

Você pode imaginar um canal em programação como um canal direcional de água, como um riacho ou um rio. Se você colocar algo como um patinho de borracha em um rio, ele viajará rio abaixo até o final do curso d'água.

Um canal tem duas metades: um transmissor e um receptor. A metade do transmissor é o local a montante onde você coloca o patinho de borracha no rio, e a metade do receptor é onde o patinho de borracha acaba a jusante. Uma parte do seu código chama métodos no transmissor com os dados que você deseja enviar, e outra parte verifica a extremidade receptora em busca de mensagens recebidas. Diz-se que um canal está fechado se o transmissor ou o receptor for descartado.

Aqui, vamos desenvolver um programa que tem uma thread para gerar valores e enviá-los por um canal, e outra thread que receberá os valores e os imprimirá. Estaremos enviando valores simples entre threads usando um canal para ilustrar o recurso. Depois de se familiarizar com a técnica, você pode usar canais para quaisquer threads que precisem se comunicar entre si, como um sistema de bate-papo ou um sistema onde muitas threads executam partes de um cálculo e enviam as partes para uma thread que agrega os resultados.

Primeiro, na Listagem 16-6, criaremos um canal, mas não faremos nada com ele. Observe que isso ainda não compilará porque o Rust não consegue dizer que tipo de valores queremos enviar pelo canal.

Nome do arquivo: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Listagem 16-6: Criando um canal e atribuindo as duas metades a tx e rx

Criamos um novo canal usando a função mpsc::channel; mpsc significa múltiplos produtores, consumidor único. Em suma, a maneira como a biblioteca padrão do Rust implementa canais significa que um canal pode ter vários pontos de envio que produzem valores, mas apenas um ponto de recebimento que consome esses valores. Imagine vários riachos fluindo juntos em um grande rio: tudo o que é enviado por qualquer um dos riachos acabará em um rio no final. Começaremos com um único produtor por enquanto, mas adicionaremos vários produtores quando fizermos este exemplo funcionar.

A função mpsc::channel retorna uma tupla, cujo primeiro elemento é a extremidade de envio---o transmissor---e o segundo elemento é a extremidade de recebimento---o receptor. As abreviações tx e rx são tradicionalmente usadas em muitos campos para transmissor e receptor, respectivamente, então nomeamos nossas variáveis ​​como tal para indicar cada extremidade. Estamos usando uma instrução let com um padrão que desestrutura as tuplas; discutiremos o uso de padrões em instruções let e desestruturação no Capítulo 18. Por enquanto, saiba que usar uma instrução let dessa maneira é uma abordagem conveniente para extrair as partes da tupla retornada por mpsc::channel.

Vamos mover a extremidade de transmissão para uma thread gerada e fazer com que ela envie uma string para que a thread gerada esteja se comunicando com a thread principal, conforme mostrado na Listagem 16-7. Isso é como colocar um patinho de borracha no rio a montante ou enviar uma mensagem de bate-papo de uma thread para outra.

Nome do arquivo: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Listagem 16-7: Movendo tx para uma thread gerada e enviando "hi"

Novamente, estamos usando thread::spawn para criar uma nova thread e, em seguida, usando move para mover tx para o closure para que a thread gerada possua tx. A thread gerada precisa possuir o transmissor para poder enviar mensagens pelo canal.

O transmissor tem um método send que recebe o valor que queremos enviar. O método send retorna um tipo Result<T, E>, então, se o receptor já foi descartado e não há para onde enviar um valor, a operação de envio retornará um erro. Neste exemplo, estamos chamando unwrap para entrar em pânico em caso de erro. Mas em uma aplicação real, nós o trataríamos adequadamente: retorne ao Capítulo 9 para revisar as estratégias para o tratamento adequado de erros.

Na Listagem 16-8, obteremos o valor do receptor na thread principal. Isso é como recuperar o patinho de borracha da água no final do rio ou receber uma mensagem de bate-papo.

Nome do arquivo: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Listagem 16-8: Recebendo o valor "hi" na thread principal e imprimindo-o

O receptor tem dois métodos úteis: recv e try_recv. Estamos usando recv, abreviação de receive (receber), que bloqueará a execução da thread principal e esperará até que um valor seja enviado pelo canal. Depois que um valor é enviado, recv o retornará em um Result<T, E>. Quando o transmissor fecha, recv retornará um erro para sinalizar que nenhum valor adicional virá.

O método try_recv não bloqueia, mas retornará um Result<T, E> imediatamente: um valor Ok contendo uma mensagem, se uma estiver disponível, e um valor Err se não houver mensagens desta vez. Usar try_recv é útil se esta thread tiver outro trabalho a fazer enquanto espera por mensagens: poderíamos escrever um loop que chama try_recv de vez em quando, lida com uma mensagem, se uma estiver disponível, e, caso contrário, faz outro trabalho por um tempo até verificar novamente.

Usamos recv neste exemplo por simplicidade; não temos nenhum outro trabalho para a thread principal fazer além de esperar por mensagens, então bloquear a thread principal é apropriado.

Quando executamos o código na Listagem 16-8, veremos o valor impresso da thread principal:

Got: hi

Perfeito!

Canais e Transferência de Propriedade

As regras de propriedade desempenham um papel vital no envio de mensagens porque ajudam você a escrever código concorrente seguro. Prevenir erros em programação concorrente é a vantagem de pensar sobre a propriedade em todos os seus programas Rust. Vamos fazer um experimento para mostrar como os canais e a propriedade trabalham juntos para evitar problemas: tentaremos usar um valor val na thread gerada depois de enviá-lo pelo canal. Tente compilar o código na Listagem 16-9 para ver por que este código não é permitido.

Nome do arquivo: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Listagem 16-9: Tentando usar val depois de enviá-lo pelo canal

Aqui, tentamos imprimir val depois de enviá-lo pelo canal via tx.send. Permitir isso seria uma má ideia: uma vez que o valor foi enviado para outra thread, essa thread pode modificá-lo ou descartá-lo antes de tentarmos usar o valor novamente. Potencialmente, as modificações da outra thread podem causar erros ou resultados inesperados devido a dados inconsistentes ou inexistentes. No entanto, o Rust nos dá um erro se tentarmos compilar o código na Listagem 16-9:

error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does
not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move

Nosso erro de concorrência causou um erro em tempo de compilação. A função send assume a propriedade de seu parâmetro, e quando o valor é movido, o receptor assume a propriedade dele. Isso nos impede de usar o valor novamente acidentalmente após enviá-lo; o sistema de propriedade verifica se tudo está correto.

Enviando Vários Valores e Vendo o Receptor Esperando

O código na Listagem 16-8 compilou e executou, mas não nos mostrou claramente que duas threads separadas estavam conversando entre si pelo canal. Na Listagem 16-10, fizemos algumas modificações que provarão que o código na Listagem 16-8 está sendo executado concorrentemente: a thread gerada agora enviará várias mensagens e fará uma pausa de um segundo entre cada mensagem.

Nome do arquivo: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

Listagem 16-10: Enviando várias mensagens e pausando entre cada uma

Desta vez, a thread gerada tem um vetor de strings que queremos enviar para a thread principal. Iteramos sobre elas, enviando cada uma individualmente, e pausamos entre cada uma chamando a função thread::sleep com um valor Duration de um segundo.

Na thread principal, não estamos mais chamando a função recv explicitamente: em vez disso, estamos tratando rx como um iterador. Para cada valor recebido, estamos imprimindo-o. Quando o canal é fechado, a iteração terminará.

Ao executar o código na Listagem 16-10, você deve ver a seguinte saída com uma pausa de um segundo entre cada linha:

Got: hi
Got: from
Got: the
Got: thread

Como não temos nenhum código que pausa ou atrasa no loop for na thread principal, podemos dizer que a thread principal está esperando para receber valores da thread gerada.

Criando Múltiplos Produtores Clonando o Transmissor

Anteriormente, mencionamos que mpsc era um acrônimo para múltiplos produtores, consumidor único (multiple producer, single consumer). Vamos colocar mpsc em uso e expandir o código na Listagem 16-10 para criar múltiplas threads que enviam valores para o mesmo receptor. Podemos fazer isso clonando o transmissor, como mostrado na Listagem 16-11.

Nome do arquivo: src/main.rs

--snip--

let (tx, rx) = mpsc::channel();

let tx1 = tx.clone();
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {received}");
}

--snip--

Listagem 16-11: Enviando múltiplas mensagens de múltiplos produtores

Desta vez, antes de criarmos a primeira thread gerada, chamamos clone no transmissor. Isso nos dará um novo transmissor que podemos passar para a primeira thread gerada. Passamos o transmissor original para uma segunda thread gerada. Isso nos dá duas threads, cada uma enviando mensagens diferentes para o único receptor.

Quando você executa o código, sua saída deve ser semelhante a esta:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Você pode ver os valores em outra ordem, dependendo do seu sistema. É isso que torna a concorrência interessante e difícil. Se você experimentar com thread::sleep, dando a ele vários valores nas diferentes threads, cada execução será mais não determinística e criará uma saída diferente a cada vez.

Agora que analisamos como os canais funcionam, vamos analisar um método diferente de concorrência.

Resumo

Parabéns! Você concluiu o laboratório Usando Passagem de Mensagens para Transferir Dados Entre Threads. Você pode praticar mais laboratórios no LabEx para aprimorar suas habilidades.