Développement de serveurs multi-threaded en Rust

RustRustBeginner
Pratiquer maintenant

This tutorial is from open-source community. Access the source code

💡 Ce tutoriel est traduit par l'IA à partir de la version anglaise. Pour voir la version originale, vous pouvez cliquer ici

Introduction

Bienvenue dans Turning Our Single-Threaded Server Into a Multithreaded Server. Ce laboratoire est une partie du Rust Book. Vous pouvez pratiquer vos compétences Rust dans LabEx.

Dans ce laboratoire, nous allons transformer notre serveur mono-fil en un serveur multi-fils pour améliorer son efficacité dans le traitement de plusieurs requêtes simultanément.

Turning Our Single-Threaded Server into a Multithreaded Server

En ce moment, le serveur traite chaque requête tour à tour, ce qui signifie qu'il ne traitera pas une deuxième connexion avant que la première ne soit terminée. Si le serveur reçoit de plus en plus de requêtes, cette exécution séquentielle deviendra de moins en moins optimale. Si le serveur reçoit une requête qui prend beaucoup de temps à traiter, les requêtes suivantes devront attendre que la longue requête soit terminée, même si les nouvelles requêtes peuvent être traitées rapidement. Nous devrons régler ce problème, mais d'abord, nous examinerons le problème en action.

Simulating a Slow Request

Nous allons examiner comment une requête à traitement lent peut affecter les autres requêtes envoyées à notre implémentation de serveur actuelle. Le Listing 20-10 implémente la gestion d'une requête à /sleep avec une réponse simulée lente qui forcera le serveur à dormir pendant cinq secondes avant de répondre.

Nom de fichier : src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
--snip--

