Graceful Shutdown and Cleanup

RustRustBeginner
Jetzt üben

This tutorial is from open-source community. Access the source code

💡 Dieser Artikel wurde von AI-Assistenten übersetzt. Um die englische Version anzuzeigen, können Sie hier klicken

Einführung

Willkommen zu Graceful Shutdown and Cleanup. Dieser Lab ist Teil des Rust Buchs. Du kannst deine Rust-Fähigkeiten in LabEx üben.

In diesem Lab werden wir einen Mechanismus für ein gracefull Shutdown und die Bereinigung in unserem Code implementieren, indem wir das Drop-Trait nutzen und eine Möglichkeit schaffen, dass Threads aufhören, neue Anfragen zu akzeptieren und sich herunterfahren.

Graceful Shutdown and Cleanup

Der Code in Listing 20-20 reagiert wie geplant asynchron auf Anfragen, indem er einen Threadpool verwendet. Wir erhalten einige Warnungen über die Felder workers, id und thread, die wir nicht direkt verwenden, was uns daran erinnert, dass wir nichts bereinigen. Wenn wir die weniger elegante Methode mit Strg+C verwenden, um den Hauptthread zu stoppen, werden auch alle anderen Threads sofort gestoppt, auch wenn sie mitten in der Bearbeitung einer Anfrage sind.

Als nächstes implementieren wir das Drop-Trait, um join auf jeden Thread im Pool aufzurufen, damit sie die Anfragen, die sie bearbeiten, abschließen können, bevor sie sich schließen. Dann implementieren wir eine Möglichkeit, den Threads mitzuteilen, dass sie keine neuen Anfragen mehr akzeptieren und sich herunterfahren sollten. Um diesen Code in Aktion zu sehen, modifizieren wir unseren Server, sodass er nur zwei Anfragen akzeptiert, bevor er seinen Threadpool gracefully herunterfährt.

Implementieren des Drop-Traits für ThreadPool

Lassen Sie uns beginnen, indem wir Drop für unseren Threadpool implementieren. Wenn der Pool gelöscht wird, sollten alle unsere Threads join aufrufen, um sicherzustellen, dass sie ihre Arbeit beenden. Listing 20-22 zeigt einen ersten Versuch einer Drop-Implementierung; dieser Code funktioniert noch nicht ganz.

Dateiname: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 for worker in &mut self.workers {
          2 println!("Shutting down worker {}", worker.id);

          3 worker.thread.join().unwrap();
        }
    }
}

Listing 20-22: Joinen jedes Threads, wenn der Threadpool außerhalb des Gültigkeitsbereichs geht

Zunächst durchlaufen wir jeden Worker im Threadpool [1]. Wir verwenden &mut dafür, weil self eine mutable Referenz ist und wir auch worker mutieren müssen. Für jeden Worker drucken wir eine Nachricht, die besagt, dass diese bestimmte Worker-Instanz heruntergefahren wird [2], und rufen dann join auf dem Thread dieser Worker-Instanz auf [3]. Wenn der Aufruf von join fehlschlägt, verwenden wir unwrap, um Rust zu einem Panik zu bringen und in einen ungracefulen Shutdown zu gehen.

Hier ist der Fehler, den wir erhalten, wenn wir diesen Code kompilieren:

error[E0507]: cannot move out of `worker.thread` which is behind a mutable
reference
    --> src/lib.rs:52:13
     |
52   |             worker.thread.join().unwrap();
     |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this
method call
     |             |
     |             move occurs because `worker.thread` has type
`JoinHandle<()>`, which does not implement the `Copy` trait
     |
note: this function takes ownership of the receiver `self`, which moves
`worker.thread`

Der Fehler sagt uns, dass wir join nicht aufrufen können, weil wir nur eine mutable Referenz auf jeden worker haben und join die Eigentumsgewalt über seinen Argument nimmt. Um dieses Problem zu lösen, müssen wir den Thread aus der Worker-Instanz, die thread besitzt, herausbewegen, damit join den Thread konsumieren kann. Wir haben das in Listing 17-15 gemacht: Wenn Worker ein Option<thread::JoinHandle<()>> hält, können wir die take-Methode auf dem Option aufrufen, um den Wert aus der Some-Variante herauszubewegen und an seiner Stelle eine None-Variante zu hinterlassen. Mit anderen Worten, ein laufender Worker wird in thread eine Some-Variante haben, und wenn wir einen Worker bereinigen möchten, ersetzen wir Some mit None, sodass der Worker keinen Thread mehr zum Ausführen hat.

