코루틴 기반 데이터 처리

Beginner

This tutorial is from open-source community. Access the source code

소개

이 랩에서는 코루틴을 사용하여 데이터 처리 파이프라인을 구축하는 방법을 배우게 됩니다. 파이썬의 강력한 기능인 코루틴은 협력적 멀티태스킹을 지원하여 함수가 일시 중지되었다가 나중에 실행을 재개할 수 있도록 합니다.

이 랩의 목표는 파이썬에서 코루틴이 어떻게 작동하는지 이해하고, 코루틴 기반의 데이터 처리 파이프라인을 구현하며, 여러 코루틴 단계를 통해 데이터를 변환하는 것입니다. cofollow.py (코루틴 기반 파일 팔로워) 와 coticker.py (코루틴을 사용하는 주식 시세 표시기 애플리케이션) 의 두 파일을 생성합니다. 이전 연습에서 stocksim.py 프로그램이 백그라운드에서 실행 중이며 로그 파일에 주식 데이터를 생성하고 있다고 가정합니다.

파일 팔로워를 사용한 코루틴 이해

코루틴이 무엇이며 파이썬에서 어떻게 작동하는지 이해하는 것부터 시작해 보겠습니다. 코루틴은 제너레이터 함수의 특수한 버전입니다. 파이썬에서 함수는 일반적으로 호출될 때마다 처음부터 시작합니다. 그러나 코루틴은 다릅니다. 코루틴은 데이터를 소비하고 생성할 수 있으며, 실행을 일시 중지하고 재개할 수 있는 기능을 가지고 있습니다. 즉, 코루틴은 특정 지점에서 작업을 일시 중지한 다음 나중에 중단된 지점부터 다시 시작할 수 있습니다.

기본 코루틴 파일 팔로워 생성

이 단계에서는 새로운 콘텐츠에 대해 파일을 모니터링하고 이를 처리하기 위해 코루틴을 사용하는 파일 팔로워를 생성합니다. 이는 파일의 끝을 지속적으로 표시하고 새 줄이 추가될 때 업데이트되는 Unix tail -f 명령과 유사합니다.

  1. 코드 편집기를 열고 /home/labex/project 디렉토리에 cofollow.py라는 새 파일을 만듭니다. 여기에서 코루틴을 사용하여 파일 팔로워를 구현하기 위한 파이썬 코드를 작성합니다.

  2. 다음 코드를 파일에 복사합니다.

## cofollow.py
import os
import time

## Data source
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)  ## Move to the end of the file
        while True:
            line = f.readline()
            if line != '':
                target.send(line)  ## Send the line to the target coroutine
            else:
                time.sleep(0.1)  ## Sleep briefly if no new content

## Decorator for coroutine functions
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)  ## Prime the coroutine (necessary first step)
        return f
    return start

## Sample coroutine
@consumer
def printer():
    while True:
        item = yield     ## Receive an item sent to me
        print(item)

## Example use
if __name__ == '__main__':
    follow('stocklog.csv', printer())
  1. 이 코드의 주요 구성 요소를 이해해 보겠습니다.

    • follow(filename, target): 이 함수는 파일을 여는 역할을 합니다. 먼저 f.seek(0, os.SEEK_END)를 사용하여 파일 포인터를 파일의 끝으로 이동합니다. 그런 다음, 파일에서 새 줄을 지속적으로 읽으려고 시도하는 무한 루프에 들어갑니다. 새 줄이 발견되면 send 메서드를 사용하여 해당 줄을 대상 코루틴으로 보냅니다. 새 콘텐츠가 없으면 다시 확인하기 전에 time.sleep(0.1)을 사용하여 잠시 (0.1 초) 일시 중지합니다.
    • @consumer 데코레이터: 파이썬에서 코루틴은 데이터를 받기 시작하기 전에 "프라이밍 (primed)"되어야 합니다. 이 데코레이터가 이를 처리합니다. 코루틴에 초기 None 값을 자동으로 보내는데, 이는 코루틴이 실제 데이터를 받을 준비를 하기 위한 필수 첫 번째 단계입니다.
    • printer() 코루틴: 이것은 간단한 코루틴입니다. yield 키워드를 사용하여 자신에게 전송된 항목을 받는 무한 루프가 있습니다. 항목을 받으면 단순히 인쇄합니다.
  2. 파일을 저장하고 터미널에서 실행합니다.

cd /home/labex/project
python3 cofollow.py
  1. 주식 로그 파일의 내용이 인쇄되는 것을 볼 수 있으며, 파일에 새 줄이 추가될 때 계속해서 새 줄을 인쇄합니다. Ctrl+C를 눌러 프로그램을 중지합니다.

