Procesamiento Eficiente de Archivos CSV Grandes
En escenarios del mundo real, es posible que deba procesar archivos CSV que tengan varios gigabytes de tamaño. Cargar estos archivos por completo en la memoria puede hacer que su aplicación se bloquee o se ralentice significativamente. En este paso, exploraremos técnicas para procesar archivos CSV grandes de manera eficiente.
El Desafío de la Memoria con Archivos Grandes
Cuando se trabaja con archivos CSV, existen tres enfoques comunes, cada uno con diferentes requisitos de memoria:
- Cargar todo el archivo en la memoria: Simple pero utiliza la mayor cantidad de memoria
- Transmitir el archivo línea por línea: Utiliza una memoria mínima pero puede ser más lento para operaciones complejas
- Fragmentación (Chunking): Un punto intermedio que procesa el archivo en fragmentos manejables
Exploremos cada uno de estos enfoques con ejemplos prácticos.
Creación de un Archivo de Muestra Más Grande
Primero, creemos un archivo de muestra más grande para demostrar estas técnicas. Cree un nuevo archivo llamado create_large_csv.py:
import csv
import random
from datetime import datetime, timedelta
def create_large_csv(filename, num_rows):
## Define column headers
headers = ['transaction_id', 'date', 'customer_id', 'product_id', 'amount', 'status']
## Create a file with the specified number of rows
with open(filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(headers)
## Generate random data
start_date = datetime(2022, 1, 1)
status_options = ['completed', 'pending', 'failed', 'refunded']
for i in range(1, num_rows + 1):
## Generate random values
transaction_id = f"TXN-{i:08d}"
days_offset = random.randint(0, 365)
date = (start_date + timedelta(days=days_offset)).strftime('%Y-%m-%d')
customer_id = f"CUST-{random.randint(1001, 9999)}"
product_id = f"PROD-{random.randint(101, 999)}"
amount = round(random.uniform(10.0, 500.0), 2)
status = random.choice(status_options)
## Write row to CSV
writer.writerow([transaction_id, date, customer_id, product_id, amount, status])
## Print progress indicator for every 10,000 rows
if i % 10000 == 0:
print(f"Generated {i} rows...")
## Create a CSV file with 50,000 rows (about 5-10 MB)
create_large_csv('transactions.csv', 50000)
print("Large CSV file 'transactions.csv' has been created.")
Ejecute este script para crear el archivo:
python3 create_large_csv.py
Salida esperada:
Generated 10000 rows...
Generated 20000 rows...
Generated 30000 rows...
Generated 40000 rows...
Generated 50000 rows...
Large CSV file 'transactions.csv' has been created.
Puede verificar el tamaño del archivo con:
ls -lh transactions.csv
Salida esperada (el tamaño puede variar ligeramente):
-rw-r--r-- 1 labex labex 3.8M Apr 15 12:30 transactions.csv
Enfoque 1: Procesamiento Línea por Línea (Streaming)
El enfoque más eficiente en cuanto a memoria es procesar un archivo CSV línea por línea. Cree un archivo llamado streaming_example.py:
import csv
import time
def process_csv_streaming(filename):
print(f"Processing {filename} using streaming (line by line)...")
start_time = time.time()
## Track some statistics
row_count = 0
total_amount = 0
status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
## Process the file line by line
with open(filename, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
## Increment row counter
row_count += 1
## Process row data
amount = float(row['amount'])
status = row['status']
## Update statistics
total_amount += amount
status_counts[status] += 1
## Calculate and display results
end_time = time.time()
processing_time = end_time - start_time
print(f"\nResults:")
print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
print(f"Total transaction amount: ${total_amount:,.2f}")
print(f"Average transaction amount: ${total_amount/row_count:.2f}")
print("\nTransaction status breakdown:")
for status, count in status_counts.items():
percentage = (count / row_count) * 100
print(f" {status}: {count:,} ({percentage:.1f}%)")
## Process the file
process_csv_streaming('transactions.csv')
Ejecute este script:
python3 streaming_example.py
Salida esperada (sus números exactos pueden variar):
Processing transactions.csv using streaming (line by line)...
Results:
Processed 50,000 rows in 0.17 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80
Transaction status breakdown:
completed: 12,432 (24.9%)
pending: 12,598 (25.2%)
failed: 12,414 (24.8%)
refunded: 12,556 (25.1%)
Enfoque 2: Procesamiento por Fragmentos (Chunked Processing)
Para operaciones más complejas o cuando necesita procesar datos en lotes, puede usar un enfoque fragmentado. Cree un archivo llamado chunked_example.py:
import csv
import time
def process_csv_chunked(filename, chunk_size=10000):
print(f"Processing {filename} using chunks of {chunk_size} rows...")
start_time = time.time()
## Track some statistics
row_count = 0
total_amount = 0
status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
## Process the file in chunks
with open(filename, 'r') as file:
reader = csv.DictReader(file)
chunk = []
for row in reader:
## Add row to current chunk
chunk.append(row)
## When chunk reaches desired size, process it
if len(chunk) >= chunk_size:
## Process the chunk
for row_data in chunk:
## Update statistics
row_count += 1
amount = float(row_data['amount'])
status = row_data['status']
total_amount += amount
status_counts[status] += 1
print(f"Processed chunk of {len(chunk)} rows... ({row_count:,} total)")
## Clear the chunk for next batch
chunk = []
## Process any remaining rows in the last chunk
if chunk:
for row_data in chunk:
row_count += 1
amount = float(row_data['amount'])
status = row_data['status']
total_amount += amount
status_counts[status] += 1
print(f"Processed final chunk of {len(chunk)} rows... ({row_count:,} total)")
## Calculate and display results
end_time = time.time()
processing_time = end_time - start_time
print(f"\nResults:")
print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
print(f"Total transaction amount: ${total_amount:,.2f}")
print(f"Average transaction amount: ${total_amount/row_count:.2f}")
print("\nTransaction status breakdown:")
for status, count in status_counts.items():
percentage = (count / row_count) * 100
print(f" {status}: {count:,} ({percentage:.1f}%)")
## Process the file with chunks of 10,000 rows
process_csv_chunked('transactions.csv', chunk_size=10000)
Ejecute este script:
python3 chunked_example.py
Salida esperada:
Processing transactions.csv using chunks of 10000 rows...
Processed chunk of 10000 rows... (10,000 total)
Processed chunk of 10000 rows... (20,000 total)
Processed chunk of 10000 rows... (30,000 total)
Processed chunk of 10000 rows... (40,000 total)
Processed chunk of 10000 rows... (50,000 total)
Results:
Processed 50,000 rows in 0.20 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80
Transaction status breakdown:
completed: 12,432 (24.9%)
pending: 12,598 (25.2%)
failed: 12,414 (24.8%)
refunded: 12,556 (25.1%)
Comparación del Uso de Memoria
Las diferencias clave entre estos enfoques son:
-
Streaming (línea por línea):
- Utiliza una memoria mínima independientemente del tamaño del archivo
- Mejor para archivos muy grandes
- Operaciones simples en cada fila
-
Procesamiento por fragmentos:
- Utiliza más memoria que el streaming pero sigue siendo eficiente
- Bueno para operaciones que necesitan procesar filas en lotes
- Proporciona actualizaciones de progreso durante el procesamiento
- Se puede combinar con el procesamiento multiproceso para el procesamiento en paralelo
Para la mayoría de los propósitos prácticos, se recomienda el enfoque de streaming a menos que necesite específicamente capacidades de procesamiento por lotes. Proporciona la mejor eficiencia de memoria manteniendo un buen rendimiento.
En el siguiente paso, exploraremos el uso de bibliotecas de terceros como pandas para capacidades de procesamiento de CSV aún más potentes.