基于协程的数据处理

PythonPythonBeginner
立即练习

This tutorial is from open-source community. Access the source code

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

简介

在本次实验中,你将学习如何使用协程(coroutine)来构建数据处理管道。协程是 Python 中一项强大的特性,支持协作式多任务处理,允许函数在执行过程中暂停,并在后续恢复执行。

本次实验的目标是理解 Python 中协程的工作原理,基于协程实现数据处理管道,并通过多个协程阶段对数据进行转换。你将创建两个文件:cofollow.py(一个基于协程的文件跟踪器)和 coticker.py(一个使用协程的股票行情应用程序)。假设之前练习中的 stocksim.py 程序仍在后台运行,在日志文件中生成股票数据。


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL python(("Python")) -.-> python/FunctionsGroup(["Functions"]) python(("Python")) -.-> python/ObjectOrientedProgrammingGroup(["Object-Oriented Programming"]) python(("Python")) -.-> python/AdvancedTopicsGroup(["Advanced Topics"]) python(("Python")) -.-> python/PythonStandardLibraryGroup(["Python Standard Library"]) python/FunctionsGroup -.-> python/function_definition("Function Definition") python/ObjectOrientedProgrammingGroup -.-> python/classes_objects("Classes and Objects") python/AdvancedTopicsGroup -.-> python/generators("Generators") python/AdvancedTopicsGroup -.-> python/threading_multiprocessing("Multithreading and Multiprocessing") python/PythonStandardLibraryGroup -.-> python/data_collections("Data Collections") subgraph Lab Skills python/function_definition -.-> lab-132524{{"基于协程的数据处理"}} python/classes_objects -.-> lab-132524{{"基于协程的数据处理"}} python/generators -.-> lab-132524{{"基于协程的数据处理"}} python/threading_multiprocessing -.-> lab-132524{{"基于协程的数据处理"}} python/data_collections -.-> lab-132524{{"基于协程的数据处理"}} end

通过文件跟踪器理解协程

让我们先了解一下什么是协程(coroutine),以及它们在 Python 中是如何工作的。协程是生成器函数的一种特殊形式。在 Python 中,函数通常每次被调用时都会从头开始执行。但协程不同,它们既可以消费数据,也可以产生数据,并且能够暂停和恢复执行。这意味着协程可以在某个特定点暂停操作,之后再从暂停处继续执行。

创建一个基本的协程文件跟踪器

在这一步中,我们将创建一个文件跟踪器,它使用协程来监控文件的新内容并进行处理。这类似于 Unix 的 tail -f 命令,该命令会持续显示文件的末尾,并在添加新行时进行更新。

  1. 打开代码编辑器,在 /home/labex/project 目录下创建一个名为 cofollow.py 的新文件。我们将在这里编写 Python 代码,使用协程来实现文件跟踪器。

  2. 将以下代码复制到文件中:

## cofollow.py
import os
import time

## 数据源
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)  ## 移动到文件末尾
        while True:
            line = f.readline()
            if line != '':
                target.send(line)  ## 将行发送到目标协程
            else:
                time.sleep(0.1)  ## 如果没有新内容,短暂休眠

## 协程函数的装饰器
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)  ## 预激活协程(必要的第一步)
        return f
    return start

## 示例协程
@consumer
def printer():
    while True:
        item = yield     ## 接收发送给我的项
        print(item)

## 示例用法
if __name__ == '__main__':
    follow('stocklog.csv', printer())
  1. 让我们来理解这段代码的关键部分:

    • follow(filename, target):这个函数负责打开文件。它首先使用 f.seek(0, os.SEEK_END) 将文件指针移动到文件末尾。然后,它进入一个无限循环,在该循环中持续尝试从文件中读取新行。如果找到新行,它会使用 send 方法将该行发送到目标协程。如果没有新内容,它会使用 time.sleep(0.1) 暂停一小段时间(0.1 秒),然后再次检查。
    • @consumer 装饰器:在 Python 中,协程在开始接收数据之前需要被“预激活”。这个装饰器负责处理这个问题。它会自动向协程发送一个初始的 None 值,这是让协程准备好接收实际数据的必要第一步。
    • printer() 协程:这是一个简单的协程。它有一个无限循环,在循环中使用 yield 关键字来接收发送给它的项。一旦接收到项,它就会将其打印出来。
  2. 保存文件并从终端运行它:

cd /home/labex/project
python3 cofollow.py
  1. 你应该会看到脚本打印出股票日志文件的内容,并且随着新行添加到文件中,它会继续打印新行。按 Ctrl+C 停止程序。

这里的关键概念是,数据通过 send 方法从 follow 函数流入 printer 协程。这种“推送”数据的方式与生成器相反,生成器是通过迭代“拉取”数据。在生成器中,你通常使用 for 循环来迭代它产生的值。但在这个协程示例中,数据是从代码的一部分主动发送到另一部分的。

