Introduction
In this lab, you will learn how to leverage Python generators to build efficient data processing pipelines. Generators are a potent Python feature that enables on-demand data production, eliminating the need to store all data in memory simultaneously. You'll discover how to connect generators to create data processing workflows akin to Unix pipes.
The objectives of this lab are to understand the fundamentals of generator-based processing pipelines, create data processing workflows using Python generators, and filter and format real-time data streams. The ticker.py file will be created during this lab. Note that for this exercise, the stocksim.py program should be running in the background, and you'll use the follow() function from a previous exercise.
Basic Generator Pipeline with CSV Data
In this step, we're going to learn how to create a basic processing pipeline using generators. But first, let's understand what generators are. Generators are a special type of iterator in Python. Unlike a list, which holds all of its elements in memory at once, a generator produces values on demand. This is extremely useful when dealing with large data streams because it saves memory: instead of storing the entire dataset, the generator produces values one by one as you need them.
Understanding Generators
A generator is essentially a function that returns an iterator. When you iterate over this iterator, it produces a sequence of values. The way you write a generator function is similar to a regular function, but there's a key difference. Instead of using the return statement, a generator function uses the yield statement. The yield statement has a unique behavior. It pauses the function and saves its current state. When the next value is requested, the function continues from where it left off. This allows the generator to produce values incrementally without having to start from the beginning every time.
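The pause-and-resume behavior of yield can be seen in a short sketch. The countdown() function here is a hypothetical example written for illustration; it is not part of the lab files.

```python
def countdown(n):
    """Yield n, n-1, ..., 1, pausing at each yield."""
    print("countdown started")  # runs only when the first value is requested
    while n > 0:
        yield n                 # pause here and hand n to the caller
        n -= 1                  # execution resumes here on the next request


gen = countdown(3)   # nothing is printed yet; the function body has not started
first = next(gen)    # prints "countdown started", then pauses at the first yield
rest = list(gen)     # resumes after the yield and runs until the loop ends
print(first, rest)
```

Calling countdown(3) only creates the generator object. The body starts running on the first next() call, and each later request continues from the line after the yield.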
Using the follow() Function
The follow() function you created earlier works in a similar way to the Unix tail -f command. The tail -f command continuously monitors a file for new content, and so does the follow() function. Now, let's use it to create a simple processing pipeline.
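The follow() function from the earlier exercise is not reproduced in this lab. For reference, the following is a minimal sketch of how such a function might look, assuming it polls the file for appended lines. The name follow_sketch and the poll_interval parameter are assumptions made for illustration, and your own follow() may differ.

```python
import os
import time


def follow_sketch(filename, poll_interval=0.1):
    """Yield lines appended to filename after iteration begins."""
    with open(filename) as f:
        f.seek(0, os.SEEK_END)             # skip content already in the file
        while True:
            line = f.readline()
            if not line:                   # no new data yet
                time.sleep(poll_interval)  # wait briefly, then poll again
                continue
            yield line
```

Because of the while True loop, iterating over this generator never finishes on its own, which is why the steps below stop the loop with Ctrl+C.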
Step 1: Open a new terminal window
First, open a new terminal window in the WebIDE. You can do this by going to Terminal → New Terminal. This new terminal will be where we'll run our Python commands.
Step 2: Start a Python interactive shell
Once the new terminal is open, start a Python interactive shell. You can do this by entering the following command in the terminal:
python3
The Python interactive shell allows you to run Python code line by line and see the results immediately.
Step 3: Import the follow function and set up the pipeline
Now, we'll import the follow function and set up a basic pipeline to read the stock data. In the Python interactive shell, enter the following code:
>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
...     print(row)
...
Here's what each line does:
- from follow import follow: This imports the follow function from the follow module.
- import csv: This imports the csv module, which is used to read and write CSV files in Python.
- lines = follow('stocklog.csv'): This calls the follow function with the file name stocklog.csv. The follow function returns a generator that yields new lines as they're added to the file.
- rows = csv.reader(lines): The csv.reader() function takes the lines generated by the follow function and parses them into rows of CSV data.
- The for loop iterates through these rows and prints each one.
Step 4: Check the output
After running the code, you should see output similar to this (your data will vary):
['BA', '98.35', '6/11/2007', '09:41.07', '0.16', '98.25', '98.35', '98.31', '158148']
['AA', '39.63', '6/11/2007', '09:41.07', '-0.03', '39.67', '39.63', '39.31', '270224']
['XOM', '82.45', '6/11/2007', '09:41.07', '-0.23', '82.68', '82.64', '82.41', '748062']
['PG', '62.95', '6/11/2007', '09:41.08', '-0.12', '62.80', '62.97', '62.61', '454327']
...
This output indicates that you've successfully created a data pipeline. The follow() function generates lines from the file, and these lines are then passed to the csv.reader() function, which parses them into rows of data.
If you've seen enough output, you can stop the execution by pressing Ctrl+C.
What's Happening?
Let's break down what's going on in this pipeline:
- follow('stocklog.csv') creates a generator. This generator keeps track of the stocklog.csv file and yields new lines as they're added to the file.
- csv.reader(lines) takes the lines generated by the follow function and parses them into CSV row data. It understands the structure of CSV files and splits the lines into individual values.
- The for loop then iterates through these rows, printing each one. This allows you to see the data in a readable format.
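The same chain can be tried without a live log file, because csv.reader() accepts any iterable that yields strings, not only file objects. In this sketch, sample_lines() is a hypothetical stand-in for follow(), and the exact text of the two lines is an assumption based on the output shown above.

```python
import csv


def sample_lines():
    """Hypothetical stand-in for follow(): yields CSV text one line at a time."""
    yield 'BA,98.35,6/11/2007,09:41.07,0.16,98.25,98.35,98.31,158148\n'
    yield 'AA,39.63,6/11/2007,09:41.07,-0.03,39.67,39.63,39.31,270224\n'


rows = csv.reader(sample_lines())  # csv.reader pulls lines from the generator as needed
for row in rows:
    print(row)                     # each row is a list of strings
```

Every value in a row is still a string at this stage, including the prices and the volume.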
This is a simple example of a data processing pipeline using generators. In the next steps, we'll build more complex and useful pipelines.
Creating the Ticker Class
In data processing, working with raw data can be quite challenging. To make our work with stock data more organized and efficient, we'll define a proper class to represent stock quotes. This class will serve as a blueprint for our stock data, making our data processing pipeline more robust and easier to manage.
Creating the ticker.py File
First, we need to create a new file in the WebIDE. You can do this by clicking on the "New File" icon or right-clicking in the file explorer and selecting "New File". Name this file ticker.py. This file will hold the code for our Ticker class.
Now, let's add the following code to your newly created ticker.py file. This code will define our Ticker class and set up a simple processing pipeline to test it.
# ticker.py
from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

if __name__ == '__main__':
    from follow import follow
    import csv
    lines = follow('stocklog.csv')
    rows = csv.reader(lines)
    records = (Ticker.from_row(row) for row in rows)
    for record in records:
        print(record)
After adding the code, save the file. You can do this by pressing Ctrl+S or selecting "File" → "Save" from the menu. Saving the file ensures that your changes are preserved and can be run later.
Understanding the Code
Let's take a closer look at what this code does step by step:
At the beginning of the code, we're importing Structure and field types from the structure.py module. This module has already been set up for you. These imports are essential because they provide the building blocks for our Ticker class. The Structure class will be the base class for our Ticker class, and the field types like String, Float, and Integer will define the data types of our stock data fields.
Next, we define a Ticker class that inherits from Structure. This class has several fields that represent different aspects of the stock data:
- name: This field stores the stock symbol, such as "IBM" or "AAPL". It helps us identify which company's stock we're dealing with.
- price: It holds the current price of the stock. This is a crucial piece of information for investors.
- date and time: These fields tell us when the stock quote was generated. Knowing the time and date is important for analyzing stock price trends over time.
- change: This represents the price change of the stock. It shows whether the stock price has gone up or down compared to a previous point.
- open, high, low: These fields represent the opening price, the highest price, and the lowest price of the stock during a certain period. They give us an idea of the stock's price range.
- volume: This field stores the number of shares traded. High trading volume can indicate strong market interest in a particular stock.
In the if __name__ == '__main__': block, we set up a processing pipeline. This block of code will be executed when we run the ticker.py file directly.
- follow('stocklog.csv') returns a generator that yields lines from the stocklog.csv file as they are added, so we receive the data one line at a time.
- csv.reader(lines) takes these lines and parses them into row data. CSV (Comma-Separated Values) is a common file format for storing tabular data, and this function helps us extract the data from each row.
- (Ticker.from_row(row) for row in rows) is a generator expression. It takes each row of data and converts it into a Ticker object. This way, we transform the raw CSV data into structured objects that are easier to work with.
- The for loop iterates over these Ticker objects and prints each one. This allows us to see the structured data in action.
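The structure.py module is not shown in this lab, so the exact behavior of Structure and from_row() is not reproduced here. As a rough illustration of the idea of turning a row of strings into a typed object, the following stand-in uses a plain class. TickerSketch and its _fields table are hypothetical and are not the lab's implementation.

```python
class TickerSketch:
    """Hypothetical stand-in for the Structure-based Ticker class."""

    # (field name, converter) pairs in CSV column order
    _fields = [
        ('name', str), ('price', float), ('date', str), ('time', str),
        ('change', float), ('open', float), ('high', float),
        ('low', float), ('volume', int),
    ]

    def __init__(self, *args):
        if len(args) != len(self._fields):
            raise TypeError(f'expected {len(self._fields)} values')
        for (name, _), value in zip(self._fields, args):
            setattr(self, name, value)

    @classmethod
    def from_row(cls, row):
        # Apply each column's converter to the matching string in the row
        return cls(*(conv(val) for (_, conv), val in zip(cls._fields, row)))

    def __repr__(self):
        values = ', '.join(repr(getattr(self, name)) for name, _ in self._fields)
        return f'{type(self).__name__}({values})'


row = ['BA', '98.35', '6/11/2007', '09:41.07', '0.16',
       '98.25', '98.35', '98.31', '158148']
record = TickerSketch.from_row(row)
print(record.name, record.price, record.volume)
```

After conversion, price is a float and volume is an int, so later pipeline stages can compare and compute with them directly.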
Running the Code
Let's run the code to see how it works:
First, we need to make sure we're in the project directory in the terminal. If you're not already there, use the following command to navigate to it:
cd /home/labex/project
Once you're in the correct directory, run the ticker.py script using the following command:
python3 ticker.py
After running the script, you should see output similar to this (your data will vary):
Ticker(IBM, 103.53, 6/11/2007, 09:53.59, 0.46, 102.87, 103.53, 102.77, 541633)
Ticker(MSFT, 30.21, 6/11/2007, 09:54.01, 0.16, 30.05, 30.21, 29.95, 7562516)
Ticker(AA, 40.01, 6/11/2007, 09:54.01, 0.35, 39.67, 40.15, 39.31, 576619)
Ticker(T, 40.1, 6/11/2007, 09:54.08, -0.16, 40.2, 40.19, 39.87, 1312959)
You can stop the execution of the script by pressing Ctrl+C when you've seen enough output.
Notice how the raw CSV data has been transformed into structured Ticker objects. This transformation makes the data much easier to work with in our processing pipeline, as we can now access and manipulate the stock data using the fields defined in the Ticker class.
Building a More Complex Data Pipeline
Now, we're going to take our data pipeline to the next level by adding filtering and improving the presentation of the data. This will make it easier to analyze and understand the information we're working with. We'll be making changes to our ticker.py script. Filtering the data will help us focus on the specific information we're interested in, and presenting it in a nicely formatted table will make it more readable.
Updating the ticker.py File
First, open your ticker.py file in the WebIDE. The WebIDE is a tool that allows you to write and edit code directly in your browser. It provides a convenient environment for making changes to your Python scripts.
Next, we need to replace the if __name__ == '__main__': block in the ticker.py file with the following code. This block of code is the entry point of our script, and by replacing it, we'll be changing how the script processes and displays the data.
if __name__ == '__main__':
    from follow import follow
    import csv
    from tableformat import create_formatter, print_table
    formatter = create_formatter('text')
    lines = follow('stocklog.csv')
    rows = csv.reader(lines)
    records = (Ticker.from_row(row) for row in rows)
    negative = (rec for rec in records if rec.change < 0)
    print_table(negative, ['name', 'price', 'change'], formatter)
After making these changes, save the file. You can do this by pressing Ctrl+S on your keyboard or by selecting "File" → "Save" from the menu. Saving the file ensures that your changes are preserved and can be run later.
Understanding the Enhanced Pipeline
Let's take a closer look at what this enhanced pipeline does. Understanding each step will help you see how the different parts of the code work together to process and display the data.
We start by importing create_formatter and print_table from the tableformat module. This module is already set up for you, and it provides functions that help us format and print the data in a nice table.
Then, we create a text formatter using create_formatter('text'). This formatter will be used to format the data in a way that's easy to read.
Now, let's break down the pipeline step by step:
- follow('stocklog.csv') returns a generator that yields lines from the stocklog.csv file. It continuously monitors the file for new data and provides the lines one by one.
- csv.reader(lines) takes the lines generated by follow and parses them into row data. This is necessary because the data in the CSV file is in a text format, and we need to convert it into a structured format that we can work with.
- (Ticker.from_row(row) for row in rows) is a generator expression that converts each row of data into a Ticker object. A Ticker object represents a single stock quote and contains information such as the stock's name, price, and change.
- (rec for rec in records if rec.change < 0) is another generator expression that filters the Ticker objects. It only keeps the objects where the stock's price change is negative. This allows us to focus on the stocks that have decreased in price.
- print_table(negative, ['name', 'price', 'change'], formatter) takes the filtered Ticker objects and formats them into a table using the formatter we created earlier. It then prints the table to the console.
This pipeline demonstrates the power of generators. Instead of loading all the data from the file into memory at once, we're chaining together multiple operations (reading, parsing, converting, filtering) and processing the data one item at a time. This saves memory and makes the code more efficient.
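The tableformat module is provided for you and is not shown here. To illustrate how the final stage can consume a filtered generator one record at a time, the following sketch uses a hypothetical print_table_sketch() function and a namedtuple called Quote in place of Ticker. The 10-character column width is an assumption based on the sample output in this lab.

```python
from collections import namedtuple

# Hypothetical stand-in for Ticker, with only the fields used below
Quote = namedtuple('Quote', ['name', 'price', 'change'])


def print_table_sketch(records, fields):
    """Print a header, then one right-aligned row per record as it arrives."""
    print(' '.join(f'{name:>10}' for name in fields))
    print(' '.join('-' * 10 for _ in fields))
    for rec in records:  # pulls one record at a time from the pipeline
        print(' '.join(f'{str(getattr(rec, name)):>10}' for name in fields))


quotes = [Quote('C', 53.12, -0.21), Quote('IBM', 103.53, 0.46),
          Quote('KO', 51.6, -0.07)]
negative = (q for q in quotes if q.change < 0)  # lazy filter stage
print_table_sketch(negative, ['name', 'price', 'change'])
```

The filter does no work until print_table_sketch() starts its loop, so with a live source each matching row can be printed as soon as it arrives.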
Running the Enhanced Pipeline
Let's run the updated code to see the results.
First, make sure you're in the project directory in the terminal. If you're not already there, you can navigate to it using the following command:
cd /home/labex/project
Once you're in the project directory, run the ticker.py script using the following command:
python3 ticker.py
After running the script, you should see a nicely formatted table in the terminal. This table shows only the stocks with negative price changes.
      name      price     change
---------- ---------- ----------
         C      53.12      -0.21
       UTX      70.04      -0.19
       AXP      62.86      -0.18
       MMM      85.72      -0.22
       MCD      51.38      -0.03
       WMT      49.85      -0.23
        KO       51.6      -0.07
       AIG      71.39      -0.14
        PG      63.05      -0.02
        HD      37.76      -0.19
If you've seen enough output and want to stop the execution of the script, you can press Ctrl+C on your keyboard.
The Power of Generator Pipelines
What we've created here is a powerful data processing pipeline. Let's summarize what it does:
- It continuously monitors the stocklog.csv file for new data. This means that as new data is added to the file, the pipeline will automatically process it.
- It parses the CSV data from the file into structured Ticker objects. This makes it easier to work with the data and perform operations on it.
- It filters the data based on a specific criterion, in this case negative price changes. This allows us to focus on the stocks that are losing value.
- It formats and presents the filtered data in a readable table. This makes it easy to analyze the data and draw conclusions.
One of the key advantages of using generators in this pipeline is that it uses minimal memory. Generators produce values on-demand, which means they don't store all the data in memory at once. This is similar to Unix pipes, where each component processes the data and passes it on to the next component.
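The memory difference can be observed with a small experiment that is separate from the stock pipeline. This sketch compares a list with an equivalent generator expression. Note that sys.getsizeof() reports only the size of the container object itself, so the real footprint of the list is larger than the number shown.

```python
import sys

n = 100_000
squares_list = [i * i for i in range(n)]  # builds and stores every value
squares_gen = (i * i for i in range(n))   # stores only the iteration state

print(sys.getsizeof(squares_list))  # grows with n
print(sys.getsizeof(squares_gen))   # small, and does not depend on n
total = sum(squares_gen)            # values are produced only as sum() asks for them
print(total == sum(squares_list))
```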
You can think of generators as Lego blocks. Just like you can stack Lego blocks together to create different structures, you can combine generators to create powerful data processing workflows. This modular approach allows you to build complex systems from simple, reusable components.
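One way to make the building-block idea concrete is to write each stage as a small function that takes an iterable and returns another iterable. The stage names parse_csv(), convert(), and where() in this sketch are hypothetical, and the three sample lines are made up for illustration.

```python
import csv


def parse_csv(lines):
    """Stage: text lines -> lists of strings."""
    return csv.reader(lines)


def convert(rows, types):
    """Stage: apply one converter per column."""
    for row in rows:
        yield [func(val) for func, val in zip(types, row)]


def where(rows, predicate):
    """Stage: pass through only the rows that satisfy predicate."""
    return (row for row in rows if predicate(row))


lines = ['IBM,103.53,0.46\n', 'T,40.1,-0.16\n', 'KO,51.6,-0.07\n']
rows = parse_csv(lines)
rows = convert(rows, [str, float, float])
losers = where(rows, lambda row: row[2] < 0)
for row in losers:
    print(row)
```

Each stage knows nothing about the others, so the same functions can be rearranged or reused with a different source, such as the generator returned by follow().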
Summary
In this lab, you have learned how to use Python generators to build efficient data processing pipelines. You completed several important tasks, such as using the follow() function to monitor a file for new data, creating a Ticker class to represent stock quotes, and constructing a multi-stage processing pipeline that reads, parses, and filters CSV data, then formats and displays the results.
The generator-based approach offers multiple advantages, including memory efficiency as data is processed on demand, modularity allowing easy combination and reuse of pipeline components, and simplicity in expressing complex data flows. These concepts are commonly applied in real-world data processing, especially for large datasets or streaming data.