生产者、消费者和管道

PythonPythonBeginner
立即练习

This tutorial is from open-source community. Access the source code

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

简介

生成器是用于设置各种生产者/消费者问题和数据流管道的有用工具。本节将对此进行讨论。

生产者 - 消费者问题

生成器与各种形式的生产者 - 消费者问题密切相关。

## 生产者
def follow(f):
 ...
    while True:
     ...
        yield line        ## 生成下方 `line` 中的值
     ...

## 消费者
for line in follow(f):    ## 消费上方 `yield` 产生的值
 ...

yield 产生值,for 循环消费这些值。

生成器管道

你可以利用生成器的这一特性来设置处理管道(类似于Unix管道)。

生产者处理处理消费者

处理管道有一个初始数据生产者、一些中间处理阶段和一个最终消费者。

生产者处理处理消费者

def producer():
  ...
    yield item
  ...

生产者通常是一个生成器。不过它也可以是某个其他序列的列表。yield 将数据输入到管道中。

生产者处理处理消费者

def consumer(s):
    for item in s:
      ...

消费者是一个 for 循环。它获取项目并对其进行处理。

生产者处理处理消费者

def processing(s):
    for item in s:
      ...
        yield newitem
      ...

中间处理阶段同时消费和生产项目。它们可能会修改数据流。它们也可以进行过滤(丢弃项目)。

生产者处理处理消费者

def producer():
  ...
    yield item          ## 生成由 `processing` 接收的项目
  ...

def processing(s):
    for item in s:      ## 来自 `producer`
      ...
        yield newitem   ## 生成一个新项目
      ...

def consumer(s):
    for item in s:      ## 来自 `processing`
      ...

设置管道的代码

a = producer()
b = processing(a)
c = consumer(b)

你会注意到数据会逐步流经不同的函数。

对于这个练习,stocksim.py 程序应该仍在后台运行。你将使用在前一个练习中编写的 follow() 函数。

练习6.8:设置一个简单的管道

让我们看看管道这个概念是如何实际应用的。编写以下函数:

>>> def filematch(lines, substr):
        for line in lines:
            if substr in line:
                yield line

>>>

这个函数与上一个练习中的第一个生成器示例几乎完全相同,只是它不再打开文件 —— 它只是对作为参数提供给它的一系列行进行操作。现在,试试这个:

>>> from follow import follow
>>> lines = follow('stocklog.csv')
>>> goog = filematch(lines, 'GOOG')
>>> for line in goog:
        print(line)

... 等待输出...

可能需要一些时间才会出现输出,但最终你应该会看到一些包含GOOG数据的行。

注意:这些练习必须在两个单独的终端上同时进行。

练习6.9:设置一个更复杂的管道

通过执行更多操作,进一步拓展管道的概念。

>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
        print(row)

['GOOG', '1502.08', '2023-10-01', '09:37.19', '1.83', '1500.25', '1502.08', '1500.25', '731']
['AAPL', '252.33', '2023-10-01', '09:37.19', '1.83', '250.50', '252.33', '250.50', '681']
['GOOG', '1502.09', '2023-10-01', '09:37.21', '1.84', '1500.25', '1502.09', '1500.25', '734']
['AAPL', '252.34', '2023-10-01', '09:37.21', '1.84', '250.50', '252.34', '250.50', '684']
['GOOG', '1502.10', '2023-10-01', '09:37.23', '1.85', '1500.25', '1502.10', '1500.25', '738']
['AAPL', '252.35', '2023-10-01', '09:37.23', '1.85', '250.50', '252.35', '250.50', '688']
...

嗯,这很有意思。你在这里看到的是,follow() 函数的输出已经被输送到 csv.reader() 函数中,现在我们得到了一个拆分后的行序列。

练习6.10:创建更多管道组件

让我们将整个概念扩展到一个更大的管道中。在一个单独的文件 ticker.py 中,首先创建一个像你上面那样读取CSV文件的函数:

## ticker.py

from follow import follow
import csv

def parse_stock_data(lines):
    rows = csv.reader(lines)
    return rows

if __name__ == '__main__':
    lines = follow('stocklog.csv')
    rows = parse_stock_data(lines)
    for row in rows:
        print(row)

编写一个新函数来选择特定的列:

## ticker.py

...
def select_columns(rows, indices):
for row in rows:
yield [row[index] for index in indices]
...
def parse_stock_data(lines):
rows = csv.reader(lines)
rows = select_columns(rows, [0, 1, 4])
return rows

再次运行你的程序。你应该会看到输出被精简成这样:

['GOOG', '1503.06', '2.81']
['AAPL', '253.31', '2.81']
['GOOG', '1503.07', '2.82']
['AAPL', '253.32', '2.82']
['GOOG', '1503.08', '2.83']

...

编写生成器函数来转换数据类型并构建字典。例如:

## ticker.py
...

def convert_types(rows, types):
    for row in rows:
        yield [func(val) for func, val in zip(types, row)]

def make_dicts(rows, headers):
    for row in rows:
        yield dict(zip(headers, row))
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    rows = convert_types(rows, [str, float, float])
    rows = make_dicts(rows, ['name', 'price', 'change'])
    return rows
...

再次运行你的程序。现在你应该会得到这样一连串的字典:

{'name': 'GOOG', 'price': 1503.4, 'change': 3.15}
{'name': 'AAPL', 'price': 253.65, 'change': 3.15}
{'name': 'GOOG', 'price': 1503.41, 'change': 3.16}
{'name': 'AAPL', 'price': 253.66, 'change': 3.16}
{'name': 'GOOG', 'price': 1503.42, 'change': 3.17}
{'name': 'AAPL', 'price': 253.67, 'change': 3.17}

...

练习6.11:过滤数据

编写一个用于过滤数据的函数。例如:

## ticker.py
...

def filter_symbols(rows, names):
    for row in rows:
        if row['GOOG'] in names:
            yield row

使用这个函数将股票数据过滤为仅包含你投资组合中的股票:

import report
import ticker
import follow
portfolio = report.read_portfolio('portfolio.csv')
rows = ticker.parse_stock_data(follow.follow('stocklog.csv'))
rows = ticker.filter_symbols(rows, portfolio)
for row in rows:
    print(row)

讨论

一些经验教训:你可以创建各种生成器函数,并将它们链接在一起,以执行涉及数据流管道的处理。此外,你可以创建将一系列管道阶段打包成单个函数调用的函数(例如,parse_stock_data() 函数)。

总结

恭喜你!你已经完成了生产者、消费者和管道实验。你可以在LabEx中练习更多实验来提升你的技能。