✨ 查看解决方案并练习

创建协程管道组件

在这一步中,我们将创建更多用于处理股票数据的专用协程(coroutine)。协程是一种特殊类型的函数,它可以暂停和恢复执行,这对于构建数据处理管道非常有用。我们创建的每个协程都将在整个处理管道中执行特定的任务。

  1. 首先,你需要创建一个新文件。导航到 /home/labex/project 目录并创建一个名为 coticker.py 的文件。这个文件将包含我们基于协程的数据处理的所有代码。

  2. 现在,让我们开始在 coticker.py 文件中编写代码。我们首先导入必要的模块并定义基本结构。模块是预先编写的代码库,提供有用的函数和类。以下代码实现了这一点:

## coticker.py
from structure import Structure

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. 当你查看上面的代码时,会注意到有与 String()Float()Integer() 相关的错误。这些是我们需要导入的类。因此,我们将在文件顶部添加所需的导入语句。这样,Python 就知道在哪里可以找到这些类。以下是更新后的代码:
## coticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. 接下来,我们将添加构成数据处理管道的协程组件。每个协程在管道中都有特定的任务。以下是添加这些协程的代码:
@consumer
def to_csv(target):
    def producer():
        while True:
            line = yield

    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def create_ticker(target):
    while True:
        row = yield
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
  1. 让我们来理解每个协程的作用:

    • to_csv:它的任务是将原始文本行转换为解析后的 CSV 行。这很重要,因为我们的数据最初是文本格式,我们需要将其拆分为结构化的 CSV 数据。
    • create_ticker:这个协程接收 CSV 行并从中创建 Ticker 对象。Ticker 对象以更有条理的方式表示股票数据。
    • negchange:它对 Ticker 对象进行过滤。它只传递价格变化为负的股票。这有助于我们关注那些正在贬值的股票。
    • ticker:这个协程对行情数据进行格式化并显示。它使用格式化器将数据以美观、易读的表格形式呈现。
  2. 最后,我们需要添加将所有这些组件连接在一起的主程序代码。这段代码将设置数据在管道中的流动。以下是代码:

if __name__ == '__main__':
    import sys

    ## 定义要显示的字段名称
    fields = ['name', 'price', 'change']

    ## 创建处理管道
    t = ticker('text', fields)
    neg_filter = negchange(t)
    tick_creator = create_ticker(neg_filter)
    csv_parser = to_csv(tick_creator)

    ## 将管道连接到数据源
    follow('stocklog.csv', csv_parser)
  1. 编写完所有代码后,保存 coticker.py 文件。然后,打开终端并运行以下命令。cd 命令将目录更改为我们文件所在的位置,python3 命令运行我们的 Python 脚本:
cd /home/labex/project
python3 coticker.py
  1. 如果一切顺利,你应该会在终端中看到一个格式化的表格。这个表格显示了价格变化为负的股票。输出将类似于以下内容:
      name      price     change
---------- ---------- ----------
      MSFT      72.50      -0.25
        AA      35.25      -0.15
       IBM      50.10      -0.15
      GOOG     100.02      -0.01
      AAPL     102.50      -0.06

请记住,表格中的实际值可能会根据生成的股票数据而有所不同。

理解管道流程

这个程序最重要的部分是数据如何通过协程流动。让我们逐步分解:

  1. follow 函数首先从 stocklog.csv 文件中读取行。这是我们的数据源。
  2. 读取的每一行随后被发送到 csv_parser 协程。csv_parser 接收原始文本行并将其解析为 CSV 字段。
  3. 解析后的 CSV 数据随后被发送到 tick_creator 协程。这个协程从 CSV 行创建 Ticker 对象。
  4. Ticker 对象随后被发送到 neg_filter 协程。这个协程检查每个 Ticker 对象。如果股票的价格变化为负,它就传递该对象;否则,它将其丢弃。
  5. 最后,经过过滤的 Ticker 对象被发送到 ticker 协程。ticker 协程对数据进行格式化并以表格形式显示。

这种管道架构非常有用,因为它允许每个组件专注于单一任务。这使得代码更具模块化,意味着它更易于理解、修改和维护。

✨ 查看解决方案并练习

增强协程管道

既然我们已经有了一个基本的管道并使其运行起来,现在是时候让它变得更加灵活了。在编程中,灵活性至关重要,因为它能让我们的代码适应不同的需求。我们将通过修改 coticker.py 程序来支持各种过滤和格式化选项,从而实现这一目标。

  1. 首先,在你的代码编辑器中打开 coticker.py 文件。代码编辑器是你对程序进行所有必要修改的地方。它为你查看、编辑和保存代码提供了一个便捷的环境。

  2. 接下来,我们将添加一个新的协程,用于按股票名称过滤数据。协程是一种特殊类型的函数,它可以暂停和恢复执行。这使我们能够创建一个管道,让数据可以流经不同的处理步骤。以下是新协程的代码:

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

