Graceful Shutdown and Cleanup

Beginner

This tutorial is from open-source community. Access the source code

Введение

Добро пожаловать в Graceful Shutdown and Cleanup. Эта лабораторная работа является частью Rust Book. Вы можете практиковать свои навыки Rust в LabEx.

В этой лабораторной работе мы реализуем механизм graceful shutdown и очистки в нашем коде, используя трейт Drop и предоставляя способ для потоков прекратить принимать новые запросы и выключиться.

Graceful Shutdown and Cleanup

Код в Листинге 20-20 асинхронно обрабатывает запросы с использованием пула потоков, как мы планировали. Мы получаем несколько предупреждений о полях workers, id и thread, которые мы не используем напрямую, что напоминает нам, что мы ничего не очищаем. Когда мы используем менее элегантный метод ctrl-C для остановки главного потока, все остальные потоки также немедленно останавливаются, даже если они находятся в середине обработки запроса.

Далее мы реализуем трейт Drop, чтобы вызвать метод join для каждого потока в пуле, чтобы они могли завершить обработку запросов, которые они обрабатывают, перед закрытием. Затем мы реализуем способ сообщить потокам, что они должны прекратить принимать новые запросы и выключиться. Чтобы увидеть этот код в действии, мы модифицируем наш сервер, чтобы он принимал только два запроса, прежде чем优雅но выключить свой пул потоков.

Реализация трейта Drop для ThreadPool

Начнем с реализации трейта Drop для нашего пула потоков. Когда пул удаляется, все наши потоки должны присоединиться, чтобы убедиться, что они завершат свою работу. Листинг 20-22 показывает первый вариант реализации Drop; этот код еще не будет работать должным образом.

Имя файла: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 for worker in &mut self.workers {
          2 println!("Shutting down worker {}", worker.id);

          3 worker.thread.join().unwrap();
        }
    }
}

Листинг 20-22: Присоединение каждого потока, когда пул потоков выходит из области видимости

Сначала мы перебираем каждый worker в пуле потоков [1]. Мы используем &mut для этого, потому что self - это изменяемая ссылка, и нам также нужно быть able изменять worker. Для каждого worker мы выводим сообщение о том, что этот конкретный экземпляр Worker завершает свою работу [2], а затем мы вызываем join для потока этого экземпляра Worker [3]. Если вызов join завершается неудачно, мы используем unwrap, чтобы заставить Rust вызвать панику и произвести неэлегантный выход.

Вот ошибка, которую мы получаем, когда компилируем этот код:

error[E0507]: cannot move out of `worker.thread` which is behind a mutable
reference
    --> src/lib.rs:52:13
     |
52   |             worker.thread.join().unwrap();
     |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this
method call
     |             |
     |             move occurs because `worker.thread` has type
`JoinHandle<()>`, which does not implement the `Copy` trait
     |
note: this function takes ownership of the receiver `self`, which moves
`worker.thread`

Ошибка говорит нам, что мы не можем вызвать join, потому что у нас есть только изменяемая ссылка на каждый worker, а join получает владение своим аргументом. Чтобы решить эту проблему, мы должны переместить поток из экземпляра Worker, который владеет thread, чтобы join мог потребовать поток. Мы сделали это в Листинге 17-15: если Worker хранит Option<thread::JoinHandle<()>> вместо этого, мы можем вызвать метод take для Option, чтобы переместить значение из варианта Some и оставить вариант None на его месте. Другими словами, работающий Worker будет иметь вариант Some в thread, а когда мы хотим очистить Worker, мы заменим Some на None, чтобы Worker не имел потока для выполнения.

Таким образом, мы знаем, что хотим обновить определение Worker следующим образом:

Имя файла: src/lib.rs

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

Теперь давайте полагаемся на компилятор, чтобы найти другие места, которые нужно изменить. Проверив этот код, мы получаем две ошибки:

