Transfert de données concurrentes avec les canaux Rust

RustRustBeginner
Pratiquer maintenant

This tutorial is from open-source community. Access the source code

💡 Ce tutoriel est traduit par l'IA à partir de la version anglaise. Pour voir la version originale, vous pouvez cliquer ici

Introduction

Bienvenue dans Utiliser la communication par messages pour transférer des données entre des threads. Ce laboratoire est une partie du Rust Book. Vous pouvez pratiquer vos compétences Rust dans LabEx.

Dans ce laboratoire, nous explorons la communication par messages comme une approche de concurrence sécurisée, en utilisant les canaux de la bibliothèque standard de Rust pour envoyer et recevoir des données entre des threads.


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL rust(("Rust")) -.-> rust/DataTypesGroup(["Data Types"]) rust(("Rust")) -.-> rust/ControlStructuresGroup(["Control Structures"]) rust(("Rust")) -.-> rust/FunctionsandClosuresGroup(["Functions and Closures"]) rust(("Rust")) -.-> rust/DataStructuresandEnumsGroup(["Data Structures and Enums"]) rust(("Rust")) -.-> rust/PerformanceandConcurrencyGroup(["Performance and Concurrency"]) rust(("Rust")) -.-> rust/BasicConceptsGroup(["Basic Concepts"]) rust/BasicConceptsGroup -.-> rust/variable_declarations("Variable Declarations") rust/DataTypesGroup -.-> rust/string_type("String Type") rust/ControlStructuresGroup -.-> rust/for_loop("for Loop") rust/FunctionsandClosuresGroup -.-> rust/function_syntax("Function Syntax") rust/FunctionsandClosuresGroup -.-> rust/expressions_statements("Expressions and Statements") rust/DataStructuresandEnumsGroup -.-> rust/method_syntax("Method Syntax") rust/PerformanceandConcurrencyGroup -.-> rust/message_passing("Message Passing") subgraph Lab Skills rust/variable_declarations -.-> lab-100438{{"Transfert de données concurrentes avec les canaux Rust"}} rust/string_type -.-> lab-100438{{"Transfert de données concurrentes avec les canaux Rust"}} rust/for_loop -.-> lab-100438{{"Transfert de données concurrentes avec les canaux Rust"}} rust/function_syntax -.-> lab-100438{{"Transfert de données concurrentes avec les canaux Rust"}} rust/expressions_statements -.-> lab-100438{{"Transfert de données concurrentes avec les canaux Rust"}} rust/method_syntax -.-> lab-100438{{"Transfert de données concurrentes avec les canaux Rust"}} rust/message_passing -.-> lab-100438{{"Transfert de données concurrentes avec les canaux Rust"}} end

Utiliser la communication par messages pour transférer des données entre des threads

Une approche de concurrence sécurisée de plus en plus populaire est la communication par messages, où les threads ou les acteurs communiquent en s'envoyant des messages contenant des données. Voici l'idée sous forme d'un slogan de la documentation du langage Go à https://golang.org/doc/effective_go.html#concurrency: "Ne communiquez pas en partageant la mémoire; au contraire, partagez la mémoire en communiquant."

Pour réaliser une concurrence de communication de messages, la bibliothèque standard de Rust fournit une implémentation de canaux. Un canal est un concept de programmation général par lequel des données sont envoyées d'un thread à un autre.

Vous pouvez imaginer un canal en programmation comme un canal directionnel d'eau, comme un ruisseau ou une rivière. Si vous mettez quelque chose comme un canard en caoutchouc dans une rivière, il ira en aval jusqu'à la fin de l'eauway.

Un canal a deux parties: un émetteur et un récepteur. La partie émettrice est l'emplacement en amont où vous mettez le canard en caoutchouc dans la rivière, et la partie réceptrice est où le canard en caoutchouc finit en aval. Une partie de votre code appelle des méthodes sur l'émetteur avec les données que vous voulez envoyer, et une autre partie vérifie l'extrémité de réception pour les messages arrivant. Un canal est dit fermé si l'une ou l'autre des parties émettrice ou réceptrice est supprimée.

Ici, nous allons travailler jusqu'à un programme qui a un thread pour générer des valeurs et les envoyer à travers un canal, et un autre thread qui recevra les valeurs et les imprimera. Nous enverrons des valeurs simples entre les threads en utilisant un canal pour illustrer la fonctionnalité. Une fois que vous serez familier avec la technique, vous pourriez utiliser des canaux pour tous les threads qui ont besoin de communiquer entre eux, comme un système de chat ou un système où de nombreux threads exécutent des parties d'un calcul et envoient les parties à un thread qui agrège les résultats.

