Apagado y limpieza adecuados

RustRustBeginner
Practicar Ahora

This tutorial is from open-source community. Access the source code

💡 Este tutorial está traducido por IA desde la versión en inglés. Para ver la versión original, puedes hacer clic aquí

Introducción

Bienvenido a Graceful Shutdown and Cleanup. Esta práctica es parte del Rust Book. Puedes practicar tus habilidades de Rust en LabEx.

En esta práctica, implementaremos un mecanismo de apagado y limpieza adecuado en nuestro código utilizando el trato Drop y proporcionando una forma para que los subprocesos dejen de aceptar nuevas solicitudes y se apaguen.

Apagado y limpieza adecuados

El código de la Lista 20-20 está respondiendo a solicitudes de manera asincrónica mediante el uso de un grupo de subprocesos, tal como lo planeamos. Obtenemos algunas advertencias sobre los campos workers, id y thread que no estamos utilizando de manera directa, lo que nos recuerda que no estamos limpiando nada. Cuando usamos el método menos elegante de ctrl-C para detener el subproceso principal, todos los demás subprocesos se detienen inmediatamente también, incluso si están en medio de atender una solicitud.

A continuación, implementaremos el trato Drop para llamar a join en cada uno de los subprocesos del grupo, para que puedan terminar las solicitudes en las que están trabajando antes de cerrar. Luego implementaremos una forma de decir a los subprocesos que deben dejar de aceptar nuevas solicitudes y apagarse. Para ver este código en acción, modificaremos nuestro servidor para que acepte solo dos solicitudes antes de apagar adecuadamente su grupo de subprocesos.

Implementando el trato Drop en ThreadPool

Comencemos implementando Drop en nuestro grupo de subprocesos. Cuando se elimina el grupo, todos nuestros subprocesos deben unirse para asegurarse de terminar su trabajo. La Lista 20-22 muestra un primer intento de implementación de Drop; este código aún no funcionará correctamente.

Nombre de archivo: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 for worker in &mut self.workers {
          2 println!("Apagando el trabajador {}", worker.id);

          3 worker.thread.join().unwrap();
        }
    }
}

Lista 20-22: Uniendo cada subproceso cuando el grupo de subprocesos sale del ámbito

Primero, recorremos cada uno de los workers del grupo de subprocesos [1]. Usamos &mut para esto porque self es una referencia mutable, y también necesitamos poder mutar worker. Para cada worker, imprimimos un mensaje diciendo que esta instancia particular de Worker se está apagando [2], y luego llamamos a join en el subproceso de esa instancia de Worker [3]. Si la llamada a join falla, usamos unwrap para que Rust se detenga abruptamente y entre en un apagado no adecuado.

Aquí está el error que obtenemos cuando compilamos este código:

error[E0507]: no se puede mover `worker.thread` que está detrás de una
referencia mutable
    --> src/lib.rs:52:13
     |
52   |             worker.thread.join().unwrap();
     |             ^^^^^^^^^^^^^ ------ `worker.thread` movido debido a
este método de llamada
     |             |
     |             el movimiento ocurre porque `worker.thread` tiene el
tipo `JoinHandle<()>`, que no implementa el trato `Copy`
     |
nota: esta función toma posesión del receptor `self`, lo que mueve
`worker.thread`

El error nos dice que no podemos llamar a join porque solo tenemos un préstamo mutable de cada worker y join toma posesión de su argumento. Para resolver este problema, necesitamos mover el subproceso fuera de la instancia de Worker que posee thread para que join pueda consumir el subproceso. Hicimos esto en la Lista 17-15: si Worker contiene una Option<thread::JoinHandle<()>> en lugar de eso, podemos llamar al método take en la Option para mover el valor fuera de la variante Some y dejar una variante None en su lugar. En otras palabras, un Worker que está en ejecución tendrá una variante Some en thread, y cuando queramos limpiar un Worker, reemplazaremos Some con None para que el Worker no tenga un subproceso para ejecutar.

