소개
제너레이터 (Generators) 는 다양한 종류의 생산자/소비자 문제와 데이터 흐름 파이프라인을 설정하는 데 유용한 도구입니다. 이 섹션에서는 이에 대해 논의합니다.
제너레이터 (Generators) 는 다양한 종류의 생산자/소비자 문제와 데이터 흐름 파이프라인을 설정하는 데 유용한 도구입니다. 이 섹션에서는 이에 대해 논의합니다.
제너레이터는 다양한 형태의 생산자 - 소비자(producer-consumer) 문제와 밀접하게 관련되어 있습니다.
## Producer
def follow(f):
...
while True:
...
yield line ## Produces value in `line` below
...
## Consumer
for line in follow(f): ## Consumes value from `yield` above
...
yield는 for가 소비하는 값을 생성합니다.
제너레이터의 이러한 측면을 사용하여 (Unix 파이프와 같은) 처리 파이프라인을 설정할 수 있습니다.
생산자(producer) → 처리(processing) → 처리(processing) → 소비자(consumer)
처리 파이프는 초기 데이터 생산자, 일부 중간 처리 단계 집합 및 최종 소비자를 갖습니다.
생산자(producer) → 처리(processing) → 처리(processing) → 소비자(consumer)
def producer():
...
yield item
...
생산자는 일반적으로 제너레이터입니다. 다른 시퀀스의 목록일 수도 있습니다. yield는 파이프라인에 데이터를 공급합니다.
생산자(producer) → 처리(processing) → 처리(processing) → 소비자(consumer)
def consumer(s):
for item in s:
...
소비자는 for-loop 입니다. 항목을 가져와서 무언가를 수행합니다.
생산자(producer) → 처리(processing) → 처리(processing) → 소비자(consumer)
def processing(s):
for item in s:
...
yield newitem
...
중간 처리 단계는 동시에 항목을 소비하고 생성합니다. 데이터 스트림을 수정할 수 있습니다. 또한 필터링 (항목 폐기) 할 수도 있습니다.
생산자(producer) → 처리(processing) → 처리(processing) → 소비자(consumer)
def producer():
...
yield item ## yields the item that is received by the `processing`
...
def processing(s):
for item in s: ## Comes from the `producer`
...
yield newitem ## yields a new item
...
def consumer(s):
for item in s: ## Comes from the `processing`
...
파이프라인을 설정하는 코드
a = producer()
b = processing(a)
c = consumer(b)
데이터가 다른 함수를 통해 점진적으로 흐르는 것을 알 수 있습니다.
이 연습을 위해 stocksim.py 프로그램이 백그라운드에서 계속 실행되어야 합니다. 이전 연습에서 작성한 follow() 함수를 사용합니다.
파이프라인 아이디어를 실제로 살펴보겠습니다. 다음 함수를 작성하십시오.
>>> def filematch(lines, substr):
for line in lines:
if substr in line:
yield line
>>>
이 함수는 이전 연습의 첫 번째 제너레이터 예제와 거의 동일하지만 더 이상 파일을 열지 않고 인수로 제공된 일련의 라인에서 작동합니다. 이제 다음을 시도해 보십시오.
>>> from follow import follow
>>> lines = follow('stocklog.csv')
>>> goog = filematch(lines, 'GOOG')
>>> for line in goog:
print(line)
... wait for output ...
출력이 나타나기까지 시간이 걸릴 수 있지만, 결국 GOOG 에 대한 데이터를 포함하는 일부 라인을 볼 수 있습니다.
참고: 이 연습은 두 개의 별도 터미널에서 동시에 수행해야 합니다.
더 많은 작업을 수행하여 파이프라인 아이디어를 몇 단계 더 발전시켜 보겠습니다.
>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
print(row)
['GOOG', '1502.08', '2023-10-01', '09:37.19', '1.83', '1500.25', '1502.08', '1500.25', '731']
['AAPL', '252.33', '2023-10-01', '09:37.19', '1.83', '250.50', '252.33', '250.50', '681']
['GOOG', '1502.09', '2023-10-01', '09:37.21', '1.84', '1500.25', '1502.09', '1500.25', '734']
['AAPL', '252.34', '2023-10-01', '09:37.21', '1.84', '250.50', '252.34', '250.50', '684']
['GOOG', '1502.10', '2023-10-01', '09:37.23', '1.85', '1500.25', '1502.10', '1500.25', '738']
['AAPL', '252.35', '2023-10-01', '09:37.23', '1.85', '250.50', '252.35', '250.50', '688']
...
흥미롭네요. 여기서 볼 수 있는 것은 follow() 함수의 출력이 csv.reader() 함수로 파이핑되었고 이제 분할된 행의 시퀀스를 얻고 있다는 것입니다.
전체 아이디어를 더 큰 파이프라인으로 확장해 보겠습니다. 별도의 파일 ticker.py에서 위에서 했던 것처럼 CSV 파일을 읽는 함수를 생성하는 것으로 시작합니다.
## ticker.py
from follow import follow
import csv
def parse_stock_data(lines):
rows = csv.reader(lines)
return rows
if __name__ == '__main__':
lines = follow('stocklog.csv')
rows = parse_stock_data(lines)
for row in rows:
print(row)
특정 열을 선택하는 새 함수를 작성합니다.
## ticker.py
...
def select_columns(rows, indices):
for row in rows:
yield [row[index] for index in indices]
...
def parse_stock_data(lines):
rows = csv.reader(lines)
rows = select_columns(rows, [0, 1, 4])
return rows
프로그램을 다시 실행합니다. 다음과 같이 좁혀진 출력을 볼 수 있습니다.
['GOOG', '1503.06', '2.81']
['AAPL', '253.31', '2.81']
['GOOG', '1503.07', '2.82']
['AAPL', '253.32', '2.82']
['GOOG', '1503.08', '2.83']
...
데이터 유형을 변환하고 딕셔너리를 구축하는 제너레이터 함수를 작성합니다. 예를 들어:
## ticker.py
...
def convert_types(rows, types):
for row in rows:
yield [func(val) for func, val in zip(types, row)]
def make_dicts(rows, headers):
for row in rows:
yield dict(zip(headers, row))
...
def parse_stock_data(lines):
rows = csv.reader(lines)
rows = select_columns(rows, [0, 1, 4])
rows = convert_types(rows, [str, float, float])
rows = make_dicts(rows, ['name', 'price', 'change'])
return rows
...
프로그램을 다시 실행합니다. 이제 다음과 같은 딕셔너리 스트림을 볼 수 있습니다.
{'name': 'GOOG', 'price': 1503.4, 'change': 3.15}
{'name': 'AAPL', 'price': 253.65, 'change': 3.15}
{'name': 'GOOG', 'price': 1503.41, 'change': 3.16}
{'name': 'AAPL', 'price': 253.66, 'change': 3.16}
{'name': 'GOOG', 'price': 1503.42, 'change': 3.17}
{'name': 'AAPL', 'price': 253.67, 'change': 3.17}
...
데이터를 필터링하는 함수를 작성합니다. 예를 들어:
## ticker.py
...
def filter_symbols(rows, names):
for row in rows:
if row['GOOG'] in names:
yield row
이것을 사용하여 포트폴리오에 있는 주식만 필터링합니다.
import report
import ticker
import follow
portfolio = report.read_portfolio('portfolio.csv')
rows = ticker.parse_stock_data(follow.follow('stocklog.csv'))
rows = ticker.filter_symbols(rows, portfolio)
for row in rows:
print(row)
배운 점: 다양한 제너레이터 함수를 생성하고 함께 연결하여 데이터 흐름 파이프라인과 관련된 처리를 수행할 수 있습니다. 또한 일련의 파이프라인 단계를 단일 함수 호출로 패키징하는 함수를 생성할 수 있습니다 (예: parse_stock_data() 함수).
축하합니다! 생산자, 소비자 및 파이프라인 랩을 완료했습니다. LabEx 에서 더 많은 랩을 연습하여 기술을 향상시킬 수 있습니다.