생산자, 소비자 및 파이프라인

Beginner

This tutorial is from open-source community. Access the source code

소개

제너레이터 (Generators) 는 다양한 종류의 생산자/소비자 문제와 데이터 흐름 파이프라인을 설정하는 데 유용한 도구입니다. 이 섹션에서는 이에 대해 논의합니다.

생산자 - 소비자 문제

제너레이터는 다양한 형태의 생산자 - 소비자(producer-consumer) 문제와 밀접하게 관련되어 있습니다.

## Producer
def follow(f):
    ...
    while True:
        ...
        yield line        ## Produces value in `line` below
        ...

## Consumer
for line in follow(f):    ## Consumes value from `yield` above
    ...

yieldfor가 소비하는 값을 생성합니다.

제너레이터 파이프라인

제너레이터의 이러한 측면을 사용하여 (Unix 파이프와 같은) 처리 파이프라인을 설정할 수 있습니다.

생산자(producer) → 처리(processing) → 처리(processing) → 소비자(consumer)

처리 파이프는 초기 데이터 생산자, 일부 중간 처리 단계 집합 및 최종 소비자를 갖습니다.

생산자(producer) → 처리(processing) → 처리(processing) → 소비자(consumer)

def producer():
    ...
    yield item
    ...

생산자는 일반적으로 제너레이터입니다. 다른 시퀀스의 목록일 수도 있습니다. yield는 파이프라인에 데이터를 공급합니다.

생산자(producer) → 처리(processing) → 처리(processing) → 소비자(consumer)

def consumer(s):
    for item in s:
        ...

소비자는 for-loop 입니다. 항목을 가져와서 무언가를 수행합니다.

생산자(producer) → 처리(processing) → 처리(processing) → 소비자(consumer)

def processing(s):
    for item in s:
        ...
        yield newitem
        ...

중간 처리 단계는 동시에 항목을 소비하고 생성합니다. 데이터 스트림을 수정할 수 있습니다. 또한 필터링 (항목 폐기) 할 수도 있습니다.

생산자(producer) → 처리(processing) → 처리(processing) → 소비자(consumer)

def producer():
    ...
    yield item          ## yields the item that is received by the `processing`
    ...

def processing(s):
    for item in s:      ## Comes from the `producer`
        ...
        yield newitem   ## yields a new item
        ...

def consumer(s):
    for item in s:      ## Comes from the `processing`
        ...

파이프라인을 설정하는 코드

a = producer()
b = processing(a)
c = consumer(b)

데이터가 다른 함수를 통해 점진적으로 흐르는 것을 알 수 있습니다.

이 연습을 위해 stocksim.py 프로그램이 백그라운드에서 계속 실행되어야 합니다. 이전 연습에서 작성한 follow() 함수를 사용합니다.

연습 6.8: 간단한 파이프라인 설정

파이프라인 아이디어를 실제로 살펴보겠습니다. 다음 함수를 작성하십시오.

>>> def filematch(lines, substr):
        for line in lines:
            if substr in line:
                yield line

>>>

이 함수는 이전 연습의 첫 번째 제너레이터 예제와 거의 동일하지만 더 이상 파일을 열지 않고 인수로 제공된 일련의 라인에서 작동합니다. 이제 다음을 시도해 보십시오.

>>> from follow import follow
>>> lines = follow('stocklog.csv')
>>> goog = filematch(lines, 'GOOG')
>>> for line in goog:
        print(line)

... wait for output ...

출력이 나타나기까지 시간이 걸릴 수 있지만, 결국 GOOG 에 대한 데이터를 포함하는 일부 라인을 볼 수 있습니다.

참고: 이 연습은 두 개의 별도 터미널에서 동시에 수행해야 합니다.

연습 6.9: 더 복잡한 파이프라인 설정

더 많은 작업을 수행하여 파이프라인 아이디어를 몇 단계 더 발전시켜 보겠습니다.

>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
        print(row)