Tout d'abord, dans la Liste 16-6, nous allons créer un canal mais ne rien faire avec. Notez que cela ne compilera pas encore car Rust ne peut pas savoir quel type de valeurs nous voulons envoyer sur le canal.

Nom de fichier: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Liste 16-6: Création d'un canal et attribution des deux parties à tx et rx

Nous créons un nouveau canal en utilisant la fonction mpsc::channel; mpsc signifie multiple producteurs, un consommateur. En bref, la manière dont la bibliothèque standard de Rust implémente les canaux signifie qu'un canal peut avoir plusieurs extrémités d'envoi qui produisent des valeurs mais seulement une extrémité de réception qui consomme ces valeurs. Imaginez plusieurs ruisseaux qui convergent dans une grande rivière: tout ce qui est envoyé dans l'un des ruisseaux finira dans une seule rivière à la fin. Nous commencerons avec un seul producteur pour l'instant, mais nous ajouterons plusieurs producteurs lorsque cet exemple fonctionnera.

La fonction mpsc::channel renvoie un tuple, dont le premier élément est l'extrémité d'envoi - l'émetteur - et le second élément est l'extrémité de réception - le récepteur. Les abréviations tx et rx sont traditionnellement utilisées dans de nombreux domaines pour émetteur et récepteur respectivement, donc nous nommons nos variables ainsi pour indiquer chaque extrémité. Nous utilisons une instruction let avec un motif qui décompose les tuples; nous discuterons de l'utilisation des motifs dans les instructions let et de la décomposition au Chapitre 18. Pour l'instant, sachez que l'utilisation d'une instruction let de cette manière est une approche pratique pour extraire les parties du tuple renvoyé par mpsc::channel.

Déplaçons l'extrémité d'envoi dans un thread lancé et lui faisons envoyer une chaîne de caractères de sorte que le thread lancé communique avec le thread principal, comme indiqué dans la Liste 16-7. C'est comme mettre un canard en caoutchouc dans la rivière en amont ou envoyer un message de chat d'un thread à un autre.

Nom de fichier: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Liste 16-7: Déplacement de tx dans un thread lancé et envoi de "hi"

Encore une fois, nous utilisons thread::spawn pour créer un nouveau thread puis move pour déplacer tx dans la fermeture afin que le thread lancé possède tx. Le thread lancé doit posséder l'émetteur pour être capable d'envoyer des messages à travers le canal.

L'émetteur a une méthode send qui prend la valeur que nous voulons envoyer. La méthode send renvoie un type Result<T, E>, donc si le récepteur a déjà été supprimé et qu'il n'y a nulle part où envoyer une valeur, l'opération d'envoi renverra une erreur. Dans cet exemple, nous appelons unwrap pour générer une panique en cas d'erreur. Mais dans une application réelle, nous la gérerions correctement: revenez au Chapitre 9 pour réviser les stratégies de gestion appropriée des erreurs.

Dans la Liste 16-8, nous allons obtenir la valeur du récepteur dans le thread principal. C'est comme récupérer le canard en caoutchouc dans l'eau à la fin de la rivière ou recevoir un message de chat.

Nom de fichier: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Liste 16-8: Réception de la valeur "hi" dans le thread principal et l'impression

Le récepteur a deux méthodes utiles: recv et try_recv. Nous utilisons recv, abréviation de recevoir, qui bloquera l'exécution du thread principal et attendra jusqu'à ce qu'une valeur soit envoyée à travers le canal. Une fois qu'une valeur est envoyée, recv la renverra dans un Result<T, E>. Lorsque l'émetteur se ferme, recv renverra une erreur pour signaler qu'aucune valeur ne viendra plus.

La méthode try_recv ne bloque pas, mais renverra immédiatement un Result<T, E>: une valeur Ok contenant un message s'il y en a un disponible et une valeur Err s'il n'y a pas de messages cette fois-ci. L'utilisation de try_recv est utile si ce thread a d'autres travaux à faire en attendant les messages: nous pourrions écrire une boucle qui appelle try_recv de temps en temps, traite un message s'il y en a un disponible et sinon effectue d'autres travaux pendant un certain temps jusqu'à vérifier à nouveau.

Nous avons utilisé recv dans cet exemple pour la simplicité; nous n'avons pas d'autres travaux pour le thread principal à faire autre que d'attendre les messages, donc bloquer le thread principal est approprié.

Lorsque nous exécutons le code de la Liste 16-8, nous verrons la valeur imprimée à partir du thread principal:

Got: hi

Parfait!

Canaux et transfert de propriété

