Desarrollo de servidores multihilo en Rust

RustRustBeginner
Practicar Ahora

This tutorial is from open-source community. Access the source code

💡 Este tutorial está traducido por IA desde la versión en inglés. Para ver la versión original, puedes hacer clic aquí

Introducción

Bienvenido a Transformando nuestro servidor de un solo hilo en un servidor multihilo. Esta práctica es parte del Rust Book. Puedes practicar tus habilidades de Rust en LabEx.

En esta práctica, transformaremos nuestro servidor de un solo hilo en un servidor multihilo para mejorar su eficiencia al procesar múltiples solicitudes simultáneamente.

Transformando nuestro servidor de un solo hilo en un servidor multihilo

En este momento, el servidor procesará cada solicitud por turnos, lo que significa que no procesará una segunda conexión hasta que la primera haya terminado de procesarse. Si el servidor recibe cada vez más solicitudes, esta ejecución secuencial será cada vez menos óptima. Si el servidor recibe una solicitud que tarda mucho en procesarse, las solicitudes posteriores tendrán que esperar hasta que la solicitud larga haya terminado, incluso si las nuevas solicitudes se pueden procesar rápidamente. Tendremos que solucionar esto, pero primero veremos el problema en acción.

Simulando una solicitud lenta

Veremos cómo una solicitud que tarda en procesarse puede afectar otras solicitudes hechas a nuestra implementación actual de servidor. La Lista 20-10 implementa el manejo de una solicitud a /sleep con una respuesta simulada lenta que hará que el servidor duerma durante cinco segundos antes de responder.

Nombre de archivo: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
--snip--

