Изучите управляемые генераторы

PythonPythonBeginner
Практиковаться сейчас

This tutorial is from open-source community. Access the source code

💡 Этот учебник переведен с английского с помощью ИИ. Чтобы просмотреть оригинал, вы можете перейти на английский оригинал

Введение

В этом практическом занятии (лабораторной работе) вы узнаете о управляемых генераторах и поймете, как управлять ими нестандартными способами. Вы также создадите простой планировщик задач и разработаете с использованием генераторов сетевой сервер.

Генераторная функция в Python требует внешнего кода для выполнения. Например, итерационный генератор запускается только при итерации с использованием цикла for, а корутины требуют вызова метода send(). В этом практическом занятии мы рассмотрим практические примеры управления генераторами в продвинутых приложениях. Файлы, созданные в ходе этого практического занятия, - это multitask.py и server.py.


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL python(("Python")) -.-> python/AdvancedTopicsGroup(["Advanced Topics"]) python(("Python")) -.-> python/PythonStandardLibraryGroup(["Python Standard Library"]) python(("Python")) -.-> python/NetworkingGroup(["Networking"]) python/AdvancedTopicsGroup -.-> python/iterators("Iterators") python/AdvancedTopicsGroup -.-> python/generators("Generators") python/PythonStandardLibraryGroup -.-> python/data_collections("Data Collections") python/NetworkingGroup -.-> python/socket_programming("Socket Programming") subgraph Lab Skills python/iterators -.-> lab-132526{{"Изучите управляемые генераторы"}} python/generators -.-> lab-132526{{"Изучите управляемые генераторы"}} python/data_collections -.-> lab-132526{{"Изучите управляемые генераторы"}} python/socket_programming -.-> lab-132526{{"Изучите управляемые генераторы"}} end

Понимание генераторов в Python

Начнем с того, что вспомним, что такое генераторы в Python. В Python генераторы - это особый тип функций. Они отличаются от обычных функций. Когда вы вызываете обычную функцию, она выполняется от начала до конца и возвращает одно значение. Однако генераторная функция возвращает итератор, то есть объект, по которому можно итерироваться, то есть получать его значения по одному.

Генераторы используют оператор yield для возврата значений. В отличие от обычной функции, которая возвращает все значения сразу, генератор возвращает значения по одному. После выдачи значения генератор приостанавливает свое выполнение. В следующий раз, когда мы запрашиваем значение, он возобновляет выполнение с того места, где остановился.

Создание простого генератора

Теперь давайте создадим простой генератор. В WebIDE вам нужно создать новый файл. Этот файл будет содержать код нашего генератора. Назовите файл generator_demo.py и поместите его в директорию /home/labex/project. Вот содержимое, которое вы должны поместить в файл:

## Generator function that counts down from n
def countdown(n):
    print(f"Starting countdown from {n}")
    while n > 0:
        yield n
        n -= 1
    print("Countdown complete!")

## Create a generator object
counter = countdown(5)

## Drive the generator manually
print(next(counter))  ## 5
print(next(counter))  ## 4
print(next(counter))  ## 3

## Iterate through remaining values
for value in counter:
    print(value)  ## 2, 1

В этом коде мы сначала определяем генераторную функцию с именем countdown. Эта функция принимает число n в качестве аргумента и выполняет обратный отсчет от n до 1. Внутри функции мы используем цикл while для уменьшения значения n и выдачи каждого значения. Когда мы вызываем countdown(5), создается объект - генератор с именем counter.

Затем мы используем функцию next() для ручного получения значений из генератора. Каждый раз, когда мы вызываем next(counter), генератор возобновляет выполнение с того места, где остановился, и выдает следующее значение. После ручного получения трех значений мы используем цикл for для итерации по оставшимся значениям в генераторе.

Чтобы запустить этот код, откройте терминал и выполните следующую команду:

python3 /home/labex/project/generator_demo.py

Когда вы запустите код, вы должны увидеть следующий вывод:

