Introduction
In this lab, you will learn about managed generators and understand how to drive them in unusual ways. You'll also build a simple task scheduler and create a network server using generators.
A generator function in Python requires external code to execute. For instance, an iteration generator runs only when iterated over with a for loop, and coroutines need their send() method called. In this lab, we'll explore practical examples of driving generators in advanced applications. The files created during this lab are multitask.py and server.py.
Understanding Python Generators
Let's start by reviewing what generators are in Python. In Python, generators are a special type of function. They are different from regular functions. When you call a regular function, it runs from start to finish and returns a single value. However, a generator function returns an iterator, which is an object that we can iterate through, meaning we can access its values one by one.
Generators use the yield statement to return values. Instead of returning all values at once like a regular function, a generator returns values one at a time. After yielding a value, the generator suspends its execution. The next time we ask for a value, it resumes execution from where it left off.
Creating a Simple Generator
Now, let's create a simple generator. In the WebIDE, you need to create a new file. This file will contain the code for our generator. Name the file generator_demo.py and place it in the /home/labex/project directory. Here is the content you should put in the file:
## Generator function that counts down from n
def countdown(n):
print(f"Starting countdown from {n}")
while n > 0:
yield n
n -= 1
print("Countdown complete!")
## Create a generator object
counter = countdown(5)
## Drive the generator manually
print(next(counter)) ## 5
print(next(counter)) ## 4
print(next(counter)) ## 3
## Iterate through remaining values
for value in counter:
print(value) ## 2, 1
In this code, we first define a generator function called countdown. This function takes a number n as an argument and counts down from n to 1. Inside the function, we use a while loop to decrement n and yield each value. When we call countdown(5), it creates a generator object named counter.
We then use the next() function to manually get values from the generator. Each time we call next(counter), the generator resumes execution from where it left off and yields the next value. After manually getting three values, we use a for loop to iterate through the remaining values in the generator.
To run this code, open the terminal and execute the following command:
python3 /home/labex/project/generator_demo.py
When you run the code, you should see the following output:
Starting countdown from 5
5
4
3
2
1
Countdown complete!
Let's note how the generator function behaves:
- The generator function starts its execution when we first call
next(counter). Before that, the function is just defined and no actual counting down has started. - It pauses at each
yieldstatement. After yielding a value, it stops and waits for the next call tonext(). - When we call
next()again, it continues from where it left off. For example, after yielding 5, it remembers the state and continues to decrementnand yield the next value. - The generator function completes its execution after the last value is yielded. In our case, after yielding 1, it prints "Countdown complete!".
This ability to pause and resume execution is what makes generators powerful. It is very useful for tasks like task scheduling and asynchronous programming, where we need to perform multiple tasks in an efficient way without blocking the execution of other tasks.
Creating a Task Scheduler with Generators
In programming, a task scheduler is a crucial tool that helps manage and execute multiple tasks efficiently. In this section, we'll use generators to build a simple task scheduler that can run multiple generator functions concurrently. This will show you how generators can be managed to perform cooperative multitasking, which means tasks take turns to run and share the execution time.
First, you need to create a new file. Navigate to the /home/labex/project directory and create a file named multitask.py. This file will contain the code for our task scheduler.
## multitask.py
from collections import deque
## Task queue
tasks = deque()
## Simple task scheduler
def run():
while tasks:
task = tasks.popleft() ## Get the next task
try:
task.send(None) ## Resume the task
tasks.append(task) ## Put it back in the queue
except StopIteration:
print('Task done') ## Task is complete
## Example task 1: Countdown
def countdown(n):
while n > 0:
print('T-minus', n)
yield ## Pause execution
n -= 1
## Example task 2: Count up
def countup(n):
x = 0
while x < n:
print('Up we go', x)
yield ## Pause execution
x += 1
Now, let's break down how this task scheduler works:
- We use a
deque(double-ended queue) to store our generator tasks. Adequeis a data structure that allows you to add and remove elements from both ends efficiently. It's a great choice for our task queue because we need to add tasks to the end and remove them from the front. - The
run()function is the heart of our task scheduler. It takes tasks from the queue one by one:- It resumes each task using
send(None). This is similar to usingnext()on a generator. It tells the generator to continue executing from where it left off. - After the task yields, it's added back to the end of the queue. This way, the task will get another chance to run later.
- When a task completes (raises
StopIteration), it's removed from the queue. This indicates that the task has finished its execution.
- It resumes each task using
- Each
yieldstatement in our generator tasks acts as a pause point. When a generator reaches ayieldstatement, it pauses its execution and gives control back to the scheduler. This allows other tasks to run.
This approach implements cooperative multitasking. Each task voluntarily yields control back to the scheduler, allowing other tasks to run. This way, multiple tasks can share the execution time and run concurrently.
Testing Our Task Scheduler
Now, we're going to add a test to our multitask.py file. The purpose of this test is to run multiple tasks at the same time, which is known as concurrent execution. Concurrent execution allows different tasks to make progress seemingly at the same time, even though in a single-threaded environment, the tasks are actually taking turns to run.
To perform this test, add the following code at the end of the multitask.py file:
## Test our scheduler
if __name__ == '__main__':
## Add tasks to the queue
tasks.append(countdown(10)) ## Count down from 10
tasks.append(countdown(5)) ## Count down from 5
tasks.append(countup(20)) ## Count up to 20
## Run all tasks
run()
In this code, we first check if the script is being run directly using if __name__ == '__main__':. Then, we add three different tasks to the tasks queue. The countdown tasks will count down from the given numbers, and the countup task will count up to the specified number. Finally, we call the run() function to start executing these tasks.
After adding the code, run it with the following command in the terminal:
python3 /home/labex/project/multitask.py
When you run the code, you should see output similar to this (the exact order of the lines may vary):
T-minus 10
T-minus 5
Up we go 0
T-minus 9
T-minus 4
Up we go 1
T-minus 8
T-minus 3
Up we go 2
...
Notice how the output from the different tasks is mixed together. This is a clear indication that our scheduler is running all three tasks concurrently. Each time a task reaches a yield statement, the scheduler pauses that task and switches to another one, allowing all tasks to make progress over time.
How It Works
Let's take a closer look at what happens when our scheduler runs:
- First, we add three generator tasks to the queue:
countdown(10),countdown(5), andcountup(20). These generator tasks are special functions that can pause and resume their execution atyieldstatements. - Then, the
run()function starts its work:- It takes the first task,
countdown(10), from the queue. - It runs this task until it reaches a
yieldstatement. When it hits theyield, it prints "T-minus 10". - After that, it adds the
countdown(10)task back to the queue so that it can be run again later. - Next, it takes the
countdown(5)task from the queue. - It runs the
countdown(5)task until it hits ayieldstatement, printing "T-minus 5". - And this process continues...
- It takes the first task,
This cycle keeps going until all tasks are finished. Each task gets a chance to run for a short while, which gives the illusion of concurrent execution without the need to use threads or callbacks. Threads are a more complex way of achieving concurrency, and callbacks are used in asynchronous programming. Our simple scheduler uses generators to achieve a similar effect in a more straightforward manner.
Building a Network Server with Generators
In this section, we'll take the concept of a task scheduler we've learned and expand it to create something more practical: a simple network server. This server can handle multiple client connections at the same time using generators. Generators are a powerful Python feature that allows functions to pause and resume their execution, which is very useful for handling multiple tasks without blocking.
First, you need to create a new file named server.py in the /home/labex/project directory. This file will contain the code for our network server.
## server.py
from socket import *
from select import select
from collections import deque
## Task system
tasks = deque()
recv_wait = {} ## Map: socket -> task (for tasks waiting to receive)
send_wait = {} ## Map: socket -> task (for tasks waiting to send)
def run():
while any([tasks, recv_wait, send_wait]):
## If no active tasks, wait for I/O
while not tasks:
## Wait for any socket to become ready for I/O
can_recv, can_send, _ = select(recv_wait, send_wait, [])
## Add tasks waiting on readable sockets back to active queue
for s in can_recv:
tasks.append(recv_wait.pop(s))
## Add tasks waiting on writable sockets back to active queue
for s in can_send:
tasks.append(send_wait.pop(s))
## Get next task to run
task = tasks.popleft()
try:
## Resume the task
reason, resource = task.send(None)
## Handle different yield reasons
if reason == 'recv':
## Task is waiting to receive data
recv_wait[resource] = task
elif reason == 'send':
## Task is waiting to send data
send_wait[resource] = task
else:
raise RuntimeError('Unknown yield reason %r' % reason)
except StopIteration:
print('Task done')
This improved scheduler is a bit more complicated than the previous one, but it follows the same fundamental ideas. Let's break down the main differences:
- Tasks can yield a reason ('recv' or 'send') and a resource (a socket). This means that a task can tell the scheduler that it's waiting to either receive or send data on a specific socket.
- Depending on the yield reason, the task is moved to a different waiting area. If a task is waiting to receive data, it goes to the
recv_waitdictionary. If it's waiting to send data, it goes to thesend_waitdictionary. - The
select()function is used to figure out which sockets are ready for I/O operations. This function checks the sockets in therecv_waitandsend_waitdictionaries and returns the ones that are ready to either receive or send data. - When a socket is ready, the associated task is moved back to the active queue. This allows the task to continue its execution and perform the I/O operation it was waiting for.
By using these techniques, our tasks can efficiently wait for network I/O without blocking the execution of other tasks. This makes our network server more responsive and able to handle multiple client connections concurrently.
Implementing an Echo Server
Now, we're going to add the implementation of an echo server to our server.py file. An echo server is a type of server that simply sends back whatever data it receives from a client. This is a great way to understand how servers handle incoming data and communicate with clients.
Add the following code at the end of the server.py file. This code will set up our echo server and handle client connections.
## TCP Server implementation
def tcp_server(address, handler):
## Create a TCP socket
sock = socket(AF_INET, SOCK_STREAM)
## Set the socket option to reuse the address
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
## Bind the socket to the given address
sock.bind(address)
## Start listening for incoming connections, with a backlog of 5
sock.listen(5)
while True:
## Yield to pause the function until a client connects
yield 'recv', sock ## Wait for a client connection
## Accept a client connection
client, addr = sock.accept()
## Add a new handler task for this client to the tasks list
tasks.append(handler(client, addr)) ## Start a handler task for this client
## Echo handler - echoes back whatever the client sends
def echo_handler(client, address):
print('Connection from', address)
while True:
## Yield to pause the function until the client sends data
yield 'recv', client ## Wait until client sends data
## Receive up to 1000 bytes of data from the client
data = client.recv(1000)
if not data: ## Client closed connection
break
## Yield to pause the function until the client can receive data
yield 'send', client ## Wait until client can receive data
## Send the data back to the client with 'GOT:' prefix
client.send(b'GOT:' + data)
print('Connection closed')
## Close the client connection
client.close()
## Start the server
if __name__ == '__main__':
## Add the tcp_server task to the tasks list
tasks.append(tcp_server(('', 25000), echo_handler))
## Start the scheduler
run()
Let's understand this code step by step:
The
tcp_serverfunction:- First, it sets up a socket to listen for incoming connections. A socket is an endpoint for communication between two machines.
- Then, it uses
yield 'recv', sockto pause the function until a client connects. This is a key part of our asynchronous approach. - Finally, it creates a new handler task for each client connection. This allows the server to handle multiple clients concurrently.
The
echo_handlerfunction:- It yields
'recv', clientto wait for the client to send data. This pauses the function until data is available. - It yields
'send', clientto wait until it can send data back to the client. This ensures that the client is ready to receive the data. - It processes the client data until the connection is closed by the client.
- It yields
When we run the server, it adds the
tcp_servertask to the queue and starts the scheduler. The scheduler is responsible for managing all the tasks and making sure they run asynchronously.
To test the server, run it in one terminal:
python3 /home/labex/project/server.py
You should see a message indicating the server is running. This means that the server is now listening for incoming connections.
Open another terminal and connect to the server using nc (netcat). Netcat is a simple utility that allows you to connect to a server and send data.
nc localhost 25000
Now you can type messages and see them echoed back with "GOT:" prefixed:
Hello
GOT:Hello
World
GOT:World
If you don't have nc installed, you can use Python's built-in telnetlib. Telnetlib is a library that allows you to connect to a server using the Telnet protocol.
python3 -c "import telnetlib; t = telnetlib.Telnet('localhost', 25000); t.interact()"
You can open multiple terminal windows and connect multiple clients simultaneously. The server will handle all connections concurrently, despite being single-threaded. This is thanks to our generator-based task scheduler, which allows the server to pause and resume tasks as needed.
How It Works
This example demonstrates a powerful application of generators for async I/O:
- The server yields when it would otherwise block waiting for I/O. This means that instead of waiting indefinitely for data, the server can pause and let other tasks run.
- The scheduler moves it to a waiting area until the I/O is ready. This ensures that the server doesn't waste resources waiting for I/O.
- Other tasks can run while waiting for I/O to complete. This allows the server to handle multiple tasks concurrently.
- When I/O is ready, the task continues from where it left off. This is a key feature of asynchronous programming.
This pattern forms the foundation of modern asynchronous Python frameworks like asyncio, which was added to the Python standard library in Python 3.4.
Summary
In this lab, you have learned about the concept of managed generators in Python. You explored how to pause and resume generators using the yield statement, and built a simple task scheduler to run multiple generators concurrently. Additionally, you extended the scheduler to handle network I/O efficiently and implemented a network server capable of handling multiple connections simultaneously.
This pattern of using generators for cooperative multitasking is a powerful technique that underpins many asynchronous programming frameworks in Python, such as the built-in asyncio module. The approach offers several advantages, including simple sequential code, efficient non-blocking I/O handling, cooperative multitasking without multiple threads, and fine-grained control over task execution. These techniques are valuable for building high-performance network applications and systems that require efficient handling of concurrent operations.