Desenvolvimento de Servidor Multithread em Rust

Beginner

This tutorial is from open-source community. Access the source code

Introdução

Bem-vindo a Transformando Nosso Servidor de Thread Única em um Servidor Multithread. Este laboratório faz parte do Livro de Rust. Você pode praticar suas habilidades em Rust no LabEx.

Neste laboratório, transformaremos nosso servidor de thread única em um servidor multithread para melhorar sua eficiência no processamento de múltiplas requisições simultaneamente.

Transformando Nosso Servidor de Thread Única em um Servidor Multithread

Atualmente, o servidor processará cada requisição por vez, o que significa que ele não processará uma segunda conexão até que a primeira termine o processamento. Se o servidor receber cada vez mais requisições, essa execução serial se tornará cada vez menos otimizada. Se o servidor receber uma requisição que leva muito tempo para processar, as requisições subsequentes terão que esperar até que a requisição demorada seja finalizada, mesmo que as novas requisições possam ser processadas rapidamente. Precisaremos corrigir isso, mas primeiro analisaremos o problema em ação.

Simulando uma Requisição Lenta

Analisaremos como uma requisição de processamento lento pode afetar outras requisições feitas à nossa implementação atual do servidor. A Listagem 20-10 implementa o tratamento de uma requisição para /sleep com uma resposta lenta simulada que fará com que o servidor durma por cinco segundos antes de responder.

Nome do arquivo: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
--snip--