['GOOG', '1502.08', '2023-10-01', '09:37.19', '1.83', '1500.25', '1502.08', '1500.25', '731']
['AAPL', '252.33', '2023-10-01', '09:37.19', '1.83', '250.50', '252.33', '250.50', '681']
['GOOG', '1502.09', '2023-10-01', '09:37.21', '1.84', '1500.25', '1502.09', '1500.25', '734']
['AAPL', '252.34', '2023-10-01', '09:37.21', '1.84', '250.50', '252.34', '250.50', '684']
['GOOG', '1502.10', '2023-10-01', '09:37.23', '1.85', '1500.25', '1502.10', '1500.25', '738']
['AAPL', '252.35', '2023-10-01', '09:37.23', '1.85', '250.50', '252.35', '250.50', '688']
...

흥미롭네요. 여기서 볼 수 있는 것은 follow() 함수의 출력이 csv.reader() 함수로 파이핑되었고 이제 분할된 행의 시퀀스를 얻고 있다는 것입니다.

연습 6.10: 더 많은 파이프라인 구성 요소 만들기

전체 아이디어를 더 큰 파이프라인으로 확장해 보겠습니다. 별도의 파일 ticker.py에서 위에서 했던 것처럼 CSV 파일을 읽는 함수를 생성하는 것으로 시작합니다.

## ticker.py

from follow import follow
import csv

def parse_stock_data(lines):
    rows = csv.reader(lines)
    return rows

if __name__ == '__main__':
    lines = follow('stocklog.csv')
    rows = parse_stock_data(lines)
    for row in rows:
        print(row)

특정 열을 선택하는 새 함수를 작성합니다.

## ticker.py
...
def select_columns(rows, indices):
    for row in rows:
        yield [row[index] for index in indices]
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    return rows

프로그램을 다시 실행합니다. 다음과 같이 좁혀진 출력을 볼 수 있습니다.

['GOOG', '1503.06', '2.81']
['AAPL', '253.31', '2.81']
['GOOG', '1503.07', '2.82']
['AAPL', '253.32', '2.82']
['GOOG', '1503.08', '2.83']
...

데이터 유형을 변환하고 딕셔너리를 구축하는 제너레이터 함수를 작성합니다. 예를 들어:

## ticker.py
...

def convert_types(rows, types):
    for row in rows:
        yield [func(val) for func, val in zip(types, row)]

def make_dicts(rows, headers):
    for row in rows:
        yield dict(zip(headers, row))
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    rows = convert_types(rows, [str, float, float])
    rows = make_dicts(rows, ['name', 'price', 'change'])
    return rows
...

프로그램을 다시 실행합니다. 이제 다음과 같은 딕셔너리 스트림을 볼 수 있습니다.

{'name': 'GOOG', 'price': 1503.4, 'change': 3.15}
{'name': 'AAPL', 'price': 253.65, 'change': 3.15}
{'name': 'GOOG', 'price': 1503.41, 'change': 3.16}
{'name': 'AAPL', 'price': 253.66, 'change': 3.16}
{'name': 'GOOG', 'price': 1503.42, 'change': 3.17}
{'name': 'AAPL', 'price': 253.67, 'change': 3.17}
...

연습 6.11: 데이터 필터링

데이터를 필터링하는 함수를 작성합니다. 예를 들어:

## ticker.py
...

def filter_symbols(rows, names):
    for row in rows:
        if row['GOOG'] in names:
            yield row

이것을 사용하여 포트폴리오에 있는 주식만 필터링합니다.

import report
import ticker
import follow
portfolio = report.read_portfolio('portfolio.csv')
rows = ticker.parse_stock_data(follow.follow('stocklog.csv'))
rows = ticker.filter_symbols(rows, portfolio)
for row in rows:
    print(row)

토론

배운 점: 다양한 제너레이터 함수를 생성하고 함께 연결하여 데이터 흐름 파이프라인과 관련된 처리를 수행할 수 있습니다. 또한 일련의 파이프라인 단계를 단일 함수 호출로 패키징하는 함수를 생성할 수 있습니다 (예: parse_stock_data() 함수).

요약

축하합니다! 생산자, 소비자 및 파이프라인 랩을 완료했습니다. LabEx 에서 더 많은 랩을 연습하여 기술을 향상시킬 수 있습니다.