Aprenda sobre Geradores Gerenciados

Beginner

This tutorial is from open-source community. Access the source code

Introdução

Neste laboratório, você aprenderá sobre geradores gerenciados e entenderá como impulsioná-los de maneiras incomuns. Você também construirá um agendador de tarefas simples e criará um servidor de rede usando geradores.

Uma função geradora em Python requer código externo para ser executada. Por exemplo, um gerador de iteração só é executado quando iterado com um loop for, e corrotinas precisam que seu método send() seja chamado. Neste laboratório, exploraremos exemplos práticos de como impulsionar geradores em aplicações avançadas. Os arquivos criados durante este laboratório são multitask.py e server.py.

Este é um Lab Guiado, que fornece instruções passo a passo para ajudá-lo a aprender e praticar. Siga as instruções cuidadosamente para completar cada etapa e ganhar experiência prática. Dados históricos mostram que este é um laboratório de nível iniciante com uma taxa de conclusão de 84%. Recebeu uma taxa de avaliações positivas de 80% dos estudantes.

Entendendo os Geradores Python

Vamos começar revisando o que são geradores em Python. Em Python, geradores são um tipo especial de função. Eles são diferentes das funções regulares. Quando você chama uma função regular, ela é executada do início ao fim e retorna um único valor. No entanto, uma função geradora retorna um iterador, que é um objeto que podemos iterar, o que significa que podemos acessar seus valores um por um.

Geradores usam a instrução yield para retornar valores. Em vez de retornar todos os valores de uma vez, como uma função regular, um gerador retorna valores um de cada vez. Após produzir um valor (yielding), o gerador suspende sua execução. Na próxima vez que pedimos um valor, ele retoma a execução de onde parou.

Criando um Gerador Simples

Agora, vamos criar um gerador simples. No WebIDE, você precisa criar um novo arquivo. Este arquivo conterá o código para nosso gerador. Nomeie o arquivo generator_demo.py e coloque-o no diretório /home/labex/project. Aqui está o conteúdo que você deve colocar no arquivo:

## Generator function that counts down from n
def countdown(n):
    print(f"Starting countdown from {n}")
    while n > 0:
        yield n
        n -= 1
    print("Countdown complete!")

## Create a generator object
counter = countdown(5)

## Drive the generator manually
print(next(counter))  ## 5
print(next(counter))  ## 4
print(next(counter))  ## 3

## Iterate through remaining values
for value in counter:
    print(value)  ## 2, 1

Neste código, primeiro definimos uma função geradora chamada countdown. Esta função recebe um número n como argumento e conta regressivamente de n a 1. Dentro da função, usamos um loop while para decrementar n e produzir (yield) cada valor. Quando chamamos countdown(5), ele cria um objeto gerador chamado counter.

Em seguida, usamos a função next() para obter manualmente valores do gerador. Cada vez que chamamos next(counter), o gerador retoma a execução de onde parou e produz o próximo valor. Depois de obter manualmente três valores, usamos um loop for para iterar pelos valores restantes no gerador.

Para executar este código, abra o terminal e execute o seguinte comando:

python3 /home/labex/project/generator_demo.py

Quando você executar o código, deverá ver a seguinte saída:

Starting countdown from 5
5
4
3
2
1
Countdown complete!

Vamos notar como a função geradora se comporta:

  1. A função geradora inicia sua execução quando chamamos next(counter) pela primeira vez. Antes disso, a função é apenas definida e nenhuma contagem regressiva real foi iniciada.
  2. Ela pausa em cada instrução yield. Após produzir um valor, ela para e espera pela próxima chamada para next().
  3. Quando chamamos next() novamente, ela continua de onde parou. Por exemplo, após produzir 5, ela lembra o estado e continua a decrementar n e produzir o próximo valor.
  4. A função geradora completa sua execução após o último valor ser produzido. Em nosso caso, após produzir 1, ela imprime "Countdown complete!".

Essa capacidade de pausar e retomar a execução é o que torna os geradores poderosos. É muito útil para tarefas como agendamento de tarefas e programação assíncrona, onde precisamos realizar várias tarefas de forma eficiente sem bloquear a execução de outras tarefas.

Criando um Agendador de Tarefas com Geradores

Em programação, um agendador de tarefas é uma ferramenta crucial que ajuda a gerenciar e executar múltiplas tarefas de forma eficiente. Nesta seção, usaremos geradores para construir um agendador de tarefas simples que pode executar múltiplas funções geradoras simultaneamente. Isso mostrará como os geradores podem ser gerenciados para realizar multitarefa cooperativa, o que significa que as tarefas se revezam para executar e compartilhar o tempo de execução.

Primeiro, você precisa criar um novo arquivo. Navegue até o diretório /home/labex/project e crie um arquivo chamado multitask.py. Este arquivo conterá o código para nosso agendador de tarefas.