Starting countdown from 5
5
4
3
2
1
Countdown complete!

Обратите внимание на то, как ведет себя генераторная функция:

  1. Генераторная функция начинает свое выполнение, когда мы впервые вызываем next(counter). До этого функция только определена, и фактический обратный отсчет не начался.
  2. Она приостанавливается на каждом операторе yield. После выдачи значения она останавливается и ждет следующего вызова next().
  3. Когда мы снова вызываем next(), она продолжает выполнение с того места, где остановилась. Например, после выдачи значения 5, она запоминает состояние и продолжает уменьшать n и выдавать следующее значение.
  4. Генераторная функция завершает свое выполнение после выдачи последнего значения. В нашем случае, после выдачи значения 1, она выводит "Countdown complete!".

Эта возможность приостанавливать и возобновлять выполнение делает генераторы мощными инструментом. Это очень полезно для таких задач, как планирование задач и асинхронное программирование, где нам нужно эффективно выполнять несколько задач без блокирования выполнения других задач.

✨ Проверить решение и практиковаться

Создание планировщика задач с использованием генераторов

В программировании планировщик задач - это важный инструмент, который помогает эффективно управлять и выполнять несколько задач. В этом разделе мы будем использовать генераторы для создания простого планировщика задач, который может запускать несколько генераторных функций одновременно. Это покажет, как можно управлять генераторами для выполнения кооперативного многозадачности, то есть когда задачи поочередно выполняются и делят время выполнения.

Сначала вам нужно создать новый файл. Перейдите в директорию /home/labex/project и создайте файл с именем multitask.py. Этот файл будет содержать код нашего планировщика задач.

## multitask.py

from collections import deque

## Task queue
tasks = deque()

## Simple task scheduler
def run():
    while tasks:
        task = tasks.popleft()  ## Get the next task
        try:
            task.send(None)     ## Resume the task
            tasks.append(task)  ## Put it back in the queue
        except StopIteration:
            print('Task done')  ## Task is complete

## Example task 1: Countdown
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield              ## Pause execution
        n -= 1

## Example task 2: Count up
def countup(n):
    x = 0
    while x < n:
        print('Up we go', x)
        yield              ## Pause execution
        x += 1

Теперь разберемся, как работает этот планировщик задач:

  1. Мы используем deque (двустороннюю очередь) для хранения наших генераторных задач. deque - это структура данных, которая позволяет эффективно добавлять и удалять элементы с обоих концов. Это отличный выбор для нашей очереди задач, так как нам нужно добавлять задачи в конец и удалять их из начала.
  2. Функция run() является ядром нашего планировщика задач. Она берет задачи из очереди по одной:
    • Она возобновляет выполнение каждой задачи с помощью send(None). Это аналогично использованию next() для генератора. Это сообщает генератору продолжить выполнение с того места, где он остановился.
    • После того, как задача выдает значение (yield), она добавляется обратно в конец очереди. Таким образом, задача получит еще один шанс выполниться позже.
    • Когда задача завершается (возбуждает исключение StopIteration), она удаляется из очереди. Это означает, что задача завершила свое выполнение.
  3. Каждый оператор yield в наших генераторных задачах действует как точка остановки. Когда генератор достигает оператора yield, он приостанавливает свое выполнение и возвращает управление планировщику. Это позволяет другим задачам выполняться.

Этот подход реализует кооперативную многозадачность. Каждая задача добровольно возвращает управление планировщику, позволяя другим задачам выполняться. Таким образом, несколько задач могут делить время выполнения и выполняться одновременно.

✨ Проверить решение и практиковаться

Тестирование нашего планировщика задач

Теперь мы добавим тест в наш файл multitask.py. Цель этого теста - запустить несколько задач одновременно, что называется параллельным выполнением (concurrent execution). Параллельное выполнение позволяет разным задачам делать прогресс, казалось бы, одновременно, хотя в однопоточной среде задачи на самом деле выполняются поочередно.

