Python で大規模 CSV ファイルを効率的に処理する方法

PythonBeginner
オンラインで実践に進む

はじめに

大規模な CSV ファイルの処理は、Python 開発者にとって一般的なチャレンジ(Challenge)です。このチュートリアルでは、これらのファイルを効果的に処理するための効率的なテクニックを案内し、パフォーマンスとメモリ使用量の最適化に焦点を当てます。この実験(Lab)の終わりには、Python でデータ集約型の CSV 処理タスクに取り組むための実践的な知識を習得できます。

顧客データの分析、財務記録の処理、またはあらゆる種類の構造化データの処理など、この実験(Lab)で学ぶスキルは、メモリの問題に遭遇することなく、大規模なデータセットを効率的に処理するのに役立ちます。

シンプルな CSV ファイルの作成と読み込み

CSV(Comma-Separated Values)は、表形式のデータを保存するために使用される一般的なファイル形式です。このステップでは、シンプルな CSV ファイルを作成し、Python の組み込み csv モジュールを使用してそれを読み込む方法を学びます。

CSV ファイルの理解

CSV ファイルは、次のようにプレーンテキスト形式でデータを保存します。

  • 各行はデータの行を表します
  • 各行内の値は区切り文字(通常はカンマ)で区切られます
  • 最初の行には、多くの場合、列ヘッダーが含まれます

まず、操作するシンプルな CSV ファイルを作成しましょう。

CSV ファイルの作成

まず、作業するディレクトリを作成し、次にシンプルな CSV ファイルを作成します。

  1. WebIDE でターミナルを開きます
  2. エディターで csv_basics.py という名前の新しい Python ファイルを作成します

次に、次のコードを csv_basics.py に追加します。

import csv

## CSV に書き込むデータ
data = [
    ['Name', 'Age', 'City'],              ## ヘッダー行
    ['John Smith', '28', 'New York'],
    ['Sarah Johnson', '32', 'San Francisco'],
    ['Michael Brown', '45', 'Chicago'],
    ['Emily Davis', '36', 'Boston'],
    ['David Wilson', '52', 'Seattle']
]

