Aprende sobre Generadores Gestionados

PythonPythonBeginner
Practicar Ahora

This tutorial is from open-source community. Access the source code

💡 Este tutorial está traducido por IA desde la versión en inglés. Para ver la versión original, puedes hacer clic aquí

Introducción

En este laboratorio, aprenderás sobre los generadores gestionados (managed generators) y entenderás cómo impulsarlos de formas inusuales. También construirás un sencillo programador de tareas (task scheduler) y crearás un servidor de red utilizando generadores.

Una función generadora en Python requiere código externo para ejecutarse. Por ejemplo, un generador de iteración solo se ejecuta cuando se itera con un bucle for, y las corrutinas necesitan que se llame a su método send(). En este laboratorio, exploraremos ejemplos prácticos de cómo impulsar generadores en aplicaciones avanzadas. Los archivos creados durante este laboratorio son multitask.py y server.py.


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL python(("Python")) -.-> python/AdvancedTopicsGroup(["Advanced Topics"]) python(("Python")) -.-> python/PythonStandardLibraryGroup(["Python Standard Library"]) python(("Python")) -.-> python/NetworkingGroup(["Networking"]) python/AdvancedTopicsGroup -.-> python/iterators("Iterators") python/AdvancedTopicsGroup -.-> python/generators("Generators") python/PythonStandardLibraryGroup -.-> python/data_collections("Data Collections") python/NetworkingGroup -.-> python/socket_programming("Socket Programming") subgraph Lab Skills python/iterators -.-> lab-132526{{"Aprende sobre Generadores Gestionados"}} python/generators -.-> lab-132526{{"Aprende sobre Generadores Gestionados"}} python/data_collections -.-> lab-132526{{"Aprende sobre Generadores Gestionados"}} python/socket_programming -.-> lab-132526{{"Aprende sobre Generadores Gestionados"}} end

Comprendiendo los generadores de Python

Comencemos revisando qué son los generadores en Python. En Python, los generadores son un tipo especial de función. Son diferentes de las funciones regulares. Cuando se llama a una función regular, se ejecuta desde el principio hasta el final y devuelve un solo valor. Sin embargo, una función generadora devuelve un iterador, que es un objeto a través del cual podemos iterar, lo que significa que podemos acceder a sus valores uno por uno.

Los generadores utilizan la declaración yield para devolver valores. En lugar de devolver todos los valores a la vez como una función regular, un generador devuelve valores uno a la vez. Después de devolver un valor, el generador suspende su ejecución. La próxima vez que solicitamos un valor, reanuda la ejecución desde donde se detuvo.

Creando un generador sencillo

Ahora, creemos un generador sencillo. En el WebIDE, debes crear un nuevo archivo. Este archivo contendrá el código de nuestro generador. Nombrar el archivo generator_demo.py y colocarlo en el directorio /home/labex/project. Aquí está el contenido que debes poner en el archivo:

## Generator function that counts down from n
def countdown(n):
    print(f"Starting countdown from {n}")
    while n > 0:
        yield n
        n -= 1
    print("Countdown complete!")

## Create a generator object
counter = countdown(5)

## Drive the generator manually
print(next(counter))  ## 5
print(next(counter))  ## 4
print(next(counter))  ## 3

## Iterate through remaining values
for value in counter:
    print(value)  ## 2, 1

En este código, primero definimos una función generadora llamada countdown. Esta función toma un número n como argumento y cuenta hacia atrás desde n hasta 1. Dentro de la función, usamos un bucle while para decrementar n y devolver cada valor. Cuando llamamos a countdown(5), crea un objeto generador llamado counter.

Luego usamos la función next() para obtener manualmente valores del generador. Cada vez que llamamos a next(counter), el generador reanuda la ejecución desde donde se detuvo y devuelve el siguiente valor. Después de obtener manualmente tres valores, usamos un bucle for para iterar a través de los valores restantes en el generador.

Para ejecutar este código, abre la terminal y ejecuta el siguiente comando:

python3 /home/labex/project/generator_demo.py

Cuando ejecutes el código, deberías ver la siguiente salida:

