Gleichzeitige Datenübertragung mit Rust-Kanälen

RustRustBeginner
Jetzt üben

This tutorial is from open-source community. Access the source code

💡 Dieser Artikel wurde von AI-Assistenten übersetzt. Um die englische Version anzuzeigen, können Sie hier klicken

Einführung

Willkommen zu Using Message Passing to Transfer Data Between Threads. Dieser Lab ist Teil des Rust Buchs. Du kannst deine Rust-Fähigkeiten in LabEx üben.

In diesem Lab untersuchen wir die Übertragung von Nachrichten als sicheres Konkurrenzanwendungsverfahren und verwenden Kanäle aus der Standardbibliothek von Rust, um Daten zwischen Threads zu senden und zu empfangen.


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL rust(("Rust")) -.-> rust/PerformanceandConcurrencyGroup(["Performance and Concurrency"]) rust(("Rust")) -.-> rust/BasicConceptsGroup(["Basic Concepts"]) rust(("Rust")) -.-> rust/DataTypesGroup(["Data Types"]) rust(("Rust")) -.-> rust/ControlStructuresGroup(["Control Structures"]) rust(("Rust")) -.-> rust/FunctionsandClosuresGroup(["Functions and Closures"]) rust(("Rust")) -.-> rust/DataStructuresandEnumsGroup(["Data Structures and Enums"]) rust/BasicConceptsGroup -.-> rust/variable_declarations("Variable Declarations") rust/DataTypesGroup -.-> rust/string_type("String Type") rust/ControlStructuresGroup -.-> rust/for_loop("for Loop") rust/FunctionsandClosuresGroup -.-> rust/function_syntax("Function Syntax") rust/FunctionsandClosuresGroup -.-> rust/expressions_statements("Expressions and Statements") rust/DataStructuresandEnumsGroup -.-> rust/method_syntax("Method Syntax") rust/PerformanceandConcurrencyGroup -.-> rust/message_passing("Message Passing") subgraph Lab Skills rust/variable_declarations -.-> lab-100438{{"Gleichzeitige Datenübertragung mit Rust-Kanälen"}} rust/string_type -.-> lab-100438{{"Gleichzeitige Datenübertragung mit Rust-Kanälen"}} rust/for_loop -.-> lab-100438{{"Gleichzeitige Datenübertragung mit Rust-Kanälen"}} rust/function_syntax -.-> lab-100438{{"Gleichzeitige Datenübertragung mit Rust-Kanälen"}} rust/expressions_statements -.-> lab-100438{{"Gleichzeitige Datenübertragung mit Rust-Kanälen"}} rust/method_syntax -.-> lab-100438{{"Gleichzeitige Datenübertragung mit Rust-Kanälen"}} rust/message_passing -.-> lab-100438{{"Gleichzeitige Datenübertragung mit Rust-Kanälen"}} end

Verwenden von Nachrichtenübertragung zum Übertragen von Daten zwischen Threads

Ein zunehmend beliebter Ansatz zur Sicherstellung einer sicheren Konkurrenz ist die Nachrichtenübertragung, bei der Threads oder Akteure miteinander kommunizieren, indem sie sich Nachrichten mit Daten senden. Hier ist die Idee in einem Slogan aus der Go-Sprachen-Dokumentation unter https://golang.org/doc/effective_go.html#concurrency: "Kommunizieren Sie nicht, indem Sie den Speicher teilen; teilen Sie stattdessen den Speicher, indem Sie kommunizieren."

Um eine Nachrichtenübertragungskonkurrenz zu erreichen, bietet die Standardbibliothek von Rust eine Implementierung von Kanälen. Ein Kanal ist ein allgemeines Programmierkonzept, über den Daten von einem Thread an einen anderen gesendet werden.

Sie können sich einen Kanal in der Programmierung wie einen gerichteten Wasserkanal vorstellen, wie einen Bach oder einen Fluss. Wenn Sie etwas wie eine Gummitüte in einen Fluss legen, wird sie den Fluss hinunter bis zum Ende des Wasserkreislaufs fließen.

