Rust 멀티스레드 서버 개발

Beginner

This tutorial is from open-source community. Access the source code

소개

단일 스레드 서버를 멀티스레드 서버로 전환하기에 오신 것을 환영합니다. 이 랩은 Rust Book의 일부입니다. LabEx 에서 Rust 기술을 연습할 수 있습니다.

이 랩에서는 단일 스레드 서버를 멀티스레드 서버로 변환하여 여러 요청을 동시에 처리하는 효율성을 향상시킬 것입니다.

단일 스레드 서버를 멀티스레드 서버로 전환하기

현재 서버는 각 요청을 차례대로 처리합니다. 즉, 첫 번째 연결 처리가 완료될 때까지 두 번째 연결을 처리하지 않습니다. 서버가 점점 더 많은 요청을 받으면 이러한 직렬 실행은 점점 더 비효율적이 될 것입니다. 서버가 처리하는 데 시간이 오래 걸리는 요청을 받으면, 새로운 요청을 빠르게 처리할 수 있더라도 긴 요청이 완료될 때까지 후속 요청이 대기해야 합니다. 이 문제를 해결해야 하지만, 먼저 문제 발생 상황을 살펴보겠습니다.

느린 요청 시뮬레이션

느린 처리 요청이 현재 서버 구현에 대한 다른 요청에 어떤 영향을 미칠 수 있는지 살펴보겠습니다. Listing 20-10 은 서버가 응답하기 전에 5 초 동안 대기하도록 하는 시뮬레이션된 느린 응답으로 /sleep에 대한 요청을 처리하는 방법을 구현합니다.

파일 이름: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
--snip--

