Coroutine-Powered Data Processing

Beginner

This tutorial is from the open-source community.

Introduction

In this lab, you will learn how to use coroutines to build data processing pipelines. Coroutines, a powerful Python feature, support cooperative multitasking, enabling functions to pause and resume execution at a later point.

The objectives of this lab are to understand how coroutines work in Python, implement data processing pipelines based on coroutines, and transform data through multiple coroutine stages. You will create two files: cofollow.py, a coroutine-based file follower, and coticker.py, a stock ticker application using coroutines. It is assumed that the stocksim.py program from a previous exercise is still running in the background, generating stock data in a log file.

Understanding Coroutines with a File Follower

Let's start by understanding what coroutines are and how they work in Python. A coroutine is a specialized version of a generator function. In Python, functions usually start from the beginning every time they're called. But coroutines are different. They can both consume and produce data, and they have the ability to suspend and resume their execution. This means that a coroutine can pause its operation at a certain point and then pick up right where it left off later.
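Before building the file follower, the pause-and-resume behavior can be seen in a minimal sketch (the function and variable names here are illustrative, not part of the lab files):

```python
def running_total(results):
    ## A coroutine: it receives values via send() and resumes at the yield
    total = 0
    while True:
        value = yield          ## Execution pauses here until a value is sent
        total += value
        results.append(total)

totals = []
c = running_total(totals)
c.send(None)                   ## Advance to the first yield ("priming")
c.send(10)
c.send(5)
print(totals)                  ## [10, 15]
```

Each `send(...)` resumes the function exactly where it paused, with the sent value becoming the result of the `yield` expression.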

Creating a Basic Coroutine File Follower

In this step, we'll create a file follower that uses coroutines to monitor a file for new content and process it. This is similar to the Unix tail -f command, which continuously shows the end of a file and updates as new lines are added.

  1. Open the code editor and create a new file named cofollow.py in the /home/labex/project directory. This is where we'll write our Python code to implement the file follower using coroutines.

  2. Copy the following code into the file:

## cofollow.py
import os
import time

## Data source
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)  ## Move to the end of the file
        while True:
            line = f.readline()
            if line != '':
                target.send(line)  ## Send the line to the target coroutine
            else:
                time.sleep(0.1)  ## Sleep briefly if no new content

## Decorator for coroutine functions
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)  ## Prime the coroutine (necessary first step)
        return f
    return start

## Sample coroutine
@consumer
def printer():
    while True:
        item = yield     ## Receive an item sent to me
        print(item)

## Example use
if __name__ == '__main__':
    follow('stocklog.csv', printer())
  3. Let's understand the key components of this code:

    • follow(filename, target): This function is responsible for opening a file. It first moves the file pointer to the end of the file using f.seek(0, os.SEEK_END). Then, it enters an infinite loop where it continuously tries to read new lines from the file. If a new line is found, it sends that line to the target coroutine using the send method. If there is no new content, it pauses for a short time (0.1 seconds) using time.sleep(0.1) before checking again.
    • @consumer decorator: In Python, coroutines need to be "primed" before they can start receiving data. This decorator takes care of that. It automatically sends an initial None value to the coroutine, which is a necessary first step to get the coroutine ready to receive real data.
    • printer() coroutine: This is a simple coroutine. It has an infinite loop where it uses the yield keyword to receive an item sent to it. Once it receives an item, it simply prints it.
  4. Save the file and run it from the terminal:

cd /home/labex/project
python3 cofollow.py
  5. You should see the script printing the content of the stock log file, and it will continue to print new lines as they are added to the file. Press Ctrl+C to stop the program.
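The priming step that @consumer automates is not optional. Sending a value to a coroutine that has not yet reached its first yield raises a TypeError, as this small sketch (illustrative names) shows:

```python
def echo(received):
    ## A coroutine that collects every item sent to it
    while True:
        item = yield
        received.append(item)

out = []
c = echo(out)
try:
    c.send('hello')            ## Fails: the coroutine hasn't reached a yield yet
    primed_first = True
except TypeError:
    primed_first = False

c.send(None)                   ## Prime: run up to the first yield
c.send('hello')                ## Now the value is received
print(primed_first, out)       ## False ['hello']
```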

The key concept here is that data flows from the follow function into the printer coroutine through the send method. This "pushing" of data is opposite to generators, which "pull" data through iteration. In a generator, you typically use a for loop to iterate over the values it produces. But in this coroutine example, the data is actively sent from one part of the code to another.
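The difference between the two models can be sketched side by side (illustrative names):

```python
## Pull model: a generator produces values; the consumer drives the iteration
def squares_gen(n):
    for i in range(n):
        yield i * i

pulled = list(squares_gen(4))             ## Consumer pulls with the loop

## Push model: a coroutine receives values; the producer drives the loop
def square_collector(results):
    while True:
        x = yield
        results.append(x * x)

pushed = []
c = square_collector(pushed)
c.send(None)                              ## Prime
for i in range(4):
    c.send(i)                             ## Producer pushes each value

print(pulled == pushed)                   ## True: both compute [0, 1, 4, 9]
```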

Creating Coroutine Pipeline Components

In this step, we're going to create more specialized coroutines for processing stock data. A coroutine is a special type of function that can pause and resume its execution, which is very useful for building data processing pipelines. Each coroutine we create will perform a specific task in our overall processing pipeline.

  1. First, you need to create a new file. Navigate to the /home/labex/project directory and create a file named coticker.py. This file will hold all the code for our coroutine-based data processing.

  2. Now, let's start writing code in the coticker.py file. We'll first import the necessary modules and define the basic structure. Modules are pre-written code libraries that provide useful functions and classes. The following code does just that:

## coticker.py
from structure import Structure

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  3. When you look at the code above, you'll notice that there are errors related to String(), Float(), and Integer(). These are classes that we need to import. So, we'll add the required imports at the top of the file. This way, Python knows where to find these classes. Here's the updated code:
## coticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
  4. Next, we'll add the coroutine components that will form our data processing pipeline. Each coroutine has a specific job in the pipeline. Here's the code to add these coroutines:
@consumer
def to_csv(target):
    def producer():
        while True:
            yield line  ## Re-yields the most recent line received below (closure)

    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def create_ticker(target):
    while True:
        row = yield
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
  5. Let's understand what each of these coroutines does:

    • to_csv: Its job is to convert raw text lines into parsed CSV rows. This is important because our data is initially in text format, and we need to break it into structured CSV data.
    • create_ticker: This coroutine takes the CSV rows and creates Ticker objects from them. Ticker objects represent the stock data in a more organized way.
    • negchange: It filters the Ticker objects. It only passes on the stocks that have negative price changes. This helps us focus on the stocks that are losing value.
    • ticker: This coroutine formats and displays the ticker data. It uses a formatter to present the data in a nice, readable table.
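A stage can be exercised in isolation before wiring the full pipeline. The sketch below reimplements consumer and to_csv locally (so it runs without the lab's structure and tableformat modules) and pushes one made-up line through:

```python
import csv
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)            ## Prime the coroutine
        return f
    return start

@consumer
def to_csv(target):
    def producer():
        while True:
            yield line          ## Re-yields the last line received below

    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def capture(rows):
    ## Test sink: collect whatever reaches the end of the pipeline
    while True:
        rows.append((yield))

rows = []
pipe = to_csv(capture(rows))
pipe.send('"AAPL",102.50,"2023-10-06","09:30:00",-0.06\n')
print(rows)    ## [['AAPL', '102.50', '2023-10-06', '09:30:00', '-0.06']]
```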
  6. Finally, we need to add the main program code that connects all these components together. This code will set up the data flow through the pipeline. Here's the code:

if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change']

    ## Create the processing pipeline
    t = ticker('text', fields)
    neg_filter = negchange(t)
    tick_creator = create_ticker(neg_filter)
    csv_parser = to_csv(tick_creator)

    ## Connect the pipeline to the data source
    follow('stocklog.csv', csv_parser)
  7. After writing all the code, save the coticker.py file. Then, open the terminal and run the following commands. The cd command changes the directory to where our file is located, and the python3 command runs our Python script:
cd /home/labex/project
python3 coticker.py
  8. If everything goes well, you should see a formatted table in the terminal. This table shows stocks with negative price changes. The output will look something like this:
      name      price     change
---------- ---------- ----------
      MSFT      72.50      -0.25
        AA      35.25      -0.15
       IBM      50.10      -0.15
      GOOG     100.02      -0.01
      AAPL     102.50      -0.06

Keep in mind that the actual values in the table may vary depending on the generated stock data.

Understanding the Pipeline Flow

The most important part of this program is how data flows through the coroutines. Let's break it down step by step:

  1. The follow function starts by reading lines from the stocklog.csv file. This is our data source.
  2. Each line that is read is then sent to the csv_parser coroutine. The csv_parser takes the raw text line and parses it into CSV fields.
  3. The parsed CSV data is then sent to the tick_creator coroutine. This coroutine creates Ticker objects from the CSV rows.
  4. The Ticker objects are then sent to the neg_filter coroutine. This coroutine checks each Ticker object. If the stock has a negative price change, it passes the object on; otherwise, it discards it.
  5. Finally, the filtered Ticker objects are sent to the ticker coroutine. The ticker coroutine formats the data and displays it in a table.

This pipeline architecture is very useful because it allows each component to focus on a single task. This makes the code more modular, which means it's easier to understand, modify, and maintain.
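One detail worth noticing in the main program: the pipeline is wired back-to-front, because each stage needs its target to exist when it is created. A generic sketch of the same pattern (stage names are illustrative):

```python
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)            ## Prime the coroutine
        return f
    return start

@consumer
def double(target):
    ## Doubles every received value and forwards it
    while True:
        target.send(2 * (yield))

@consumer
def sink(out):
    while True:
        out.append((yield))

out = []
## Built innermost-first: the sink exists before the stage that feeds it
pipe = double(double(sink(out)))
pipe.send(3)
print(out)                     ## [12]
```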

Enhancing the Coroutine Pipeline

Now that we have a basic pipeline up and running, it's time to make it more flexible. In programming, flexibility is crucial as it allows our code to adapt to different requirements. We'll achieve this by modifying our coticker.py program to support various filtering and formatting options.

  1. First, open the coticker.py file in your code editor. The code editor is where you'll make all the necessary changes to the program. It provides a convenient environment to view, edit, and save your code.

  2. Next, we'll add a new coroutine that filters data by stock name. A coroutine is a special type of function that can pause and resume its execution. This allows us to create a pipeline where data can flow through different processing steps. Here's the code for the new coroutine:

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

In this code, the filter_by_name coroutine takes a stock name and a target coroutine as parameters. It continuously waits for a record using the yield keyword. When a record arrives, it checks if the record's name matches the specified name. If it does, it sends the record to the target coroutine.

  3. Now, let's add another coroutine that filters based on price thresholds. This coroutine will help us select stocks within a specific price range. Here's the code:
@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

Similar to the previous coroutine, the price_threshold coroutine waits for a record. It then checks if the record's price is within the specified minimum and maximum price range. If it is, it sends the record to the target coroutine.
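Because every filter has the same shape (receive a record, test it, forward it), filters chain naturally. The sketch below is self-contained: it reimplements consumer and both filters, and uses stub records in place of the lab's Ticker objects:

```python
from collections import namedtuple
from functools import wraps

Rec = namedtuple('Rec', ['name', 'price'])   ## Stand-in for a Ticker record

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)            ## Prime the coroutine
        return f
    return start

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

@consumer
def collect(out):
    while True:
        out.append((yield))

out = []
## Only AAPL records priced between 50 and 75 survive both stages
pipe = filter_by_name('AAPL', price_threshold(50, 75, collect(out)))
for r in [Rec('AAPL', 60.0), Rec('IBM', 60.0), Rec('AAPL', 80.0)]:
    pipe.send(r)
print(out)                     ## [Rec(name='AAPL', price=60.0)]
```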

  4. After adding the new coroutines, we need to update the main program to demonstrate these additional filters. The main program is the entry point of our application, where we set up the processing pipelines and start the data flow. Here's the updated code:
if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change', 'high', 'low']

    ## Create the processing pipeline with multiple outputs

    ## Pipeline 1: Show all negative changes (same as before)
    print("Stocks with negative changes:")
    t1 = ticker('text', fields)
    neg_filter = negchange(t1)
    tick_creator1 = create_ticker(neg_filter)
    csv_parser1 = to_csv(tick_creator1)

    ## Start following the file with the first pipeline
    import threading
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()

    ## Wait a moment to see some results
    import time
    time.sleep(5)

    ## Pipeline 2: Filter by name (AAPL)
    print("\nApple stock updates:")
    t2 = ticker('text', fields)
    name_filter = filter_by_name('AAPL', t2)
    tick_creator2 = create_ticker(name_filter)
    csv_parser2 = to_csv(tick_creator2)

    ## Follow the file with the second pipeline
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()

    ## Wait a moment to see some results
    time.sleep(5)

    ## Pipeline 3: Filter by price range
    print("\nStocks priced between 50 and 75:")
    t3 = ticker('text', fields)
    price_filter = price_threshold(50, 75, t3)
    tick_creator3 = create_ticker(price_filter)
    csv_parser3 = to_csv(tick_creator3)

    ## Follow with the third pipeline
    follow('stocklog.csv', csv_parser3)

In this updated code, we create three different processing pipelines. The first shows stocks with negative changes, the second filters stocks by the name 'AAPL', and the third filters stocks priced between 50 and 75. The first two pipelines run in daemon threads because follow loops forever and never returns; this lets all three pipelines tail the same file at once.

  5. Once you've made all the changes, save the file. Saving the file ensures that all your modifications are preserved. Then, run the updated program using the following commands in your terminal:
cd /home/labex/project
python3 coticker.py

The cd command changes the current directory to the project directory, and the python3 coticker.py command runs the Python program.

  6. After running the program, you should see three kinds of output:
    • Stocks with negative price changes.
    • AAPL stock updates.
    • Stocks priced between 50 and 75.

Because the first two pipelines keep running in daemon threads, the three kinds of output will interleave once all the pipelines are active.

Understanding the Enhanced Pipeline

The enhanced program demonstrates several important concepts:

  1. Multiple Pipelines: We can create multiple processing pipelines from the same data source. This allows us to perform different types of analysis on the same data simultaneously.
  2. Specialized Filters: We can create different coroutines for specific filtering tasks. These filters help us select only the data that meets our specific criteria.
  3. Concurrent Processing: Using threads, we can run multiple pipelines concurrently. This improves the efficiency of our program by allowing it to process data in parallel.
  4. Pipeline Composition: Coroutines can be combined in different ways to achieve different data processing goals. This gives us the flexibility to customize our data processing pipelines according to our needs.

This approach provides a flexible and modular way to process streaming data. It allows you to add or modify processing steps without changing the overall architecture of the program.
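One common refinement, shown here only as a sketch and not part of the lab files, is a broadcast coroutine: instead of three threads each re-reading the file, a single follow(...) call could feed one broadcast stage that forwards every line to several pipelines:

```python
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)            ## Prime the coroutine
        return f
    return start

@consumer
def broadcast(targets):
    ## Forward every received item to every target coroutine
    while True:
        item = yield
        for target in targets:
            target.send(item)

@consumer
def tag(label, out):
    ## Stand-in for a real pipeline: record which branch saw the item
    while True:
        out.append((label, (yield)))

out = []
pipe = broadcast([tag('pipeline-1', out), tag('pipeline-2', out)])
pipe.send('raw line')
print(out)     ## [('pipeline-1', 'raw line'), ('pipeline-2', 'raw line')]
```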

Summary

In this lab, you have learned how to use coroutines to build data processing pipelines in Python. Key concepts include coroutine basics: how they operate, the need for priming, and using a decorator to handle initialization. You also explored data flow, pushing data through a pipeline via the send() method, in contrast to the "pull" model of generators.

Moreover, you created specialized coroutines for tasks like parsing CSV data, filtering records, and formatting output. You learned to compose pipelines by connecting multiple coroutines and implemented filtering and transformation operations. Coroutines offer a powerful approach for streaming data processing, enabling clean separation of concerns and easy modification of individual stages.