Ein Kanal hat zwei Hälften: einen Sender und einen Empfänger. Die Senderhälfte ist der obere Flussabschnitt, an dem Sie die Gummitüte in den Fluss legen, und die Empfängerhälfte ist der Punkt, an dem die Gummitüte am Ende des Flusses landet. Ein Teil Ihres Codes ruft Methoden am Sender mit den Daten auf, die Sie senden möchten, und ein anderer Teil prüft die Empfangsseite auf ankommende Nachrichten. Ein Kanal wird als geschlossen bezeichnet, wenn entweder die Sender- oder die Empfängerhälfte gelöscht wird.

Hier werden wir ein Programm entwickeln, das einen Thread verwendet, um Werte zu generieren und über einen Kanal zu senden, und einen anderen Thread, der die Werte empfängt und ausgibt. Wir werden einfache Werte zwischen Threads über einen Kanal senden, um das Feature zu veranschaulichen. Sobald Sie mit der Technik vertraut sind, können Sie Kanäle für alle Threads verwenden, die miteinander kommunizieren müssen, wie z. B. ein Chat-System oder ein System, in dem viele Threads Teile einer Berechnung durchführen und die Teile an einen Thread senden, der die Ergebnisse aggregiert.

Zunächst erstellen wir in Listing 16-6 einen Kanal, aber wir machen nichts damit. Beachten Sie, dass dies noch nicht kompilieren wird, da Rust nicht weiß, welchen Wertetyp wir über den Kanal senden möchten.

Dateiname: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Listing 16-6: Erstellen eines Kanals und Zuweisen der beiden Hälften zu tx und rx

Wir erstellen einen neuen Kanal mit der mpsc::channel-Funktion; mpsc steht für multiple producer, single consumer. Kurz gesagt bedeutet die Art, wie die Standardbibliothek von Rust Kanäle implementiert, dass ein Kanal mehrere _Sende_enden haben kann, die Werte erzeugen, aber nur ein _Empfangs_ende, das diese Werte konsumiert. Stellen Sie sich mehrere Ströme vor, die zusammen in einen großen Fluss münden: Alles, was in einen der Ströme gesendet wird, wird am Ende in einem Fluss landen. Wir beginnen zunächst mit einem einzelnen Produzenten, aber wir werden mehrere Produzenten hinzufügen, wenn dieses Beispiel funktioniert.

Die mpsc::channel-Funktion gibt ein Tupel zurück, dessen erstes Element der Sendeende - der Sender - und das zweite Element der Empfangsende - der Empfänger - ist. Die Abkürzungen tx und rx werden in vielen Bereichen traditionell für Sender und Empfänger verwendet, daher benennen wir unsere Variablen so, um jedes Ende anzuzeigen. Wir verwenden eine let-Anweisung mit einem Muster, das das Tupel aufspaltet; wir werden die Verwendung von Mustern in let-Anweisungen und das Aufspalten in Kapitel 18 besprechen. Für jetzt wissen Sie, dass das Verwenden einer let-Anweisung auf diese Weise ein bequemer Ansatz ist, um die Teile des von mpsc::channel zurückgegebenen Tupels zu extrahieren.

Lassen Sie uns die Sendeende in einen neu erzeugten Thread verschieben und einen String senden, sodass der neu erzeugte Thread mit dem Hauptthread kommuniziert, wie in Listing 16-7 gezeigt. Dies ist wie das Legen einer Gummitüte in den Fluss upstream oder das Senden einer Chatnachricht von einem Thread an einen anderen.

Dateiname: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Listing 16-7: Verschieben von tx in einen neu erzeugten Thread und Senden von "hi"

Wir verwenden erneut thread::spawn, um einen neuen Thread zu erstellen, und dann move, um tx in die Closure zu verschieben, sodass der neu erzeugte Thread tx besitzt. Der neu erzeugte Thread muss den Sender besitzen, um Nachrichten über den Kanal senden zu können.