fn handle_connection(mut stream: TcpStream) {
    --snip--

    let (status_line, filename) = 1 match &request_line[..] {
      2 "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
      3 "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
      4 _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    --snip--
}

Lista 20-10: Simulando una solicitud lenta durmiendo durante cinco segundos

Hemos cambiado de if a match ahora que tenemos tres casos [1]. Necesitamos hacer coincidir explícitamente en un trozo de request_line para hacer coincidir con los valores literales de cadena; match no hace referencia automática y desreferenciación, como lo hace el método de igualdad.

El primer brazo [2] es el mismo que el bloque if de la Lista 20-9. El segundo brazo [3] coincide con una solicitud a /sleep. Cuando se recibe esa solicitud, el servidor dormirá durante cinco segundos antes de renderizar la página HTML exitosa. El tercer brazo [4] es el mismo que el bloque else de la Lista 20-9.

Puedes ver lo primitivo que es nuestro servidor: las bibliotecas reales manejarían el reconocimiento de múltiples solicitudes de manera mucho menos verbosa.

Inicie el servidor usando cargo run. Luego abra dos ventanas del navegador: una para http://127.0.0.1:7878 y la otra para http://127.0.0.1:7878/sleep. Si ingresa la URI / varias veces, como antes, verá que responde rápidamente. Pero si ingresa /sleep y luego carga /, verá que / espera hasta que sleep haya dormido durante sus cinco segundos completos antes de cargar.

Hay múltiples técnicas que podríamos usar para evitar que las solicitudes se acumulen detrás de una solicitud lenta; la que implementaremos es un grupo de subprocesos.

Mejora de la rendimiento con un grupo de subprocesos

Un grupo de subprocesos es un grupo de subprocesos creados que están esperando y listos para manejar una tarea. Cuando el programa recibe una nueva tarea, asigna uno de los subprocesos del grupo a la tarea, y ese subproceso procesará la tarea. Los subprocesos restantes del grupo están disponibles para manejar cualquier otra tarea que llegue mientras el primer subproceso está procesando. Cuando el primer subproceso ha terminado de procesar su tarea, se devuelve al grupo de subprocesos ociosos, listo para manejar una nueva tarea. Un grupo de subprocesos te permite procesar conexiones de manera concurrente, aumentando la rendimiento de tu servidor.

Limitaremos el número de subprocesos en el grupo a un número pequeño para protegernos de ataques DoS; si tuviéramos que nuestro programa creara un nuevo subproceso para cada solicitud que llegara, alguien que hiciera 10 millones de solicitudes a nuestro servidor podría causar estragos al consumir todos los recursos de nuestro servidor y detener el procesamiento de solicitudes.

En lugar de crear subprocesos ilimitados, entonces, tendremos un número fijo de subprocesos esperando en el grupo. Las solicitudes que llegan se envían al grupo para su procesamiento. El grupo mantendrá una cola de solicitudes entrantes. Cada uno de los subprocesos del grupo tomará una solicitud de esta cola, manejará la solicitud y luego solicitará otra solicitud a la cola. Con este diseño, podemos procesar hasta N solicitudes concurrentemente, donde N es el número de subprocesos. Si cada subproceso está respondiendo a una solicitud de larga duración, las solicitudes posteriores todavía pueden acumularse en la cola, pero hemos aumentado el número de solicitudes de larga duración que podemos manejar antes de llegar a ese punto.

Esta técnica es solo una de las muchas maneras de mejorar la rendimiento de un servidor web. Otras opciones que podrías explorar son el modelo fork/join, el modelo de E/S asíncrona de un solo subproceso y el modelo de E/S asíncrona multihilo. Si estás interesado en este tema, puedes leer más sobre otras soluciones e intentar implementarlas; con un lenguaje de bajo nivel como Rust, todas estas opciones son posibles.

Antes de comenzar a implementar un grupo de subprocesos, hablemos de cómo debería ser el uso del grupo. Cuando estás intentando diseñar código, escribir primero la interfaz del cliente puede ayudarte a guiar tu diseño. Escribe la API del código de manera que quede estructurada como quieres llamarla; luego implementa la funcionalidad dentro de esa estructura en lugar de implementar la funcionalidad y luego diseñar la API pública.

Similar a cómo usamos el desarrollo dirigido por pruebas en el proyecto del Capítulo 12, usaremos aquí el desarrollo dirigido por el compilador. Escribiremos el código que llama a las funciones que queremos, y luego veremos los errores del compilador para determinar qué debemos cambiar a continuación para que el código funcione. Antes de hacer eso, sin embargo, exploraremos la técnica que no vamos a usar como punto de partida.

Creación de un subproceso para cada solicitud

Primero, exploremos cómo podría verse nuestro código si creara un nuevo subproceso para cada conexión. Como se mencionó anteriormente, este no es nuestro plan final debido a los problemas de la posible creación de un número ilimitado de subprocesos, pero es un punto de partida para obtener primero un servidor multihilo funcional. Luego agregaremos el grupo de subprocesos como una mejora, y contrastar las dos soluciones será más fácil.

La Lista 20-11 muestra los cambios que se deben hacer a main para crear un nuevo subproceso para manejar cada flujo dentro del bucle for.

Nombre de archivo: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

Lista 20-11: Creación de un nuevo subproceso para cada flujo

Como aprendiste en el Capítulo 16, thread::spawn creará un nuevo subproceso y luego ejecutará el código en la clausura en el nuevo subproceso. Si ejecutas este código y cargas /sleep en tu navegador, luego / en dos pestañas más del navegador, realmente verás que las solicitudes a / no tienen que esperar a que /sleep termine. Sin embargo, como mencionamos, esto eventualmente sobrecargará el sistema porque estarías creando nuevos subprocesos sin ningún límite.

Creación de un número finito de subprocesos

Queremos que nuestro grupo de subprocesos funcione de manera similar y familiar, de modo que cambiar de subprocesos a un grupo de subprocesos no requiera grandes cambios en el código que utiliza nuestra API. La Lista 20-12 muestra la interfaz hipotética para una estructura ThreadPool que queremos usar en lugar de thread::spawn.

Nombre de archivo: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  1 let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

      2 pool.execute(|| {
            handle_connection(stream);
        });
    }
}

Lista 20-12: Nuestra interfaz ideal de ThreadPool

