はじめに
単一スレッドサーバをマルチスレッドサーバに変換するへようこそ。この実験は、Rust Bookの一部です。LabEx で Rust のスキルを練習することができます。
この実験では、単一スレッドサーバをマルチスレッドサーバに変換して、同時に複数の要求を処理する効率を向上させます。
This tutorial is from open-source community. Access the source code
💡 このチュートリアルは英語版からAIによって翻訳されています。原文を確認するには、 ここをクリックしてください
単一スレッドサーバをマルチスレッドサーバに変換するへようこそ。この実験は、Rust Bookの一部です。LabEx で Rust のスキルを練習することができます。
この実験では、単一スレッドサーバをマルチスレッドサーバに変換して、同時に複数の要求を処理する効率を向上させます。
今のところ、サーバは要求を順番に処理します。つまり、最初の処理が終了するまで、2 つ目の接続を処理しません。サーバがますます多くの要求を受け取る場合、この直列実行はますます最適ではありません。サーバが処理に時間がかかる要求を受け取った場合、新しい要求は迅速に処理できても、その後の要求は長い要求が終了するまで待たなければなりません。これを修正する必要がありますが、まずは問題を実際に見てみましょう。
低速な処理を行う要求が、現在のサーバ実装に対して行われる他の要求にどのような影響を与えるか見てみましょう。リスト 20-10 では、5 秒間スリープしてから応答する疑似的な低速な応答で、/sleep への要求の処理を実装しています。
ファイル名:src/main.rs
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
--snip--
fn handle_connection(mut stream: TcpStream) {
--snip--
let (status_line, filename) = 1 match &request_line[..] {
2 "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
3 "GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
4 _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
--snip--
}
リスト 20-10: 5 秒間スリープすることで低速な要求をシミュレートする
3 つのケースがあるので、今では if
から match
に切り替えました [1]。文字列リテラル値とのパターンマッチングのために、request_line
のスライスを明示的にマッチさせる必要があります。match
は、等値メソッドのように自動的な参照と参照解除を行いません。
最初のアーム [2] は、リスト 20-9 の if
ブロックと同じです。2 番目のアーム [3] は、/sleep への要求をマッチさせます。その要求が受け取られると、サーバは成功した HTML ページをレンダリングする前に 5 秒間スリープします。3 番目のアーム [4] は、リスト 20-9 の else
ブロックと同じです。
サーバがどれほど単純であるかがわかります。実際のライブラリは、もっと簡潔な方法で複数の要求の認識を処理します!
cargo run
を使ってサーバを起動します。そして、2 つのブラウザウィンドウを開きます。1 つは http://127.0.0.1:7878 用で、もう 1 つは http://127.0.0.1:7878/sleep 用です。以前と同じように、/ URI を何度か入力すると、すばやく応答することがわかります。しかし、/sleep を入力してから / を読み込むと、/ は sleep
が 5 秒間完全にスリープするのを待ってから読み込まれることがわかります。
低速な要求の後に要求がバックアップするのを回避するために、いくつかの手法があります。ここで実装するのは、スレッドプールです。
「スレッドプール」とは、生成されたスレッドのグループであり、タスクを処理するために待機していて準備ができています。プログラムが新しいタスクを受け取ると、プール内のスレッドの 1 つをそのタスクに割り当て、そのスレッドがタスクを処理します。プール内の残りのスレッドは、最初のスレッドが処理している間に入ってくる他のタスクを処理するために利用可能です。最初のスレッドがタスクの処理を終えると、それは待機中のスレッドのプールに戻され、新しいタスクを処理する準備ができます。スレッドプールを使うことで、接続を同時に処理して、サーバのスループットを向上させることができます。
DoS 攻撃から守るために、プール内のスレッド数を少ない数に制限します。もし、プログラムが各要求が入ってきたときに新しいスレッドを作成するようにしていた場合、誰かがサーバに 1000 万回の要求を行うことで、すべてのサーバのリソースを使い果たして要求の処理を停止させて大混乱を引き起こす可能性があります。
そのため、無制限のスレッドを生成するのではなく、プール内に固定数のスレッドを待機させます。入ってきた要求は、処理のためにプールに送られます。プールは、入ってきた要求のキューを維持します。プール内の各スレッドは、このキューから要求を取り出し、要求を処理し、その後、キューに別の要求を求めます。この設計では、N 個の要求を同時に処理することができます。ここで N はスレッドの数です。各スレッドが長時間実行される要求に応答している場合、その後の要求は依然としてキューにバックアップする可能性がありますが、その時点に達する前に処理できる長時間実行される要求の数を増やしました。
この技術は、Web サーバのスループットを向上させるための多くの方法の 1 つにすぎません。他に探検できるオプションとしては、フォーク/ジョインモデル、単一スレッドの非同期 I/O モデル、およびマルチスレッドの非同期 I/O モデルがあります。このトピックに興味がある場合は、他のソリューションについてもっと読んで実装してみることができます。Rust のような低レベル言語では、これらのオプションすべてが可能です。
スレッドプールを実装する前に、プールを使った場合がどのようになるかについて話し合いましょう。コードを設計しようとするとき、最初にクライアントインターフェイスを書くことで、設計を導くことができます。コードの API をその呼び出し方を望むように構造化するように書きます。そして、その構造内で機能を実装します。機能を実装してから公開 API を設計するのではなくです。
第 12 章のプロジェクトでテスト駆動開発を使った方法と同様に、ここではコンパイラ駆動開発を使います。私たちが呼び出したい関数を呼び出すコードを書き、その後、コードが動作するように次に何を変更すべきかを決定するために、コンパイラからのエラーを見ます。しかし、それを行う前に、最初として使わない技術を探ります。
まず、コードが各接続に対して新しいスレッドを生成する場合のコードの見た目を見てみましょう。前述の通り、潜在的に無制限の数のスレッドを生成する問題のため、これは私たちの最終的な計画ではありませんが、まず動作するマルチスレッドサーバを作るための出発点としては役立ちます。その後、スレッドプールを改善策として追加し、2 つのソリューションを比較することがより容易になります。
リスト 20-11 は、for
ループ内の各ストリームを処理するために新しいスレッドを生成するために main
に加える変更を示しています。
ファイル名:src/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
リスト 20-11: 各ストリームに対して新しいスレッドを生成する
第 16 章で学んだように、thread::spawn
は新しいスレッドを作成し、その後、クロージャ内のコードを新しいスレッドで実行します。このコードを実行して、ブラウザで /sleep を読み込み、その後、さらに 2 つのブラウザタブで / を読み込むと、/ への要求が /sleep が終了するのを待たなくて済むことが確認できます。しかし、前述の通り、これは最終的にはシステムを圧倒してしまうでしょう。なぜなら、制限なく新しいスレッドを作成してしまうからです。
私たちは、スレッドプールが同じような慣れ親しんだ方法で動作するようにしたいので、スレッドからスレッドプールに切り替える際に、API を使用するコードに大きな変更を加える必要はありません。リスト 20-12 は、thread::spawn
の代わりに使用したい ThreadPool
構造体の仮想インターフェイスを示しています。
ファイル名:src/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
1 let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
2 pool.execute(|| {
handle_connection(stream);
});
}
}
リスト 20-12: 理想的な ThreadPool
インターフェイス
この場合、4 つのスレッド数を設定できる新しいスレッドプールを作成するために、ThreadPool::new
を使用しています [1]。そして、for
ループでは、pool.execute
は thread::spawn
と同じようなインターフェイスを持っています。つまり、各ストリームに対して実行するクロージャを受け取ります [2]。pool.execute
を実装する必要があります。そうすることで、クロージャを受け取り、プール内のスレッドに実行させることができます。このコードはまだコンパイルされませんが、修正方法を教えてくれるように、コンパイラに挑戦してみましょう。
リスト 20-12 の変更を src/main.rs
に加え、その後、cargo check
からのコンパイラエラーを使って開発を進めましょう。最初に得られるエラーは次の通りです。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
素晴らしい!このエラーは、ThreadPool
型またはモジュールが必要であることを教えてくれます。ですから、今から作成しましょう。私たちの ThreadPool
の実装は、Web サーバが行っている作業の種類に依存しません。ですから、hello
クレートをバイナリクレートからライブラリクレートに切り替えて、ThreadPool
の実装を保持しましょう。ライブラリクレートに変更した後は、スレッドプールを使用して行いたい作業に対して、Web 要求の処理に限定されることなく、別のスレッドプールライブラリを使用することもできます。
src/lib.rs
ファイルを作成し、次の内容を含めます。これは、今のところで私たちが持てる ThreadPool
構造体の最も単純な定義です。
ファイル名:src/lib.rs
pub struct ThreadPool;
そして、src/main.rs
の先頭に次のコードを追加することで、main.rs
ファイルを編集して、ライブラリクレートから ThreadPool
をスコープ内に持ち込みます。
ファイル名:src/main.rs
use hello::ThreadPool;
このコードはまだ動作しませんが、次に対処する必要のあるエラーを取得するために、もう一度チェックしましょう。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct
`ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in
`ThreadPool`
このエラーは、次に ThreadPool
に対して new
という名前の関連付けられた関数を作成する必要があることを示しています。また、new
には、引数として 4
を受け取り、ThreadPool
インスタンスを返す 1 つのパラメータが必要であることも知っています。これらの特性を持つ最も単純な new
関数を実装しましょう。
ファイル名:src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
size
パラメータの型として usize
を選んだのは、負の数のスレッドは意味をなさないことがわかっているからです。また、この 4
をスレッドのコレクションの要素数として使用することがわかっており、これが usize
型の目的であることも、「整数型」で説明した通りです。
もう一度コードをチェックしましょう。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the
current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| ^^^^^^^ method not found in `ThreadPool`
今回のエラーは、ThreadPool
に execute
メソッドがないために発生します。「有限数のスレッドを作成する」で思い出してください。私たちは、スレッドプールが thread::spawn
と同じようなインターフェイスを持つべきだと決めました。また、execute
関数を実装して、与えられたクロージャを受け取り、プール内の待機中のスレッドに実行させます。
ThreadPool
の execute
メソッドを定義して、クロージャをパラメータとして取るようにします。「クロージャからキャプチャされた値を移動させるときと Fn トレイト」で思い出してください。クロージャを 3 つの異なるトレイト:Fn
、FnMut
、および FnOnce
を使ってパラメータとして取ることができます。ここでどの種類のクロージャを使うかを決める必要があります。最終的には、標準ライブラリの thread::spawn
の実装と同じようなことを行うことになるので、thread::spawn
のシグネチャがパラメータにどのような制約を持つかを見ることができます。ドキュメントからは次のように示されています。
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
ここでは F
型パラメータが関心の対象です。T
型パラメータは戻り値に関係しており、これには関心がありません。spawn
は F
に対するトレイトバウンドとして FnOnce
を使用していることがわかります。これもおそらく私たちが望むものです。なぜなら、最終的には execute
で受け取った引数を spawn
に渡すからです。FnOnce
が私たちが使用したいトレイトであることをさらに確信できるのは、要求を実行するためのスレッドがその要求のクロージャを一度だけ実行するだけであり、これは FnOnce
の Once
に一致するからです。
F
型パラメータにはまた、トレイトバウンド Send
と寿命バウンド 'static
があります。これらは私たちの状況で役立ちます。クロージャを 1 つのスレッドから別のスレッドに転送するために Send
が必要であり、スレッドが実行するのにどれくらいの時間がかかるかわからないために 'static
が必要です。これらの制約を持つ型 F
のジェネリックパラメータを取る ThreadPool
の execute
メソッドを作成しましょう。
ファイル名:src/lib.rs
impl ThreadPool {
--snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() 1 + Send + 'static,
{
}
}
FnOnce
の後にまだ ()
を使用しています [1]。この FnOnce
は、パラメータを持たず、ユニット型 ()
を返すクロージャを表しています。関数定義と同様に、シグネチャから戻り値の型を省略できますが、パラメータがなくても、依然として括弧が必要です。
再び、これは execute
メソッドの最も単純な実装です。何もしませんが、私たちはただコードをコンパイルさせようとしています。もう一度チェックしましょう。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 0.24s
コンパイルされました!ただし、cargo run
を試してブラウザで要求を行うと、章の冒頭で見たブラウザのエラーが表示されます。私たちのライブラリは、実際には execute
に渡されたクロージャを呼び出していません!
注:Haskell や Rust のような厳密なコンパイラを持つ言語に関して、「コードがコンパイルされれば、動作する」という言葉を耳にすることがあります。しかし、この言葉は必ずしも普遍的に正しいとは限りません。私たちのプロジェクトはコンパイルされますが、まったく何もしません!本当の完全なプロジェクトを構築している場合、これはコードがコンパイルされ _ かつ _ 私たちが望む動作を持つことを確認するためのユニットテストを書き始めるのに良いタイミングです。
new
と execute
のパラメータに対して何もしていません。これらの関数の本体を、私たちが望む動作で実装しましょう。まず、new
について考えてみましょう。前述の通り、size
パラメータに対して符号なし型を選びました。なぜなら、負の数のスレッドを持つプールは意味をなさないからです。しかし、ゼロ個のスレッドを持つプールも意味をなさないですが、ゼロは完全に有効な usize
です。ThreadPool
インスタンスを返す前に、size
がゼロより大きいことを確認するコードを追加し、assert!
マクロを使ってゼロを受け取った場合にプログラムをパニックにさせます。これはリスト 20-13 に示されています。
ファイル名:src/lib.rs
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
1 /// ## Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
2 assert!(size > 0);
ThreadPool
}
--snip--
}
リスト 20-13: size
がゼロの場合にパニックになるように ThreadPool::new
を実装する
また、doc コメントを使って ThreadPool
のドキュメントを追加しました。第 14 章で説明したように、関数がパニックになる状況を明記するセクションを追加することで、良いドキュメントの作成方法を守っています [1]。cargo doc --open
を実行し、ThreadPool
構造体をクリックして、new
の生成されたドキュメントを見てみてください!
ここで行ったように assert!
マクロを追加する代わりに [2]、new
を build
に変更して、リスト 12-9 の I/O プロジェクトの Config::build
と同じように Result
を返すこともできます。しかし、この場合、ゼロ個のスレッドでスレッドプールを作成しようとすることは回復不可能なエラーであると判断しました。もし意欲的であれば、次のシグネチャを持つ build
という名前の関数を書いて、new
関数と比較してみてください。
pub fn build(
size: usize
) -> Result<ThreadPool, PoolCreationError> {
これで、プールに格納するための有効な数のスレッドがあることを知る方法があるので、それらのスレッドを作成して、構造体を返す前に ThreadPool
構造体に格納することができます。しかし、どのようにしてスレッドを「格納」するのでしょうか。thread::spawn
のシグネチャをもう一度見てみましょう。
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
spawn
関数は JoinHandle<T>
を返します。ここで、T
はクロージャが返す型です。JoinHandle
も使ってみて、何が起こるか見てみましょう。私たちの場合、スレッドプールに渡すクロージャは接続を処理して何も返さないので、T
はユニット型 ()
になります。
リスト 20-14 のコードはコンパイルされますが、まだスレッドは作成されていません。ThreadPool
の定義を変更して、thread::JoinHandle<()>
インスタンスのベクトルを保持するようにし、ベクトルを size
の容量で初期化し、スレッドを作成するためのコードを実行する for
ループを設定し、それらを含む ThreadPool
インスタンスを返しました。
ファイル名:src/lib.rs
1 use std::thread;
pub struct ThreadPool {
2 threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
--snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
3 let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
--snip--
}
リスト 20-14: ThreadPool
がスレッドを保持するためのベクトルを作成する
std::thread
をライブラリクレートのスコープ内に持ち込んでいます [1]。なぜなら、ThreadPool
のベクトル内の要素の型として thread::JoinHandle<()>
を使用しているからです [2]。
有効なサイズを受け取ると、私たちの ThreadPool
は size
個の要素を保持できる新しいベクトルを作成します [3]。with_capacity
関数は Vec::new
と同じタスクを行いますが、重要な違いがあります。それは、ベクトル内に事前に空間を割り当てます。ベクトル内に size
個の要素を格納する必要があることがわかっているので、この事前割り当ては、要素が挿入されるときに自動的にサイズを調整する Vec::new
を使うよりもわずかに効率的です。
もう一度 cargo check
を実行すると、成功するはずです。
リスト 20-14 の for
ループに、スレッドの作成に関するコメントを残しました。ここでは、実際にスレッドを作成する方法を見ていきます。標準ライブラリは、スレッドを作成する方法として thread::spawn
を提供しており、thread::spawn
は、スレッドが作成されるとすぐに実行するコードを取得することを期待しています。しかし、私たちの場合、スレッドを作成して、後で送信するコードを待たせたいと考えています。標準ライブラリのスレッドの実装には、そのような方法は含まれていません。手動で実装する必要があります。
この動作を実装するには、ThreadPool
とスレッドの間に新しいデータ構造を導入して、この新しい動作を管理します。このデータ構造を Worker と呼びます。これは、プーリングの実装で一般的な用語です。Worker
は、実行する必要のあるコードを拾い上げて、そのコードを自分のスレッドで実行します。
レストランのキッチンで働く人を想像してみてください。従業員は、客からの注文が届くまで待ち、その注文を受け取って処理する責任があります。
スレッドプールに JoinHandle<()>
インスタンスのベクトルを格納する代わりに、Worker
構造体のインスタンスを格納します。各 Worker
は、1 つの JoinHandle<()>
インスタンスを格納します。そして、Worker
のメソッドを実装して、実行するコードのクロージャを受け取り、それを既に実行中のスレッドに送信して実行させます。また、各 Worker
に id
を与えて、ログ記録やデバッグ時にプール内の異なる Worker
インスタンスを区別できるようにします。
これが、ThreadPool
を作成する際に起こる新しいプロセスです。このように Worker
をセットアップした後、クロージャをスレッドに送信するコードを実装します。
id
と JoinHandle<()>
を保持する Worker
構造体を定義する。ThreadPool
を変更して、Worker
インスタンスのベクトルを保持する。id
番号を受け取り、id
と空のクロージャで生成されたスレッドを保持する Worker
インスタンスを返す Worker::new
関数を定義する。ThreadPool::new
では、for
ループのカウンタを使って id
を生成し、その id
で新しい Worker
を作成し、Worker
をベクトルに格納する。もしチャレンジが好きなら、リスト 20-15 のコードを見る前に、自分でこれらの変更を実装してみてください。
準備はできましたか?以下に、前述の変更を行う 1 つの方法を示すリスト 20-15 を掲載します。
ファイル名:src/lib.rs
use std::thread;
pub struct ThreadPool {
1 workers: Vec<Worker>,
}
impl ThreadPool {
--snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
2 for id in 0..size {
3 workers.push(Worker::new(id));
}
ThreadPool { workers }
}
--snip--
}
4 struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
5 fn new(id: usize) -> Worker {
6 let thread = thread::spawn(|| {});
Worker { 7 id, 8 thread }
}
}
リスト 20-15: ThreadPool
を変更して、直接スレッドを保持する代わりに Worker
インスタンスを保持する
ThreadPool
のフィールド名を threads
から workers
に変更しました。なぜなら、今では JoinHandle<()>
インスタンスではなく Worker
インスタンスを保持しているからです [1]。for
ループのカウンタを [2]、Worker::new
の引数として使い、各新しい Worker
を workers
という名前のベクトルに格納します [3]。
外部コード(src/main.rs
のサーバのようなもの)は、ThreadPool
内で Worker
構造体を使用する実装の詳細を知る必要はありません。ですから、Worker
構造体 [4] とその new
関数 [5] を非公開にします。Worker::new
関数は、与えられた id
を使って [7]、空のクロージャを使って新しいスレッドを生成することで作成された JoinHandle<()>
インスタンスを格納します [8] [6]。
注:システムリソースが足りないためにオペレーティングシステムがスレッドを作成できない場合、
thread::spawn
はパニックになります。これにより、一部のスレッドの作成が成功したとしても、私たちのサーバ全体がパニックになります。単純化のため、この動作は問題ありませんが、本番環境のスレッドプールの実装では、おそらくstd::thread::Builder
とそのspawn
メソッドを使って、代わりにResult
を返す方が良いでしょう。
このコードはコンパイルされ、ThreadPool::new
に引数として指定した Worker
インスタンスの数を格納します。しかし、まだ execute
で受け取ったクロージャを処理していません。次に、それを行う方法を見ていきましょう。
次に解決する問題は、thread::spawn
に与えられるクロージャが何もしないことです。現在、execute
メソッドで実行したいクロージャを取得しています。しかし、ThreadPool
を作成する際に各 Worker
を作成するときに、thread::spawn
に実行するクロージャを与える必要があります。
先ほど作成した Worker
構造体が、ThreadPool
に保持されているキューから実行するコードを取得し、そのコードを自分のスレッドに送信して実行するようにしたいです。
第 16 章で学んだチャネルは、2 つのスレッド間で通信する簡単な方法です。このユースケースには最適です。チャネルをジョブのキューとして機能させ、execute
がジョブを ThreadPool
から Worker
インスタンスに送信し、それがジョブを自分のスレッドに送信します。これが計画です。
ThreadPool
はチャネルを作成し、送信側を保持する。Worker
は受信側を保持する。Job
構造体を作成する。execute
メソッドは、実行したいジョブを送信側を通じて送信する。Worker
は受信側をループ処理し、受信したジョブのクロージャを実行する。まず、ThreadPool::new
でチャネルを作成し、ThreadPool
インスタンスに送信側を保持するようにしましょう。これはリスト 20-16 に示されています。Job
構造体は現在何も保持していませんが、チャネルを通じて送信するアイテムの型になります。
ファイル名:src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
--snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
1 let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, 2 sender }
}
--snip--
}
リスト 20-16: Job
インスタンスを送信するチャネルの送信側を格納するように ThreadPool
を変更する
ThreadPool::new
では、新しいチャネルを作成し [1]、プールが送信側を保持するようにします [2]。これは正常にコンパイルされます。
ThreadPool
がチャネルを作成するときに、各 Worker
にチャネルの受信側を渡してみましょう。Worker
インスタンスが生成するスレッドで受信側を使用したいので、クロージャ内で receiver
パラメータを参照します。リスト 20-17 のコードはまだコンパイルされません。
ファイル名:src/lib.rs
impl ThreadPool {
--snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
1 workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
--snip--
}
--snip--
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
2 receiver;
});
Worker { id, thread }
}
}
リスト 20-17: 各 Worker
に受信側を渡す
小さな単純な変更を加えました。受信側を Worker::new
に渡し [1]、そしてクロージャ内で使用します [2]。
このコードをチェックしようとすると、次のエラーが表示されます。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in
previous iteration of loop
コードは receiver
を複数の Worker
インスタンスに渡そうとしています。これは機能しません。第 16 章で思い出してください。Rust が提供するチャネルの実装は、複数の _ プロデューサー _、単一の _ コンシューマー _ です。これは、このコードを修正するためにチャネルの消費側を単にクローンすることはできないことを意味します。また、複数のコンシューマーにメッセージを複数回送信したくはありません。複数の Worker
インスタンスがある場合に、各メッセージが 1 回だけ処理されるように、1 つのメッセージのリストが欲しいです。
また、チャネルキューからジョブを取り出すには、receiver
を変更する必要があります。したがって、スレッドは receiver
を共有して変更する安全な方法が必要です。そうでなければ、競合条件が発生する可能性があります(第 16 章で説明されています)。
第 16 章で説明したスレッドセーフなスマートポインタを思い出してください。複数のスレッド間で所有権を共有し、スレッドが値を変更できるようにするには、Arc<Mutex<T>>
を使用する必要があります。Arc
型は、複数の Worker
インスタンスが受信側を所有できるようにし、Mutex
は、1 つの Worker
だけが受信側からジョブを取得できるようにします。リスト 20-18 に必要な変更を示します。
ファイル名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
--snip--
impl ThreadPool {
--snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
1 let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(
Worker::new(id, Arc::clone(& 2 receiver))
);
}
ThreadPool { workers, sender }
}
--snip--
}
--snip--
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
--snip--
}
}
リスト 20-18: Arc
と Mutex
を使用して Worker
インスタンス間で受信側を共有する
ThreadPool::new
では、受信側を Arc
と Mutex
に入れます [1]。各新しい Worker
に対して、Arc
をクローンして参照カウントを増やし、Worker
インスタンスが受信側の所有権を共有できるようにします [2]。
これらの変更により、コードはコンパイルされます!着々と進んでいます!
さて、ついに ThreadPool
の execute
メソッドを実装しましょう。また、Job
を構造体から、execute
が受け取るクロージャの型を保持するトレイトオブジェクトの型エイリアスに変更します。「型エイリアスを使った型の同義語の作成」で説明したように、型エイリアスは使いやすさのために長い型を短くすることができます。リスト 20-19 を見てください。
ファイル名:src/lib.rs
--snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
--snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
1 let job = Box::new(f);
2 self.sender.send(job).unwrap();
}
}
--snip--
リスト 20-19: 各クロージャを保持する Box
用の Job
型エイリアスを作成し、そのジョブをチャネルに送信する
execute
で受け取ったクロージャを使って新しい Job
インスタンスを作成した後 [1]、そのジョブをチャネルの送信側に送信します [2]。送信が失敗した場合に備えて、send
で unwrap
を呼んでいます。たとえば、すべてのスレッドの実行を停止した場合、つまり受信側が新しいメッセージの受信を停止した場合に、これが発生する可能性があります。現時点では、スレッドの実行を停止することはできません。プールが存在する限り、スレッドは継続して実行されます。unwrap
を使う理由は、失敗のケースが起こらないことを私たちは知っているが、コンパイラはそれを知らないからです。
しかし、まだ終わりではありません!Worker
では、thread::spawn
に渡されるクロージャはまだ、チャネルの受信側を 参照 するだけです。代わりに、クロージャが永久にループし、チャネルの受信側にジョブを要求し、ジョブを受け取ったときに実行するようにする必要があります。Worker::new
に対して、リスト 20-20 に示す変更を加えましょう。
ファイル名:src/lib.rs
--snip--
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver
1.lock()
2.unwrap()
3.recv()
4.unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
リスト 20-20: Worker
インスタンスのスレッド内でジョブを受信して実行する
ここでは、まず receiver
の lock
を呼んでミューテックスを取得し [1]、次に unwrap
を呼んでエラーがあった場合にパニックになります [2]。ミューテックスが 汚染された 状態にある場合、ロックを取得することが失敗する可能性があります。これは、他のスレッドがロックを保持したままパニックになり、ロックを解放しなかった場合に発生する可能性があります。この状況では、このスレッドがパニックになるように unwrap
を呼ぶことが正しいアクションです。あなたが好きなら、この unwrap
を、あなたにとって意味のあるエラーメッセージ付きの expect
に変更しても構いません。
ミューテックスのロックを取得できた場合、recv
を呼んでチャネルから Job
を受信します [3]。最後の unwrap
もまた、ここでエラーがあった場合にパニックになります [4]。これは、送信側を保持しているスレッドがシャットダウンした場合に発生する可能性があります。受信側がシャットダウンした場合に send
メソッドが Err
を返すのと同じようです。
recv
の呼び出しはブロックされます。したがって、まだジョブがない場合、現在のスレッドはジョブが利用可能になるまで待ちます。Mutex<T>
は、1 つの Worker
スレッドだけが 1 度にジョブを要求しようとしていることを保証します。
これで、私たちのスレッドプールは動作状態になりました!cargo run
を実行して、いくつかの要求を行ってみましょう。
[object Object]
成功です!これで、非同期で接続を実行するスレッドプールができました。作成されるスレッドは 4 つを超えません。したがって、サーバが多くの要求を受け取った場合でも、システムが過負荷になることはありません。/sleep
に要求を行うと、サーバは別のスレッドがそれらを実行することで、他の要求を処理することができます。
注:同時に複数のブラウザウィンドウで /sleep を開くと、5 秒間隔で 1 つずつ読み込まれる場合があります。一部のウェブブラウザは、キャッシュの理由で、同じ要求の複数のインスタンスを順次実行します。この制限は、私たちのウェブサーバによるものではありません。
第 18 章で while let
ループについて学んだ後、あなたはおそらく、なぜ私たちがリスト 20-21 に示すように Worker
スレッドのコードを書かなかったのか疑問に思うでしょう。
ファイル名:src/lib.rs
--snip--
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
リスト 20-21: while let
を使った Worker::new
の代替実装
このコードはコンパイルされて実行されますが、望ましいスレッドの動作をもたらさない。理由はやや微妙である。Mutex
構造体には公開の unlock
メソッドがない。なぜなら、ロックの所有権は、lock
メソッドが返す LockResult<MutexGuard<T>>
内の MutexGuard<T>
の寿命に基づいているからである。コンパイル時に、バーローチェッカーは、Mutex
によって保護されるリソースは、ロックを保持していない限りアクセスできないというルールを強制できる。しかし、この実装では、MutexGuard<T>
の寿命に注意しない場合、ロックが意図したより長く保持される場合もある。
等号の右辺の式で使用される一時的な値は、let
ステートメントが終了するとすぐに破棄される。しかし、while let
(および if let
と match
)は、関連するブロックの終了まで一時的な値を破棄しない。リスト 20-21 では、job()
の呼び出しの期間中、ロックが保持され続ける。これは、他の Worker
インスタンスがジョブを受け取れないことを意味する。
おめでとうございます!あなたは、単一スレッドのサーバをマルチスレッドサーバに変える実験を完了しました。あなたの技術を向上させるために、LabEx でさらに多くの実験を行って練習してください。