了解受控生成器

Beginner

This tutorial is from open-source community. Access the source code

简介

在这个实验中,你将了解托管生成器(managed generators),并掌握以非常规方式驱动它们的方法。你还将构建一个简单的任务调度器,并使用生成器创建一个网络服务器。

Python 中的生成器函数需要外部代码来执行。例如,迭代生成器只有在使用 for 循环进行迭代时才会运行,而协程则需要调用其 send() 方法。在这个实验中,我们将探索在高级应用中驱动生成器的实际示例。本实验期间创建的文件是 multitask.pyserver.py

这是一个实验(Guided Lab),提供逐步指导来帮助你学习和实践。请仔细按照说明完成每个步骤,获得实际操作经验。根据历史数据,这是一个 初级 级别的实验,完成率为 84%。获得了学习者 80% 的好评率。

理解 Python 生成器

让我们先回顾一下 Python 中生成器的概念。在 Python 里,生成器是一种特殊类型的函数,它们与普通函数有所不同。当你调用一个普通函数时,它会从头到尾执行,并返回一个单一的值。然而,生成器函数会返回一个迭代器(iterator),这是一个可以进行迭代的对象,意味着我们可以逐个访问其值。

生成器使用 yield 语句来返回值。与普通函数一次性返回所有值不同,生成器会一次返回一个值。在生成一个值之后,生成器会暂停执行。下次我们请求一个值时,它会从上次暂停的地方继续执行。

创建一个简单的生成器

现在,让我们来创建一个简单的生成器。在 WebIDE 中,你需要创建一个新文件,该文件将包含我们生成器的代码。将文件命名为 generator_demo.py,并将其放在 /home/labex/project 目录下。以下是你应该放入文件中的内容:

## Generator function that counts down from n
def countdown(n):
    print(f"Starting countdown from {n}")
    while n > 0:
        yield n
        n -= 1
    print("Countdown complete!")

## Create a generator object
counter = countdown(5)

## Drive the generator manually
print(next(counter))  ## 5
print(next(counter))  ## 4
print(next(counter))  ## 3

## Iterate through remaining values
for value in counter:
    print(value)  ## 2, 1

在这段代码中,我们首先定义了一个名为 countdown 的生成器函数。这个函数接受一个数字 n 作为参数,并从 n 开始倒数到 1。在函数内部,我们使用一个 while 循环来递减 n 并生成每个值。当我们调用 countdown(5) 时,它会创建一个名为 counter 的生成器对象。

然后,我们使用 next() 函数手动从生成器中获取值。每次调用 next(counter) 时,生成器会从上次暂停的地方继续执行,并生成下一个值。在手动获取三个值之后,我们使用一个 for 循环来迭代生成器中剩余的值。

要运行这段代码,打开终端并执行以下命令:

python3 /home/labex/project/generator_demo.py

当你运行代码时,你应该会看到以下输出:

Starting countdown from 5
5
4
3
2
1
Countdown complete!

让我们注意一下生成器函数的行为:

  1. 当我们首次调用 next(counter) 时,生成器函数开始执行。在此之前,函数只是被定义,实际的倒计时并未开始。
  2. 它会在每个 yield 语句处暂停。在生成一个值之后,它会停止并等待下一次调用 next()
  3. 当我们再次调用 next() 时,它会从上次暂停的地方继续执行。例如,在生成 5 之后,它会记住状态,继续递减 n 并生成下一个值。
  4. 在生成最后一个值之后,生成器函数完成执行。在我们的例子中,在生成 1 之后,它会打印 "Countdown complete!"。

这种暂停和恢复执行的能力正是生成器强大的原因。它对于任务调度和异步编程等任务非常有用,在这些任务中,我们需要以高效的方式执行多个任务,而不会阻塞其他任务的执行。

使用生成器创建任务调度器

在编程中,任务调度器是一个重要的工具,它有助于高效地管理和执行多个任务。在本节中,我们将使用生成器构建一个简单的任务调度器,该调度器可以并发运行多个生成器函数。这将向你展示如何管理生成器以实现协作式多任务处理,即任务轮流运行并共享执行时间。