Entonces, sabemos que queremos actualizar la definición de Worker de la siguiente manera:

Nombre de archivo: src/lib.rs

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

Ahora, apoyémonos en el compilador para encontrar los otros lugares que necesitan cambiar. Al revisar este código, obtenemos dos errores:

error[E0599]: no se encontró el método llamado `join` para el enum
`Option` en el ámbito actual
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ método no encontrado en
`Option<JoinHandle<()>>`

error[E0308]: tipos no coincidentes
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ se esperaba el enum `Option`, se
encontró la struct `JoinHandle`
   |
   = nota: se esperaba el enum `Option<JoinHandle<()>>`
            se encontró la struct `JoinHandle<_>`
ayuda: intente envolver la expresión en `Some`
   |
72 |         Worker { id, thread: Some(thread) }
   |                      +++++++++++++      +

Vamos a abordar el segundo error, que apunta al código al final de Worker::new; necesitamos envolver el valor de thread en Some cuando creamos un nuevo Worker. Haga los siguientes cambios para corregir este error:

Nombre de archivo: src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

El primer error está en nuestra implementación de Drop. Mencionamos anteriormente que teníamos la intención de llamar a take en el valor de Option para mover thread fuera de worker. Los siguientes cambios lo harán:

Nombre de archivo: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Apagando el trabajador {}", worker.id);

          1 if let Some(thread) = worker.thread.take() {
              2 thread.join().unwrap();
            }
        }
    }
}

Como se discutió en el Capítulo 17, el método take en Option toma la variante Some y deja None en su lugar. Estamos usando if let para desestructurar la Some y obtener el subproceso [1]; luego llamamos a join en el subproceso [2]. Si el subproceso de una instancia de Worker ya es None, sabemos que el Worker ya ha tenido su subproceso limpiado, por lo que en ese caso nada pasa.

Señalando a los subprocesos para que dejen de escuchar trabajos

Con todos los cambios que hemos realizado, nuestro código se compila sin advertencias. Sin embargo, la mala noticia es que este código no funciona de la manera que queremos todavía. La clave está en la lógica de las clausuras ejecutadas por los subprocesos de las instancias de Worker: en este momento, llamamos a join, pero eso no detendrá los subprocesos, porque loop para siempre buscando trabajos. Si intentamos eliminar nuestro ThreadPool con nuestra implementación actual de drop, el subproceso principal se bloqueará para siempre, esperando a que el primer subproceso termine.

Para solucionar este problema, necesitaremos un cambio en la implementación de drop de ThreadPool y luego un cambio en el Worker loop.

Primero, cambiaremos la implementación de drop de ThreadPool para eliminar explícitamente el sender antes de esperar a que los subprocesos terminen. La Lista 20-23 muestra los cambios en ThreadPool para eliminar explícitamente sender. Usamos la misma técnica de Option y take que hicimos con el subproceso para poder mover sender fuera de ThreadPool.

Nombre de archivo: src/lib.rs

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
--snip--
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        --snip--

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender
           .as_ref()
           .unwrap()
           .send(job)
           .unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Apagando el trabajador {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

Lista 20-23: Eliminando explícitamente sender antes de unir los subprocesos Worker

Eliminar sender [1] cierra el canal, lo que indica que no se enviarán más mensajes. Cuando eso sucede, todas las llamadas a recv que realizan las instancias de Worker en el bucle infinito devolverán un error. En la Lista 20-24, cambiamos el Worker loop para salir del bucle de manera adecuada en ese caso, lo que significa que los subprocesos terminarán cuando la implementación de drop de ThreadPool llame a join en ellos.

Nombre de archivo: src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!(
                        "Worker {id} recibió un trabajo; ejecutando."
                    );

                    job();
                }
                Err(_) => {
                    println!(
                        "Worker {id} se está apagando."
                    );
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Lista 20-24: Saliendo explícitamente del bucle cuando recv devuelve un error

Para ver este código en acción, modificaremos main para aceptar solo dos solicitudes antes de apagar adecuadamente el servidor, como se muestra en la Lista 20-25.

Nombre de archivo: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Apagando.");
}