Starting countdown from 5
5
4
3
2
1
Countdown complete!

Veamos cómo se comporta la función generadora:

  1. La función generadora comienza su ejecución cuando llamamos por primera vez a next(counter). Antes de eso, la función solo está definida y no se ha comenzado a contar hacia atrás realmente.
  2. Se detiene en cada declaración yield. Después de devolver un valor, se detiene y espera la siguiente llamada a next().
  3. Cuando llamamos a next() de nuevo, continúa desde donde se detuvo. Por ejemplo, después de devolver 5, recuerda el estado y continúa decrementando n y devolviendo el siguiente valor.
  4. La función generadora completa su ejecución después de devolver el último valor. En nuestro caso, después de devolver 1, imprime "Countdown complete!".

Esta capacidad de pausar y reanudar la ejecución es lo que hace a los generadores poderosos. Es muy útil para tareas como la programación de tareas (task scheduling) y la programación asíncrona, donde necesitamos realizar múltiples tareas de manera eficiente sin bloquear la ejecución de otras tareas.

✨ Revisar Solución y Practicar

Creando un programador de tareas (task scheduler) con generadores

En programación, un programador de tareas (task scheduler) es una herramienta crucial que ayuda a gestionar y ejecutar múltiples tareas de manera eficiente. En esta sección, usaremos generadores para construir un sencillo programador de tareas que pueda ejecutar múltiples funciones generadoras de forma concurrente. Esto te mostrará cómo se pueden gestionar los generadores para realizar multitarea cooperativa, lo que significa que las tareas se turnan para ejecutarse y comparten el tiempo de ejecución.

Primero, debes crear un nuevo archivo. Navega al directorio /home/labex/project y crea un archivo llamado multitask.py. Este archivo contendrá el código de nuestro programador de tareas.

## multitask.py

from collections import deque

## Task queue
tasks = deque()

## Simple task scheduler
def run():
    while tasks:
        task = tasks.popleft()  ## Get the next task
        try:
            task.send(None)     ## Resume the task
            tasks.append(task)  ## Put it back in the queue
        except StopIteration:
            print('Task done')  ## Task is complete

## Example task 1: Countdown
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield              ## Pause execution
        n -= 1

## Example task 2: Count up
def countup(n):
    x = 0
    while x < n:
        print('Up we go', x)
        yield              ## Pause execution
        x += 1

Ahora, analicemos cómo funciona este programador de tareas:

  1. Usamos un deque (cola doblemente terminada) para almacenar nuestras tareas generadoras. Un deque es una estructura de datos que te permite agregar y eliminar elementos de ambos extremos de manera eficiente. Es una excelente opción para nuestra cola de tareas porque necesitamos agregar tareas al final y eliminarlas del frente.
  2. La función run() es el corazón de nuestro programador de tareas. Toma las tareas de la cola una por una:
    • Reanuda cada tarea usando send(None). Esto es similar a usar next() en un generador. Le dice al generador que continúe la ejecución desde donde se detuvo.
    • Después de que la tarea devuelva un valor (yield), se agrega de nuevo al final de la cola. De esta manera, la tarea tendrá otra oportunidad de ejecutarse más tarde.
    • Cuando una tarea se completa (lanza StopIteration), se elimina de la cola. Esto indica que la tarea ha terminado su ejecución.
  3. Cada declaración yield en nuestras tareas generadoras actúa como un punto de pausa. Cuando un generador alcanza una declaración yield, pausa su ejecución y devuelve el control al programador de tareas. Esto permite que otras tareas se ejecuten.

Este enfoque implementa la multitarea cooperativa. Cada tarea cede voluntariamente el control al programador de tareas, lo que permite que otras tareas se ejecuten. De esta manera, múltiples tareas pueden compartir el tiempo de ejecución y ejecutarse de forma concurrente.

✨ Revisar Solución y Practicar

Probando nuestro programador de tareas (task scheduler)

Ahora, vamos a agregar una prueba a nuestro archivo multitask.py. El propósito de esta prueba es ejecutar múltiples tareas al mismo tiempo, lo que se conoce como ejecución concurrente. La ejecución concurrente permite que diferentes tareas avancen al parecer al mismo tiempo, aunque en un entorno de un solo hilo (single - threaded), las tareas en realidad se turnan para ejecutarse.