error[E0599]: no method named `join` found for enum `Option` in the current
scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in
`Option<JoinHandle<()>>`

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct
`JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Worker { id, thread: Some(thread) }
   |                      +++++++++++++      +

Давайте решим вторую ошибку, которая указывает на код в конце Worker::new; мы должны обернуть значение thread в Some, когда создаем новый Worker. Примите следующие изменения, чтобы исправить эту ошибку:

Имя файла: src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Первая ошибка находится в нашей реализации Drop. Мы упоминали ранее, что планируем вызвать take для значения Option, чтобы переместить thread из worker. Следующие изменения это сделают:

Имя файла: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

          1 if let Some(thread) = worker.thread.take() {
              2 thread.join().unwrap();
            }
        }
    }
}

Как обсуждалось в главе 17, метод take для Option извлекает вариант Some и оставляет None на его месте. Мы используем if let, чтобы разобрать вариант Some и получить поток [1]; затем мы вызываем join для потока [2]. Если у экземпляра Worker поток уже None, мы знаем, что Worker уже имел свой поток очищенным, поэтому ничего не происходит в этом случае.

Сигнализация потокам для прекращения прослушивания заданий

После всех внесенных изменений наш код компилируется без каких-либо предупреждений. Однако, плохая новость заключается в том, что этот код не работает так, как мы хотим. Ключом является логика в замыканиях, выполняемых потоками экземпляров Worker: в настоящее время мы вызываем join, но это не остановит потоки, потому что они loop бесконечно, ожидая заданий. Если мы попытаемся удалить наш ThreadPool с нашей текущей реализацией drop, главный поток будет заблокирован навсегда, ожидая завершения первого потока.

Чтобы исправить эту проблему, нам нужно внести изменения в реализацию drop для ThreadPool, а затем внести изменения в цикл Worker.

Сначала мы изменим реализацию drop для ThreadPool, чтобы явно удалить sender перед ожиданием завершения потоков. Листинг 20-23 показывает изменения в ThreadPool, чтобы явно удалить sender. Мы используем ту же технику с Option и take, что и с потоком, чтобы быть able переместить sender из ThreadPool.

Имя файла: src/lib.rs

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
--snip--
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        --snip--

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender
           .as_ref()
           .unwrap()
           .send(job)
           .unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

Листинг 20-23: Явное удаление sender перед присоединением потоков Worker

Удаление sender [1] закрывает канал, что означает, что больше не будут отправляться сообщения. Когда это происходит, все вызовы recv, которые Worker экземпляры делают в бесконечном цикле, вернут ошибку. В Листинге 20-24 мы изменяем цикл Worker, чтобы gracefully выйти из цикла в этом случае, что означает, что потоки завершатся, когда реализация drop для ThreadPool вызовет join для них.

Имя файла: src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!(
                        "Worker {id} got a job; executing."
                    );

                    job();
                }
                Err(_) => {
                    println!(
                        "Worker {id} shutting down."
                    );
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Листинг 20-24: Явное выход из цикла, когда recv возвращает ошибку

Чтобы увидеть этот код в действии, давайте изменим main, чтобы он принимал только два запроса, прежде чем gracefully выключить сервер, как показано в Листинге 20-25.

Имя файла: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

Листинг 20-25: Выключение сервера после обработки двух запросов путем выхода из цикла

В реальном веб-сервере вы, вероятно, не захотите выключать его после обработки только двух запросов. Этот код просто демонстрирует, что graceful shutdown и очистка работают должным образом.

Метод take определен в трейте Iterator и ограничивает итерацию максимум первыми двумя элементами. ThreadPool выйдет из области видимости в конце main, и реализация drop будет выполняться.

Запустите сервер с помощью cargo run и сделайте три запроса. Третий запрос должен завершиться с ошибкой, и в вашем терминале вы должны увидеть вывод, похожий на этот:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

Вы, возможно, увидите другой порядок вывода идентификаторов Worker и сообщений. Мы можем увидеть, как этот код работает по сообщениям: экземпляры Worker 0 и 3 получили первые два запроса. Сервер прекратил принимать соединения после второго соединения, и реализация Drop для ThreadPool начинает выполняться, даже прежде чем Worker 3 начнет свою работу. Удаление sender отключает все экземпляры Worker и сообщает им выключиться. Каждый экземпляр Worker выводит сообщение при отключении, а затем пул потоков вызывает join, чтобы дождаться завершения каждого потока Worker.

Обратите внимание на один интересный аспект этого конкретного выполнения: ThreadPool удалил sender, и перед тем, как какой-либо Worker получил ошибку, мы пытались присоединиться к Worker 0. Worker 0 еще не получил ошибку от recv, поэтому главный поток заблокировался, ожидая завершения Worker 0. Между тем, Worker 3 получил задание, а затем все потоки получили ошибку. Когда Worker 0 завершился, главный поток ждал завершения остальных экземпляров Worker. В этот момент они все вышли из своих циклов и остановились.

Поздравляем! Теперь мы завершили наш проект; у нас есть базовый веб-сервер, который использует пул потоков для асинхронного ответа. Мы можем выполнить graceful shutdown сервера, который очищает все потоки в пуле. См. https://www.nostarch.com/Rust2021, чтобы скачать полный код этой главы для справки.

Мы могли бы сделать здесь еще больше! Если вы хотите продолжить улучшать этот проект, здесь есть некоторые идеи:

  • Добавьте больше документации для ThreadPool и его публичных методов.
  • Добавьте тесты функциональности библиотеки.
  • Замените вызовы unwrap на более надежное обработку ошибок.
  • Используйте ThreadPool для выполнения какой-либо задачи, кроме обслуживания веб-запросов.
  • Найдите крейт для пула потоков на https://crates.io и реализуйте аналогичный веб-сервер с использованием этого крейта вместо нашего. Затем сравните его API и надежность с нашим пулом потоков.

Резюме

Поздравляем! Вы завершили лабораторную работу Graceful Shutdown and Cleanup. Вы можете практиковаться в других лабораторных работах в LabEx, чтобы улучшить свои навыки.