Coroutine-Powered Data Processing


This tutorial is from the open-source community.

Introduction

Objectives:

  • Using coroutines to set up processing pipelines

Files Created: cofollow.py, coticker.py

Note

For this exercise the stocksim.py program should still be running in the background.

In Exercise 8.2 you wrote some code that used generators to set up a processing pipeline. A key aspect of that program was the idea of data flowing between generator functions. A very similar kind of dataflow can be set up using coroutines. The only difference is that with a coroutine, you send data into different processing elements as opposed to pulling data out with a for-loop.
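To make the pull-versus-push contrast concrete, here is a minimal sketch that is not part of the original exercise. The names `pull_upper`, `push_upper`, and `collect` are hypothetical and exist only for illustration.

```python
# Pull style: the consumer drives the pipeline by iterating.
def pull_upper(lines):
    for line in lines:
        yield line.upper()

pulled = [item for item in pull_upper(['a', 'b'])]

# Push style: the producer drives the pipeline by calling send().
def collect(results):
    while True:
        item = yield              # Wait for an item to be sent in
        results.append(item)

def push_upper(target):
    while True:
        line = yield              # Wait for a line to be sent in
        target.send(line.upper())

pushed = []
sink = collect(pushed)
next(sink)                        # Advance to the first yield before sending
stage = push_upper(sink)
next(stage)
for line in ['a', 'b']:
    stage.send(line)

print(pulled)
print(pushed)
```

Both lists end up with the same contents. What changes is who is in control: in the first version the final for-loop pulls items through, while in the second the code at the source pushes them through with `send()`.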



A coroutine example

Getting started with coroutines can be a little tricky. Here is an example program that performs the same task as Exercise 8.2, but with coroutines. Take this program and copy it into a file called cofollow.py.

# cofollow.py
import os
import time
from functools import wraps

# Data source
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)       # Start at the end of the file
        while True:
            line = f.readline()
            if line != '':
                target.send(line)    # Push each new line to the target
            else:
                time.sleep(0.1)      # No new data yet; wait briefly

# Decorator for coroutine functions
def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)                 # Prime the coroutine: advance to the first yield
        return f
    return start

# Sample coroutine
@consumer
def printer():
    while True:
        item = yield     # Receive an item sent to me
        print(item)

# Example use
if __name__ == '__main__':
    follow('stocklog.csv', printer())

Run this program and make sure it produces output. Make sure you understand how the different pieces are hooked together.
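One piece worth testing in isolation is the `consumer` decorator. The sketch below is an illustration rather than part of the exercise: `raw_printer` and `primed_printer` are hypothetical names, and a list stands in for printed output so the result can be inspected. It shows why the decorator calls `send(None)` before returning the coroutine.

```python
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)              # Prime: run up to the first yield
        return f
    return start

def raw_printer(received):
    while True:
        item = yield              # Receive an item sent to me
        received.append(item)

primed_printer = consumer(raw_printer)

# Without priming, sending a real value to a fresh generator fails.
unprimed = raw_printer([])
try:
    unprimed.send('hello')
    error = None
except TypeError as e:
    error = e

# With the decorator, the coroutine is ready to receive immediately.
received = []
p = primed_printer(received)
for line in ['first line\n', 'second line\n']:
    p.send(line)
p.close()                         # Shut the coroutine down

print(type(error).__name__)
print(received)
```

A newly created generator has not yet reached its first `yield`, so there is nowhere for a sent value to go. Priming advances it to that point, which is the only job of the decorator.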

Build some pipeline components

In a file coticker.py, build a series of pipeline components that carry out the same tasks as the ticker.py program in Exercise 8.2. Here is the implementation of the various pieces.

# coticker.py
import csv

from cofollow import consumer, follow
from structure import Structure
from tableformat import create_formatter

# Note: String, Float, and Integer are not imported in this listing. It is
# assumed that structure.py from the earlier exercises makes them available
# inside the class body.
class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

# This one is tricky. See solution for notes about it
@consumer
def to_csv(target):
    def producer():
        while True:
            yield line               # Reads 'line' from the enclosing to_csv() scope

    reader = csv.reader(producer())
    while True:
        line = yield                 # Receive a raw line of text
        target.send(next(reader))    # Parse it and pass the row on

@consumer
def create_ticker(target):
    while True:
        row = yield                  # Receive a parsed CSV row
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield               # Receive a Ticker record
        if record.change < 0:
            target.send(record)      # Forward only negative changes

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield                  # Receive a record to display
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
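The `to_csv()` coroutine is easier to follow when it runs on its own. `csv.reader` expects an iterable it can pull lines from, but a coroutine has lines pushed into it. The inner `producer()` generator bridges the two: each time the reader asks for a line, it yields whatever `line` currently holds in the enclosing function. The sketch below repeats `consumer` and `to_csv` so that it runs standalone; the `collect` sink and the sample lines are made up for illustration, and it assumes every line sent in is one complete CSV record.

```python
import csv
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)              # Prime the coroutine
        return f
    return start

@consumer
def to_csv(target):
    def producer():
        while True:
            yield line            # Closure: the most recently received line

    reader = csv.reader(producer())
    while True:
        line = yield              # A new line is pushed in
        target.send(next(reader)) # The reader pulls that same line and parses it

# Hypothetical sink that stores rows instead of printing them
@consumer
def collect(rows):
    while True:
        row = yield
        rows.append(row)

rows = []
parser = to_csv(collect(rows))
parser.send('"ACME",12.5,"1/1/2000"\n')
parser.send('"XYZ",3.0,"1/2/2000"\n')
print(rows)
```

Each `send()` assigns `line` and then calls `next(reader)`, so the reader always parses the line that has just arrived. The parsed rows contain strings only; type conversion is left to a later stage.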

Your challenge: Write the main program that hooks all of these components together to generate the same stock ticker as in the previous exercise.
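As a hint about the shape of such a program, here is a self-contained sketch of the wiring pattern using simplified stand-ins rather than the real components. `Record`, `to_record`, `collect`, and `feed` are hypothetical, the sample rows are made up, and `feed` plays the role that `follow()` plays in the exercise.

```python
from collections import namedtuple
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)              # Prime the coroutine
        return f
    return start

# Hypothetical stand-in for the Ticker class
Record = namedtuple('Record', ['name', 'price', 'change'])

@consumer
def to_record(target):
    while True:
        row = yield
        target.send(Record(row[0], float(row[1]), float(row[2])))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

# Hypothetical sink standing in for the formatted ticker output
@consumer
def collect(results):
    while True:
        item = yield
        results.append(item)

# Hypothetical data source standing in for follow()
def feed(rows, target):
    for row in rows:
        target.send(row)

results = []
# Build from the inside out: the final stage is created first.
pipeline = to_record(negchange(collect(results)))
feed([['AAA', '10.0', '-0.5'],
      ['BBB', '20.0', '0.25'],
      ['CCC', '30.0', '-1.0']], pipeline)

print([r.name for r in results])
```

Only the records with a negative change reach the sink. The nesting reads in the order the data flows, but each stage receives its target as an argument, so the last stage has to exist before the one that feeds it.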

Summary

Congratulations! You have completed the Set Up Processing Pipelines lab. You can practice more labs in LabEx to improve your skills.