여기서 핵심 개념은 데이터가 send 메서드를 통해 follow 함수에서 printer 코루틴으로 흐른다는 것입니다. 이 데이터 "푸시 (pushing)"는 반복을 통해 데이터를 "풀 (pull)"하는 제너레이터와 반대입니다. 제너레이터에서는 일반적으로 for 루프를 사용하여 생성하는 값을 반복합니다. 그러나 이 코루틴 예제에서는 데이터가 코드의 한 부분에서 다른 부분으로 적극적으로 전송됩니다.

코루틴 파이프라인 구성 요소 생성

이 단계에서는 주식 데이터를 처리하기 위해 더 특화된 코루틴을 생성할 것입니다. 코루틴은 실행을 일시 중지하고 재개할 수 있는 특수한 유형의 함수로, 데이터 처리 파이프라인을 구축하는 데 매우 유용합니다. 우리가 생성하는 각 코루틴은 전체 처리 파이프라인에서 특정 작업을 수행합니다.

  1. 먼저, 새 파일을 생성해야 합니다. /home/labex/project 디렉토리로 이동하여 coticker.py라는 파일을 만듭니다. 이 파일은 코루틴 기반 데이터 처리를 위한 모든 코드를 담을 것입니다.

  2. 이제 coticker.py 파일에 코드를 작성하기 시작해 보겠습니다. 먼저 필요한 모듈을 가져오고 기본 구조를 정의합니다. 모듈은 유용한 함수와 클래스를 제공하는 미리 작성된 코드 라이브러리입니다. 다음 코드가 바로 그 역할을 합니다.

## coticker.py
from structure import Structure

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. 위 코드를 보면 String(), Float(), Integer()와 관련된 오류가 있음을 알 수 있습니다. 이것들은 우리가 가져와야 하는 클래스입니다. 따라서 파일 상단에 필요한 import 문을 추가합니다. 이렇게 하면 파이썬이 이러한 클래스를 어디에서 찾아야 하는지 알 수 있습니다. 업데이트된 코드는 다음과 같습니다.
## coticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  1. 다음으로, 데이터 처리 파이프라인을 형성할 코루틴 구성 요소를 추가합니다. 각 코루틴은 파이프라인에서 특정 작업을 수행합니다. 다음은 이러한 코루틴을 추가하는 코드입니다.
@consumer
def to_csv(target):
    def producer():
        while True:
            line = yield

    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def create_ticker(target):
    while True:
        row = yield
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
  1. 각 코루틴이 무엇을 하는지 이해해 보겠습니다.

    • to_csv: 이 코루틴의 역할은 원시 텍스트 줄을 구문 분석된 CSV 행으로 변환하는 것입니다. 데이터가 처음에 텍스트 형식으로 되어 있고 이를 구조화된 CSV 데이터로 분해해야 하므로 중요합니다.
    • create_ticker: 이 코루틴은 CSV 행을 가져와서 Ticker 객체를 생성합니다. Ticker 객체는 주식 데이터를 더 체계적인 방식으로 나타냅니다.
    • negchange: Ticker 객체를 필터링합니다. 음수 가격 변동이 있는 주식만 전달합니다. 이를 통해 가치가 하락하는 주식에 집중할 수 있습니다.
    • ticker: 이 코루틴은 티커 데이터를 형식화하고 표시합니다. 포매터를 사용하여 데이터를 보기 좋고 읽기 쉬운 표로 표시합니다.
  2. 마지막으로, 이러한 모든 구성 요소를 함께 연결하는 메인 프로그램 코드를 추가해야 합니다. 이 코드는 파이프라인을 통해 데이터 흐름을 설정합니다. 다음은 코드입니다.

if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change']

    ## Create the processing pipeline
    t = ticker('text', fields)
    neg_filter = negchange(t)
    tick_creator = create_ticker(neg_filter)
    csv_parser = to_csv(tick_creator)

    ## Connect the pipeline to the data source
    follow('stocklog.csv', csv_parser)
  1. 모든 코드를 작성한 후 coticker.py 파일을 저장합니다. 그런 다음 터미널을 열고 다음 명령을 실행합니다. cd 명령은 파일이 있는 디렉토리로 디렉토리를 변경하고, python3 명령은 파이썬 스크립트를 실행합니다.
cd /home/labex/project
python3 coticker.py
  1. 모든 것이 잘 진행되면 터미널에 형식화된 표가 표시됩니다. 이 표는 음수 가격 변동이 있는 주식을 보여줍니다. 출력은 다음과 유사합니다.
      name      price     change