fn handle_connection(mut stream: TcpStream) {
    --snip--

    let (status_line, filename) = 1 match &request_line[..] {
      2 "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
      3 "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
      4 _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    --snip--
}

Listing 20-10 : Simulation d'une requête lente en dormant pendant cinq secondes

Nous avons changé d'if à match maintenant que nous avons trois cas [1]. Nous devons explicitement matcher sur une tranche de request_line pour effectuer une correspondance de motifs avec les valeurs littérales de chaîne ; match ne fait pas de référencement et de déréférencement automatique, comme la méthode d'égalité le fait.

Le premier bras [2] est le même que le bloc if du Listing 20-9. Le second bras [3] correspond à une requête à /sleep. Lorsque cette requête est reçue, le serveur dormira pendant cinq secondes avant de générer la page HTML réussie. Le troisième bras [4] est le même que le bloc else du Listing 20-9.

Vous pouvez voir à quel point notre serveur est primitif : des bibliothèques réelles géreraient la reconnaissance de multiples requêtes de manière beaucoup moins verbeuse!

Démarrez le serveur avec cargo run. Ensuite, ouvrez deux fenêtres de navigateur : l'une pour http://127.0.0.1:7878 et l'autre pour http://127.0.0.1:7878/sleep. Si vous entrez l'URI / plusieurs fois, comme auparavant, vous verrez qu'elle répondra rapidement. Mais si vous entrez /sleep puis chargez /, vous verrez que / attendra jusqu'à ce que sleep ait dormi pendant ses cinq secondes complètes avant de se charger.

Il existe plusieurs techniques que nous pourrions utiliser pour éviter que les requêtes ne se bloquent derrière une requête lente ; celle que nous allons implémenter est un pool de threads.

Improving Throughput with a Thread Pool

Un thread pool est un groupe de threads lancés qui sont en attente et prêts à traiter une tâche. Lorsque le programme reçoit une nouvelle tâche, il attribue l'une des threads du pool à la tâche, et ce thread traitera la tâche. Les autres threads du pool sont disponibles pour traiter toute autre tâche qui arrive tandis que le premier thread est en train de traiter. Lorsque le premier thread a fini de traiter sa tâche, il est renvoyé dans le pool de threads inactifs, prêt à traiter une nouvelle tâche. Un thread pool vous permet de traiter les connexions de manière concurrente, augmentant le débit de votre serveur.

Nous limiterons le nombre de threads dans le pool à un nombre réduit pour nous protéger contre les attaques DoS ; si nous avions notre programme créer un nouveau thread pour chaque requête qui arrive, quelqu'un qui envoie 10 millions de requêtes à notre serveur pourrait causer des désordres en utilisant toutes les ressources de notre serveur et en arrêtant le traitement des requêtes.

Plutôt que de lancer un nombre illimité de threads, nous aurons donc un nombre fixe de threads en attente dans le pool. Les requêtes qui arrivent sont envoyées dans le pool pour traitement. Le pool maintiendra une file d'attente de requêtes entrantes. Chacun des threads dans le pool retira une requête de cette file, traitera la requête, puis demandera à la file une autre requête. Avec cette conception, nous pouvons traiter jusqu'à N requêtes en même temps, où N est le nombre de threads. Si chaque thread répond à une requête longue durée, les requêtes suivantes peuvent toujours se geler dans la file d'attente, mais nous avons augmenté le nombre de requêtes à longue durée que nous pouvons traiter avant d'arriver à ce point.

Cette technique n'est qu'un des nombreux moyens d'améliorer le débit d'un serveur web. D'autres options que vous pourriez explorer sont le modèle fork/join, le modèle d'entrée/sortie asynchrone mono-fil et le modèle d'entrée/sortie asynchrone multi-fil. Si vous êtes intéressé par ce sujet, vous pouvez en lire plus sur d'autres solutions et essayer de les implémenter ; avec un langage de bas niveau comme Rust, toutes ces options sont possibles.

Avant de commencer à implémenter un thread pool, parlons de ce que devrait ressembler l'utilisation du pool. Lorsque vous essayez de concevoir du code, écrire d'abord l'interface client peut aider à guider votre conception. Écrivez l'API du code de manière à ce qu'elle soit structurée comme vous voulez l'appeler ; puis implémentez la fonctionnalité dans cette structure plutôt que d'implémenter la fonctionnalité puis de concevoir l'API publique.

De manière similaire à la façon dont nous avons utilisé le développement piloté par les tests dans le projet du Chapitre 12, nous utiliserons ici le développement piloté par le compilateur. Nous écrirons le code qui appelle les fonctions que nous voulons, puis nous examinerons les erreurs du compilateur pour déterminer ce que nous devrions changer ensuite pour que le code fonctionne. Avant de le faire, cependant, nous explorerons la technique que nous ne allons pas utiliser comme point de départ.

Spawning a Thread for Each Request

Tout d'abord, explorons à quoi pourrait ressembler notre code s'il créait un nouveau thread pour chaque connexion. Comme mentionné précédemment, ce n'est pas notre plan final en raison des problèmes liés à la génération potentiellement illimitée de threads, mais c'est un point de départ pour obtenir tout d'abord un serveur multithreadé fonctionnel. Ensuite, nous ajouterons le thread pool en tant qu'amélioration, et il sera plus facile de comparer les deux solutions.

Le Listing 20-11 montre les modifications à apporter à main pour lancer un nouveau thread pour traiter chaque flux dans la boucle for.

Nom de fichier : src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

Listing 20-11 : Lancement d'un nouveau thread pour chaque flux

Comme vous l'avez appris au Chapitre 16, thread::spawn créera un nouveau thread puis exécutera le code dans la closure dans le nouveau thread. Si vous exécutez ce code et chargez /sleep dans votre navigateur, puis / dans deux autres onglets de navigateur, vous verrez effectivement que les requêtes à / n'ont pas à attendre que /sleep soit terminée. Cependant, comme nous l'avons mentionné, cela finira par surcharger le système car vous créeriez de nouveaux threads sans limite.

Creating a Finite Number of Threads

Nous voulons que notre thread pool fonctionne de manière similaire et familière, de sorte que passer des threads à un thread pool ne nécessite pas de grandes modifications dans le code utilisant notre API. Le Listing 20-12 montre l'interface hypothétique d'une structure ThreadPool que nous souhaitons utiliser au lieu de thread::spawn.

Nom de fichier : src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  1 let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

      2 pool.execute(|| {
            handle_connection(stream);
        });
    }
}

Listing 20-12 : Notre interface idéale pour ThreadPool

