Learn About Managed Generators

Beginner

This tutorial is from the open-source community.

Introduction

In this lab, you will learn about managed generators and understand how to drive them in unusual ways. You'll also build a simple task scheduler and create a network server using generators.

A generator function in Python requires external code to execute. For instance, an iteration generator runs only when iterated over with a for loop, and coroutines need their send() method called. In this lab, we'll explore practical examples of driving generators in advanced applications. The files created during this lab are multitask.py and server.py.
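To make the two driving styles concrete, here is a minimal sketch (not one of the lab files) showing both: next() drives an iteration generator, while send() drives a coroutine-style generator that consumes values:

```python
# Driving a generator two ways (illustrative sketch, not a lab file)

def squares():
    for n in range(1, 4):
        yield n * n           # produces values; driven by next()

def printer():
    while True:
        item = yield          # consumes values; driven by send()
        print('Received:', item)

gen = squares()
print(next(gen))              # 1
print(next(gen))              # 4

p = printer()
next(p)                       # "prime" the coroutine, advancing it to the first yield
p.send('hello')               # prints: Received: hello
```

Note the priming call: a coroutine-style generator must be advanced to its first yield with next() before send() can deliver a value to it.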

This is a Guided Lab, which provides step-by-step instructions to help you learn and practice. Follow the instructions carefully to complete each step and gain hands-on experience. Historical data shows that this is a beginner-level lab with an 84% completion rate. It has received an 80% positive review rate from learners.

Understanding Python Generators

Let's start by reviewing what generators are. In Python, a generator is a special kind of function. When you call a regular function, it runs from start to finish and returns a single value. A generator function instead returns an iterator, an object we can step through to access its values one by one.

Generators use the yield statement to return values. Instead of returning all values at once like a regular function, a generator returns values one at a time. After yielding a value, the generator suspends its execution. The next time we ask for a value, it resumes execution from where it left off.

Creating a Simple Generator

Now, let's create a simple generator. In the WebIDE, you need to create a new file. This file will contain the code for our generator. Name the file generator_demo.py and place it in the /home/labex/project directory. Here is the content you should put in the file:

# Generator function that counts down from n
def countdown(n):
    print(f"Starting countdown from {n}")
    while n > 0:
        yield n
        n -= 1
    print("Countdown complete!")

# Create a generator object
counter = countdown(5)

# Drive the generator manually
print(next(counter))  # 5
print(next(counter))  # 4
print(next(counter))  # 3

# Iterate through remaining values
for value in counter:
    print(value)  # 2, 1

In this code, we first define a generator function called countdown. This function takes a number n as an argument and counts down from n to 1. Inside the function, we use a while loop to decrement n and yield each value. When we call countdown(5), it creates a generator object named counter.

We then use the next() function to manually get values from the generator. Each time we call next(counter), the generator resumes execution from where it left off and yields the next value. After manually getting three values, we use a for loop to iterate through the remaining values in the generator.

To run this code, open the terminal and execute the following command:

python3 /home/labex/project/generator_demo.py

When you run the code, you should see the following output:

Starting countdown from 5
5
4
3
2
1
Countdown complete!

Let's note how the generator function behaves:

  1. The generator function starts its execution when we first call next(counter). Before that, the function is just defined and no actual counting down has started.
  2. It pauses at each yield statement. After yielding a value, it stops and waits for the next call to next().
  3. When we call next() again, it continues from where it left off. For example, after yielding 5, it remembers the state and continues to decrement n and yield the next value.
  4. The generator function completes its execution after the last value is yielded. In our case, after yielding 1, it prints "Countdown complete!".
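One more behavior worth seeing in a short sketch: once a generator is exhausted, any further call to next() raises StopIteration. This is the signal a for loop (and, later, our scheduler) uses to know a generator is done:

```python
# A simplified countdown without the print statements
def countdown(n):
    while n > 0:
        yield n
        n -= 1

c = countdown(2)
print(next(c))   # 2
print(next(c))   # 1

try:
    next(c)      # the generator is exhausted now
except StopIteration:
    print('No more values')
```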

This ability to pause and resume execution is what makes generators powerful. It is very useful for tasks like task scheduling and asynchronous programming, where we need to perform multiple tasks in an efficient way without blocking the execution of other tasks.

Creating a Task Scheduler with Generators

In programming, a task scheduler is a crucial tool that helps manage and execute multiple tasks efficiently. In this section, we'll use generators to build a simple task scheduler that can run multiple generator functions concurrently. This will show you how generators can be managed to perform cooperative multitasking, which means tasks take turns to run and share the execution time.

First, you need to create a new file. Navigate to the /home/labex/project directory and create a file named multitask.py. This file will contain the code for our task scheduler.

# multitask.py

from collections import deque

# Task queue
tasks = deque()

# Simple task scheduler
def run():
    while tasks:
        task = tasks.popleft()  # Get the next task
        try:
            task.send(None)     # Resume the task
            tasks.append(task)  # Put it back in the queue
        except StopIteration:
            print('Task done')  # Task is complete

# Example task 1: Countdown
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield              # Pause execution
        n -= 1

# Example task 2: Count up
def countup(n):
    x = 0
    while x < n:
        print('Up we go', x)
        yield              # Pause execution
        x += 1

Now, let's break down how this task scheduler works:

  1. We use a deque (double-ended queue) to store our generator tasks. A deque is a data structure that allows you to add and remove elements from both ends efficiently. It's a great choice for our task queue because we need to add tasks to the end and remove them from the front.
  2. The run() function is the heart of our task scheduler. It takes tasks from the queue one by one:
    • It resumes each task using send(None). This is similar to using next() on a generator. It tells the generator to continue executing from where it left off.
    • After the task yields, it's added back to the end of the queue. This way, the task will get another chance to run later.
    • When a task completes (raises StopIteration), it's removed from the queue. This indicates that the task has finished its execution.
  3. Each yield statement in our generator tasks acts as a pause point. When a generator reaches a yield statement, it pauses its execution and gives control back to the scheduler. This allows other tasks to run.

This approach implements cooperative multitasking. Each task voluntarily yields control back to the scheduler, allowing other tasks to run. This way, multiple tasks can share the execution time and run concurrently.
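Since the scheduler resumes tasks with send(None), it is worth confirming in a tiny sketch that send(None) behaves exactly like next(). This is why run() works on plain yield-only generators:

```python
def task():
    yield 'first'
    yield 'second'

t = task()
print(next(t))        # first
print(t.send(None))   # second -- send(None) resumes the generator just like next(t)
```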

Testing Our Task Scheduler

Now, we're going to add a test to our multitask.py file. The purpose of this test is to run multiple tasks at the same time, which is known as concurrent execution. Concurrent execution allows different tasks to make progress seemingly at the same time, even though in a single-threaded environment, the tasks are actually taking turns to run.

To perform this test, add the following code at the end of the multitask.py file:

# Test our scheduler
if __name__ == '__main__':
    # Add tasks to the queue
    tasks.append(countdown(10))  # Count down from 10
    tasks.append(countdown(5))   # Count down from 5
    tasks.append(countup(20))    # Count up to 20

    # Run all tasks
    run()

In this code, we first check if the script is being run directly using if __name__ == '__main__':. Then, we add three different tasks to the tasks queue. The countdown tasks will count down from the given numbers, and the countup task will count up to the specified number. Finally, we call the run() function to start executing these tasks.

After adding the code, run it with the following command in the terminal:

python3 /home/labex/project/multitask.py

When you run the code, you should see the three tasks' output interleaved in round-robin order:

T-minus 10
T-minus 5
Up we go 0
T-minus 9
T-minus 4
Up we go 1
T-minus 8
T-minus 3
Up we go 2
...

Notice how the output from the different tasks is mixed together. This is a clear indication that our scheduler is running all three tasks concurrently. Each time a task reaches a yield statement, the scheduler pauses that task and switches to another one, allowing all tasks to make progress over time.

How It Works

Let's take a closer look at what happens when our scheduler runs:

  1. First, we add three generator tasks to the queue: countdown(10), countdown(5), and countup(20). These generator tasks are special functions that can pause and resume their execution at yield statements.
  2. Then, the run() function starts its work:
    • It takes the first task, countdown(10), from the queue.
    • It runs this task until it reaches a yield statement. When it hits the yield, it prints "T-minus 10".
    • After that, it adds the countdown(10) task back to the queue so that it can be run again later.
    • Next, it takes the countdown(5) task from the queue.
    • It runs the countdown(5) task until it hits a yield statement, printing "T-minus 5".
    • And this process continues...

This cycle keeps going until all tasks are finished. Each task runs for a short stretch at a time, which creates the effect of concurrent execution without threads or callbacks. Threads are a heavier mechanism for concurrency, and callbacks are the usual tool of asynchronous programming; our simple scheduler uses generators to achieve a similar effect in a more straightforward way.
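You can verify this round-robin trace with smaller numbers. The sketch below repeats the scheduler and task definitions from multitask.py so it runs on its own:

```python
from collections import deque

tasks = deque()

def run():
    # Same scheduler as in multitask.py
    while tasks:
        task = tasks.popleft()
        try:
            task.send(None)
            tasks.append(task)
        except StopIteration:
            print('Task done')

def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield
        n -= 1

def countup(n):
    x = 0
    while x < n:
        print('Up we go', x)
        yield
        x += 1

tasks.append(countdown(2))
tasks.append(countup(2))
run()
# T-minus 2
# Up we go 0
# T-minus 1
# Up we go 1
# Task done
# Task done
```

Because the deque preserves order, the interleaving is fully deterministic: the two tasks strictly alternate until each raises StopIteration.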

Building a Network Server with Generators

In this section, we'll take the concept of a task scheduler we've learned and expand it to create something more practical: a simple network server. This server can handle multiple client connections at the same time using generators. Generators are a powerful Python feature that allows functions to pause and resume their execution, which is very useful for handling multiple tasks without blocking.

First, you need to create a new file named server.py in the /home/labex/project directory. This file will contain the code for our network server.

# server.py

from socket import *
from select import select
from collections import deque

# Task system
tasks = deque()
recv_wait = {}   # Map: socket -> task (for tasks waiting to receive)
send_wait = {}   # Map: socket -> task (for tasks waiting to send)

def run():
    while any([tasks, recv_wait, send_wait]):
        # If no active tasks, wait for I/O
        while not tasks:
            # Wait for any socket to become ready for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])

            # Add tasks waiting on readable sockets back to active queue
            for s in can_recv:
                tasks.append(recv_wait.pop(s))

            # Add tasks waiting on writable sockets back to active queue
            for s in can_send:
                tasks.append(send_wait.pop(s))

        # Get next task to run
        task = tasks.popleft()

        try:
            # Resume the task
            reason, resource = task.send(None)

            # Handle different yield reasons
            if reason == 'recv':
                # Task is waiting to receive data
                recv_wait[resource] = task
            elif reason == 'send':
                # Task is waiting to send data
                send_wait[resource] = task
            else:
                raise RuntimeError('Unknown yield reason %r' % reason)

        except StopIteration:
            print('Task done')
This improved scheduler is a bit more complicated than the previous one, but it follows the same fundamental ideas. Let's break down the main differences:

  1. Tasks can yield a reason ('recv' or 'send') and a resource (a socket). This means that a task can tell the scheduler that it's waiting to either receive or send data on a specific socket.
  2. Depending on the yield reason, the task is moved to a different waiting area. If a task is waiting to receive data, it goes to the recv_wait dictionary. If it's waiting to send data, it goes to the send_wait dictionary.
  3. The select() function is used to figure out which sockets are ready for I/O operations. This function checks the sockets in the recv_wait and send_wait dictionaries and returns the ones that are ready to either receive or send data.
  4. When a socket is ready, the associated task is moved back to the active queue. This allows the task to continue its execution and perform the I/O operation it was waiting for.

By using these techniques, our tasks can efficiently wait for network I/O without blocking the execution of other tasks. This makes our network server more responsive and able to handle multiple client connections concurrently.
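If select() is new to you, this small sketch (using a connected socketpair, separate from server.py) shows how it reports readiness without blocking:

```python
import socket
from select import select

a, b = socket.socketpair()   # two sockets connected to each other

# With a zero timeout, select() polls and returns immediately
readable, writable, _ = select([a], [a], [], 0)
print(a in readable)   # False: nothing has been sent to a yet
print(a in writable)   # True: a's send buffer has room

b.send(b'ping')        # now there is data waiting for a

readable, _, _ = select([a], [], [], 1.0)
print(a in readable)   # True: a.recv() would not block
print(a.recv(4))       # b'ping'

a.close()
b.close()
```

In server.py, the recv_wait and send_wait dictionaries are passed to select() directly; that works because select() only needs an iterable of sockets, and iterating a dict yields its keys.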

Implementing an Echo Server

Now, we're going to add the implementation of an echo server to our server.py file. An echo server is a type of server that simply sends back whatever data it receives from a client. This is a great way to understand how servers handle incoming data and communicate with clients.

Add the following code at the end of the server.py file. This code will set up our echo server and handle client connections.

# TCP Server implementation
def tcp_server(address, handler):
    # Create a TCP socket
    sock = socket(AF_INET, SOCK_STREAM)
    # Set the socket option to reuse the address
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    # Bind the socket to the given address
    sock.bind(address)
    # Start listening for incoming connections, with a backlog of 5
    sock.listen(5)

    while True:
        # Yield to pause the function until a client connects
        yield 'recv', sock        # Wait for a client connection
        # Accept a client connection
        client, addr = sock.accept()
        # Add a new handler task for this client to the tasks list
        tasks.append(handler(client, addr))  # Start a handler task for this client

# Echo handler - echoes back whatever the client sends
def echo_handler(client, address):
    print('Connection from', address)

    while True:
        # Yield to pause the function until the client sends data
        yield 'recv', client      # Wait until client sends data
        # Receive up to 1000 bytes of data from the client
        data = client.recv(1000)

        if not data:              # Client closed connection
            break

        # Yield to pause the function until the client can receive data
        yield 'send', client      # Wait until client can receive data
        # Send the data back to the client with 'GOT:' prefix
        client.send(b'GOT:' + data)

    print('Connection closed')
    # Close the client connection
    client.close()

# Start the server
if __name__ == '__main__':
    # Add the tcp_server task to the tasks list
    tasks.append(tcp_server(('', 25000), echo_handler))
    # Start the scheduler
    run()

Let's understand this code step by step:

  1. The tcp_server function:

    • First, it sets up a socket to listen for incoming connections. A socket is an endpoint for communication between two machines.
    • Then, it uses yield 'recv', sock to pause the function until a client connects. This is a key part of our asynchronous approach.
    • Finally, it creates a new handler task for each client connection. This allows the server to handle multiple clients concurrently.
  2. The echo_handler function:

    • It yields 'recv', client to wait for the client to send data. This pauses the function until data is available.
    • It yields 'send', client to wait until it can send data back to the client. This ensures that the client is ready to receive the data.
    • It processes the client data until the connection is closed by the client.
  3. When we run the server, it adds the tcp_server task to the queue and starts the scheduler. The scheduler is responsible for managing all the tasks and making sure they run asynchronously.

To test the server, run it in one terminal:

python3 /home/labex/project/server.py

The server starts silently and listens for incoming connections on port 25000; it prints output only when a client connects.

Open another terminal and connect to the server using nc (netcat). Netcat is a simple utility that allows you to connect to a server and send data.

nc localhost 25000

Now you can type messages and see them echoed back with "GOT:" prefixed:

Hello
GOT:Hello
World
GOT:World

If you don't have nc installed, you can use Python's built-in telnetlib module instead. Note that telnetlib is deprecated and was removed from the standard library in Python 3.13, so the following command only works on Python 3.12 and earlier:

python3 -c "import telnetlib; t = telnetlib.Telnet('localhost', 25000); t.interact()"

You can open multiple terminal windows and connect multiple clients simultaneously. The server will handle all connections concurrently, despite being single-threaded. This is thanks to our generator-based task scheduler, which allows the server to pause and resume tasks as needed.
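If you prefer a scriptable client, the sketch below speaks the same protocol from Python. So that it runs standalone, it starts a tiny threaded stand-in for the echo server on an ephemeral port; with the lab's server.py running, you would connect to ('localhost', 25000) instead:

```python
import socket
import threading

# Stand-in echo server on an ephemeral port so this sketch is self-contained.
# It is NOT the lab server; it just implements the same GOT: protocol once.
listener = socket.socket()
listener.bind(('localhost', 0))
listener.listen(1)
port = listener.getsockname()[1]

def serve_one_client():
    client, _ = listener.accept()
    data = client.recv(1000)
    client.send(b'GOT:' + data)
    client.close()

threading.Thread(target=serve_one_client, daemon=True).start()

# The client side: connect, send a message, read the echoed reply
with socket.create_connection(('localhost', port)) as s:
    s.sendall(b'Hello')
    response = s.recv(1000)

print(response)   # b'GOT:Hello'
```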

How It Works

This example demonstrates a powerful application of generators for async I/O:

  1. The server yields when it would otherwise block waiting for I/O. This means that instead of waiting indefinitely for data, the server can pause and let other tasks run.
  2. The scheduler moves it to a waiting area until the I/O is ready. This ensures that the server doesn't waste resources waiting for I/O.
  3. Other tasks can run while waiting for I/O to complete. This allows the server to handle multiple tasks concurrently.
  4. When I/O is ready, the task continues from where it left off. This is a key feature of asynchronous programming.

This pattern forms the foundation of modern asynchronous Python frameworks like asyncio, which was added to the Python standard library in Python 3.4.
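For comparison, here is a hedged sketch of the same echo logic written with asyncio. It runs a server and a single client in one event loop; the resemblance to our generator scheduler is easy to see, with tasks pausing at await instead of yield:

```python
import asyncio

async def echo_handler(reader, writer):
    # Same protocol as the lab server: echo data back with a GOT: prefix
    while data := await reader.read(1000):
        writer.write(b'GOT:' + data)
        await writer.drain()   # pause here until the data can be sent
    writer.close()

async def main():
    # Bind to an ephemeral port so this demo does not clash with server.py
    server = await asyncio.start_server(echo_handler, 'localhost', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('localhost', port)
    writer.write(b'Hello')
    await writer.drain()
    print(await reader.read(1000))   # b'GOT:Hello'

    writer.close()
    server.close()
    await server.wait_closed()

asyncio.run(main())
```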

Summary

In this lab, you have learned about the concept of managed generators in Python. You explored how to pause and resume generators using the yield statement, and built a simple task scheduler to run multiple generators concurrently. Additionally, you extended the scheduler to handle network I/O efficiently and implemented a network server capable of handling multiple connections simultaneously.

This pattern of using generators for cooperative multitasking is a powerful technique that underpins many asynchronous programming frameworks in Python, such as the built-in asyncio module. The approach offers several advantages, including simple sequential code, efficient non-blocking I/O handling, cooperative multitasking without multiple threads, and fine-grained control over task execution. These techniques are valuable for building high-performance network applications and systems that require efficient handling of concurrent operations.