Para realizar esta prueba, agrega el siguiente código al final del archivo multitask.py:

## Test our scheduler
if __name__ == '__main__':
    ## Add tasks to the queue
    tasks.append(countdown(10))  ## Count down from 10
    tasks.append(countdown(5))   ## Count down from 5
    tasks.append(countup(20))    ## Count up to 20

    ## Run all tasks
    run()

En este código, primero verificamos si el script se está ejecutando directamente usando if __name__ == '__main__':. Luego, agregamos tres tareas diferentes a la cola tasks. Las tareas countdown contarán hacia atrás desde los números dados, y la tarea countup contará hacia arriba hasta el número especificado. Finalmente, llamamos a la función run() para comenzar a ejecutar estas tareas.

Después de agregar el código, ejecútalo con el siguiente comando en la terminal:

python3 /home/labex/project/multitask.py

Cuando ejecutes el código, deberías ver una salida similar a esta (el orden exacto de las líneas puede variar):

T-minus 10
T-minus 5
Up we go 0
T-minus 9
T-minus 4
Up we go 1
T-minus 8
T-minus 3
Up we go 2
...

Observa cómo la salida de las diferentes tareas está mezclada. Esto es una clara indicación de que nuestro programador de tareas está ejecutando las tres tareas de forma concurrente. Cada vez que una tarea alcanza una declaración yield, el programador de tareas pausa esa tarea y cambia a otra, lo que permite que todas las tareas avancen con el tiempo.

Cómo funciona

Echemos un vistazo más de cerca a lo que sucede cuando nuestro programador de tareas se ejecuta:

  1. Primero, agregamos tres tareas generadoras a la cola: countdown(10), countdown(5) y countup(20). Estas tareas generadoras son funciones especiales que pueden pausar y reanudar su ejecución en las declaraciones yield.
  2. Luego, la función run() comienza a trabajar:
    • Toma la primera tarea, countdown(10), de la cola.
    • Ejecuta esta tarea hasta que alcanza una declaración yield. Cuando llega al yield, imprime "T-minus 10".
    • Después de eso, agrega la tarea countdown(10) de nuevo a la cola para que se pueda ejecutar de nuevo más tarde.
    • A continuación, toma la tarea countdown(5) de la cola.
    • Ejecuta la tarea countdown(5) hasta que alcanza una declaración yield, imprimiendo "T-minus 5".
    • Y este proceso continúa...

Este ciclo continúa hasta que todas las tareas se completan. Cada tarea tiene la oportunidad de ejecutarse durante un corto tiempo, lo que da la ilusión de ejecución concurrente sin necesidad de usar hilos (threads) o callbacks. Los hilos son una forma más compleja de lograr concurrencia, y los callbacks se utilizan en la programación asíncrona. Nuestro sencillo programador de tareas utiliza generadores para lograr un efecto similar de manera más sencilla.

✨ Revisar Solución y Practicar

Construyendo un servidor de red con generadores

En esta sección, tomaremos el concepto de un programador de tareas (task scheduler) que hemos aprendido y lo expandiremos para crear algo más práctico: un sencillo servidor de red. Este servidor puede manejar múltiples conexiones de clientes al mismo tiempo utilizando generadores. Los generadores son una poderosa característica de Python que permite a las funciones pausar y reanudar su ejecución, lo cual es muy útil para manejar múltiples tareas sin bloquear.

Primero, debes crear un nuevo archivo llamado server.py en el directorio /home/labex/project. Este archivo contendrá el código de nuestro servidor de red.

## server.py

from socket import *
from select import select
from collections import deque

## Task system
tasks = deque()
recv_wait = {}   ## Map: socket -> task (for tasks waiting to receive)
send_wait = {}   ## Map: socket -> task (for tasks waiting to send)

