How to Process Large CSV Files Efficiently in Python


Introduction

Processing large CSV files is a common challenge for Python developers. This tutorial walks you through efficient techniques for handling these files, focusing on performance and memory usage. After completing this lab, you will have practical knowledge for tackling data-intensive CSV processing tasks in Python.

Whether you are analyzing customer data, processing financial records, or working with any other kind of structured data, the skills you learn in this lab will help you handle large datasets efficiently without running into memory problems.

Creating and Reading a Simple CSV File

CSV (Comma-Separated Values) is a popular file format for storing tabular data. In this step, we will create a simple CSV file and learn how to read it with Python's built-in csv module.

Understanding CSV Files

A CSV file stores data as plain text, where:

  • each line represents one row of data
  • values within a line are separated by a delimiter (usually a comma)
  • the first line typically contains the column headers
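
These rules are easy to see in action by parsing a couple of lines in memory with the built-in csv module (a minimal sketch; the sample text is invented for illustration):

```python
import csv
import io

# A quoted field may contain the delimiter without splitting the row
raw = 'Name,City\n"Smith, John",New York\n'

rows = list(csv.reader(io.StringIO(raw)))
print(rows[0])  # ['Name', 'City'] -- the header row
print(rows[1])  # ['Smith, John', 'New York'] -- the quoted comma stays in one field
```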

Let's start by creating a simple CSV file.

Creating a CSV File

First, let's set up a place to work and create a simple CSV file:

  1. Open a terminal in the WebIDE
  2. Create a new Python file named csv_basics.py in the editor

Now add the following code to csv_basics.py:

import csv

## Data to write to CSV
data = [
    ['Name', 'Age', 'City'],              ## Header row
    ['John Smith', '28', 'New York'],
    ['Sarah Johnson', '32', 'San Francisco'],
    ['Michael Brown', '45', 'Chicago'],
    ['Emily Davis', '36', 'Boston'],
    ['David Wilson', '52', 'Seattle']
]

## Writing data to a CSV file
with open('sample_data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)

print("CSV file 'sample_data.csv' created successfully.")

Run the code by executing the following command in the terminal:

python3 csv_basics.py

Expected output:

CSV file 'sample_data.csv' created successfully.

This creates a new CSV file named sample_data.csv in your current directory. You can view its contents by running:

cat sample_data.csv

Expected output:

Name,Age,City
John Smith,28,New York
Sarah Johnson,32,San Francisco
Michael Brown,45,Chicago
Emily Davis,36,Boston
David Wilson,52,Seattle

Reading a CSV File

Now let's read the CSV file we just created. Create a new file named read_csv.py with the following code:

import csv

## Reading a CSV file
with open('sample_data.csv', 'r') as file:
    reader = csv.reader(file)

    print("Contents of sample_data.csv:")
    print("--------------------------")

    for row in reader:
        print(row)

## Reading and accessing specific columns
print("\nReading specific columns:")
print("--------------------------")

with open('sample_data.csv', 'r') as file:
    reader = csv.reader(file)
    headers = next(reader)  ## Skip the header row

    for row in reader:
        name = row[0]
        age = row[1]
        city = row[2]
        print(f"Name: {name}, Age: {age}, City: {city}")

Run the code with:

python3 read_csv.py

Expected output:

Contents of sample_data.csv:
--------------------------
['Name', 'Age', 'City']
['John Smith', '28', 'New York']
['Sarah Johnson', '32', 'San Francisco']
['Michael Brown', '45', 'Chicago']
['Emily Davis', '36', 'Boston']
['David Wilson', '52', 'Seattle']

Reading specific columns:
--------------------------
Name: John Smith, Age: 28, City: New York
Name: Sarah Johnson, Age: 32, City: San Francisco
Name: Michael Brown, Age: 45, City: Chicago
Name: Emily Davis, Age: 36, City: Boston
Name: David Wilson, Age: 52, City: Seattle

Understanding the csv Module

The Python csv module provides two main interfaces:

  • csv.reader: reads a CSV file and returns each row as a list of strings
  • csv.writer: writes data to a CSV file

The module handles all the complexities of different CSV dialects for you, such as escaping special characters and handling quoted values.
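
For instance, a value containing the delimiter or a quote character is quoted and escaped automatically on the way out, and parsed back intact on the way in (a minimal in-memory sketch):

```python
import csv
import io

# Write a row whose values contain a comma and a double quote
buffer = io.StringIO()
csv.writer(buffer).writerow(['Acme, Inc.', 'He said "hi"', 'plain'])
print(buffer.getvalue().strip())  # "Acme, Inc.","He said ""hi""",plain

# Reading it back recovers the original values
row = next(csv.reader(io.StringIO(buffer.getvalue())))
print(row)  # ['Acme, Inc.', 'He said "hi"', 'plain']
```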

In this step, you learned how to create and read a simple CSV file. Next, we will look at a more convenient way to process CSV data.

Convenient CSV Processing with DictReader

In the previous step, we used the basic csv.reader and csv.writer interfaces. Now let's explore a more convenient way to work with CSV files: the csv.DictReader class, which is especially useful for data with column headers.

What Is DictReader?

csv.DictReader reads a CSV file and returns each row as a dictionary, where:

  • the keys are taken from the column headers (by default, the first row of the CSV file)
  • the values are the corresponding data in each row

This approach makes your code more readable and less error-prone, because you refer to columns by name rather than by index.
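
In miniature, the difference looks like this (a self-contained sketch that parses an in-memory CSV string instead of a file):

```python
import csv
import io

raw = 'name,age\nJohn,28\nSarah,32\n'

# csv.reader yields lists, accessed by position
first = next(csv.reader(io.StringIO(raw)))
print(first)  # ['name', 'age']

# csv.DictReader yields dictionaries, accessed by column name
for row in csv.DictReader(io.StringIO(raw)):
    print(f"{row['name']} is {row['age']}")
```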

Creating a Larger Test File

First, let's create a slightly larger CSV file to demonstrate the benefits of DictReader. Create a new file named create_users_data.py with the following code:

import csv
import random

## Generate some sample user data
def generate_users(count):
    users = [['id', 'name', 'email', 'age', 'country']]  ## Header row

    domains = ['gmail.com', 'yahoo.com', 'outlook.com', 'example.com']
    countries = ['USA', 'Canada', 'UK', 'Australia', 'Germany', 'France', 'Japan', 'Brazil']
    first_names = ['John', 'Jane', 'Michael', 'Emily', 'David', 'Sarah', 'Robert', 'Lisa']
    last_names = ['Smith', 'Johnson', 'Brown', 'Davis', 'Wilson', 'Miller', 'Jones', 'Taylor']

    for i in range(1, count + 1):
        first_name = random.choice(first_names)
        last_name = random.choice(last_names)
        name = f"{first_name} {last_name}"
        email = f"{first_name.lower()}.{last_name.lower()}@{random.choice(domains)}"
        age = random.randint(18, 65)
        country = random.choice(countries)

        users.append([str(i), name, email, str(age), country])

    return users

## Create a CSV file with 100 users
users_data = generate_users(100)

## Write data to CSV file
with open('users_data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(users_data)

print(f"Created 'users_data.csv' with 100 user records")

Run the script to create the file:

python3 create_users_data.py

Expected output:

Created 'users_data.csv' with 100 user records

Let's inspect the first few lines of the new file:

head -n 5 users_data.csv

You should see the header row followed by four rows of data:

id,name,email,age,country
1,John Smith,john.smith@gmail.com,25,USA
2,Emily Brown,emily.brown@yahoo.com,32,Canada
3,David Jones,david.jones@outlook.com,45,UK
4,Sarah Wilson,sarah.wilson@example.com,28,Australia

Processing a CSV File with DictReader

Now let's write a script that processes this file with DictReader. Create a new file named dict_reader_example.py with the following code:

import csv

## Read the CSV file using DictReader
with open('users_data.csv', 'r') as file:
    csv_reader = csv.DictReader(file)

    ## Print the field names (column headers)
    print(f"Column headers: {csv_reader.fieldnames}")
    print("\nFirst 5 records:")
    print("-----------------")

    ## Print the first 5 records
    for i, row in enumerate(csv_reader):
        if i < 5:
            ## Access fields by name
            print(f"User {row['id']}: {row['name']}, {row['age']} years old, from {row['country']}")
            print(f"  Email: {row['email']}")
        else:
            break

## Using DictReader for data analysis
with open('users_data.csv', 'r') as file:
    csv_reader = csv.DictReader(file)

    ## Calculate average age
    total_age = 0
    user_count = 0

    ## Count users by country
    countries = {}

    for row in csv_reader:
        user_count += 1
        total_age += int(row['age'])

        ## Count users by country
        country = row['country']
        if country in countries:
            countries[country] += 1
        else:
            countries[country] = 1

    avg_age = total_age / user_count if user_count > 0 else 0

    print("\nData Analysis:")
    print("--------------")
    print(f"Total users: {user_count}")
    print(f"Average age: {avg_age:.2f} years")
    print("\nUsers by country:")

    for country, count in sorted(countries.items(), key=lambda x: x[1], reverse=True):
        print(f"  {country}: {count} users")

Run the script:

python3 dict_reader_example.py

Expected output (your exact values will differ because the data is randomly generated):

Column headers: ['id', 'name', 'email', 'age', 'country']

First 5 records:
-----------------
User 1: John Smith, 25 years old, from USA
  Email: john.smith@gmail.com
User 2: Emily Brown, 32 years old, from Canada
  Email: emily.brown@yahoo.com
User 3: David Jones, 45 years old, from UK
  Email: david.jones@outlook.com
User 4: Sarah Wilson, 28 years old, from Australia
  Email: sarah.wilson@example.com
User 5: Michael Taylor, 37 years old, from Germany
  Email: michael.taylor@example.com

Data Analysis:
--------------
Total users: 100
Average age: 41.35 years

Users by country:
  USA: 16 users
  Canada: 14 users
  Japan: 13 users
  UK: 12 users
  Germany: 12 users
  Australia: 12 users
  France: 11 users
  Brazil: 10 users

Benefits of Using DictReader

As you can see, DictReader offers several advantages:

  1. Readable code: you access fields by name instead of remembering index positions
  2. Self-documenting: the code clearly shows which field is being accessed
  3. Flexibility: if the column order in the CSV file changes, your code keeps working as long as the column names stay the same

This approach is especially useful when working with real-world data that has many columns, or when the column order may change over time.
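
The writing counterpart, csv.DictWriter, pairs naturally with DictReader: you declare the column order once via fieldnames and then write plain dictionaries (a minimal sketch; the file name dict_writer_demo.csv is just for illustration):

```python
import csv

fieldnames = ['id', 'name', 'country']
rows = [
    {'id': '1', 'name': 'John Smith', 'country': 'USA'},
    {'id': '2', 'name': 'Emily Brown', 'country': 'Canada'},
]

# DictWriter maps dictionary keys to columns, so the column order
# is defined in one place rather than baked into every row
with open('dict_writer_demo.csv', 'w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)
```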

In the next step, we will explore efficient techniques for processing larger CSV files without loading everything into memory at once.

Processing Large CSV Files Efficiently

In the real world, you may need to process CSV files that are several gigabytes in size. Loading such files entirely into memory can crash your application or slow it to a crawl. In this step, we will look at techniques for processing large CSV files efficiently.

The Memory Challenge of Large Files

There are three common approaches to working with CSV files, each with different memory requirements:

  1. Load the entire file into memory - simple, but uses the most memory
  2. Stream the file line by line - uses the least memory, but can be slower for complex operations
  3. Chunking - a middle ground that processes the file in manageable batches

Let's explore each approach with practical examples.

Creating a Larger Sample File

First, let's create a larger sample file to demonstrate these techniques. Create a new file named create_large_csv.py:

import csv
import random
from datetime import datetime, timedelta

def create_large_csv(filename, num_rows):
    ## Define column headers
    headers = ['transaction_id', 'date', 'customer_id', 'product_id', 'amount', 'status']

    ## Create a file with the specified number of rows
    with open(filename, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(headers)

        ## Generate random data
        start_date = datetime(2022, 1, 1)
        status_options = ['completed', 'pending', 'failed', 'refunded']

        for i in range(1, num_rows + 1):
            ## Generate random values
            transaction_id = f"TXN-{i:08d}"
            days_offset = random.randint(0, 365)
            date = (start_date + timedelta(days=days_offset)).strftime('%Y-%m-%d')
            customer_id = f"CUST-{random.randint(1001, 9999)}"
            product_id = f"PROD-{random.randint(101, 999)}"
            amount = round(random.uniform(10.0, 500.0), 2)
            status = random.choice(status_options)

            ## Write row to CSV
            writer.writerow([transaction_id, date, customer_id, product_id, amount, status])

            ## Print progress indicator for every 10,000 rows
            if i % 10000 == 0:
                print(f"Generated {i} rows...")

## Create a CSV file with 50,000 rows (about 5-10 MB)
create_large_csv('transactions.csv', 50000)
print("Large CSV file 'transactions.csv' has been created.")

Run the script to create the file:

python3 create_large_csv.py

Expected output:

Generated 10000 rows...
Generated 20000 rows...
Generated 30000 rows...
Generated 40000 rows...
Generated 50000 rows...
Large CSV file 'transactions.csv' has been created.

You can check the size of the file with:

ls -lh transactions.csv

Expected output (the size may vary slightly):

-rw-r--r-- 1 labex labex 3.8M Apr 15 12:30 transactions.csv

Approach 1: Line-by-Line Processing (Streaming)

The most memory-efficient approach is to process the CSV file one line at a time. Create a file named streaming_example.py:

import csv
import time

def process_csv_streaming(filename):
    print(f"Processing {filename} using streaming (line by line)...")
    start_time = time.time()

    ## Track some statistics
    row_count = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}

    ## Process the file line by line
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        for row in reader:
            ## Increment row counter
            row_count += 1

            ## Process row data
            amount = float(row['amount'])
            status = row['status']

            ## Update statistics
            total_amount += amount
            status_counts[status] += 1

    ## Calculate and display results
    end_time = time.time()
    processing_time = end_time - start_time

    print(f"\nResults:")
    print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
    print(f"Total transaction amount: ${total_amount:,.2f}")
    print(f"Average transaction amount: ${total_amount/row_count:.2f}")
    print("\nTransaction status breakdown:")
    for status, count in status_counts.items():
        percentage = (count / row_count) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

## Process the file
process_csv_streaming('transactions.csv')

Run the script:

python3 streaming_example.py

Expected output (your exact numbers will differ):

Processing transactions.csv using streaming (line by line)...

Results:
Processed 50,000 rows in 0.17 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80

Transaction status breakdown:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

Approach 2: Chunked Processing

For more complex operations, or when you need to process data in batches, you can use a chunked approach. Create a file named chunked_example.py:

import csv
import time

def process_csv_chunked(filename, chunk_size=10000):
    print(f"Processing {filename} using chunks of {chunk_size} rows...")
    start_time = time.time()

    ## Track some statistics
    row_count = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}

    ## Process the file in chunks
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        chunk = []
        for row in reader:
            ## Add row to current chunk
            chunk.append(row)

            ## When chunk reaches desired size, process it
            if len(chunk) >= chunk_size:
                ## Process the chunk
                for row_data in chunk:
                    ## Update statistics
                    row_count += 1
                    amount = float(row_data['amount'])
                    status = row_data['status']

                    total_amount += amount
                    status_counts[status] += 1

                print(f"Processed chunk of {len(chunk)} rows... ({row_count:,} total)")
                ## Clear the chunk for next batch
                chunk = []

        ## Process any remaining rows in the last chunk
        if chunk:
            for row_data in chunk:
                row_count += 1
                amount = float(row_data['amount'])
                status = row_data['status']

                total_amount += amount
                status_counts[status] += 1

            print(f"Processed final chunk of {len(chunk)} rows... ({row_count:,} total)")

    ## Calculate and display results
    end_time = time.time()
    processing_time = end_time - start_time

    print(f"\nResults:")
    print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
    print(f"Total transaction amount: ${total_amount:,.2f}")
    print(f"Average transaction amount: ${total_amount/row_count:.2f}")
    print("\nTransaction status breakdown:")
    for status, count in status_counts.items():
        percentage = (count / row_count) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

## Process the file with chunks of 10,000 rows
process_csv_chunked('transactions.csv', chunk_size=10000)

Run the script:

python3 chunked_example.py

Expected output:

Processing transactions.csv using chunks of 10000 rows...
Processed chunk of 10000 rows... (10,000 total)
Processed chunk of 10000 rows... (20,000 total)
Processed chunk of 10000 rows... (30,000 total)
Processed chunk of 10000 rows... (40,000 total)
Processed chunk of 10000 rows... (50,000 total)

Results:
Processed 50,000 rows in 0.20 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80

Transaction status breakdown:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

Comparing Memory Usage

The key differences between these approaches:

  1. Streaming (line by line)

    • uses minimal memory regardless of file size
    • best suited to very large files
    • works well for simple per-row operations
  2. Chunked processing

    • uses more memory than streaming, but is still efficient
    • suits operations that need to handle rows in batches
    • provides progress updates during processing
    • can be combined with multiprocessing for parallel work

For most practical purposes, the streaming approach is recommended unless you specifically need batch processing. It offers the best memory efficiency while still performing well.

In the next step, we will explore third-party libraries such as pandas for more powerful CSV processing.

Advanced CSV Processing with Pandas

While Python's built-in csv module is powerful, the pandas library offers far more advanced features for data analysis and manipulation. In this step, we will look at how to use pandas for efficient CSV processing.

Installing Pandas

First, let's install the pandas library:

pip install pandas

Expected output:

Collecting pandas
  Downloading pandas-2.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 12.3/12.3 MB 42.6 MB/s eta 0:00:00
...
Successfully installed pandas-2.0.0 numpy-1.24.3 python-dateutil-2.8.2 pytz-2023.3 tzdata-2023.3

Reading CSV Files with Pandas

Pandas makes it easy to read, analyze, and manipulate CSV data. Create a file named pandas_basic.py:

import pandas as pd
import time

def process_with_pandas(filename):
    print(f"Processing {filename} with pandas...")
    start_time = time.time()

    ## Read the CSV file into a DataFrame
    df = pd.read_csv(filename)

    ## Display basic information
    print(f"\nDataFrame Info:")
    print(f"Shape: {df.shape} (rows, columns)")
    print(f"Column names: {', '.join(df.columns)}")

    ## Display the first 5 rows
    print("\nFirst 5 rows:")
    print(df.head())

    ## Basic statistics
    print("\nSummary statistics for numeric columns:")
    print(df.describe())

    ## Group by analysis
    print("\nTransaction counts by status:")
    status_counts = df['status'].value_counts()
    print(status_counts)

    ## Calculate average amount by status
    print("\nAverage transaction amount by status:")
    avg_by_status = df.groupby('status')['amount'].mean()
    print(avg_by_status)

    ## Calculate total amount by date (first 5 dates)
    print("\nTotal transaction amount by date (first 5 dates):")
    total_by_date = df.groupby('date')['amount'].sum().sort_values(ascending=False).head(5)
    print(total_by_date)

    end_time = time.time()
    print(f"\nProcessed in {end_time - start_time:.2f} seconds")

## Process the transactions file
process_with_pandas('transactions.csv')

Run the script:

python3 pandas_basic.py

Expected output (truncated for brevity):

Processing transactions.csv with pandas...

DataFrame Info:
Shape: (50000, 6) (rows, columns)
Column names: transaction_id, date, customer_id, product_id, amount, status

First 5 rows:
  transaction_id        date customer_id product_id   amount    status
0    TXN-00000001  2022-12-19   CUST-5421   PROD-383  385.75  refunded
1    TXN-00000002  2022-02-01   CUST-7078   PROD-442  286.83  completed
2    TXN-00000003  2022-12-24   CUST-2356   PROD-701  364.87    failed
3    TXN-00000004  2022-04-09   CUST-3458   PROD-854  247.73   pending
4    TXN-00000005  2022-03-07   CUST-6977   PROD-307  298.69  completed

Summary statistics for numeric columns:
              amount
count  50000.000000
mean     254.797067
std      141.389125
min       10.010000
25%      127.732500
50%      254.865000
75%      381.387500
max      499.990000

Transaction counts by status:
pending     12598
refunded    12556
completed   12432
failed      12414
Name: status, dtype: int64

Average transaction amount by status:
status
completed    255.028733
failed       254.709444
pending      254.690785
refunded     254.760390
Name: amount, dtype: float64

Total transaction amount by date (first 5 dates):
date
2022-01-20    38883.19
2022-08-30    38542.49
2022-03-10    38331.67
2022-11-29    38103.61
2022-06-24    37954.87
Name: amount, dtype: float64

Processed in 0.11 seconds

Processing Large CSV Files with Pandas

While pandas is powerful, it can consume a lot of memory when loading large CSV files. For very large files, you can use the chunksize parameter to read the file in chunks. Create a file named pandas_chunked.py:

import pandas as pd
import time

def process_large_csv_with_pandas(filename, chunk_size=10000):
    print(f"Processing {filename} with pandas using chunks of {chunk_size} rows...")
    start_time = time.time()

    ## Initialize variables to store aggregated results
    total_rows = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
    daily_totals = {}

    ## Process the file in chunks
    for chunk_num, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        ## Update row count
        chunk_rows = len(chunk)
        total_rows += chunk_rows

        ## Update total amount
        chunk_amount = chunk['amount'].sum()
        total_amount += chunk_amount

        ## Update status counts
        for status, count in chunk['status'].value_counts().items():
            status_counts[status] += count

        ## Update daily totals
        for date, group in chunk.groupby('date'):
            if date in daily_totals:
                daily_totals[date] += group['amount'].sum()
            else:
                daily_totals[date] = group['amount'].sum()

        print(f"Processed chunk {chunk_num + 1} ({total_rows:,} rows so far)")

    ## Calculate results
    end_time = time.time()
    processing_time = end_time - start_time

    print(f"\nResults after processing {total_rows:,} rows in {processing_time:.2f} seconds:")
    print(f"Total transaction amount: ${total_amount:,.2f}")
    print(f"Average transaction amount: ${total_amount/total_rows:.2f}")

    print("\nTransaction status breakdown:")
    for status, count in status_counts.items():
        percentage = (count / total_rows) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

    ## Show top 5 days by transaction amount
    print("\nTop 5 days by transaction amount:")
    top_days = sorted(daily_totals.items(), key=lambda x: x[1], reverse=True)[:5]
    for date, amount in top_days:
        print(f"  {date}: ${amount:,.2f}")

## Process the transactions file with chunks
process_large_csv_with_pandas('transactions.csv', chunk_size=10000)

Run the script:

python3 pandas_chunked.py

Expected output:

Processing transactions.csv with pandas using chunks of 10000 rows...
Processed chunk 1 (10,000 rows so far)
Processed chunk 2 (20,000 rows so far)
Processed chunk 3 (30,000 rows so far)
Processed chunk 4 (40,000 rows so far)
Processed chunk 5 (50,000 rows so far)

Results after processing 50,000 rows in 0.34 seconds:
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80

Transaction status breakdown:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

Top 5 days by transaction amount:
  2022-01-20: $38,883.19
  2022-08-30: $38,542.49
  2022-03-10: $38,331.67
  2022-11-29: $38,103.61
  2022-06-24: $37,954.87

Filtering and Transforming Data with Pandas

One of pandas' biggest strengths is how easily it filters and transforms data. Create a file named pandas_filter.py:

import pandas as pd
import time

def filter_and_transform(filename):
    print(f"Filtering and transforming data from {filename}...")
    start_time = time.time()

    ## Read the CSV file
    df = pd.read_csv(filename)

    ## 1. Filter: Get only completed transactions
    completed = df[df['status'] == 'completed']
    print(f"Number of completed transactions: {len(completed)}")

    ## 2. Filter: Get high-value transactions (over $400)
    high_value = df[df['amount'] > 400]
    print(f"Number of high-value transactions (>${400}): {len(high_value)}")

    ## 3. Filter: Transactions from first quarter of 2022
    df['date'] = pd.to_datetime(df['date'])  ## Convert to datetime
    q1_2022 = df[(df['date'] >= '2022-01-01') & (df['date'] <= '2022-03-31')]
    print(f"Number of transactions in Q1 2022: {len(q1_2022)}")

    ## 4. Add a new column: transaction_month
    df['month'] = df['date'].dt.strftime('%Y-%m')

    ## 5. Group by month and status
    monthly_by_status = df.groupby(['month', 'status']).agg({
        'transaction_id': 'count',
        'amount': 'sum'
    }).rename(columns={'transaction_id': 'count'})

    ## Calculate success rate by month (completed / total)
    print("\nTransaction success rates by month:")
    for month, month_data in df.groupby('month'):
        total = len(month_data)
        completed = len(month_data[month_data['status'] == 'completed'])
        success_rate = (completed / total) * 100
        print(f"  {month}: {success_rate:.1f}% ({completed}/{total})")

    ## Save filtered data to a new CSV file
    completed_high_value = df[(df['status'] == 'completed') & (df['amount'] > 300)]
    output_file = 'high_value_completed.csv'
    completed_high_value.to_csv(output_file, index=False)

    end_time = time.time()
    print(f"\nFiltering completed in {end_time - start_time:.2f} seconds")
    print(f"Saved {len(completed_high_value)} high-value completed transactions to {output_file}")

## Filter and transform the data
filter_and_transform('transactions.csv')

Run the script:

python3 pandas_filter.py

Expected output:

Filtering and transforming data from transactions.csv...
Number of completed transactions: 12432
Number of high-value transactions (>$400): 6190
Number of transactions in Q1 2022: 12348

Transaction success rates by month:
  2022-01: 24.8% (1048/4225)
  2022-02: 25.0% (1010/4034)
  2022-03: 25.4% (1042/4089)
  2022-04: 24.2% (978/4052)
  2022-05: 24.4% (1047/4297)
  2022-06: 24.4% (1046/4280)
  2022-07: 24.7% (1071/4341)
  2022-08: 25.1% (1090/4343)
  2022-09: 26.1% (1091/4177)
  2022-10: 24.1% (1008/4182)
  2022-11: 24.8% (1009/4075)
  2022-12: 25.2% (992/3905)

Filtering completed in 0.38 seconds
Saved 6304 high-value completed transactions to high_value_completed.csv

Advantages of Using Pandas

As these examples show, pandas offers several advantages for CSV processing:

  1. Rich functionality: built-in methods for filtering, grouping, aggregating, and transforming data
  2. Performance: optimized C code under the hood for fast operations on large datasets
  3. Easy data analysis: simple ways to compute statistics and extract insights
  4. Visualization: straightforward integration with plotting libraries (not shown in these examples)
  5. Chunked processing: the ability to handle files larger than available memory

For most data analysis tasks involving CSV files, pandas is the recommended approach, unless you have specific memory constraints that call for the pure-Python csv module.

Summary

In this tutorial, you learned several ways to process CSV files efficiently in Python, from small datasets to large ones that require careful memory management:

  1. Basic CSV handling: reading and writing CSV files with csv.reader and csv.writer from the built-in csv module.

  2. Dictionary-based processing: using csv.DictReader to work with CSV data more intuitively, accessing fields by name instead of by index.

  3. Efficient processing techniques

    • Streaming: processing files line by line to minimize memory usage
    • Chunking: processing files in batches for better memory management
  4. Advanced processing with pandas

    • reading CSV files into DataFrames
    • analyzing and filtering data
    • processing large files in chunks
    • transforming and exporting data

Together, these techniques form a comprehensive toolkit for handling CSV files of any size in Python. For most data analysis tasks, the pandas library is recommended for its rich functionality and performance. For very large files or simple processing tasks, however, the streaming and chunking approaches built on the csv module can be more memory-efficient.

By applying the right technique for your specific needs, you can process CSV files of any size efficiently without running into memory problems.