## CSV ファイルへのデータの書き込み
with open('sample_data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)

print("CSV ファイル 'sample_data.csv' が正常に作成されました。")

ターミナルで次のコマンドを実行して、このコードを実行します。

python3 csv_basics.py

期待される出力:

CSV ファイル 'sample_data.csv' が正常に作成されました。

これにより、現在のディレクトリに sample_data.csv という名前の新しい CSV ファイルが作成されます。このファイルの内容を表示するには、次を実行します。

cat sample_data.csv

期待される出力:

Name,Age,City
John Smith,28,New York
Sarah Johnson,32,San Francisco
Michael Brown,45,Chicago
Emily Davis,36,Boston
David Wilson,52,Seattle

CSV ファイルの読み込み

次に、作成した CSV ファイルを読み込みましょう。次のコードを使用して、read_csv.py という名前の新しいファイルを作成します。

import csv

## CSV ファイルの読み込み
with open('sample_data.csv', 'r') as file:
    reader = csv.reader(file)

    print("sample_data.csv の内容:")
    print("--------------------------")

    for row in reader:
        print(row)

## 特定の列の読み込みとアクセス
print("\n特定の列の読み込み:")
print("--------------------------")

with open('sample_data.csv', 'r') as file:
    reader = csv.reader(file)
    headers = next(reader)  ## ヘッダー行をスキップ

    for row in reader:
        name = row[0]
        age = row[1]
        city = row[2]
        print(f"Name: {name}, Age: {age}, City: {city}")

次のコマンドでこのコードを実行します。

python3 read_csv.py

期待される出力:

sample_data.csv の内容:
--------------------------
['Name', 'Age', 'City']
['John Smith', '28', 'New York']
['Sarah Johnson', '32', 'San Francisco']
['Michael Brown', '45', 'Chicago']
['Emily Davis', '36', 'Boston']
['David Wilson', '52', 'Seattle']

特定の列の読み込み:
--------------------------
Name: John Smith, Age: 28, City: New York
Name: Sarah Johnson, Age: 32, City: San Francisco
Name: Michael Brown, Age: 45, City: Chicago
Name: Emily Davis, Age: 36, City: Boston
Name: David Wilson, Age: 52, City: Seattle

CSV モジュールの理解

Python の csv モジュールには、次の 2 つの主要なクラスが用意されています。

  • csv.reader: CSV ファイルを読み込み、各行を文字列のリストとして返します
  • csv.writer: CSV ファイルにデータを書き込みます

このモジュールは、特殊文字のエスケープや引用符の処理など、さまざまな CSV 形式を扱う複雑さをすべて処理します。

このステップでは、シンプルな CSV ファイルを作成して読み込む方法を学びました。次のステップでは、より大きな CSV ファイルを処理するためのより効率的な方法を探ります。

DictReader を使用した便利な CSV 処理

前のステップでは、基本的な csv.reader および csv.writer 関数を扱いました。次に、csv.DictReader クラスを使用して CSV ファイルを処理する、より便利な方法を探ってみましょう。これは、列ヘッダーを持つデータを扱う場合に特に役立ちます。

DictReader とは?

csv.DictReader は CSV ファイルを読み込み、各行を辞書として返します。ここで、

  • キーは、列ヘッダー(デフォルトでは CSV ファイルの最初の行)から取得されます
  • 値は、各行の対応するデータです

このアプローチにより、コードの可読性が向上し、エラーが発生しにくくなります。これは、インデックスではなく名前で列を参照できるためです。

より大きなテストファイルの作成

まず、DictReader の利点を示すために、少し大きな CSV ファイルを作成しましょう。create_users_data.py という名前の新しいファイルを作成し、次のコードを記述します。

import csv
import random

## サンプルユーザーデータを生成する
def generate_users(count):
    users = [['id', 'name', 'email', 'age', 'country']]  ## ヘッダー行

    domains = ['gmail.com', 'yahoo.com', 'outlook.com', 'example.com']
    countries = ['USA', 'Canada', 'UK', 'Australia', 'Germany', 'France', 'Japan', 'Brazil']
    first_names = ['John', 'Jane', 'Michael', 'Emily', 'David', 'Sarah', 'Robert', 'Lisa']
    last_names = ['Smith', 'Johnson', 'Brown', 'Davis', 'Wilson', 'Miller', 'Jones', 'Taylor']

    for i in range(1, count + 1):
        first_name = random.choice(first_names)
        last_name = random.choice(last_names)
        name = f"{first_name} {last_name}"
        email = f"{first_name.lower()}.{last_name.lower()}@{random.choice(domains)}"
        age = random.randint(18, 65)
        country = random.choice(countries)

        users.append([str(i), name, email, str(age), country])

    return users

## 100 人のユーザーを持つ CSV ファイルを作成する
users_data = generate_users(100)

## CSV ファイルにデータを書き込む
with open('users_data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(users_data)

print(f"100 件のユーザーレコードを含む 'users_data.csv' を作成しました")

スクリプトを実行してファイルを作成します。

python3 create_users_data.py

期待される出力:

100 件のユーザーレコードを含む 'users_data.csv' を作成しました

この新しいファイルの最初の数行を調べてみましょう。

head -n 5 users_data.csv

ヘッダー行とそれに続く 4 行のデータが表示されるはずです。

id,name,email,age,country
1,John Smith,john.smith@gmail.com,25,USA
2,Emily Brown,emily.brown@yahoo.com,32,Canada
3,David Jones,david.jones@outlook.com,45,UK
4,Sarah Wilson,sarah.wilson@example.com,28,Australia

DictReader を使用した CSV ファイルの処理

次に、DictReader を使用してこのファイルを処理するスクリプトを作成しましょう。dict_reader_example.py という名前の新しいファイルを作成し、次のコードを記述します。

import csv

## DictReader を使用して CSV ファイルを読み込む
with open('users_data.csv', 'r') as file:
    csv_reader = csv.DictReader(file)

    ## フィールド名(列ヘッダー)を出力する
    print(f"列ヘッダー: {csv_reader.fieldnames}")
    print("\n最初の 5 件のレコード:")
    print("-----------------")

    ## 最初の 5 件のレコードを出力する
    for i, row in enumerate(csv_reader):
        if i < 5:
            ## 名前でフィールドにアクセスする
            print(f"ユーザー {row['id']}: {row['name']}, {row['age']} 歳,{row['country']} 出身")
            print(f"  メール:{row['email']}")
        else:
            break

## DictReader を使用したデータ分析
with open('users_data.csv', 'r') as file:
    csv_reader = csv.DictReader(file)

    ## 平均年齢を計算する
    total_age = 0
    user_count = 0

    ## 国別のユーザー数をカウントする
    countries = {}

    for row in csv_reader:
        user_count += 1
        total_age += int(row['age'])

        ## 国別のユーザー数をカウントする
        country = row['country']
        if country in countries:
            countries[country] += 1
        else:
            countries[country] = 1

    avg_age = total_age / user_count if user_count > 0 else 0

    print("\nデータ分析:")
    print("--------------")
    print(f"総ユーザー数:{user_count}")
    print(f"平均年齢:{avg_age:.2f} 歳")
    print("\n国別のユーザー:")

    for country, count in sorted(countries.items(), key=lambda x: x[1], reverse=True):
        print(f"  {country}: {count} ユーザー")

このスクリプトを実行します。

python3 dict_reader_example.py

期待される出力(データはランダムに生成されるため、正確な値は異なる場合があります):

列ヘッダー: ['id', 'name', 'email', 'age', 'country']

最初の 5 件のレコード:
-----------------
ユーザー 1: John Smith, 25 歳, USA 出身
  メール: john.smith@gmail.com
ユーザー 2: Emily Brown, 32 歳, Canada 出身
  メール: emily.brown@yahoo.com
ユーザー 3: David Jones, 45 歳, UK 出身
  メール: david.jones@outlook.com
ユーザー 4: Sarah Wilson, 28 歳, Australia 出身
  メール: sarah.wilson@example.com
ユーザー 5: Michael Taylor, 37 歳, Germany 出身
  メール: michael.taylor@example.com

データ分析:
--------------
総ユーザー数: 100
平均年齢: 41.35 歳

国別のユーザー:
  USA: 16 ユーザー
  Canada: 14 ユーザー
  Japan: 13 ユーザー
  UK: 12 ユーザー
  Germany: 12 ユーザー
  Australia: 12 ユーザー
  France: 11 ユーザー
  Brazil: 10 ユーザー

DictReader を使用する利点

ご覧のとおり、DictReader を使用すると、いくつかの利点があります。

  1. 可読性の高いコード: インデックス位置を覚える代わりに、名前でフィールドにアクセスできます
  2. 自己文書化: コードは、アクセスしているフィールドを明確に示しています
  3. 柔軟性: CSV ファイルの列の順序が変更されても、列名が同じままであれば、コードは引き続き機能します

このアプローチは、多くの列を持つ実際のデータや、列の順序が時間の経過とともに変更される可能性があるデータを扱う場合に特に役立ちます。

次のステップでは、すべてを一度にメモリにロードすることなく、より大きな CSV ファイルを処理するための効率的なテクニックを探ります。

大規模 CSV ファイルの効率的な処理

実際のシナリオでは、数ギガバイトの CSV ファイルを処理する必要がある場合があります。そのようなファイルをすべてメモリにロードすると、アプリケーションがクラッシュしたり、大幅に速度が低下したりする可能性があります。このステップでは、大規模な CSV ファイルを効率的に処理するためのテクニックを探ります。

大容量ファイルにおけるメモリの課題

CSV ファイルを扱う場合、3 つの一般的なアプローチがあり、それぞれメモリ要件が異なります。

  1. ファイル全体をメモリにロードする - シンプルですが、最も多くのメモリを使用します
  2. ファイルを 1 行ずつストリーミングする - メモリ使用量は最小限ですが、複雑な操作では遅くなる可能性があります
  3. チャンク化 - ファイルを管理可能なチャンクで処理する中間的な方法

これらの各アプローチを、具体的な例を挙げて見ていきましょう。

より大きなサンプルファイルの作成

まず、これらのテクニックを実証するために、より大きなサンプルファイルを作成しましょう。create_large_csv.py という名前の新しいファイルを作成します。

import csv
import random
from datetime import datetime, timedelta

def create_large_csv(filename, num_rows):
    ## 列ヘッダーを定義する
    headers = ['transaction_id', 'date', 'customer_id', 'product_id', 'amount', 'status']

    ## 指定された行数でファイルを作成する
    with open(filename, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(headers)

        ## ランダムデータを生成する
        start_date = datetime(2022, 1, 1)
        status_options = ['completed', 'pending', 'failed', 'refunded']

        for i in range(1, num_rows + 1):
            ## ランダム値を生成する
            transaction_id = f"TXN-{i:08d}"
            days_offset = random.randint(0, 365)
            date = (start_date + timedelta(days=days_offset)).strftime('%Y-%m-%d')
            customer_id = f"CUST-{random.randint(1001, 9999)}"
            product_id = f"PROD-{random.randint(101, 999)}"
            amount = round(random.uniform(10.0, 500.0), 2)
            status = random.choice(status_options)

            ## 行を CSV に書き込む
            writer.writerow([transaction_id, date, customer_id, product_id, amount, status])

            ## 10,000 行ごとに進捗インジケーターを出力する
            if i % 10000 == 0:
                print(f"Generated {i} rows...")

## 50,000 行(約 5~10 MB)の CSV ファイルを作成する
create_large_csv('transactions.csv', 50000)
print("大規模 CSV ファイル 'transactions.csv' が作成されました。")

このスクリプトを実行してファイルを作成します。

python3 create_large_csv.py

期待される出力:

Generated 10000 rows...
Generated 20000 rows...
Generated 30000 rows...
Generated 40000 rows...
Generated 50000 rows...
大規模 CSV ファイル 'transactions.csv' が作成されました。

次のコマンドでファイルのサイズを確認できます。

ls -lh transactions.csv

期待される出力(サイズは若干異なる場合があります):

-rw-r--r-- 1 labex labex 3.8M Apr 15 12:30 transactions.csv

アプローチ 1: 行ごとの処理(ストリーミング)

最もメモリ効率の高いアプローチは、CSV ファイルを 1 行ずつ処理することです。streaming_example.py という名前のファイルを作成します。

import csv
import time

def process_csv_streaming(filename):
    print(f"ストリーミング(行ごと)を使用して {filename} を処理しています...")
    start_time = time.time()

    ## いくつかの統計情報を追跡する
    row_count = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}

    ## ファイルを 1 行ずつ処理する
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        for row in reader:
            ## 行カウンターをインクリメントする
            row_count += 1

            ## 行データを処理する
            amount = float(row['amount'])
            status = row['status']

            ## 統計情報を更新する
            total_amount += amount
            status_counts[status] += 1

    ## 結果を計算して表示する
    end_time = time.time()
    processing_time = end_time - start_time

    print("\n結果:")
    print(f"処理時間 {processing_time:.2f} 秒で {row_count:,} 行を処理しました")
    print(f"合計取引金額:${total_amount:,.2f}")
    print(f"平均取引金額:${total_amount/row_count:.2f}")
    print("\n取引ステータスの内訳:")
    for status, count in status_counts.items():
        percentage = (count / row_count) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

## ファイルを処理する
process_csv_streaming('transactions.csv')

このスクリプトを実行します。

python3 streaming_example.py

期待される出力(正確な数値は異なる場合があります):

ストリーミング(行ごと)を使用して transactions.csv を処理しています...

結果:
処理時間 0.17 秒で 50,000 行を処理しました
合計取引金額: $12,739,853.35
平均取引金額: $254.80

取引ステータスの内訳:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

アプローチ 2: チャンク処理

より複雑な操作を行う場合や、データをバッチで処理する必要がある場合は、チャンク処理アプローチを使用できます。chunked_example.py という名前のファイルを作成します。

import csv
import time

def process_csv_chunked(filename, chunk_size=10000):
    print(f"チャンクサイズ {chunk_size} 行を使用して {filename} を処理しています...")
    start_time = time.time()

    ## いくつかの統計情報を追跡する
    row_count = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}

    ## ファイルをチャンクで処理する
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        chunk = []
        for row in reader:
            ## 現在のチャンクに行を追加する
            chunk.append(row)

            ## チャンクが目的のサイズに達したら、それを処理する
            if len(chunk) >= chunk_size:
                ## チャンクを処理する
                for row_data in chunk:
                    ## 統計情報を更新する
                    row_count += 1
                    amount = float(row_data['amount'])
                    status = row_data['status']

                    total_amount += amount
                    status_counts[status] += 1

                print(f"チャンク {len(chunk)} 行を処理しました...(合計 {row_count:,} 行)")
                ## 次のバッチのためにチャンクをクリアする
                chunk = []

        ## 最後のチャンクに残りの行を処理する
        if chunk:
            for row_data in chunk:
                row_count += 1
                amount = float(row_data['amount'])
                status = row_data['status']

                total_amount += amount
                status_counts[status] += 1

            print(f"最終チャンク {len(chunk)} 行を処理しました...(合計 {row_count:,} 行)")

    ## 結果を計算して表示する
    end_time = time.time()
    processing_time = end_time - start_time

    print("\n結果:")
    print(f"処理時間 {processing_time:.2f} 秒で {row_count:,} 行を処理しました")
    print(f"合計取引金額:${total_amount:,.2f}")
    print(f"平均取引金額:${total_amount/row_count:.2f}")
    print("\n取引ステータスの内訳:")
    for status, count in status_counts.items():
        percentage = (count / row_count) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

## 10,000 行のチャンクでファイルを処理する
process_csv_chunked('transactions.csv', chunk_size=10000)

このスクリプトを実行します。

python3 chunked_example.py

期待される出力:

チャンクサイズ 10000 行を使用して transactions.csv を処理しています...
チャンク 10000 行を処理しました...(合計 10,000 行)
チャンク 10000 行を処理しました...(合計 20,000 行)
チャンク 10000 行を処理しました...(合計 30,000 行)
チャンク 10000 行を処理しました...(合計 40,000 行)
チャンク 10000 行を処理しました...(合計 50,000 行)

結果:
処理時間 0.20 秒で 50,000 行を処理しました
合計取引金額: $12,739,853.35
平均取引金額: $254.80

取引ステータスの内訳:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

メモリ使用量の比較

これらのアプローチの主な違いは次のとおりです。

  1. ストリーミング(行ごと):

    • ファイルサイズに関係なく、最小限のメモリを使用します
    • 非常に大きなファイルに最適です
    • 各行に対する単純な操作
  2. チャンク処理:

    • ストリーミングよりも多くのメモリを使用しますが、それでも効率的です
    • 行をバッチで処理する必要がある操作に適しています
    • 処理中に進捗状況の更新を提供します
    • マルチプロセッシングと組み合わせて並列処理を行うことができます

ほとんどの実用的な目的では、バッチ処理機能が特に必要な場合を除き、ストリーミングアプローチをお勧めします。これは、優れたパフォーマンスを維持しながら、最高のメモリ効率を提供します。

次のステップでは、pandas などのサードパーティライブラリを使用して、さらに強力な CSV 処理機能を探ります。

pandas を使用した高度な CSV 処理

Python の組み込み csv モジュールは強力ですが、pandas ライブラリは、データ分析と操作のためのより高度な機能を提供します。このステップでは、効率的な CSV 処理に pandas を使用する方法を探ります。

Pandas のインストール

まず、pandas ライブラリをインストールしましょう。

pip install pandas

期待される出力:

Collecting pandas
  Downloading pandas-2.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 12.3/12.3 MB 42.6 MB/s eta 0:00:00
...
Successfully installed pandas-2.0.0 numpy-1.24.3 python-dateutil-2.8.2 pytz-2023.3 tzdata-2023.3

Pandas での CSV ファイルの読み込み

Pandas を使用すると、CSV データの読み込み、分析、操作が簡単になります。pandas_basic.py という名前のファイルを作成します。

import pandas as pd
import time

def process_with_pandas(filename):
    print(f"pandas を使用して {filename} を処理しています...")
    start_time = time.time()

    ## CSV ファイルを DataFrame に読み込む
    df = pd.read_csv(filename)

    ## 基本的な情報を表示する
    print(f"\nDataFrame 情報:")
    print(f"形状:{df.shape} (行,列)")
    print(f"列名:{', '.join(df.columns)}")

    ## 最初の 5 行を表示する
    print("\n最初の 5 行:")
    print(df.head())

    ## 基本的な統計情報
    print("\n数値列の要約統計:")
    print(df.describe())

    ## グループ化による分析
    print("\nステータス別の取引数:")
    status_counts = df['status'].value_counts()
    print(status_counts)

    ## ステータス別の平均金額を計算する
    print("\nステータス別の平均取引金額:")
    avg_by_status = df.groupby('status')['amount'].mean()
    print(avg_by_status)

    ## 日付別の合計金額を計算する(最初の 5 日間)
    print("\n日付別の合計取引金額(最初の 5 日間):")
    total_by_date = df.groupby('date')['amount'].sum().sort_values(ascending=False).head(5)
    print(total_by_date)

    end_time = time.time()
    print(f"\n{end_time - start_time:.2f} 秒で処理されました")

## 取引ファイルを処理する
process_with_pandas('transactions.csv')

このスクリプトを実行します。

python3 pandas_basic.py

期待される出力(簡潔にするために切り捨てられています):

pandas を使用して transactions.csv を処理しています...

DataFrame 情報:
形状: (50000, 6) (行, 列)
列名: transaction_id, date, customer_id, product_id, amount, status

最初の 5 行:
  transaction_id        date customer_id product_id   amount    status
0    TXN-00000001  2022-12-19   CUST-5421   PROD-383  385.75  refunded
1    TXN-00000002  2022-02-01   CUST-7078   PROD-442  286.83  completed
2    TXN-00000003  2022-12-24   CUST-2356   PROD-701  364.87    failed
3    TXN-00000004  2022-04-09   CUST-3458   PROD-854  247.73   pending
4    TXN-00000005  2022-03-07   CUST-6977   PROD-307  298.69  completed

数値列の要約統計:
              amount
count  50000.000000
mean     254.797067
std      141.389125
min       10.010000
25%      127.732500
50%      254.865000
75%      381.387500
max      499.990000

ステータス別の取引数:
pending     12598
refunded    12556
completed   12432
failed      12414
Name: status, dtype: int64

ステータス別の平均取引金額:
status
completed    255.028733
failed       254.709444
pending      254.690785
refunded     254.760390
Name: amount, dtype: float64

日付別の合計取引金額(最初の 5 日間):
date
2022-01-20    38883.19
2022-08-30    38542.49
2022-03-10    38331.67
2022-11-29    38103.61
2022-06-24    37954.87
Name: amount, dtype: float64

0.11 秒で処理されました

Pandas を使用した大規模 CSV ファイルの処理

pandas は強力ですが、大規模な CSV ファイルをロードすると、多くのメモリを消費する可能性があります。非常に大きなファイルの場合は、chunksize パラメータを使用してファイルをチャンク単位で読み取ることができます。pandas_chunked.py という名前のファイルを作成します。

import pandas as pd
import time

def process_large_csv_with_pandas(filename, chunk_size=10000):
    print(f"チャンクサイズ {chunk_size} 行を使用して pandas で {filename} を処理しています...")
    start_time = time.time()

    ## 集計結果を保存するための変数を初期化する
    total_rows = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
    daily_totals = {}

    ## ファイルをチャンクで処理する
    for chunk_num, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        ## 行数を更新する
        chunk_rows = len(chunk)
        total_rows += chunk_rows

        ## 合計金額を更新する
        chunk_amount = chunk['amount'].sum()
        total_amount += chunk_amount

        ## ステータス数を更新する
        for status, count in chunk['status'].value_counts().items():
            status_counts[status] += count

        ## 日別合計を更新する
        for date, group in chunk.groupby('date'):
            if date in daily_totals:
                daily_totals[date] += group['amount'].sum()
            else:
                daily_totals[date] = group['amount'].sum()

        print(f"チャンク {chunk_num + 1} を処理しました(これまでの合計 {total_rows:,} 行)")

    ## 結果を計算する
    end_time = time.time()
    processing_time = end_time - start_time

    print(f"\n{processing_time:.2f} 秒で {total_rows:,} 行を処理した後の結果:")
    print(f"合計取引金額:${total_amount:,.2f}")
    print(f"平均取引金額:${total_amount/total_rows:.2f}")

    print("\n取引ステータスの内訳:")
    for status, count in status_counts.items():
        percentage = (count / total_rows) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

    ## 取引金額上位 5 日間を表示する
    print("\n取引金額上位 5 日間:")
    top_days = sorted(daily_totals.items(), key=lambda x: x[1], reverse=True)[:5]
    for date, amount in top_days:
        print(f"  {date}: ${amount:,.2f}")

## チャンクを使用して取引ファイルを処理する
process_large_csv_with_pandas('transactions.csv', chunk_size=10000)

このスクリプトを実行します。

python3 pandas_chunked.py

期待される出力:

チャンクサイズ 10000 行を使用して pandas で transactions.csv を処理しています...
チャンク 1 を処理しました(これまでの合計 10,000 行)
チャンク 2 を処理しました(これまでの合計 20,000 行)
チャンク 3 を処理しました(これまでの合計 30,000 行)
チャンク 4 を処理しました(これまでの合計 40,000 行)
チャンク 5 を処理しました(これまでの合計 50,000 行)

50,000 行を 0.34 秒で処理した後の結果:
合計取引金額: $12,739,853.35
平均取引金額: $254.80

取引ステータスの内訳:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

取引金額上位 5 日間:
  2022-01-20: $38,883.19
  2022-08-30: $38,542.49
  2022-03-10: $38,331.67
  2022-11-29: $38,103.61
  2022-06-24: $37,954.87

Pandas でのデータのフィルタリングと変換

pandas の大きな利点の 1 つは、データを簡単にフィルタリングして変換できることです。pandas_filter.py という名前のファイルを作成します。

import pandas as pd
import time

def filter_and_transform(filename):
    print(f"{filename} からのデータをフィルタリングして変換しています...")
    start_time = time.time()

    ## CSV ファイルを読み込む
    df = pd.read_csv(filename)

    ## 1. フィルタリング:完了した取引のみを取得する
    completed = df[df['status'] == 'completed']
    print(f"完了した取引の数:{len(completed)}")

    ## 2. フィルタリング:高額取引($400 以上)を取得する
    high_value = df[df['amount'] > 400]
    print(f"高額取引(>${400})の数:{len(high_value)}")

    ## 3. フィルタリング:2022 年第 1 四半期の取引
    df['date'] = pd.to_datetime(df['date'])  ## datetime に変換する
    q1_2022 = df[(df['date'] >= '2022-01-01') & (df['date'] <= '2022-03-31')]
    print(f"2022 年第 1 四半期の取引数:{len(q1_2022)}")

    ## 4. 新しい列を追加する:transaction_month
    df['month'] = df['date'].dt.strftime('%Y-%m')

    ## 5. 月とステータスでグループ化する
    monthly_by_status = df.groupby(['month', 'status']).agg({
        'transaction_id': 'count',
        'amount': 'sum'
    }).rename(columns={'transaction_id': 'count'})

    ## 月ごとの成功率を計算する(完了 / 合計)
    print("\n月ごとの取引成功率:")
    for month, month_data in df.groupby('month'):
        total = len(month_data)
        completed = len(month_data[month_data['status'] == 'completed'])
        success_rate = (completed / total) * 100
        print(f"  {month}: {success_rate:.1f}% ({completed}/{total})")

    ## フィルタリングされたデータを新しい CSV ファイルに保存する
    completed_high_value = df[(df['status'] == 'completed') & (df['amount'] > 300)]
    output_file = 'high_value_completed.csv'
    completed_high_value.to_csv(output_file, index=False)

    end_time = time.time()
    print(f"\nフィルタリングは {end_time - start_time:.2f} 秒で完了しました")
    print(f"{output_file} に {len(completed_high_value)} 件の高額完了取引を保存しました")

## データをフィルタリングして変換する
filter_and_transform('transactions.csv')

このスクリプトを実行します。

python3 pandas_filter.py

期待される出力:

transactions.csv からのデータをフィルタリングして変換しています...
完了した取引の数: 12432
高額取引(>$400)の数: 6190
2022 年第 1 四半期の取引数: 12348

月ごとの取引成功率:
  2022-01: 24.8% (1048/4225)
  2022-02: 25.0% (1010/4034)
  2022-03: 25.4% (1042/4089)
  2022-04: 24.2% (978/4052)
  2022-05: 24.4% (1047/4297)
  2022-06: 24.4% (1046/4280)
  2022-07: 24.7% (1071/4341)
  2022-08: 25.1% (1090/4343)
  2022-09: 26.1% (1091/4177)
  2022-10: 24.1% (1008/4182)
  2022-11: 24.8% (1009/4075)
  2022-12: 25.2% (992/3905)

フィルタリングは 0.38 秒で完了しました
high_value_completed.csv に 6304 件の高額完了取引を保存しました

Pandas を使用する利点

これらの例からわかるように、pandas は CSV 処理にいくつかの利点を提供します。

  1. 豊富な機能: データのフィルタリング、グループ化、集計、変換のための組み込みメソッド
  2. パフォーマンス: 大規模データセットに対する高速な操作のために最適化された C コードが内部で使用されています
  3. 簡単なデータ分析: 統計情報を計算し、洞察を得るための簡単な方法
  4. 可視化機能: プロットライブラリとの簡単な統合(例では示されていません)
  5. チャンク処理: 利用可能なメモリよりも大きなファイルを処理する機能

CSV ファイルを扱うほとんどのデータ分析タスクでは、純粋な Python csv モジュールを使用する必要がある特定のメモリ制約がない限り、pandas が推奨されるアプローチです。

まとめ

このチュートリアルでは、小さなデータセットから、慎重なメモリ管理が必要な大規模なデータセットまで、Python で CSV ファイルを効率的に処理するためのいくつかの方法を学びました。

  1. 基本的な CSV 処理: Python の組み込み csv モジュールを使用して、csv.readercsv.writer で CSV ファイルを読み書きします。

  2. 辞書ベースの処理: csv.DictReader を使用して、より直感的な方法で CSV データを操作し、インデックスではなく名前でフィールドにアクセスします。

  3. 効率的な処理技術:

    • ストリーミング:メモリ使用量を最小限に抑えるために、ファイルを 1 行ずつ処理します
    • チャンク化:より優れたメモリ管理のために、ファイルをバッチで処理します
  4. Pandas を使用した高度な処理:

    • CSV ファイルを DataFrame に読み込む
    • データの分析とフィルタリング
    • 大規模ファイルをチャンクで処理する
    • データの変換とエクスポート

これらの技術は、Python であらゆるサイズの CSV ファイルを処理するための包括的なツールキットを提供します。ほとんどのデータ分析タスクでは、pandas はその豊富な機能とパフォーマンスにより、推奨されるライブラリです。ただし、非常に大きなファイルや単純な処理タスクの場合は、組み込みの csv モジュールを使用したストリーミングおよびチャンク化アプローチの方が、メモリ効率が高くなる可能性があります。

特定の要件に基づいて適切な技術を適用することにより、メモリの問題に遭遇することなく、あらゆるサイズの CSV ファイルを効率的に処理できます。