---------- ---------- ----------
      MSFT      72.50      -0.25
        AA      35.25      -0.15
       IBM      50.10      -0.15
      GOOG     100.02      -0.01
      AAPL     102.50      -0.06

표의 실제 값은 생성된 주식 데이터에 따라 달라질 수 있습니다.

파이프라인 흐름 이해

이 프로그램에서 가장 중요한 부분은 데이터가 코루틴을 통해 어떻게 흐르는지입니다. 단계별로 살펴보겠습니다.

  1. follow 함수는 stocklog.csv 파일에서 줄을 읽는 것으로 시작합니다. 이것이 우리의 데이터 소스입니다.
  2. 읽은 각 줄은 csv_parser 코루틴으로 전송됩니다. csv_parser는 원시 텍스트 줄을 가져와 CSV 필드로 구문 분석합니다.
  3. 구문 분석된 CSV 데이터는 tick_creator 코루틴으로 전송됩니다. 이 코루틴은 CSV 행에서 Ticker 객체를 생성합니다.
  4. Ticker 객체는 neg_filter 코루틴으로 전송됩니다. 이 코루틴은 각 Ticker 객체를 확인합니다. 주식의 가격 변동이 음수이면 객체를 전달하고, 그렇지 않으면 폐기합니다.
  5. 마지막으로, 필터링된 Ticker 객체는 ticker 코루틴으로 전송됩니다. ticker 코루틴은 데이터를 형식화하고 표로 표시합니다.

이 파이프라인 아키텍처는 각 구성 요소가 단일 작업에 집중할 수 있기 때문에 매우 유용합니다. 이렇게 하면 코드가 더 모듈화되어 이해, 수정 및 유지 관리가 더 쉬워집니다.

코루틴 파이프라인 향상

이제 기본 파이프라인이 실행되고 있으므로, 이를 더욱 유연하게 만들 차례입니다. 프로그래밍에서 유연성은 다양한 요구 사항에 맞게 코드를 조정할 수 있도록 하므로 매우 중요합니다. coticker.py 프로그램을 수정하여 다양한 필터링 및 형식 지정 옵션을 지원함으로써 이를 달성할 것입니다.

  1. 먼저, 코드 편집기에서 coticker.py 파일을 엽니다. 코드 편집기는 프로그램에 필요한 모든 변경을 수행하는 곳입니다. 코드를 보고, 편집하고, 저장할 수 있는 편리한 환경을 제공합니다.

  2. 다음으로, 주식 이름으로 데이터를 필터링하는 새 코루틴을 추가합니다. 코루틴은 실행을 일시 중지하고 재개할 수 있는 특수한 유형의 함수입니다. 이를 통해 서로 다른 처리 단계를 거쳐 데이터가 흐를 수 있는 파이프라인을 만들 수 있습니다. 다음은 새 코루틴에 대한 코드입니다.

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

이 코드에서 filter_by_name 코루틴은 주식 이름과 대상 코루틴을 매개변수로 사용합니다. yield 키워드를 사용하여 레코드를 지속적으로 기다립니다. 레코드가 도착하면 레코드의 이름이 지정된 이름과 일치하는지 확인합니다. 일치하는 경우 레코드를 대상 코루틴으로 보냅니다.

  1. 이제 가격 임계값을 기반으로 필터링하는 또 다른 코루틴을 추가해 보겠습니다. 이 코루틴은 특정 가격 범위 내의 주식을 선택하는 데 도움이 됩니다. 다음은 코드입니다.
@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

이전 코루틴과 유사하게, price_threshold 코루틴은 레코드를 기다립니다. 그런 다음 레코드의 가격이 지정된 최소 및 최대 가격 범위 내에 있는지 확인합니다. 그렇다면 레코드를 대상 코루틴으로 보냅니다.

  1. 새 코루틴을 추가한 후, 이러한 추가 필터를 시연하기 위해 메인 프로그램을 업데이트해야 합니다. 메인 프로그램은 애플리케이션의 진입점이며, 여기서 처리 파이프라인을 설정하고 데이터 흐름을 시작합니다. 다음은 업데이트된 코드입니다.
