Обработка данных на основе корутин

Beginner

This tutorial is from open-source community. Access the source code

Введение

В этом практическом занятии (лабораторной работе) вы научитесь использовать корутины (сопрограммы) для создания конвейеров обработки данных. Корутины, мощная особенность языка Python, поддерживают кооперативную многозадачность, позволяя функциям приостанавливать и возобновлять выполнение в более поздний момент.

Цели этого практического занятия (лабораторной работы) — понять, как работают корутины в Python, реализовать конвейеры обработки данных на основе корутин и преобразовать данные на нескольких этапах с использованием корутин. Вы создадите два файла: cofollow.py — программа для отслеживания файлов на основе корутин, и coticker.py — приложение для отслеживания котировок акций с использованием корутин. Предполагается, что программа stocksim.py из предыдущего упражнения по-прежнему работает в фоновом режиме, генерируя данные о котировках акций в журнальном файле.

Понимание корутин на примере программы для отслеживания файлов

Начнем с понимания того, что такое корутины (сопрограммы) и как они работают в Python. Корутина представляет собой специализированную версию генераторной функции. В Python функции обычно начинают выполнение с самого начала каждый раз, когда они вызываются. Однако корутины отличаются. Они могут как потреблять, так и создавать данные, а также имеют возможность приостанавливать и возобновлять свое выполнение. Это означает, что корутина может приостановить свою работу в определенной точке и затем продолжить с того места, где она остановилась.

Создание базового файлового отслеживателя на основе корутин

На этом этапе мы создадим программу для отслеживания файлов, которая использует корутины для мониторинга файла на предмет появления нового содержимого и его обработки. Это аналогично команде Unix tail -f, которая непрерывно отображает конец файла и обновляет его при добавлении новых строк.

  1. Откройте текстовый редактор кода и создайте новый файл с именем cofollow.py в директории /home/labex/project. Именно здесь мы напишем нашу Python - программу для реализации файлового отслеживателя с использованием корутин.

  2. Скопируйте следующий код в файл:

## cofollow.py
import os
import time

## Data source
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)  ## Move to the end of the file
        while True:
            line = f.readline()
            if line != '':
                target.send(line)  ## Send the line to the target coroutine
            else:
                time.sleep(0.1)  ## Sleep briefly if no new content

## Decorator for coroutine functions
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)  ## Prime the coroutine (necessary first step)
        return f
    return start

## Sample coroutine
@consumer
def printer():
    while True:
        item = yield     ## Receive an item sent to me
        print(item)

## Example use
if __name__ == '__main__':
    follow('stocklog.csv', printer())
  1. Поймем ключевые компоненты этого кода:

    • follow(filename, target): Эта функция отвечает за открытие файла. Сначала она перемещает указатель файла в конец файла с помощью f.seek(0, os.SEEK_END). Затем она входит в бесконечный цикл, в котором непрерывно пытается прочитать новые строки из файла. Если найдена новая строка, она отправляет эту строку в целевую корутину с помощью метода send. Если нет нового содержимого, она приостанавливается на короткое время (0,1 секунды) с помощью time.sleep(0.1) перед следующей проверкой.
    • Декоратор @consumer: В Python корутины должны быть "инициализированы" перед тем, как они могут начать получать данные. Этот декоратор занимается этим. Он автоматически отправляет начальное значение None в корутину, что является необходимым первым шагом для подготовки корутины к приему реальных данных.
    • Корутина printer(): Это простая корутина. Она имеет бесконечный цикл, в котором использует ключевое слово yield для приема отправленного ей элемента. Как только она получает элемент, она просто выводит его на экран.
  2. Сохраните файл и запустите его из терминала:

cd /home/labex/project
python3 cofollow.py
  1. Вы должны увидеть, как скрипт выводит содержимое файла журнала котировок акций, и он будет продолжать выводить новые строки по мере их добавления в файл. Нажмите Ctrl+C, чтобы остановить программу.

Основная концепция здесь заключается в том, что данные передаются из функции follow в корутину printer с помощью метода send. Этот способ "проталкивания" данных противоположен генераторам, которые "тянут" данные с помощью итерации. В генераторе обычно используется цикл for для итерации по значениям, которые он генерирует. Однако в этом примере с корутинами данные активно отправляются из одной части кода в другую.

Создание компонентов конвейера на основе корутин

На этом этапе мы создадим более специализированные корутины (сопрограммы) для обработки данных о котировках акций. Корутина представляет собой особый тип функции, которая может приостанавливать и возобновлять свое выполнение, что очень полезно для создания конвейеров обработки данных. Каждая корутина, которую мы создадим, будет выполнять определенную задачу в общем конвейере обработки.

  1. Сначала вам нужно создать новый файл. Перейдите в директорию /home/labex/project и создайте файл с именем coticker.py. Этот файл будет содержать весь код для нашей обработки данных на основе корутин.

  2. Теперь начнем писать код в файле coticker.py. Сначала импортируем необходимые модули и определим базовую структуру. Модули - это предварительно написанные библиотеки кода, которые предоставляют полезные функции и классы. Следующий код делает именно это:

