Introduction
In this lab, you will learn how to use coroutines to build data processing pipelines. Coroutines, a powerful Python feature, support cooperative multitasking, enabling functions to pause and resume execution at a later point.
The objectives of this lab are to understand how coroutines work in Python, implement data processing pipelines based on coroutines, and transform data through multiple coroutine stages. You will create two files: cofollow.py, a coroutine-based file follower, and coticker.py, a stock ticker application using coroutines. It is assumed that the stocksim.py program from a previous exercise is still running in the background, generating stock data in a log file.
Understanding Coroutines with a File Follower
Let's start by understanding what coroutines are and how they work in Python. A coroutine is a specialized version of a generator function. In Python, functions usually start from the beginning every time they're called. But coroutines are different. They can both consume and produce data, and they have the ability to suspend and resume their execution. This means that a coroutine can pause its operation at a certain point and then pick up right where it left off later.
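Before building the file follower, here is a minimal, self-contained sketch of that pause-and-resume behavior (the function and pattern names are purely illustrative):

```python
## A minimal coroutine: execution suspends at `yield` and resumes
## each time a value is sent in.
def grep(pattern, matches):
    while True:
        line = yield            ## Pause here until send() delivers a line
        if pattern in line:
            matches.append(line)

found = []
g = grep("python", found)
g.send(None)                    ## Prime: run up to the first yield
g.send("no match here")         ## Resumes, checks the line, pauses again
g.send("python is fun")         ## This line matches and is collected
print(found)                    ## ['python is fun']
```

Notice that `grep` keeps its local state (`pattern`, `matches`) alive between `send()` calls; that is exactly the property the pipelines below rely on.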
Creating a Basic Coroutine File Follower
In this step, we'll create a file follower that uses coroutines to monitor a file for new content and process it. This is similar to the Unix tail -f command, which continuously shows the end of a file and updates as new lines are added.
Open the code editor and create a new file named cofollow.py in the /home/labex/project directory. This is where we'll write our Python code to implement the file follower using coroutines. Copy the following code into the file:
## cofollow.py
import os
import time

## Data source
def follow(filename, target):
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)  ## Move to the end of the file
        while True:
            line = f.readline()
            if line != '':
                target.send(line)  ## Send the line to the target coroutine
            else:
                time.sleep(0.1)  ## Sleep briefly if no new content

## Decorator for coroutine functions
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)  ## Prime the coroutine (necessary first step)
        return f
    return start

## Sample coroutine
@consumer
def printer():
    while True:
        item = yield  ## Receive an item sent to me
        print(item)

## Example use
if __name__ == '__main__':
    follow('stocklog.csv', printer())
Let's understand the key components of this code:
- follow(filename, target): This function opens a file and first moves the file pointer to the end using f.seek(0, os.SEEK_END). It then enters an infinite loop where it continuously tries to read new lines. If a new line is found, it sends that line to the target coroutine using the send method. If there is no new content, it pauses for a short time (0.1 seconds) using time.sleep(0.1) before checking again.
- @consumer decorator: In Python, coroutines need to be "primed" before they can start receiving data. This decorator takes care of that. It automatically sends an initial None value to the coroutine, which is a necessary first step to get the coroutine ready to receive real data.
- printer() coroutine: This is a simple coroutine. It has an infinite loop where it uses the yield keyword to receive an item sent to it. Once it receives an item, it simply prints it.
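The need for priming is easy to demonstrate on its own. In this sketch, printer is redefined without the @consumer decorator so you can see the failure: sending a real value to a coroutine that hasn't reached its first yield raises a TypeError:

```python
## printer, redefined here WITHOUT the @consumer decorator
def printer():
    while True:
        item = yield
        print(item)

p = printer()
try:
    p.send("hello")        ## Fails: the coroutine hasn't started executing yet
except TypeError:
    print("Must prime the coroutine first!")

p.send(None)               ## Priming: advance execution to the first yield
p.send("hello")            ## Now the coroutine receives and prints "hello"
```

The @consumer decorator simply hides that `send(None)` call so you can never forget it.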
Save the file and run it from the terminal:
cd /home/labex/project
python3 cofollow.py
- You should see the script printing the content of the stock log file, and it will continue to print new lines as they are added to the file. Press Ctrl+C to stop the program.
The key concept here is that data flows from the follow function into the printer coroutine through the send method. This "pushing" of data is opposite to generators, which "pull" data through iteration. In a generator, you typically use a for loop to iterate over the values it produces. But in this coroutine example, the data is actively sent from one part of the code to another.
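The two models can be put side by side in a short, self-contained sketch (the names countdown and collector are illustrative):

```python
## Pull model: a generator produces values; the consumer iterates.
def countdown(n):
    while n > 0:
        yield n
        n -= 1

pulled = list(countdown(3))        ## The consumer drives the loop

## Push model: a coroutine waits for values; the producer calls send().
def collector(out):
    while True:
        item = yield
        out.append(item)

pushed = []
c = collector(pushed)
c.send(None)                       ## Prime the coroutine
for value in [3, 2, 1]:
    c.send(value)                  ## The producer drives the loop

print(pulled)                      ## [3, 2, 1]
print(pushed)                      ## [3, 2, 1]
```

Both ends up with the same data; what differs is which side controls the flow, and the push model is what lets `follow` feed a chain of processing stages.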
Creating Coroutine Pipeline Components
In this step, we're going to create more specialized coroutines for processing stock data. A coroutine is a special type of function that can pause and resume its execution, which is very useful for building data processing pipelines. Each coroutine we create will perform a specific task in our overall processing pipeline.
First, you need to create a new file. Navigate to the /home/labex/project directory and create a file named coticker.py. This file will hold all the code for our coroutine-based data processing. Now, let's start writing code in the coticker.py file. We'll first import the necessary modules and define the basic structure. Modules are pre-written code libraries that provide useful functions and classes. The following code does just that:
## coticker.py
from structure import Structure

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
- When you look at the code above, you'll notice that there are errors related to String(), Float(), and Integer(). These are classes that we need to import, so we'll add the required imports at the top of the file. This way, Python knows where to find these classes. Here's the updated code:
## coticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

from cofollow import consumer, follow
from tableformat import create_formatter
import csv
- Next, we'll add the coroutine components that will form our data processing pipeline. Each coroutine has a specific job in the pipeline. Here's the code to add these coroutines:
@consumer
def to_csv(target):
    def producer():
        while True:
            yield line  ## Hand the most recent line to the csv reader
    reader = csv.reader(producer())
    while True:
        line = yield
        target.send(next(reader))

@consumer
def create_ticker(target):
    while True:
        row = yield
        target.send(Ticker.from_row(row))

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def ticker(fmt, fields):
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
Let's understand what each of these coroutines does:
- to_csv: Its job is to convert raw text lines into parsed CSV rows. This is important because our data is initially in text format, and we need to break it into structured CSV data.
- create_ticker: This coroutine takes the CSV rows and creates Ticker objects from them. Ticker objects represent the stock data in a more organized way.
- negchange: It filters the Ticker objects, passing on only the stocks that have negative price changes. This helps us focus on the stocks that are losing value.
- ticker: This coroutine formats and displays the ticker data. It uses a formatter to present the data in a nice, readable table.
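A nice side effect of this design is that each stage can be exercised on its own by wiring it to a simple capture sink. This self-contained sketch (it redefines the consumer decorator and uses SimpleNamespace objects in place of real Ticker records) unit-tests negchange in isolation:

```python
from functools import wraps
from types import SimpleNamespace

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)            ## Prime the coroutine
        return f
    return start

@consumer
def negchange(target):
    while True:
        record = yield
        if record.change < 0:
            target.send(record)

@consumer
def capture(out):
    ## A sink that simply collects everything it receives
    while True:
        out.append((yield))

results = []
pipe = negchange(capture(results))
pipe.send(SimpleNamespace(name='IBM', change=-0.15))   ## Passed through
pipe.send(SimpleNamespace(name='AAPL', change=0.30))   ## Filtered out
print([r.name for r in results])                       ## ['IBM']
```

Being able to test a stage without the file follower or the formatter is one of the practical payoffs of the pipeline structure.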
Finally, we need to add the main program code that connects all these components together. This code will set up the data flow through the pipeline. Here's the code:
if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change']

    ## Create the processing pipeline
    t = ticker('text', fields)
    neg_filter = negchange(t)
    tick_creator = create_ticker(neg_filter)
    csv_parser = to_csv(tick_creator)

    ## Connect the pipeline to the data source
    follow('stocklog.csv', csv_parser)
- After writing all the code, save the coticker.py file. Then, open the terminal and run the following commands. The cd command changes the directory to where our file is located, and the python3 command runs our Python script:
cd /home/labex/project
python3 coticker.py
- If everything goes well, you should see a formatted table in the terminal. This table shows stocks with negative price changes. The output will look something like this:
name price change
---------- ---------- ----------
MSFT 72.50 -0.25
AA 35.25 -0.15
IBM 50.10 -0.15
GOOG 100.02 -0.01
AAPL 102.50 -0.06
Keep in mind that the actual values in the table may vary depending on the generated stock data.
Understanding the Pipeline Flow
The most important part of this program is how data flows through the coroutines. Let's break it down step by step:
- The follow function starts by reading lines from the stocklog.csv file. This is our data source.
- Each line that is read is sent to the csv_parser coroutine, which parses the raw text line into CSV fields.
- The parsed CSV data is sent to the tick_creator coroutine, which creates Ticker objects from the CSV rows.
- The Ticker objects are sent to the neg_filter coroutine, which checks each one: if the stock has a negative price change, it passes the object on; otherwise, it discards it.
- Finally, the filtered Ticker objects are sent to the ticker coroutine, which formats the data and displays it in a table.
This pipeline architecture is very useful because it allows each component to focus on a single task. This makes the code more modular, which means it's easier to understand, modify, and maintain.
Enhancing the Coroutine Pipeline
Now that we have a basic pipeline up and running, it's time to make it more flexible. In programming, flexibility is crucial as it allows our code to adapt to different requirements. We'll achieve this by modifying our coticker.py program to support various filtering and formatting options.
First, open the coticker.py file in your code editor. Next, we'll add a new coroutine that filters data by stock name, giving the pipeline an extra processing step that data can flow through. Here's the code for the new coroutine:
@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)
In this code, the filter_by_name coroutine takes a stock name and a target coroutine as parameters. It continuously waits for a record using the yield keyword. When a record arrives, it checks if the record's name matches the specified name. If it does, it sends the record to the target coroutine.
- Now, let's add another coroutine that filters based on price thresholds. This coroutine will help us select stocks within a specific price range. Here's the code:
@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)
Similar to the previous coroutine, the price_threshold coroutine waits for a record. It then checks if the record's price is within the specified minimum and maximum price range. If it is, it sends the record to the target coroutine.
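Because every filter has the same shape (receive a record, conditionally forward it), they compose freely. This self-contained sketch (it redefines the consumer decorator and uses SimpleNamespace records in place of real Ticker objects) chains both new filters in front of a collecting sink:

```python
from functools import wraps
from types import SimpleNamespace

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)            ## Prime the coroutine
        return f
    return start

@consumer
def filter_by_name(name, target):
    while True:
        record = yield
        if record.name == name:
            target.send(record)

@consumer
def price_threshold(min_price, max_price, target):
    while True:
        record = yield
        if min_price <= record.price <= max_price:
            target.send(record)

@consumer
def sink(out):
    while True:
        out.append((yield))

## Chain: only AAPL records priced between 50 and 75 reach the sink
hits = []
pipeline = filter_by_name('AAPL', price_threshold(50, 75, sink(hits)))
pipeline.send(SimpleNamespace(name='AAPL', price=60.0))   ## Passes both filters
pipeline.send(SimpleNamespace(name='AAPL', price=120.0))  ## Wrong price
pipeline.send(SimpleNamespace(name='IBM', price=60.0))    ## Wrong name
print(len(hits))                                          ## 1
```

Reordering the chain (price filter first, then name filter) would produce the same result here, which illustrates how loosely coupled the stages are.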
- After adding the new coroutines, we need to update the main program to demonstrate these additional filters. The main program is the entry point of our application, where we set up the processing pipelines and start the data flow. Here's the updated code:
if __name__ == '__main__':
    import sys

    ## Define the field names to display
    fields = ['name', 'price', 'change', 'high', 'low']

    ## Create the processing pipeline with multiple outputs
    ## Pipeline 1: Show all negative changes (same as before)
    print("Stocks with negative changes:")
    t1 = ticker('text', fields)
    neg_filter = negchange(t1)
    tick_creator1 = create_ticker(neg_filter)
    csv_parser1 = to_csv(tick_creator1)

    ## Start following the file with the first pipeline
    import threading
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser1), daemon=True).start()

    ## Wait a moment to see some results
    import time
    time.sleep(5)

    ## Pipeline 2: Filter by name (AAPL)
    print("\nApple stock updates:")
    t2 = ticker('text', fields)
    name_filter = filter_by_name('AAPL', t2)
    tick_creator2 = create_ticker(name_filter)
    csv_parser2 = to_csv(tick_creator2)

    ## Follow the file with the second pipeline
    threading.Thread(target=follow, args=('stocklog.csv', csv_parser2), daemon=True).start()

    ## Wait a moment to see some results
    time.sleep(5)

    ## Pipeline 3: Filter by price range
    print("\nStocks priced between 50 and 75:")
    t3 = ticker('text', fields)
    price_filter = price_threshold(50, 75, t3)
    tick_creator3 = create_ticker(price_filter)
    csv_parser3 = to_csv(tick_creator3)

    ## Follow with the third pipeline
    follow('stocklog.csv', csv_parser3)
In this updated code, we create three different processing pipelines. The first pipeline shows stocks with negative changes, the second pipeline filters stocks by the name 'AAPL', and the third pipeline filters stocks based on a price range between 50 and 75. We use threads to run the first two pipelines concurrently, which allows us to process data more efficiently.
- Once you've made all the changes, save the file to preserve your modifications. Then, run the updated program using the following commands in your terminal:
cd /home/labex/project
python3 coticker.py
The cd command changes the current directory to the project directory, and the python3 coticker.py command runs the Python program.
- After running the program, you should see three different outputs:
- First, you'll see stocks with negative changes.
- Then, you'll see all AAPL stock updates.
- Finally, you'll see all stocks priced between 50 and 75.
Understanding the Enhanced Pipeline
The enhanced program demonstrates several important concepts:
- Multiple Pipelines: We can create multiple processing pipelines from the same data source. This allows us to perform different types of analysis on the same data simultaneously.
- Specialized Filters: We can create different coroutines for specific filtering tasks. These filters help us select only the data that meets our specific criteria.
- Concurrent Processing: Using threads, we can run multiple pipelines concurrently. This improves the efficiency of our program by allowing it to process data in parallel.
- Pipeline Composition: Coroutines can be combined in different ways to achieve different data processing goals. This gives us the flexibility to customize our data processing pipelines according to our needs.
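As an alternative to running follow in several threads, a single data source can feed multiple pipelines through a fan-out stage. This broadcast coroutine is not part of the lab code; it's a self-contained sketch of the idea (consumer is redefined so the example runs on its own):

```python
from functools import wraps

def consumer(func):
    @wraps(func)
    def start(*args, **kwargs):
        f = func(*args, **kwargs)
        f.send(None)            ## Prime the coroutine
        return f
    return start

@consumer
def broadcast(targets):
    ## Forward every incoming item to all target coroutines
    while True:
        item = yield
        for t in targets:
            t.send(item)

@consumer
def collect(out):
    while True:
        out.append((yield))

a, b = [], []
fan_out = broadcast([collect(a), collect(b)])
for item in ('x', 'y'):
    fan_out.send(item)
print(a, b)                     ## ['x', 'y'] ['x', 'y']
```

With this approach, a single call like `follow('stocklog.csv', broadcast([pipeline1, pipeline2]))` would drive every pipeline from one reader, with no threads or sleeps needed.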
This approach provides a flexible and modular way to process streaming data. It allows you to add or modify processing steps without changing the overall architecture of the program.
Summary
In this lab, you have learned how to use coroutines to build data processing pipelines in Python. Key concepts include understanding coroutine basics, such as how they operate, the need for priming, and using decorators for initialization. You also explored data flow, pushing data through a pipeline via the send() method, different from the generator's "pull" model.
Moreover, you created specialized coroutines for tasks like parsing CSV data, filtering records, and formatting output. You learned to compose pipelines by connecting multiple coroutines and implemented filtering and transformation operations. Coroutines offer a powerful approach for streaming data processing, enabling clean separation of concerns and easy modification of individual stages.