## multitask.py

from collections import deque

## Task queue
tasks = deque()

## Simple task scheduler
def run():
    while tasks:
        task = tasks.popleft()  ## Get the next task
        try:
            task.send(None)     ## Resume the task
            tasks.append(task)  ## Put it back in the queue
        except StopIteration:
            print('Task done')  ## Task is complete

## Example task 1: Countdown
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield              ## Pause execution
        n -= 1

## Example task 2: Count up
def countup(n):
    x = 0
    while x < n:
        print('Up we go', x)
        yield              ## Pause execution
        x += 1

Agora, vamos detalhar como este agendador de tarefas funciona:

  1. Usamos um deque (fila de duas extremidades) para armazenar nossas tarefas geradoras. Um deque é uma estrutura de dados que permite adicionar e remover elementos de ambas as extremidades de forma eficiente. É uma ótima escolha para nossa fila de tarefas porque precisamos adicionar tarefas ao final e removê-las do início.
  2. A função run() é o coração do nosso agendador de tarefas. Ele pega tarefas da fila uma por uma:
    • Ele retoma cada tarefa usando send(None). Isso é semelhante a usar next() em um gerador. Ele diz ao gerador para continuar a execução de onde parou.
    • Após a tarefa produzir (yield), ela é adicionada de volta ao final da fila. Dessa forma, a tarefa terá outra chance de ser executada mais tarde.
    • Quando uma tarefa é concluída (levanta StopIteration), ela é removida da fila. Isso indica que a tarefa terminou sua execução.
  3. Cada instrução yield em nossas tarefas geradoras age como um ponto de pausa. Quando um gerador atinge uma instrução yield, ele pausa sua execução e devolve o controle ao agendador. Isso permite que outras tarefas sejam executadas.

Essa abordagem implementa multitarefa cooperativa. Cada tarefa voluntariamente cede o controle de volta ao agendador, permitindo que outras tarefas sejam executadas. Dessa forma, múltiplas tarefas podem compartilhar o tempo de execução e serem executadas simultaneamente.

Testando Nosso Agendador de Tarefas

Agora, vamos adicionar um teste ao nosso arquivo multitask.py. O objetivo deste teste é executar múltiplas tarefas ao mesmo tempo, o que é conhecido como execução concorrente. A execução concorrente permite que diferentes tarefas progridam aparentemente ao mesmo tempo, embora em um ambiente de thread único, as tarefas realmente se revezem para executar.

Para realizar este teste, adicione o seguinte código ao final do arquivo multitask.py:

## Test our scheduler
if __name__ == '__main__':
    ## Add tasks to the queue
    tasks.append(countdown(10))  ## Count down from 10
    tasks.append(countdown(5))   ## Count down from 5
    tasks.append(countup(20))    ## Count up to 20

    ## Run all tasks
    run()

Neste código, primeiro verificamos se o script está sendo executado diretamente usando if __name__ == '__main__':. Em seguida, adicionamos três tarefas diferentes à fila tasks. As tarefas countdown contarão regressivamente a partir dos números fornecidos, e a tarefa countup contará até o número especificado. Finalmente, chamamos a função run() para começar a executar essas tarefas.

Após adicionar o código, execute-o com o seguinte comando no terminal:

python3 /home/labex/project/multitask.py

Quando você executar o código, deverá ver uma saída semelhante a esta (a ordem exata das linhas pode variar):

T-minus 10
T-minus 5
Up we go 0
T-minus 9
T-minus 4
Up we go 1
T-minus 8
T-minus 3
Up we go 2
...

Observe como a saída das diferentes tarefas é misturada. Esta é uma clara indicação de que nosso agendador está executando todas as três tarefas concorrentemente. Cada vez que uma tarefa atinge uma instrução yield, o agendador pausa essa tarefa e muda para outra, permitindo que todas as tarefas progridam ao longo do tempo.

Como Funciona

Vamos dar uma olhada mais de perto no que acontece quando nosso agendador é executado:

  1. Primeiro, adicionamos três tarefas geradoras à fila: countdown(10), countdown(5) e countup(20). Essas tarefas geradoras são funções especiais que podem pausar e retomar sua execução nas instruções yield.
  2. Em seguida, a função run() inicia seu trabalho:
    • Ele pega a primeira tarefa, countdown(10), da fila.
    • Ele executa esta tarefa até que ela atinja uma instrução yield. Quando atinge o yield, ele imprime "T-minus 10".
    • Depois disso, ele adiciona a tarefa countdown(10) de volta à fila para que ela possa ser executada novamente mais tarde.
    • Em seguida, ele pega a tarefa countdown(5) da fila.
    • Ele executa a tarefa countdown(5) até que ela atinja uma instrução yield, imprimindo "T-minus 5".
    • E este processo continua...