在这段代码中,filter_by_name 协程接受一个股票名称和一个目标协程作为参数。它使用 yield 关键字持续等待一条记录。当一条记录到达时,它会检查该记录的名称是否与指定的名称匹配。如果匹配,它就将该记录发送到目标协程。

  1. 现在,让我们添加另一个基于价格阈值进行过滤的协程。这个协程将帮助我们选择特定价格范围内的股票。以下是代码:
@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

与前一个协程类似,price_threshold 协程等待一条记录。然后,它检查该记录的价格是否在指定的最低和最高价格范围内。如果是,它就将该记录发送到目标协程。

  1. 添加新的协程后,我们需要更新主程序以演示这些额外的过滤器。主程序是我们应用程序的入口点,在这里我们设置处理管道并启动数据流。以下是更新后的代码:
if __name__ == '__main__':
    import sys

    ## 定义要显示的字段名称
    fields = ['name', 'price', 'change', 'high', 'low']

    ## 创建具有多个输出的处理管道

    ## 管道 1:显示所有价格下跌的股票(与之前相同)
    print("Stocks with negative changes:")
    t1 = ticker('text', fields)
    neg_filter = negchange(t1)
    tick_creator1 = create_ticker(neg_filter)
    csv_parser1 = to_csv(tick_creator1)

    ## 启动第一个管道来跟踪文件
    import threading
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()

    ## 等待片刻以查看一些结果
    import time
    time.sleep(5)

    ## 管道 2:按名称过滤(AAPL)
    print("\nApple stock updates:")
    t2 = ticker('text', fields)
    name_filter = filter_by_name('AAPL', t2)
    tick_creator2 = create_ticker(name_filter)
    csv_parser2 = to_csv(tick_creator2)

    ## 用第二个管道跟踪文件
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()

    ## 等待片刻以查看一些结果
    time.sleep(5)

    ## 管道 3:按价格范围过滤
    print("\nStocks priced between 50 and 75:")
    t3 = ticker('text', fields)
    price_filter = price_threshold(50, 75, t3)
    tick_creator3 = create_ticker(price_filter)
    csv_parser3 = to_csv(tick_creator3)

    ## 用第三个管道跟踪文件
    follow('stocklog.csv', csv_parser3)

在这个更新后的代码中,我们创建了三个不同的处理管道。第一个管道显示价格下跌的股票,第二个管道按名称 'AAPL' 过滤股票,第三个管道根据 50 到 75 之间的价格范围过滤股票。我们使用线程并发运行前两个管道,这使我们能够更高效地处理数据。

  1. 完成所有修改后,保存文件。保存文件可确保你所有的修改都被保留。然后,在终端中使用以下命令运行更新后的程序:
cd /home/labex/project
python3 coticker.py

cd 命令将当前目录更改为项目目录,python3 coticker.py 命令运行 Python 程序。

  1. 运行程序后,你应该会看到三种不同的输出:
    • 首先,你会看到价格下跌的股票。
    • 然后,你会看到所有 AAPL 股票的更新信息。
    • 最后,你会看到所有价格在 50 到 75 之间的股票。

理解增强后的管道

增强后的程序展示了几个重要的概念:

  1. 多个管道:我们可以从同一个数据源创建多个处理管道。这使我们能够同时对同一数据进行不同类型的分析。
  2. 专用过滤器:我们可以为特定的过滤任务创建不同的协程。这些过滤器帮助我们只选择符合特定标准的数据。
  3. 并发处理:使用线程,我们可以并发运行多个管道。这通过允许程序并行处理数据来提高程序的效率。
  4. 管道组合:协程可以以不同的方式组合,以实现不同的数据处理目标。这使我们能够根据自己的需求自定义数据处理管道。

这种方法为处理流式数据提供了一种灵活且模块化的方式。它允许你在不改变程序整体架构的情况下添加或修改处理步骤。

✨ 查看解决方案并练习

总结

在这个实验中,你学习了如何在 Python 中使用协程(coroutine)来构建数据处理管道。关键概念包括理解协程的基础知识,例如它们如何运行、预激活(priming)的必要性,以及使用装饰器(decorator)进行初始化。你还探索了数据流,通过 send() 方法将数据推送通过管道,这与生成器(generator)的“拉取”模型不同。

此外,你创建了用于特定任务的协程,如解析 CSV 数据、过滤记录和格式化输出。你学会了通过连接多个协程来组合管道,并实现了过滤和转换操作。协程为流式数据处理提供了一种强大的方法,能够清晰地分离关注点,并便于对各个阶段进行修改。