Чтобы выполнить этот тест, добавьте следующий код в конец файла multitask.py:

## Test our scheduler
if __name__ == '__main__':
    ## Add tasks to the queue
    tasks.append(countdown(10))  ## Count down from 10
    tasks.append(countdown(5))   ## Count down from 5
    tasks.append(countup(20))    ## Count up to 20

    ## Run all tasks
    run()

В этом коде мы сначала проверяем, запускается ли скрипт напрямую, используя if __name__ == '__main__':. Затем мы добавляем три разные задачи в очередь tasks. Задачи countdown будут выполнять обратный отсчет от заданных чисел, а задача countup будет выполнять прямой отсчет до указанного числа. Наконец, мы вызываем функцию run(), чтобы начать выполнение этих задач.

После добавления кода запустите его с помощью следующей команды в терминале:

python3 /home/labex/project/multitask.py

Когда вы запустите код, вы должны увидеть вывод, похожий на следующий (точный порядок строк может отличаться):

T-minus 10
T-minus 5
Up we go 0
T-minus 9
T-minus 4
Up we go 1
T-minus 8
T-minus 3
Up we go 2
...

Обратите внимание, как вывод от разных задач перемешан. Это явный признак того, что наш планировщик запускает все три задачи параллельно. Каждый раз, когда задача достигает оператора yield, планировщик приостанавливает эту задачу и переключается на другую, позволяя всем задачам делать прогресс со временем.

Как это работает

Давайте рассмотрим более подробно, что происходит, когда наш планировщик запускается:

  1. Сначала мы добавляем три генераторные задачи в очередь: countdown(10), countdown(5) и countup(20). Эти генераторные задачи - это особые функции, которые могут приостанавливать и возобновлять свое выполнение на операторах yield.
  2. Затем функция run() начинает свою работу:
    • Она берет первую задачу, countdown(10), из очереди.
    • Она запускает эту задачу до тех пор, пока не достигнет оператора yield. Когда она достигает yield, она выводит "T-minus 10".
    • После этого она добавляет задачу countdown(10) обратно в очередь, чтобы ее можно было запустить позже.
    • Затем она берет задачу countdown(5) из очереди.
    • Она запускает задачу countdown(5) до тех пор, пока не достигнет оператора yield, выводя "T-minus 5".
    • И этот процесс продолжается...

Этот цикл продолжается до тех пор, пока все задачи не будут завершены. Каждая задача получает возможность выполниться на короткое время, что создает иллюзию параллельного выполнения без необходимости использовать потоки (threads) или колбэки (callbacks). Потоки - это более сложный способ достижения параллелизма, а колбэки используются в асинхронном программировании. Наш простой планировщик использует генераторы, чтобы достичь аналогичного эффекта более простым способом.

✨ Проверить решение и практиковаться

Создание сетевого сервера с использованием генераторов

В этом разделе мы возьмем концепцию планировщика задач, которую мы изучили, и расширим ее, чтобы создать что-то более практичное: простой сетевой сервер. Этот сервер может обрабатывать несколько клиентских подключений одновременно с использованием генераторов. Генераторы - это мощная особенность Python, которая позволяет функциям приостанавливать и возобновлять свое выполнение, что очень полезно для обработки нескольких задач без блокировки.

Сначала вам нужно создать новый файл с именем server.py в директории /home/labex/project. Этот файл будет содержать код нашего сетевого сервера.

## server.py

from socket import *
from select import select
from collections import deque

## Task system
tasks = deque()
recv_wait = {}   ## Map: socket -> task (for tasks waiting to receive)
send_wait = {}   ## Map: socket -> task (for tasks waiting to send)

