Rust によるマルチスレッドサーバ開発

RustRustBeginner
オンラインで実践に進む

This tutorial is from open-source community. Access the source code

💡 このチュートリアルは英語版からAIによって翻訳されています。原文を確認するには、 ここをクリックしてください

はじめに

単一スレッドサーバをマルチスレッドサーバに変換するへようこそ。この実験は、Rust Bookの一部です。LabEx で Rust のスキルを練習することができます。

この実験では、単一スレッドサーバをマルチスレッドサーバに変換して、同時に複数の要求を処理する効率を向上させます。

単一スレッドサーバをマルチスレッドサーバに変換する

今のところ、サーバは要求を順番に処理します。つまり、最初の処理が終了するまで、2 つ目の接続を処理しません。サーバがますます多くの要求を受け取る場合、この直列実行はますます最適ではありません。サーバが処理に時間がかかる要求を受け取った場合、新しい要求は迅速に処理できても、その後の要求は長い要求が終了するまで待たなければなりません。これを修正する必要がありますが、まずは問題を実際に見てみましょう。

低速な要求のシミュレーション

低速な処理を行う要求が、現在のサーバ実装に対して行われる他の要求にどのような影響を与えるか見てみましょう。リスト 20-10 では、5 秒間スリープしてから応答する疑似的な低速な応答で、/sleep への要求の処理を実装しています。

ファイル名:src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
--snip--

