Graceful Shutdown 및 정리

Beginner

This tutorial is from open-source community. Access the source code

소개

Graceful Shutdown and Cleanup에 오신 것을 환영합니다. 이 랩은 Rust Book의 일부입니다. LabEx 에서 Rust 기술을 연습할 수 있습니다.

이 랩에서는 Drop 트레이트를 활용하고, 스레드가 새로운 요청을 더 이상 받지 않고 종료할 수 있는 방법을 제공하여 코드에서 graceful shutdown 및 cleanup 메커니즘을 구현할 것입니다.

Graceful Shutdown and Cleanup

Listing 20-20 의 코드는 의도한 대로 스레드 풀을 사용하여 비동기적으로 요청에 응답합니다. workers, id, 그리고 thread 필드를 직접적으로 사용하지 않아, 아무것도 정리하지 않고 있다는 것을 상기시키는 몇 가지 경고가 나타납니다. 덜 우아한 ctrl-C 방식을 사용하여 메인 스레드를 중단하면, 다른 모든 스레드도 요청을 처리하는 중이더라도 즉시 중단됩니다.

다음으로, Drop 트레이트를 구현하여 풀의 각 스레드에서 join을 호출하여 종료 전에 작업을 완료할 수 있도록 할 것입니다. 그런 다음, 스레드에게 새로운 요청을 더 이상 받지 않고 종료해야 함을 알리는 방법을 구현할 것입니다. 이 코드가 작동하는 것을 보기 위해, 서버가 스레드 풀을 graceful shutdown 하기 전에 두 개의 요청만 받도록 수정할 것입니다.

ThreadPool 에 Drop 트레이트 구현하기

ThreadPool 에 Drop을 구현하는 것부터 시작해 봅시다. 풀이 drop 될 때, 스레드는 모두 join 하여 작업을 완료해야 합니다. Listing 20-22 는 Drop 구현의 첫 번째 시도를 보여줍니다; 이 코드는 아직 제대로 작동하지 않습니다.

파일 이름: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 for worker in &mut self.workers {
          2 println!("Shutting down worker {}", worker.id);

          3 worker.thread.join().unwrap();
        }
    }
}

Listing 20-22: 스레드 풀이 범위를 벗어날 때 각 스레드를 join

먼저 스레드 풀의 각 workers를 반복합니다 [1]. self가 가변 참조이므로 &mut를 사용하며, worker도 변경할 수 있어야 합니다. 각 worker에 대해, 이 특정 Worker 인스턴스가 종료되고 있음을 알리는 메시지를 출력합니다 [2], 그런 다음 해당 Worker 인스턴스의 스레드에서 join을 호출합니다 [3]. join 호출이 실패하면, unwrap을 사용하여 Rust 가 패닉 상태로 들어가 graceful shutdown 을 하지 않도록 합니다.

이 코드를 컴파일할 때 발생하는 오류는 다음과 같습니다:

error[E0507]: cannot move out of `worker.thread` which is behind a mutable
reference
    --> src/lib.rs:52:13
     |
52   |             worker.thread.join().unwrap();
     |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this
method call
     |             |
     |             move occurs because `worker.thread` has type
`JoinHandle<()>`, which does not implement the `Copy` trait
     |
note: this function takes ownership of the receiver `self`, which moves
`worker.thread`

오류는 각 worker에 대한 가변 빌림만 있고 join이 인수의 소유권을 가져가기 때문에 join을 호출할 수 없다고 알려줍니다. 이 문제를 해결하려면, join이 스레드를 소비할 수 있도록 thread를 소유하는 Worker 인스턴스에서 스레드를 이동해야 합니다. Listing 17-15 에서 이 작업을 수행했습니다: WorkerOption<thread::JoinHandle<()>>을 대신 가지고 있다면, Option에서 take 메서드를 호출하여 값을 Some 변형에서 이동시키고 그 자리에 None 변형을 남길 수 있습니다. 즉, 실행 중인 WorkerthreadSome 변형을 갖게 되며, Worker를 정리하려는 경우 SomeNone으로 대체하여 Worker가 실행할 스레드가 없도록 합니다.

