Em cenários do mundo real, você pode precisar processar arquivos CSV que têm vários gigabytes de tamanho. Carregar esses arquivos inteiramente na memória pode fazer com que seu aplicativo trave ou diminua a velocidade significativamente. Nesta etapa, exploraremos técnicas para processar arquivos CSV grandes de forma eficiente.
O Desafio da Memória com Arquivos Grandes
Ao trabalhar com arquivos CSV, existem três abordagens comuns, cada uma com diferentes requisitos de memória:
- Carregar o arquivo inteiro na memória - Simples, mas usa a maior quantidade de memória
- Transmitir o arquivo linha por linha - Usa memória mínima, mas pode ser mais lento para operações complexas
- Chunking (divisão em blocos) - Um meio termo que processa o arquivo em blocos gerenciáveis
Vamos explorar cada uma dessas abordagens com exemplos práticos.
Criando um Arquivo de Amostra Maior
Primeiro, vamos criar um arquivo de amostra maior para demonstrar essas técnicas. Crie um novo arquivo chamado create_large_csv.py:
import csv
import random
from datetime import datetime, timedelta
def create_large_csv(filename, num_rows):
## Define column headers
headers = ['transaction_id', 'date', 'customer_id', 'product_id', 'amount', 'status']
## Create a file with the specified number of rows
with open(filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(headers)
## Generate random data
start_date = datetime(2022, 1, 1)
status_options = ['completed', 'pending', 'failed', 'refunded']
for i in range(1, num_rows + 1):
## Generate random values
transaction_id = f"TXN-{i:08d}"
days_offset = random.randint(0, 365)
date = (start_date + timedelta(days=days_offset)).strftime('%Y-%m-%d')
customer_id = f"CUST-{random.randint(1001, 9999)}"
product_id = f"PROD-{random.randint(101, 999)}"
amount = round(random.uniform(10.0, 500.0), 2)
status = random.choice(status_options)
## Write row to CSV
writer.writerow([transaction_id, date, customer_id, product_id, amount, status])
## Print progress indicator for every 10,000 rows
if i % 10000 == 0:
print(f"Generated {i} rows...")
## Create a CSV file with 50,000 rows (about 5-10 MB)
create_large_csv('transactions.csv', 50000)
print("Large CSV file 'transactions.csv' has been created.")
Execute este script para criar o arquivo:
python3 create_large_csv.py
Saída esperada:
Generated 10000 rows...
Generated 20000 rows...
Generated 30000 rows...
Generated 40000 rows...
Generated 50000 rows...
Large CSV file 'transactions.csv' has been created.
Você pode verificar o tamanho do arquivo com:
ls -lh transactions.csv
Saída esperada (o tamanho pode variar ligeiramente):
-rw-r--r-- 1 labex labex 3.8M Apr 15 12:30 transactions.csv
Abordagem 1: Processamento Linha por Linha (Streaming)
A abordagem mais eficiente em termos de memória é processar um arquivo CSV linha por linha. Crie um arquivo chamado streaming_example.py:
import csv
import time
def process_csv_streaming(filename):
print(f"Processing {filename} using streaming (line by line)...")
start_time = time.time()
## Track some statistics
row_count = 0
total_amount = 0
status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
## Process the file line by line
with open(filename, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
## Increment row counter
row_count += 1
## Process row data
amount = float(row['amount'])
status = row['status']
## Update statistics
total_amount += amount
status_counts[status] += 1
## Calculate and display results
end_time = time.time()
processing_time = end_time - start_time
print(f"\nResults:")
print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
print(f"Total transaction amount: ${total_amount:,.2f}")
print(f"Average transaction amount: ${total_amount/row_count:.2f}")
print("\nTransaction status breakdown:")
for status, count in status_counts.items():
percentage = (count / row_count) * 100
print(f" {status}: {count:,} ({percentage:.1f}%)")
## Process the file
process_csv_streaming('transactions.csv')
Execute este script:
python3 streaming_example.py
Saída esperada (seus números exatos podem variar):
Processing transactions.csv using streaming (line by line)...
Results:
Processed 50,000 rows in 0.17 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80
Transaction status breakdown:
completed: 12,432 (24.9%)
pending: 12,598 (25.2%)
failed: 12,414 (24.8%)
refunded: 12,556 (25.1%)
Abordagem 2: Processamento em Blocos (Chunked Processing)
Para operações mais complexas ou quando você precisa processar dados em lotes, você pode usar uma abordagem em blocos. Crie um arquivo chamado chunked_example.py:
import csv
import time
def process_csv_chunked(filename, chunk_size=10000):
print(f"Processing {filename} using chunks of {chunk_size} rows...")
start_time = time.time()
## Track some statistics
row_count = 0
total_amount = 0
status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
## Process the file in chunks
with open(filename, 'r') as file:
reader = csv.DictReader(file)
chunk = []
for row in reader:
## Add row to current chunk
chunk.append(row)
## When chunk reaches desired size, process it
if len(chunk) >= chunk_size:
## Process the chunk
for row_data in chunk:
## Update statistics
row_count += 1
amount = float(row_data['amount'])
status = row_data['status']
total_amount += amount
status_counts[status] += 1
print(f"Processed chunk of {len(chunk)} rows... ({row_count:,} total)")
## Clear the chunk for next batch
chunk = []
## Process any remaining rows in the last chunk
if chunk:
for row_data in chunk:
row_count += 1
amount = float(row_data['amount'])
status = row_data['status']
total_amount += amount
status_counts[status] += 1
print(f"Processed final chunk of {len(chunk)} rows... ({row_count:,} total)")
## Calculate and display results
end_time = time.time()
processing_time = end_time - start_time
print(f"\nResults:")
print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
print(f"Total transaction amount: ${total_amount:,.2f}")
print(f"Average transaction amount: ${total_amount/row_count:.2f}")
print("\nTransaction status breakdown:")
for status, count in status_counts.items():
percentage = (count / row_count) * 100
print(f" {status}: {count:,} ({percentage:.1f}%)")
## Process the file with chunks of 10,000 rows
process_csv_chunked('transactions.csv', chunk_size=10000)
Execute este script:
python3 chunked_example.py
Saída esperada:
Processing transactions.csv using chunks of 10000 rows...
Processed chunk of 10000 rows... (10,000 total)
Processed chunk of 10000 rows... (20,000 total)
Processed chunk of 10000 rows... (30,000 total)
Processed chunk of 10000 rows... (40,000 total)
Processed chunk of 10000 rows... (50,000 total)
Results:
Processed 50,000 rows in 0.20 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80
Transaction status breakdown:
completed: 12,432 (24.9%)
pending: 12,598 (25.2%)
failed: 12,414 (24.8%)
refunded: 12,556 (25.1%)
Comparação de Uso de Memória
As principais diferenças entre essas abordagens são:
-
Streaming (linha por linha):
- Usa memória mínima, independentemente do tamanho do arquivo
- Melhor para arquivos muito grandes
- Operações simples em cada linha
-
Processamento em blocos:
- Usa mais memória do que streaming, mas ainda é eficiente
- Bom para operações que precisam processar linhas em lotes
- Fornece atualizações de progresso durante o processamento
- Pode ser combinado com multiprocessamento para processamento paralelo
Para a maioria dos propósitos práticos, a abordagem de streaming é recomendada, a menos que você precise especificamente de recursos de processamento em lotes. Ele fornece a melhor eficiência de memória, mantendo um bom desempenho.
Na próxima etapa, exploraremos o uso de bibliotecas de terceiros como pandas para recursos de processamento CSV ainda mais poderosos.