Les règles de propriété jouent un rôle crucial dans l'envoi de messages car elles vous aident à écrire du code concurrent sécurisé. Prévenir les erreurs en programmation concurrente est l'avantage de penser à la propriété tout au long de vos programmes Rust. Faisons une expérience pour montrer comment les canaux et la propriété fonctionnent ensemble pour prévenir les problèmes : nous allons essayer d'utiliser une valeur val dans le thread lancé après l'avoir envoyée à travers le canal. Essayez de compiler le code de la Liste 16-9 pour voir pourquoi ce code n'est pas autorisé.

Nom de fichier: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Liste 16-9: Tentative d'utilisation de val après l'avoir envoyée à travers le canal

Ici, nous essayons d'imprimer val après l'avoir envoyée à travers le canal via tx.send. Autoriser cela serait une mauvaise idée : une fois que la valeur a été envoyée à un autre thread, ce thread pourrait la modifier ou la supprimer avant que nous n'essayions à nouveau d'utiliser la valeur. Potentiellement, les modifications de l'autre thread pourraient entraîner des erreurs ou des résultats inattendus en raison de données inconsistantes ou inexistantes. Cependant, Rust nous donne une erreur si nous essayons de compiler le code de la Liste 16-9 :

error[E0382]: emprunt d'une valeur déplacée: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- déplacement car `val` est de type `String`, qui ne
définit pas le trait `Copy`
9  |         tx.send(val).unwrap();
   |                 --- valeur déplacée ici
10 |         println!("val is {val}");
   |                           ^^^ valeur empruntée ici après déplacement

Notre erreur de concurrence a entraîné une erreur de compilation. La fonction send prend la propriété de son paramètre, et lorsque la valeur est déplacée, le récepteur en prend la propriété. Cela empêche de l'utiliser accidentellement une fois de plus après l'avoir envoyée ; le système de propriété vérifie que tout est correct.

Envoi de plusieurs valeurs et observation du récepteur en attente

Le code de la Liste 16-8 a compilé et s'est exécuté, mais il n'a pas clairement montré que deux threads distincts communiquaient l'un avec l'autre via le canal. Dans la Liste 16-10, nous avons apporté quelques modifications qui prouveront que le code de la Liste 16-8 s'exécute en parallèle : le thread lancé enverra désormais plusieurs messages et pausera une seconde entre chaque message.

Nom de fichier: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

Liste 16-10: Envoi de plusieurs messages et pause entre chacun

Cette fois, le thread lancé a un vecteur de chaînes de caractères que nous voulons envoyer au thread principal. Nous itérons sur eux, envoyant chacun individuellement, et nous mettons en pause entre chaque en appelant la fonction thread::sleep avec une valeur Duration d'une seconde.

Dans le thread principal, nous n'appelons plus explicitement la fonction recv : au lieu de cela, nous traitons rx comme un itérateur. Pour chaque valeur reçue, nous l'imprimons. Lorsque le canal est fermé, l'itération se terminera.

Lors de l'exécution du code de la Liste 16-10, vous devriez voir la sortie suivante avec une pause d'une seconde entre chaque ligne :

Got: hi
Got: from
Got: the
Got: thread

Comme nous n'avons pas de code qui met en pause ou retarde dans la boucle for du thread principal, nous pouvons dire que le thread principal est en attente de recevoir des valeurs du thread lancé.

Création de plusieurs producteurs en clonant l'émetteur

Plus tôt, nous avons mentionné que mpsc était un acronyme pour multiple producteurs, un consommateur. Utilisons mpsc et étendons le code de la Liste 16-10 pour créer plusieurs threads qui envoient tous des valeurs au même récepteur. Nous pouvons le faire en clonant l'émetteur, comme indiqué dans la Liste 16-11.

Nom de fichier: src/main.rs

--snip--

let (tx, rx) = mpsc::channel();

let tx1 = tx.clone();
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {received}");
}

--snip--

Liste 16-11: Envoi de plusieurs messages à partir de plusieurs producteurs

Cette fois, avant de créer le premier thread lancé, nous appelons clone sur l'émetteur. Cela nous donnera un nouvel émetteur que nous pouvons passer au premier thread lancé. Nous passons l'émetteur original à un second thread lancé. Cela nous donne deux threads, chacun envoyant des messages différents au même récepteur.

Lorsque vous exécutez le code, votre sortie devrait ressembler à ceci :

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Vous pouvez voir les valeurs dans un autre ordre, selon votre système. C'est ce qui rend la concurrence intéressante et difficile. Si vous experimentez avec thread::sleep, en lui donnant diverses valeurs dans les différents threads, chaque exécution sera plus non déterministe et créera une sortie différente chaque fois.

Maintenant que nous avons vu comment fonctionnent les canaux, regardons une autre méthode de concurrence.

Sommaire

Félicitations ! Vous avez terminé le laboratoire Utiliser la communication par messages pour transférer des données entre des threads. Vous pouvez pratiquer d'autres laboratoires dans LabEx pour améliorer vos compétences.