首先,你需要创建一个新文件。导航到 /home/labex/project 目录并创建一个名为 multitask.py 的文件。该文件将包含我们任务调度器的代码。

## multitask.py

from collections import deque

## Task queue
tasks = deque()

## Simple task scheduler
def run():
    while tasks:
        task = tasks.popleft()  ## Get the next task
        try:
            task.send(None)     ## Resume the task
            tasks.append(task)  ## Put it back in the queue
        except StopIteration:
            print('Task done')  ## Task is complete

## Example task 1: Countdown
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield              ## Pause execution
        n -= 1

## Example task 2: Count up
def countup(n):
    x = 0
    while x < n:
        print('Up we go', x)
        yield              ## Pause execution
        x += 1

现在,让我们详细分析这个任务调度器的工作原理:

  1. 我们使用 deque(双端队列)来存储生成器任务。deque 是一种数据结构,它允许你高效地从两端添加和移除元素。对于我们的任务队列来说,它是一个很好的选择,因为我们需要在队列尾部添加任务,并从队列头部移除任务。
  2. run() 函数是我们任务调度器的核心。它逐个从队列中取出任务:
    • 它使用 send(None) 恢复每个任务的执行。这类似于对生成器使用 next()。它告诉生成器从上次暂停的地方继续执行。
    • 任务产生(yield)值后,会被重新添加到队列尾部。这样,该任务稍后将有机会再次运行。
    • 当一个任务完成(抛出 StopIteration 异常)时,它会从队列中移除。这表明该任务已完成执行。
  3. 我们的生成器任务中的每个 yield 语句都充当一个暂停点。当生成器到达 yield 语句时,它会暂停执行并将控制权交还给调度器。这使得其他任务可以运行。

这种方法实现了协作式多任务处理。每个任务自愿将控制权交还给调度器,从而允许其他任务运行。通过这种方式,多个任务可以共享执行时间并并发运行。

测试我们的任务调度器

现在,我们要在 multitask.py 文件中添加一个测试。这个测试的目的是同时运行多个任务,这被称为并发执行。并发执行允许不同的任务看似同时取得进展,尽管在单线程环境中,这些任务实际上是轮流运行的。

为了进行这个测试,在 multitask.py 文件的末尾添加以下代码:

## Test our scheduler
if __name__ == '__main__':
    ## Add tasks to the queue
    tasks.append(countdown(10))  ## Count down from 10
    tasks.append(countdown(5))   ## Count down from 5
    tasks.append(countup(20))    ## Count up to 20

    ## Run all tasks
    run()

在这段代码中,我们首先使用 if __name__ == '__main__': 检查脚本是否是直接运行的。然后,我们向 tasks 队列中添加三个不同的任务。countdown 任务会从给定的数字开始倒计时,而 countup 任务会计数到指定的数字。最后,我们调用 run() 函数来开始执行这些任务。

添加代码后,在终端中使用以下命令运行它:

python3 /home/labex/project/multitask.py

当你运行代码时,你应该会看到类似以下的输出(具体的行顺序可能会有所不同):

T-minus 10
T-minus 5
Up we go 0
T-minus 9
T-minus 4
Up we go 1
T-minus 8
T-minus 3
Up we go 2
...

注意不同任务的输出是如何混合在一起的。这清楚地表明我们的调度器正在并发运行所有三个任务。每次一个任务到达 yield 语句时,调度器会暂停该任务并切换到另一个任务,从而使所有任务随着时间的推移都能取得进展。

工作原理

让我们更仔细地看看我们的调度器运行时会发生什么:

  1. 首先,我们向队列中添加三个生成器任务:countdown(10)countdown(5)countup(20)。这些生成器任务是特殊的函数,它们可以在 yield 语句处暂停和恢复执行。
  2. 然后,run() 函数开始工作:
    • 它从队列中取出第一个任务 countdown(10)
    • 它运行这个任务,直到到达 yield 语句。当遇到 yield 时,它会打印 "T-minus 10"。
    • 之后,它将 countdown(10) 任务重新添加到队列中,以便稍后可以再次运行。
    • 接下来,它从队列中取出 countdown(5) 任务。
    • 它运行 countdown(5) 任务,直到遇到 yield 语句,打印 "T-minus 5"。
    • 这个过程会持续进行……