Usamos ThreadPool::new para crear un nuevo grupo de subprocesos con un número configurable de subprocesos, en este caso cuatro [1]. Luego, en el bucle for, pool.execute tiene una interfaz similar a thread::spawn en el sentido de que toma una clausura que el grupo debe ejecutar para cada flujo [2]. Necesitamos implementar pool.execute de modo que tome la clausura y la deje ejecutar por un subproceso en el grupo. Este código aún no se compilará, pero lo intentaremos para que el compilador nos pueda guiar en cómo corregirlo.

Construyendo ThreadPool usando el desarrollo dirigido por el compilador

Haga los cambios de la Lista 20-12 en src/main.rs, y luego usemos los errores del compilador de cargo check para guiar nuestro desarrollo. Aquí está el primer error que obtenemos:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

¡Excelente! Este error nos dice que necesitamos un tipo o módulo ThreadPool, así que lo construiremos ahora. Nuestra implementación de ThreadPool será independiente del tipo de trabajo que está haciendo nuestro servidor web. Entonces, cambiemos el crate hello de un crate binario a un crate de biblioteca para contener nuestra implementación de ThreadPool. Después de cambiar a un crate de biblioteca, también podríamos usar la biblioteca separada de grupos de subprocesos para cualquier trabajo que queramos hacer usando un grupo de subprocesos, no solo para atender solicitudes web.

Cree un archivo src/lib.rs que contenga lo siguiente, que es la definición más simple de una estructura ThreadPool que podemos tener por ahora:

Nombre de archivo: src/lib.rs

pub struct ThreadPool;

Luego edite el archivo main.rs para traer ThreadPool al ámbito desde el crate de biblioteca agregando el siguiente código al principio de src/main.rs:

Nombre de archivo: src/main.rs

use hello::ThreadPool;