fn handle_connection(mut stream: TcpStream) {
    --snip--

    let (status_line, filename) = 1 match &request_line[..] {
      2 "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
      3 "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
      4 _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    --snip--
}

Listing 20-10: 5 초 동안 대기하여 느린 요청 시뮬레이션

이제 세 가지 경우 [1]가 있으므로 if에서 match로 전환했습니다. 문자열 리터럴 값에 대해 패턴 일치를 수행하려면 request_line의 슬라이스를 명시적으로 일치시켜야 합니다. match는 동등성 메서드처럼 자동 참조 및 역참조를 수행하지 않습니다.

첫 번째 arm [2]은 Listing 20-9 의 if 블록과 동일합니다. 두 번째 arm [3]은 /sleep에 대한 요청과 일치합니다. 해당 요청을 받으면 서버는 성공적인 HTML 페이지를 렌더링하기 전에 5 초 동안 대기합니다. 세 번째 arm [4]은 Listing 20-9 의 else 블록과 동일합니다.

서버가 얼마나 원시적인지 알 수 있습니다. 실제 라이브러리는 여러 요청의 인식을 훨씬 덜 장황한 방식으로 처리할 것입니다!

cargo run을 사용하여 서버를 시작합니다. 그런 다음 두 개의 브라우저 창을 엽니다. 하나는 http://127.0.0.1:7878이고 다른 하나는 *http://127.0.0.1:7878/sleep*입니다. 이전과 마찬가지로 / URI 를 몇 번 입력하면 빠르게 응답하는 것을 볼 수 있습니다. 그러나 /sleep을 입력한 다음 */*를 로드하면 */*가 sleep이 전체 5 초 동안 대기한 후에 로드되는 것을 볼 수 있습니다.

느린 요청 뒤에서 요청이 백업되는 것을 방지하기 위해 사용할 수 있는 여러 기술이 있습니다. 우리가 구현할 기술은 스레드 풀 (thread pool) 입니다.

스레드 풀로 처리량 개선하기

*스레드 풀 (thread pool)*은 작업을 처리하기 위해 대기하고 준비된 스레드 그룹입니다. 프로그램이 새로운 작업을 받으면 풀의 스레드 중 하나를 해당 작업에 할당하고 해당 스레드가 작업을 처리합니다. 첫 번째 스레드가 처리하는 동안 풀의 나머지 스레드는 들어오는 다른 모든 작업을 처리할 수 있습니다. 첫 번째 스레드가 작업을 처리하는 것을 마치면 유휴 스레드 풀로 반환되어 새로운 작업을 처리할 준비가 됩니다. 스레드 풀을 사용하면 연결을 동시에 처리하여 서버의 처리량을 늘릴 수 있습니다.

DoS 공격으로부터 보호하기 위해 풀의 스레드 수를 소수로 제한합니다. 프로그램이 들어오는 각 요청에 대해 새로운 스레드를 생성하면, 누군가 서버에 1,000 만 개의 요청을 보내 서버의 모든 리소스를 소모하고 요청 처리를 중단시켜 혼란을 야기할 수 있습니다.

따라서 무제한 스레드를 생성하는 대신 풀에서 대기하는 고정된 수의 스레드를 갖게 됩니다. 들어오는 요청은 처리를 위해 풀로 전송됩니다. 풀은 들어오는 요청의 큐를 유지합니다. 풀의 각 스레드는 이 큐에서 요청을 꺼내 처리한 다음 큐에 다른 요청을 요청합니다. 이 설계를 사용하면 N 개의 요청을 동시에 처리할 수 있으며, 여기서 N 은 스레드 수입니다. 각 스레드가 장기 실행 요청에 응답하는 경우 후속 요청은 여전히 큐에 백업될 수 있지만, 해당 지점에 도달하기 전에 처리할 수 있는 장기 실행 요청의 수를 늘렸습니다.

이 기술은 웹 서버의 처리량을 개선하는 여러 가지 방법 중 하나일 뿐입니다. 탐색할 수 있는 다른 옵션으로는 fork/join 모델, 단일 스레드 비동기 I/O 모델 및 멀티스레드 비동기 I/O 모델이 있습니다. 이 주제에 관심이 있다면 다른 솔루션에 대해 자세히 알아보고 구현해 볼 수 있습니다. Rust 와 같은 저수준 언어를 사용하면 이러한 모든 옵션이 가능합니다.

스레드 풀을 구현하기 전에 풀을 사용하는 모습이 어떠해야 하는지 이야기해 보겠습니다. 코드를 설계하려는 경우, 먼저 클라이언트 인터페이스를 작성하면 설계를 안내하는 데 도움이 될 수 있습니다. 코드를 호출하려는 방식으로 구조화되도록 API 를 작성합니다. 그런 다음 공용 API 를 설계하는 대신 해당 구조 내에서 기능을 구현합니다.

12 장에서 프로젝트에서 테스트 주도 개발을 사용했던 방식과 유사하게, 여기서는 컴파일러 주도 개발을 사용합니다. 원하는 함수를 호출하는 코드를 작성한 다음 컴파일러의 오류를 살펴보고 코드가 작동하도록 하기 위해 다음에 무엇을 변경해야 하는지 결정합니다. 그러나 그 전에 시작점으로 사용할 수 없는 기술을 살펴보겠습니다.

각 요청에 대해 스레드 생성하기

먼저, 각 연결에 대해 새로운 스레드를 생성하는 경우 코드가 어떻게 보일 수 있는지 살펴보겠습니다. 앞서 언급했듯이, 잠재적으로 무제한 수의 스레드를 생성하는 문제로 인해 이것은 최종 계획이 아니지만, 먼저 작동하는 멀티스레드 서버를 얻기 위한 시작점입니다. 그런 다음 스레드 풀을 개선 사항으로 추가하고 두 솔루션을 비교하는 것이 더 쉬워질 것입니다.

Listing 20-11 은 for 루프 내에서 각 스트림을 처리하기 위해 새로운 스레드를 생성하도록 main에 적용할 변경 사항을 보여줍니다.

파일 이름: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

Listing 20-11: 각 스트림에 대해 새로운 스레드 생성

16 장에서 배운 것처럼, thread::spawn은 새로운 스레드를 생성한 다음 새로운 스레드에서 클로저의 코드를 실행합니다. 이 코드를 실행하고 브라우저에서 /sleep을 로드한 다음 다른 두 개의 브라우저 탭에서 */*를 로드하면, */*에 대한 요청이 /sleep이 완료될 때까지 기다릴 필요가 없다는 것을 실제로 확인할 수 있습니다. 그러나 언급했듯이, 이는 결국 시스템을 압도할 것입니다. 왜냐하면 제한 없이 새로운 스레드를 생성하게 될 것이기 때문입니다.

유한한 수의 스레드 생성하기

스레드 풀이 스레드에서 스레드 풀로 전환할 때 API 를 사용하는 코드에 큰 변경 사항이 필요하지 않도록, 스레드 풀이 유사하고 익숙한 방식으로 작동하기를 원합니다. Listing 20-12 는 thread::spawn 대신 사용하려는 ThreadPool 구조체의 가상 인터페이스를 보여줍니다.

파일 이름: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  1 let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

      2 pool.execute(|| {
            handle_connection(stream);
        });
    }
}

Listing 20-12: 이상적인 ThreadPool 인터페이스