## coticker.py
from structure import Structure

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. Если вы посмотрите на приведенный выше код, заметите, что есть ошибки, связанные с String(), Float() и Integer(). Это классы, которые нам нужно импортировать. Поэтому мы добавим необходимые импорты в начало файла. Таким образом, Python будет знать, где найти эти классы. Вот обновленный код:
## coticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. Далее добавим компоненты корутин, которые будут формировать наш конвейер обработки данных. Каждая корутина имеет определенную задачу в конвейере. Вот код для добавления этих корутин:
@consumer
def to_csv(target):
    def producer():
        while True:
            line = yield

    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def create_ticker(target):
    while True:
        row = yield
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
  1. Поймем, что делает каждая из этих корутин:

    • to_csv: Ее задача - преобразовать необработанные текстовые строки в разобранные строки CSV. Это важно, так как наши данные изначально находятся в текстовом формате, и нам нужно разбить их на структурированные данные CSV.
    • create_ticker: Эта корутина принимает строки CSV и создает из них объекты Ticker. Объекты Ticker представляют данные о котировках акций более организованным образом.
    • negchange: Она фильтрует объекты Ticker. Она пропускает только те акции, у которых есть отрицательное изменение цены. Это помогает нам сосредоточиться на акциях, которые теряют в цене.
    • ticker: Эта корутина форматирует и отображает данные о котировках. Она использует форматер для представления данных в удобочитаемой таблице.
  2. Наконец, нам нужно добавить основной код программы, который соединяет все эти компоненты вместе. Этот код настроит поток данных через конвейер. Вот код:

if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change']

    ## Create the processing pipeline
    t = ticker('text', fields)
    neg_filter = negchange(t)
    tick_creator = create_ticker(neg_filter)
    csv_parser = to_csv(tick_creator)

    ## Connect the pipeline to the data source
    follow('stocklog.csv', csv_parser)
  1. После написания всего кода сохраните файл coticker.py. Затем откройте терминал и выполните следующие команды. Команда cd изменяет текущую директорию на ту, где находится наш файл, а команда python3 запускает наш Python - скрипт:
cd /home/labex/project
python3 coticker.py
  1. Если все пройдет успешно, вы должны увидеть отформатированную таблицу в терминале. Эта таблица показывает акции с отрицательным изменением цены. Вывод будет выглядеть примерно так:
      name      price     change
---------- ---------- ----------
      MSFT      72.50      -0.25
        AA      35.25      -0.15
       IBM      50.10      -0.15
      GOOG     100.02      -0.01
      AAPL     102.50      -0.06

Обратите внимание, что фактические значения в таблице могут отличаться в зависимости от сгенерированных данных о котировках акций.

Понимание потока данных в конвейере

Самая важная часть этой программы - это то, как данные проходят через корутины. Разберем это пошагово:

  1. Функция follow начинает чтение строк из файла stocklog.csv. Это наш источник данных.
  2. Каждая прочитанная строка затем отправляется в корутину csv_parser. csv_parser берет необработанную текстовую строку и разбирает ее на поля CSV.
  3. Разобранные данные CSV затем отправляются в корутину tick_creator. Эта корутина создает объекты Ticker из строк CSV.
  4. Объекты Ticker затем отправляются в корутину neg_filter. Эта корутина проверяет каждый объект Ticker. Если у акции есть отрицательное изменение цены, она пропускает объект дальше; в противном случае она отбрасывает его.
  5. Наконец, отфильтрованные объекты Ticker отправляются в корутину ticker. Корутина ticker форматирует данные и отображает их в таблице.

Эта архитектура конвейера очень полезна, так как позволяет каждому компоненту сосредоточиться на одной задаче. Это делает код более модульным, что означает, что его легче понять, изменить и поддерживать.

Улучшение конвейера на основе корутин

Теперь, когда у нас есть работающий базовый конвейер, пришло время сделать его более гибким. В программировании гибкость имеет решающее значение, так как позволяет нашему коду адаптироваться к различным требованиям. Мы добьемся этого, изменив программу coticker.py так, чтобы она поддерживала различные варианты фильтрации и форматирования.

  1. Сначала откройте файл coticker.py в текстовом редакторе кода. Редактор кода - это место, где вы будете вносить все необходимые изменения в программу. Он предоставляет удобную среду для просмотра, редактирования и сохранения кода.

  2. Далее добавим новую корутину (сопрограмму), которая фильтрует данные по названию акции. Корутина - это особый тип функции, которая может приостанавливать и возобновлять свое выполнение. Это позволяет нам создать конвейер, в котором данные могут проходить через разные этапы обработки. Вот код новой корутины:

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