if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change', 'high', 'low']

    ## Create the processing pipeline with multiple outputs

    ## Pipeline 1: Show all negative changes (same as before)
    print("Stocks with negative changes:")
    t1 = ticker('text', fields)
    neg_filter = negchange(t1)
    tick_creator1 = create_ticker(neg_filter)
    csv_parser1 = to_csv(tick_creator1)

    ## Start following the file with the first pipeline
    import threading
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()

    ## Wait a moment to see some results
    import time
    time.sleep(5)

    ## Pipeline 2: Filter by name (AAPL)
    print("\nApple stock updates:")
    t2 = ticker('text', fields)
    name_filter = filter_by_name('AAPL', t2)
    tick_creator2 = create_ticker(name_filter)
    csv_parser2 = to_csv(tick_creator2)

    ## Follow the file with the second pipeline
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()

    ## Wait a moment to see some results
    time.sleep(5)

    ## Pipeline 3: Filter by price range
    print("\nStocks priced between 50 and 75:")
    t3 = ticker('text', fields)
    price_filter = price_threshold(50, 75, t3)
    tick_creator3 = create_ticker(price_filter)
    csv_parser3 = to_csv(tick_creator3)

    ## Follow with the third pipeline
    follow('stocklog.csv', csv_parser3)

이 업데이트된 코드에서는 세 가지 다른 처리 파이프라인을 생성합니다. 첫 번째 파이프라인은 음수 변동이 있는 주식을 표시하고, 두 번째 파이프라인은 'AAPL' 이름으로 주식을 필터링하며, 세 번째 파이프라인은 50 에서 75 사이의 가격 범위를 기반으로 주식을 필터링합니다. 스레드를 사용하여 처음 두 파이프라인을 동시에 실행하여 데이터를 보다 효율적으로 처리할 수 있습니다.

  1. 모든 변경을 완료했으면 파일을 저장합니다. 파일을 저장하면 모든 수정 사항이 보존됩니다. 그런 다음 터미널에서 다음 명령을 사용하여 업데이트된 프로그램을 실행합니다.
cd /home/labex/project
python3 coticker.py

cd 명령은 현재 디렉토리를 프로젝트 디렉토리로 변경하고, python3 coticker.py 명령은 파이썬 프로그램을 실행합니다.

  1. 프로그램을 실행한 후 세 가지 다른 출력을 볼 수 있습니다.
    • 먼저, 음수 변동이 있는 주식을 볼 수 있습니다.
    • 그런 다음, 모든 AAPL 주식 업데이트를 볼 수 있습니다.
    • 마지막으로, 50 에서 75 사이의 가격으로 책정된 모든 주식을 볼 수 있습니다.

향상된 파이프라인 이해

향상된 프로그램은 몇 가지 중요한 개념을 보여줍니다.

  1. 다중 파이프라인 (Multiple Pipelines): 동일한 데이터 소스에서 여러 처리 파이프라인을 생성할 수 있습니다. 이를 통해 동일한 데이터에 대해 서로 다른 유형의 분석을 동시에 수행할 수 있습니다.
  2. 특수 필터 (Specialized Filters): 특정 필터링 작업을 위해 서로 다른 코루틴을 생성할 수 있습니다. 이러한 필터는 특정 기준을 충족하는 데이터만 선택하는 데 도움이 됩니다.
  3. 동시 처리 (Concurrent Processing): 스레드를 사용하여 여러 파이프라인을 동시에 실행할 수 있습니다. 이렇게 하면 데이터를 병렬로 처리하여 프로그램의 효율성을 향상시킬 수 있습니다.
  4. 파이프라인 구성 (Pipeline Composition): 코루틴은 서로 다른 데이터 처리 목표를 달성하기 위해 다양한 방식으로 결합될 수 있습니다. 이를 통해 필요에 따라 데이터 처리 파이프라인을 사용자 정의할 수 있는 유연성을 얻을 수 있습니다.

이 접근 방식은 스트리밍 데이터를 처리하는 유연하고 모듈식인 방법을 제공합니다. 프로그램의 전체 아키텍처를 변경하지 않고도 처리 단계를 추가하거나 수정할 수 있습니다.

요약

이 랩에서는 파이썬에서 코루틴을 사용하여 데이터 처리 파이프라인을 구축하는 방법을 배웠습니다. 주요 개념에는 코루틴의 작동 방식, 초기화 (priming) 의 필요성, 초기화를 위한 데코레이터 (decorators) 사용과 같은 코루틴 기본 사항 이해가 포함됩니다. 또한 send() 메서드를 통해 파이프라인을 통해 데이터를 푸시하는 데이터 흐름을 탐색했으며, 이는 제너레이터 (generator) 의 "풀 (pull)" 모델과는 다릅니다.

또한 CSV 데이터 구문 분석, 레코드 필터링 및 출력 형식 지정과 같은 작업에 특화된 코루틴을 만들었습니다. 여러 코루틴을 연결하여 파이프라인을 구성하고 필터링 및 변환 작업을 구현하는 방법을 배웠습니다. 코루틴은 스트리밍 데이터 처리를 위한 강력한 접근 방식을 제공하여 관심사의 깔끔한 분리 (separation of concerns) 와 개별 단계의 쉬운 수정을 가능하게 합니다.