def run():
    while any([tasks, recv_wait, send_wait]):
        ## If no active tasks, wait for I/O
        while not tasks:
            ## Wait for any socket to become ready for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])

            ## Add tasks waiting on readable sockets back to active queue
            for s in can_recv:
                tasks.append(recv_wait.pop(s))

            ## Add tasks waiting on writable sockets back to active queue
            for s in can_send:
                tasks.append(send_wait.pop(s))

        ## Get next task to run
        task = tasks.popleft()

        try:
            ## Resume the task
            reason, resource = task.send(None)

            ## Handle different yield reasons
            if reason == 'recv':
                ## Task is waiting to receive data
                recv_wait[resource] = task
            elif reason == 'send':
                ## Task is waiting to send data
                send_wait[resource] = task
            else:
                raise RuntimeError('Unknown yield reason %r' % reason)

        except StopIteration:
            print('Task done')

Este programador de tareas mejorado es un poco más complicado que el anterior, pero sigue las mismas ideas fundamentales. Analicemos las principales diferencias:

  1. Las tareas pueden devolver una razón ('recv' o 'send') y un recurso (un socket). Esto significa que una tarea puede decirle al programador de tareas que está esperando recibir o enviar datos en un socket específico.
  2. Dependiendo de la razón devuelta, la tarea se mueve a una zona de espera diferente. Si una tarea está esperando recibir datos, se va al diccionario recv_wait. Si está esperando enviar datos, se va al diccionario send_wait.
  3. La función select() se utiliza para determinar qué sockets están listos para operaciones de E/S (I/O). Esta función verifica los sockets en los diccionarios recv_wait y send_wait y devuelve aquellos que están listos para recibir o enviar datos.
  4. Cuando un socket está listo, la tarea asociada se mueve de nuevo a la cola activa. Esto permite que la tarea continúe su ejecución y realice la operación de E/S por la que estaba esperando.

Al utilizar estas técnicas, nuestras tareas pueden esperar de manera eficiente a la E/S de red sin bloquear la ejecución de otras tareas. Esto hace que nuestro servidor de red sea más receptivo y capaz de manejar múltiples conexiones de clientes de forma concurrente.

✨ Revisar Solución y Practicar

Implementando un servidor de eco (echo server)

Ahora, vamos a agregar la implementación de un servidor de eco a nuestro archivo server.py. Un servidor de eco es un tipo de servidor que simplemente devuelve cualquier dato que reciba de un cliente. Esta es una excelente manera de entender cómo los servidores manejan los datos entrantes y se comunican con los clientes.

Agrega el siguiente código al final del archivo server.py. Este código configurará nuestro servidor de eco y manejará las conexiones de los clientes.

## TCP Server implementation
def tcp_server(address, handler):
    ## Create a TCP socket
    sock = socket(AF_INET, SOCK_STREAM)
    ## Set the socket option to reuse the address
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    ## Bind the socket to the given address
    sock.bind(address)
    ## Start listening for incoming connections, with a backlog of 5
    sock.listen(5)

    while True:
        ## Yield to pause the function until a client connects
        yield 'recv', sock        ## Wait for a client connection
        ## Accept a client connection
        client, addr = sock.accept()
        ## Add a new handler task for this client to the tasks list
        tasks.append(handler(client, addr))  ## Start a handler task for this client

## Echo handler - echoes back whatever the client sends
def echo_handler(client, address):
    print('Connection from', address)

    while True:
        ## Yield to pause the function until the client sends data
        yield 'recv', client      ## Wait until client sends data
        ## Receive up to 1000 bytes of data from the client
        data = client.recv(1000)

        if not data:              ## Client closed connection
            break

        ## Yield to pause the function until the client can receive data
        yield 'send', client      ## Wait until client can receive data
        ## Send the data back to the client with 'GOT:' prefix
        client.send(b'GOT:' + data)

    print('Connection closed')
    ## Close the client connection
    client.close()

## Start the server
if __name__ == '__main__':
    ## Add the tcp_server task to the tasks list
    tasks.append(tcp_server(('', 25000), echo_handler))
    ## Start the scheduler
    run()

