¿Cómo procesar archivos CSV grandes de manera eficiente en Python?

PythonBeginner
Practicar Ahora

Introducción

El manejo de archivos CSV grandes es un desafío común para los desarrolladores de Python. Este tutorial le guiará a través de técnicas eficientes para procesar estos archivos de manera efectiva, centrándose en la optimización del rendimiento y el uso de la memoria. Al final de este laboratorio, estará equipado con conocimientos prácticos para abordar tareas de procesamiento de CSV intensivas en datos en Python.

Ya sea que esté analizando datos de clientes, procesando registros financieros o trabajando con cualquier tipo de datos estructurados, las habilidades aprendidas en este laboratorio le ayudarán a procesar grandes conjuntos de datos de manera eficiente sin tener problemas de memoria.

Creación y Lectura de un Archivo CSV Simple

CSV (Valores Separados por Comas) es un formato de archivo popular utilizado para almacenar datos tabulares. En este paso, crearemos un archivo CSV simple y aprenderemos a leerlo utilizando el módulo csv incorporado de Python.

Entendiendo los Archivos CSV

Un archivo CSV almacena datos en un formato de texto plano donde:

  • Cada línea representa una fila de datos
  • Los valores dentro de cada fila están separados por un delimitador (típicamente una coma)
  • La primera fila a menudo contiene encabezados de columna

Comencemos creando un archivo CSV simple con el que trabajar.

Creación de un Archivo CSV

Primero, creemos un directorio para trabajar y luego crearemos un archivo CSV simple:

  1. Abra la terminal en el WebIDE
  2. Cree un nuevo archivo Python llamado csv_basics.py en el editor

Ahora, agregue el siguiente código a csv_basics.py:

import csv

## Data to write to CSV
data = [
    ['Name', 'Age', 'City'],              ## Header row
    ['John Smith', '28', 'New York'],
    ['Sarah Johnson', '32', 'San Francisco'],
    ['Michael Brown', '45', 'Chicago'],
    ['Emily Davis', '36', 'Boston'],
    ['David Wilson', '52', 'Seattle']
]

