Эффективная обработка больших CSV-файлов
В реальных сценариях вам может потребоваться обработать CSV-файлы размером в несколько гигабайт. Загрузка таких файлов полностью в память может привести к сбою вашего приложения или значительному замедлению его работы. На этом шаге мы рассмотрим методы эффективной обработки больших CSV-файлов.
Проблема с памятью при работе с большими файлами
При работе с CSV-файлами существует три распространенных подхода, каждый из которых имеет разные требования к памяти:
- Загрузка всего файла в память — просто, но использует больше всего памяти
- Потоковая передача файла строка за строкой — использует минимальную память, но может быть медленнее для сложных операций
- Разбиение на части (Chunking) — промежуточный вариант, который обрабатывает файл управляемыми частями
Давайте рассмотрим каждый из этих подходов на практических примерах.
Создание большего файла-образца
Сначала давайте создадим больший файл-образец, чтобы продемонстрировать эти методы. Создайте новый файл с именем create_large_csv.py:
import csv
import random
from datetime import datetime, timedelta
def create_large_csv(filename, num_rows):
## Define column headers
headers = ['transaction_id', 'date', 'customer_id', 'product_id', 'amount', 'status']
## Create a file with the specified number of rows
with open(filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(headers)
## Generate random data
start_date = datetime(2022, 1, 1)
status_options = ['completed', 'pending', 'failed', 'refunded']
for i in range(1, num_rows + 1):
## Generate random values
transaction_id = f"TXN-{i:08d}"
days_offset = random.randint(0, 365)
date = (start_date + timedelta(days=days_offset)).strftime('%Y-%m-%d')
customer_id = f"CUST-{random.randint(1001, 9999)}"
product_id = f"PROD-{random.randint(101, 999)}"
amount = round(random.uniform(10.0, 500.0), 2)
status = random.choice(status_options)
## Write row to CSV
writer.writerow([transaction_id, date, customer_id, product_id, amount, status])
## Print progress indicator for every 10,000 rows
if i % 10000 == 0:
print(f"Generated {i} rows...")
## Create a CSV file with 50,000 rows (about 5-10 MB)
create_large_csv('transactions.csv', 50000)
print("Large CSV file 'transactions.csv' has been created.")
Запустите этот скрипт, чтобы создать файл:
python3 create_large_csv.py
Ожидаемый вывод:
Generated 10000 rows...
Generated 20000 rows...
Generated 30000 rows...
Generated 40000 rows...
Generated 50000 rows...
Large CSV file 'transactions.csv' has been created.
Вы можете проверить размер файла с помощью:
ls -lh transactions.csv
Ожидаемый вывод (размер может незначительно отличаться):
-rw-r--r-- 1 labex labex 3.8M Apr 15 12:30 transactions.csv
Подход 1: Построчная обработка (потоковая передача)
Наиболее эффективным с точки зрения памяти подходом является построчная обработка CSV-файла. Создайте файл с именем streaming_example.py:
import csv
import time
def process_csv_streaming(filename):
print(f"Processing {filename} using streaming (line by line)...")
start_time = time.time()
## Track some statistics
row_count = 0
total_amount = 0
status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
## Process the file line by line
with open(filename, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
## Increment row counter
row_count += 1
## Process row data
amount = float(row['amount'])
status = row['status']
## Update statistics
total_amount += amount
status_counts[status] += 1
## Calculate and display results
end_time = time.time()
processing_time = end_time - start_time
print(f"\nResults:")
print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
print(f"Total transaction amount: ${total_amount:,.2f}")
print(f"Average transaction amount: ${total_amount/row_count:.2f}")
print("\nTransaction status breakdown:")
for status, count in status_counts.items():
percentage = (count / row_count) * 100
print(f" {status}: {count:,} ({percentage:.1f}%)")
## Process the file
process_csv_streaming('transactions.csv')
Запустите этот скрипт:
python3 streaming_example.py
Ожидаемый вывод (ваши точные числа могут отличаться):
Processing transactions.csv using streaming (line by line)...
Results:
Processed 50,000 rows in 0.17 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80
Transaction status breakdown:
completed: 12,432 (24.9%)
pending: 12,598 (25.2%)
failed: 12,414 (24.8%)
refunded: 12,556 (25.1%)
Подход 2: Обработка по частям (Chunked Processing)
Для более сложных операций или когда вам нужно обрабатывать данные пакетами, вы можете использовать подход с разбивкой на части. Создайте файл с именем chunked_example.py:
import csv
import time
def process_csv_chunked(filename, chunk_size=10000):
print(f"Processing {filename} using chunks of {chunk_size} rows...")
start_time = time.time()
## Track some statistics
row_count = 0
total_amount = 0
status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
## Process the file in chunks
with open(filename, 'r') as file:
reader = csv.DictReader(file)
chunk = []
for row in reader:
## Add row to current chunk
chunk.append(row)
## When chunk reaches desired size, process it
if len(chunk) >= chunk_size:
## Process the chunk
for row_data in chunk:
## Update statistics
row_count += 1
amount = float(row_data['amount'])
status = row_data['status']
total_amount += amount
status_counts[status] += 1
print(f"Processed chunk of {len(chunk)} rows... ({row_count:,} total)")
## Clear the chunk for next batch
chunk = []
## Process any remaining rows in the last chunk
if chunk:
for row_data in chunk:
row_count += 1
amount = float(row_data['amount'])
status = row_data['status']
total_amount += amount
status_counts[status] += 1
print(f"Processed final chunk of {len(chunk)} rows... ({row_count:,} total)")
## Calculate and display results
end_time = time.time()
processing_time = end_time - start_time
print(f"\nResults:")
print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
print(f"Total transaction amount: ${total_amount:,.2f}")
print(f"Average transaction amount: ${total_amount/row_count:.2f}")
print("\nTransaction status breakdown:")
for status, count in status_counts.items():
percentage = (count / row_count) * 100
print(f" {status}: {count:,} ({percentage:.1f}%)")
## Process the file with chunks of 10,000 rows
process_csv_chunked('transactions.csv', chunk_size=10000)
Запустите этот скрипт:
python3 chunked_example.py
Ожидаемый вывод:
Processing transactions.csv using chunks of 10000 rows...
Processed chunk of 10000 rows... (10,000 total)
Processed chunk of 10000 rows... (20,000 total)
Processed chunk of 10000 rows... (30,000 total)
Processed chunk of 10000 rows... (40,000 total)
Processed chunk of 10000 rows... (50,000 total)
Results:
Processed 50,000 rows in 0.20 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80
Transaction status breakdown:
completed: 12,432 (24.9%)
pending: 12,598 (25.2%)
failed: 12,414 (24.8%)
refunded: 12,556 (25.1%)
Сравнение использования памяти
Основные различия между этими подходами заключаются в следующем:
-
Потоковая передача (строка за строкой):
- Использует минимальную память независимо от размера файла
- Лучше всего подходит для очень больших файлов
- Простые операции над каждой строкой
-
Обработка по частям:
- Использует больше памяти, чем потоковая передача, но все равно эффективна
- Подходит для операций, которым необходимо обрабатывать строки пакетами
- Предоставляет обновления о ходе выполнения во время обработки
- Может быть объединена с многопроцессорностью для параллельной обработки
Для большинства практических целей рекомендуется подход с потоковой передачей, если только вам специально не нужны возможности пакетной обработки. Он обеспечивает наилучшую эффективность использования памяти, сохраняя при этом хорошую производительность.
На следующем шаге мы рассмотрим использование сторонних библиотек, таких как pandas, для еще более мощных возможностей обработки CSV.