fn handle_connection(mut stream: TcpStream) {
    --snip--

    let (status_line, filename) = 1 match &request_line[..] {
      2 "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
      3 "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
      4 _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    --snip--
}

Listagem 20-10: Simulando uma requisição lenta dormindo por cinco segundos

Mudamos de if para match agora que temos três casos [1]. Precisamos corresponder explicitamente a uma fatia de request_line para fazer a correspondência de padrão com os valores literais de string; match não faz referência e desreferenciação automáticas, como o método de igualdade faz.

O primeiro braço [2] é o mesmo que o bloco if da Listagem 20-9. O segundo braço [3] corresponde a uma requisição para /sleep. Quando essa requisição é recebida, o servidor dormirá por cinco segundos antes de renderizar a página HTML de sucesso. O terceiro braço [4] é o mesmo que o bloco else da Listagem 20-9.

Você pode ver como nosso servidor é primitivo: bibliotecas reais lidariam com o reconhecimento de múltiplas requisições de uma forma muito menos verbosa!

Inicie o servidor usando cargo run. Em seguida, abra duas janelas do navegador: uma para http://127.0.0.1:7878 e a outra para http://127.0.0.1:7878/sleep. Se você inserir o URI / algumas vezes, como antes, verá que ele responde rapidamente. Mas se você inserir /sleep e, em seguida, carregar /, verá que / espera até que sleep tenha dormido por seus cinco segundos completos antes de carregar.

Existem múltiplas técnicas que poderíamos usar para evitar que as requisições fiquem presas atrás de uma requisição lenta; a que implementaremos é um pool de threads (thread pool).

Melhorando a Vazão com um Pool de Threads

Um pool de threads é um grupo de threads geradas que estão esperando e prontas para lidar com uma tarefa. Quando o programa recebe uma nova tarefa, ele atribui uma das threads no pool à tarefa, e essa thread processará a tarefa. As threads restantes no pool estão disponíveis para lidar com quaisquer outras tarefas que chegarem enquanto a primeira thread está processando. Quando a primeira thread termina de processar sua tarefa, ela é retornada ao pool de threads ociosas, pronta para lidar com uma nova tarefa. Um pool de threads permite que você processe conexões simultaneamente, aumentando a vazão do seu servidor.

Limitaremos o número de threads no pool a um número pequeno para nos proteger de ataques DoS; se tivéssemos nosso programa criando uma nova thread para cada requisição que chegasse, alguém fazendo 10 milhões de requisições ao nosso servidor poderia causar estragos usando todos os recursos do nosso servidor e paralisando o processamento das requisições.

Em vez de gerar threads ilimitadas, então, teremos um número fixo de threads esperando no pool. As requisições que chegam são enviadas ao pool para processamento. O pool manterá uma fila de requisições recebidas. Cada uma das threads no pool retirará uma requisição dessa fila, lidará com a requisição e, em seguida, pedirá outra requisição à fila. Com este design, podemos processar até N requisições simultaneamente, onde N é o número de threads. Se cada thread estiver respondendo a uma requisição de longa duração, as requisições subsequentes ainda podem ficar presas na fila, mas aumentamos o número de requisições de longa duração que podemos lidar antes de chegar a esse ponto.

Esta técnica é apenas uma das muitas maneiras de melhorar a vazão de um servidor web. Outras opções que você pode explorar são o modelo fork/join, o modelo de I/O assíncrono de thread único e o modelo de I/O assíncrono multithreaded. Se você estiver interessado neste tópico, pode ler mais sobre outras soluções e tentar implementá-las; com uma linguagem de baixo nível como Rust, todas essas opções são possíveis.

Antes de começarmos a implementar um pool de threads, vamos falar sobre como o uso do pool deve ser. Quando você está tentando projetar código, escrever a interface do cliente primeiro pode ajudar a orientar seu design. Escreva a API do código para que ela seja estruturada da maneira que você deseja chamá-la; então, implemente a funcionalidade dentro dessa estrutura, em vez de implementar a funcionalidade e, em seguida, projetar a API pública.

Semelhante a como usamos o desenvolvimento orientado a testes no projeto no Capítulo 12, usaremos o desenvolvimento orientado a compilador aqui. Escreveremos o código que chama as funções que queremos e, em seguida, analisaremos os erros do compilador para determinar o que devemos mudar em seguida para fazer o código funcionar. Antes de fazermos isso, no entanto, exploraremos a técnica que não usaremos como ponto de partida.

Gerando uma Thread para Cada Requisição

Primeiramente, vamos explorar como nosso código pode parecer se ele criasse uma nova thread para cada conexão. Como mencionado anteriormente, este não é nosso plano final devido aos problemas com a potencial geração de um número ilimitado de threads, mas é um ponto de partida para obter um servidor multithreaded funcional primeiro. Então, adicionaremos o pool de threads como uma melhoria, e contrastar as duas soluções será mais fácil.

A Listagem 20-11 mostra as alterações a serem feitas em main para gerar uma nova thread para lidar com cada fluxo dentro do loop for.

Nome do arquivo: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

Listagem 20-11: Gerando uma nova thread para cada fluxo

Como você aprendeu no Capítulo 16, thread::spawn criará uma nova thread e, em seguida, executará o código no closure na nova thread. Se você executar este código e carregar /sleep em seu navegador, e depois / em mais duas abas do navegador, você realmente verá que as requisições para / não precisam esperar /sleep terminar. No entanto, como mencionamos, isso acabará sobrecarregando o sistema porque você estaria criando novas threads sem nenhum limite.

Criando um Número Finito de Threads

Queremos que nosso pool de threads funcione de maneira semelhante e familiar, para que a mudança de threads para um pool de threads não exija grandes alterações no código que usa nossa API. A Listagem 20-12 mostra a interface hipotética para uma struct ThreadPool que queremos usar em vez de thread::spawn.

Nome do arquivo: src/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  1 let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

      2 pool.execute(|| {
            handle_connection(stream);
        });
    }
}

Listagem 20-12: Nossa interface ThreadPool ideal

Usamos ThreadPool::new para criar um novo pool de threads com um número configurável de threads, neste caso quatro [1]. Então, no loop for, pool.execute tem uma interface semelhante a thread::spawn, pois recebe um closure que o pool deve executar para cada fluxo [2]. Precisamos implementar pool.execute para que ele receba o closure e o dê a uma thread no pool para executar. Este código ainda não compilará, mas tentaremos para que o compilador possa nos guiar em como corrigi-lo.

Construindo um ThreadPool Usando Desenvolvimento Orientado pelo Compilador

Faça as alterações na Listagem 20-12 em src/main.rs e, em seguida, vamos usar os erros do compilador de cargo check para orientar nosso desenvolvimento. Aqui está o primeiro erro que obtemos:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

Ótimo! Este erro nos diz que precisamos de um tipo ou módulo ThreadPool, então vamos construir um agora. Nossa implementação ThreadPool será independente do tipo de trabalho que nosso servidor web está fazendo. Então, vamos mudar o crate hello de um crate binário para um crate de biblioteca para conter nossa implementação ThreadPool. Depois de mudarmos para um crate de biblioteca, também poderíamos usar a biblioteca de pool de threads separada para qualquer trabalho que quisermos fazer usando um pool de threads, não apenas para servir requisições web.