Este ciclo continua até que todas as tarefas sejam concluídas. Cada tarefa tem a chance de ser executada por um curto período, o que dá a ilusão de execução concorrente sem a necessidade de usar threads ou callbacks. Threads são uma maneira mais complexa de alcançar a concorrência, e callbacks são usados em programação assíncrona. Nosso agendador simples usa geradores para obter um efeito semelhante de uma maneira mais direta.

Construindo um Servidor de Rede com Geradores

Nesta seção, vamos pegar o conceito de um agendador de tarefas que aprendemos e expandi-lo para criar algo mais prático: um servidor de rede simples. Este servidor pode lidar com múltiplas conexões de clientes ao mesmo tempo usando geradores. Geradores são um recurso poderoso do Python que permite que funções pausem e retomem sua execução, o que é muito útil para lidar com múltiplas tarefas sem bloquear.

Primeiro, você precisa criar um novo arquivo chamado server.py no diretório /home/labex/project. Este arquivo conterá o código para nosso servidor de rede.

## server.py

from socket import *
from select import select
from collections import deque

## Task system
tasks = deque()
recv_wait = {}   ## Map: socket -> task (for tasks waiting to receive)
send_wait = {}   ## Map: socket -> task (for tasks waiting to send)

def run():
    while any([tasks, recv_wait, send_wait]):
        ## If no active tasks, wait for I/O
        while not tasks:
            ## Wait for any socket to become ready for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])

            ## Add tasks waiting on readable sockets back to active queue
            for s in can_recv:
                tasks.append(recv_wait.pop(s))

            ## Add tasks waiting on writable sockets back to active queue
            for s in can_send:
                tasks.append(send_wait.pop(s))

        ## Get next task to run
        task = tasks.popleft()

        try:
            ## Resume the task
            reason, resource = task.send(None)

            ## Handle different yield reasons
            if reason == 'recv':
                ## Task is waiting to receive data
                recv_wait[resource] = task
            elif reason == 'send':
                ## Task is waiting to send data
                send_wait[resource] = task
            else:
                raise RuntimeError('Unknown yield reason %r' % reason)

        except StopIteration:
            print('Task done')

Este agendador aprimorado é um pouco mais complicado do que o anterior, mas segue as mesmas ideias fundamentais. Vamos detalhar as principais diferenças:

  1. As tarefas podem produzir (yield) uma razão ('recv' ou 'send') e um recurso (um socket). Isso significa que uma tarefa pode dizer ao agendador que está esperando para receber ou enviar dados em um socket específico.
  2. Dependendo da razão do yield, a tarefa é movida para uma área de espera diferente. Se uma tarefa estiver esperando para receber dados, ela vai para o dicionário recv_wait. Se estiver esperando para enviar dados, ela vai para o dicionário send_wait.
  3. A função select() é usada para descobrir quais sockets estão prontos para operações de I/O. Esta função verifica os sockets nos dicionários recv_wait e send_wait e retorna aqueles que estão prontos para receber ou enviar dados.
  4. Quando um socket está pronto, a tarefa associada é movida de volta para a fila ativa. Isso permite que a tarefa continue sua execução e execute a operação de I/O pela qual estava esperando.

Ao usar essas técnicas, nossas tarefas podem esperar eficientemente por I/O de rede sem bloquear a execução de outras tarefas. Isso torna nosso servidor de rede mais responsivo e capaz de lidar com múltiplas conexões de clientes simultaneamente.

Implementando um Servidor Echo

Agora, vamos adicionar a implementação de um servidor echo ao nosso arquivo server.py. Um servidor echo é um tipo de servidor que simplesmente envia de volta quaisquer dados que recebe de um cliente. Esta é uma ótima maneira de entender como os servidores lidam com dados recebidos e se comunicam com os clientes.

Adicione o seguinte código ao final do arquivo server.py. Este código configurará nosso servidor echo e lidará com as conexões dos clientes.

## TCP Server implementation
def tcp_server(address, handler):
    ## Create a TCP socket
    sock = socket(AF_INET, SOCK_STREAM)
    ## Set the socket option to reuse the address
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    ## Bind the socket to the given address
    sock.bind(address)
    ## Start listening for incoming connections, with a backlog of 5
    sock.listen(5)

    while True:
        ## Yield to pause the function until a client connects
        yield 'recv', sock        ## Wait for a client connection
        ## Accept a client connection
        client, addr = sock.accept()
        ## Add a new handler task for this client to the tasks list
        tasks.append(handler(client, addr))  ## Start a handler task for this client

