简介
在本次实验中,你将学习如何使用协程(coroutine)来构建数据处理管道。协程是 Python 中一项强大的特性,支持协作式多任务处理,允许函数在执行过程中暂停,并在后续恢复执行。
本次实验的目标是理解 Python 中协程的工作原理,基于协程实现数据处理管道,并通过多个协程阶段对数据进行转换。你将创建两个文件:cofollow.py(一个基于协程的文件跟踪器)和 coticker.py(一个使用协程的股票行情应用程序)。假设之前练习中的 stocksim.py 程序仍在后台运行,在日志文件中生成股票数据。
通过文件跟踪器理解协程
让我们先了解一下什么是协程(coroutine),以及它们在 Python 中是如何工作的。协程是生成器函数的一种特殊形式。在 Python 中,函数通常每次被调用时都会从头开始执行。但协程不同,它们既可以消费数据,也可以产生数据,并且能够暂停和恢复执行。这意味着协程可以在某个特定点暂停操作,之后再从暂停处继续执行。
创建一个基本的协程文件跟踪器
在这一步中,我们将创建一个文件跟踪器,它使用协程来监控文件的新内容并进行处理。这类似于 Unix 的 tail -f 命令,该命令会持续显示文件的末尾,并在添加新行时进行更新。
打开代码编辑器,在
/home/labex/project目录下创建一个名为cofollow.py的新文件。我们将在这里编写 Python 代码,使用协程来实现文件跟踪器。将以下代码复制到文件中:
## cofollow.py
import os
import time
## 数据源
def follow(filename, target):
with open(filename, 'r') as f:
f.seek(0, os.SEEK_END) ## 移动到文件末尾
while True:
line = f.readline()
if line != '':
target.send(line) ## 将行发送到目标协程
else:
time.sleep(0.1) ## 如果没有新内容,短暂休眠
## 协程函数的装饰器
from functools import wraps
def consumer(func):
@wraps(func)
def start(*args, **kwargs):
f = func(*args, **kwargs)
f.send(None) ## 预激活协程(必要的第一步)
return f
return start
## 示例协程
@consumer
def printer():
while True:
item = yield ## 接收发送给我的项
print(item)
## 示例用法
if __name__ == '__main__':
follow('stocklog.csv', printer())
让我们来理解这段代码的关键部分:
follow(filename, target):这个函数负责打开文件。它首先使用f.seek(0, os.SEEK_END)将文件指针移动到文件末尾。然后,它进入一个无限循环,在该循环中持续尝试从文件中读取新行。如果找到新行,它会使用send方法将该行发送到目标协程。如果没有新内容,它会使用time.sleep(0.1)暂停一小段时间(0.1 秒),然后再次检查。@consumer装饰器:在 Python 中,协程在开始接收数据之前需要被“预激活”。这个装饰器负责处理这个问题。它会自动向协程发送一个初始的None值,这是让协程准备好接收实际数据的必要第一步。printer()协程:这是一个简单的协程。它有一个无限循环,在循环中使用yield关键字来接收发送给它的项。一旦接收到项,它就会将其打印出来。
保存文件并从终端运行它:
cd /home/labex/project
python3 cofollow.py
- 你应该会看到脚本打印出股票日志文件的内容,并且随着新行添加到文件中,它会继续打印新行。按
Ctrl+C停止程序。
这里的关键概念是,数据通过 send 方法从 follow 函数流入 printer 协程。这种“推送”数据的方式与生成器相反,生成器是通过迭代“拉取”数据。在生成器中,你通常使用 for 循环来迭代它产生的值。但在这个协程示例中,数据是从代码的一部分主动发送到另一部分的。
创建协程管道组件
在这一步中,我们将创建更多用于处理股票数据的专用协程(coroutine)。协程是一种特殊类型的函数,它可以暂停和恢复执行,这对于构建数据处理管道非常有用。我们创建的每个协程都将在整个处理管道中执行特定的任务。
首先,你需要创建一个新文件。导航到
/home/labex/project目录并创建一个名为coticker.py的文件。这个文件将包含我们基于协程的数据处理的所有代码。现在,让我们开始在
coticker.py文件中编写代码。我们首先导入必要的模块并定义基本结构。模块是预先编写的代码库,提供有用的函数和类。以下代码实现了这一点:
## coticker.py
from structure import Structure
class Ticker(Structure):
name = String()
price = Float()
date = String()
time = String()
change = Float()
open = Float()
high = Float()
low = Float()
volume = Integer()
from cofollow import consumer, follow
from tableformat import create_formatter
import csv
- 当你查看上面的代码时,会注意到有与
String()、Float()和Integer()相关的错误。这些是我们需要导入的类。因此,我们将在文件顶部添加所需的导入语句。这样,Python 就知道在哪里可以找到这些类。以下是更新后的代码:
## coticker.py
from structure import Structure, String, Float, Integer
class Ticker(Structure):
name = String()
price = Float()
date = String()
time = String()
change = Float()
open = Float()
high = Float()
low = Float()
volume = Integer()
from cofollow import consumer, follow
from tableformat import create_formatter
import csv
- 接下来,我们将添加构成数据处理管道的协程组件。每个协程在管道中都有特定的任务。以下是添加这些协程的代码:
@consumer
def to_csv(target):
def producer():
while True:
line = yield
reader = csv.reader(producer())
while True:
line = yield
target.send(next(reader))
@consumer
def create_ticker(target):
while True:
row = yield
target.send(Ticker.from_row(row))
@consumer
def negchange(target):
while True:
record = yield
if record.change < 0:
target.send(record)
@consumer
def ticker(fmt, fields):
formatter = create_formatter(fmt)
formatter.headings(fields)
while True:
rec = yield
row = [getattr(rec, name) for name in fields]
formatter.row(row)
让我们来理解每个协程的作用:
to_csv:它的任务是将原始文本行转换为解析后的 CSV 行。这很重要,因为我们的数据最初是文本格式,我们需要将其拆分为结构化的 CSV 数据。create_ticker:这个协程接收 CSV 行并从中创建Ticker对象。Ticker对象以更有条理的方式表示股票数据。negchange:它对Ticker对象进行过滤。它只传递价格变化为负的股票。这有助于我们关注那些正在贬值的股票。ticker:这个协程对行情数据进行格式化并显示。它使用格式化器将数据以美观、易读的表格形式呈现。
最后,我们需要添加将所有这些组件连接在一起的主程序代码。这段代码将设置数据在管道中的流动。以下是代码:
if __name__ == '__main__':
import sys
## 定义要显示的字段名称
fields = ['name', 'price', 'change']
## 创建处理管道
t = ticker('text', fields)
neg_filter = negchange(t)
tick_creator = create_ticker(neg_filter)
csv_parser = to_csv(tick_creator)
## 将管道连接到数据源
follow('stocklog.csv', csv_parser)
- 编写完所有代码后,保存
coticker.py文件。然后,打开终端并运行以下命令。cd命令将目录更改为我们文件所在的位置,python3命令运行我们的 Python 脚本:
cd /home/labex/project
python3 coticker.py
- 如果一切顺利,你应该会在终端中看到一个格式化的表格。这个表格显示了价格变化为负的股票。输出将类似于以下内容:
name price change
---------- ---------- ----------
MSFT 72.50 -0.25
AA 35.25 -0.15
IBM 50.10 -0.15
GOOG 100.02 -0.01
AAPL 102.50 -0.06
请记住,表格中的实际值可能会根据生成的股票数据而有所不同。
理解管道流程
这个程序最重要的部分是数据如何通过协程流动。让我们逐步分解:
follow函数首先从stocklog.csv文件中读取行。这是我们的数据源。- 读取的每一行随后被发送到
csv_parser协程。csv_parser接收原始文本行并将其解析为 CSV 字段。 - 解析后的 CSV 数据随后被发送到
tick_creator协程。这个协程从 CSV 行创建Ticker对象。 Ticker对象随后被发送到neg_filter协程。这个协程检查每个Ticker对象。如果股票的价格变化为负,它就传递该对象;否则,它将其丢弃。- 最后,经过过滤的
Ticker对象被发送到ticker协程。ticker协程对数据进行格式化并以表格形式显示。
这种管道架构非常有用,因为它允许每个组件专注于单一任务。这使得代码更具模块化,意味着它更易于理解、修改和维护。
增强协程管道
既然我们已经有了一个基本的管道并使其运行起来,现在是时候让它变得更加灵活了。在编程中,灵活性至关重要,因为它能让我们的代码适应不同的需求。我们将通过修改 coticker.py 程序来支持各种过滤和格式化选项,从而实现这一目标。
首先,在你的代码编辑器中打开
coticker.py文件。代码编辑器是你对程序进行所有必要修改的地方。它为你查看、编辑和保存代码提供了一个便捷的环境。接下来,我们将添加一个新的协程,用于按股票名称过滤数据。协程是一种特殊类型的函数,它可以暂停和恢复执行。这使我们能够创建一个管道,让数据可以流经不同的处理步骤。以下是新协程的代码:
@consumer
def filter_by_name(name, target):
while True:
record = yield
if record.name == name:
target.send(record)
在这段代码中,filter_by_name 协程接受一个股票名称和一个目标协程作为参数。它使用 yield 关键字持续等待一条记录。当一条记录到达时,它会检查该记录的名称是否与指定的名称匹配。如果匹配,它就将该记录发送到目标协程。
- 现在,让我们添加另一个基于价格阈值进行过滤的协程。这个协程将帮助我们选择特定价格范围内的股票。以下是代码:
@consumer
def price_threshold(min_price, max_price, target):
while True:
record = yield
if min_price <= record.price <= max_price:
target.send(record)
与前一个协程类似,price_threshold 协程等待一条记录。然后,它检查该记录的价格是否在指定的最低和最高价格范围内。如果是,它就将该记录发送到目标协程。
- 添加新的协程后,我们需要更新主程序以演示这些额外的过滤器。主程序是我们应用程序的入口点,在这里我们设置处理管道并启动数据流。以下是更新后的代码:
if __name__ == '__main__':
import sys
## 定义要显示的字段名称
fields = ['name', 'price', 'change', 'high', 'low']
## 创建具有多个输出的处理管道
## 管道 1:显示所有价格下跌的股票(与之前相同)
print("Stocks with negative changes:")
t1 = ticker('text', fields)
neg_filter = negchange(t1)
tick_creator1 = create_ticker(neg_filter)
csv_parser1 = to_csv(tick_creator1)
## 启动第一个管道来跟踪文件
import threading
threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()
## 等待片刻以查看一些结果
import time
time.sleep(5)
## 管道 2:按名称过滤(AAPL)
print("\nApple stock updates:")
t2 = ticker('text', fields)
name_filter = filter_by_name('AAPL', t2)
tick_creator2 = create_ticker(name_filter)
csv_parser2 = to_csv(tick_creator2)
## 用第二个管道跟踪文件
threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()
## 等待片刻以查看一些结果
time.sleep(5)
## 管道 3:按价格范围过滤
print("\nStocks priced between 50 and 75:")
t3 = ticker('text', fields)
price_filter = price_threshold(50, 75, t3)
tick_creator3 = create_ticker(price_filter)
csv_parser3 = to_csv(tick_creator3)
## 用第三个管道跟踪文件
follow('stocklog.csv', csv_parser3)
在这个更新后的代码中,我们创建了三个不同的处理管道。第一个管道显示价格下跌的股票,第二个管道按名称 'AAPL' 过滤股票,第三个管道根据 50 到 75 之间的价格范围过滤股票。我们使用线程并发运行前两个管道,这使我们能够更高效地处理数据。
- 完成所有修改后,保存文件。保存文件可确保你所有的修改都被保留。然后,在终端中使用以下命令运行更新后的程序:
cd /home/labex/project
python3 coticker.py
cd 命令将当前目录更改为项目目录,python3 coticker.py 命令运行 Python 程序。
- 运行程序后,你应该会看到三种不同的输出:
- 首先,你会看到价格下跌的股票。
- 然后,你会看到所有 AAPL 股票的更新信息。
- 最后,你会看到所有价格在 50 到 75 之间的股票。
理解增强后的管道
增强后的程序展示了几个重要的概念:
- 多个管道:我们可以从同一个数据源创建多个处理管道。这使我们能够同时对同一数据进行不同类型的分析。
- 专用过滤器:我们可以为特定的过滤任务创建不同的协程。这些过滤器帮助我们只选择符合特定标准的数据。
- 并发处理:使用线程,我们可以并发运行多个管道。这通过允许程序并行处理数据来提高程序的效率。
- 管道组合:协程可以以不同的方式组合,以实现不同的数据处理目标。这使我们能够根据自己的需求自定义数据处理管道。
这种方法为处理流式数据提供了一种灵活且模块化的方式。它允许你在不改变程序整体架构的情况下添加或修改处理步骤。
总结
在这个实验中,你学习了如何在 Python 中使用协程(coroutine)来构建数据处理管道。关键概念包括理解协程的基础知识,例如它们如何运行、预激活(priming)的必要性,以及使用装饰器(decorator)进行初始化。你还探索了数据流,通过 send() 方法将数据推送通过管道,这与生成器(generator)的“拉取”模型不同。
此外,你创建了用于特定任务的协程,如解析 CSV 数据、过滤记录和格式化输出。你学会了通过连接多个协程来组合管道,并实现了过滤和转换操作。协程为流式数据处理提供了一种强大的方法,能够清晰地分离关注点,并便于对各个阶段进行修改。