ThreadPool::new를 사용하여 구성 가능한 수의 스레드를 가진 새로운 스레드 풀을 생성합니다. 이 경우 4 개입니다 [1]. 그런 다음 for 루프에서 pool.execute는 각 스트림에 대해 풀이 실행해야 하는 클로저를 사용한다는 점에서 thread::spawn과 유사한 인터페이스를 갖습니다 [2]. pool.execute를 구현하여 클로저를 가져와 풀의 스레드에 제공하여 실행해야 합니다. 이 코드는 아직 컴파일되지 않지만, 컴파일러가 이를 수정하는 방법을 안내할 수 있도록 시도해 보겠습니다.

컴파일러 기반 개발을 사용하여 ThreadPool 구축하기

Listing 20-12 의 변경 사항을 src/main.rs에 적용한 다음, cargo check의 컴파일러 오류를 사용하여 개발을 진행해 보겠습니다. 다음은 처음으로 얻는 오류입니다.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

훌륭합니다! 이 오류는 ThreadPool 타입 또는 모듈이 필요하다는 것을 알려주므로, 이제 하나를 구축하겠습니다. ThreadPool 구현은 웹 서버가 수행하는 작업의 종류와는 독립적입니다. 따라서 hello 크레이트를 바이너리 크레이트에서 라이브러리 크레이트로 전환하여 ThreadPool 구현을 포함시키겠습니다. 라이브러리 크레이트로 변경한 후에는 웹 요청을 처리하는 것뿐만 아니라 스레드 풀을 사용하여 수행하려는 모든 작업에 대해 별도의 스레드 풀 라이브러리를 사용할 수도 있습니다.

다음 내용을 포함하는 src/lib.rs 파일을 생성합니다. 이는 현재로서는 가질 수 있는 ThreadPool 구조체의 가장 간단한 정의입니다.

파일 이름: src/lib.rs

pub struct ThreadPool;

그런 다음 main.rs 파일을 편집하여 다음 코드를 src/main.rs의 맨 위에 추가하여 라이브러리 크레이트에서 ThreadPool을 범위 내로 가져옵니다.

파일 이름: src/main.rs

use hello::ThreadPool;

