Введение
Добро пожаловать в Graceful Shutdown and Cleanup. Эта лабораторная работа является частью Rust Book. Вы можете практиковать свои навыки Rust в LabEx.
В этой лабораторной работе мы реализуем механизм graceful shutdown и очистки в нашем коде, используя трейт Drop и предоставляя способ для потоков прекратить принимать новые запросы и выключиться.
Graceful Shutdown and Cleanup
Код в Листинге 20-20 асинхронно обрабатывает запросы с использованием пула потоков, как мы планировали. Мы получаем несколько предупреждений о полях workers, id и thread, которые мы не используем напрямую, что напоминает нам, что мы ничего не очищаем. Когда мы используем менее элегантный метод ctrl-C для остановки главного потока, все остальные потоки также немедленно останавливаются, даже если они находятся в середине обработки запроса.
Далее мы реализуем трейт Drop, чтобы вызвать метод join для каждого потока в пуле, чтобы они могли завершить обработку запросов, которые они обрабатывают, перед закрытием. Затем мы реализуем способ сообщить потокам, что они должны прекратить принимать новые запросы и выключиться. Чтобы увидеть этот код в действии, мы модифицируем наш сервер, чтобы он принимал только два запроса, прежде чем优雅но выключить свой пул потоков.
Реализация трейта Drop для ThreadPool
Начнем с реализации трейта Drop для нашего пула потоков. Когда пул удаляется, все наши потоки должны присоединиться, чтобы убедиться, что они завершат свою работу. Листинг 20-22 показывает первый вариант реализации Drop; этот код еще не будет работать должным образом.
Имя файла: src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
1 for worker in &mut self.workers {
2 println!("Shutting down worker {}", worker.id);
3 worker.thread.join().unwrap();
}
}
}
Листинг 20-22: Присоединение каждого потока, когда пул потоков выходит из области видимости
Сначала мы перебираем каждый worker в пуле потоков [1]. Мы используем &mut для этого, потому что self - это изменяемая ссылка, и нам также нужно быть able изменять worker. Для каждого worker мы выводим сообщение о том, что этот конкретный экземпляр Worker завершает свою работу [2], а затем мы вызываем join для потока этого экземпляра Worker [3]. Если вызов join завершается неудачно, мы используем unwrap, чтобы заставить Rust вызвать панику и произвести неэлегантный выход.
Вот ошибка, которую мы получаем, когда компилируем этот код:
error[E0507]: cannot move out of `worker.thread` which is behind a mutable
reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this
method call
| |
| move occurs because `worker.thread` has type
`JoinHandle<()>`, which does not implement the `Copy` trait
|
note: this function takes ownership of the receiver `self`, which moves
`worker.thread`
Ошибка говорит нам, что мы не можем вызвать join, потому что у нас есть только изменяемая ссылка на каждый worker, а join получает владение своим аргументом. Чтобы решить эту проблему, мы должны переместить поток из экземпляра Worker, который владеет thread, чтобы join мог потребовать поток. Мы сделали это в Листинге 17-15: если Worker хранит Option<thread::JoinHandle<()>> вместо этого, мы можем вызвать метод take для Option, чтобы переместить значение из варианта Some и оставить вариант None на его месте. Другими словами, работающий Worker будет иметь вариант Some в thread, а когда мы хотим очистить Worker, мы заменим Some на None, чтобы Worker не имел потока для выполнения.
Таким образом, мы знаем, что хотим обновить определение Worker следующим образом:
Имя файла: src/lib.rs
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
Теперь давайте полагаемся на компилятор, чтобы найти другие места, которые нужно изменить. Проверив этот код, мы получаем две ошибки:
error[E0599]: no method named `join` found for enum `Option` in the current
scope
--> src/lib.rs:52:27
|
52 | worker.thread.join().unwrap();
| ^^^^ method not found in
`Option<JoinHandle<()>>`
error[E0308]: mismatched types
--> src/lib.rs:72:22
|
72 | Worker { id, thread }
| ^^^^^^ expected enum `Option`, found struct
`JoinHandle`
|
= note: expected enum `Option<JoinHandle<()>>`
found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
|
72 | Worker { id, thread: Some(thread) }
| +++++++++++++ +
Давайте решим вторую ошибку, которая указывает на код в конце Worker::new; мы должны обернуть значение thread в Some, когда создаем новый Worker. Примите следующие изменения, чтобы исправить эту ошибку:
Имя файла: src/lib.rs
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
--snip--
Worker {
id,
thread: Some(thread),
}
}
}
Первая ошибка находится в нашей реализации Drop. Мы упоминали ранее, что планируем вызвать take для значения Option, чтобы переместить thread из worker. Следующие изменения это сделают:
Имя файла: src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
1 if let Some(thread) = worker.thread.take() {
2 thread.join().unwrap();
}
}
}
}
Как обсуждалось в главе 17, метод take для Option извлекает вариант Some и оставляет None на его месте. Мы используем if let, чтобы разобрать вариант Some и получить поток [1]; затем мы вызываем join для потока [2]. Если у экземпляра Worker поток уже None, мы знаем, что Worker уже имел свой поток очищенным, поэтому ничего не происходит в этом случае.
Сигнализация потокам для прекращения прослушивания заданий
После всех внесенных изменений наш код компилируется без каких-либо предупреждений. Однако, плохая новость заключается в том, что этот код не работает так, как мы хотим. Ключом является логика в замыканиях, выполняемых потоками экземпляров Worker: в настоящее время мы вызываем join, но это не остановит потоки, потому что они loop бесконечно, ожидая заданий. Если мы попытаемся удалить наш ThreadPool с нашей текущей реализацией drop, главный поток будет заблокирован навсегда, ожидая завершения первого потока.
Чтобы исправить эту проблему, нам нужно внести изменения в реализацию drop для ThreadPool, а затем внести изменения в цикл Worker.
Сначала мы изменим реализацию drop для ThreadPool, чтобы явно удалить sender перед ожиданием завершения потоков. Листинг 20-23 показывает изменения в ThreadPool, чтобы явно удалить sender. Мы используем ту же технику с Option и take, что и с потоком, чтобы быть able переместить sender из ThreadPool.
Имя файла: src/lib.rs
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
--snip--
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
--snip--
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender
.as_ref()
.unwrap()
.send(job)
.unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
1 drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
Листинг 20-23: Явное удаление sender перед присоединением потоков Worker
Удаление sender [1] закрывает канал, что означает, что больше не будут отправляться сообщения. Когда это происходит, все вызовы recv, которые Worker экземпляры делают в бесконечном цикле, вернут ошибку. В Листинге 20-24 мы изменяем цикл Worker, чтобы gracefully выйти из цикла в этом случае, что означает, что потоки завершатся, когда реализация drop для ThreadPool вызовет join для них.
Имя файла: src/lib.rs
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!(
"Worker {id} got a job; executing."
);
job();
}
Err(_) => {
println!(
"Worker {id} shutting down."
);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
Листинг 20-24: Явное выход из цикла, когда recv возвращает ошибку
Чтобы увидеть этот код в действии, давайте изменим main, чтобы он принимал только два запроса, прежде чем gracefully выключить сервер, как показано в Листинге 20-25.
Имя файла: src/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
Листинг 20-25: Выключение сервера после обработки двух запросов путем выхода из цикла
В реальном веб-сервере вы, вероятно, не захотите выключать его после обработки только двух запросов. Этот код просто демонстрирует, что graceful shutdown и очистка работают должным образом.
Метод take определен в трейте Iterator и ограничивает итерацию максимум первыми двумя элементами. ThreadPool выйдет из области видимости в конце main, и реализация drop будет выполняться.
Запустите сервер с помощью cargo run и сделайте три запроса. Третий запрос должен завершиться с ошибкой, и в вашем терминале вы должны увидеть вывод, похожий на этот:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 1.0s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
Вы, возможно, увидите другой порядок вывода идентификаторов Worker и сообщений. Мы можем увидеть, как этот код работает по сообщениям: экземпляры Worker 0 и 3 получили первые два запроса. Сервер прекратил принимать соединения после второго соединения, и реализация Drop для ThreadPool начинает выполняться, даже прежде чем Worker 3 начнет свою работу. Удаление sender отключает все экземпляры Worker и сообщает им выключиться. Каждый экземпляр Worker выводит сообщение при отключении, а затем пул потоков вызывает join, чтобы дождаться завершения каждого потока Worker.
Обратите внимание на один интересный аспект этого конкретного выполнения: ThreadPool удалил sender, и перед тем, как какой-либо Worker получил ошибку, мы пытались присоединиться к Worker 0. Worker 0 еще не получил ошибку от recv, поэтому главный поток заблокировался, ожидая завершения Worker 0. Между тем, Worker 3 получил задание, а затем все потоки получили ошибку. Когда Worker 0 завершился, главный поток ждал завершения остальных экземпляров Worker. В этот момент они все вышли из своих циклов и остановились.
Поздравляем! Теперь мы завершили наш проект; у нас есть базовый веб-сервер, который использует пул потоков для асинхронного ответа. Мы можем выполнить graceful shutdown сервера, который очищает все потоки в пуле. См. https://www.nostarch.com/Rust2021, чтобы скачать полный код этой главы для справки.
Мы могли бы сделать здесь еще больше! Если вы хотите продолжить улучшать этот проект, здесь есть некоторые идеи:
- Добавьте больше документации для
ThreadPoolи его публичных методов. - Добавьте тесты функциональности библиотеки.
- Замените вызовы
unwrapна более надежное обработку ошибок. - Используйте
ThreadPoolдля выполнения какой-либо задачи, кроме обслуживания веб-запросов. - Найдите крейт для пула потоков на https://crates.io и реализуйте аналогичный веб-сервер с использованием этого крейта вместо нашего. Затем сравните его API и надежность с нашим пулом потоков.
Резюме
Поздравляем! Вы завершили лабораторную работу Graceful Shutdown and Cleanup. Вы можете практиковаться в других лабораторных работах в LabEx, чтобы улучшить свои навыки.