Wir wissen also, dass wir die Definition von Worker so aktualisieren möchten:

Dateiname: src/lib.rs

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

Lassen Sie uns jetzt auf den Compiler vertrauen, um die anderen Stellen zu finden, die geändert werden müssen. Wenn wir diesen Code überprüfen, erhalten wir zwei Fehler:

error[E0599]: no method named `join` found for enum `Option` in the current
scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in
`Option<JoinHandle<()>>`

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct
`JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Worker { id, thread: Some(thread) }
   |                      +++++++++++++      +

Lassen Sie uns uns zunächst den zweiten Fehler ansehen, der auf den Code am Ende von Worker::new zeigt; wir müssen den thread-Wert in Some umschließen, wenn wir eine neue Worker erstellen. Machen Sie die folgenden Änderungen, um diesen Fehler zu beheben:

Dateiname: src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Der erste Fehler befindet sich in unserer Drop-Implementierung. Wir haben zuvor erwähnt, dass wir die take-Methode auf dem Option-Wert aufrufen möchten, um thread aus worker herauszubewegen. Die folgenden Änderungen werden dies tun:

Dateiname: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

          1 if let Some(thread) = worker.thread.take() {
              2 thread.join().unwrap();
            }
        }
    }
}

Wie in Kapitel 17 diskutiert, nimmt die take-Methode auf Option die Some-Variante heraus und lässt None an ihrer Stelle. Wir verwenden if let, um die Some zu zerlegen und den Thread zu erhalten [1]; dann rufen wir join auf dem Thread auf [2]. Wenn der Thread einer Worker-Instanz bereits None ist, wissen wir, dass der Thread von Worker bereits bereinigt wurde, sodass in diesem Fall nichts passiert.

Signalieren an die Threads, dass sie auf Jobs warten sollen

Mit all den Änderungen, die wir vorgenommen haben, compiliert unser Code ohne Warnungen. Die schlechte Nachricht ist jedoch, dass dieser Code noch nicht so funktioniert, wie wir es möchten. Der Schlüssel liegt in der Logik in den Closures, die von den Threads der Worker-Instanzen ausgeführt werden: Momentan rufen wir join auf, aber das wird die Threads nicht herunterfahren, weil sie in einer Endlosschleife loop laufen, um Jobs zu suchen. Wenn wir versuchen, unseren ThreadPool mit unserer aktuellen drop-Implementierung zu löschen, wird der Hauptthread für immer blockieren und auf den ersten Thread warten, bis er fertig ist.

Um dieses Problem zu beheben, müssen wir eine Änderung in der ThreadPool-drop-Implementierung vornehmen und dann eine Änderung in der Worker-Schleife.

Zunächst ändern wir die ThreadPool-drop-Implementierung, um sender explizit zu löschen, bevor wir auf die Threads warten, bis sie fertig sind. Listing 20-23 zeigt die Änderungen an ThreadPool, um sender explizit zu löschen. Wir verwenden die gleiche Option- und take-Technik wie bei dem Thread, um sender aus ThreadPool herausbewegen zu können.

Dateiname: src/lib.rs

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
--snip--
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        --snip--

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender
           .as_ref()
           .unwrap()
           .send(job)
           .unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

Listing 20-23: Explizites Löschen von sender vor dem Joinen der Worker-Threads

Das Löschen von sender [1] schließt den Kanal, was bedeutet, dass keine weiteren Nachrichten gesendet werden. Wenn das passiert, werden alle Aufrufe von recv, die die Worker-Instanzen in der Endlosschleife ausführen, einen Fehler zurückgeben. In Listing 20-24 ändern wir die Worker-Schleife, um in diesem Fall die Schleife elegant zu verlassen, was bedeutet, dass die Threads beendet werden, wenn die ThreadPool-drop-Implementierung join auf sie aufruft.

Dateiname: src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!(
                        "Worker {id} got a job; executing."
                    );

                    job();
                }
                Err(_) => {
                    println!(
                        "Worker {id} shutting down."
                    );
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Listing 20-24: Explizites Beenden der Schleife, wenn recv einen Fehler zurückgibt

Um diesen Code in Aktion zu sehen, ändern wir main, um nur zwei Anfragen zu akzeptieren, bevor wir den Server gracefully herunterfahren, wie in Listing 20-25 gezeigt.

Dateiname: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

Listing 20-25: Herunterfahren des Servers nach der Verarbeitung von zwei Anfragen, indem die Schleife verlassen wird

Ein echter Webserver würde sicherlich nicht nach nur zwei Anfragen herunterfahren. Dieser Code demonstriert nur, dass der gracefull Shutdown und die Bereinigung funktionieren.

Die take-Methode ist in dem Iterator-Trait definiert und begrenzt die Iteration auf höchstens die ersten zwei Elemente. Der ThreadPool wird am Ende von main außerhalb des Gültigkeitsbereichs gehen, und die drop-Implementierung wird ausgeführt.

Starten Sie den Server mit cargo run und machen Sie drei Anfragen. Die dritte Anfrage sollte einen Fehler ergeben, und in Ihrem Terminal sollten Sie eine Ausgabe ähnlich der folgenden sehen:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

Sie könnten eine andere Reihenfolge von Worker-IDs und gedruckten Nachrichten sehen. Wir können sehen, wie dieser Code funktioniert, anhand der Nachrichten: Die Worker-Instanzen 0 und 3 erhielten die ersten zwei Anfragen. Der Server hat nach der zweiten Verbindung aufgehört, neue Verbindungen anzunehmen, und die Drop-Implementierung auf ThreadPool beginnt, bevor Worker 3 seine Aufgabe einmal gestartet hat, auszuführen. Das Löschen von sender trennt alle Worker-Instanzen und sagt ihnen, sich herunterzufahren. Die Worker-Instanzen drucken jeweils eine Nachricht, wenn sie getrennt werden, und dann ruft der Threadpool join auf, um auf jeden Worker-Thread zu warten, bis er fertig ist.

Bemerken Sie einen interessanten Aspekt dieser speziellen Ausführung: Der ThreadPool hat sender gelöscht, und bevor eine Worker einen Fehler erhalten hat, haben wir versucht, Worker 0 zu joinen. Worker 0 hatte noch keinen Fehler von recv erhalten, also blockierte der Hauptthread und wartete auf Worker 0, bis er fertig war. Inzwischen hat Worker 3 einen Job erhalten, und dann haben alle Threads einen Fehler erhalten. Wenn Worker 0 fertig war, hat der Hauptthread auf die restlichen Worker-Instanzen gewartet, bis sie fertig waren. Zu diesem Zeitpunkt waren sie alle aus ihrer Schleife herausgekommen und gestoppt.

Glückwunsch! Wir haben jetzt unser Projekt abgeschlossen; wir haben einen einfachen Webserver, der einen Threadpool verwendet, um asynchron zu reagieren. Wir können den Server gracefully herunterfahren, was alle Threads im Pool bereinigt. Siehe https://www.nostarch.com/Rust2021, um den vollständigen Code für dieses Kapitel herunterzuladen, um ihn als Referenz zu verwenden.

Wir könnten hier noch mehr tun! Wenn Sie das Projekt weiter verbessern möchten, hier sind einige Ideen:

  • Fügen Sie mehr Dokumentation zu ThreadPool und seinen öffentlichen Methoden hinzu.
  • Fügen Sie Tests der Funktionalität der Bibliothek hinzu.
  • Ändern Sie die Aufrufe von unwrap zu einem robusteren Fehlerhandling.
  • Verwenden Sie ThreadPool, um eine andere Aufgabe als das Beantworten von Webanfragen auszuführen.
  • Suchen Sie auf https://crates.io einen Threadpool-Crate und implementieren Sie einen ähnlichen Webserver mit dem Crate. Vergleichen Sie dann seine API und Robustheit mit dem von unserem Threadpool.

Zusammenfassung

Herzlichen Glückwunsch! Sie haben das Lab zu Graceful Shutdown and Cleanup abgeschlossen. Sie können in LabEx weitere Labs absolvieren, um Ihre Fähigkeiten zu verbessern.