Der Sender hat eine send-Methode, die den Wert annimmt, den wir senden möchten. Die send-Methode gibt einen Result<T, E>-Typ zurück, sodass die Sendoperation einen Fehler zurückgeben wird, wenn der Empfänger bereits gelöscht wurde und es keinen Ort gibt, an den ein Wert gesendet werden kann. In diesem Beispiel rufen wir unwrap auf, um im Falle eines Fehlers einen Fehler auszulösen. In einer echten Anwendung würden wir dies jedoch richtig behandeln: kehren Sie zu Kapitel 9 zurück, um Strategien zur richtigen Fehlerbehandlung zu überprüfen.

In Listing 16-8 werden wir den Wert aus dem Empfänger im Hauptthread abrufen. Dies ist wie das Abholen der Gummitüte aus dem Wasser am Ende des Flusses oder das Empfangen einer Chatnachricht.

Dateiname: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Listing 16-8: Empfangen des Werts "hi" im Hauptthread und Ausgeben

Der Empfänger hat zwei nützliche Methoden: recv und try_recv. Wir verwenden recv, das Abkürzung für receive ist, das die Ausführung des Hauptthreads blockieren und bis zu einem Wert warten wird, der über den Kanal gesendet wird. Wenn ein Wert gesendet wird, wird recv ihn in einem Result<T, E> zurückgeben. Wenn der Sender geschlossen wird, wird recv einen Fehler zurückgeben, um anzuzeigen, dass keine weiteren Werte mehr kommen werden.

Die try_recv-Methode blockiert nicht, sondern gibt stattdessen sofort ein Result<T, E> zurück: einen Ok-Wert, der eine Nachricht enthält, wenn eine verfügbar ist, und einen Err-Wert, wenn keine Nachrichten vorhanden sind. Das Verwenden von try_recv ist nützlich, wenn dieser Thread andere Arbeit zu erledigen hat, während er auf Nachrichten wartet: wir könnten eine Schleife schreiben, die try_recv regelmäßig aufruft, eine Nachricht behandelt, wenn eine verfügbar ist, und andernfalls für eine Weile andere Arbeit erledigt, bis wir erneut prüfen.

Wir haben recv in diesem Beispiel aus Einfachheit verwendet; wir haben keine andere Arbeit für den Hauptthread, als auf Nachrichten zu warten, daher ist es angemessen, den Hauptthread zu blockieren.

Wenn wir den Code in Listing 16-8 ausführen, werden wir den Wert aus dem Hauptthread sehen:

Got: hi

Perfekt!

Kanäle und Eigentumsübertragung

Die Eigentumsregeln spielen eine entscheidende Rolle bei der Nachrichtenübertragung, da sie Ihnen helfen, sicheres, konkurrierendes Code zu schreiben. Das Vermeiden von Fehlern bei der konkurrierenden Programmierung ist der Vorteil, wenn Sie sich bei der gesamten Rust-Programmierung um das Eigentum kümmern. Lassen Sie uns ein Experiment durchführen, um zu zeigen, wie Kanäle und Eigentum zusammenarbeiten, um Probleme zu vermeiden: wir werden versuchen, einen val-Wert im erzeugten Thread nachdem wir ihn über den Kanal gesendet haben, zu verwenden. Versuchen Sie, den Code in Listing 16-9 zu kompilieren, um zu sehen, warum dieser Code nicht zugelassen wird.

Dateiname: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Listing 16-9: Versuch, val nach dem Senden über den Kanal zu verwenden

Hier versuchen wir, val auszugeben, nachdem wir es über tx.send über den Kanal gesendet haben. Dies zuzulassen wäre ein schlechter Gedanke: sobald der Wert an einen anderen Thread gesendet wurde, könnte dieser Thread ihn modifizieren oder löschen, bevor wir versuchen, den Wert erneut zu verwenden. Möglicherweise könnten die Änderungen des anderen Threads aufgrund inkonsistenter oder nicht vorhandener Daten Fehler oder unerwartete Ergebnisse verursachen. Rust gibt uns jedoch einen Fehler, wenn wir versuchen, den Code in Listing 16-9 zu kompilieren:

error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does
not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move