这个循环会一直持续,直到所有任务都完成。每个任务都有机会运行一小段时间,这就产生了并发执行的错觉,而无需使用线程或回调。线程是一种更复杂的实现并发的方式,而回调则用于异步编程。我们的简单调度器使用生成器以更直接的方式实现了类似的效果。

使用生成器构建网络服务器

在本节中,我们将运用所学的任务调度器概念,并将其扩展以创建更实用的东西:一个简单的网络服务器。这个服务器可以使用生成器同时处理多个客户端连接。生成器是 Python 中一个强大的特性,它允许函数暂停和恢复执行,这对于在不阻塞的情况下处理多个任务非常有用。

首先,你需要在 /home/labex/project 目录下创建一个名为 server.py 的新文件。该文件将包含我们网络服务器的代码。

## server.py

from socket import *
from select import select
from collections import deque

## Task system
tasks = deque()
recv_wait = {}   ## Map: socket -> task (for tasks waiting to receive)
send_wait = {}   ## Map: socket -> task (for tasks waiting to send)

def run():
    while any([tasks, recv_wait, send_wait]):
        ## If no active tasks, wait for I/O
        while not tasks:
            ## Wait for any socket to become ready for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])

            ## Add tasks waiting on readable sockets back to active queue
            for s in can_recv:
                tasks.append(recv_wait.pop(s))

            ## Add tasks waiting on writable sockets back to active queue
            for s in can_send:
                tasks.append(send_wait.pop(s))

        ## Get next task to run
        task = tasks.popleft()

        try:
            ## Resume the task
            reason, resource = task.send(None)

            ## Handle different yield reasons
            if reason == 'recv':
                ## Task is waiting to receive data
                recv_wait[resource] = task
            elif reason == 'send':
                ## Task is waiting to send data
                send_wait[resource] = task
            else:
                raise RuntimeError('Unknown yield reason %r' % reason)

        except StopIteration:
            print('Task done')

这个改进后的调度器比之前的稍微复杂一些,但它遵循相同的基本思路。让我们来分析一下主要的区别:

  1. 任务可以产生一个原因('recv''send')和一个资源(一个套接字)。这意味着任务可以告诉调度器它正在等待在特定套接字上接收或发送数据。
  2. 根据产生的原因,任务会被移动到不同的等待区域。如果任务正在等待接收数据,它会被放入 recv_wait 字典中。如果它正在等待发送数据,它会被放入 send_wait 字典中。
  3. select() 函数用于确定哪些套接字已准备好进行 I/O 操作。该函数会检查 recv_waitsend_wait 字典中的套接字,并返回那些准备好接收或发送数据的套接字。
  4. 当一个套接字准备好时,关联的任务会被移回到活动队列中。这使得任务可以继续执行,并执行它所等待的 I/O 操作。

通过使用这些技术,我们的任务可以高效地等待网络 I/O,而不会阻塞其他任务的执行。这使得我们的网络服务器更具响应性,并且能够并发处理多个客户端连接。

实现一个回显服务器

现在,我们要在 server.py 文件中添加回显服务器的实现代码。回显服务器是一种简单地将从客户端接收到的任何数据原样返回的服务器。这是理解服务器如何处理传入数据并与客户端进行通信的绝佳方式。

server.py 文件的末尾添加以下代码。这段代码将设置我们的回显服务器并处理客户端连接。

## TCP Server implementation
def tcp_server(address, handler):
    ## Create a TCP socket
    sock = socket(AF_INET, SOCK_STREAM)
    ## Set the socket option to reuse the address
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    ## Bind the socket to the given address
    sock.bind(address)
    ## Start listening for incoming connections, with a backlog of 5
    sock.listen(5)

    while True:
        ## Yield to pause the function until a client connects
        yield 'recv', sock        ## Wait for a client connection
        ## Accept a client connection
        client, addr = sock.accept()
        ## Add a new handler task for this client to the tasks list
        tasks.append(handler(client, addr))  ## Start a handler task for this client