В этом коде корутина filter_by_name принимает название акции и целевую корутину в качестве параметров. Она непрерывно ожидает записи с использованием ключевого слова yield. Когда запись приходит, она проверяет, совпадает ли название записи с указанным названием. Если совпадает, она отправляет запись в целевую корутину.

  1. Теперь добавим еще одну корутину, которая фильтрует данные на основе пороговых значений цены. Эта корутина поможет нам выбрать акции в определенном диапазоне цен. Вот код:
@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

Подобно предыдущей корутине, корутина price_threshold ожидает запись. Затем она проверяет, находится ли цена записи в указанном минимальном и максимальном диапазоне цен. Если находится, она отправляет запись в целевую корутину.

  1. После добавления новых корутин нам нужно обновить основную программу, чтобы продемонстрировать эти дополнительные фильтры. Основная программа - это точка входа в наше приложение, где мы настраиваем конвейеры обработки и запускаем поток данных. Вот обновленный код:
if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change', 'high', 'low']

    ## Create the processing pipeline with multiple outputs

    ## Pipeline 1: Show all negative changes (same as before)
    print("Stocks with negative changes:")
    t1 = ticker('text', fields)
    neg_filter = negchange(t1)
    tick_creator1 = create_ticker(neg_filter)
    csv_parser1 = to_csv(tick_creator1)

    ## Start following the file with the first pipeline
    import threading
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()

    ## Wait a moment to see some results
    import time
    time.sleep(5)

    ## Pipeline 2: Filter by name (AAPL)
    print("\nApple stock updates:")
    t2 = ticker('text', fields)
    name_filter = filter_by_name('AAPL', t2)
    tick_creator2 = create_ticker(name_filter)
    csv_parser2 = to_csv(tick_creator2)

    ## Follow the file with the second pipeline
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()

    ## Wait a moment to see some results
    time.sleep(5)

    ## Pipeline 3: Filter by price range
    print("\nStocks priced between 50 and 75:")
    t3 = ticker('text', fields)
    price_filter = price_threshold(50, 75, t3)
    tick_creator3 = create_ticker(price_filter)
    csv_parser3 = to_csv(tick_creator3)

    ## Follow with the third pipeline
    follow('stocklog.csv', csv_parser3)

В этом обновленном коде мы создаем три разных конвейера обработки. Первый конвейер показывает акции с отрицательными изменениями, второй конвейер фильтрует акции по названию 'AAPL', а третий конвейер фильтрует акции на основе диапазона цен от 50 до 75. Мы используем потоки (threads), чтобы запустить первые два конвейера параллельно, что позволяет нам более эффективно обрабатывать данные.

  1. После внесения всех изменений сохраните файл. Сохранение файла гарантирует, что все ваши изменения будут сохранены. Затем запустите обновленную программу, используя следующие команды в терминале:
cd /home/labex/project
python3 coticker.py

Команда cd изменяет текущую директорию на директорию проекта, а команда python3 coticker.py запускает Python - программу.

  1. После запуска программы вы должны увидеть три разных вывода:
    • Сначала вы увидите акции с отрицательными изменениями.
    • Затем вы увидите все обновления по акциям AAPL.
    • Наконец, вы увидите все акции с ценой от 50 до 75.

Понимание усовершенствованного конвейера

Усовершенствованная программа демонстрирует несколько важных концепций:

  1. Множественные конвейеры: Мы можем создавать несколько конвейеров обработки из одного и того же источника данных. Это позволяет нам одновременно выполнять различные типы анализа над одними и теми же данными.
  2. Специализированные фильтры: Мы можем создавать разные корутины для конкретных задач фильтрации. Эти фильтры помогают нам выбирать только те данные, которые соответствуют нашим конкретным критериям.
  3. Параллельная обработка: Используя потоки (threads), мы можем запускать несколько конвейеров параллельно. Это повышает эффективность нашей программы, позволяя ей обрабатывать данные параллельно.
  4. Композиция конвейеров: Корутины могут быть объединены различными способами, чтобы достичь разных целей обработки данных. Это дает нам гибкость в настройке наших конвейеров обработки данных в соответствии с нашими потребностями.

Этот подход предоставляет гибкий и модульный способ обработки потоковых данных. Он позволяет вам добавлять или изменять этапы обработки без изменения общей архитектуры программы.

Резюме

В этом практическом занятии (лабораторной работе) вы научились использовать корутины (сопрограммы) для создания конвейеров обработки данных на Python. Ключевые концепции включают понимание основ корутин, таких как принцип их работы, необходимость инициализации (priming) и использование декораторов для инициализации. Вы также изучили поток данных, передавая данные через конвейер с помощью метода send(), что отличается от "пулл - модели" генераторов.

Кроме того, вы создали специализированные корутины для таких задач, как разбор данных в формате CSV, фильтрация записей и форматирование вывода. Вы научились составлять конвейеры, соединяя несколько корутин, и реализовали операции фильтрации и преобразования. Корутины предоставляют мощный подход для обработки потоковых данных, позволяя четко разделить задачи и легко модифицировать отдельные этапы обработки.