Unser konkurrierender Fehler hat einen Kompilierungsfehler verursacht. Die send-Funktion übernimmt die Eigentumsgewalt über ihren Parameter, und wenn der Wert verschoben wird, übernimmt der Empfänger die Eigentumsgewalt darüber. Dies verhindert, dass wir versehentlich den Wert erneut verwenden, nachdem wir ihn gesendet haben; das Eigentumssystem überprüft, dass alles in Ordnung ist.

Senden mehrerer Werte und Beobachten des wartenden Empfängers

Der Code in Listing 16-8 hat sich kompiliert und ausgeführt, aber er hat uns nicht eindeutig gezeigt, dass zwei separate Threads über den Kanal miteinander kommunizieren. In Listing 16-10 haben wir einige Änderungen vorgenommen, die beweisen, dass der Code in Listing 16-8 parallel läuft: Der erzeugte Thread wird jetzt mehrere Nachrichten senden und zwischen jeder Nachricht eine Sekunde pausieren.

Dateiname: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

Listing 16-10: Senden mehrerer Nachrichten und Pausieren zwischen jeder

Diesmal hat der erzeugte Thread einen Vektor von Strings, die wir an den Hauptthread senden möchten. Wir iterieren über sie, senden jedes einzeln und pausieren zwischen jedem, indem wir die thread::sleep-Funktion mit einem Duration-Wert von einer Sekunde aufrufen.

Im Hauptthread rufen wir die recv-Funktion nicht mehr explizit auf: Stattdessen behandeln wir rx als Iterator. Für jeden empfangenen Wert drucken wir ihn aus. Wenn der Kanal geschlossen wird, endet die Iteration.

Wenn Sie den Code in Listing 16-10 ausführen, sollten Sie die folgende Ausgabe sehen, wobei zwischen jeder Zeile eine Sekunde Pause ist:

Got: hi
Got: from
Got: the
Got: thread

Da wir keinen Code haben, der in der for-Schleife im Hauptthread pausiert oder verzögert, können wir erkennen, dass der Hauptthread auf die Empfang von Werten vom erzeugten Thread wartet.

Erstellen mehrerer Produzenten durch Klonen des Senders

Früher haben wir erwähnt, dass mpsc die Abkürzung für multiple producer, single consumer ist. Lassen Sie uns mpsc in Aktion setzen und den Code in Listing 16-10 erweitern, um mehrere Threads zu erstellen, die alle Werte an den gleichen Empfänger senden. Wir können dies tun, indem wir den Sender klonen, wie in Listing 16-11 gezeigt.

Dateiname: src/main.rs

--snip--

let (tx, rx) = mpsc::channel();

let tx1 = tx.clone();
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {received}");
}

--snip--

Listing 16-11: Senden mehrerer Nachrichten von mehreren Produzenten

Diesmal rufen wir vor dem Erstellen des ersten erzeugten Threads clone auf dem Sender auf. Dies gibt uns einen neuen Sender, den wir an den ersten erzeugten Thread übergeben können. Wir übergeben den ursprünglichen Sender an einen zweiten erzeugten Thread. Dadurch erhalten wir zwei Threads, die jeweils unterschiedliche Nachrichten an den einen Empfänger senden.

Wenn Sie den Code ausführen, sollte Ihre Ausgabe ungefähr so aussehen:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Je nach Ihrem System können Sie die Werte in einer anderen Reihenfolge sehen. Dies macht die Konkurrenz sowohl interessant als auch schwierig. Wenn Sie mit thread::sleep experimentieren und ihm verschiedene Werte in den verschiedenen Threads geben, wird jede Ausführung noch unsicherer und erzeugt jedes Mal eine andere Ausgabe.

Jetzt, nachdem wir gesehen haben, wie Kanäle funktionieren, betrachten wir eine andere Methode der Konkurrenz.

Zusammenfassung

Herzlichen Glückwunsch! Sie haben das Labor "Using Message Passing to Transfer Data Between Threads" abgeschlossen. Sie können in LabEx weitere Labs ausprobieren, um Ihre Fähigkeiten zu verbessern.