Python 에서 대용량 CSV 파일 효율적으로 처리하는 방법

PythonBeginner
지금 연습하기

소개

대용량 CSV 파일을 처리하는 것은 Python 개발자에게 흔한 과제입니다. 이 튜토리얼은 성능과 메모리 사용량을 최적화하는 데 중점을 두고, 이러한 파일을 효과적으로 처리하기 위한 효율적인 기술을 안내합니다. 이 랩을 마치면 Python 에서 데이터 집약적인 CSV 처리 작업을 해결할 수 있는 실용적인 지식을 갖추게 될 것입니다.

고객 데이터를 분석하거나, 금융 기록을 처리하거나, 모든 유형의 구조화된 데이터를 사용하든, 이 랩에서 배운 기술은 메모리 문제 없이 대용량 데이터 세트를 효율적으로 처리하는 데 도움이 될 것입니다.

간단한 CSV 파일 생성 및 읽기

CSV (Comma-Separated Values, 쉼표로 구분된 값) 는 표 형식 데이터를 저장하는 데 사용되는 널리 사용되는 파일 형식입니다. 이 단계에서는 간단한 CSV 파일을 생성하고 Python 의 내장 csv 모듈을 사용하여 읽는 방법을 배웁니다.

CSV 파일 이해

CSV 파일은 다음과 같은 일반 텍스트 형식으로 데이터를 저장합니다.

  • 각 줄은 데이터의 행을 나타냅니다.
  • 각 행 내의 값은 구분 기호 (일반적으로 쉼표) 로 구분됩니다.
  • 첫 번째 행에는 종종 열 머리글이 포함됩니다.

작업할 간단한 CSV 파일을 만들어 보겠습니다.

CSV 파일 생성

먼저, 작업할 디렉토리를 만들고 간단한 CSV 파일을 만들어 보겠습니다.

  1. WebIDE 에서 터미널을 엽니다.
  2. 편집기에서 csv_basics.py라는 새 Python 파일을 만듭니다.

이제 다음 코드를 csv_basics.py에 추가합니다.

import csv

## Data to write to CSV
data = [
    ['Name', 'Age', 'City'],              ## Header row
    ['John Smith', '28', 'New York'],
    ['Sarah Johnson', '32', 'San Francisco'],
    ['Michael Brown', '45', 'Chicago'],
    ['Emily Davis', '36', 'Boston'],
    ['David Wilson', '52', 'Seattle']
]