def run():
    while any([tasks, recv_wait, send_wait]):
        ## If no active tasks, wait for I/O
        while not tasks:
            ## Wait for any socket to become ready for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])

            ## Add tasks waiting on readable sockets back to active queue
            for s in can_recv:
                tasks.append(recv_wait.pop(s))

            ## Add tasks waiting on writable sockets back to active queue
            for s in can_send:
                tasks.append(send_wait.pop(s))

        ## Get next task to run
        task = tasks.popleft()

        try:
            ## Resume the task
            reason, resource = task.send(None)

            ## Handle different yield reasons
            if reason == 'recv':
                ## Task is waiting to receive data
                recv_wait[resource] = task
            elif reason == 'send':
                ## Task is waiting to send data
                send_wait[resource] = task
            else:
                raise RuntimeError('Unknown yield reason %r' % reason)

        except StopIteration:
            print('Task done')

Этот улучшенный планировщик немного сложнее, чем предыдущий, но он следует тем же основным идеям. Разберем основные различия:

  1. Задачи могут возвращать причину ('recv' или 'send') и ресурс (сокет). Это означает, что задача может сообщить планировщику, что она ждет либо получения, либо отправки данных на определенном сокете.
  2. В зависимости от причины возврата задача перемещается в другую область ожидания. Если задача ждет получения данных, она попадает в словарь recv_wait. Если она ждет отправки данных, она попадает в словарь send_wait.
  3. Функция select() используется для определения, какие сокеты готовы к операциям ввода-вывода. Эта функция проверяет сокеты в словарях recv_wait и send_wait и возвращает те, которые готовы либо к получению, либо к отправке данных.
  4. Когда сокет готов, связанная с ним задача возвращается в активную очередь. Это позволяет задаче продолжить свое выполнение и выполнить операцию ввода-вывода, на которую она ждала.

Используя эти методы, наши задачи могут эффективно ждать сетевого ввода-вывода без блокировки выполнения других задач. Это делает наш сетевой сервер более отзывчивым и способным обрабатывать несколько клиентских подключений одновременно.

✨ Проверить решение и практиковаться

Реализация эхо-сервера

Теперь мы добавим реализацию эхо-сервера в наш файл server.py. Эхо-сервер - это тип сервера, который просто отправляет обратно любые данные, которые он получает от клиента. Это отличный способ понять, как серверы обрабатывают входящие данные и общаются с клиентами.

Добавьте следующий код в конец файла server.py. Этот код настроит наш эхо-сервер и обработает клиентские подключения.

## TCP Server implementation
def tcp_server(address, handler):
    ## Create a TCP socket
    sock = socket(AF_INET, SOCK_STREAM)
    ## Set the socket option to reuse the address
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    ## Bind the socket to the given address
    sock.bind(address)
    ## Start listening for incoming connections, with a backlog of 5
    sock.listen(5)

    while True:
        ## Yield to pause the function until a client connects
        yield 'recv', sock        ## Wait for a client connection
        ## Accept a client connection
        client, addr = sock.accept()
        ## Add a new handler task for this client to the tasks list
        tasks.append(handler(client, addr))  ## Start a handler task for this client

## Echo handler - echoes back whatever the client sends
def echo_handler(client, address):
    print('Connection from', address)

    while True:
        ## Yield to pause the function until the client sends data
        yield 'recv', client      ## Wait until client sends data
        ## Receive up to 1000 bytes of data from the client
        data = client.recv(1000)

        if not data:              ## Client closed connection
            break

        ## Yield to pause the function until the client can receive data
        yield 'send', client      ## Wait until client can receive data
        ## Send the data back to the client with 'GOT:' prefix
        client.send(b'GOT:' + data)

    print('Connection closed')
    ## Close the client connection
    client.close()

## Start the server
if __name__ == '__main__':
    ## Add the tcp_server task to the tasks list
    tasks.append(tcp_server(('', 25000), echo_handler))
    ## Start the scheduler
    run()

