Улучшение конвейера на основе корутин
Теперь, когда у нас есть работающий базовый конвейер, пришло время сделать его более гибким. В программировании гибкость имеет решающее значение, так как позволяет нашему коду адаптироваться к различным требованиям. Мы добьемся этого, изменив программу coticker.py так, чтобы она поддерживала различные варианты фильтрации и форматирования.
-
Сначала откройте файл coticker.py в текстовом редакторе кода. Редактор кода - это место, где вы будете вносить все необходимые изменения в программу. Он предоставляет удобную среду для просмотра, редактирования и сохранения кода.
-
Далее добавим новую корутину (сопрограмму), которая фильтрует данные по названию акции. Корутина - это особый тип функции, которая может приостанавливать и возобновлять свое выполнение. Это позволяет нам создать конвейер, в котором данные могут проходить через разные этапы обработки. Вот код новой корутины:
@consumer
def filter_by_name(name, target):
while True:
record = yield
if record.name == name:
target.send(record)
В этом коде корутина filter_by_name принимает название акции и целевую корутину в качестве параметров. Она непрерывно ожидает записи с использованием ключевого слова yield. Когда запись приходит, она проверяет, совпадает ли название записи с указанным названием. Если совпадает, она отправляет запись в целевую корутину.
- Теперь добавим еще одну корутину, которая фильтрует данные на основе пороговых значений цены. Эта корутина поможет нам выбрать акции в определенном диапазоне цен. Вот код:
@consumer
def price_threshold(min_price, max_price, target):
while True:
record = yield
if min_price <= record.price <= max_price:
target.send(record)
Подобно предыдущей корутине, корутина price_threshold ожидает запись. Затем она проверяет, находится ли цена записи в указанном минимальном и максимальном диапазоне цен. Если находится, она отправляет запись в целевую корутину.
- После добавления новых корутин нам нужно обновить основную программу, чтобы продемонстрировать эти дополнительные фильтры. Основная программа - это точка входа в наше приложение, где мы настраиваем конвейеры обработки и запускаем поток данных. Вот обновленный код:
if __name__ == '__main__':
import sys
## Define the field names to display
fields = ['name', 'price', 'change', 'high', 'low']
## Create the processing pipeline with multiple outputs
## Pipeline 1: Show all negative changes (same as before)
print("Stocks with negative changes:")
t1 = ticker('text', fields)
neg_filter = negchange(t1)
tick_creator1 = create_ticker(neg_filter)
csv_parser1 = to_csv(tick_creator1)
## Start following the file with the first pipeline
import threading
threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()
## Wait a moment to see some results
import time
time.sleep(5)
## Pipeline 2: Filter by name (AAPL)
print("\nApple stock updates:")
t2 = ticker('text', fields)
name_filter = filter_by_name('AAPL', t2)
tick_creator2 = create_ticker(name_filter)
csv_parser2 = to_csv(tick_creator2)
## Follow the file with the second pipeline
threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()
## Wait a moment to see some results
time.sleep(5)
## Pipeline 3: Filter by price range
print("\nStocks priced between 50 and 75:")
t3 = ticker('text', fields)
price_filter = price_threshold(50, 75, t3)
tick_creator3 = create_ticker(price_filter)
csv_parser3 = to_csv(tick_creator3)
## Follow with the third pipeline
follow('stocklog.csv', csv_parser3)
В этом обновленном коде мы создаем три разных конвейера обработки. Первый конвейер показывает акции с отрицательными изменениями, второй конвейер фильтрует акции по названию 'AAPL', а третий конвейер фильтрует акции на основе диапазона цен от 50 до 75. Мы используем потоки (threads), чтобы запустить первые два конвейера параллельно, что позволяет нам более эффективно обрабатывать данные.
- После внесения всех изменений сохраните файл. Сохранение файла гарантирует, что все ваши изменения будут сохранены. Затем запустите обновленную программу, используя следующие команды в терминале:
cd /home/labex/project
python3 coticker.py
Команда cd изменяет текущую директорию на директорию проекта, а команда python3 coticker.py запускает Python - программу.
- После запуска программы вы должны увидеть три разных вывода:
- Сначала вы увидите акции с отрицательными изменениями.
- Затем вы увидите все обновления по акциям AAPL.
- Наконец, вы увидите все акции с ценой от 50 до 75.
Понимание усовершенствованного конвейера
Усовершенствованная программа демонстрирует несколько важных концепций:
- Множественные конвейеры: Мы можем создавать несколько конвейеров обработки из одного и того же источника данных. Это позволяет нам одновременно выполнять различные типы анализа над одними и теми же данными.
- Специализированные фильтры: Мы можем создавать разные корутины для конкретных задач фильтрации. Эти фильтры помогают нам выбирать только те данные, которые соответствуют нашим конкретным критериям.
- Параллельная обработка: Используя потоки (threads), мы можем запускать несколько конвейеров параллельно. Это повышает эффективность нашей программы, позволяя ей обрабатывать данные параллельно.
- Композиция конвейеров: Корутины могут быть объединены различными способами, чтобы достичь разных целей обработки данных. Это дает нам гибкость в настройке наших конвейеров обработки данных в соответствии с нашими потребностями.
Этот подход предоставляет гибкий и модульный способ обработки потоковых данных. Он позволяет вам добавлять или изменять этапы обработки без изменения общей архитектуры программы.