## Echo handler - echoes back whatever the client sends
def echo_handler(client, address):
    print('Connection from', address)

    while True:
        ## Yield to pause the function until the client sends data
        yield 'recv', client      ## Wait until client sends data
        ## Receive up to 1000 bytes of data from the client
        data = client.recv(1000)

        if not data:              ## Client closed connection
            break

        ## Yield to pause the function until the client can receive data
        yield 'send', client      ## Wait until client can receive data
        ## Send the data back to the client with 'GOT:' prefix
        client.send(b'GOT:' + data)

    print('Connection closed')
    ## Close the client connection
    client.close()

## Start the server
if __name__ == '__main__':
    ## Add the tcp_server task to the tasks list
    tasks.append(tcp_server(('', 25000), echo_handler))
    ## Start the scheduler
    run()

让我们逐步理解这段代码:

  1. tcp_server 函数:

    • 首先,它设置一个套接字来监听传入的连接。套接字是两台机器之间通信的端点。
    • 然后,它使用 yield 'recv', sock 暂停函数,直到有客户端连接。这是我们异步方法的关键部分。
    • 最后,它为每个客户端连接创建一个新的处理任务。这使得服务器能够并发处理多个客户端。
  2. echo_handler 函数:

    • 它使用 yield 'recv', client 等待客户端发送数据。这会暂停函数,直到有数据可用。
    • 它使用 yield 'send', client 等待直到可以将数据发送回客户端。这确保客户端已准备好接收数据。
    • 它处理客户端数据,直到客户端关闭连接。
  3. 当我们运行服务器时,它会将 tcp_server 任务添加到队列中并启动调度器。调度器负责管理所有任务,并确保它们异步运行。

要测试服务器,在一个终端中运行以下命令:

python3 /home/labex/project/server.py

你应该会看到一条消息,表明服务器正在运行。这意味着服务器现在正在监听传入的连接。

打开另一个终端,使用 nc(netcat)连接到服务器。Netcat 是一个简单的实用工具,允许你连接到服务器并发送数据。

nc localhost 25000

现在你可以输入消息,并看到它们以 "GOT:" 为前缀被原样返回:

Hello
GOT:Hello
World
GOT:World

如果你没有安装 nc,可以使用 Python 内置的 telnetlib。Telnetlib 是一个允许你使用 Telnet 协议连接到服务器的库。

python3 -c "import telnetlib; t = telnetlib.Telnet('localhost', 25000); t.interact()"

你可以打开多个终端窗口,同时连接多个客户端。尽管服务器是单线程的,但它可以并发处理所有连接。这要归功于我们基于生成器的任务调度器,它允许服务器根据需要暂停和恢复任务。

工作原理

这个示例展示了生成器在异步 I/O 中的强大应用:

  1. 服务器在等待 I/O 时会产生(yield)。这意味着服务器不会无限期地等待数据,而是可以暂停并让其他任务运行。
  2. 调度器将其移动到等待区域,直到 I/O 准备好。这确保服务器不会在等待 I/O 时浪费资源。
  3. 在等待 I/O 完成时,其他任务可以运行。这使得服务器能够并发处理多个任务。
  4. 当 I/O 准备好时,任务从暂停处继续执行。这是异步编程的一个关键特性。

这种模式构成了现代异步 Python 框架(如 asyncio)的基础,asyncio 在 Python 3.4 中被添加到 Python 标准库中。

总结

在本次实验中,你学习了 Python 中受控生成器的概念。你探索了如何使用 yield 语句暂停和恢复生成器,并构建了一个简单的任务调度器来并发运行多个生成器。此外,你扩展了调度器以高效处理网络 I/O,并实现了一个能够同时处理多个连接的网络服务器。

这种使用生成器进行协作式多任务处理的模式是一种强大的技术,它是 Python 中许多异步编程框架(如内置的 asyncio 模块)的基础。这种方法具有多个优点,包括编写简单的顺序代码、高效的非阻塞 I/O 处理、无需多线程的协作式多任务处理,以及对任务执行的细粒度控制。这些技术对于构建高性能网络应用程序和需要高效处理并发操作的系统非常有价值。