Поймем этот код пошагово:

  1. Функция tcp_server:

    • Сначала она настраивает сокет для прослушивания входящих подключений. Сокет - это конечная точка для связи между двумя машинами.
    • Затем она использует yield 'recv', sock, чтобы приостановить функцию до подключения клиента. Это ключевой элемент нашего асинхронного подхода.
    • Наконец, она создает новую задачу-обработчик для каждого клиентского подключения. Это позволяет серверу обрабатывать несколько клиентов одновременно.
  2. Функция echo_handler:

    • Она возвращает 'recv', client, чтобы ждать, пока клиент отправит данные. Это приостанавливает функцию до появления данных.
    • Она возвращает 'send', client, чтобы ждать, пока можно будет отправить данные обратно клиенту. Это гарантирует, что клиент готов принять данные.
    • Она обрабатывает данные клиента до тех пор, пока клиент не закроет соединение.
  3. Когда мы запускаем сервер, он добавляет задачу tcp_server в очередь и запускает планировщик. Планировщик отвечает за управление всеми задачами и гарантирует, что они выполняются асинхронно.

Чтобы протестировать сервер, запустите его в одном терминале:

python3 /home/labex/project/server.py

Вы должны увидеть сообщение, указывающее, что сервер запущен. Это означает, что сервер теперь прослушивает входящие подключения.

Откройте другой терминал и подключитесь к серверу с помощью nc (netcat). Netcat - это простая утилита, которая позволяет подключиться к серверу и отправлять данные.

nc localhost 25000

Теперь вы можете вводить сообщения и видеть их возвращаться с префиксом "GOT:":

Hello
GOT:Hello
World
GOT:World

Если nc не установлен, вы можете использовать встроенную библиотеку Python telnetlib. Telnetlib - это библиотека, которая позволяет подключиться к серверу с использованием протокола Telnet.

python3 -c "import telnetlib; t = telnetlib.Telnet('localhost', 25000); t.interact()"

Вы можете открыть несколько окон терминала и подключить несколько клиентов одновременно. Сервер будет обрабатывать все подключения одновременно, несмотря на то, что он однопоточный. Это благодаря нашему планировщику задач на основе генераторов, который позволяет серверу приостанавливать и возобновлять задачи по мере необходимости.

Как это работает

В этом примере показано мощное применение генераторов для асинхронного ввода-вывода:

  1. Сервер возвращает управление, когда он бы иначе блокировался, ожидая ввода-вывода. Это означает, что вместо бесконечного ожидания данных сервер может приостановиться и дать возможность другим задачам выполняться.
  2. Планировщик перемещает его в область ожидания до готовности ввода-вывода. Это гарантирует, что сервер не тратит ресурсы на ожидание ввода-вывода.
  3. Другие задачи могут выполняться, пока ожидается завершение ввода-вывода. Это позволяет серверу обрабатывать несколько задач одновременно.
  4. Когда ввод-вывод готов, задача продолжает выполнение с того места, где она была приостановлена. Это ключевая особенность асинхронного программирования.

Этот шаблон составляет основу современных асинхронных фреймворков Python, таких как asyncio, который был добавлен в стандартную библиотеку Python в версии 3.4.

✨ Проверить решение и практиковаться

Резюме

В этом практическом занятии вы узнали о концепции управляемых генераторов в Python. Вы изучили, как приостанавливать и возобновлять генераторы с использованием оператора yield, и создали простой планировщик задач для одновременного выполнения нескольких генераторов. Кроме того, вы расширили планировщик для эффективной обработки сетевого ввода-вывода и реализовали сетевой сервер, способный обрабатывать несколько подключений одновременно.

Этот шаблон использования генераторов для кооперативного многозадачности представляет собой мощную технику, которая лежит в основе многих асинхронных программных фреймворков в Python, таких как встроенный модуль asyncio. Подход имеет несколько преимуществ, включая простой последовательный код, эффективную неблокирующую обработку ввода-вывода, кооперативную многозадачность без использования нескольких потоков и детальный контроль над выполнением задач. Эти методы ценны для создания высокопроизводительных сетевых приложений и систем, которые требуют эффективной обработки параллельных операций.