이 코드는 아직 작동하지 않지만, 다음 오류를 얻기 위해 다시 확인해 보겠습니다.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct
`ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in
`ThreadPool`

이 오류는 다음으로 ThreadPool에 대해 new라는 연관 함수를 생성해야 함을 나타냅니다. 또한 new는 인수로 4를 허용하고 ThreadPool 인스턴스를 반환할 수 있는 하나의 매개변수를 가져야 한다는 것을 알고 있습니다. 이러한 특성을 갖는 가장 간단한 new 함수를 구현해 보겠습니다.

파일 이름: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

size 매개변수의 타입으로 usize를 선택했습니다. 왜냐하면 음수 스레드 수는 말이 안 된다는 것을 알고 있기 때문입니다. 또한 이 4를 스레드 컬렉션의 요소 수로 사용할 것이며, 이는 "정수 타입"에서 논의한 것처럼 usize 타입의 용도임을 알고 있습니다.

코드를 다시 확인해 보겠습니다.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the
current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^ method not found in `ThreadPool`

이제 ThreadPoolexecute 메서드가 없기 때문에 오류가 발생합니다. "유한한 수의 스레드 생성하기"에서 스레드 풀이 thread::spawn과 유사한 인터페이스를 가져야 한다고 결정했음을 기억하십시오. 또한 execute 함수를 구현하여 제공된 클로저를 가져와 풀의 유휴 스레드에 제공하여 실행하도록 하겠습니다.

ThreadPoolexecute 메서드를 정의하여 클로저를 매개변수로 받도록 하겠습니다. "클로저에서 캡처된 값을 이동시키고 Fn 트레이트"에서 세 가지 다른 트레이트인 Fn, FnMut, FnOnce를 사용하여 클로저를 매개변수로 사용할 수 있음을 기억하십시오. 여기에서 어떤 종류의 클로저를 사용할지 결정해야 합니다. 결국 표준 라이브러리 thread::spawn 구현과 유사한 작업을 수행할 것이므로, thread::spawn의 시그니처가 매개변수에 대해 갖는 바운드를 살펴볼 수 있습니다. 문서는 다음을 보여줍니다.

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

F 타입 매개변수가 여기서 우리가 관심을 갖는 것입니다. T 타입 매개변수는 반환 값과 관련이 있으며, 우리는 그것에 관심이 없습니다. spawnF에 대한 트레이트 바운드로 FnOnce를 사용한다는 것을 알 수 있습니다. 이는 우리가 원하는 것일 가능성이 높습니다. 왜냐하면 결국 execute에서 얻은 인수를 spawn에 전달할 것이기 때문입니다. 요청을 실행하기 위한 스레드가 해당 요청의 클로저를 한 번만 실행하므로, FnOnce가 우리가 사용하려는 트레이트라는 것을 더욱 확신할 수 있습니다. 이는 FnOnceOnce와 일치합니다.

F 타입 매개변수는 또한 트레이트 바운드 Send와 라이프타임 바운드 'static을 갖습니다. 이는 우리의 상황에서 유용합니다. 클로저를 한 스레드에서 다른 스레드로 전송하려면 Send가 필요하고, 스레드가 실행하는 데 얼마나 걸릴지 모르기 때문에 'static이 필요합니다. 이러한 바운드를 가진 타입 F의 제네릭 매개변수를 사용하는 ThreadPoolexecute 메서드를 생성해 보겠습니다.

파일 이름: src/lib.rs

impl ThreadPool {
    --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() 1 + Send + 'static,
    {
    }
}

FnOnce는 매개변수를 받지 않고 유닛 타입 ()을 반환하는 클로저를 나타내므로, FnOnce 뒤에 ()를 계속 사용합니다 [1]. 함수 정의와 마찬가지로 반환 타입을 시그니처에서 생략할 수 있지만, 매개변수가 없더라도 괄호가 필요합니다.

다시 말하지만, 이것은 execute 메서드의 가장 간단한 구현입니다. 아무것도 하지 않지만, 우리는 단지 코드를 컴파일하려고 노력하고 있습니다. 다시 확인해 보겠습니다.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

컴파일됩니다! 하지만 cargo run을 시도하고 브라우저에서 요청을 하면, 장의 시작 부분에서 보았던 브라우저의 오류가 표시됩니다. 우리 라이브러리는 실제로 execute에 전달된 클로저를 아직 호출하지 않습니다!

참고: Haskell 및 Rust 와 같이 엄격한 컴파일러가 있는 언어에 대해 들을 수 있는 말은 "코드가 컴파일되면 작동한다"는 것입니다. 그러나 이 말은 보편적으로 사실이 아닙니다. 우리 프로젝트는 컴파일되지만, 아무것도 하지 않습니다! 실제 완전한 프로젝트를 구축하는 경우, 코드가 컴파일되고 원하는 동작을 하는지 확인하기 위해 단위 테스트를 작성하는 것이 좋습니다.

new 에서 스레드 수 유효성 검사하기

newexecute의 매개변수로 아무것도 하지 않고 있습니다. 원하는 동작으로 이러한 함수의 본문을 구현해 보겠습니다. 먼저, new에 대해 생각해 보겠습니다. 앞서 음수 스레드 수를 가진 풀은 의미가 없기 때문에 size 매개변수에 부호 없는 타입을 선택했습니다. 그러나 0 개의 스레드를 가진 풀도 의미가 없지만, 0 은 완벽하게 유효한 usize입니다. Listing 20-13 에 표시된 것처럼 assert! 매크로를 사용하여 size가 0 보다 큰지 확인하는 코드를 추가하고 0 을 받으면 프로그램이 패닉하도록 하겠습니다.

파일 이름: src/lib.rs

impl ThreadPool {
    /// 새로운 ThreadPool 을 생성합니다.
    ///
    /// size 는 풀의 스레드 수입니다.
    ///
  1 /// ## Panics
    ///
    /// `new` 함수는 size 가 0 이면 패닉합니다.
    pub fn new(size: usize) -> ThreadPool {
      2 assert!(size > 0);

        ThreadPool
    }

    --snip--
}

Listing 20-13: size가 0 이면 패닉하도록 ThreadPool::new 구현하기

또한 doc 주석을 사용하여 ThreadPool에 대한 몇 가지 문서를 추가했습니다. 14 장에서 논의한 것처럼, 함수가 패닉할 수 있는 상황을 명시하는 섹션을 추가하여 훌륭한 문서화 관행을 따랐습니다 [1]. cargo doc --open을 실행하고 ThreadPool 구조체를 클릭하여 new에 대해 생성된 문서가 어떻게 보이는지 확인해 보세요!

여기에서 수행한 것처럼 assert! 매크로를 추가하는 대신 [2], Listing 12-9 의 I/O 프로젝트에서 Config::build를 수행한 것처럼 newbuild로 변경하고 Result를 반환할 수 있습니다. 그러나 이 경우 스레드 없이 스레드 풀을 생성하려고 하는 것은 복구할 수 없는 오류여야 한다고 결정했습니다. 야심이 있다면, new 함수와 비교하기 위해 다음 시그니처를 가진 build라는 함수를 작성해 보세요.

pub fn build(
    size: usize
) -> Result<ThreadPool, PoolCreationError> {

스레드를 저장할 공간 만들기

이제 풀에 저장할 유효한 스레드 수를 아는 방법이 있으므로, 해당 스레드를 생성하고 구조체를 반환하기 전에 ThreadPool 구조체에 저장할 수 있습니다. 하지만 어떻게 스레드를 "저장"할까요? thread::spawn 시그니처를 다시 살펴보겠습니다.

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

spawn 함수는 JoinHandle<T>를 반환하며, 여기서 T는 클로저가 반환하는 타입입니다. JoinHandle도 사용해 보고 어떤 일이 발생하는지 살펴보겠습니다. 우리의 경우, 스레드 풀에 전달하는 클로저는 연결을 처리하고 아무것도 반환하지 않으므로, T는 유닛 타입 ()이 됩니다.

Listing 20-14 의 코드는 컴파일되지만 아직 스레드를 생성하지 않습니다. ThreadPool의 정의를 thread::JoinHandle<()> 인스턴스의 벡터를 포함하도록 변경하고, size의 용량으로 벡터를 초기화하고, 스레드를 생성하기 위해 일부 코드를 실행하는 for 루프를 설정하고, 이를 포함하는 ThreadPool 인스턴스를 반환했습니다.

파일 이름: src/lib.rs

1 use std::thread;

pub struct ThreadPool {
  2 threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      3 let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    --snip--
}

Listing 20-14: 스레드를 저장하기 위해 ThreadPool이 사용할 벡터 생성하기

thread::JoinHandleThreadPool [2]의 벡터 항목의 타입으로 사용하고 있으므로, 라이브러리 크레이트에서 std::thread를 범위 내로 가져왔습니다 [1].

유효한 크기가 수신되면, ThreadPoolsize 항목을 저장할 수 있는 새 벡터를 생성합니다 [3]. with_capacity 함수는 Vec::new와 동일한 작업을 수행하지만 중요한 차이점이 있습니다. 즉, 벡터에 공간을 미리 할당합니다. size 요소를 벡터에 저장해야 한다는 것을 알고 있으므로, 이 할당을 미리 수행하는 것이 요소가 삽입될 때 자체적으로 크기를 조정하는 Vec::new을 사용하는 것보다 약간 더 효율적입니다.

cargo check를 다시 실행하면 성공해야 합니다.

ThreadPool 에서 스레드로 코드 전송하기

Listing 20-14 의 for 루프에서 스레드 생성과 관련하여 주석을 남겼습니다. 여기서는 실제로 스레드를 생성하는 방법을 살펴보겠습니다. 표준 라이브러리는 스레드를 생성하는 방법으로 thread::spawn을 제공하며, thread::spawn은 스레드가 생성되는 즉시 스레드가 실행해야 하는 코드를 받기를 기대합니다. 그러나 우리의 경우, 스레드를 생성하고 나중에 보낼 코드를 대기하도록 하려고 합니다. 표준 라이브러리의 스레드 구현에는 이를 수행할 방법이 포함되어 있지 않으므로, 수동으로 구현해야 합니다.

이 동작을 구현하기 위해 ThreadPool과 이 새로운 동작을 관리할 스레드 사이에 새로운 데이터 구조를 도입할 것입니다. 이 데이터 구조를 Worker라고 부르겠습니다. 이는 풀링 구현에서 흔히 사용되는 용어입니다. Worker는 실행해야 하는 코드를 가져와 해당 스레드에서 코드를 실행합니다.

레스토랑 주방에서 일하는 사람들을 생각해 보세요. 작업자는 고객으로부터 주문이 들어올 때까지 기다린 다음, 해당 주문을 받아 채우는 역할을 합니다.

스레드 풀에 JoinHandle<()> 인스턴스의 벡터를 저장하는 대신, Worker 구조체의 인스턴스를 저장할 것입니다. 각 Worker는 단일 JoinHandle<()> 인스턴스를 저장합니다. 그런 다음 실행할 코드의 클로저를 가져와 이미 실행 중인 스레드로 실행을 위해 보내는 Worker에 대한 메서드를 구현할 것입니다. 또한 로깅 또는 디버깅 시 풀에서 서로 다른 Worker 인스턴스를 구별할 수 있도록 각 Workerid를 부여할 것입니다.

다음은 ThreadPool을 생성할 때 발생할 새로운 프로세스입니다. 이 방식으로 Worker를 설정한 후 클로저를 스레드로 보내는 코드를 구현할 것입니다.

  1. idJoinHandle<()>을 보유하는 Worker 구조체를 정의합니다.
  2. ThreadPoolWorker 인스턴스의 벡터를 보유하도록 변경합니다.
  3. id 번호를 가져와 id와 빈 클로저로 스폰된 스레드를 보유하는 Worker 인스턴스를 반환하는 Worker::new 함수를 정의합니다.
  4. ThreadPool::new에서 for 루프 카운터를 사용하여 id를 생성하고, 해당 id로 새 Worker를 생성하고, Worker를 벡터에 저장합니다.

도전해 보고 싶다면, Listing 20-15 의 코드를 보기 전에 직접 이러한 변경 사항을 구현해 보세요.

준비되었나요? 다음은 앞의 수정을 수행하는 한 가지 방법인 Listing 20-15 입니다.

파일 이름: src/lib.rs

use std::thread;

pub struct ThreadPool {
  1 workers: Vec<Worker>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

      2 for id in 0..size {
          3 workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    --snip--
}

4 struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
  5 fn new(id: usize) -> Worker {
      6 let thread = thread::spawn(|| {});

        Worker { 7 id, 8 thread }
    }
}

Listing 20-15: 스레드를 직접 보유하는 대신 Worker 인스턴스를 보유하도록 ThreadPool 수정하기

ThreadPool의 필드 이름을 threads에서 workers로 변경했습니다. 이제 JoinHandle<()> 인스턴스 대신 Worker 인스턴스를 보유하고 있기 때문입니다 [1]. for 루프 [2]의 카운터를 Worker::new에 대한 인수로 사용하고, 각 새 Workerworkers라는 벡터에 저장합니다 [3].

외부 코드 (예: src/main.rs의 서버) 는 ThreadPool 내에서 Worker 구조체를 사용하는 것과 관련된 구현 세부 정보를 알 필요가 없으므로, Worker 구조체 [4]와 해당 new 함수 [5]를 비공개로 만듭니다. Worker::new 함수는 제공한 id [7]를 사용하고, 빈 클로저 [6]를 사용하여 새 스레드를 스폰하여 생성된 JoinHandle<()> 인스턴스 [8]를 저장합니다.

참고: 운영 체제가 시스템 리소스가 부족하여 스레드를 생성할 수 없는 경우, thread::spawn은 패닉합니다. 그러면 일부 스레드 생성이 성공하더라도 전체 서버가 패닉하게 됩니다. 단순성을 위해 이 동작은 괜찮지만, 프로덕션 스레드 풀 구현에서는 std::thread::BuilderResult를 반환하는 해당 spawn 메서드를 사용하는 것이 좋습니다.

이 코드는 컴파일되고 ThreadPool::new에 대한 인수로 지정한 Worker 인스턴스 수를 저장합니다. 하지만 execute에서 얻는 클로저는 아직 처리하지 않고 있습니다. 다음으로 이를 수행하는 방법을 살펴보겠습니다.

채널을 통해 스레드로 요청 보내기

다음으로 해결할 문제는 thread::spawn에 제공된 클로저가 아무것도 하지 않는다는 것입니다. 현재, execute 메서드에서 실행하려는 클로저를 가져옵니다. 하지만 ThreadPool을 생성하는 동안 각 Worker를 생성할 때 thread::spawn에 실행할 클로저를 제공해야 합니다.

방금 생성한 Worker 구조체가 ThreadPool에 보관된 큐에서 실행할 코드를 가져와 해당 코드를 실행하기 위해 스레드로 보내도록 하려고 합니다.

16 장에서 배운 채널 (두 스레드 간의 간단한 통신 방법) 은 이 사용 사례에 완벽합니다. 채널을 작업 큐로 사용하고, execute는 작업을 ThreadPool에서 Worker 인스턴스로 보내고, Worker 인스턴스는 작업을 해당 스레드로 보낼 것입니다. 계획은 다음과 같습니다.

  1. ThreadPool은 채널을 생성하고 발신자를 유지합니다.
  2. Worker는 수신자를 유지합니다.
  3. 채널을 통해 보내려는 클로저를 보유할 새 Job 구조체를 생성합니다.
  4. execute 메서드는 실행하려는 작업을 발신자를 통해 보냅니다.
  5. Worker는 해당 스레드에서 수신자를 반복하고 수신하는 모든 작업의 클로저를 실행합니다.

Listing 20-16 과 같이 ThreadPool::new에서 채널을 생성하고 ThreadPool 인스턴스에서 발신자를 유지하는 것부터 시작해 보겠습니다. Job 구조체는 현재 아무것도 보유하지 않지만 채널을 통해 보내는 항목의 타입이 됩니다.

파일 이름: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      1 let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, 2 sender }
    }
    --snip--
}

Listing 20-16: Job 인스턴스를 전송하는 채널의 발신자를 저장하도록 ThreadPool 수정하기

ThreadPool::new에서 새 채널 [1]을 생성하고 풀이 발신자 [2]를 유지하도록 합니다. 이렇게 하면 성공적으로 컴파일됩니다.

스레드 풀이 채널을 생성할 때 채널의 수신자를 각 Worker로 전달해 보겠습니다. Worker 인스턴스가 스폰하는 스레드에서 수신자를 사용하므로, 클로저에서 receiver 매개변수를 참조할 것입니다. Listing 20-17 의 코드는 아직 완전히 컴파일되지 않습니다.

파일 이름: src/lib.rs

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
          1 workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    --snip--
}

--snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
          2 receiver;
        });

        Worker { id, thread }
    }
}

Listing 20-17: 각 Worker에 수신자 전달하기

몇 가지 작고 간단한 변경을 했습니다. 수신자를 Worker::new [1]으로 전달한 다음 클로저 [2] 내에서 사용합니다.

이 코드를 확인하려고 하면 다음과 같은 오류가 발생합니다.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in
previous iteration of loop

이 코드는 receiver를 여러 Worker 인스턴스에 전달하려고 합니다. 16 장에서 기억하듯이, 이 방법은 작동하지 않습니다. Rust 가 제공하는 채널 구현은 다중 생산자, 단일 소비자입니다. 즉, 이 코드를 수정하기 위해 채널의 소비 측면을 복제할 수 없습니다. 또한 여러 소비자에게 여러 번 메시지를 보내고 싶지 않습니다. 각 메시지가 한 번 처리되도록 여러 Worker 인스턴스가 있는 메시지 목록을 원합니다.

또한 채널 큐에서 작업을 가져오는 것은 receiver를 변경하는 것을 포함하므로, 스레드는 receiver를 안전하게 공유하고 수정할 방법이 필요합니다. 그렇지 않으면 경쟁 조건이 발생할 수 있습니다 (16 장에서 다룸).

16 장에서 논의한 스레드 안전 스마트 포인터를 기억하세요. 여러 스레드에서 소유권을 공유하고 스레드가 값을 변경하도록 하려면 Arc<Mutex<T>>를 사용해야 합니다. Arc 타입은 여러 Worker 인스턴스가 수신자를 소유하도록 하고, Mutex는 한 번에 하나의 Worker만 수신자로부터 작업을 가져오도록 합니다. Listing 20-18 은 필요한 변경 사항을 보여줍니다.

파일 이름: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
--snip--

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

      1 let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(
                Worker::new(id, Arc::clone(& 2 receiver))
            );
        }

        ThreadPool { workers, sender }
    }

    --snip--
}

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--
    }
}

Listing 20-18: ArcMutex를 사용하여 Worker 인스턴스 간에 수신자 공유하기

ThreadPool::new에서 수신자를 ArcMutex [1]에 넣었습니다. 각 새 Worker에 대해 Arc를 복제하여 참조 횟수를 늘리므로 Worker 인스턴스가 수신자의 소유권을 공유할 수 있습니다 [2].

이러한 변경 사항을 통해 코드가 컴파일됩니다! 거의 다 왔습니다!

execute 메서드 구현하기

마침내 ThreadPool에서 execute 메서드를 구현해 보겠습니다. 또한 Job을 구조체에서 execute가 받는 클로저의 타입을 보유하는 트레이트 객체에 대한 타입 별칭으로 변경합니다. "타입 별칭으로 타입 동의어 만들기"에서 설명했듯이, 타입 별칭을 사용하면 긴 타입을 사용 편의성을 위해 더 짧게 만들 수 있습니다. Listing 20-19 를 살펴보세요.

파일 이름: src/lib.rs

--snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
      1 let job = Box::new(f);

      2 self.sender.send(job).unwrap();
    }
}

--snip--

Listing 20-19: 각 클로저를 보유하는 Box에 대한 Job 타입 별칭을 생성한 다음 채널을 통해 작업을 전송하기

execute에서 얻는 클로저를 사용하여 새 Job 인스턴스를 생성한 후 [1], 해당 작업을 채널의 전송 끝으로 보냅니다 [2]. 전송에 실패하는 경우를 대비하여 send에 대해 unwrap을 호출하고 있습니다. 예를 들어, 모든 스레드의 실행을 중지하면 수신 끝이 새 메시지 수신을 중지하는 경우에 발생할 수 있습니다. 현재, 스레드의 실행을 중지할 수 없습니다. 풀이 존재하는 한 스레드는 계속 실행됩니다. unwrap을 사용하는 이유는 실패 사례가 발생하지 않는다는 것을 알고 있지만 컴파일러는 이를 알지 못하기 때문입니다.

하지만 아직 다 끝나지 않았습니다! Worker에서 thread::spawn에 전달되는 클로저는 여전히 채널의 수신 끝을 참조할 뿐입니다. 대신, 클로저가 영원히 루프를 돌면서 채널의 수신 끝에 작업을 요청하고 작업을 받으면 실행해야 합니다. Listing 20-20 에 표시된 변경 사항을 Worker::new에 적용해 보겠습니다.

파일 이름: src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver
              1 .lock()
              2 .unwrap()
              3 .recv()
              4 .unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

Listing 20-20: Worker 인스턴스의 스레드에서 작업 수신 및 실행

여기서는 먼저 receiver에서 lock을 호출하여 뮤텍스를 획득하고 [1], 오류가 발생하면 unwrap을 호출하여 패닉합니다 [2]. 뮤텍스를 획득하는 데 실패할 수 있습니다. 뮤텍스가 poisoned 상태인 경우, 즉 다른 스레드가 잠금을 해제하는 대신 잠금을 유지한 채 패닉한 경우에 발생할 수 있습니다. 이 상황에서는 unwrap을 호출하여 이 스레드가 패닉하도록 하는 것이 올바른 조치입니다. 이 unwrap을 원하는 의미의 오류 메시지가 있는 expect로 자유롭게 변경할 수 있습니다.

뮤텍스에 대한 잠금을 얻으면 recv를 호출하여 채널에서 Job을 수신합니다 [3]. 마지막 unwrap은 여기에서도 모든 오류를 지나갑니다 [4]. 이는 발신자를 보유한 스레드가 종료된 경우 발생할 수 있으며, 수신자가 종료되면 send 메서드가 Err를 반환하는 방식과 유사합니다.

recv 호출은 차단되므로, 아직 작업이 없으면 현재 스레드는 작업이 사용 가능해질 때까지 대기합니다. Mutex<T>는 한 번에 하나의 Worker 스레드만 작업을 요청하도록 보장합니다.

이제 스레드 풀이 작동 상태입니다! cargo run을 실행하고 몇 가지 요청을 해보세요.

[object Object]

성공! 이제 연결을 비동기적으로 실행하는 스레드 풀이 있습니다. 4 개 이상의 스레드가 생성되지 않으므로 서버가 많은 요청을 받더라도 시스템이 과부하되지 않습니다. /sleep에 대한 요청을 하면 서버는 다른 스레드가 실행하도록 하여 다른 요청을 처리할 수 있습니다.

참고: 여러 브라우저 창에서 동시에 /sleep을 열면 5 초 간격으로 한 번에 하나씩 로드될 수 있습니다. 일부 웹 브라우저는 캐싱을 위해 동일한 요청의 여러 인스턴스를 순차적으로 실행합니다. 이 제한은 웹 서버로 인해 발생하는 것이 아닙니다.

18 장에서 while let 루프에 대해 배우고 나면 Listing 20-21 과 같이 Worker 스레드 코드를 작성하지 않은 이유가 궁금할 수 있습니다.

파일 이름: src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Listing 20-21: while let을 사용하는 Worker::new의 대체 구현

이 코드는 컴파일되고 실행되지만 원하는 스레딩 동작이 발생하지 않습니다. 느린 요청은 다른 요청이 처리될 때까지 대기하게 합니다. 그 이유는 다소 미묘합니다. Mutex 구조체에는 공개된 unlock 메서드가 없습니다. 잠금의 소유권은 lock 메서드가 반환하는 LockResult<MutexGuard<T>> 내의 MutexGuard<T>의 수명에 기반하기 때문입니다. 컴파일 시, borrow checker 는 Mutex로 보호되는 리소스는 잠금을 보유하지 않는 한 액세스할 수 없다는 규칙을 적용할 수 있습니다. 그러나 이 구현은 MutexGuard<T>의 수명을 염두에 두지 않으면 의도보다 더 오래 잠금이 유지될 수도 있습니다.

let job = receiver.lock().unwrap().recv().unwrap();를 사용하는 Listing 20-20 의 코드는 let을 사용하면 등호 오른쪽에 있는 표현식에서 사용된 모든 임시 값이 let 문이 종료될 때 즉시 삭제되기 때문에 작동합니다. 그러나 while let(및 if letmatch) 은 연결된 블록이 끝날 때까지 임시 값을 삭제하지 않습니다. Listing 20-21 에서 잠금은 job() 호출 기간 동안 유지되므로 다른 Worker 인스턴스는 작업을 수신할 수 없습니다.

요약

축하합니다! 단일 스레드 서버를 멀티 스레드 서버로 전환하는 랩을 완료했습니다. LabEx 에서 더 많은 랩을 연습하여 실력을 향상시킬 수 있습니다.