Vamos a entender este código paso a paso:

  1. La función tcp_server:

    • Primero, configura un socket para escuchar las conexiones entrantes. Un socket es un punto final para la comunicación entre dos máquinas.
    • Luego, utiliza yield 'recv', sock para pausar la función hasta que se conecte un cliente. Esta es una parte clave de nuestro enfoque asíncrono.
    • Finalmente, crea una nueva tarea de manejo para cada conexión de cliente. Esto permite que el servidor maneje múltiples clientes de forma concurrente.
  2. La función echo_handler:

    • Devuelve 'recv', client para esperar a que el cliente envíe datos. Esto pausa la función hasta que haya datos disponibles.
    • Devuelve 'send', client para esperar hasta que pueda enviar datos de vuelta al cliente. Esto asegura que el cliente esté listo para recibir los datos.
    • Procesa los datos del cliente hasta que la conexión sea cerrada por el cliente.
  3. Cuando ejecutamos el servidor, agrega la tarea tcp_server a la cola y inicia el programador de tareas (scheduler). El programador de tareas es responsable de administrar todas las tareas y asegurarse de que se ejecuten de forma asíncrona.

Para probar el servidor, ejecútalo en una terminal:

python3 /home/labex/project/server.py

Deberías ver un mensaje que indica que el servidor está en funcionamiento. Esto significa que el servidor ahora está escuchando las conexiones entrantes.

Abre otra terminal y conéctate al servidor utilizando nc (netcat). Netcat es una utilidad simple que te permite conectarte a un servidor y enviar datos.

nc localhost 25000

Ahora puedes escribir mensajes y verlos devueltos con el prefijo "GOT:":

Hello
GOT:Hello
World
GOT:World

Si no tienes nc instalado, puedes usar la biblioteca incorporada telnetlib de Python. Telnetlib es una biblioteca que te permite conectarte a un servidor utilizando el protocolo Telnet.

python3 -c "import telnetlib; t = telnetlib.Telnet('localhost', 25000); t.interact()"

Puedes abrir múltiples ventanas de terminal y conectar múltiples clientes simultáneamente. El servidor manejará todas las conexiones de forma concurrente, a pesar de ser de un solo hilo (single - threaded). Esto se debe a nuestro programador de tareas basado en generadores, que permite al servidor pausar y reanudar tareas según sea necesario.

Cómo funciona

Este ejemplo demuestra una poderosa aplicación de los generadores para la E/S asíncrona (async I/O):

  1. El servidor devuelve un valor cuando de lo contrario se bloquearía esperando la E/S. Esto significa que en lugar de esperar indefinidamente por datos, el servidor puede pausar y dejar que otras tareas se ejecuten.
  2. El programador de tareas lo mueve a un área de espera hasta que la E/S esté lista. Esto asegura que el servidor no gaste recursos esperando la E/S.
  3. Otras tareas pueden ejecutarse mientras se espera a que se complete la E/S. Esto permite que el servidor maneje múltiples tareas de forma concurrente.
  4. Cuando la E/S está lista, la tarea continúa desde donde se detuvo. Esta es una característica clave de la programación asíncrona.

Este patrón forma la base de los modernos marcos de trabajo asíncronos de Python como asyncio, que se agregó a la biblioteca estándar de Python en Python 3.4.

✨ Revisar Solución y Practicar

Resumen

En este laboratorio, has aprendido sobre el concepto de generadores gestionados en Python. Has explorado cómo pausar y reanudar generadores utilizando la declaración yield, y has construido un sencillo programador de tareas (task scheduler) para ejecutar múltiples generadores de forma concurrente. Además, has extendido el programador de tareas para manejar de manera eficiente la E/S de red y has implementado un servidor de red capaz de manejar múltiples conexiones simultáneamente.

Este patrón de uso de generadores para la multitarea cooperativa es una técnica poderosa que sustenta muchos marcos de trabajo de programación asíncrona en Python, como el módulo incorporado asyncio. Este enfoque ofrece varias ventajas, incluyendo código secuencial simple, manejo eficiente de E/S no bloqueante, multitarea cooperativa sin múltiples hilos y control detallado de la ejecución de tareas. Estas técnicas son valiosas para construir aplicaciones de red de alto rendimiento y sistemas que requieran un manejo eficiente de operaciones concurrentes.