Lista 20-25: Apagando el servidor después de atender dos solicitudes saliendo del bucle

No querrías que un servidor web del mundo real se apagara después de atender solo dos solicitudes. Este código solo demuestra que el apagado adecuado y la limpieza están en funcionamiento.

El método take está definido en el trato Iterator y limita la iteración a los primeros dos elementos como máximo. El ThreadPool saldrá del ámbito al final de main, y la implementación de drop se ejecutará.

Inicia el servidor con cargo run y haz tres solicitudes. La tercera solicitud debe generar un error, y en tu terminal deberías ver una salida similar a esta:

$ cargo run
   Compilando hello v0.1.0 (file:///projects/hello)
    Finished dev [no optimizado + información de depuración] target(s) en 1.0s
     Ejecutando `target/debug/hello`
Worker 0 recibió un trabajo; ejecutando.
Apagando.
Apagando el trabajador 0
Worker 3 recibió un trabajo; ejecutando.
Worker 1 se desconectó; apagando.
Worker 2 se desconectó; apagando.
Worker 3 se desconectó; apagando.
Worker 0 se desconectó; apagando.
Apagando el trabajador 1
Apagando el trabajador 2
Apagando el trabajador 3

Es posible que veas un orden diferente de los IDs de Worker y los mensajes impresos. Podemos ver cómo funciona este código a partir de los mensajes: las instancias de Worker 0 y 3 recibieron las primeras dos solicitudes. El servidor dejó de aceptar conexiones después de la segunda conexión, y la implementación de Drop en ThreadPool comienza a ejecutarse antes de que Worker 3 incluso comience su trabajo. Eliminar el sender desconecta todas las instancias de Worker y les dice que se apaguen. Las instancias de Worker imprimen un mensaje cada vez que se desconectan, y luego el grupo de subprocesos llama a join para esperar a que cada subproceso de Worker termine.

Observa un aspecto interesante de esta ejecución en particular: el ThreadPool eliminó el sender, y antes de que cualquier Worker recibiera un error, intentamos unir Worker 0. Worker 0 aún no había recibido un error de recv, por lo que el subproceso principal se bloqueó, esperando a que Worker 0 terminara. Mientras tanto, Worker 3 recibió un trabajo y luego todos los subprocesos recibieron un error. Cuando Worker 0 terminó, el subproceso principal esperó a que el resto de las instancias de Worker terminaran. En ese momento, todas habían salido de sus bucles y se detuvieron.

¡Felicidades! Ahora hemos completado nuestro proyecto; tenemos un servidor web básico que utiliza un grupo de subprocesos para responder de manera asincrónica. Somos capaces de realizar un apagado adecuado del servidor, lo que limpia todos los subprocesos del grupo. Visita https://www.nostarch.com/Rust2021 para descargar el código completo de este capítulo para referencia.

Podríamos hacer más aquí. Si quieres continuar mejorando este proyecto, aquí hay algunas ideas:

  • Agregar más documentación a ThreadPool y sus métodos públicos.
  • Agregar pruebas de la funcionalidad de la biblioteca.
  • Cambiar las llamadas a unwrap a un manejo de errores más robusto.
  • Usar ThreadPool para realizar alguna tarea diferente a atender solicitudes web.
  • Encuentra un cráneo de grupo de subprocesos en https://crates.io e implementa un servidor web similar usando el cráneo en lugar de eso. Luego compara su API y robustez con el grupo de subprocesos que implementamos.

Resumen

¡Felicidades! Has completado la práctica de Apagado y Limpieza Adecuados. Puedes practicar más prácticas en LabEx para mejorar tus habilidades.