fn handle_connection(mut stream: TcpStream) {
    --snip--

    let (status_line, filename) = 1 match &request_line[..] {
      2 "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
      3 "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
      4 _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    --snip--
}

リスト 20-10: 5 秒間スリープすることで低速な要求をシミュレートする

3 つのケースがあるので、今では if から match に切り替えました [1]。文字列リテラル値とのパターンマッチングのために、request_line のスライスを明示的にマッチさせる必要があります。match は、等値メソッドのように自動的な参照と参照解除を行いません。

最初のアーム [2] は、リスト 20-9 の if ブロックと同じです。2 番目のアーム [3] は、/sleep への要求をマッチさせます。その要求が受け取られると、サーバは成功した HTML ページをレンダリングする前に 5 秒間スリープします。3 番目のアーム [4] は、リスト 20-9 の else ブロックと同じです。

サーバがどれほど単純であるかがわかります。実際のライブラリは、もっと簡潔な方法で複数の要求の認識を処理します!

cargo run を使ってサーバを起動します。そして、2 つのブラウザウィンドウを開きます。1 つは http://127.0.0.1:7878 用で、もう 1 つは http://127.0.0.1:7878/sleep 用です。以前と同じように、/ URI を何度か入力すると、すばやく応答することがわかります。しかし、/sleep を入力してから / を読み込むと、/sleep が 5 秒間完全にスリープするのを待ってから読み込まれることがわかります。

低速な要求の後に要求がバックアップするのを回避するために、いくつかの手法があります。ここで実装するのは、スレッドプールです。

スレッドプールを使ったスループットの向上

「スレッドプール」とは、生成されたスレッドのグループであり、タスクを処理するために待機していて準備ができています。プログラムが新しいタスクを受け取ると、プール内のスレッドの 1 つをそのタスクに割り当て、そのスレッドがタスクを処理します。プール内の残りのスレッドは、最初のスレッドが処理している間に入ってくる他のタスクを処理するために利用可能です。最初のスレッドがタスクの処理を終えると、それは待機中のスレッドのプールに戻され、新しいタスクを処理する準備ができます。スレッドプールを使うことで、接続を同時に処理して、サーバのスループットを向上させることができます。

DoS 攻撃から守るために、プール内のスレッド数を少ない数に制限します。もし、プログラムが各要求が入ってきたときに新しいスレッドを作成するようにしていた場合、誰かがサーバに 1000 万回の要求を行うことで、すべてのサーバのリソースを使い果たして要求の処理を停止させて大混乱を引き起こす可能性があります。

そのため、無制限のスレッドを生成するのではなく、プール内に固定数のスレッドを待機させます。入ってきた要求は、処理のためにプールに送られます。プールは、入ってきた要求のキューを維持します。プール内の各スレッドは、このキューから要求を取り出し、要求を処理し、その後、キューに別の要求を求めます。この設計では、N 個の要求を同時に処理することができます。ここで N はスレッドの数です。各スレッドが長時間実行される要求に応答している場合、その後の要求は依然としてキューにバックアップする可能性がありますが、その時点に達する前に処理できる長時間実行される要求の数を増やしました。

この技術は、Web サーバのスループットを向上させるための多くの方法の 1 つにすぎません。他に探検できるオプションとしては、フォーク/ジョインモデル、単一スレッドの非同期 I/O モデル、およびマルチスレッドの非同期 I/O モデルがあります。このトピックに興味がある場合は、他のソリューションについてもっと読んで実装してみることができます。Rust のような低レベル言語では、これらのオプションすべてが可能です。

スレッドプールを実装する前に、プールを使った場合がどのようになるかについて話し合いましょう。コードを設計しようとするとき、最初にクライアントインターフェイスを書くことで、設計を導くことができます。コードの API をその呼び出し方を望むように構造化するように書きます。そして、その構造内で機能を実装します。機能を実装してから公開 API を設計するのではなくです。

第 12 章のプロジェクトでテスト駆動開発を使った方法と同様に、ここではコンパイラ駆動開発を使います。私たちが呼び出したい関数を呼び出すコードを書き、その後、コードが動作するように次に何を変更すべきかを決定するために、コンパイラからのエラーを見ます。しかし、それを行う前に、最初として使わない技術を探ります。

各要求に対してスレッドを生成する

まず、コードが各接続に対して新しいスレッドを生成する場合のコードの見た目を見てみましょう。前述の通り、潜在的に無制限の数のスレッドを生成する問題のため、これは私たちの最終的な計画ではありませんが、まず動作するマルチスレッドサーバを作るための出発点としては役立ちます。その後、スレッドプールを改善策として追加し、2 つのソリューションを比較することがより容易になります。

リスト 20-11 は、for ループ内の各ストリームを処理するために新しいスレッドを生成するために main に加える変更を示しています。

ファイル名:src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

リスト 20-11: 各ストリームに対して新しいスレッドを生成する

第 16 章で学んだように、thread::spawn は新しいスレッドを作成し、その後、クロージャ内のコードを新しいスレッドで実行します。このコードを実行して、ブラウザで /sleep を読み込み、その後、さらに 2 つのブラウザタブで / を読み込むと、/ への要求が /sleep が終了するのを待たなくて済むことが確認できます。しかし、前述の通り、これは最終的にはシステムを圧倒してしまうでしょう。なぜなら、制限なく新しいスレッドを作成してしまうからです。

有限数のスレッドを作成する

私たちは、スレッドプールが同じような慣れ親しんだ方法で動作するようにしたいので、スレッドからスレッドプールに切り替える際に、API を使用するコードに大きな変更を加える必要はありません。リスト 20-12 は、thread::spawn の代わりに使用したい ThreadPool 構造体の仮想インターフェイスを示しています。

ファイル名:src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  1 let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

      2 pool.execute(|| {
            handle_connection(stream);
        });
    }
}

リスト 20-12: 理想的な ThreadPool インターフェイス

この場合、4 つのスレッド数を設定できる新しいスレッドプールを作成するために、ThreadPool::new を使用しています [1]。そして、for ループでは、pool.executethread::spawn と同じようなインターフェイスを持っています。つまり、各ストリームに対して実行するクロージャを受け取ります [2]。pool.execute を実装する必要があります。そうすることで、クロージャを受け取り、プール内のスレッドに実行させることができます。このコードはまだコンパイルされませんが、修正方法を教えてくれるように、コンパイラに挑戦してみましょう。

コンパイラ駆動開発を使った ThreadPool の構築

リスト 20-12 の変更を src/main.rs に加え、その後、cargo check からのコンパイラエラーを使って開発を進めましょう。最初に得られるエラーは次の通りです。

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

素晴らしい!このエラーは、ThreadPool 型またはモジュールが必要であることを教えてくれます。ですから、今から作成しましょう。私たちの ThreadPool の実装は、Web サーバが行っている作業の種類に依存しません。ですから、hello クレートをバイナリクレートからライブラリクレートに切り替えて、ThreadPool の実装を保持しましょう。ライブラリクレートに変更した後は、スレッドプールを使用して行いたい作業に対して、Web 要求の処理に限定されることなく、別のスレッドプールライブラリを使用することもできます。

