Effizientes Verarbeiten großer CSV-Dateien
In realweltlichen Szenarien müssen Sie möglicherweise CSV-Dateien verarbeiten, die mehrere Gigabyte groß sind. Das vollständige Laden solcher Dateien in den Speicher kann dazu führen, dass Ihre Anwendung abstürzt oder sich erheblich verlangsamt. In diesem Schritt werden wir Techniken zur effizienten Verarbeitung großer CSV-Dateien untersuchen.
Die Speicherherausforderung bei großen Dateien
Bei der Arbeit mit CSV-Dateien gibt es drei gängige Ansätze, die jeweils unterschiedliche Speicheranforderungen haben:
- Laden der gesamten Datei in den Speicher - Einfach, aber verbraucht den meisten Speicher
- Streamen der Datei Zeile für Zeile - Verbraucht minimalen Speicher, kann aber für komplexe Operationen langsamer sein
- Chunking (Aufteilen in Blöcke) - Ein Mittelweg, der die Datei in handhabbaren Blöcken verarbeitet
Lassen Sie uns jeden dieser Ansätze anhand praktischer Beispiele untersuchen.
Erstellen einer größeren Beispieldatei
Erstellen wir zunächst eine größere Beispieldatei, um diese Techniken zu demonstrieren. Erstellen Sie eine neue Datei mit dem Namen create_large_csv.py
:
import csv
import random
from datetime import datetime, timedelta
def create_large_csv(filename, num_rows):
## Define column headers
headers = ['transaction_id', 'date', 'customer_id', 'product_id', 'amount', 'status']
## Create a file with the specified number of rows
with open(filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(headers)
## Generate random data
start_date = datetime(2022, 1, 1)
status_options = ['completed', 'pending', 'failed', 'refunded']
for i in range(1, num_rows + 1):
## Generate random values
transaction_id = f"TXN-{i:08d}"
days_offset = random.randint(0, 365)
date = (start_date + timedelta(days=days_offset)).strftime('%Y-%m-%d')
customer_id = f"CUST-{random.randint(1001, 9999)}"
product_id = f"PROD-{random.randint(101, 999)}"
amount = round(random.uniform(10.0, 500.0), 2)
status = random.choice(status_options)
## Write row to CSV
writer.writerow([transaction_id, date, customer_id, product_id, amount, status])
## Print progress indicator for every 10,000 rows
if i % 10000 == 0:
print(f"Generated {i} rows...")
## Create a CSV file with 50,000 rows (about 5-10 MB)
create_large_csv('transactions.csv', 50000)
print("Large CSV file 'transactions.csv' has been created.")
Führen Sie dieses Skript aus, um die Datei zu erstellen:
python3 create_large_csv.py
Erwartete Ausgabe:
Generated 10000 rows...
Generated 20000 rows...
Generated 30000 rows...
Generated 40000 rows...
Generated 50000 rows...
Large CSV file 'transactions.csv' has been created.
Sie können die Größe der Datei mit folgendem Befehl überprüfen:
ls -lh transactions.csv
Erwartete Ausgabe (die Größe kann leicht variieren):
-rw-r--r-- 1 labex labex 3.8M Apr 15 12:30 transactions.csv
Ansatz 1: Zeilenweise Verarbeitung (Streaming)
Der speichereffizienteste Ansatz ist die zeilenweise Verarbeitung einer CSV-Datei. Erstellen Sie eine Datei mit dem Namen streaming_example.py
:
import csv
import time
def process_csv_streaming(filename):
print(f"Processing {filename} using streaming (line by line)...")
start_time = time.time()
## Track some statistics
row_count = 0
total_amount = 0
status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
## Process the file line by line
with open(filename, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
## Increment row counter
row_count += 1
## Process row data
amount = float(row['amount'])
status = row['status']
## Update statistics
total_amount += amount
status_counts[status] += 1
## Calculate and display results
end_time = time.time()
processing_time = end_time - start_time
print(f"\nResults:")
print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
print(f"Total transaction amount: ${total_amount:,.2f}")
print(f"Average transaction amount: ${total_amount/row_count:.2f}")
print("\nTransaction status breakdown:")
for status, count in status_counts.items():
percentage = (count / row_count) * 100
print(f" {status}: {count:,} ({percentage:.1f}%)")
## Process the file
process_csv_streaming('transactions.csv')
Führen Sie dieses Skript aus:
python3 streaming_example.py
Erwartete Ausgabe (Ihre genauen Zahlen können variieren):
Processing transactions.csv using streaming (line by line)...
Results:
Processed 50,000 rows in 0.17 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80
Transaction status breakdown:
completed: 12,432 (24.9%)
pending: 12,598 (25.2%)
failed: 12,414 (24.8%)
refunded: 12,556 (25.1%)
Ansatz 2: Chunked Processing (Blockweise Verarbeitung)
Für komplexere Operationen oder wenn Sie Daten in Batches verarbeiten müssen, können Sie einen Chunked-Ansatz verwenden. Erstellen Sie eine Datei mit dem Namen chunked_example.py
:
import csv
import time
def process_csv_chunked(filename, chunk_size=10000):
print(f"Processing {filename} using chunks of {chunk_size} rows...")
start_time = time.time()
## Track some statistics
row_count = 0
total_amount = 0
status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
## Process the file in chunks
with open(filename, 'r') as file:
reader = csv.DictReader(file)
chunk = []
for row in reader:
## Add row to current chunk
chunk.append(row)
## When chunk reaches desired size, process it
if len(chunk) >= chunk_size:
## Process the chunk
for row_data in chunk:
## Update statistics
row_count += 1
amount = float(row_data['amount'])
status = row_data['status']
total_amount += amount
status_counts[status] += 1
print(f"Processed chunk of {len(chunk)} rows... ({row_count:,} total)")
## Clear the chunk for next batch
chunk = []
## Process any remaining rows in the last chunk
if chunk:
for row_data in chunk:
row_count += 1
amount = float(row_data['amount'])
status = row_data['status']
total_amount += amount
status_counts[status] += 1
print(f"Processed final chunk of {len(chunk)} rows... ({row_count:,} total)")
## Calculate and display results
end_time = time.time()
processing_time = end_time - start_time
print(f"\nResults:")
print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
print(f"Total transaction amount: ${total_amount:,.2f}")
print(f"Average transaction amount: ${total_amount/row_count:.2f}")
print("\nTransaction status breakdown:")
for status, count in status_counts.items():
percentage = (count / row_count) * 100
print(f" {status}: {count:,} ({percentage:.1f}%)")
## Process the file with chunks of 10,000 rows
process_csv_chunked('transactions.csv', chunk_size=10000)
Führen Sie dieses Skript aus:
python3 chunked_example.py
Erwartete Ausgabe:
Processing transactions.csv using chunks of 10000 rows...
Processed chunk of 10000 rows... (10,000 total)
Processed chunk of 10000 rows... (20,000 total)
Processed chunk of 10000 rows... (30,000 total)
Processed chunk of 10000 rows... (40,000 total)
Processed chunk of 10000 rows... (50,000 total)
Results:
Processed 50,000 rows in 0.20 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80
Transaction status breakdown:
completed: 12,432 (24.9%)
pending: 12,598 (25.2%)
failed: 12,414 (24.8%)
refunded: 12,556 (25.1%)
Speichervergleich
Die wesentlichen Unterschiede zwischen diesen Ansätzen sind:
-
Streaming (Zeile für Zeile):
- Verwendet unabhängig von der Dateigröße minimalen Speicher
- Am besten für sehr große Dateien geeignet
- Einfache Operationen für jede Zeile
-
Chunked Processing (Blockweise Verarbeitung):
- Verwendet mehr Speicher als Streaming, ist aber immer noch effizient
- Gut für Operationen, die Zeilen in Batches verarbeiten müssen
- Bietet Fortschrittsaktualisierungen während der Verarbeitung
- Kann für die parallele Verarbeitung mit Multiprocessing kombiniert werden
Für die meisten praktischen Zwecke wird der Streaming-Ansatz empfohlen, es sei denn, Sie benötigen explizit Batch-Verarbeitungsfunktionen. Er bietet die beste Speichereffizienz bei gleichzeitig guter Leistung.
Im nächsten Schritt werden wir die Verwendung von Bibliotheken von Drittanbietern wie Pandas für noch leistungsfähigere CSV-Verarbeitungsfunktionen untersuchen.