Using Generators for Stocksim Pipelines

Beginner

This tutorial comes from the open-source community.

Introduction

In this lab, you will learn how to leverage Python generators to build efficient data processing pipelines. Generators are a powerful Python feature that produces values on demand, eliminating the need to store all data in memory at once. You'll discover how to connect generators to create data processing workflows akin to Unix pipes.

The objectives of this lab are to understand the fundamentals of generator-based processing pipelines, create data processing workflows using Python generators, and filter and format real-time data streams. The ticker.py file will be created during this lab. Note that for this exercise, the stocksim.py program should be running in the background, and you'll use the follow() function from a previous exercise.

Basic Generator Pipeline with CSV Data

In this step, we're going to learn how to create a basic processing pipeline using generators. But first, let's understand what generators are. Generators are a special type of iterator in Python. Unlike regular iterators that might load all data into memory at once, generators produce values on demand. This is extremely useful when dealing with large data streams because it saves memory: instead of storing the entire dataset in memory, the generator produces values one by one as you need them.

Understanding Generators

A generator is essentially a function that returns an iterator. When you iterate over this iterator, it produces a sequence of values. The way you write a generator function is similar to a regular function, but there's a key difference. Instead of using the return statement, a generator function uses the yield statement. The yield statement has a unique behavior. It pauses the function and saves its current state. When the next value is requested, the function continues from where it left off. This allows the generator to produce values incrementally without having to start from the beginning every time.
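The pause-and-resume behavior of yield is easiest to see in a tiny example (an illustration only, not part of the lab files):

```python
# A minimal generator: yield pauses the function and saves its state.
def countdown(n):
    while n > 0:
        yield n        # pause here and hand n back to the caller
        n -= 1         # resume from this point on the next request

gen = countdown(3)
print(next(gen))   # 3
print(next(gen))   # 2
print(list(gen))   # [1] -- the remaining values
```

Each call to next() resumes the function body exactly where the previous yield left off, which is what lets a generator produce values incrementally.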

Using the follow() Function

The follow() function you created earlier works in a similar way to the Unix tail -f command. The tail -f command continuously monitors a file for new content, and so does the follow() function. Now, let's use it to create a simple processing pipeline.
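For reference, a follow() along the lines of the earlier exercise might look like the sketch below; your version from that exercise may differ in its details:

```python
import os
import time

def follow(filename):
    '''Yield new lines as they are appended to filename, like `tail -f`.
    A sketch of the earlier exercise, not necessarily your exact version.'''
    with open(filename, 'r') as f:
        f.seek(0, os.SEEK_END)      # start at the end of the file
        while True:
            line = f.readline()
            if line == '':
                time.sleep(0.1)     # no new data yet; wait briefly
                continue
            yield line
```

Because the body contains yield, calling follow() doesn't run any of this code yet; it just creates a generator that produces lines on demand.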

Step 1: Open a new terminal window

First, open a new terminal window in the WebIDE. You can do this by going to Terminal → New Terminal. This new terminal will be where we'll run our Python commands.

Step 2: Start a Python interactive shell

Once the new terminal is open, start a Python interactive shell. You can do this by entering the following command in the terminal:

python3

The Python interactive shell allows you to run Python code line by line and see the results immediately.

Step 3: Import the follow function and set up the pipeline

Now, we'll import the follow function and set up a basic pipeline to read the stock data. In the Python interactive shell, enter the following code:

>>> from follow import follow
>>> import csv
>>> lines = follow('stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
...     print(row)
...

Here's what each line does:

  • from follow import follow: This imports the follow function from the follow module.
  • import csv: This imports the csv module, which is used to read and write CSV files in Python.
  • lines = follow('stocklog.csv'): This calls the follow function with the file name stocklog.csv. The follow function returns a generator that yields new lines as they're added to the file.
  • rows = csv.reader(lines): The csv.reader() function takes the lines generated by the follow function and parses them into rows of CSV data.
  • The for loop iterates through these rows and prints each one.

Step 4: Check the output

After running the code, you should see output similar to this (your data will vary):

['BA', '98.35', '6/11/2007', '09:41.07', '0.16', '98.25', '98.35', '98.31', '158148']
['AA', '39.63', '6/11/2007', '09:41.07', '-0.03', '39.67', '39.63', '39.31', '270224']
['XOM', '82.45', '6/11/2007', '09:41.07', '-0.23', '82.68', '82.64', '82.41', '748062']
['PG', '62.95', '6/11/2007', '09:41.08', '-0.12', '62.80', '62.97', '62.61', '454327']
...

This output indicates that you've successfully created a data pipeline. The follow() function generates lines from the file, and these lines are then passed to the csv.reader() function, which parses them into rows of data.

If you've seen enough output, you can stop the execution by pressing Ctrl+C.

What's Happening?

Let's break down what's going on in this pipeline:

  1. follow('stocklog.csv') creates a generator. This generator keeps track of the stocklog.csv file and yields new lines as they're added to the file.
  2. csv.reader(lines) takes the lines generated by the follow function and parses them into CSV row data. It understands the structure of CSV files and splits the lines into individual values.
  3. The for loop then iterates through these rows, printing each one. This allows you to see the data in a readable format.

This is a simple example of a data processing pipeline using generators. In the next steps, we'll build more complex and useful pipelines.
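One detail worth noting: csv.reader() doesn't require a file object; any iterable of strings will do, which is exactly why it can sit downstream of the follow() generator. A self-contained illustration with made-up sample lines:

```python
import csv

# csv.reader accepts any iterable of strings, not just files --
# this is what lets it consume the follow() generator directly.
lines = iter(['"BA",98.35,158148\n', '"AA",39.63,270224\n'])
rows = csv.reader(lines)
for row in rows:
    print(row)
# ['BA', '98.35', '158148']
# ['AA', '39.63', '270224']
```

Swap the hard-coded lines for follow('stocklog.csv') and you have the pipeline from this step.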

Creating the Ticker Class

In data processing, working with raw data can be quite challenging. To make our work with stock data more organized and efficient, we'll define a proper class to represent stock quotes. This class will serve as a blueprint for our stock data, making our data processing pipeline more robust and easier to manage.

Creating the ticker.py File

  1. First, we need to create a new file in the WebIDE. You can do this by clicking on the "New File" icon or right-clicking in the file explorer and selecting "New File". Name this file ticker.py. This file will hold the code for our Ticker class.

  2. Now, let's add the following code to your newly created ticker.py file. This code will define our Ticker class and set up a simple processing pipeline to test it.

## ticker.py

from structure import Structure, String, Float, Integer

class Ticker(Structure):
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()

if __name__ == '__main__':
    from follow import follow
    import csv
    lines = follow('stocklog.csv')
    rows = csv.reader(lines)
    records = (Ticker.from_row(row) for row in rows)
    for record in records:
        print(record)

  3. After adding the code, save the file. You can do this by pressing Ctrl+S or selecting "File" → "Save" from the menu. Saving the file ensures that your changes are preserved and can be run later.

Understanding the Code

Let's take a closer look at what this code does step by step:

  1. At the beginning of the code, we're importing Structure and field types from the structure.py module. This module has already been set up for you. These imports are essential because they provide the building blocks for our Ticker class. The Structure class will be the base class for our Ticker class, and the field types like String, Float, and Integer will define the data types of our stock data fields.

  2. Next, we define a Ticker class that inherits from Structure. This class has several fields that represent different aspects of the stock data:

    • name: This field stores the stock symbol, such as "IBM" or "AAPL". It helps us identify which company's stock we're dealing with.
    • price: It holds the current price of the stock. This is a crucial piece of information for investors.
    • date and time: These fields tell us when the stock quote was generated. Knowing the time and date is important for analyzing stock price trends over time.
    • change: This represents the price change of the stock. It shows whether the stock price has gone up or down compared to a previous point.
    • open, high, low: These fields represent the opening price, the highest price, and the lowest price of the stock during a certain period. They give us an idea of the stock's price range.
    • volume: This field stores the number of shares traded. High trading volume can indicate strong market interest in a particular stock.
  3. In the if __name__ == '__main__': block, we set up a processing pipeline. This block of code will be executed when we run the ticker.py file directly.

    • follow('stocklog.csv') is a function that generates lines from the stocklog.csv file. It allows us to read the file line by line.
    • csv.reader(lines) takes these lines and parses them into row data. CSV (Comma-Separated Values) is a common file format for storing tabular data, and this function helps us extract the data from each row.
    • (Ticker.from_row(row) for row in rows) is a generator expression. It takes each row of data and converts it into a Ticker object. This way, we transform the raw CSV data into structured objects that are easier to work with.
    • The for loop iterates over these Ticker objects and prints each one. This allows us to see the structured data in action.
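The structure module is provided for you, so you don't need to write from_row yourself. Conceptually, though, it just converts each CSV string to the field's declared type and builds an instance. A simplified, hypothetical sketch of that idea (not the provided implementation):

```python
# Simplified sketch of what Structure.from_row does conceptually;
# the provided structure.py is more general than this.
class MiniTicker:
    _fields = ('name', 'price', 'date', 'time', 'change',
               'open', 'high', 'low', 'volume')
    _types = (str, float, str, str, float, float, float, float, int)

    def __init__(self, *values):
        for name, value in zip(self._fields, values):
            setattr(self, name, value)

    @classmethod
    def from_row(cls, row):
        # Convert each raw CSV string using the matching type
        return cls(*(ty(val) for ty, val in zip(cls._types, row)))

row = ['IBM', '103.53', '6/11/2007', '09:53.59',
       '0.46', '102.87', '103.53', '102.77', '541633']
t = MiniTicker.from_row(row)
print(t.name, t.price, t.volume)  # IBM 103.53 541633
```

The key point is that the strings coming out of csv.reader() become properly typed attributes, so later stages can compare rec.change < 0 numerically.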

Running the Code

Let's run the code to see how it works:

  1. First, we need to make sure we're in the project directory in the terminal. If you're not already there, use the following command to navigate to it:

    cd /home/labex/project
  2. Once you're in the correct directory, run the ticker.py script using the following command:

    python3 ticker.py
  3. After running the script, you should see output similar to this (your data will vary):

    Ticker(IBM, 103.53, 6/11/2007, 09:53.59, 0.46, 102.87, 103.53, 102.77, 541633)
    Ticker(MSFT, 30.21, 6/11/2007, 09:54.01, 0.16, 30.05, 30.21, 29.95, 7562516)
    Ticker(AA, 40.01, 6/11/2007, 09:54.01, 0.35, 39.67, 40.15, 39.31, 576619)
    Ticker(T, 40.1, 6/11/2007, 09:54.08, -0.16, 40.2, 40.19, 39.87, 1312959)

You can stop the execution of the script by pressing Ctrl+C when you've seen enough output.

Notice how the raw CSV data has been transformed into structured Ticker objects. This transformation makes the data much easier to work with in our processing pipeline, as we can now access and manipulate the stock data using the fields defined in the Ticker class.

Building a More Complex Data Pipeline

Now, we're going to take our data pipeline to the next level by adding filtering and improving the presentation of the data. This will make it easier to analyze and understand the information we're working with. We'll be making changes to our ticker.py script. Filtering the data will help us focus on the specific information we're interested in, and presenting it in a nicely formatted table will make it more readable.

Updating the ticker.py File

  1. First, open your ticker.py file in the WebIDE. The WebIDE is a tool that allows you to write and edit code directly in your browser. It provides a convenient environment for making changes to your Python scripts.

  2. Next, we need to replace the if __name__ == '__main__': block in the ticker.py file with the following code. This block of code is the entry point of our script, and by replacing it, we'll be changing how the script processes and displays the data.

if __name__ == '__main__':
    from follow import follow
    import csv
    from tableformat import create_formatter, print_table

    formatter = create_formatter('text')

    lines = follow('stocklog.csv')
    rows = csv.reader(lines)
    records = (Ticker.from_row(row) for row in rows)
    negative = (rec for rec in records if rec.change < 0)
    print_table(negative, ['name', 'price', 'change'], formatter)

  3. After making these changes, save the file. You can do this by pressing Ctrl+S on your keyboard or by selecting "File" → "Save" from the menu. Saving the file ensures that your changes are preserved and can be run later.

Understanding the Enhanced Pipeline

Let's take a closer look at what this enhanced pipeline does. Understanding each step will help you see how the different parts of the code work together to process and display the data.

  1. We start by importing create_formatter and print_table from the tableformat module. This module is already set up for you, and it provides functions that help us format and print the data in a nice table.

  2. Then, we create a text formatter using create_formatter('text'). This formatter will be used to format the data in a way that's easy to read.

  3. Now, let's break down the pipeline step by step:

    • follow('stocklog.csv') is a function that generates lines from the stocklog.csv file. It continuously monitors the file for new data and provides the lines one by one.
    • csv.reader(lines) takes the lines generated by follow and parses them into row data. This is necessary because the data in the CSV file is in a text format, and we need to convert it into a structured format that we can work with.
    • (Ticker.from_row(row) for row in rows) is a generator expression that converts each row of data into a Ticker object. A Ticker object represents a stock and contains information such as the stock's name, price, and change.
    • (rec for rec in records if rec.change < 0) is another generator expression that filters the Ticker objects. It only keeps the objects where the stock's price change is negative. This allows us to focus on the stocks that have decreased in price.
    • print_table(negative, ['name', 'price', 'change'], formatter) takes the filtered Ticker objects and formats them into a table using the formatter we created earlier. It then prints the table to the console.
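The tableformat module is likewise provided for you, but a minimal sketch of what a text formatter and print_table might look like can make the pipeline's last stage less mysterious (a simplified illustration, not the provided implementation):

```python
class TextTableFormatter:
    '''Minimal sketch of a plain-text table formatter; the provided
    tableformat module is more capable than this.'''
    def headings(self, headers):
        print(' '.join(f'{h:>10}' for h in headers))
        print(' '.join('-' * 10 for _ in headers))

    def row(self, rowdata):
        print(' '.join(f'{d:>10}' for d in rowdata))

def print_table(records, fields, formatter):
    formatter.headings(fields)
    for record in records:
        # Pull each named field off the record object by attribute name
        formatter.row([str(getattr(record, name)) for name in fields])
```

Note that print_table simply iterates over whatever it is given, so handing it the negative generator keeps the whole pipeline lazy.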

This pipeline demonstrates the power of generators. Instead of loading all the data from the file into memory at once, we're chaining together multiple operations (reading, parsing, converting, filtering) and processing the data one item at a time. This saves memory and makes the code more efficient.
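You can observe this one-item-at-a-time behavior directly by instrumenting a toy chain (an illustration, not part of ticker.py):

```python
def source():
    # Announce each value as it is produced
    for n in [1, -2, 3, -4]:
        print(f'producing {n}')
        yield n

# Each stage pulls one item at a time from the stage before it
negatives = (n for n in source() if n < 0)
for n in negatives:
    print(f'consuming {n}')

# producing 1
# producing -2
# consuming -2
# producing 3
# producing -4
# consuming -4
```

The interleaved output shows that nothing is produced until the consumer asks for it, and that no intermediate list of all values ever exists.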

Running the Enhanced Pipeline

Let's run the updated code to see the results.

  1. First, make sure you're in the project directory in the terminal. If you're not already there, you can navigate to it using the following command:

    cd /home/labex/project
  2. Once you're in the project directory, run the ticker.py script using the following command:

    python3 ticker.py
  3. After running the script, you should see a nicely formatted table in the terminal. This table shows only the stocks with negative price changes.

           name      price     change
     ---------- ---------- ----------
              C      53.12      -0.21
            UTX      70.04      -0.19
            AXP      62.86      -0.18
            MMM      85.72      -0.22
            MCD      51.38      -0.03
            WMT      49.85      -0.23
             KO       51.6      -0.07
            AIG      71.39      -0.14
             PG      63.05      -0.02
             HD      37.76      -0.19

If you've seen enough output and want to stop the execution of the script, you can press Ctrl+C on your keyboard.

The Power of Generator Pipelines

What we've created here is a powerful data processing pipeline. Let's summarize what it does:

  1. It continuously monitors the stocklog.csv file for new data. This means that as new data is added to the file, the pipeline will automatically process it.
  2. It parses the CSV data from the file into structured Ticker objects. This makes it easier to work with the data and perform operations on it.
  3. It filters the data based on a specific criterion, in this case, negative price changes. This allows us to focus on the stocks that are losing value.
  4. It formats and presents the filtered data in a readable table. This makes it easy to analyze the data and draw conclusions.

One of the key advantages of using generators in this pipeline is that it uses minimal memory. Generators produce values on-demand, which means they don't store all the data in memory at once. This is similar to Unix pipes, where each component processes the data and passes it on to the next component.

You can think of generators as Lego blocks. Just like you can stack Lego blocks together to create different structures, you can combine generators to create powerful data processing workflows. This modular approach allows you to build complex systems from simple, reusable components.
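That modularity can be made literal by packaging each stage as a small generator function and snapping the stages together (a hypothetical example, not part of the lab files):

```python
def to_upper(words):
    # Stage: normalize every item to uppercase
    for w in words:
        yield w.upper()

def longer_than(words, n):
    # Stage: keep only items longer than n characters
    for w in words:
        if len(w) > n:
            yield w

# Snap the stages together like blocks; order and combination are up to you
words = ['aapl', 'ibm', 'msft', 'ge']
pipeline = longer_than(to_upper(words), 3)
print(list(pipeline))  # ['AAPL', 'MSFT']
```

Each stage is independently testable and reusable, and chaining them costs almost nothing because every stage still processes one item at a time.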

Summary

In this lab, you have learned how to use Python generators to build efficient data processing pipelines. You completed several important tasks, such as using the follow() function to monitor a file for new data, creating a Ticker class to represent stock quotes, and constructing a multi-stage processing pipeline that reads, parses, and filters CSV data, then formats and displays the results.

The generator-based approach offers multiple advantages: memory efficiency, since data is processed on demand; modularity, since pipeline components are easy to combine and reuse; and simplicity in expressing complex data flows. These concepts are commonly applied in real-world data processing, especially for large datasets or streaming data.