따라서 Worker의 정의를 다음과 같이 업데이트하려고 합니다:

파일 이름: src/lib.rs

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

이제 컴파일러에 의존하여 변경해야 할 다른 위치를 찾아봅시다. 이 코드를 확인하면 두 개의 오류가 발생합니다:

error[E0599]: no method named `join` found for enum `Option` in the current
scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in
`Option<JoinHandle<()>>`

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct
`JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Worker { id, thread: Some(thread) }
   |                      +++++++++++++      +

두 번째 오류부터 해결해 봅시다. 이 오류는 Worker::new의 끝 부분에 있는 코드를 가리킵니다; 새로운 Worker를 생성할 때 thread 값을 Some으로 래핑해야 합니다. 이 오류를 수정하려면 다음 변경 사항을 적용하십시오:

파일 이름: src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

첫 번째 오류는 Drop 구현에 있습니다. 앞서 Option 값에서 take를 호출하여 threadworker에서 이동시키려고 했습니다. 다음 변경 사항을 통해 이를 수행할 수 있습니다:

파일 이름: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

          1 if let Some(thread) = worker.thread.take() {
              2 thread.join().unwrap();
            }
        }
    }
}

17 장에서 논의했듯이, Optiontake 메서드는 Some 변형을 가져와서 None을 그 자리에 남깁니다. if let을 사용하여 Some을 분해하고 스레드를 가져옵니다 [1]; 그런 다음 스레드에서 join을 호출합니다 [2]. Worker 인스턴스의 스레드가 이미 None인 경우, 해당 Worker가 이미 스레드를 정리했으므로 이 경우에는 아무 일도 일어나지 않습니다.

스레드에게 작업 수신 중단을 알리기

우리가 변경한 모든 사항으로 인해, 코드는 경고 없이 컴파일됩니다. 하지만, 좋지 않은 소식은 이 코드가 아직 원하는 방식으로 작동하지 않는다는 것입니다. 핵심은 Worker 인스턴스의 스레드에서 실행되는 클로저의 로직입니다: 현재, join을 호출하지만, 스레드는 영원히 작업을 찾기 위해 loop하므로 종료되지 않습니다. 현재 drop 구현으로 ThreadPool을 drop 하려고 하면, 메인 스레드는 첫 번째 스레드가 완료될 때까지 영원히 블록됩니다.

이 문제를 해결하려면, ThreadPool drop 구현을 변경한 다음 Worker 루프를 변경해야 합니다.

먼저, 스레드가 완료될 때까지 기다리기 전에 sender를 명시적으로 drop 하도록 ThreadPool drop 구현을 변경합니다. Listing 20-23 은 sender를 명시적으로 drop 하도록 ThreadPool에 대한 변경 사항을 보여줍니다. senderThreadPool에서 이동할 수 있도록 스레드에서 사용했던 것과 동일한 Optiontake 기술을 사용합니다.

파일 이름: src/lib.rs

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
--snip--
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        --snip--

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender
            .as_ref()
            .unwrap()
            .send(job)
            .unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
      1 drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

Listing 20-23: Worker 스레드를 join 하기 전에 sender를 명시적으로 drop

sender를 drop [1]하면 채널이 닫히고, 더 이상 메시지가 전송되지 않음을 나타냅니다. 그렇게 되면, Worker 인스턴스가 무한 루프에서 수행하는 모든 recv 호출은 오류를 반환합니다. Listing 20-24 에서, Worker 루프를 변경하여 해당 경우에 루프를 graceful 하게 종료합니다. 즉, ThreadPool drop 구현이 스레드에서 join을 호출할 때 스레드가 완료됩니다.

파일 이름: src/lib.rs

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!(
                        "Worker {id} got a job; executing."
                    );

                    job();
                }
                Err(_) => {
                    println!(
                        "Worker {id} shutting down."
                    );
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Listing 20-24: recv가 오류를 반환할 때 루프에서 명시적으로 break

이 코드가 작동하는 것을 보기 위해, Listing 20-25 에 표시된 대로 서버를 graceful 하게 종료하기 전에 두 개의 요청만 수락하도록 main을 수정해 보겠습니다.

파일 이름: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

Listing 20-25: 루프를 종료하여 두 개의 요청을 처리한 후 서버 종료

실제 웹 서버가 두 개의 요청만 처리한 후 종료되도록 하지는 않을 것입니다. 이 코드는 graceful shutdown 및 정리가 제대로 작동하는 것을 보여줍니다.

take 메서드는 Iterator 트레이트에 정의되어 있으며, 반복을 처음 두 항목으로 제한합니다. ThreadPoolmain의 끝에서 범위를 벗어나고, drop 구현이 실행됩니다.

cargo run으로 서버를 시작하고 세 개의 요청을 보냅니다. 세 번째 요청은 오류가 발생해야 하며, 터미널에서 다음과 유사한 출력을 볼 수 있습니다:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

Worker ID 와 메시지가 다른 순서로 출력될 수 있습니다. 메시지에서 이 코드가 어떻게 작동하는지 확인할 수 있습니다: Worker 인스턴스 0 과 3 이 처음 두 개의 요청을 받았습니다. 서버는 두 번째 연결 후 연결 수락을 중단하고, ThreadPoolDrop 구현은 Worker 3 이 작업을 시작하기도 전에 실행을 시작합니다. sender를 drop 하면 모든 Worker 인스턴스가 연결 해제되고 종료하라는 지시를 받습니다. Worker 인스턴스는 연결 해제될 때 각각 메시지를 출력한 다음, 스레드 풀은 join을 호출하여 각 Worker 스레드가 완료될 때까지 기다립니다.

이 특정 실행의 한 가지 흥미로운 측면에 주목하십시오: ThreadPoolsender를 drop 했고, 어떤 Worker도 오류를 받기 전에 Worker 0 을 join 하려고 했습니다. Worker 0 은 아직 recv에서 오류를 받지 못했으므로, 메인 스레드는 Worker 0 이 완료될 때까지 블록되었습니다. 그동안, Worker 3 은 작업을 받았고, 그 다음 모든 스레드가 오류를 받았습니다. Worker 0 이 완료되면, 메인 스레드는 나머지 Worker 인스턴스가 완료될 때까지 기다렸습니다. 그 시점에서, 그들은 모두 루프를 종료하고 중단했습니다.

축하합니다! 이제 프로젝트를 완료했습니다; 비동기적으로 응답하기 위해 스레드 풀을 사용하는 기본 웹 서버가 있습니다. 서버의 graceful shutdown 을 수행할 수 있으며, 이는 풀의 모든 스레드를 정리합니다. 이 장의 전체 코드를 참조하려면 https://www.nostarch.com/Rust2021을 참조하십시오.

여기서 더 많은 작업을 수행할 수 있습니다! 이 프로젝트를 계속 개선하려면, 다음과 같은 아이디어가 있습니다:

  • ThreadPool 및 해당 public 메서드에 더 많은 문서를 추가합니다.
  • 라이브러리의 기능에 대한 테스트를 추가합니다.
  • unwrap 호출을 보다 강력한 오류 처리로 변경합니다.
  • 웹 요청을 처리하는 것 외에 다른 작업을 수행하기 위해 ThreadPool을 사용합니다.
  • https://crates.io에서 스레드 풀 크레이트를 찾아 해당 크레이트를 사용하여 유사한 웹 서버를 구현합니다. 그런 다음, 해당 API 와 견고성을 우리가 구현한 스레드 풀과 비교합니다.

요약

축하합니다! Graceful Shutdown 및 Cleanup 랩을 완료했습니다. LabEx 에서 더 많은 랩을 연습하여 실력을 향상시킬 수 있습니다.