Este código todavía no funcionará, pero revisémoslo de nuevo para obtener el siguiente error que necesitamos resolver:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct
`ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in
`ThreadPool`

Este error indica que a continuación necesitamos crear una función asociada llamada new para ThreadPool. También sabemos que new necesita tener un parámetro que pueda aceptar 4 como argumento y debe devolver una instancia de ThreadPool. Implementemos la función new más simple que tendrá esas características:

Nombre de archivo: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

Elegimos usize como el tipo del parámetro size porque sabemos que un número negativo de subprocesos no tiene sentido. También sabemos que usaremos este 4 como el número de elementos en una colección de subprocesos, que es para lo que es el tipo usize, como se discutió en "Tipos enteros".

Revisemos el código de nuevo:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the
current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^ method not found in `ThreadPool`

Ahora el error ocurre porque no tenemos un método execute en ThreadPool. Recuerde de "Creación de un número finito de subprocesos" que decidimos que nuestro grupo de subprocesos debería tener una interfaz similar a thread::spawn. Además, implementaremos la función execute de modo que tome la clausura que se le da y la deje ejecutar por un subproceso ocioso en el grupo.

Definiremos el método execute en ThreadPool para tomar una clausura como parámetro. Recuerde de "Moviendo valores capturados fuera de las clausuras y los tratos Fn" que podemos tomar clausuras como parámetros con tres tratos diferentes: Fn, FnMut y FnOnce. Necesitamos decidir qué tipo de clausura usar aquí. Sabemos que terminaremos haciendo algo similar a la implementación de thread::spawn de la biblioteca estándar, así que podemos ver qué límites tiene la firma de thread::spawn en su parámetro. La documentación nos muestra lo siguiente:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

El parámetro de tipo F es el que nos interesa aquí; el parámetro de tipo T está relacionado con el valor de retorno, y no nos interesa eso. Podemos ver que spawn usa FnOnce como el trato límite en F. Probablemente esto también sea lo que queremos, porque eventualmente pasaremos el argumento que obtenemos en execute a spawn. Podemos estar más seguros de que FnOnce es el trato que queremos usar porque el subproceso para ejecutar una solicitud solo ejecutará la clausura de esa solicitud una vez, lo que coincide con el Once en FnOnce.

El parámetro de tipo F también tiene el trato límite Send y el límite de vida 'static, que son útiles en nuestra situación: necesitamos Send para transferir la clausura de un subproceso a otro y 'static porque no sabemos cuánto tiempo tardará el subproceso en ejecutarse. Cree un método execute en ThreadPool que tomará un parámetro genérico de tipo F con estos límites:

Nombre de archivo: src/lib.rs

impl ThreadPool {
    --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() 1 + Send + 'static,
    {
    }
}

Todavía usamos los () después de FnOnce [1] porque este FnOnce representa una clausura que no toma parámetros y devuelve el tipo unitario (). Al igual que las definiciones de funciones, el tipo de retorno se puede omitir de la firma, pero incluso si no tenemos parámetros, todavía necesitamos los paréntesis.

Una vez más, esta es la implementación más simple del método execute: no hace nada, pero solo estamos intentando hacer que nuestro código se compile. Revisémoslo de nuevo:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

¡Se compila! Pero tenga en cuenta que si intenta cargo run y hace una solicitud en el navegador, verá los errores en el navegador que vimos al principio del capítulo. Nuestra biblioteca todavía no está llamando realmente a la clausura pasada a execute todavía.

Nota: Un dicho que es posible que escuche sobre lenguajes con compiladores estrictos, como Haskell y Rust, es "si el código se compila, funciona". Pero este dicho no es universalmente cierto. Nuestro proyecto se compila, pero no hace absolutamente nada. Si estuviéramos construyendo un proyecto real y completo, este sería un buen momento para comenzar a escribir pruebas unitarias para comprobar que el código se compila y tiene el comportamiento que queremos.

Validando el número de subprocesos en new

No estamos haciendo nada con los parámetros de new y execute. Implementemos los cuerpos de estas funciones con el comportamiento que queremos. Para comenzar, pensemos en new. Anteriormente elegimos un tipo sin signo para el parámetro size porque un grupo de subprocesos con un número negativo de subprocesos no tiene sentido. Sin embargo, un grupo de subprocesos con cero subprocesos también no tiene sentido, aunque cero es un usize perfectamente válido. Agregaremos código para comprobar que size es mayor que cero antes de devolver una instancia de ThreadPool y hacer que el programa se detenga con un error si recibe un cero usando la macro assert!, como se muestra en la Lista 20-13.

Nombre de archivo: src/lib.rs

impl ThreadPool {
    /// Crea un nuevo ThreadPool.
    ///
    /// El tamaño es el número de subprocesos en el grupo.
    ///
  1 /// ## Panics
    ///
    /// La función `new` se detendrá con un error si el tamaño es cero.
    pub fn new(size: usize) -> ThreadPool {
      2 assert!(size > 0);

        ThreadPool
    }

    --snip--
}

Lista 20-13: Implementando ThreadPool::new para detenerse con un error si size es cero

También hemos agregado algunos comentarios de documentación para nuestro ThreadPool con comentarios de documentación. Tenga en cuenta que seguimos las buenas prácticas de documentación al agregar una sección que señala las situaciones en las que nuestra función puede detenerse con un error [1], como se discutió en el Capítulo 14. Intente ejecutar cargo doc --open y hacer clic en la estructura ThreadPool para ver cómo se ven los documentos generados para new ¡

En lugar de agregar la macro assert! como lo hicimos aquí [2], podríamos cambiar new a build y devolver un Result como lo hicimos con Config::build en el proyecto de E/S de la Lista 12-9. Pero en este caso hemos decidido que intentar crear un grupo de subprocesos sin ningún subproceso debe ser un error irreparable. Si estás con ganas de probar, intenta escribir una función llamada build con la siguiente firma para compararla con la función new:

pub fn build(
    size: usize
) -> Result<ThreadPool, PoolCreationError> {

Creando espacio para almacenar los subprocesos

Ahora que tenemos una forma de saber que tenemos un número válido de subprocesos para almacenar en el grupo, podemos crear esos subprocesos y almacenarlos en la estructura ThreadPool antes de devolver la estructura. Pero, ¿cómo "almacenamos" un subproceso? Echemos otro vistazo a la firma de thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

La función spawn devuelve un JoinHandle<T>, donde T es el tipo que devuelve la clausura. Intentemos usar JoinHandle también y ver qué pasa. En nuestro caso, las clausuras que estamos pasando al grupo de subprocesos manejarán la conexión y no devolverán nada, por lo que T será el tipo unitario ().

El código de la Lista 20-14 se compilará pero aún no creará ningún subproceso. Hemos cambiado la definición de ThreadPool para que contenga un vector de instancias de thread::JoinHandle<()>, inicializado el vector con una capacidad de size, configurado un bucle for que ejecutará algún código para crear los subprocesos y devuelto una instancia de ThreadPool que los contiene.

Nombre de archivo: src/lib.rs

1 use std::thread;

pub struct ThreadPool {
  2 threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      3 let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    --snip--
}

Lista 20-14: Creando un vector para ThreadPool para almacenar los subprocesos

Hemos traído std::thread al ámbito en el crate de biblioteca [1] porque estamos usando thread::JoinHandle como el tipo de los elementos en el vector en ThreadPool [2].

Una vez que se recibe un tamaño válido, nuestro ThreadPool crea un nuevo vector que puede contener size elementos [3]. La función with_capacity realiza la misma tarea que Vec::new pero con una importante diferencia: reserva espacio previamente en el vector. Debido a que sabemos que necesitamos almacenar size elementos en el vector, hacer esta asignación por adelantado es ligeramente más eficiente que usar Vec::new, que se redimensiona a sí misma a medida que se insertan elementos.

Cuando vuelva a ejecutar cargo check, debería tener éxito.

Enviando código desde ThreadPool a un subproceso

Dejamos un comentario en el bucle for de la Lista 20-14 sobre la creación de subprocesos. Aquí, veremos cómo realmente creamos subprocesos. La biblioteca estándar proporciona thread::spawn como una forma de crear subprocesos, y thread::spawn espera recibir algún código que el subproceso debe ejecutar tan pronto como se crea el subproceso. Sin embargo, en nuestro caso, queremos crear los subprocesos y que espere por el código que enviaremos más tarde. La implementación de subprocesos de la biblioteca estándar no incluye ninguna forma de hacer eso; tenemos que implementarlo manualmente.

Implementaremos este comportamiento introduciendo una nueva estructura de datos entre ThreadPool y los subprocesos que gestionará este nuevo comportamiento. Llamaremos a esta estructura de datos Worker, que es un término común en las implementaciones de pooling. El Worker recoge el código que necesita ser ejecutado y lo ejecuta en su subproceso.

Imagina a las personas que trabajan en la cocina de un restaurante: los trabajadores esperan hasta que lleguen los pedidos de los clientes, y luego son responsables de tomar esos pedidos y atendérselos.

En lugar de almacenar un vector de instancias de JoinHandle<()> en el grupo de subprocesos, almacenaremos instancias de la estructura Worker. Cada Worker almacenará una sola instancia de JoinHandle<()>. Luego implementaremos un método en Worker que tomará una clausura de código a ejecutar y la enviará al subproceso ya en ejecución para su ejecución. También le daremos a cada Worker un id para que podamos distinguir entre las diferentes instancias de Worker en el grupo cuando registramos o depuramos.

Aquí está el nuevo proceso que sucederá cuando creemos un ThreadPool. Implementaremos el código que envía la clausura al subproceso después de tener Worker configurado de esta manera:

  1. Definir una estructura Worker que contiene un id y un JoinHandle<()>.
  2. Cambiar ThreadPool para contener un vector de instancias de Worker.
  3. Definir una función Worker::new que toma un número de id y devuelve una instancia de Worker que contiene el id y un subproceso creado con una clausura vacía.
  4. En ThreadPool::new, usar el contador del bucle for para generar un id, crear un nuevo Worker con ese id y almacenar el Worker en el vector.

Si estás dispuesto a un desafío, intenta implementar estos cambios por tu cuenta antes de ver el código de la Lista 20-15.

¿Listo? Aquí está la Lista 20-15 con una forma de hacer las modificaciones anteriores.

Nombre de archivo: src/lib.rs

use std::thread;

pub struct ThreadPool {
  1 workers: Vec<Worker>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

      2 for id in 0..size {
          3 workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    --snip--
}

4 struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
  5 fn new(id: usize) -> Worker {
      6 let thread = thread::spawn(|| {});

        Worker { 7 id, 8 thread }
    }
}

Lista 20-15: Modificando ThreadPool para contener instancias de Worker en lugar de contener subprocesos directamente

Hemos cambiado el nombre del campo en ThreadPool de threads a workers porque ahora contiene instancias de Worker en lugar de instancias de JoinHandle<()> [1]. Usamos el contador en el bucle for [2] como argumento para Worker::new, y almacenamos cada nuevo Worker en el vector llamado workers [3].

El código externo (como nuestro servidor en src/main.rs) no necesita conocer los detalles de implementación sobre el uso de una estructura Worker dentro de ThreadPool, así que hacemos que la estructura Worker [4] y su función new [5] sean privadas. La función Worker::new usa el id que le damos [7] y almacena una instancia de JoinHandle<()> [8] que se crea al generar un nuevo subproceso usando una clausura vacía [6].

Nota: Si el sistema operativo no puede crear un subproceso porque no hay suficientes recursos del sistema, thread::spawn causará un error. Eso hará que todo nuestro servidor se detenga con un error, aunque la creación de algunos subprocesos puede tener éxito. Por simplicidad, este comportamiento es aceptable, pero en una implementación de grupo de subprocesos en producción, probablemente querrías usar std::thread::Builder y su método spawn que devuelve Result en lugar.

Este código se compilará y almacenará el número de instancias de Worker que especificamos como argumento para ThreadPool::new. Pero todavía no estamos procesando la clausura que obtenemos en execute. Veamos cómo hacerlo a continuación.

Enviando solicitudes a los subprocesos a través de canales

El siguiente problema que abordaremos es que las clausuras que se le dan a thread::spawn no hacen absolutamente nada. Actualmente, obtenemos la clausura que queremos ejecutar en el método execute. Pero necesitamos dar a thread::spawn una clausura para ejecutar cuando creamos cada Worker durante la creación del ThreadPool.

Queremos que las estructuras Worker que acabamos de crear obtengan el código a ejecutar de una cola que se mantenga en el ThreadPool y lo envíen a su subproceso para que se ejecute.

Los canales que aprendimos sobre en el Capítulo 16, una forma simple de comunicarse entre dos subprocesos, sería perfecta para este caso de uso. Usaremos un canal para funcionar como la cola de trabajos, y execute enviará un trabajo desde el ThreadPool a las instancias de Worker, que lo enviarán a su subproceso. Aquí está el plan:

  1. El ThreadPool creará un canal y se quedará con el emisor.
  2. Cada Worker se quedará con el receptor.
  3. Crearemos una nueva estructura Job que contendrá las clausuras que queremos enviar por el canal.
  4. El método execute enviará el trabajo que quiere ejecutar a través del emisor.
  5. En su subproceso, el Worker recorrerá su receptor y ejecutará las clausuras de cualquier trabajo que reciba.

Comencemos creando un canal en ThreadPool::new y manteniendo el emisor en la instancia de ThreadPool, como se muestra en la Lista 20-16. La estructura Job no contiene nada por ahora pero será el tipo de elemento que estamos enviando por el canal.

Nombre de archivo: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      1 let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, 2 sender }
    }
    --snip--
}

Lista 20-16: Modificando ThreadPool para almacenar el emisor de un canal que transmite instancias de Job

En ThreadPool::new, creamos nuestro nuevo canal [1] y hacemos que el grupo se quede con el emisor [2]. Esto se compilará correctamente.

Intentemos pasar un receptor del canal a cada Worker cuando el grupo de subprocesos crea el canal. Sabemos que queremos usar el receptor en el subproceso que las instancias de Worker generan, así que haremos referencia al parámetro receptor en la clausura. El código de la Lista 20-17 todavía no se compilará.

Nombre de archivo: src/lib.rs

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
          1 workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    --snip--
}

--snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
          2 receiver;
        });

        Worker { id, thread }
    }
}

Lista 20-17: Pasando el receptor a cada Worker

Hemos hecho algunos cambios pequeños y directos: pasamos el receptor a Worker::new [1], y luego lo usamos dentro de la clausura [2].

Cuando intentamos comprobar este código, obtenemos este error:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in
previous iteration of loop

El código está intentando pasar receptor a múltiples instancias de Worker. Esto no funcionará, como recordarás del Capítulo 16: la implementación de canal que Rust proporciona es de múltiples productores, un solo consumidor. Esto significa que no podemos simplemente clonar el extremo consumidor del canal para corregir este código. Tampoco queremos enviar un mensaje múltiples veces a múltiples consumidores; queremos una lista de mensajes con múltiples instancias de Worker de modo que cada mensaje se procese una vez.

Además, tomar un trabajo de la cola del canal implica mutar el receptor, por lo que los subprocesos necesitan una forma segura de compartir y modificar receptor; de lo contrario, es posible que obtengamos condiciones de carrera (como se cubre en el Capítulo 16).

Recuerde los punteros inteligentes seguros para subprocesos discutidos en el Capítulo 16: para compartir la propiedad entre múltiples subprocesos y permitir que los subprocesos muten el valor, necesitamos usar Arc<Mutex<T>>. El tipo Arc permitirá que múltiples instancias de Worker posean el receptor, y Mutex asegurará que solo un Worker obtenga un trabajo del receptor a la vez. La Lista 20-18 muestra los cambios que necesitamos hacer.

Nombre de archivo: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
--snip--

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

      1 let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(
                Worker::new(id, Arc::clone(& 2 receiver))
            );
        }

        ThreadPool { workers, sender }
    }

    --snip--
}

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--
    }
}

Lista 20-18: Compartiendo el receptor entre las instancias de Worker usando Arc y Mutex

En ThreadPool::new, ponemos el receptor en un Arc y un Mutex [1]. Para cada nuevo Worker, clonamos el Arc para aumentar el recuento de referencias para que las instancias de Worker puedan compartir la propiedad del receptor [2].

Con estos cambios, ¡el código se compila! Estamos llegando!

Implementando el método execute

Finalmente, implementemos el método execute en ThreadPool. También cambiaremos Job de una estructura a un alias de tipo para un objeto de trato que contiene el tipo de clausura que recibe execute. Como se discutió en "Creating Type Synonyms with Type Aliases", los alias de tipo nos permiten hacer que los tipos largos sean más cortos para mayor facilidad de uso. Echa un vistazo a la Lista 20-19.

Nombre de archivo: src/lib.rs

--snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
      1 let job = Box::new(f);

      2 self.sender.send(job).unwrap();
    }
}

--snip--

Lista 20-19: Creando un alias de tipo Job para un Box que contiene cada clausura y luego enviando el trabajo por el canal

Después de crear una nueva instancia de Job usando la clausura que obtenemos en execute [1], enviamos ese trabajo por el extremo de envío del canal [2]. Estamos llamando a unwrap en send para el caso en el que el envío falle. Esto podría suceder, por ejemplo, si detenemos la ejecución de todos nuestros subprocesos, lo que significa que el extremo de recepción ha dejado de recibir nuevos mensajes. En este momento, no podemos detener la ejecución de nuestros subprocesos: nuestros subprocesos continúan ejecutándose mientras el grupo existe. La razón por la que usamos unwrap es que sabemos que el caso de error no sucederá, pero el compilador no lo sabe.

Pero todavía no hemos terminado ¡Todavía! En el Worker, la clausura que se le pasa a thread::spawn todavía solo referencia el extremo de recepción del canal. En cambio, necesitamos que la clausura se repita para siempre, pidiendo al extremo de recepción del canal un trabajo y ejecutando el trabajo cuando lo recibe. Hagamos el cambio mostrado en la Lista 20-20 en Worker::new.

Nombre de archivo: src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver
              1.lock()
              2.unwrap()
              3.recv()
              4.unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

Lista 20-20: Recibiendo y ejecutando los trabajos en el subproceso de la instancia de Worker

Aquí, primero llamamos a lock en el receptor para adquirir el mutex [1], y luego llamamos a unwrap para generar un error en cualquier error [2]. Adquirir un bloqueo puede fallar si el mutex está en un estado envenenado, lo que puede suceder si algún otro subproceso generó un error mientras mantenía el bloqueo en lugar de liberarlo. En esta situación, llamar a unwrap para que este subproceso genere un error es la acción correcta a tomar. Siéntase libre de cambiar este unwrap por un expect con un mensaje de error que tenga sentido para usted.

Si obtenemos el bloqueo del mutex, llamamos a recv para recibir un Job del canal [3]. Un último unwrap también se salta cualquier error aquí [4], que podría ocurrir si el subproceso que mantiene el emisor se ha detenido, de manera similar a cómo el método send devuelve Err si el receptor se detiene.

La llamada a recv se bloquea, por lo que si todavía no hay un trabajo, el subproceso actual esperará hasta que un trabajo esté disponible. El Mutex<T> asegura que solo un subproceso Worker a la vez intenta solicitar un trabajo.

Nuestro grupo de subprocesos ahora está en un estado de funcionamiento ¡Dale un cargo run y haz algunas solicitudes!

[object Object]

¡Éxito! Ahora tenemos un grupo de subprocesos que ejecuta conexiones de forma asincrónica. Nunca se crean más de cuatro subprocesos, por lo que nuestro sistema no se sobrecargará si el servidor recibe muchas solicitudes. Si hacemos una solicitud a /sleep, el servidor podrá atender otras solicitudes haciéndolas ejecutar otro subproceso.

Nota: Si abres /sleep en múltiples ventanas del navegador simultáneamente, es posible que se carguen una a una con intervalos de cinco segundos. Algunos navegadores web ejecutan múltiples instancias de la misma solicitud secuencialmente por razones de caché. Esta limitación no es causada por nuestro servidor web.

Después de aprender sobre el bucle while let en el Capítulo 18, es posible que te preguntes por qué no escribimos el código del subproceso Worker como se muestra en la Lista 20-21.

Nombre de archivo: src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Lista 20-21: Una implementación alternativa de Worker::new usando while let

Este código se compila y ejecuta pero no produce el comportamiento de subprocesamiento deseado: una solicitud lenta todavía hará que otras solicitudes esperen a ser procesadas. La razón es algo sutil: la estructura Mutex no tiene un método público unlock porque la propiedad del bloqueo se basa en la duración de la MutexGuard<T> dentro del LockResult<MutexGuard<T>> que devuelve el método lock. En tiempo de compilación, el verificador de préstamos puede entonces aplicar la regla de que un recurso protegido por un Mutex no puede ser accedido a menos que mantengamos el bloqueo. Sin embargo, esta implementación también puede causar que el bloqueo se mantenga más tiempo del esperado si no tenemos en cuenta la duración de la MutexGuard<T>.

El código de la Lista 20-20 que usa let job = receiver.lock().unwrap().recv().unwrap(); funciona porque con let, cualquier valor temporal usado en la expresión en el lado derecho del signo igual se descarta inmediatamente cuando la declaración let finaliza. Sin embargo, while let (y if let y match) no descarta valores temporales hasta el final del bloque asociado. En la Lista 20-21, el bloqueo permanece durante la duración de la llamada a job(), lo que significa que otras instancias de Worker no pueden recibir trabajos.

Resumen

¡Felicidades! Has completado el laboratorio de convertir nuestro servidor de un subproceso en un servidor de múltiples subprocesos. Puedes practicar más laboratorios en LabEx para mejorar tus habilidades.