## Writing data to a CSV file
with open('sample_data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)

print("CSV file 'sample_data.csv' created successfully.")

터미널에서 다음 명령을 실행하여 이 코드를 실행합니다.

python3 csv_basics.py

예상 출력:

CSV file 'sample_data.csv' created successfully.

이렇게 하면 현재 디렉토리에 sample_data.csv라는 새 CSV 파일이 생성됩니다. 다음을 실행하여 이 파일의 내용을 볼 수 있습니다.

cat sample_data.csv

예상 출력:

Name,Age,City
John Smith,28,New York
Sarah Johnson,32,San Francisco
Michael Brown,45,Chicago
Emily Davis,36,Boston
David Wilson,52,Seattle

CSV 파일 읽기

이제 방금 생성한 CSV 파일을 읽어 보겠습니다. 다음 코드를 사용하여 read_csv.py라는 새 파일을 만듭니다.

import csv

## Reading a CSV file
with open('sample_data.csv', 'r') as file:
    reader = csv.reader(file)

    print("Contents of sample_data.csv:")
    print("--------------------------")

    for row in reader:
        print(row)

## Reading and accessing specific columns
print("\nReading specific columns:")
print("--------------------------")

with open('sample_data.csv', 'r') as file:
    reader = csv.reader(file)
    headers = next(reader)  ## Skip the header row

    for row in reader:
        name = row[0]
        age = row[1]
        city = row[2]
        print(f"Name: {name}, Age: {age}, City: {city}")

다음 명령으로 이 코드를 실행합니다.

python3 read_csv.py

예상 출력:

Contents of sample_data.csv:
--------------------------
['Name', 'Age', 'City']
['John Smith', '28', 'New York']
['Sarah Johnson', '32', 'San Francisco']
['Michael Brown', '45', 'Chicago']
['Emily Davis', '36', 'Boston']
['David Wilson', '52', 'Seattle']

Reading specific columns:
--------------------------
Name: John Smith, Age: 28, City: New York
Name: Sarah Johnson, Age: 32, City: San Francisco
Name: Michael Brown, Age: 45, City: Chicago
Name: Emily Davis, Age: 36, City: Boston
Name: David Wilson, Age: 52, City: Seattle

CSV 모듈 이해

Python csv 모듈은 두 개의 주요 클래스를 제공합니다.

  • csv.reader: CSV 파일을 읽고 각 행을 문자열 목록으로 반환합니다.
  • csv.writer: CSV 파일에 데이터를 씁니다.

이 모듈은 특수 문자를 이스케이프하고 따옴표를 처리하는 등 다양한 CSV 형식을 처리하는 모든 복잡성을 처리합니다.

이 단계에서는 간단한 CSV 파일을 생성하고 읽는 방법을 배웠습니다. 다음 단계에서는 더 큰 CSV 파일을 처리하는 보다 효율적인 방법을 살펴보겠습니다.

편리한 CSV 처리를 위한 DictReader 사용

이전 단계에서는 기본 csv.readercsv.writer 함수를 사용했습니다. 이제 열 머리글이 있는 데이터를 사용할 때 특히 유용한 csv.DictReader 클래스를 사용하여 CSV 파일을 처리하는 더 편리한 방법을 살펴보겠습니다.

DictReader 란 무엇인가요?

csv.DictReader는 CSV 파일을 읽고 각 행을 다음과 같은 딕셔너리로 반환합니다.

  • 키는 열 머리글 (기본적으로 CSV 파일의 첫 번째 행) 에서 가져옵니다.
  • 값은 각 행의 해당 데이터입니다.

이 접근 방식은 인덱스 대신 이름으로 열을 참조할 수 있으므로 코드를 더 읽기 쉽고 오류 발생 가능성을 줄여줍니다.

더 큰 테스트 파일 만들기

먼저, DictReader의 이점을 보여주기 위해 약간 더 큰 CSV 파일을 만들어 보겠습니다. 다음 코드를 사용하여 create_users_data.py라는 새 파일을 만듭니다.

import csv
import random

## Generate some sample user data
def generate_users(count):
    users = [['id', 'name', 'email', 'age', 'country']]  ## Header row

    domains = ['gmail.com', 'yahoo.com', 'outlook.com', 'example.com']
    countries = ['USA', 'Canada', 'UK', 'Australia', 'Germany', 'France', 'Japan', 'Brazil']
    first_names = ['John', 'Jane', 'Michael', 'Emily', 'David', 'Sarah', 'Robert', 'Lisa']
    last_names = ['Smith', 'Johnson', 'Brown', 'Davis', 'Wilson', 'Miller', 'Jones', 'Taylor']

    for i in range(1, count + 1):
        first_name = random.choice(first_names)
        last_name = random.choice(last_names)
        name = f"{first_name} {last_name}"
        email = f"{first_name.lower()}.{last_name.lower()}@{random.choice(domains)}"
        age = random.randint(18, 65)
        country = random.choice(countries)

        users.append([str(i), name, email, str(age), country])

    return users

## Create a CSV file with 100 users
users_data = generate_users(100)

## Write data to CSV file
with open('users_data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(users_data)

print(f"Created 'users_data.csv' with 100 user records")

스크립트를 실행하여 파일을 만듭니다.

python3 create_users_data.py

예상 출력:

Created 'users_data.csv' with 100 user records

이 새 파일의 처음 몇 줄을 살펴보겠습니다.

head -n 5 users_data.csv

머리글 행과 그 뒤에 4 개의 데이터 행이 표시됩니다.

id,name,email,age,country
1,John Smith,john.smith@gmail.com,25,USA
2,Emily Brown,emily.brown@yahoo.com,32,Canada
3,David Jones,david.jones@outlook.com,45,UK
4,Sarah Wilson,sarah.wilson@example.com,28,Australia

DictReader 를 사용하여 CSV 파일 처리

이제 DictReader를 사용하여 이 파일을 처리하는 스크립트를 만들어 보겠습니다. 다음 코드를 사용하여 dict_reader_example.py라는 새 파일을 만듭니다.

import csv

## Read the CSV file using DictReader
with open('users_data.csv', 'r') as file:
    csv_reader = csv.DictReader(file)

    ## Print the field names (column headers)
    print(f"Column headers: {csv_reader.fieldnames}")
    print("\nFirst 5 records:")
    print("-----------------")

    ## Print the first 5 records
    for i, row in enumerate(csv_reader):
        if i < 5:
            ## Access fields by name
            print(f"User {row['id']}: {row['name']}, {row['age']} years old, from {row['country']}")
            print(f"  Email: {row['email']}")
        else:
            break

## Using DictReader for data analysis
with open('users_data.csv', 'r') as file:
    csv_reader = csv.DictReader(file)

    ## Calculate average age
    total_age = 0
    user_count = 0

    ## Count users by country
    countries = {}

    for row in csv_reader:
        user_count += 1
        total_age += int(row['age'])

        ## Count users by country
        country = row['country']
        if country in countries:
            countries[country] += 1
        else:
            countries[country] = 1

    avg_age = total_age / user_count if user_count > 0 else 0

    print("\nData Analysis:")
    print("--------------")
    print(f"Total users: {user_count}")
    print(f"Average age: {avg_age:.2f} years")
    print("\nUsers by country:")

    for country, count in sorted(countries.items(), key=lambda x: x[1], reverse=True):
        print(f"  {country}: {count} users")

이 스크립트를 실행합니다.

python3 dict_reader_example.py

예상 출력 (데이터가 무작위로 생성되므로 정확한 값은 다를 수 있습니다).

Column headers: ['id', 'name', 'email', 'age', 'country']

First 5 records:
-----------------
User 1: John Smith, 25 years old, from USA
  Email: john.smith@gmail.com
User 2: Emily Brown, 32 years old, from Canada
  Email: emily.brown@yahoo.com
User 3: David Jones, 45 years old, from UK
  Email: david.jones@outlook.com
User 4: Sarah Wilson, 28 years old, from Australia
  Email: sarah.wilson@example.com
User 5: Michael Taylor, 37 years old, from Germany
  Email: michael.taylor@example.com

Data Analysis:
--------------
Total users: 100
Average age: 41.35 years

Users by country:
  USA: 16 users
  Canada: 14 users
  Japan: 13 users
  UK: 12 users
  Germany: 12 users
  Australia: 12 users
  France: 11 users
  Brazil: 10 users

DictReader 사용의 장점

보시다시피 DictReader를 사용하면 몇 가지 장점이 있습니다.

  1. 읽기 쉬운 코드: 인덱스 위치를 기억하는 대신 이름으로 필드에 액세스할 수 있습니다.
  2. 자동 문서화: 코드는 액세스하는 필드를 명확하게 보여줍니다.
  3. 유연성: CSV 파일에서 열 순서가 변경되더라도 열 이름이 동일하게 유지되는 한 코드는 계속 작동합니다.

이 접근 방식은 열이 많은 실제 데이터를 사용하거나 열 순서가 시간이 지남에 따라 변경될 수 있는 경우에 특히 유용합니다.

다음 단계에서는 모든 것을 한 번에 메모리에 로드하지 않고 더 큰 CSV 파일을 처리하기 위한 효율적인 기술을 살펴보겠습니다.

대용량 CSV 파일 효율적으로 처리하기

실제 시나리오에서는 크기가 수 기가바이트에 달하는 CSV 파일을 처리해야 할 수 있습니다. 이러한 파일을 전체적으로 메모리에 로드하면 애플리케이션이 충돌하거나 속도가 현저히 느려질 수 있습니다. 이 단계에서는 대용량 CSV 파일을 효율적으로 처리하는 기술을 살펴보겠습니다.

대용량 파일의 메모리 문제

CSV 파일을 사용할 때 세 가지 일반적인 접근 방식이 있으며, 각 접근 방식은 서로 다른 메모리 요구 사항을 갖습니다.

  1. 전체 파일을 메모리에 로드 - 간단하지만 가장 많은 메모리를 사용합니다.
  2. 파일을 줄 단위로 스트리밍 - 최소한의 메모리를 사용하지만 복잡한 작업의 경우 속도가 느릴 수 있습니다.
  3. 청크 (Chunking) - 관리 가능한 청크로 파일을 처리하는 중간 지점입니다.

실제 예제를 통해 이러한 각 접근 방식을 살펴보겠습니다.

더 큰 샘플 파일 만들기

먼저, 이러한 기술을 시연하기 위해 더 큰 샘플 파일을 만들어 보겠습니다. create_large_csv.py라는 새 파일을 만듭니다.

import csv
import random
from datetime import datetime, timedelta

def create_large_csv(filename, num_rows):
    ## Define column headers
    headers = ['transaction_id', 'date', 'customer_id', 'product_id', 'amount', 'status']

    ## Create a file with the specified number of rows
    with open(filename, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(headers)

        ## Generate random data
        start_date = datetime(2022, 1, 1)
        status_options = ['completed', 'pending', 'failed', 'refunded']

        for i in range(1, num_rows + 1):
            ## Generate random values
            transaction_id = f"TXN-{i:08d}"
            days_offset = random.randint(0, 365)
            date = (start_date + timedelta(days=days_offset)).strftime('%Y-%m-%d')
            customer_id = f"CUST-{random.randint(1001, 9999)}"
            product_id = f"PROD-{random.randint(101, 999)}"
            amount = round(random.uniform(10.0, 500.0), 2)
            status = random.choice(status_options)

            ## Write row to CSV
            writer.writerow([transaction_id, date, customer_id, product_id, amount, status])

            ## Print progress indicator for every 10,000 rows
            if i % 10000 == 0:
                print(f"Generated {i} rows...")

## Create a CSV file with 50,000 rows (about 5-10 MB)
create_large_csv('transactions.csv', 50000)
print("Large CSV file 'transactions.csv' has been created.")

이 스크립트를 실행하여 파일을 만듭니다.

python3 create_large_csv.py

예상 출력:

Generated 10000 rows...
Generated 20000 rows...
Generated 30000 rows...
Generated 40000 rows...
Generated 50000 rows...
Large CSV file 'transactions.csv' has been created.

다음 명령으로 파일 크기를 확인할 수 있습니다.

ls -lh transactions.csv

예상 출력 (크기는 약간 다를 수 있습니다):

-rw-r--r-- 1 labex labex 3.8M Apr 15 12:30 transactions.csv

접근 방식 1: 줄 단위 처리 (스트리밍)

가장 메모리 효율적인 접근 방식은 CSV 파일을 줄 단위로 처리하는 것입니다. streaming_example.py라는 파일을 만듭니다.

import csv
import time

def process_csv_streaming(filename):
    print(f"Processing {filename} using streaming (line by line)...")
    start_time = time.time()

    ## Track some statistics
    row_count = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}

    ## Process the file line by line
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        for row in reader:
            ## Increment row counter
            row_count += 1

            ## Process row data
            amount = float(row['amount'])
            status = row['status']

            ## Update statistics
            total_amount += amount
            status_counts[status] += 1

    ## Calculate and display results
    end_time = time.time()
    processing_time = end_time - start_time

    print(f"\nResults:")
    print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
    print(f"Total transaction amount: ${total_amount:,.2f}")
    print(f"Average transaction amount: ${total_amount/row_count:.2f}")
    print("\nTransaction status breakdown:")
    for status, count in status_counts.items():
        percentage = (count / row_count) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

## Process the file
process_csv_streaming('transactions.csv')

이 스크립트를 실행합니다.

python3 streaming_example.py

예상 출력 (정확한 숫자는 다를 수 있습니다):

Processing transactions.csv using streaming (line by line)...

Results:
Processed 50,000 rows in 0.17 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80

Transaction status breakdown:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

접근 방식 2: 청크 처리

더 복잡한 작업이나 데이터를 일괄적으로 처리해야 하는 경우 청크 접근 방식을 사용할 수 있습니다. chunked_example.py라는 파일을 만듭니다.

import csv
import time

def process_csv_chunked(filename, chunk_size=10000):
    print(f"Processing {filename} using chunks of {chunk_size} rows...")
    start_time = time.time()

    ## Track some statistics
    row_count = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}

    ## Process the file in chunks
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        chunk = []
        for row in reader:
            ## Add row to current chunk
            chunk.append(row)

            ## When chunk reaches desired size, process it
            if len(chunk) >= chunk_size:
                ## Process the chunk
                for row_data in chunk:
                    ## Update statistics
                    row_count += 1
                    amount = float(row_data['amount'])
                    status = row_data['status']

                    total_amount += amount
                    status_counts[status] += 1

                print(f"Processed chunk of {len(chunk)} rows... ({row_count:,} total)")
                ## Clear the chunk for next batch
                chunk = []

        ## Process any remaining rows in the last chunk
        if chunk:
            for row_data in chunk:
                row_count += 1
                amount = float(row_data['amount'])
                status = row_data['status']

                total_amount += amount
                status_counts[status] += 1

            print(f"Processed final chunk of {len(chunk)} rows... ({row_count:,} total)")

    ## Calculate and display results
    end_time = time.time()
    processing_time = end_time - start_time

    print(f"\nResults:")
    print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
    print(f"Total transaction amount: ${total_amount:,.2f}")
    print(f"Average transaction amount: ${total_amount/row_count:.2f}")
    print("\nTransaction status breakdown:")
    for status, count in status_counts.items():
        percentage = (count / row_count) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

## Process the file with chunks of 10,000 rows
process_csv_chunked('transactions.csv', chunk_size=10000)

이 스크립트를 실행합니다.

python3 chunked_example.py

예상 출력:

Processing transactions.csv using chunks of 10000 rows...
Processed chunk of 10000 rows... (10,000 total)
Processed chunk of 10000 rows... (20,000 total)
Processed chunk of 10000 rows... (30,000 total)
Processed chunk of 10000 rows... (40,000 total)
Processed chunk of 10000 rows... (50,000 total)

Results:
Processed 50,000 rows in 0.20 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80

Transaction status breakdown:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

메모리 사용량 비교

이러한 접근 방식의 주요 차이점은 다음과 같습니다.

  1. 스트리밍 (줄 단위):

    • 파일 크기에 관계없이 최소한의 메모리를 사용합니다.
    • 매우 큰 파일에 가장 적합합니다.
    • 각 행에 대한 간단한 작업.
  2. 청크 처리:

    • 스트리밍보다 더 많은 메모리를 사용하지만 여전히 효율적입니다.
    • 일괄적으로 행을 처리해야 하는 작업에 적합합니다.
    • 처리 중에 진행 상황 업데이트를 제공합니다.
    • 병렬 처리를 위해 다중 처리를 결합할 수 있습니다.

대부분의 실제 목적의 경우, 특히 일괄 처리 기능이 필요한 경우가 아니라면 스트리밍 접근 방식이 권장됩니다. 이는 우수한 성능을 유지하면서 최고의 메모리 효율성을 제공합니다.

다음 단계에서는 pandas 와 같은 타사 라이브러리를 사용하여 훨씬 더 강력한 CSV 처리 기능을 살펴보겠습니다.

고급 CSV 처리를 위한 Pandas 사용

Python 의 내장 csv 모듈은 강력하지만, pandas 라이브러리는 데이터 분석 및 조작을 위한 더 고급 기능을 제공합니다. 이 단계에서는 효율적인 CSV 처리를 위해 pandas 를 사용하는 방법을 살펴보겠습니다.

Pandas 설치

먼저, pandas 라이브러리를 설치해 보겠습니다.

pip install pandas

예상 출력:

Collecting pandas
  Downloading pandas-2.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 12.3/12.3 MB 42.6 MB/s eta 0:00:00
...
Successfully installed pandas-2.0.0 numpy-1.24.3 python-dateutil-2.8.2 pytz-2023.3 tzdata-2023.3

Pandas 로 CSV 파일 읽기

Pandas 를 사용하면 CSV 데이터를 쉽게 읽고, 분석하고, 조작할 수 있습니다. pandas_basic.py라는 파일을 만듭니다.

import pandas as pd
import time

def process_with_pandas(filename):
    print(f"Processing {filename} with pandas...")
    start_time = time.time()

    ## Read the CSV file into a DataFrame
    df = pd.read_csv(filename)

    ## Display basic information
    print(f"\nDataFrame Info:")
    print(f"Shape: {df.shape} (rows, columns)")
    print(f"Column names: {', '.join(df.columns)}")

    ## Display the first 5 rows
    print("\nFirst 5 rows:")
    print(df.head())

    ## Basic statistics
    print("\nSummary statistics for numeric columns:")
    print(df.describe())

    ## Group by analysis
    print("\nTransaction counts by status:")
    status_counts = df['status'].value_counts()
    print(status_counts)

    ## Calculate average amount by status
    print("\nAverage transaction amount by status:")
    avg_by_status = df.groupby('status')['amount'].mean()
    print(avg_by_status)

    ## Calculate total amount by date (first 5 dates)
    print("\nTotal transaction amount by date (first 5 dates):")
    total_by_date = df.groupby('date')['amount'].sum().sort_values(ascending=False).head(5)
    print(total_by_date)

    end_time = time.time()
    print(f"\nProcessed in {end_time - start_time:.2f} seconds")

## Process the transactions file
process_with_pandas('transactions.csv')

이 스크립트를 실행합니다.

python3 pandas_basic.py

예상 출력 (간결성을 위해 잘림):

Processing transactions.csv with pandas...

DataFrame Info:
Shape: (50000, 6) (rows, columns)
Column names: transaction_id, date, customer_id, product_id, amount, status

First 5 rows:
  transaction_id        date customer_id product_id   amount    status
0    TXN-00000001  2022-12-19   CUST-5421   PROD-383  385.75  refunded
1    TXN-00000002  2022-02-01   CUST-7078   PROD-442  286.83  completed
2    TXN-00000003  2022-12-24   CUST-2356   PROD-701  364.87    failed
3    TXN-00000004  2022-04-09   CUST-3458   PROD-854  247.73   pending
4    TXN-00000005  2022-03-07   CUST-6977   PROD-307  298.69  completed

Summary statistics for numeric columns:
              amount
count  50000.000000
mean     254.797067
std      141.389125
min       10.010000
25%      127.732500
50%      254.865000
75%      381.387500
max      499.990000

Transaction counts by status:
pending     12598
refunded    12556
completed   12432
failed      12414
Name: status, dtype: int64

Average transaction amount by status:
status
completed    255.028733
failed       254.709444
pending      254.690785
refunded     254.760390
Name: amount, dtype: float64

Total transaction amount by date (first 5 dates):
date
2022-01-20    38883.19
2022-08-30    38542.49
2022-03-10    38331.67
2022-11-29    38103.61
2022-06-24    37954.87
Name: amount, dtype: float64

Processed in 0.11 seconds

Pandas 로 대용량 CSV 파일 처리

Pandas 는 강력하지만, 대용량 CSV 파일을 로드할 때 많은 메모리를 소비할 수 있습니다. 매우 큰 파일의 경우 chunksize 매개변수를 사용하여 파일을 청크 단위로 읽을 수 있습니다. pandas_chunked.py라는 파일을 만듭니다.

import pandas as pd
import time

def process_large_csv_with_pandas(filename, chunk_size=10000):
    print(f"Processing {filename} with pandas using chunks of {chunk_size} rows...")
    start_time = time.time()

    ## Initialize variables to store aggregated results
    total_rows = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
    daily_totals = {}

    ## Process the file in chunks
    for chunk_num, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        ## Update row count
        chunk_rows = len(chunk)
        total_rows += chunk_rows

        ## Update total amount
        chunk_amount = chunk['amount'].sum()
        total_amount += chunk_amount

        ## Update status counts
        for status, count in chunk['status'].value_counts().items():
            status_counts[status] += count

        ## Update daily totals
        for date, group in chunk.groupby('date'):
            if date in daily_totals:
                daily_totals[date] += group['amount'].sum()
            else:
                daily_totals[date] = group['amount'].sum()

        print(f"Processed chunk {chunk_num + 1} ({total_rows:,} rows so far)")

    ## Calculate results
    end_time = time.time()
    processing_time = end_time - start_time

    print(f"\nResults after processing {total_rows:,} rows in {processing_time:.2f} seconds:")
    print(f"Total transaction amount: ${total_amount:,.2f}")
    print(f"Average transaction amount: ${total_amount/total_rows:.2f}")

    print("\nTransaction status breakdown:")
    for status, count in status_counts.items():
        percentage = (count / total_rows) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

    ## Show top 5 days by transaction amount
    print("\nTop 5 days by transaction amount:")
    top_days = sorted(daily_totals.items(), key=lambda x: x[1], reverse=True)[:5]
    for date, amount in top_days:
        print(f"  {date}: ${amount:,.2f}")

## Process the transactions file with chunks
process_large_csv_with_pandas('transactions.csv', chunk_size=10000)

이 스크립트를 실행합니다.

python3 pandas_chunked.py

예상 출력:

Processing transactions.csv with pandas using chunks of 10000 rows...
Processed chunk 1 (10,000 rows so far)
Processed chunk 2 (20,000 rows so far)
Processed chunk 3 (30,000 rows so far)
Processed chunk 4 (40,000 rows so far)
Processed chunk 5 (50,000 rows so far)

Results after processing 50,000 rows in 0.34 seconds:
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80

Transaction status breakdown:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

Top 5 days by transaction amount:
  2022-01-20: $38,883.19
  2022-08-30: $38,542.49
  2022-03-10: $38,331.67
  2022-11-29: $38,103.61
  2022-06-24: $37,954.87

Pandas 로 데이터 필터링 및 변환

Pandas 의 큰 장점 중 하나는 데이터를 쉽게 필터링하고 변환할 수 있다는 것입니다. pandas_filter.py라는 파일을 만듭니다.

import pandas as pd
import time

def filter_and_transform(filename):
    print(f"Filtering and transforming data from {filename}...")
    start_time = time.time()

    ## Read the CSV file
    df = pd.read_csv(filename)

    ## 1. Filter: Get only completed transactions
    completed = df[df['status'] == 'completed']
    print(f"Number of completed transactions: {len(completed)}")

    ## 2. Filter: Get high-value transactions (over $400)
    high_value = df[df['amount'] > 400]
    print(f"Number of high-value transactions (>${400}): {len(high_value)}")

    ## 3. Filter: Transactions from first quarter of 2022
    df['date'] = pd.to_datetime(df['date'])  ## Convert to datetime
    q1_2022 = df[(df['date'] >= '2022-01-01') & (df['date'] <= '2022-03-31')]
    print(f"Number of transactions in Q1 2022: {len(q1_2022)}")

    ## 4. Add a new column: transaction_month
    df['month'] = df['date'].dt.strftime('%Y-%m')

    ## 5. Group by month and status
    monthly_by_status = df.groupby(['month', 'status']).agg({
        'transaction_id': 'count',
        'amount': 'sum'
    }).rename(columns={'transaction_id': 'count'})

    ## Calculate success rate by month (completed / total)
    print("\nTransaction success rates by month:")
    for month, month_data in df.groupby('month'):
        total = len(month_data)
        completed = len(month_data[month_data['status'] == 'completed'])
        success_rate = (completed / total) * 100
        print(f"  {month}: {success_rate:.1f}% ({completed}/{total})")

    ## Save filtered data to a new CSV file
    completed_high_value = df[(df['status'] == 'completed') & (df['amount'] > 300)]
    output_file = 'high_value_completed.csv'
    completed_high_value.to_csv(output_file, index=False)

    end_time = time.time()
    print(f"\nFiltering completed in {end_time - start_time:.2f} seconds")
    print(f"Saved {len(completed_high_value)} high-value completed transactions to {output_file}")

## Filter and transform the data
filter_and_transform('transactions.csv')

이 스크립트를 실행합니다.

python3 pandas_filter.py

예상 출력:

Filtering and transforming data from transactions.csv...
Number of completed transactions: 12432
Number of high-value transactions (>$400): 6190
Number of transactions in Q1 2022: 12348

Transaction success rates by month:
  2022-01: 24.8% (1048/4225)
  2022-02: 25.0% (1010/4034)
  2022-03: 25.4% (1042/4089)
  2022-04: 24.2% (978/4052)
  2022-05: 24.4% (1047/4297)
  2022-06: 24.4% (1046/4280)
  2022-07: 24.7% (1071/4341)
  2022-08: 25.1% (1090/4343)
  2022-09: 26.1% (1091/4177)
  2022-10: 24.1% (1008/4182)
  2022-11: 24.8% (1009/4075)
  2022-12: 25.2% (992/3905)

Filtering completed in 0.38 seconds
Saved 6304 high-value completed transactions to high_value_completed.csv

Pandas 사용의 장점

이러한 예제에서 볼 수 있듯이, pandas 는 CSV 처리에 몇 가지 장점을 제공합니다.

  1. 풍부한 기능: 데이터 필터링, 그룹화, 집계 및 변환을 위한 내장 메서드
  2. 성능: 대규모 데이터 세트에 대한 빠른 작업을 위해 내부적으로 최적화된 C 코드
  3. 쉬운 데이터 분석: 통계를 계산하고 통찰력을 얻는 간단한 방법
  4. 시각화 기능: 플로팅 라이브러리와의 쉬운 통합 (예제에 표시되지 않음)
  5. 청크 처리: 사용 가능한 메모리보다 큰 파일을 처리하는 기능

CSV 파일을 포함하는 대부분의 데이터 분석 작업의 경우, 순수한 Python csv 모듈을 사용해야 하는 특정 메모리 제약 조건이 없는 한 pandas 가 권장되는 접근 방식입니다.

요약

이 튜토리얼에서는 작은 데이터 세트부터 신중한 메모리 관리가 필요한 대규모 데이터 세트까지 Python 에서 CSV 파일을 효율적으로 처리하는 여러 가지 접근 방식을 배웠습니다.

  1. 기본 CSV 처리: Python 의 내장 csv 모듈을 사용하여 csv.readercsv.writer로 CSV 파일을 읽고 씁니다.

  2. 사전 기반 처리: csv.DictReader를 사용하여 CSV 데이터를 보다 직관적인 방식으로 처리하고, 인덱스 대신 이름으로 필드에 액세스합니다.

  3. 효율적인 처리 기술:

    • 스트리밍: 최소한의 메모리 사용을 위해 파일을 줄 단위로 처리
    • 청크 처리: 더 나은 메모리 관리를 위해 파일을 일괄적으로 처리
  4. Pandas 를 사용한 고급 처리:

    • CSV 파일을 DataFrame 으로 읽기
    • 데이터 분석 및 필터링
    • 대용량 파일을 청크 단위로 처리
    • 데이터 변환 및 내보내기

이러한 기술은 Python 에서 모든 크기의 CSV 파일을 처리하기 위한 포괄적인 도구 키트를 제공합니다. 대부분의 데이터 분석 작업의 경우, pandas 는 풍부한 기능과 성능으로 인해 권장되는 라이브러리입니다. 그러나 매우 큰 파일이나 간단한 처리 작업의 경우, 내장 csv 모듈을 사용한 스트리밍 및 청크 처리 방식이 더 메모리 효율적일 수 있습니다.

특정 요구 사항에 따라 적절한 기술을 적용하여 메모리 문제 없이 모든 크기의 CSV 파일을 효율적으로 처리할 수 있습니다.