## Echo handler - echoes back whatever the client sends
def echo_handler(client, address):
    print('Connection from', address)

    while True:
        ## Yield to pause the function until the client sends data
        yield 'recv', client      ## Wait until client sends data
        ## Receive up to 1000 bytes of data from the client
        data = client.recv(1000)

        if not data:              ## Client closed connection
            break

        ## Yield to pause the function until the client can receive data
        yield 'send', client      ## Wait until client can receive data
        ## Send the data back to the client with 'GOT:' prefix
        client.send(b'GOT:' + data)

    print('Connection closed')
    ## Close the client connection
    client.close()

## Start the server
if __name__ == '__main__':
    ## Add the tcp_server task to the tasks list
    tasks.append(tcp_server(('', 25000), echo_handler))
    ## Start the scheduler
    run()

Vamos entender este código passo a passo:

  1. A função tcp_server:

    • Primeiro, ela configura um socket para ouvir as conexões recebidas. Um socket é um ponto final para comunicação entre duas máquinas.
    • Em seguida, ela usa yield 'recv', sock para pausar a função até que um cliente se conecte. Esta é uma parte fundamental de nossa abordagem assíncrona.
    • Finalmente, ela cria uma nova tarefa de manipulador para cada conexão de cliente. Isso permite que o servidor lide com múltiplos clientes simultaneamente.
  2. A função echo_handler:

    • Ela produz 'recv', client para esperar que o cliente envie dados. Isso pausa a função até que os dados estejam disponíveis.
    • Ela produz 'send', client para esperar até que possa enviar dados de volta ao cliente. Isso garante que o cliente esteja pronto para receber os dados.
    • Ela processa os dados do cliente até que a conexão seja fechada pelo cliente.
  3. Quando executamos o servidor, ele adiciona a tarefa tcp_server à fila e inicia o agendador. O agendador é responsável por gerenciar todas as tarefas e garantir que elas sejam executadas de forma assíncrona.

Para testar o servidor, execute-o em um terminal:

python3 /home/labex/project/server.py

Você deve ver uma mensagem indicando que o servidor está em execução. Isso significa que o servidor agora está ouvindo as conexões recebidas.

Abra outro terminal e conecte-se ao servidor usando nc (netcat). Netcat é um utilitário simples que permite que você se conecte a um servidor e envie dados.

nc localhost 25000

Agora você pode digitar mensagens e vê-las sendo ecoadas de volta com o prefixo "GOT:":

Hello
GOT:Hello
World
GOT:World

Se você não tiver nc instalado, pode usar o telnetlib embutido do Python. Telnetlib é uma biblioteca que permite que você se conecte a um servidor usando o protocolo Telnet.

python3 -c "import telnetlib; t = telnetlib.Telnet('localhost', 25000); t.interact()"

Você pode abrir várias janelas de terminal e conectar vários clientes simultaneamente. O servidor lidará com todas as conexões simultaneamente, apesar de ser de thread único. Isso é graças ao nosso agendador de tarefas baseado em geradores, que permite que o servidor pause e retome as tarefas conforme necessário.

Como Funciona

Este exemplo demonstra uma aplicação poderosa de geradores para I/O assíncrono:

  1. O servidor produz (yield) quando, de outra forma, bloquearia esperando por I/O. Isso significa que, em vez de esperar indefinidamente por dados, o servidor pode pausar e deixar outras tarefas serem executadas.
  2. O agendador o move para uma área de espera até que o I/O esteja pronto. Isso garante que o servidor não desperdice recursos esperando por I/O.
  3. Outras tarefas podem ser executadas enquanto esperam a conclusão do I/O. Isso permite que o servidor lide com múltiplas tarefas simultaneamente.
  4. Quando o I/O está pronto, a tarefa continua de onde parou. Este é um recurso fundamental da programação assíncrona.

Este padrão forma a base de frameworks Python assíncronos modernos como asyncio, que foi adicionado à biblioteca padrão do Python na versão 3.4.

Resumo

Neste laboratório, você aprendeu sobre o conceito de geradores gerenciados em Python. Você explorou como pausar e retomar geradores usando a instrução yield e construiu um agendador de tarefas simples para executar múltiplos geradores simultaneamente. Além disso, você estendeu o agendador para lidar com I/O de rede de forma eficiente e implementou um servidor de rede capaz de lidar com múltiplas conexões simultaneamente.

Este padrão de uso de geradores para multitarefa cooperativa é uma técnica poderosa que sustenta muitos frameworks de programação assíncrona em Python, como o módulo asyncio embutido. A abordagem oferece várias vantagens, incluindo código sequencial simples, tratamento eficiente de I/O não bloqueante, multitarefa cooperativa sem múltiplos threads e controle preciso sobre a execução da tarefa. Essas técnicas são valiosas para a construção de aplicações e sistemas de rede de alto desempenho que exigem o tratamento eficiente de operações concorrentes.