## Writing data to a CSV file
with open('sample_data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)

print("CSV file 'sample_data.csv' created successfully.")

Ejecute este código ejecutando el siguiente comando en la terminal:

python3 csv_basics.py

Salida esperada:

CSV file 'sample_data.csv' created successfully.

Esto creará un nuevo archivo CSV llamado sample_data.csv en su directorio actual. Puede ver el contenido de este archivo ejecutando:

cat sample_data.csv

Salida esperada:

Name,Age,City
John Smith,28,New York
Sarah Johnson,32,San Francisco
Michael Brown,45,Chicago
Emily Davis,36,Boston
David Wilson,52,Seattle

Lectura de un Archivo CSV

Ahora, leamos el archivo CSV que acabamos de crear. Cree un nuevo archivo llamado read_csv.py con el siguiente código:

import csv

## Reading a CSV file
with open('sample_data.csv', 'r') as file:
    reader = csv.reader(file)

    print("Contents of sample_data.csv:")
    print("--------------------------")

    for row in reader:
        print(row)

## Reading and accessing specific columns
print("\nReading specific columns:")
print("--------------------------")

with open('sample_data.csv', 'r') as file:
    reader = csv.reader(file)
    headers = next(reader)  ## Skip the header row

    for row in reader:
        name = row[0]
        age = row[1]
        city = row[2]
        print(f"Name: {name}, Age: {age}, City: {city}")

Ejecute este código con:

python3 read_csv.py

Salida esperada:

Contents of sample_data.csv:
--------------------------
['Name', 'Age', 'City']
['John Smith', '28', 'New York']
['Sarah Johnson', '32', 'San Francisco']
['Michael Brown', '45', 'Chicago']
['Emily Davis', '36', 'Boston']
['David Wilson', '52', 'Seattle']

Reading specific columns:
--------------------------
Name: John Smith, Age: 28, City: New York
Name: Sarah Johnson, Age: 32, City: San Francisco
Name: Michael Brown, Age: 45, City: Chicago
Name: Emily Davis, Age: 36, City: Boston
Name: David Wilson, Age: 52, City: Seattle

Entendiendo el Módulo CSV

El módulo csv de Python proporciona dos clases principales:

  • csv.reader: Lee archivos CSV y devuelve cada fila como una lista de cadenas
  • csv.writer: Escribe datos en archivos CSV

Este módulo maneja todas las complejidades de tratar con diferentes formatos CSV, como escapar caracteres especiales y manejar comillas.

En este paso, ha aprendido a crear y leer un archivo CSV simple. En el siguiente paso, exploraremos formas más eficientes de manejar archivos CSV más grandes.

Uso de DictReader para el Procesamiento Conveniente de CSV

En el paso anterior, trabajamos con las funciones básicas csv.reader y csv.writer. Ahora, exploremos una forma más conveniente de procesar archivos CSV utilizando la clase csv.DictReader, que es especialmente útil cuando se trabaja con datos que tienen encabezados de columna.

¿Qué es DictReader?

csv.DictReader lee archivos CSV y devuelve cada fila como un diccionario donde:

  • Las claves se toman de los encabezados de columna (la primera fila del archivo CSV por defecto)
  • Los valores son los datos correspondientes de cada fila

Este enfoque hace que su código sea más legible y menos propenso a errores porque puede hacer referencia a las columnas por nombre en lugar de por índice.

Crear un Archivo de Prueba Más Grande

Primero, creemos un archivo CSV un poco más grande para demostrar los beneficios de DictReader. Cree un nuevo archivo llamado create_users_data.py con el siguiente código:

import csv
import random

## Generate some sample user data
def generate_users(count):
    users = [['id', 'name', 'email', 'age', 'country']]  ## Header row

    domains = ['gmail.com', 'yahoo.com', 'outlook.com', 'example.com']
    countries = ['USA', 'Canada', 'UK', 'Australia', 'Germany', 'France', 'Japan', 'Brazil']
    first_names = ['John', 'Jane', 'Michael', 'Emily', 'David', 'Sarah', 'Robert', 'Lisa']
    last_names = ['Smith', 'Johnson', 'Brown', 'Davis', 'Wilson', 'Miller', 'Jones', 'Taylor']

    for i in range(1, count + 1):
        first_name = random.choice(first_names)
        last_name = random.choice(last_names)
        name = f"{first_name} {last_name}"
        email = f"{first_name.lower()}.{last_name.lower()}@{random.choice(domains)}"
        age = random.randint(18, 65)
        country = random.choice(countries)

        users.append([str(i), name, email, str(age), country])

    return users

## Create a CSV file with 100 users
users_data = generate_users(100)

## Write data to CSV file
with open('users_data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(users_data)

print(f"Created 'users_data.csv' with 100 user records")

Ejecute el script para crear el archivo:

python3 create_users_data.py

Salida esperada:

Created 'users_data.csv' with 100 user records

Examinemos las primeras líneas de este nuevo archivo:

head -n 5 users_data.csv

Debería ver la fila de encabezado seguida de 4 filas de datos:

id,name,email,age,country
1,John Smith,john.smith@gmail.com,25,USA
2,Emily Brown,emily.brown@yahoo.com,32,Canada
3,David Jones,david.jones@outlook.com,45,UK
4,Sarah Wilson,sarah.wilson@example.com,28,Australia

Uso de DictReader para Procesar el Archivo CSV

Ahora, creemos un script para procesar este archivo usando DictReader. Cree un nuevo archivo llamado dict_reader_example.py con el siguiente código:

import csv

## Read the CSV file using DictReader
with open('users_data.csv', 'r') as file:
    csv_reader = csv.DictReader(file)

    ## Print the field names (column headers)
    print(f"Column headers: {csv_reader.fieldnames}")
    print("\nFirst 5 records:")
    print("-----------------")

    ## Print the first 5 records
    for i, row in enumerate(csv_reader):
        if i < 5:
            ## Access fields by name
            print(f"User {row['id']}: {row['name']}, {row['age']} years old, from {row['country']}")
            print(f"  Email: {row['email']}")
        else:
            break

## Using DictReader for data analysis
with open('users_data.csv', 'r') as file:
    csv_reader = csv.DictReader(file)

    ## Calculate average age
    total_age = 0
    user_count = 0

    ## Count users by country
    countries = {}

    for row in csv_reader:
        user_count += 1
        total_age += int(row['age'])

        ## Count users by country
        country = row['country']
        if country in countries:
            countries[country] += 1
        else:
            countries[country] = 1

    avg_age = total_age / user_count if user_count > 0 else 0

    print("\nData Analysis:")
    print("--------------")
    print(f"Total users: {user_count}")
    print(f"Average age: {avg_age:.2f} years")
    print("\nUsers by country:")

    for country, count in sorted(countries.items(), key=lambda x: x[1], reverse=True):
        print(f"  {country}: {count} users")

Ejecute este script:

python3 dict_reader_example.py

Salida esperada (sus valores exactos pueden variar ya que los datos se generan aleatoriamente):

Column headers: ['id', 'name', 'email', 'age', 'country']

First 5 records:
-----------------
User 1: John Smith, 25 years old, from USA
  Email: john.smith@gmail.com
User 2: Emily Brown, 32 years old, from Canada
  Email: emily.brown@yahoo.com
User 3: David Jones, 45 years old, from UK
  Email: david.jones@outlook.com
User 4: Sarah Wilson, 28 years old, from Australia
  Email: sarah.wilson@example.com
User 5: Michael Taylor, 37 years old, from Germany
  Email: michael.taylor@example.com

Data Analysis:
--------------
Total users: 100
Average age: 41.35 years

Users by country:
  USA: 16 users
  Canada: 14 users
  Japan: 13 users
  UK: 12 users
  Germany: 12 users
  Australia: 12 users
  France: 11 users
  Brazil: 10 users

Beneficios de Usar DictReader

Como puede ver, usar DictReader proporciona varias ventajas:

  1. Código legible: Puede acceder a los campos por nombre en lugar de recordar las posiciones de los índices
  2. Autodocumentado: El código muestra claramente a qué campo está accediendo
  3. Flexibilidad: Si el orden de las columnas cambia en el archivo CSV, su código aún funciona siempre que los nombres de las columnas sigan siendo los mismos

Este enfoque es particularmente útil cuando se trabaja con datos del mundo real que tienen muchas columnas o cuando el orden de las columnas podría cambiar con el tiempo.

En el siguiente paso, exploraremos técnicas eficientes para procesar archivos CSV más grandes sin cargar todo en la memoria a la vez.

Procesamiento Eficiente de Archivos CSV Grandes

En escenarios del mundo real, es posible que deba procesar archivos CSV que tengan varios gigabytes de tamaño. Cargar estos archivos por completo en la memoria puede hacer que su aplicación se bloquee o se ralentice significativamente. En este paso, exploraremos técnicas para procesar archivos CSV grandes de manera eficiente.

El Desafío de la Memoria con Archivos Grandes

Cuando se trabaja con archivos CSV, existen tres enfoques comunes, cada uno con diferentes requisitos de memoria:

  1. Cargar todo el archivo en la memoria: Simple pero utiliza la mayor cantidad de memoria
  2. Transmitir el archivo línea por línea: Utiliza una memoria mínima pero puede ser más lento para operaciones complejas
  3. Fragmentación (Chunking): Un punto intermedio que procesa el archivo en fragmentos manejables

Exploremos cada uno de estos enfoques con ejemplos prácticos.

Creación de un Archivo de Muestra Más Grande

Primero, creemos un archivo de muestra más grande para demostrar estas técnicas. Cree un nuevo archivo llamado create_large_csv.py:

import csv
import random
from datetime import datetime, timedelta

def create_large_csv(filename, num_rows):
    ## Define column headers
    headers = ['transaction_id', 'date', 'customer_id', 'product_id', 'amount', 'status']

    ## Create a file with the specified number of rows
    with open(filename, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(headers)

        ## Generate random data
        start_date = datetime(2022, 1, 1)
        status_options = ['completed', 'pending', 'failed', 'refunded']

        for i in range(1, num_rows + 1):
            ## Generate random values
            transaction_id = f"TXN-{i:08d}"
            days_offset = random.randint(0, 365)
            date = (start_date + timedelta(days=days_offset)).strftime('%Y-%m-%d')
            customer_id = f"CUST-{random.randint(1001, 9999)}"
            product_id = f"PROD-{random.randint(101, 999)}"
            amount = round(random.uniform(10.0, 500.0), 2)
            status = random.choice(status_options)

            ## Write row to CSV
            writer.writerow([transaction_id, date, customer_id, product_id, amount, status])

            ## Print progress indicator for every 10,000 rows
            if i % 10000 == 0:
                print(f"Generated {i} rows...")

## Create a CSV file with 50,000 rows (about 5-10 MB)
create_large_csv('transactions.csv', 50000)
print("Large CSV file 'transactions.csv' has been created.")

Ejecute este script para crear el archivo:

python3 create_large_csv.py

Salida esperada:

Generated 10000 rows...
Generated 20000 rows...
Generated 30000 rows...
Generated 40000 rows...
Generated 50000 rows...
Large CSV file 'transactions.csv' has been created.

Puede verificar el tamaño del archivo con:

ls -lh transactions.csv

Salida esperada (el tamaño puede variar ligeramente):

-rw-r--r-- 1 labex labex 3.8M Apr 15 12:30 transactions.csv

Enfoque 1: Procesamiento Línea por Línea (Streaming)

El enfoque más eficiente en cuanto a memoria es procesar un archivo CSV línea por línea. Cree un archivo llamado streaming_example.py:

import csv
import time

def process_csv_streaming(filename):
    print(f"Processing {filename} using streaming (line by line)...")
    start_time = time.time()

    ## Track some statistics
    row_count = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}

    ## Process the file line by line
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        for row in reader:
            ## Increment row counter
            row_count += 1

            ## Process row data
            amount = float(row['amount'])
            status = row['status']

            ## Update statistics
            total_amount += amount
            status_counts[status] += 1

    ## Calculate and display results
    end_time = time.time()
    processing_time = end_time - start_time

    print(f"\nResults:")
    print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
    print(f"Total transaction amount: ${total_amount:,.2f}")
    print(f"Average transaction amount: ${total_amount/row_count:.2f}")
    print("\nTransaction status breakdown:")
    for status, count in status_counts.items():
        percentage = (count / row_count) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

## Process the file
process_csv_streaming('transactions.csv')

Ejecute este script:

python3 streaming_example.py

Salida esperada (sus números exactos pueden variar):

Processing transactions.csv using streaming (line by line)...

Results:
Processed 50,000 rows in 0.17 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80

Transaction status breakdown:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

Enfoque 2: Procesamiento por Fragmentos (Chunked Processing)

Para operaciones más complejas o cuando necesita procesar datos en lotes, puede usar un enfoque fragmentado. Cree un archivo llamado chunked_example.py:

import csv
import time

def process_csv_chunked(filename, chunk_size=10000):
    print(f"Processing {filename} using chunks of {chunk_size} rows...")
    start_time = time.time()

    ## Track some statistics
    row_count = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}

    ## Process the file in chunks
    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        chunk = []
        for row in reader:
            ## Add row to current chunk
            chunk.append(row)

            ## When chunk reaches desired size, process it
            if len(chunk) >= chunk_size:
                ## Process the chunk
                for row_data in chunk:
                    ## Update statistics
                    row_count += 1
                    amount = float(row_data['amount'])
                    status = row_data['status']

                    total_amount += amount
                    status_counts[status] += 1

                print(f"Processed chunk of {len(chunk)} rows... ({row_count:,} total)")
                ## Clear the chunk for next batch
                chunk = []

        ## Process any remaining rows in the last chunk
        if chunk:
            for row_data in chunk:
                row_count += 1
                amount = float(row_data['amount'])
                status = row_data['status']

                total_amount += amount
                status_counts[status] += 1

            print(f"Processed final chunk of {len(chunk)} rows... ({row_count:,} total)")

    ## Calculate and display results
    end_time = time.time()
    processing_time = end_time - start_time

    print(f"\nResults:")
    print(f"Processed {row_count:,} rows in {processing_time:.2f} seconds")
    print(f"Total transaction amount: ${total_amount:,.2f}")
    print(f"Average transaction amount: ${total_amount/row_count:.2f}")
    print("\nTransaction status breakdown:")
    for status, count in status_counts.items():
        percentage = (count / row_count) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

## Process the file with chunks of 10,000 rows
process_csv_chunked('transactions.csv', chunk_size=10000)

Ejecute este script:

python3 chunked_example.py

Salida esperada:

Processing transactions.csv using chunks of 10000 rows...
Processed chunk of 10000 rows... (10,000 total)
Processed chunk of 10000 rows... (20,000 total)
Processed chunk of 10000 rows... (30,000 total)
Processed chunk of 10000 rows... (40,000 total)
Processed chunk of 10000 rows... (50,000 total)

Results:
Processed 50,000 rows in 0.20 seconds
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80

Transaction status breakdown:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

Comparación del Uso de Memoria

Las diferencias clave entre estos enfoques son:

  1. Streaming (línea por línea):

    • Utiliza una memoria mínima independientemente del tamaño del archivo
    • Mejor para archivos muy grandes
    • Operaciones simples en cada fila
  2. Procesamiento por fragmentos:

    • Utiliza más memoria que el streaming pero sigue siendo eficiente
    • Bueno para operaciones que necesitan procesar filas en lotes
    • Proporciona actualizaciones de progreso durante el procesamiento
    • Se puede combinar con el procesamiento multiproceso para el procesamiento en paralelo

Para la mayoría de los propósitos prácticos, se recomienda el enfoque de streaming a menos que necesite específicamente capacidades de procesamiento por lotes. Proporciona la mejor eficiencia de memoria manteniendo un buen rendimiento.

En el siguiente paso, exploraremos el uso de bibliotecas de terceros como pandas para capacidades de procesamiento de CSV aún más potentes.

Uso de Pandas para el Procesamiento Avanzado de CSV

Si bien el módulo csv incorporado de Python es potente, la biblioteca pandas ofrece una funcionalidad más avanzada para el análisis y la manipulación de datos. En este paso, exploraremos cómo usar pandas para el procesamiento eficiente de CSV.

Instalación de Pandas

Primero, instalemos la biblioteca pandas:

pip install pandas

Salida esperada:

Collecting pandas
  Downloading pandas-2.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 12.3/12.3 MB 42.6 MB/s eta 0:00:00
...
Successfully installed pandas-2.0.0 numpy-1.24.3 python-dateutil-2.8.2 pytz-2023.3 tzdata-2023.3

Lectura de Archivos CSV con Pandas

Pandas facilita la lectura, el análisis y la manipulación de datos CSV. Cree un archivo llamado pandas_basic.py:

import pandas as pd
import time

def process_with_pandas(filename):
    print(f"Processing {filename} with pandas...")
    start_time = time.time()

    ## Read the CSV file into a DataFrame
    df = pd.read_csv(filename)

    ## Display basic information
    print(f"\nDataFrame Info:")
    print(f"Shape: {df.shape} (rows, columns)")
    print(f"Column names: {', '.join(df.columns)}")

    ## Display the first 5 rows
    print("\nFirst 5 rows:")
    print(df.head())

    ## Basic statistics
    print("\nSummary statistics for numeric columns:")
    print(df.describe())

    ## Group by analysis
    print("\nTransaction counts by status:")
    status_counts = df['status'].value_counts()
    print(status_counts)

    ## Calculate average amount by status
    print("\nAverage transaction amount by status:")
    avg_by_status = df.groupby('status')['amount'].mean()
    print(avg_by_status)

    ## Calculate total amount by date (first 5 dates)
    print("\nTotal transaction amount by date (first 5 dates):")
    total_by_date = df.groupby('date')['amount'].sum().sort_values(ascending=False).head(5)
    print(total_by_date)

    end_time = time.time()
    print(f"\nProcessed in {end_time - start_time:.2f} seconds")

## Process the transactions file
process_with_pandas('transactions.csv')

Ejecute este script:

python3 pandas_basic.py

Salida esperada (truncada para abreviar):

Processing transactions.csv with pandas...

DataFrame Info:
Shape: (50000, 6) (rows, columns)
Column names: transaction_id, date, customer_id, product_id, amount, status

First 5 rows:
  transaction_id        date customer_id product_id   amount    status
0    TXN-00000001  2022-12-19   CUST-5421   PROD-383  385.75  refunded
1    TXN-00000002  2022-02-01   CUST-7078   PROD-442  286.83  completed
2    TXN-00000003  2022-12-24   CUST-2356   PROD-701  364.87    failed
3    TXN-00000004  2022-04-09   CUST-3458   PROD-854  247.73   pending
4    TXN-00000005  2022-03-07   CUST-6977   PROD-307  298.69  completed

Summary statistics for numeric columns:
              amount
count  50000.000000
mean     254.797067
std      141.389125
min       10.010000
25%      127.732500
50%      254.865000
75%      381.387500
max      499.990000

Transaction counts by status:
pending     12598
refunded    12556
completed   12432
failed      12414
Name: status, dtype: int64

Average transaction amount by status:
status
completed    255.028733
failed       254.709444
pending      254.690785
refunded     254.760390
Name: amount, dtype: float64

Total transaction amount by date (first 5 dates):
date
2022-01-20    38883.19
2022-08-30    38542.49
2022-03-10    38331.67
2022-11-29    38103.61
2022-06-24    37954.87
Name: amount, dtype: float64

Processed in 0.11 seconds

Procesamiento de Archivos CSV Grandes con Pandas

Si bien pandas es potente, puede consumir mucha memoria al cargar archivos CSV grandes. Para archivos muy grandes, puede usar el parámetro chunksize para leer el archivo en fragmentos. Cree un archivo llamado pandas_chunked.py:

import pandas as pd
import time

def process_large_csv_with_pandas(filename, chunk_size=10000):
    print(f"Processing {filename} with pandas using chunks of {chunk_size} rows...")
    start_time = time.time()

    ## Initialize variables to store aggregated results
    total_rows = 0
    total_amount = 0
    status_counts = {'completed': 0, 'pending': 0, 'failed': 0, 'refunded': 0}
    daily_totals = {}

    ## Process the file in chunks
    for chunk_num, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        ## Update row count
        chunk_rows = len(chunk)
        total_rows += chunk_rows

        ## Update total amount
        chunk_amount = chunk['amount'].sum()
        total_amount += chunk_amount

        ## Update status counts
        for status, count in chunk['status'].value_counts().items():
            status_counts[status] += count

        ## Update daily totals
        for date, group in chunk.groupby('date'):
            if date in daily_totals:
                daily_totals[date] += group['amount'].sum()
            else:
                daily_totals[date] = group['amount'].sum()

        print(f"Processed chunk {chunk_num + 1} ({total_rows:,} rows so far)")

    ## Calculate results
    end_time = time.time()
    processing_time = end_time - start_time

    print(f"\nResults after processing {total_rows:,} rows in {processing_time:.2f} seconds:")
    print(f"Total transaction amount: ${total_amount:,.2f}")
    print(f"Average transaction amount: ${total_amount/total_rows:.2f}")

    print("\nTransaction status breakdown:")
    for status, count in status_counts.items():
        percentage = (count / total_rows) * 100
        print(f"  {status}: {count:,} ({percentage:.1f}%)")

    ## Show top 5 days by transaction amount
    print("\nTop 5 days by transaction amount:")
    top_days = sorted(daily_totals.items(), key=lambda x: x[1], reverse=True)[:5]
    for date, amount in top_days:
        print(f"  {date}: ${amount:,.2f}")

## Process the transactions file with chunks
process_large_csv_with_pandas('transactions.csv', chunk_size=10000)

Ejecute este script:

python3 pandas_chunked.py

Salida esperada:

Processing transactions.csv with pandas using chunks of 10000 rows...
Processed chunk 1 (10,000 rows so far)
Processed chunk 2 (20,000 rows so far)
Processed chunk 3 (30,000 rows so far)
Processed chunk 4 (40,000 rows so far)
Processed chunk 5 (50,000 rows so far)

Results after processing 50,000 rows in 0.34 seconds:
Total transaction amount: $12,739,853.35
Average transaction amount: $254.80

Transaction status breakdown:
  completed: 12,432 (24.9%)
  pending: 12,598 (25.2%)
  failed: 12,414 (24.8%)
  refunded: 12,556 (25.1%)

Top 5 days by transaction amount:
  2022-01-20: $38,883.19
  2022-08-30: $38,542.49
  2022-03-10: $38,331.67
  2022-11-29: $38,103.61
  2022-06-24: $37,954.87

Filtrado y Transformación de Datos con Pandas

Uno de los grandes beneficios de pandas es la capacidad de filtrar y transformar datos fácilmente. Cree un archivo llamado pandas_filter.py:

import pandas as pd
import time

def filter_and_transform(filename):
    print(f"Filtering and transforming data from {filename}...")
    start_time = time.time()

    ## Read the CSV file
    df = pd.read_csv(filename)

    ## 1. Filter: Get only completed transactions
    completed = df[df['status'] == 'completed']
    print(f"Number of completed transactions: {len(completed)}")

    ## 2. Filter: Get high-value transactions (over $400)
    high_value = df[df['amount'] > 400]
    print(f"Number of high-value transactions (>${400}): {len(high_value)}")

    ## 3. Filter: Transactions from first quarter of 2022
    df['date'] = pd.to_datetime(df['date'])  ## Convert to datetime
    q1_2022 = df[(df['date'] >= '2022-01-01') & (df['date'] <= '2022-03-31')]
    print(f"Number of transactions in Q1 2022: {len(q1_2022)}")

    ## 4. Add a new column: transaction_month
    df['month'] = df['date'].dt.strftime('%Y-%m')

    ## 5. Group by month and status
    monthly_by_status = df.groupby(['month', 'status']).agg({
        'transaction_id': 'count',
        'amount': 'sum'
    }).rename(columns={'transaction_id': 'count'})

    ## Calculate success rate by month (completed / total)
    print("\nTransaction success rates by month:")
    for month, month_data in df.groupby('month'):
        total = len(month_data)
        completed = len(month_data[month_data['status'] == 'completed'])
        success_rate = (completed / total) * 100
        print(f"  {month}: {success_rate:.1f}% ({completed}/{total})")

    ## Save filtered data to a new CSV file
    completed_high_value = df[(df['status'] == 'completed') & (df['amount'] > 300)]
    output_file = 'high_value_completed.csv'
    completed_high_value.to_csv(output_file, index=False)

    end_time = time.time()
    print(f"\nFiltering completed in {end_time - start_time:.2f} seconds")
    print(f"Saved {len(completed_high_value)} high-value completed transactions to {output_file}")

## Filter and transform the data
filter_and_transform('transactions.csv')

Ejecute este script:

python3 pandas_filter.py

Salida esperada:

Filtering and transforming data from transactions.csv...
Number of completed transactions: 12432
Number of high-value transactions (>$400): 6190
Number of transactions in Q1 2022: 12348

Transaction success rates by month:
  2022-01: 24.8% (1048/4225)
  2022-02: 25.0% (1010/4034)
  2022-03: 25.4% (1042/4089)
  2022-04: 24.2% (978/4052)
  2022-05: 24.4% (1047/4297)
  2022-06: 24.4% (1046/4280)
  2022-07: 24.7% (1071/4341)
  2022-08: 25.1% (1090/4343)
  2022-09: 26.1% (1091/4177)
  2022-10: 24.1% (1008/4182)
  2022-11: 24.8% (1009/4075)
  2022-12: 25.2% (992/3905)

Filtering completed in 0.38 seconds
Saved 6304 high-value completed transactions to high_value_completed.csv

Ventajas de Usar Pandas

Como puede ver en estos ejemplos, pandas ofrece varias ventajas para el procesamiento de CSV:

  1. Funcionalidad rica: Métodos integrados para filtrar, agrupar, agregar y transformar datos
  2. Rendimiento: Código C optimizado en segundo plano para operaciones rápidas en conjuntos de datos grandes
  3. Análisis de datos fácil: Formas sencillas de calcular estadísticas y obtener información
  4. Capacidades de visualización: Fácil integración con bibliotecas de trazado (no se muestra en los ejemplos)
  5. Procesamiento por fragmentos: Capacidad para manejar archivos más grandes que la memoria disponible

Para la mayoría de las tareas de análisis de datos que involucran archivos CSV, pandas es el enfoque recomendado a menos que tenga restricciones de memoria específicas que requieran el uso del módulo csv de Python puro.

Resumen

En este tutorial, aprendió varios enfoques para procesar archivos CSV de manera eficiente en Python, desde conjuntos de datos pequeños hasta grandes que requieren una cuidadosa gestión de la memoria:

  1. Procesamiento Básico de CSV: Uso del módulo csv incorporado de Python para leer y escribir archivos CSV con csv.reader y csv.writer.

  2. Procesamiento Basado en Diccionarios: Uso de csv.DictReader para trabajar con datos CSV de una manera más intuitiva, accediendo a los campos por nombre en lugar de por índice.

  3. Técnicas de Procesamiento Eficientes:

    • Streaming (Transmisión): Procesamiento de archivos línea por línea para un uso mínimo de memoria
    • Chunking (Fragmentación): Procesamiento de archivos en lotes para una mejor gestión de la memoria
  4. Procesamiento Avanzado con Pandas:

    • Lectura de archivos CSV en DataFrames
    • Análisis y filtrado de datos
    • Procesamiento de archivos grandes en fragmentos
    • Transformación y exportación de datos

Estas técnicas proporcionan un conjunto de herramientas completo para manejar archivos CSV de cualquier tamaño en Python. Para la mayoría de las tareas de análisis de datos, pandas es la biblioteca recomendada debido a su rica funcionalidad y rendimiento. Sin embargo, para archivos muy grandes o tareas de procesamiento simples, los enfoques de streaming y fragmentación con el módulo csv incorporado pueden ser más eficientes en cuanto a memoria.

Al aplicar la técnica adecuada según sus requisitos específicos, puede procesar de manera eficiente archivos CSV de cualquier tamaño sin tener problemas de memoria.