Продьюсеры, потребители и конвейеры

PythonPythonBeginner
Практиковаться сейчас

This tutorial is from open-source community. Access the source code

💡 Этот учебник переведен с английского с помощью ИИ. Чтобы просмотреть оригинал, вы можете перейти на английский оригинал

Введение

Генераторы - это полезный инструмент для решения различных типов задач производителя/потребителя и построения конвейеров потока данных. В этом разделе обсуждается данная тема.

Задачи производителя - потребителя

Генераторы тесно связаны с различными формами задач производителя - потребителя.

## Производитель
def follow(f):
  ...
    while True:
      ...
        yield line        ## Генерирует значение в `line` ниже
      ...

## Потребитель
for line in follow(f):    ## Потребляет значение из `yield` выше
  ...

yield генерирует значения, которые for потребляет.

Конвейеры генераторов

Вы можете использовать этот аспект генераторов для настройки конвейеров обработки (похожих на Unix-пайпы).

производительобработкаобработкапотребитель

Конвейеры обработки имеют начального производителя данных, некоторый набор промежуточных этапов обработки и конечного потребителя.

производительобработкаобработкапотребитель

def producer():
 ...
    yield item
 ...

Производитель обычно является генератором. Хотя это может быть также список или другая последовательность. yield подает данные в конвейер.

proизводительобработкаобработкапотребитель

def consumer(s):
    for item in s:
     ...

Потребитель - это цикл for. Он получает элементы и делает с ними что-то.

proизводительобработкаобработкапотребитель

def processing(s):
    for item in s:
     ...
        yield newitem
     ...

Промежуточные этапы обработки одновременно потребляют и производят элементы. Они могут изменять поток данных. Они также могут фильтровать (игнорировать элементы).

proизводительобработкаобработкапотребитель

def producer():
 ...
    yield item          ## генерирует элемент, который получает `processing`
 ...

def processing(s):
    for item in s:      ## Приходит от `producer`
     ...
        yield newitem   ## генерирует новый элемент
     ...

def consumer(s):
    for item in s:      ## Приходит от `processing`
     ...

Код для настройки конвейера

a = producer()
b = processing(a)
c = consumer(b)

Вы заметите, что данные поступают пошагово через разные функции.

Для этого упражнения программа stocksim.py должна по-прежнему работать в фоновом режиме. Вы будете использовать функцию follow(), которую вы написали в предыдущем упражнении.

Упражнение 6.8: Настройка простого конвейера

Посмотрим, как работает идея конвейеризации. Напишите следующую функцию:

>>> def filematch(lines, substr):
        for line in lines:
            if substr in line:
                yield line

>>>

Эта функция практически идентична первому примеру генератора в предыдущем упражнении, за исключением того, что она不再открывает файл - она просто работает с последовательностью строк, переданной ей в качестве аргумента. Теперь, попробуйте это:

>>> from follow import follow
>>> lines = follow('stocklog.csv')
>>> goog = filematch(lines, 'GOOG')
>>> for line in goog:
        print(line)

... подождите вывод...

Может потребоваться некоторое время, чтобы появился вывод, но в конечном итоге вы должны увидеть несколько строк, содержащих данные для GOOG.

Примечание: Эти упражнения должны выполняться одновременно в двух разных терминалах.

Упражнение 6.9: Настройка более сложного конвейера

Продолжайте развивать идею конвейеризации, выполняя больше действий.

>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
        print(row)

['GOOG', '1502.08', '2023-10-01', '09:37.19', '1.83', '1500.25', '1502.08', '1500.25', '731']
['AAPL', '252.33', '2023-10-01', '09:37.19', '1.83', '250.50', '252.33', '250.50', '681']
['GOOG', '1502.09', '2023-10-01', '09:37.21', '1.84', '1500.25', '1502.09', '1500.25', '734']
['AAPL', '252.34', '2023-10-01', '09:37.21', '1.84', '250.50', '252.34', '250.50', '684']
['GOOG', '1502.10', '2023-10-01', '09:37.23', '1.85', '1500.25', '1502.10', '1500.25', '738']
['AAPL', '252.35', '2023-10-01', '09:37.23', '1.85', '250.50', '252.35', '250.50', '688']
...

Ну, это интересно. Здесь вы можете увидеть, что вывод функции follow() был направлен в функцию csv.reader(), и теперь мы получаем последовательность разделенных строк.

Упражнение 6.10: Создание дополнительных компонентов конвейера

Давайте расширим идею конвейера. В отдельном файле ticker.py начните с создания функции, которая читает CSV-файл, как вы это делали выше:

## ticker.py

from follow import follow
import csv

def parse_stock_data(lines):
    rows = csv.reader(lines)
    return rows

if __name__ == '__main__':
    lines = follow('stocklog.csv')
    rows = parse_stock_data(lines)
    for row in rows:
        print(row)

Напишите новую функцию, которая выбирает определенные столбцы:

## ticker.py

...
def select_columns(rows, indices):
for row in rows:
yield [row[index] for index in indices]
...
def parse_stock_data(lines):
rows = csv.reader(lines)
rows = select_columns(rows, [0, 1, 4])
return rows

Запустите программу снова. Вы должны увидеть вывод, усеченный следующим образом:

['GOOG', '1503.06', '2.81']
['AAPL', '253.31', '2.81']
['GOOG', '1503.07', '2.82']
['AAPL', '253.32', '2.82']
['GOOG', '1503.08', '2.83']

...

Напишите генераторные функции, которые преобразуют типы данных и создают словари. Например:

## ticker.py
...

def convert_types(rows, types):
    for row in rows:
        yield [func(val) for func, val in zip(types, row)]

def make_dicts(rows, headers):
    for row in rows:
        yield dict(zip(headers, row))
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    rows = convert_types(rows, [str, float, float])
    rows = make_dicts(rows, ['name', 'price', 'change'])
    return rows
...

Запустите программу снова. Теперь вы должны получить поток словарей, подобный следующему:

{'name': 'GOOG', 'price': 1503.4, 'change': 3.15}
{'name': 'AAPL', 'price': 253.65, 'change': 3.15}
{'name': 'GOOG', 'price': 1503.41, 'change': 3.16}
{'name': 'AAPL', 'price': 253.66, 'change': 3.16}
{'name': 'GOOG', 'price': 1503.42, 'change': 3.17}
{'name': 'AAPL', 'price': 253.67, 'change': 3.17}

...

Упражнение 6.11: Фильтрация данных

Напишите функцию, которая фильтрует данные. Например:

## ticker.py
...

def filter_symbols(rows, names):
    for row in rows:
        if row['GOOG'] in names:
            yield row

Используйте это для фильтрации акций только тех, которые находятся в вашем портфеле:

import report
import ticker
import follow
portfolio = report.read_portfolio('portfolio.csv')
rows = ticker.parse_stock_data(follow.follow('stocklog.csv'))
rows = ticker.filter_symbols(rows, portfolio)
for row in rows:
    print(row)

Обсуждение

Некоторые уроки, выученные: можно создавать различные генераторные функции и связывать их вместе, чтобы выполнять обработку, связанную с конвейерами потока данных. Кроме того, можно создавать функции, которые объединяют серию этапов конвейера в вызов одной функции (например, функцию parse_stock_data()).

Резюме

Поздравляем! Вы завершили лабораторную работу по продьюсерам, потребителям и конвейерам. Вы можете практиковаться в других лабораторных работах в LabEx, чтобы улучшить свои навыки.