Crie um arquivo src/lib.rs que contenha o seguinte, que é a definição mais simples de uma struct ThreadPool que podemos ter por enquanto:

Nome do arquivo: src/lib.rs

pub struct ThreadPool;

Em seguida, edite o arquivo main.rs para trazer ThreadPool para o escopo do crate da biblioteca, adicionando o seguinte código ao topo de src/main.rs:

Nome do arquivo: src/main.rs

use hello::ThreadPool;

Este código ainda não funcionará, mas vamos verificá-lo novamente para obter o próximo erro que precisamos abordar:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct
`ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in
`ThreadPool`

Este erro indica que, em seguida, precisamos criar uma função associada chamada new para ThreadPool. Também sabemos que new precisa ter um parâmetro que possa aceitar 4 como um argumento e deve retornar uma instância ThreadPool. Vamos implementar a função new mais simples que terá essas características:

Nome do arquivo: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

Escolhemos usize como o tipo do parâmetro size porque sabemos que um número negativo de threads não faz sentido. Também sabemos que usaremos este 4 como o número de elementos em uma coleção de threads, que é para o que o tipo usize serve, como discutido em "Tipos Inteiros".

Vamos verificar o código novamente:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the
current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^ method not found in `ThreadPool`

Agora o erro ocorre porque não temos um método execute em ThreadPool. Lembre-se de "Criando um Número Finito de Threads" que decidimos que nosso pool de threads deveria ter uma interface semelhante a thread::spawn. Além disso, implementaremos a função execute para que ela receba o closure que lhe é dado e o dê a uma thread ociosa no pool para executar.

Definiremos o método execute em ThreadPool para receber um closure como um parâmetro. Lembre-se de "Movendo Valores Capturados de Closures e os Traits Fn" que podemos receber closures como parâmetros com três traits diferentes: Fn, FnMut e FnOnce. Precisamos decidir qual tipo de closure usar aqui. Sabemos que acabaremos fazendo algo semelhante à implementação da biblioteca padrão thread::spawn, então podemos olhar para quais limites a assinatura de thread::spawn tem em seu parâmetro. A documentação nos mostra o seguinte:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

O parâmetro de tipo F é aquele com o qual estamos preocupados aqui; o parâmetro de tipo T está relacionado ao valor de retorno, e não estamos preocupados com isso. Podemos ver que spawn usa FnOnce como o limite de trait em F. Isso é provavelmente o que queremos também, porque eventualmente passaremos o argumento que recebemos em execute para spawn. Podemos ter ainda mais confiança de que FnOnce é o trait que queremos usar porque a thread para executar uma requisição só executará o closure dessa requisição uma vez, o que corresponde ao Once em FnOnce.

O parâmetro de tipo F também tem o limite de trait Send e o limite de tempo de vida 'static, que são úteis em nossa situação: precisamos de Send para transferir o closure de uma thread para outra e 'static porque não sabemos quanto tempo a thread levará para executar. Vamos criar um método execute em ThreadPool que receberá um parâmetro genérico do tipo F com esses limites:

Nome do arquivo: src/lib.rs

impl ThreadPool {
    --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() 1 + Send + 'static,
    {
    }
}

Ainda usamos o () após FnOnce [1] porque este FnOnce representa um closure que não recebe parâmetros e retorna o tipo unit (). Assim como as definições de função, o tipo de retorno pode ser omitido da assinatura, mas mesmo que não tenhamos parâmetros, ainda precisamos dos parênteses.

Novamente, esta é a implementação mais simples do método execute: ele não faz nada, mas estamos apenas tentando fazer nosso código compilar. Vamos verificá-lo novamente:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

Ele compila! Mas observe que, se você tentar cargo run e fizer uma requisição no navegador, verá os erros no navegador que vimos no início do capítulo. Nossa biblioteca ainda não está chamando o closure passado para execute!

Nota: Um ditado que você pode ouvir sobre linguagens com compiladores rigorosos, como Haskell e Rust, é "se o código compila, ele funciona". Mas este ditado não é universalmente verdadeiro. Nosso projeto compila, mas não faz absolutamente nada! Se estivéssemos construindo um projeto real e completo, este seria um bom momento para começar a escrever testes unitários para verificar se o código compila e tem o comportamento que queremos.

Validando o Número de Threads em new

Não estamos fazendo nada com os parâmetros para new e execute. Vamos implementar os corpos dessas funções com o comportamento que queremos. Para começar, vamos pensar em new. Anteriormente, escolhemos um tipo sem sinal para o parâmetro size porque um pool com um número negativo de threads não faz sentido. No entanto, um pool com zero threads também não faz sentido, mas zero é um usize perfeitamente válido. Adicionaremos código para verificar se size é maior que zero antes de retornar uma instância ThreadPool e fazer com que o programa entre em pânico se receber zero, usando a macro assert!, conforme mostrado na Listagem 20-13.

Nome do arquivo: src/lib.rs

impl ThreadPool {
    /// Cria um novo ThreadPool.
    ///
    /// O tamanho é o número de threads no pool.
    ///
  1 /// ## Panics
    ///
    /// A função `new` entrará em pânico se o tamanho for zero.
    pub fn new(size: usize) -> ThreadPool {
      2 assert!(size > 0);

        ThreadPool
    }

    --snip--
}

Listagem 20-13: Implementando ThreadPool::new para entrar em pânico se size for zero

Também adicionamos alguma documentação para nosso ThreadPool com comentários de documentação. Observe que seguimos as boas práticas de documentação adicionando uma seção que destaca as situações em que nossa função pode entrar em pânico [1], conforme discutido no Capítulo 14. Tente executar cargo doc --open e clicar na struct ThreadPool para ver como a documentação gerada para new se parece!

Em vez de adicionar a macro assert! como fizemos aqui [2], poderíamos mudar new para build e retornar um Result como fizemos com Config::build no projeto I/O na Listagem 12-9. Mas decidimos neste caso que tentar criar um pool de threads sem nenhuma thread deve ser um erro irrecuperável. Se você estiver se sentindo ambicioso, tente escrever uma função chamada build com a seguinte assinatura para comparar com a função new:

pub fn build(
    size: usize
) -> Result<ThreadPool, PoolCreationError> {

Criando Espaço para Armazenar as Threads

Agora que temos uma maneira de saber que temos um número válido de threads para armazenar no pool, podemos criar essas threads e armazená-las na struct ThreadPool antes de retornar a struct. Mas como "armazenamos" uma thread? Vamos dar outra olhada na assinatura de thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

A função spawn retorna um JoinHandle<T>, onde T é o tipo que o closure retorna. Vamos tentar usar JoinHandle também e ver o que acontece. Em nosso caso, os closures que estamos passando para o pool de threads lidarão com a conexão e não retornarão nada, então T será o tipo unitário ().

O código na Listagem 20-14 compilará, mas ainda não cria nenhuma thread. Mudamos a definição de ThreadPool para conter um vetor de instâncias thread::JoinHandle<()>, inicializamos o vetor com uma capacidade de size, configuramos um loop for que executará algum código para criar as threads e retornamos uma instância ThreadPool contendo-as.

Nome do arquivo: src/lib.rs

1 use std::thread;

pub struct ThreadPool {
  2 threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      3 let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    --snip--
}

Listagem 20-14: Criando um vetor para ThreadPool conter as threads

Trouxemos std::thread para o escopo no crate da biblioteca [1] porque estamos usando thread::JoinHandle como o tipo dos itens no vetor em ThreadPool [2].

Depois que um tamanho válido é recebido, nosso ThreadPool cria um novo vetor que pode conter size itens [3]. A função with_capacity executa a mesma tarefa que Vec::new, mas com uma diferença importante: ela pré-aloca espaço no vetor. Como sabemos que precisamos armazenar size elementos no vetor, fazer essa alocação antecipadamente é um pouco mais eficiente do que usar Vec::new, que redimensiona a si mesmo à medida que os elementos são inseridos.

Quando você executar cargo check novamente, ele deverá ser bem-sucedido.

Enviando Código do ThreadPool para uma Thread

Deixamos um comentário no loop for na Listagem 20-14 sobre a criação de threads. Aqui, veremos como realmente criamos threads. A biblioteca padrão fornece thread::spawn como uma maneira de criar threads, e thread::spawn espera receber algum código que a thread deve executar assim que a thread é criada. No entanto, em nosso caso, queremos criar as threads e fazê-las esperar por código que enviaremos mais tarde. A implementação de threads da biblioteca padrão não inclui nenhuma maneira de fazer isso; temos que implementá-lo manualmente.

Implementaremos esse comportamento introduzindo uma nova estrutura de dados entre o ThreadPool e as threads que gerenciará esse novo comportamento. Chamaremos essa estrutura de dados de Worker (Trabalhador), que é um termo comum em implementações de pooling. O Worker pega o código que precisa ser executado e executa o código em sua thread.

Pense nas pessoas trabalhando na cozinha de um restaurante: os trabalhadores esperam até que os pedidos cheguem dos clientes e, em seguida, são responsáveis por pegar esses pedidos e preenchê-los.

Em vez de armazenar um vetor de instâncias JoinHandle<()> no pool de threads, armazenaremos instâncias da struct Worker. Cada Worker armazenará uma única instância JoinHandle<()>. Em seguida, implementaremos um método em Worker que receberá um closure de código para executar e enviá-lo para a thread já em execução para execução. Também daremos a cada Worker um id para que possamos distinguir entre as diferentes instâncias de Worker no pool ao registrar ou depurar.

Aqui está o novo processo que acontecerá quando criarmos um ThreadPool. Implementaremos o código que envia o closure para a thread depois de configurarmos o Worker dessa maneira:

  1. Definir uma struct Worker que contém um id e um JoinHandle<()>.
  2. Mudar ThreadPool para conter um vetor de instâncias Worker.
  3. Definir uma função Worker::new que recebe um número id e retorna uma instância Worker que contém o id e uma thread gerada com um closure vazio.
  4. Em ThreadPool::new, use o contador do loop for para gerar um id, criar um novo Worker com esse id e armazenar o Worker no vetor.

Se você está pronto para um desafio, tente implementar essas alterações por conta própria antes de olhar o código na Listagem 20-15.

Pronto? Aqui está a Listagem 20-15 com uma maneira de fazer as modificações anteriores.

Nome do arquivo: src/lib.rs

use std::thread;

pub struct ThreadPool {
  1 workers: Vec<Worker>,
}

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

      2 for id in 0..size {
          3 workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    --snip--
}

4 struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
  5 fn new(id: usize) -> Worker {
      6 let thread = thread::spawn(|| {});

        Worker { 7 id, 8 thread }
    }
}

Listagem 20-15: Modificando ThreadPool para conter instâncias Worker em vez de conter threads diretamente

Mudamos o nome do campo em ThreadPool de threads para workers porque agora ele contém instâncias Worker em vez de instâncias JoinHandle<()> [1]. Usamos o contador no loop for [2] como um argumento para Worker::new e armazenamos cada novo Worker no vetor chamado workers [3].

O código externo (como nosso servidor em src/main.rs) não precisa saber os detalhes da implementação sobre o uso de uma struct Worker dentro de ThreadPool, então tornamos a struct Worker [4] e sua função new [5] privadas. A função Worker::new usa o id que damos a ela [7] e armazena uma instância JoinHandle<()> [8] que é criada gerando uma nova thread usando um closure vazio [6].

Nota: Se o sistema operacional não puder criar uma thread porque não há recursos de sistema suficientes, thread::spawn entrará em pânico. Isso fará com que todo o nosso servidor entre em pânico, mesmo que a criação de algumas threads possa ser bem-sucedida. Para simplificar, esse comportamento é aceitável, mas em uma implementação de pool de threads de produção, você provavelmente gostaria de usar std::thread::Builder e seu método spawn que retorna Result em vez disso.

Este código compilará e armazenará o número de instâncias Worker que especificamos como um argumento para ThreadPool::new. Mas ainda não estamos processando o closure que recebemos em execute. Vamos ver como fazer isso a seguir.

Enviando Requisições para Threads via Canais

O próximo problema que abordaremos é que os closures fornecidos para thread::spawn não fazem absolutamente nada. Atualmente, obtemos o closure que queremos executar no método execute. Mas precisamos dar a thread::spawn um closure para executar quando criamos cada Worker durante a criação do ThreadPool.

Queremos que as structs Worker que acabamos de criar busquem o código a ser executado de uma fila mantida no ThreadPool e enviem esse código para sua thread para execução.

Os canais sobre os quais aprendemos no Capítulo 16 - uma maneira simples de se comunicar entre duas threads - seriam perfeitos para este caso de uso. Usaremos um canal para funcionar como a fila de tarefas, e execute enviará uma tarefa do ThreadPool para as instâncias Worker, que enviarão a tarefa para sua thread. Aqui está o plano:

  1. O ThreadPool criará um canal e manterá o remetente.
  2. Cada Worker manterá o receptor.
  3. Criaremos uma nova struct Job que conterá os closures que queremos enviar pelo canal.
  4. O método execute enviará a tarefa que deseja executar através do remetente.
  5. Em sua thread, o Worker fará um loop sobre seu receptor e executará os closures de quaisquer tarefas que receber.

Vamos começar criando um canal em ThreadPool::new e mantendo o remetente na instância ThreadPool, conforme mostrado na Listagem 20-16. A struct Job não contém nada por enquanto, mas será o tipo de item que estamos enviando pelo canal.

Nome do arquivo: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

      1 let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, 2 sender }
    }
    --snip--
}

Listagem 20-16: Modificando ThreadPool para armazenar o remetente de um canal que transmite instâncias Job

Em ThreadPool::new, criamos nosso novo canal [1] e fazemos com que o pool mantenha o remetente [2]. Isso compilará com sucesso.

Vamos tentar passar um receptor do canal para cada Worker enquanto o pool de threads cria o canal. Sabemos que queremos usar o receptor na thread que as instâncias Worker geram, então faremos referência ao parâmetro receiver no closure. O código na Listagem 20-17 ainda não compilará.

Nome do arquivo: src/lib.rs

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
          1 workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    --snip--
}

--snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
          2 receiver;
        });

        Worker { id, thread }
    }
}

Listagem 20-17: Passando o receptor para cada Worker

Fizemos algumas pequenas e simples alterações: passamos o receptor para Worker::new [1] e, em seguida, o usamos dentro do closure [2].

Quando tentamos verificar este código, obtemos este erro:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in
previous iteration of loop

O código está tentando passar receiver para várias instâncias Worker. Isso não funcionará, como você se lembrará do Capítulo 16: a implementação de canal que o Rust fornece é de múltiplos produtores, único consumidor. Isso significa que não podemos simplesmente clonar a extremidade consumidora do canal para corrigir este código. Também não queremos enviar uma mensagem várias vezes para vários consumidores; queremos uma lista de mensagens com várias instâncias Worker de modo que cada mensagem seja processada uma vez.

Além disso, tirar uma tarefa da fila do canal envolve a mutação do receiver, então as threads precisam de uma maneira segura de compartilhar e modificar o receiver; caso contrário, podemos obter condições de corrida (conforme abordado no Capítulo 16).

Lembre-se dos ponteiros inteligentes thread-safe discutidos no Capítulo 16: para compartilhar a propriedade em várias threads e permitir que as threads mutem o valor, precisamos usar Arc<Mutex<T>>. O tipo Arc permitirá que várias instâncias Worker possuam o receptor, e Mutex garantirá que apenas um Worker obtenha uma tarefa do receptor por vez. A Listagem 20-18 mostra as alterações que precisamos fazer.

Nome do arquivo: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
--snip--

impl ThreadPool {
    --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

      1 let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(
                Worker::new(id, Arc::clone(& 2 receiver))
            );
        }

        ThreadPool { workers, sender }
    }

    --snip--
}

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        --snip--
    }
}

Listagem 20-18: Compartilhando o receptor entre as instâncias Worker usando Arc e Mutex

Em ThreadPool::new, colocamos o receptor em um Arc e um Mutex [1]. Para cada novo Worker, clonamos o Arc para aumentar a contagem de referência para que as instâncias Worker possam compartilhar a propriedade do receptor [2].

Com essas alterações, o código compila! Estamos chegando lá!

Implementando o Método execute

Vamos finalmente implementar o método execute em ThreadPool. Também mudaremos Job de uma struct para um alias de tipo para um objeto trait que contém o tipo de closure que execute recebe. Como discutido em "Criando Sinônimos de Tipo com Aliases de Tipo", aliases de tipo nos permitem tornar tipos longos mais curtos para facilitar o uso. Veja a Listagem 20-19.

Nome do arquivo: src/lib.rs

--snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
      1 let job = Box::new(f);

      2 self.sender.send(job).unwrap();
    }
}

--snip--

Listagem 20-19: Criando um alias de tipo Job para um Box que contém cada closure e, em seguida, enviando a tarefa pelo canal

Depois de criar uma nova instância Job usando o closure que obtemos em execute [1], enviamos essa tarefa pela extremidade de envio do canal [2]. Estamos chamando unwrap em send para o caso de falha no envio. Isso pode acontecer se, por exemplo, pararmos todas as nossas threads de execução, o que significa que a extremidade receptora parou de receber novas mensagens. No momento, não podemos impedir que nossas threads sejam executadas: nossas threads continuam executando enquanto o pool existir. A razão pela qual usamos unwrap é que sabemos que o caso de falha não acontecerá, mas o compilador não sabe disso.

Mas ainda não terminamos! No Worker, nosso closure sendo passado para thread::spawn ainda apenas referencia a extremidade receptora do canal. Em vez disso, precisamos que o closure faça um loop para sempre, pedindo à extremidade receptora do canal uma tarefa e executando a tarefa quando a recebe. Vamos fazer a alteração mostrada na Listagem 20-20 para Worker::new.

Nome do arquivo: src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver
              1 .lock()
              2 .unwrap()
              3 .recv()
              4 .unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

Listagem 20-20: Recebendo e executando as tarefas na thread da instância Worker

Aqui, primeiro chamamos lock no receiver para adquirir o mutex [1], e então chamamos unwrap para entrar em pânico em quaisquer erros [2]. Adquirir um bloqueio pode falhar se o mutex estiver em um estado envenenado, o que pode acontecer se alguma outra thread entrar em pânico enquanto mantém o bloqueio em vez de liberar o bloqueio. Nessa situação, chamar unwrap para fazer com que essa thread entre em pânico é a ação correta a ser tomada. Sinta-se à vontade para alterar este unwrap para um expect com uma mensagem de erro que seja significativa para você.

Se obtivermos o bloqueio no mutex, chamamos recv para receber um Job do canal [3]. Um unwrap final também passa por quaisquer erros aqui [4], o que pode ocorrer se a thread que mantém o remetente foi desligada, semelhante a como o método send retorna Err se o receptor for desligado.

A chamada para recv bloqueia, então, se ainda não houver uma tarefa, a thread atual esperará até que uma tarefa se torne disponível. O Mutex<T> garante que apenas uma thread Worker por vez esteja tentando solicitar uma tarefa.

Nosso pool de threads agora está em um estado de funcionamento! Dê um cargo run e faça algumas solicitações:

[object Object]

Sucesso! Agora temos um pool de threads que executa conexões de forma assíncrona. Nunca há mais de quatro threads criadas, então nosso sistema não ficará sobrecarregado se o servidor receber muitas solicitações. Se fizermos uma solicitação para /sleep, o servidor poderá atender outras solicitações fazendo com que outra thread as execute.

Nota: Se você abrir /sleep em várias janelas do navegador simultaneamente, elas poderão carregar uma de cada vez em intervalos de cinco segundos. Alguns navegadores da web executam várias instâncias da mesma solicitação sequencialmente por motivos de cache. Essa limitação não é causada pelo nosso servidor web.

Depois de aprender sobre o loop while let no Capítulo 18, você pode estar se perguntando por que não escrevemos o código da thread Worker conforme mostrado na Listagem 20-21.

Nome do arquivo: src/lib.rs

--snip--

impl Worker {
    fn new(
        id: usize,
        receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Listagem 20-21: Uma implementação alternativa de Worker::new usando while let

Este código compila e executa, mas não resulta no comportamento de thread desejado: uma solicitação lenta ainda fará com que outras solicitações esperem para serem processadas. A razão é um tanto sutil: a struct Mutex não possui um método unlock público porque a propriedade do bloqueio é baseada no tempo de vida do MutexGuard<T> dentro do LockResult<MutexGuard<T>> que o método lock retorna. No tempo de compilação, o verificador de empréstimo pode então impor a regra de que um recurso protegido por um Mutex não pode ser acessado a menos que tenhamos o bloqueio. No entanto, essa implementação também pode resultar no bloqueio sendo mantido por mais tempo do que o pretendido se não estivermos atentos ao tempo de vida do MutexGuard<T>.

O código na Listagem 20-20 que usa let job = receiver.lock().unwrap().recv().unwrap(); funciona porque com let, quaisquer valores temporários usados na expressão do lado direito do sinal de igual são descartados imediatamente quando a instrução let termina. No entanto, while let (e if let e match) não descarta valores temporários até o final do bloco associado. Na Listagem 20-21, o bloqueio permanece mantido durante a chamada para job(), o que significa que outras instâncias Worker não podem receber tarefas.

Resumo

Parabéns! Você concluiu o laboratório Transformando Nosso Servidor de Thread Única em um Servidor Multithread. Você pode praticar mais laboratórios no LabEx para aprimorar suas habilidades.