src/lib.rs ファイルを作成し、次の内容を含めます。これは、今のところで私たちが持てる ThreadPool 構造体の最も単純な定義です。

ファイル名:src/lib.rs

pub struct ThreadPool;

そして、src/main.rs の先頭に次のコードを追加することで、main.rs ファイルを編集して、ライブラリクレートから ThreadPool をスコープ内に持ち込みます。

ファイル名:src/main.rs

use hello::ThreadPool;

このコードはまだ動作しませんが、次に対処する必要のあるエラーを取得するために、もう一度チェックしましょう。

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct
`ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in
`ThreadPool`

このエラーは、次に ThreadPool に対して new という名前の関連付けられた関数を作成する必要があることを示しています。また、new には、引数として 4 を受け取り、ThreadPool インスタンスを返す 1 つのパラメータが必要であることも知っています。これらの特性を持つ最も単純な new 関数を実装しましょう。

ファイル名:src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

size パラメータの型として usize を選んだのは、負の数のスレッドは意味をなさないことがわかっているからです。また、この 4 をスレッドのコレクションの要素数として使用することがわかっており、これが usize 型の目的であることも、「整数型」で説明した通りです。

もう一度コードをチェックしましょう。

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the
current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^ method not found in `ThreadPool`

今回のエラーは、ThreadPoolexecute メソッドがないために発生します。「有限数のスレッドを作成する」で思い出してください。私たちは、スレッドプールが thread::spawn と同じようなインターフェイスを持つべきだと決めました。また、execute 関数を実装して、与えられたクロージャを受け取り、プール内の待機中のスレッドに実行させます。

ThreadPoolexecute メソッドを定義して、クロージャをパラメータとして取るようにします。「クロージャからキャプチャされた値を移動させるときと Fn トレイト」で思い出してください。クロージャを 3 つの異なるトレイト:FnFnMut、および FnOnce を使ってパラメータとして取ることができます。ここでどの種類のクロージャを使うかを決める必要があります。最終的には、標準ライブラリの thread::spawn の実装と同じようなことを行うことになるので、thread::spawn のシグネチャがパラメータにどのような制約を持つかを見ることができます。ドキュメントからは次のように示されています。

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

ここでは F 型パラメータが関心の対象です。T 型パラメータは戻り値に関係しており、これには関心がありません。spawnF に対するトレイトバウンドとして FnOnce を使用していることがわかります。これもおそらく私たちが望むものです。なぜなら、最終的には execute で受け取った引数を spawn に渡すからです。FnOnce が私たちが使用したいトレイトであることをさらに確信できるのは、要求を実行するためのスレッドがその要求のクロージャを一度だけ実行するだけであり、これは FnOnceOnce に一致するからです。

F 型パラメータにはまた、トレイトバウンド Send と寿命バウンド 'static があります。これらは私たちの状況で役立ちます。クロージャを 1 つのスレッドから別のスレッドに転送するために Send が必要であり、スレッドが実行するのにどれくらいの時間がかかるかわからないために 'static が必要です。これらの制約を持つ型 F のジェネリックパラメータを取る ThreadPoolexecute メソッドを作成しましょう。

ファイル名:src/lib.rs

impl ThreadPool {
    --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() 1 + Send + 'static,
    {
    }
}

FnOnce の後にまだ () を使用しています [1]。この FnOnce は、パラメータを持たず、ユニット型 () を返すクロージャを表しています。関数定義と同様に、シグネチャから戻り値の型を省略できますが、パラメータがなくても、依然として括弧が必要です。

再び、これは execute メソッドの最も単純な実装です。何もしませんが、私たちはただコードをコンパイルさせようとしています。もう一度チェックしましょう。

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

コンパイルされました!ただし、cargo run を試してブラウザで要求を行うと、章の冒頭で見たブラウザのエラーが表示されます。私たちのライブラリは、実際には execute に渡されたクロージャを呼び出していません!

注:Haskell や Rust のような厳密なコンパイラを持つ言語に関して、「コードがコンパイルされれば、動作する」という言葉を耳にすることがあります。しかし、この言葉は必ずしも普遍的に正しいとは限りません。私たちのプロジェクトはコンパイルされますが、まったく何もしません!本当の完全なプロジェクトを構築している場合、これはコードがコンパイルされ _ かつ _ 私たちが望む動作を持つことを確認するためのユニットテストを書き始めるのに良いタイミングです。

new におけるスレッド数の検証

newexecute のパラメータに対して何もしていません。これらの関数の本体を、私たちが望む動作で実装しましょう。まず、new について考えてみましょう。前述の通り、size パラメータに対して符号なし型を選びました。なぜなら、負の数のスレッドを持つプールは意味をなさないからです。しかし、ゼロ個のスレッドを持つプールも意味をなさないですが、ゼロは完全に有効な usize です。ThreadPool インスタンスを返す前に、size がゼロより大きいことを確認するコードを追加し、assert! マクロを使ってゼロを受け取った場合にプログラムをパニックにさせます。これはリスト 20-13 に示されています。

ファイル名:src/lib.rs

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
  1 /// ## Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
      2 assert!(size > 0);

        ThreadPool
    }

    --snip--
}

リスト 20-13: size がゼロの場合にパニックになるように ThreadPool::new を実装する

また、doc コメントを使って ThreadPool のドキュメントを追加しました。第 14 章で説明したように、関数がパニックになる状況を明記するセクションを追加することで、良いドキュメントの作成方法を守っています [1]。cargo doc --open を実行し、ThreadPool 構造体をクリックして、new の生成されたドキュメントを見てみてください!

ここで行ったように assert! マクロを追加する代わりに [2]、newbuild に変更して、リスト 12-9 の I/O プロジェクトの Config::build と同じように Result を返すこともできます。しかし、この場合、ゼロ個のスレッドでスレッドプールを作成しようとすることは回復不可能なエラーであると判断しました。もし意欲的であれば、次のシグネチャを持つ build という名前の関数を書いて、new 関数と比較してみてください。

pub fn build(
    size: usize
) -> Result<ThreadPool, PoolCreationError> {

スレッドを格納するための空間を作成する

これで、プールに格納するための有効な数のスレッドがあることを知る方法があるので、それらのスレッドを作成して、構造体を返す前に ThreadPool 構造体に格納することができます。しかし、どのようにしてスレッドを「格納」するのでしょうか。thread::spawn のシグネチャをもう一度見てみましょう。

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

spawn 関数は JoinHandle<T> を返します。ここで、T はクロージャが返す型です。JoinHandle も使ってみて、何が起こるか見てみましょう。私たちの場合、スレッドプールに渡すクロージャは接続を処理して何も返さないので、T はユニット型 () になります。

リスト 20-14 のコードはコンパイルされますが、まだスレッドは作成されていません。ThreadPool の定義を変更して、thread::JoinHandle<()> インスタンスのベクトルを保持するようにし、ベクトルを size の容量で初期化し、スレッドを作成するためのコードを実行する for ループを設定し、それらを含む ThreadPool インスタンスを返しました。

ファイル名:src/lib.rs

1 use std::thread;

pub struct ThreadPool {
  2 threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      3 let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    --snip--
}

リスト 20-14: ThreadPool がスレッドを保持するためのベクトルを作成する

std::thread をライブラリクレートのスコープ内に持ち込んでいます [1]。なぜなら、ThreadPool のベクトル内の要素の型として thread::JoinHandle<()> を使用しているからです [2]。

有効なサイズを受け取ると、私たちの ThreadPoolsize 個の要素を保持できる新しいベクトルを作成します [3]。with_capacity 関数は Vec::new と同じタスクを行いますが、重要な違いがあります。それは、ベクトル内に事前に空間を割り当てます。ベクトル内に size 個の要素を格納する必要があることがわかっているので、この事前割り当ては、要素が挿入されるときに自動的にサイズを調整する Vec::new を使うよりもわずかに効率的です。

もう一度 cargo check を実行すると、成功するはずです。

ThreadPool からスレッドにコードを送信する

リスト 20-14 の for ループに、スレッドの作成に関するコメントを残しました。ここでは、実際にスレッドを作成する方法を見ていきます。標準ライブラリは、スレッドを作成する方法として thread::spawn を提供しており、thread::spawn は、スレッドが作成されるとすぐに実行するコードを取得することを期待しています。しかし、私たちの場合、スレッドを作成して、後で送信するコードを待たせたいと考えています。標準ライブラリのスレッドの実装には、そのような方法は含まれていません。手動で実装する必要があります。

この動作を実装するには、ThreadPool とスレッドの間に新しいデータ構造を導入して、この新しい動作を管理します。このデータ構造を Worker と呼びます。これは、プーリングの実装で一般的な用語です。Worker は、実行する必要のあるコードを拾い上げて、そのコードを自分のスレッドで実行します。

レストランのキッチンで働く人を想像してみてください。従業員は、客からの注文が届くまで待ち、その注文を受け取って処理する責任があります。

スレッドプールに JoinHandle<()> インスタンスのベクトルを格納する代わりに、Worker 構造体のインスタンスを格納します。各 Worker は、1 つの JoinHandle<()> インスタンスを格納します。そして、Worker のメソッドを実装して、実行するコードのクロージャを受け取り、それを既に実行中のスレッドに送信して実行させます。また、各 Workerid を与えて、ログ記録やデバッグ時にプール内の異なる Worker インスタンスを区別できるようにします。

これが、ThreadPool を作成する際に起こる新しいプロセスです。このように Worker をセットアップした後、クロージャをスレッドに送信するコードを実装します。

  1. idJoinHandle<()> を保持する Worker 構造体を定義する。
  2. ThreadPool を変更して、Worker インスタンスのベクトルを保持する。
  3. id 番号を受け取り、id と空のクロージャで生成されたスレッドを保持する Worker インスタンスを返す Worker::new 関数を定義する。
  4. ThreadPool::new では、for ループのカウンタを使って id を生成し、その id で新しい Worker を作成し、Worker をベクトルに格納する。

もしチャレンジが好きなら、リスト 20-15 のコードを見る前に、自分でこれらの変更を実装してみてください。

準備はできましたか?以下に、前述の変更を行う 1 つの方法を示すリスト 20-15 を掲載します。

ファイル名:src/lib.rs

use std::thread;

pub struct ThreadPool {
  1 workers: Vec<Worker>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

      2 for id in 0..size {
          3 workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    --snip--
}

4 struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
  5 fn new(id: usize) -> Worker {
      6 let thread = thread::spawn(|| {});

        Worker { 7 id, 8 thread }
    }
}

リスト 20-15: ThreadPool を変更して、直接スレッドを保持する代わりに Worker インスタンスを保持する

ThreadPool のフィールド名を threads から workers に変更しました。なぜなら、今では JoinHandle<()> インスタンスではなく Worker インスタンスを保持しているからです [1]。for ループのカウンタを [2]、Worker::new の引数として使い、各新しい Workerworkers という名前のベクトルに格納します [3]。

外部コード(src/main.rs のサーバのようなもの)は、ThreadPool 内で Worker 構造体を使用する実装の詳細を知る必要はありません。ですから、Worker 構造体 [4] とその new 関数 [5] を非公開にします。Worker::new 関数は、与えられた id を使って [7]、空のクロージャを使って新しいスレッドを生成することで作成された JoinHandle<()> インスタンスを格納します [8] [6]。

注:システムリソースが足りないためにオペレーティングシステムがスレッドを作成できない場合、thread::spawn はパニックになります。これにより、一部のスレッドの作成が成功したとしても、私たちのサーバ全体がパニックになります。単純化のため、この動作は問題ありませんが、本番環境のスレッドプールの実装では、おそらく std::thread::Builder とその spawn メソッドを使って、代わりに Result を返す方が良いでしょう。

このコードはコンパイルされ、ThreadPool::new に引数として指定した Worker インスタンスの数を格納します。しかし、まだ execute で受け取ったクロージャを処理していません。次に、それを行う方法を見ていきましょう。

チャネルを通じてスレッドに要求を送信する

次に解決する問題は、thread::spawn に与えられるクロージャが何もしないことです。現在、execute メソッドで実行したいクロージャを取得しています。しかし、ThreadPool を作成する際に各 Worker を作成するときに、thread::spawn に実行するクロージャを与える必要があります。

先ほど作成した Worker 構造体が、ThreadPool に保持されているキューから実行するコードを取得し、そのコードを自分のスレッドに送信して実行するようにしたいです。

第 16 章で学んだチャネルは、2 つのスレッド間で通信する簡単な方法です。このユースケースには最適です。チャネルをジョブのキューとして機能させ、execute がジョブを ThreadPool から Worker インスタンスに送信し、それがジョブを自分のスレッドに送信します。これが計画です。

  1. ThreadPool はチャネルを作成し、送信側を保持する。
  2. Worker は受信側を保持する。
  3. チャネルを通じて送信したいクロージャを保持する新しい Job 構造体を作成する。
  4. execute メソッドは、実行したいジョブを送信側を通じて送信する。
  5. そのスレッド内で、Worker は受信側をループ処理し、受信したジョブのクロージャを実行する。

まず、ThreadPool::new でチャネルを作成し、ThreadPool インスタンスに送信側を保持するようにしましょう。これはリスト 20-16 に示されています。Job 構造体は現在何も保持していませんが、チャネルを通じて送信するアイテムの型になります。

ファイル名:src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      1 let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, 2 sender }
    }
    --snip--
}

リスト 20-16: Job インスタンスを送信するチャネルの送信側を格納するように ThreadPool を変更する

ThreadPool::new では、新しいチャネルを作成し [1]、プールが送信側を保持するようにします [2]。これは正常にコンパイルされます。

ThreadPool がチャネルを作成するときに、各 Worker にチャネルの受信側を渡してみましょう。Worker インスタンスが生成するスレッドで受信側を使用したいので、クロージャ内で receiver パラメータを参照します。リスト 20-17 のコードはまだコンパイルされません。

ファイル名:src/lib.rs

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
          1 workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    --snip--
}

--snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
          2 receiver;
        });

        Worker { id, thread }
    }
}

リスト 20-17: 各 Worker に受信側を渡す

小さな単純な変更を加えました。受信側を Worker::new に渡し [1]、そしてクロージャ内で使用します [2]。

このコードをチェックしようとすると、次のエラーが表示されます。

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in
previous iteration of loop

コードは receiver を複数の Worker インスタンスに渡そうとしています。これは機能しません。第 16 章で思い出してください。Rust が提供するチャネルの実装は、複数の _ プロデューサー _、単一の _ コンシューマー _ です。これは、このコードを修正するためにチャネルの消費側を単にクローンすることはできないことを意味します。また、複数のコンシューマーにメッセージを複数回送信したくはありません。複数の Worker インスタンスがある場合に、各メッセージが 1 回だけ処理されるように、1 つのメッセージのリストが欲しいです。

また、チャネルキューからジョブを取り出すには、receiver を変更する必要があります。したがって、スレッドは receiver を共有して変更する安全な方法が必要です。そうでなければ、競合条件が発生する可能性があります(第 16 章で説明されています)。

第 16 章で説明したスレッドセーフなスマートポインタを思い出してください。複数のスレッド間で所有権を共有し、スレッドが値を変更できるようにするには、Arc<Mutex<T>> を使用する必要があります。Arc 型は、複数の Worker インスタンスが受信側を所有できるようにし、Mutex は、1 つの Worker だけが受信側からジョブを取得できるようにします。リスト 20-18 に必要な変更を示します。

ファイル名:src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
--snip--

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

      1 let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(
                Worker::new(id, Arc::clone(& 2 receiver))
            );
        }

        ThreadPool { workers, sender }
    }

    --snip--
}

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--
    }
}

リスト 20-18: ArcMutex を使用して Worker インスタンス間で受信側を共有する

ThreadPool::new では、受信側を ArcMutex に入れます [1]。各新しい Worker に対して、Arc をクローンして参照カウントを増やし、Worker インスタンスが受信側の所有権を共有できるようにします [2]。

これらの変更により、コードはコンパイルされます!着々と進んでいます!

execute メソッドの実装

さて、ついに ThreadPoolexecute メソッドを実装しましょう。また、Job を構造体から、execute が受け取るクロージャの型を保持するトレイトオブジェクトの型エイリアスに変更します。「型エイリアスを使った型の同義語の作成」で説明したように、型エイリアスは使いやすさのために長い型を短くすることができます。リスト 20-19 を見てください。

ファイル名:src/lib.rs

--snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
      1 let job = Box::new(f);

      2 self.sender.send(job).unwrap();
    }
}

--snip--

リスト 20-19: 各クロージャを保持する Box 用の Job 型エイリアスを作成し、そのジョブをチャネルに送信する

execute で受け取ったクロージャを使って新しい Job インスタンスを作成した後 [1]、そのジョブをチャネルの送信側に送信します [2]。送信が失敗した場合に備えて、sendunwrap を呼んでいます。たとえば、すべてのスレッドの実行を停止した場合、つまり受信側が新しいメッセージの受信を停止した場合に、これが発生する可能性があります。現時点では、スレッドの実行を停止することはできません。プールが存在する限り、スレッドは継続して実行されます。unwrap を使う理由は、失敗のケースが起こらないことを私たちは知っているが、コンパイラはそれを知らないからです。

しかし、まだ終わりではありません!Worker では、thread::spawn に渡されるクロージャはまだ、チャネルの受信側を 参照 するだけです。代わりに、クロージャが永久にループし、チャネルの受信側にジョブを要求し、ジョブを受け取ったときに実行するようにする必要があります。Worker::new に対して、リスト 20-20 に示す変更を加えましょう。

ファイル名:src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver
              1.lock()
              2.unwrap()
              3.recv()
              4.unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

リスト 20-20: Worker インスタンスのスレッド内でジョブを受信して実行する

ここでは、まず receiverlock を呼んでミューテックスを取得し [1]、次に unwrap を呼んでエラーがあった場合にパニックになります [2]。ミューテックスが 汚染された 状態にある場合、ロックを取得することが失敗する可能性があります。これは、他のスレッドがロックを保持したままパニックになり、ロックを解放しなかった場合に発生する可能性があります。この状況では、このスレッドがパニックになるように unwrap を呼ぶことが正しいアクションです。あなたが好きなら、この unwrap を、あなたにとって意味のあるエラーメッセージ付きの expect に変更しても構いません。

ミューテックスのロックを取得できた場合、recv を呼んでチャネルから Job を受信します [3]。最後の unwrap もまた、ここでエラーがあった場合にパニックになります [4]。これは、送信側を保持しているスレッドがシャットダウンした場合に発生する可能性があります。受信側がシャットダウンした場合に send メソッドが Err を返すのと同じようです。

recv の呼び出しはブロックされます。したがって、まだジョブがない場合、現在のスレッドはジョブが利用可能になるまで待ちます。Mutex<T> は、1 つの Worker スレッドだけが 1 度にジョブを要求しようとしていることを保証します。

これで、私たちのスレッドプールは動作状態になりました!cargo run を実行して、いくつかの要求を行ってみましょう。

[object Object]

成功です!これで、非同期で接続を実行するスレッドプールができました。作成されるスレッドは 4 つを超えません。したがって、サーバが多くの要求を受け取った場合でも、システムが過負荷になることはありません。/sleep に要求を行うと、サーバは別のスレッドがそれらを実行することで、他の要求を処理することができます。

注:同時に複数のブラウザウィンドウで /sleep を開くと、5 秒間隔で 1 つずつ読み込まれる場合があります。一部のウェブブラウザは、キャッシュの理由で、同じ要求の複数のインスタンスを順次実行します。この制限は、私たちのウェブサーバによるものではありません。

第 18 章で while let ループについて学んだ後、あなたはおそらく、なぜ私たちがリスト 20-21 に示すように Worker スレッドのコードを書かなかったのか疑問に思うでしょう。

ファイル名:src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

リスト 20-21: while let を使った Worker::new の代替実装

このコードはコンパイルされて実行されますが、望ましいスレッドの動作をもたらさない。理由はやや微妙である。Mutex 構造体には公開の unlock メソッドがない。なぜなら、ロックの所有権は、lock メソッドが返す LockResult<MutexGuard<T>> 内の MutexGuard<T> の寿命に基づいているからである。コンパイル時に、バーローチェッカーは、Mutex によって保護されるリソースは、ロックを保持していない限りアクセスできないというルールを強制できる。しかし、この実装では、MutexGuard<T> の寿命に注意しない場合、ロックが意図したより長く保持される場合もある。

等号の右辺の式で使用される一時的な値は、let ステートメントが終了するとすぐに破棄される。しかし、while let(および if letmatch)は、関連するブロックの終了まで一時的な値を破棄しない。リスト 20-21 では、job() の呼び出しの期間中、ロックが保持され続ける。これは、他の Worker インスタンスがジョブを受け取れないことを意味する。

まとめ

おめでとうございます!あなたは、単一スレッドのサーバをマルチスレッドサーバに変える実験を完了しました。あなたの技術を向上させるために、LabEx でさらに多くの実験を行って練習してください。