Arrêt propre et nettoyage

RustRustBeginner
Pratiquer maintenant

This tutorial is from open-source community. Access the source code

💡 Ce tutoriel est traduit par l'IA à partir de la version anglaise. Pour voir la version originale, vous pouvez cliquer ici

Introduction

Bienvenue dans Graceful Shutdown and Cleanup. Ce laboratoire est une partie du Rust Book. Vous pouvez pratiquer vos compétences Rust dans LabEx.

Dans ce laboratoire, nous allons implémenter un mécanisme d'arrêt propre et de nettoyage dans notre code en utilisant le trait Drop et en fournissant un moyen pour les threads de cesser d'accepter de nouvelles requêtes et de s'arrêter.

Graceful Shutdown and Cleanup

Le code de la liste 20-20 répond aux requêtes de manière asynchrone en utilisant un pool de threads, comme prévu. Nous recevons quelques avertissements concernant les champs workers, id et thread que nous n'utilisons pas de manière directe, ce qui nous rappelle que nous ne nettoyons rien. Lorsque nous utilisons la méthode moins élégante du ctrl-C pour arrêter le thread principal, tous les autres threads sont également stoppés immédiatement, même s'ils sont en train de traiter une requête.

Ensuite, nous allons implémenter le trait Drop pour appeler join sur chacun des threads du pool afin qu'ils puissent terminer les requêtes sur lesquelles ils travaillent avant de se fermer. Ensuite, nous allons implémenter un moyen de dire aux threads qu'ils devraient cesser d'accepter de nouvelles requêtes et de s'arrêter. Pour voir ce code en action, nous modifierons notre serveur pour n'accepter que deux requêtes avant de fermer proprement son pool de threads.

Implémentation du trait Drop sur ThreadPool

Commenceons par implémenter Drop sur notre pool de threads. Lorsque le pool est supprimé, tous nos threads devraient tous rejoindre pour s'assurer qu'ils terminent leur travail. La liste 20-22 montre une première tentative d'implémentation de Drop; ce code ne fonctionnera pas encore.

Nom de fichier : src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 for worker in &mut self.workers {
          2 println!("Shutting down worker {}", worker.id);

          3 worker.thread.join().unwrap();
        }
    }
}

Liste 20-22 : Joindre chaque thread lorsque le pool de threads sort de portée

Tout d'abord, nous parcourons chacun des workers du pool de threads [1]. Nous utilisons &mut pour cela car self est une référence mutable, et nous devons également être en mesure de modifier worker. Pour chaque worker, nous affichons un message indiquant que cette instance particulière de Worker est en train de se fermer [2], puis nous appelons join sur le thread de cette instance de Worker [3]. Si l'appel à join échoue, nous utilisons unwrap pour forcer Rust à générer une panique et passer à une fermeture non propre.

Voici l'erreur que nous obtenons lorsque nous compilons ce code :

error[E0507]: cannot move out of `worker.thread` which is behind a mutable
reference
    --> src/lib.rs:52:13
     |
52   |             worker.thread.join().unwrap();
     |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this
method call
     |             |
     |             move occurs because `worker.thread` has type
`JoinHandle<()>`, which does not implement the `Copy` trait
     |
note: this function takes ownership of the receiver `self`, which moves
`worker.thread`

L'erreur nous indique que nous ne pouvons pas appeler join car nous n'avons qu'un emprunt mutable de chaque worker et que join prend la propriété de son argument. Pour résoudre ce problème, nous devons déplacer le thread hors de l'instance de Worker qui possède thread afin que join puisse consommer le thread. Nous l'avons fait dans la liste 17-15 : si Worker contient une Option<thread::JoinHandle<()>> au lieu de cela, nous pouvons appeler la méthode take sur l'Option pour déplacer la valeur hors de la variante Some et laisser une variante None à sa place. En d'autres termes, un Worker qui est en cours d'exécution aura une variante Some dans thread, et lorsque nous voulons nettoyer un Worker, nous remplaçons Some par None de sorte que le Worker n'ait plus de thread à exécuter.

Donc, nous savons que nous voulons mettre à jour la définition de Worker comme ceci :

Nom de fichier : src/lib.rs

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

Maintenant, laissons le compilateur trouver les autres endroits qui doivent être modifiés. En vérifiant ce code, nous obtenons deux erreurs :