Nous utilisons ThreadPool::new pour créer un nouveau thread pool avec un nombre configurable de threads, ici quatre [1]. Ensuite, dans la boucle for, pool.execute a une interface similaire à thread::spawn en ce sens qu'il prend une closure que le pool devrait exécuter pour chaque flux [2]. Nous devons implémenter pool.execute de sorte qu'il prenne la closure et la donne à un thread dans le pool pour l'exécuter. Ce code ne compilera pas encore, mais nous allons essayer pour que le compilateur puisse nous guider sur la manière de le corriger.

Building ThreadPool Using Compiler-Driven Development

Apportez les modifications du Listing 20-12 à src/main.rs, puis utilisons les erreurs du compilateur provenant de cargo check pour guider notre développement. Voici la première erreur que nous obtenons :

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

Parfait! Cette erreur nous indique que nous avons besoin d'un type ou d'un module ThreadPool, donc nous allons en construire un maintenant. Notre implémentation de ThreadPool sera indépendante du type de travail que fait notre serveur web. Alors, passons le crate hello d'un crate binaire à un crate bibliothèque pour y conserver notre implémentation de ThreadPool. Après avoir changé en un crate bibliothèque, nous pourrions également utiliser la bibliothèque de thread pool séparée pour tout travail que nous voudrions effectuer à l'aide d'un thread pool, pas seulement pour servir des requêtes web.

Créez un fichier src/lib.rs qui contient ce qui suit, qui est la définition la plus simple d'une structure ThreadPool que nous pouvons avoir pour l'instant :

Nom de fichier : src/lib.rs

pub struct ThreadPool;

Ensuite, éditez le fichier main.rs pour porter ThreadPool dans la portée à partir du crate bibliothèque en ajoutant le code suivant en haut de src/main.rs :

Nom de fichier : src/main.rs

use hello::ThreadPool;

