Traitement efficace des fichiers CSV volumineux
Dans des scénarios réels, vous devrez peut-être traiter des fichiers CSV de plusieurs gigaoctets. Le chargement de ces fichiers entièrement en mémoire peut entraîner le plantage de votre application ou un ralentissement important. Dans cette étape, nous allons explorer des techniques pour traiter efficacement les fichiers CSV volumineux.
Le défi de la mémoire avec les fichiers volumineux
Lorsque vous travaillez avec des fichiers CSV, il existe trois approches courantes, chacune ayant des exigences de mémoire différentes :
- Chargement de l'intégralité du fichier en mémoire - Simple mais utilise le plus de mémoire
- Streaming du fichier ligne par ligne - Utilise un minimum de mémoire mais peut être plus lent pour les opérations complexes
- Chunking (traitement par blocs) - Un entre-deux qui traite le fichier par blocs gérables
Explorons chacune de ces approches avec des exemples pratiques.
Création d'un fichier d'exemple plus grand
Tout d'abord, créons un fichier d'exemple plus grand pour démontrer ces techniques. Créez un nouveau fichier nommé create_large_csv.py :
import csv
import random
from datetime import datetime, timedelta
def create_large_csv(filename, num_rows):
## Define column headers
headers = ['transaction_id', 'date', 'customer_id', 'product_id', 'amount', 'status']
## Create a file with the specified number of rows
with open(filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(headers)
## Generate random data
start_date = datetime(2022, 1, 1)
status_options = ['completed', 'pending', 'failed', 'refunded']
for i in range(1, num_rows + 1):
## Generate random values
transaction_id = f"TXN-{i:08d}"
days_offset = random.randint(0, 365)
date = (start_date + timedelta(days=days_offset)).strftime('%Y-%m-%d')
customer_id = f"CUST-{random.randint(1001, 9999)}"
product_id = f"PROD-{random.randint(101, 999)}"
amount = round(random.uniform(10.0, 500.0), 2)
status = random.choice(status_options)
## Write row to CSV
writer.writerow([transaction_id, date, customer_id, product_id, amount, status])
## Print progress indicator for every 10,000 rows
if i % 10000 == 0:
print(f"Generated {i} rows...")
## Create a CSV file with 50,000 rows (about 5-10 MB)
create_large_csv('transactions.csv', 50000)
print("Large CSV file 'transactions.csv' has been created.")
Exécutez ce script pour créer le fichier :
python3 create_large_csv.py
Sortie attendue :
Generated 10000 rows...
Generated 20000 rows...
Generated 30000 rows...
Generated 40000 rows...
Generated 50000 rows...
Large CSV file 'transactions.csv' has been created.
Vous pouvez vérifier la taille du fichier avec :
ls -lh transactions.csv
Sortie attendue (la taille peut varier légèrement) :
-rw-r--r-- 1 labex labex 3.8M Apr 15 12:30 transactions.csv
Approche 1 : Traitement ligne par ligne (Streaming)
L'approche la plus économe en mémoire consiste à traiter un fichier CSV ligne par ligne. Créez un fichier nommé streaming_example.py :
import csv
import time
def process_csv_streaming(filename):
print(f"Processing {filename} using streaming (line by line)...")
start_time = time.time()
## Track some statistics
row_count = 0
total_amount = 0
status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
## Process the file line by line
with open(filename, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
## Increment row counter
row_count += 1
## Process row data
amount = float(row['amount'])
status = row['status']
## Update statistics
total_amount += amount
status_counts[status] += 1
## Calculate and display results
end_time = time.time()
processing_time = end_time - start_time
print(f"\nResults:")
print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
print(f"Total transaction amount: ${total_amount:,.2f}")
print(f"Average transaction amount: ${total_amount/row_count:.2f}")
print("\nTransaction status breakdown:")
for status, count in status_counts.items():
percentage = (count / row_count) * 100
print(f" {status}: {count:,} ({percentage:.1f}%)")
## Process the file
process_csv_streaming('transactions.csv')
Exécutez ce script :
python3 streaming_example.py
Sortie attendue (vos chiffres exacts peuvent varier) :
Processing transactions.csv using streaming (line by line)...
Results:
Processed 50,000 rows in 0.17 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80
Transaction status breakdown:
completed: 12,432 (24.9%)
pending: 12,598 (25.2%)
failed: 12,414 (24.8%)
refunded: 12,556 (25.1%)
Approche 2 : Traitement par blocs (Chunked Processing)
Pour des opérations plus complexes ou lorsque vous devez traiter les données par lots, vous pouvez utiliser une approche par blocs. Créez un fichier nommé chunked_example.py :
import csv
import time
def process_csv_chunked(filename, chunk_size=10000):
print(f"Processing {filename} using chunks of {chunk_size} rows...")
start_time = time.time()
## Track some statistics
row_count = 0
total_amount = 0
status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
## Process the file in chunks
with open(filename, 'r') as file:
reader = csv.DictReader(file)
chunk = []
for row in reader:
## Add row to current chunk
chunk.append(row)
## When chunk reaches desired size, process it
if len(chunk) >= chunk_size:
## Process the chunk
for row_data in chunk:
## Update statistics
row_count += 1
amount = float(row_data['amount'])
status = row_data['status']
total_amount += amount
status_counts[status] += 1
print(f"Processed chunk of {len(chunk)} rows... ({row_count:,} total)")
## Clear the chunk for next batch
chunk = []
## Process any remaining rows in the last chunk
if chunk:
for row_data in chunk:
row_count += 1
amount = float(row_data['amount'])
status = row_data['status']
total_amount += amount
status_counts[status] += 1
print(f"Processed final chunk of {len(chunk)} rows... ({row_count:,} total)")
## Calculate and display results
end_time = time.time()
processing_time = end_time - start_time
print(f"\nResults:")
print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
print(f"Total transaction amount: ${total_amount:,.2f}")
print(f"Average transaction amount: ${total_amount/row_count:.2f}")
print("\nTransaction status breakdown:")
for status, count in status_counts.items():
percentage = (count / row_count) * 100
print(f" {status}: {count:,} ({percentage:.1f}%)")
## Process the file with chunks of 10,000 rows
process_csv_chunked('transactions.csv', chunk_size=10000)
Exécutez ce script :
python3 chunked_example.py
Sortie attendue :
Processing transactions.csv using chunks of 10000 rows...
Processed chunk of 10000 rows... (10,000 total)
Processed chunk of 10000 rows... (20,000 total)
Processed chunk of 10000 rows... (30,000 total)
Processed chunk of 10000 rows... (40,000 total)
Processed chunk of 10000 rows... (50,000 total)
Results:
Processed 50,000 rows in 0.20 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80
Transaction status breakdown:
completed: 12,432 (24.9%)
pending: 12,598 (25.2%)
failed: 12,414 (24.8%)
refunded: 12,556 (25.1%)
Comparaison de l'utilisation de la mémoire
Les principales différences entre ces approches sont les suivantes :
-
Streaming (ligne par ligne) :
- Utilise un minimum de mémoire quelle que soit la taille du fichier
- Idéal pour les très grands fichiers
- Opérations simples sur chaque ligne
-
Traitement par blocs :
- Utilise plus de mémoire que le streaming, mais reste efficace
- Bon pour les opérations qui doivent traiter les lignes par lots
- Fournit des mises à jour de progression pendant le traitement
- Peut être combiné avec le traitement en parallèle (multiprocessing)
Dans la plupart des cas pratiques, l'approche de streaming est recommandée, sauf si vous avez spécifiquement besoin de capacités de traitement par lots. Elle offre la meilleure efficacité mémoire tout en maintenant de bonnes performances.
Dans l'étape suivante, nous explorerons l'utilisation de bibliothèques tierces comme pandas pour des capacités de traitement CSV encore plus puissantes.