error[E0599]: no method named `join` found for enum `Option` in the current
scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in
`Option<JoinHandle<()>>`

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct
`JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Worker { id, thread: Some(thread) }
   |                      +++++++++++++      +

Traitons la seconde erreur, qui pointe vers le code à la fin de Worker::new ; nous devons envelopper la valeur thread dans Some lorsque nous créons un nouveau Worker. Apportez les modifications suivantes pour corriger cette erreur :

Nom de fichier : src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

La première erreur se trouve dans notre implémentation de Drop. Nous avons mentionné précédemment que nous avions l'intention d'appeler take sur la valeur Option pour déplacer thread hors de worker. Les modifications suivantes le feront :

Nom de fichier : src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

          1 if let Some(thread) = worker.thread.take() {
              2 thread.join().unwrap();
            }
        }
    }
}

Comme discuté au chapitre 17, la méthode take sur Option extrait la variante Some et laisse None à sa place. Nous utilisons if let pour déstructurer la Some et obtenir le thread [1] ; puis nous appelons join sur le thread [2]. Si le thread d'une instance de Worker est déjà None, nous savons que le Worker a déjà eu son thread nettoyé, donc rien ne se passe dans ce cas.

Signaler aux threads de cesser d'écouter les jobs

Avec toutes les modifications que nous avons apportées, notre code se compile sans avertissements. Cependant, le mauvais news est que ce code ne fonctionne pas comme nous le souhaitons encore. La clé est la logique dans les closures exécutées par les threads des instances de Worker : pour l'instant, nous appelons join, mais cela ne fermera pas les threads, car ils bouclent à l'infini à la recherche de jobs. Si nous essayons de supprimer notre ThreadPool avec notre implémentation actuelle de drop, le thread principal bloquera à l'infini, en attendant que le premier thread termine.

Pour résoudre ce problème, nous devrons apporter une modification à l'implémentation de drop de ThreadPool puis une modification à la boucle de Worker.

Tout d'abord, nous modifierons l'implémentation de drop de ThreadPool pour supprimer explicitement le sender avant d'attendre que les threads se terminent. La liste 20-23 montre les modifications apportées à ThreadPool pour supprimer explicitement sender. Nous utilisons la même technique Option et take que celle que nous avons utilisée avec le thread pour pouvoir déplacer sender hors de ThreadPool.

Nom de fichier : src/lib.rs

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
--snip--
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        --snip--

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender
           .as_ref()
           .unwrap()
           .send(job)
           .unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

Liste 20-23 : Suppression explicite de sender avant de rejoindre les threads de Worker

Supprimer sender [1] ferme le canal, ce qui indique qu'aucun message ne sera envoyé plus. Lorsque cela se produit, toutes les appels à recv que les instances de Worker effectuent dans la boucle infinie retourneront une erreur. Dans la liste 20-24, nous changeons la boucle de Worker pour sortir de la boucle de manière propre dans ce cas, ce qui signifie que les threads se termineront lorsque l'implémentation de drop de ThreadPool appellera join sur eux.

Nom de fichier : src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!(
                        "Worker {id} got a job; executing."
                    );

                    job();
                }
                Err(_) => {
                    println!(
                        "Worker {id} shutting down."
                    );
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Liste 20-24 : Sortir explicitement de la boucle lorsque recv renvoie une erreur

Pour voir ce code en action, modifions main pour accepter seulement deux requêtes avant de fermer proprement le serveur, comme montré dans la liste 20-25.

Nom de fichier : src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

Liste 20-25 : Arrêter le serveur après avoir traité deux requêtes en sortant de la boucle

Vous ne voudriez pas qu'un serveur web du monde réel se ferme après avoir traité seulement deux requêtes. Ce code montre simplement que l'arrêt propre et le nettoyage fonctionnent correctement.

La méthode take est définie dans le trait Iterator et limite l'itération aux deux premiers éléments au plus. Le ThreadPool sortira de portée à la fin de main, et l'implémentation de drop s'exécutera.

Démarrez le serveur avec cargo run et effectuez trois requêtes. La troisième requête devrait générer une erreur, et dans votre terminal, vous devriez voir une sortie similaire à celle-ci :

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

Vous pouvez voir un ordre différent d'IDs de Worker et de messages affichés. Nous pouvons voir comment ce code fonctionne à partir des messages : les instances de Worker 0 et 3 ont reçu les deux premières requêtes. Le serveur a cessé d'accepter des connexions après la deuxième connexion, et l'implémentation de Drop sur ThreadPool commence à s'exécuter avant même que Worker 3 n'ait commencé son job. Supprimer le sender déconnecte toutes les instances de Worker et leur indique de se fermer. Les instances de Worker affichent chacun un message lorsqu'elles se déconnectent, puis le pool de threads appelle join pour attendre que chaque thread de Worker se termine.

Remarquez un aspect intéressant de cette exécution particulière : le ThreadPool a supprimé le sender, et avant que tout Worker n'ait reçu une erreur, nous avons essayé de rejoindre Worker 0. Worker 0 n'avait pas encore reçu d'erreur de recv, donc le thread principal a bloqué, en attendant que Worker 0 se termine. Pendant ce temps, Worker 3 a reçu un job puis tous les threads ont reçu une erreur. Lorsque Worker 0 a terminé, le thread principal a attendu que le reste des instances de Worker se terminent. À ce moment-là, elles étaient toutes sorties de leur boucle et étaient arrêtées.

Félicitations! Nous avons maintenant terminé notre projet ; nous avons un serveur web de base qui utilise un pool de threads pour répondre de manière asynchrone. Nous sommes capables d'effectuer un arrêt propre du serveur, qui nettoie tous les threads du pool. Consultez https://www.nostarch.com/Rust2021 pour télécharger le code complet de ce chapitre à des fins de référence.

Nous pourrions faire plus ici! Si vous voulez continuer à améliorer ce projet, voici quelques idées :

  • Ajoutez plus de documentation à ThreadPool et à ses méthodes publiques.
  • Ajoutez des tests de la fonctionnalité de la bibliothèque.
  • Changez les appels à unwrap pour un traitement d'erreur plus robuste.
  • Utilisez ThreadPool pour effectuer une tâche autre que le traitement de requêtes web.
  • Trouvez une boîte à outils de pool de threads sur https://crates.io et implémentez un serveur web similaire en utilisant la boîte à outils à la place. Ensuite, comparez son API et sa robustesse avec le pool de threads que nous avons implémenté.

Résumé

Félicitations! Vous avez terminé le laboratoire Graceful Shutdown and Cleanup. Vous pouvez pratiquer d'autres laboratoires dans LabEx pour améliorer vos compétences.