Ce code ne fonctionnera toujours pas, mais vérifions-le à nouveau pour obtenir la prochaine erreur que nous devons résoudre :

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct
`ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in
`ThreadPool`

Cette erreur indique que nous devons ensuite créer une fonction associée nommée new pour ThreadPool. Nous savons également que new doit avoir un paramètre qui peut accepter 4 en tant qu'argument et devrait renvoyer une instance de ThreadPool. Implémentons la fonction new la plus simple qui aura ces caractéristiques :

Nom de fichier : src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

Nous avons choisi usize comme type du paramètre size car nous savons qu'un nombre négatif de threads n'a pas de sens. Nous savons également que nous utiliserons ce 4 comme nombre d'éléments dans une collection de threads, ce qui est ce que le type usize est destiné à, comme discuté dans "Types entiers".

Vérifions le code à nouveau :

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the
current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^ method not found in `ThreadPool`

Maintenant, l'erreur se produit parce que nous n'avons pas de méthode execute sur ThreadPool. Rappelez-vous de "Creating a Finite Number of Threads" que nous avons décidé que notre thread pool devrait avoir une interface similaire à thread::spawn. De plus, nous allons implémenter la fonction execute de sorte qu'elle prenne la closure qui lui est donnée et la donne à un thread inactif dans le pool pour l'exécuter.

Nous allons définir la méthode execute sur ThreadPool pour prendre une closure en tant que paramètre. Rappelez-vous de "Moving Captured Values Out of Closures and the Fn Traits" que nous pouvons prendre des closures en tant que paramètres avec trois traits différents : Fn, FnMut et FnOnce. Nous devons décider quel type de closure utiliser ici. Nous savons que nous finirons par faire quelque chose de similaire à l'implémentation de thread::spawn de la bibliothèque standard, donc nous pouvons regarder quelles contraintes a la signature de thread::spawn sur son paramètre. La documentation nous montre ce qui suit :

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Le paramètre de type F est celui qui nous intéresse ici ; le paramètre de type T est lié à la valeur de retour, et nous n'avons pas à nous en occuper. Nous pouvons voir que spawn utilise FnOnce comme contrainte de trait sur F. C'est probablement ce que nous voulons également, car nous finirons par passer l'argument que nous obtenons dans execute à spawn. Nous pouvons être encore plus convaincus que FnOnce est le trait que nous voulons utiliser car le thread pour exécuter une requête n'exécutera la closure de cette requête qu'une seule fois, ce qui correspond à l'Once dans FnOnce.

Le paramètre de type F a également la contrainte de trait Send et la contrainte de durée de vie 'static, qui sont utiles dans notre cas : nous avons besoin de Send pour transférer la closure d'un thread à un autre et 'static car nous ne savons pas combien de temps le thread mettra pour s'exécuter. Créons une méthode execute sur ThreadPool qui prendra un paramètre générique de type F avec ces contraintes :

Nom de fichier : src/lib.rs

impl ThreadPool {
    --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() 1 + Send + 'static,
    {
    }
}

Nous utilisons toujours les () après FnOnce [1] car ce FnOnce représente une closure qui prend aucun paramètre et renvoie le type unité (). Tout comme les définitions de fonctions, le type de retour peut être omis de la signature, mais même si nous n'avons pas de paramètres, nous avons toujours besoin des parenthèses.

Encore une fois, c'est la plus simple implémentation de la méthode execute : elle ne fait rien, mais nous essayons seulement de compiler notre code. Vérifions-le à nouveau :

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

Ça compile! Mais notez que si vous essayez cargo run et que vous effectuez une requête dans le navigateur, vous verrez les erreurs dans le navigateur que nous avons vues au début du chapitre. Notre bibliothèque n'appelle pas encore la closure passée à execute!

Note : Vous pourriez entendre dire à propos de langages avec des compilateurs stricts, tels que Haskell et Rust, que "si le code compile, ça fonctionne". Mais cette expression n'est pas universellement vraie. Notre projet compile, mais il ne fait absolument rien! Si nous construisions un projet réel et complet, il serait temps de commencer à écrire des tests unitaires pour vérifier que le code compile et a le comportement que nous voulons.

Validating the Number of Threads in new

Nous ne faisons rien avec les paramètres de new et execute. Implémentons le corps de ces fonctions avec le comportement que nous voulons. Pour commencer, pensons à new. Plus tôt, nous avons choisi un type non signé pour le paramètre size car un pool avec un nombre négatif de threads n'a pas de sens. Cependant, un pool avec zéro threads n'a pas de sens non plus, et pourtant zéro est un usize parfaitement valide. Nous allons ajouter du code pour vérifier que size est supérieur à zéro avant de renvoyer une instance de ThreadPool et faire planter le programme s'il reçoit un zéro en utilisant le macro assert!, comme montré dans le Listing 20-13.

Nom de fichier : src/lib.rs

impl ThreadPool {
    /// Crée un nouveau ThreadPool.
    ///
    /// La taille est le nombre de threads dans le pool.
    ///
  1 /// ## Panics
    ///
    /// La fonction `new` provoquera une panique si la taille est égale à zéro.
    pub fn new(size: usize) -> ThreadPool {
      2 assert!(size > 0);

        ThreadPool
    }

    --snip--
}

Listing 20-13 : Implémentation de ThreadPool::new pour provoquer une panique si size est égal à zéro

Nous avons également ajouté de la documentation pour notre ThreadPool avec des commentaires de doc. Notez que nous avons suivi les bonnes pratiques de documentation en ajoutant une section qui indique les situations dans lesquelles notre fonction peut provoquer une panique [1], comme discuté au Chapitre 14. Essayez d'exécuter cargo doc --open et cliquez sur la structure ThreadPool pour voir à quoi ressemblent les docs générées pour new!

Au lieu d'ajouter le macro assert! comme nous l'avons fait ici [2], nous pourrions changer new en build et renvoyer un Result comme nous l'avons fait avec Config::build dans le projet I/O du Listing 12-9. Mais dans ce cas, nous avons décidé qu'essayer de créer un thread pool sans aucun thread devrait être une erreur irrécupérable. Si vous êtes ambitieux, essayez d'écrire une fonction nommée build avec la signature suivante pour la comparer avec la fonction new :

pub fn build(
    size: usize
) -> Result<ThreadPool, PoolCreationError> {

Creating Space to Store the Threads

Maintenant que nous avons un moyen de savoir que nous avons un nombre valide de threads à stocker dans le pool, nous pouvons créer ces threads et les stocker dans la structure ThreadPool avant de renvoyer la structure. Mais comment "stockons-nous" un thread? Regardons à nouveau la signature de thread::spawn :

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

La fonction spawn renvoie un JoinHandle<T>, où T est le type que la closure renvoie. Essayons d'utiliser JoinHandle également et voyons ce qui se passe. Dans notre cas, les closures que nous passons au thread pool géreront la connexion et ne renverront rien, donc T sera le type unité ().

Le code du Listing 20-14 compilera mais ne créera pas encore de threads. Nous avons changé la définition de ThreadPool pour stocker un vecteur d'instances de thread::JoinHandle<()>, initialisé le vecteur avec une capacité de size, configuré une boucle for qui exécutera du code pour créer les threads et renvoyé une instance de ThreadPool les contenant.

Nom de fichier : src/lib.rs

1 use std::thread;

pub struct ThreadPool {
  2 threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      3 let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // créer quelques threads et les stocker dans le vecteur
        }

        ThreadPool { threads }
    }
    --snip--
}

Listing 20-14 : Création d'un vecteur pour ThreadPool pour stocker les threads

Nous avons porté std::thread dans la portée dans le crate bibliothèque [1] car nous utilisons thread::JoinHandle comme type des éléments dans le vecteur de ThreadPool [2].

Une fois une taille valide reçue, notre ThreadPool crée un nouveau vecteur qui peut stocker size éléments [3]. La fonction with_capacity effectue la même tâche que Vec::new mais avec une différence importante : elle réserve de l'espace dans le vecteur à l'avance. Comme nous savons que nous devons stocker size éléments dans le vecteur, effectuer cette allocation d'avance est légèrement plus efficace que d'utiliser Vec::new, qui redimensionne lui-même lorsqu'éléments sont insérés.

Lorsque vous exécutez cargo check à nouveau, cela devrait réussir.

Sending Code from the ThreadPool to a Thread

Nous avons laissé un commentaire dans la boucle for du Listing 20-14 concernant la création de threads. Ici, nous allons voir comment nous créons réellement des threads. La bibliothèque standard fournit thread::spawn comme moyen de créer des threads, et thread::spawn attend de recevoir du code que le thread devrait exécuter dès qu'il est créé. Cependant, dans notre cas, nous voulons créer les threads et les faire attendre du code que nous enverrons plus tard. L'implémentation des threads de la bibliothèque standard ne comprend pas de moyen de le faire ; nous devons l'implémenter manuellement.

Nous allons implémenter ce comportement en introduisant une nouvelle structure de données entre le ThreadPool et les threads qui gérera ce nouveau comportement. Nous appellerons cette structure de données Worker, qui est un terme courant dans les implémentations de pooling. Le Worker prend le code qui doit être exécuté et exécute le code dans son thread.

Imaginez des personnes travaillant dans la cuisine d'un restaurant : les employés attendent jusqu'à ce que des commandes arrivent des clients, puis ils sont responsables de prendre ces commandes et de les exécuter.

Au lieu de stocker un vecteur d'instances de JoinHandle<()> dans le thread pool, nous allons stocker des instances de la structure Worker. Chaque Worker stockera une seule instance de JoinHandle<()>. Ensuite, nous implémenterons une méthode sur Worker qui prendra une closure de code à exécuter et la transmettra au thread déjà en cours d'exécution pour exécution. Nous donnerons également à chaque Worker un id pour que nous puissions distinguer entre les différentes instances de Worker dans le pool lors de la journalisation ou du débogage.

Voici le nouveau processus qui se produira lorsque nous créerons un ThreadPool. Nous implémenterons le code qui envoie la closure au thread après avoir configuré Worker de cette manière :

  1. Définir une structure Worker qui contient un id et un JoinHandle<()>.
  2. Modifier ThreadPool pour stocker un vecteur d'instances de Worker.
  3. Définir une fonction Worker::new qui prend un numéro d'id et renvoie une instance de Worker qui contient l'id et un thread lancé avec une closure vide.
  4. Dans ThreadPool::new, utiliser le compteur de boucle for pour générer un id, créer un nouveau Worker avec cet id et stocker le Worker dans le vecteur.

Si vous êtes prêt à relever un défi, essayez d'implémenter ces modifications vous-même avant de regarder le code du Listing 20-15.

Prêt? Voici le Listing 20-15 avec une manière de faire les modifications précédentes.

Nom de fichier : src/lib.rs

use std::thread;

pub struct ThreadPool {
  1 workers: Vec<Worker>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

      2 for id in 0..size {
          3 workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    --snip--
}

4 struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
  5 fn new(id: usize) -> Worker {
      6 let thread = thread::spawn(|| {});

        Worker { 7 id, 8 thread }
    }
}

Listing 20-15: Modifying ThreadPool to hold Worker instances instead of holding threads directly

Nous avons changé le nom du champ sur ThreadPool de threads en workers car il stocke désormais des instances de Worker au lieu d'instances de JoinHandle<()> [1]. Nous utilisons le compteur dans la boucle for [2] comme argument pour Worker::new, et nous stockons chaque nouveau Worker dans le vecteur nommé workers [3].

Le code externe (comme notre serveur dans src/main.rs) n'a pas besoin de connaître les détails d'implémentation concernant l'utilisation d'une structure Worker dans ThreadPool, donc nous rendons la structure Worker [4] et sa fonction new [5] privées. La fonction Worker::new utilise l'id que nous lui donnons [7] et stocke une instance de JoinHandle<()> [8] qui est créée en lançant un nouveau thread avec une closure vide [6].

Note: Si le système d'exploitation ne peut pas créer un thread car il n'y a pas assez de ressources système, thread::spawn provoquera une panique. Cela fera planter tout notre serveur, même si la création de certains threads peut réussir. Pour simplifier, ce comportement est acceptable, mais dans une implémentation de thread pool en production, vous voudriez probablement utiliser std::thread::Builder et sa méthode spawn qui renvoie Result à la place.

Ce code compilera et stockera le nombre d'instances de Worker que nous avons spécifié comme argument pour ThreadPool::new. Mais nous n'avons toujours pas traité la closure que nous recevons dans execute. Voyons comment faire cela ensuite.

Sending Requests to Threads via Channels

Le prochain problème que nous allons résoudre est que les closures données à thread::spawn ne font absolument rien. Actuellement, nous obtenons la closure que nous voulons exécuter dans la méthode execute. Mais nous devons donner à thread::spawn une closure à exécuter lorsque nous créons chaque Worker lors de la création du ThreadPool.

Nous voulons que les structs Worker que nous venons de créer récupèrent le code à exécuter dans une file d'attente contenue dans le ThreadPool et envoient ce code à son thread pour exécution.

Les canaux dont nous avons parlé au Chapitre 16 - une manière simple de communiquer entre deux threads - seraient parfait pour ce cas d'utilisation. Nous utiliserons un canal pour fonctionner comme une file d'attente de tâches, et execute enverra une tâche du ThreadPool aux instances de Worker, qui enverront la tâche à son thread. Voici le plan :

  1. Le ThreadPool créera un canal et gardera le sender.
  2. Chaque Worker gardera le receiver.
  3. Nous créerons une nouvelle struct Job qui contiendra les closures que nous voulons envoyer par le canal.
  4. La méthode execute enverra la tâche qu'elle veut exécuter via le sender.
  5. Dans son thread, le Worker bouclera sur son receiver et exécutera les closures de toutes les tâches qu'il reçoit.

Commencons par créer un canal dans ThreadPool::new et en garder le sender dans l'instance de ThreadPool, comme montré dans le Listing 20-16. La struct Job ne contient rien pour le moment mais sera le type d'élément que nous envoyons par le canal.

Nom de fichier : src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      1 let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, 2 sender }
    }
    --snip--
}

Listing 20-16: Modifying ThreadPool to store the sender of a channel that transmits Job instances

Dans ThreadPool::new, nous créons notre nouveau canal [1] et le pool garde le sender [2]. Cela compilera avec succès.

Essayons de passer un receiver du canal à chaque Worker lorsque le thread pool crée le canal. Nous savons que nous voulons utiliser le receiver dans le thread que les instances de Worker lancent, donc nous ferons référence au paramètre receiver dans la closure. Le code du Listing 20-17 ne compilera pas encore tout à fait.

Nom de fichier : src/lib.rs

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
          1 workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    --snip--
}

--snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
          2 receiver;
        });

        Worker { id, thread }
    }
}

Listing 20-17: Passing the receiver to each Worker

Nous avons apporté quelques modifications petites et simples : nous passons le receiver à Worker::new [1], puis nous l'utilisons à l'intérieur de la closure [2].

Lorsque nous essayons de vérifier ce code, nous obtenons cette erreur :

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in
previous iteration of loop

Le code essaie de passer receiver à plusieurs instances de Worker. Cela ne fonctionnera pas, comme vous le rappellerez du Chapitre 16 : l'implémentation de canal que Rust fournit est multiple producteur, unique consommateur. Cela signifie que nous ne pouvons pas simplement cloner l'extrémité consommateur du canal pour corriger ce code. Nous ne voulons également pas envoyer un message plusieurs fois à plusieurs consommateurs ; nous voulons une liste de messages avec plusieurs instances de Worker de sorte que chaque message soit traité une fois.

De plus, prendre une tâche dans la file d'attente du canal implique de modifier le receiver, donc les threads ont besoin d'un moyen sûr de partager et de modifier receiver ; sinon, nous risquons d'avoir des conditions de course (comme décrit au Chapitre 16).

Rappelez-vous les pointeurs intelligents sécurisés pour les threads discutés au Chapitre 16 : pour partager la propriété entre plusieurs threads et permettre aux threads de modifier la valeur, nous devons utiliser Arc<Mutex<T>>. Le type Arc permettra à plusieurs instances de Worker d'avoir la propriété du receiver, et Mutex assurera qu'un seul Worker obtienne une tâche du receiver à la fois. Le Listing 20-18 montre les modifications que nous devons apporter.

Nom de fichier : src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
--snip--

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

      1 let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(
                Worker::new(id, Arc::clone(& 2 receiver))
            );
        }

        ThreadPool { workers, sender }
    }

    --snip--
}

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--
    }
}

Listing 20-18: Sharing the receiver among the Worker instances using Arc and Mutex

Dans ThreadPool::new, nous mettons le receiver dans un Arc et un Mutex [1]. Pour chaque nouveau Worker, nous clonons l'Arc pour augmenter le compte de référence de sorte que les instances de Worker puissent partager la propriété du receiver [2].

Avec ces modifications, le code compile! Nous y arrivons!

Implementing the execute Method

Implémentons enfin la méthode execute sur ThreadPool. Nous changerons également Job d'une struct en un alias de type pour un objet de trait qui contient le type de closure que execute reçoit. Comme discuté dans "Creating Type Synonyms with Type Aliases", les aliases de type nous permettent de simplifier des types longs pour faciliter leur utilisation. Considérez le Listing 20-19.

Nom de fichier : src/lib.rs

--snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
      1 let job = Box::new(f);

      2 self.sender.send(job).unwrap();
    }
}

--snip--

Listing 20-19: Creating a Job type alias for a Box that holds each closure and then sending the job down the channel

Après avoir créé une nouvelle instance de Job à l'aide de la closure que nous obtenons dans execute [1], nous envoyons cette tâche par l'extrémité d'envoi du canal [2]. Nous appelons unwrap sur send pour le cas où l'envoi échoue. Cela peut arriver si, par exemple, nous arrêtons tous nos threads d'exécution, ce qui signifie que l'extrémité de réception a cessé de recevoir de nouveaux messages. En ce moment, nous ne pouvons pas arrêter nos threads d'exécution : nos threads continuent d'exécuter tant que le pool existe. La raison pour laquelle nous utilisons unwrap est que nous savons que le cas d'échec ne se produira pas, mais le compilateur ne le sait pas.

Mais nous ne sommes pas tout à fait terminés! Dans le Worker, la closure que nous passons à thread::spawn ne fait encore que référencer l'extrémité de réception du canal. Au lieu de cela, nous devons que la closure boucle à l'infini, demandant à l'extrémité de réception du canal une tâche et exécutant la tâche lorsqu'elle en reçoit une. Apportons le changement montré dans le Listing 20-20 à Worker::new.

Nom de fichier : src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver
              1.lock()
              2.unwrap()
              3.recv()
              4.unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

Listing 20-20: Receiving and executing the jobs in the Worker instance's thread

Ici, nous appelons d'abord lock sur le receiver pour acquérir le verrou [1], puis nous appelons unwrap pour générer une panique en cas d'erreur [2]. L'acquisition d'un verrou peut échouer si le verrou est dans un état empoisonné, ce qui peut arriver si un autre thread a généré une panique tout en maintenant le verrou au lieu de le libérer. Dans cette situation, appeler unwrap pour que ce thread génère une panique est la bonne action à prendre. N'hésitez pas à changer cet unwrap en un expect avec un message d'erreur qui a du sens pour vous.

Si nous obtenons le verrou sur le verrou, nous appelons recv pour recevoir une Job du canal [3]. Un dernier unwrap passe également outre toute erreur ici [4], qui peut survenir si le thread possédant l'expéditeur s'est arrêté, de manière similaire à la façon dont la méthode send renvoie Err si le récepteur s'arrête.

L'appel à recv bloque, donc si il n'y a pas de tâche pour le moment, le thread actuel attendra jusqu'à ce qu'une tâche devienne disponible. Le Mutex<T> garantit qu'un seul thread Worker à la fois essaie de demander une tâche.

Notre thread pool est maintenant en état de fonctionnement! Donnez-lui un cargo run et effectuez quelques requêtes :

[object Object]

Succès! Nous avons maintenant un thread pool qui exécute les connexions de manière asynchrone. Il n'y a jamais plus de quatre threads créés, donc notre système ne risque pas d'être surchargé si le serveur reçoit beaucoup de requêtes. Si nous effectuons une requête à /sleep, le serveur sera capable de traiter d'autres requêtes en faisant exécuter celles-ci par un autre thread.

Note: Si vous ouvrez /sleep dans plusieurs fenêtres de navigateur simultanément, elles peuvent charger une par une avec un intervalle de cinq secondes. Certains navigateurs web exécutent plusieurs instances de la même requête séquentiellement pour des raisons de mise en cache. Cette limitation n'est pas causée par notre serveur web.

Après avoir appris sur la boucle while let au Chapitre 18, vous vous demandez peut-être pourquoi nous n'avons pas écrit le code du thread Worker comme montré dans le Listing 20-21.

Nom de fichier : src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Listing 20-21: An alternative implementation of Worker::new using while let

Ce code compile et s'exécute mais ne produit pas le comportement de threading souhaité : une requête lente fera toujours attendre les autres requêtes à être traitées. La raison est un peu subtile : la struct Mutex n'a pas de méthode publique unlock car la propriété du verrou est basée sur la durée de vie du MutexGuard<T> dans le LockResult<MutexGuard<T>> que la méthode lock renvoie. Au moment de la compilation, le vérificateur d'emprunt peut donc appliquer la règle selon laquelle une ressource protégée par un Mutex ne peut pas être accédée à moins que nous ne tenions le verrou. Cependant, cette implémentation peut également entraîner le verrou étant maintenu plus longtemps que prévu si nous ne sommes pas attentifs à la durée de vie du MutexGuard<T>.

Le code du Listing 20-20 qui utilise let job = receiver.lock().unwrap().recv().unwrap(); fonctionne car avec let, toutes les valeurs temporaires utilisées dans l'expression du côté droit de l'égalité sont immédiatement supprimées lorsque l'instruction let se termine. Cependant, while let (et if let et match) ne supprime pas les valeurs temporaires jusqu'à la fin du bloc associé. Dans le Listing 20-21, le verrou reste maintenu pendant la durée de l'appel à job(), ce qui signifie que d'autres instances de Worker ne peuvent pas recevoir de tâches.

Summary

Félicitations! Vous avez terminé le laboratoire de transformation de notre serveur mono-thread en serveur multi-thread. Vous pouvez pratiquer d'autres laboratoires dans LabEx pour améliorer vos compétences.