はじめに
グレースフルシャットダウンとクリーンアップへようこそ。この実験は、Rust Bookの一部です。LabEx で Rust のスキルを練習できます。
この実験では、Dropトレイトを利用してコードにグレースフルシャットダウンとクリーンアップメカニズムを実装し、スレッドが新しい要求を受け付けなくなり、シャットダウンする方法を提供します。
グレースフルシャットダウンとクリーンアップ
リスト 20-20 のコードは、意図通りにスレッドプールを使って非同期で要求に応答しています。workers、id、threadフィールドについて、直接使っていないことに関する警告がいくつか表示されます。これは、何もクリーンアップしていないことを思い出させます。よりエレガントではない ctrl-C メソッドを使ってメインスレッドを停止させると、他のすべてのスレッドも即座に停止します。たとえ、要求の処理の途中であってもです。
次に、Dropトレイトを実装して、プール内の各スレッドにjoinを呼び出します。これにより、クローズする前に処理している要求を完了させることができます。そして、スレッドに新しい要求を受け付けなくなり、シャットダウンするように伝える方法を実装します。このコードを動作させるには、サーバーを変更して、グレースフルにスレッドプールをシャットダウンする前に 2 つの要求のみを受け付けるようにします。
スレッドプールにおける Drop トレイトの実装
まずは、スレッドプールにDropを実装しましょう。プールが破棄されるとき、すべてのスレッドが join して、作業を終えることができるようにする必要があります。リスト 20-22 は、Drop実装の最初の試みを示していますが、このコードはまだうまく動作しません。
ファイル名:src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
1 for worker in &mut self.workers {
2 println!("Shutting down worker {}", worker.id);
3 worker.thread.join().unwrap();
}
}
}
リスト 20-22: スレッドプールがスコープ外になるときに各スレッドを join する
まず、スレッドプールの各workerをループ処理します[1]。ここでは&mutを使っています。なぜならselfは可変参照であり、workerも変更する必要があるからです。各workerに対して、この特定のWorkerインスタンスがシャットダウンされていることを示すメッセージを出力します[2]。そして、そのWorkerインスタンスのスレッドにjoinを呼び出します[3]。joinの呼び出しが失敗した場合、unwrapを使って Rust がパニックになり、グレースフルでないシャットダウンに入ります。
このコードをコンパイルすると、以下のエラーが表示されます。
error[E0507]: cannot move out of `worker.thread` which is behind a mutable
reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this
method call
| |
| move occurs because `worker.thread` has type
`JoinHandle<()>`, which does not implement the `Copy` trait
|
note: this function takes ownership of the receiver `self`, which moves
`worker.thread`
このエラーは、各workerの可変参照しか持っていないのにjoinを呼び出せないことを示しています。joinは引数の所有権を取得するからです。この問題を解決するには、threadを所有するWorkerインスタンスからスレッドを移動させる必要があります。そうすることでjoinがスレッドを消費できるようになります。これは、リスト 17-15 で行ったことと同じです。WorkerがOption<thread::JoinHandle<()>>を保持している場合、Optionのtakeメソッドを呼び出して、Someバリアントから値を移動させ、その代わりにNoneバリアントを残すことができます。つまり、実行中のWorkerはthreadにSomeバリアントがあり、Workerをクリーンアップしたいときには、SomeをNoneに置き換えることで、Workerが実行するスレッドがなくなります。
ですから、Workerの定義を以下のように更新することがわかります。
ファイル名:src/lib.rs
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
次に、コンパイラに頼って、変更が必要な他の場所を見つけましょう。このコードをチェックすると、2 つのエラーが表示されます。
error[E0599]: no method named `join` found for enum `Option` in the current
scope
--> src/lib.rs:52:27
|
52 | worker.thread.join().unwrap();
| ^^^^ method not found in
`Option<JoinHandle<()>>`
error[E0308]: mismatched types
--> src/lib.rs:72:22
|
72 | Worker { id, thread }
| ^^^^^^ expected enum `Option`, found struct
`JoinHandle`
|
= note: expected enum `Option<JoinHandle<()>>`
found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
|
72 | Worker { id, thread: Some(thread) }
| +++++++++++++ +
2 番目のエラーを解決しましょう。これは、Worker::newの末尾のコードを指しています。新しいWorkerを作成するとき、thread値をSomeでラップする必要があります。このエラーを修正するために、次のように変更します。
ファイル名:src/lib.rs
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
--snip--
Worker {
id,
thread: Some(thread),
}
}
}
最初のエラーは、Drop実装にあります。前述の通り、Option値のtakeを呼び出して、workerからthreadを移動させる予定でした。次の変更を行うことでそれができます。
ファイル名:src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
1 if let Some(thread) = worker.thread.take() {
2 thread.join().unwrap();
}
}
}
}
第 17 章で説明したように、OptionのtakeメソッドはSomeバリアントを取り出し、その代わりにNoneを残します。if letを使ってSomeを分解し、スレッドを取得します[1]。そして、そのスレッドにjoinを呼び出します[2]。Workerインスタンスのスレッドが既にNoneである場合、そのWorkerは既にスレッドがクリーンアップされていることがわかります。したがって、その場合何も起こりません。
スレッドに対してジョブの受信を停止するように信号を送る
これまでに行ったすべての変更により、コードは警告なしでコンパイルされます。しかし、残念なことに、このコードはまだ私たちが望むように機能しません。鍵は、Workerインスタンスのスレッドによって実行されるクロージャ内のロジックにあります。現在のところ、joinを呼び出していますが、これではスレッドが停止しません。なぜなら、ジョブを探し続けるために永久にloopしているからです。現在のdropの実装でThreadPoolを破棄しようとすると、メインスレッドは最初のスレッドが終了するのを永久に待ち、ブロックされます。
この問題を解決するには、ThreadPoolのdrop実装を変更し、その後Workerのループを変更する必要があります。
まず、ThreadPoolのdrop実装を変更して、スレッドが終了するのを待つ前に明示的にsenderを破棄します。リスト 20-23 は、senderを明示的に破棄するためのThreadPoolの変更を示しています。スレッドと同じOptionとtakeテクニックを使用して、senderをThreadPoolから移動させることができます。
ファイル名:src/lib.rs
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
--snip--
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
--snip--
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender
.as_ref()
.unwrap()
.send(job)
.unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
1 drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
リスト 20-23: Workerスレッドに join する前にsenderを明示的に破棄する
senderを破棄することで[1]、チャネルが閉じられ、これはもうメッセージが送信されないことを示します。そのとき、Workerインスタンスが無限ループで行うすべてのrecvの呼び出しはエラーを返します。リスト 20-24 では、その場合にループをグレースフルに終了するようにWorkerのループを変更します。これは、ThreadPoolのdrop実装がjoinを呼び出すときに、スレッドが終了することを意味します。
ファイル名:src/lib.rs
impl Worker {
fn new(
id: usize,
receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!(
"Worker {id} got a job; executing."
);
job();
}
Err(_) => {
println!(
"Worker {id} shutting down."
);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
リスト 20-24: recvがエラーを返すときに明示的にループから抜ける
このコードを動作させるには、mainを変更して、サーバーをグレースフルにシャットダウンする前に 2 つの要求のみを受け付けるようにします。これは、リスト 20-25 に示されています。
ファイル名:src/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
リスト 20-25: ループを抜けることで 2 つの要求を処理した後にサーバーをシャットダウンする
本当の世界のウェブサーバーは、2 つの要求を処理した後にシャットダウンすることは望ましくありません。このコードは、グレースフルなシャットダウンとクリーンアップが機能していることを示すだけです。
takeメソッドはIteratorトレイトに定義されており、反復処理を最大 2 つの最初の項目に制限します。ThreadPoolはmainの末尾でスコープ外になり、drop実装が実行されます。
cargo runでサーバーを起動し、3 つの要求を行ってみましょう。3 番目の要求はエラーになり、ターミナルには以下のような出力が表示されるはずです。
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 1.0s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
Workerの ID と表示されるメッセージの順序は異なる場合があります。このコードがどのように機能するかは、メッセージからわかります。Workerインスタンス 0 と 3 が最初の 2 つの要求を受け取りました。2 番目の接続後、サーバーは接続を受け付けなくなり、ThreadPoolのDrop実装はWorker 3 が作業を開始する前に実行され始めます。senderを破棄することで、すべてのWorkerインスタンスが切断され、シャットダウンするように指示されます。Workerインスタンスはそれぞれ切断するときにメッセージを出力し、その後、スレッドプールは各Workerスレッドが終了するのを待つためにjoinを呼び出します。
この特定の実行の興味深い点に注意してください。ThreadPoolはsenderを破棄し、どのWorkerもエラーを受け取る前に、Worker 0 にjoinを試みました。Worker 0 はまだrecvからエラーを受け取っていなかったので、メインスレッドはWorker 0 が終了するのを待ってブロックされました。その間、Worker 3 はジョブを受け取り、その後すべてのスレッドはエラーを受け取りました。Worker 0 が終了すると、メインスレッドは残りのWorkerインスタンスが終了するのを待ちました。その時点で、すべてのインスタンスはループを抜け、停止しました。
おめでとうございます!これでプロジェクトが完了しました。非同期で応答するためにスレッドプールを使用する基本的なウェブサーバーができました。サーバーのグレースフルなシャットダウンができ、プール内のすべてのスレッドがクリーンアップされます。この章のコード全体をダウンロードするには、https://www.nostarch.com/Rust2021を参照してください。
ここではもっとできます!このプロジェクトをさらに強化したい場合は、以下のアイデアがあります。
ThreadPoolとその公開メソッドに追加のドキュメントを追加する。- ライブラリの機能のテストを追加する。
unwrapへの呼び出しを、より堅牢なエラー処理に変更する。- ウェブ要求を処理する以外のタスクを実行するために
ThreadPoolを使用する。 - https://crates.ioでスレッドプールのクレートを見つけ、そのクレートを使用して同様のウェブサーバーを実装する。その後、実装したスレッドプールとの API と堅牢性を比較する。
まとめ
おめでとうございます!グレースフルシャットダウンとクリーンアップの実験を完了しました。LabEx でさらに多くの実験を行って、スキルを向上させることができます。