Cómo manejar respuestas no autorizadas en solicitudes Python

PythonBeginner
Practicar Ahora

Introducción

Cuando se trabaja con APIs web en Python, a menudo se encuentran situaciones en las que las solicitudes son denegadas debido a problemas de autorización. Este laboratorio le guía a través de la comprensión y el manejo efectivo de las respuestas no autorizadas (401) al usar la biblioteca requests de Python. Al aprender técnicas adecuadas de manejo de errores, podrá construir aplicaciones más resilientes que gestionen con elegancia las fallas de autenticación.

Comprensión de la Autorización HTTP y los Códigos de Estado

Antes de sumergirnos en el manejo de respuestas no autorizadas, es importante entender qué son y por qué ocurren.

Códigos de Estado HTTP y la Respuesta 401 No Autorizado

Los códigos de estado HTTP son números de tres dígitos que los servidores envían en respuesta a las solicitudes del cliente. Estos códigos se agrupan en cinco categorías:

  • 1xx: Respuestas informativas
  • 2xx: Respuestas exitosas
  • 3xx: Mensajes de redirección
  • 4xx: Respuestas de error del cliente
  • 5xx: Respuestas de error del servidor

El código de estado 401 No Autorizado pertenece a la categoría 4xx e indica que la solicitud carece de credenciales de autenticación válidas para el recurso de destino. Esto es diferente de una respuesta 403 Prohibido, que significa que el servidor comprende la solicitud pero se niega a autorizarla.

Configuración de Nuestro Entorno

Comencemos creando un directorio para nuestro proyecto e instalando los paquetes requeridos.

  1. Abra la terminal y cree un nuevo directorio:
mkdir -p ~/project/python-auth-handling
cd ~/project/python-auth-handling
  1. Ahora, creemos un entorno virtual e instalemos el paquete requests:
python -m venv venv
source venv/bin/activate
pip install requests

La salida debería ser similar a:

Collecting requests
  Downloading requests-2.28.2-py3-none-any.whl (62 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.8/62.8 KB 1.8 MB/s eta 0:00:00
[...additional output...]
Successfully installed certifi-2023.5.7 charset-normalizer-3.1.0 idna-3.4 requests-2.28.2 urllib3-1.26.16

Realizando una Solicitud Simple

Ahora, creemos un script de Python para realizar una solicitud a un servicio que requiere autenticación. Usaremos el servicio HTTPBin, que proporciona endpoints para probar solicitudes HTTP.

Cree un nuevo archivo llamado basic_request.py en el WebIDE:

import requests

def make_request():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")

    if response.status_code == 200:
        print("Request was successful.")
        print(f"Response content: {response.text}")
    elif response.status_code == 401:
        print("Unauthorized: Authentication is required and has failed.")
    else:
        print(f"Received unexpected status code: {response.status_code}")

if __name__ == "__main__":
    make_request()

Guarde el archivo y ejecútelo en la terminal:

python basic_request.py

Debería ver una salida similar a:

Status Code: 401
Unauthorized: Authentication is required and has failed.

Esto se debe a que estamos intentando acceder a un endpoint que requiere autenticación básica, pero no hemos proporcionado ninguna credencial.

Examinando la Respuesta

Modifiquemos nuestro script para imprimir más detalles sobre la respuesta. Cree un nuevo archivo llamado examine_response.py:

import requests

def examine_response():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")
    print(f"Headers: {response.headers}")

    ## The WWW-Authenticate header provides details about how to authenticate
    if 'WWW-Authenticate' in response.headers:
        print(f"WWW-Authenticate: {response.headers['WWW-Authenticate']}")

    ## Try to get JSON content, but it might not be JSON or might be empty
    try:
        print(f"Content: {response.json()}")
    except requests.exceptions.JSONDecodeError:
        print(f"Content (text): {response.text}")

if __name__ == "__main__":
    examine_response()

Ejecute este script:

python examine_response.py

La salida incluirá las cabeceras de la respuesta y la cabecera WWW-Authenticate, que le dice al cliente cómo autenticarse:

Status Code: 401
Headers: {'date': '...', 'content-type': '...', 'content-length': '0', 'connection': 'close', 'server': 'gunicorn/19.9.0', 'www-authenticate': 'Basic realm="Fake Realm"', 'access-control-allow-origin': '*', 'access-control-allow-credentials': 'true'}
WWW-Authenticate: Basic realm="Fake Realm"
Content (text):

La cabecera WWW-Authenticate con el valor Basic realm="Fake Realm" indica que el servidor espera autenticación básica.

Uso de la Autenticación Básica para Prevenir Errores 401

Ahora que entendemos qué causa una respuesta 401 No Autorizado, aprendamos cómo prevenirla proporcionando las credenciales de autenticación correctas.

Autenticación Básica en Solicitudes de Python

La biblioteca requests de Python facilita la adición de autenticación básica a sus solicitudes. Puede:

  1. Pasar el parámetro auth con una tupla de nombre de usuario y contraseña
  2. Usar la clase HTTPBasicAuth de requests.auth

Modifiquemos nuestro script para incluir la autenticación básica. Cree un nuevo archivo llamado basic_auth.py:

import requests
from requests.auth import HTTPBasicAuth

def make_authenticated_request():
    url = "https://httpbin.org/basic-auth/user/pass"

    ## Method 1: Using the auth parameter with a tuple
    response1 = requests.get(url, auth=("user", "pass"))

    print("Method 1: Using auth tuple")
    print(f"Status Code: {response1.status_code}")
    if response1.status_code == 200:
        print(f"Response content: {response1.json()}")
    else:
        print(f"Request failed with status code: {response1.status_code}")

    print("\n" + "-"*50 + "\n")

    ## Method 2: Using the HTTPBasicAuth class
    response2 = requests.get(url, auth=HTTPBasicAuth("user", "pass"))

    print("Method 2: Using HTTPBasicAuth class")
    print(f"Status Code: {response2.status_code}")
    if response2.status_code == 200:
        print(f"Response content: {response2.json()}")
    else:
        print(f"Request failed with status code: {response2.status_code}")

if __name__ == "__main__":
    make_authenticated_request()

Ejecute el script:

python basic_auth.py

Debería ver una respuesta exitosa con el código de estado 200:

Method 1: Using auth tuple
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

--------------------------------------------------

Method 2: Using HTTPBasicAuth class
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

Ambos métodos logran el mismo resultado: agregan una cabecera Authorization a la solicitud con el nombre de usuario y la contraseña codificados en base64.

Qué Sucede Cuando la Autenticación Falla

Veamos qué sucede cuando proporcionamos credenciales incorrectas. Cree un nuevo archivo llamado failed_auth.py:

import requests

def test_failed_authentication():
    url = "https://httpbin.org/basic-auth/user/pass"

    ## Correct credentials
    response_correct = requests.get(url, auth=("user", "pass"))

    ## Incorrect password
    response_wrong_pass = requests.get(url, auth=("user", "wrong_password"))

    ## Incorrect username
    response_wrong_user = requests.get(url, auth=("wrong_user", "pass"))

    print("Correct credentials:")
    print(f"Status Code: {response_correct.status_code}")
    if response_correct.status_code == 200:
        print(f"Response content: {response_correct.json()}")

    print("\nIncorrect password:")
    print(f"Status Code: {response_wrong_pass.status_code}")

    print("\nIncorrect username:")
    print(f"Status Code: {response_wrong_user.status_code}")

if __name__ == "__main__":
    test_failed_authentication()

Ejecute el script:

python failed_auth.py

Debería ver una salida similar a:

Correct credentials:
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

Incorrect password:
Status Code: 401

Incorrect username:
Status Code: 401

Tanto el nombre de usuario incorrecto como la contraseña incorrecta resultan en una respuesta 401 No Autorizado.

Manejo de Errores de Autenticación

Ahora, implementemos el manejo de errores para las fallas de autenticación. Cree un nuevo archivo llamado handle_auth_errors.py:

import requests
from requests.exceptions import HTTPError

def make_authenticated_request(username, password):
    url = "https://httpbin.org/basic-auth/user/pass"

    try:
        response = requests.get(url, auth=(username, password))
        response.raise_for_status()  ## Raises an HTTPError for bad responses (4xx or 5xx)

        print(f"Authentication successful!")
        print(f"Response content: {response.json()}")
        return response

    except HTTPError as e:
        if response.status_code == 401:
            print(f"Authentication failed: Invalid credentials")
            ## You might want to retry with different credentials here
            return handle_authentication_failure()
        else:
            print(f"HTTP Error occurred: {e}")
            return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

def handle_authentication_failure():
    ## In a real application, you might prompt the user for new credentials
    ## or use a token refresh mechanism
    print("Attempting to authenticate with default credentials...")
    return make_authenticated_request("user", "pass")

if __name__ == "__main__":
    ## First, try with incorrect credentials
    print("Trying with incorrect credentials:")
    make_authenticated_request("wrong_user", "wrong_pass")

    print("\n" + "-"*50 + "\n")

    ## Then, try with correct credentials
    print("Trying with correct credentials:")
    make_authenticated_request("user", "pass")

Ejecute el script:

python handle_auth_errors.py

La salida debería mostrar cómo se maneja el error:

Trying with incorrect credentials:
Authentication failed: Invalid credentials
Attempting to authenticate with default credentials...
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}

--------------------------------------------------

Trying with correct credentials:
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}

Este script demuestra un patrón simple de manejo de errores donde:

  1. Intentamos realizar la solicitud con las credenciales proporcionadas
  2. Usamos raise_for_status() para generar una excepción para respuestas 4xx/5xx
  3. Manejamos el error 401 específicamente, con un mecanismo de reintento
  4. Manejamos otros tipos de errores de manera apropiada

Estrategias Avanzadas de Manejo de Autenticación

En aplicaciones del mundo real, a menudo se necesitan estrategias más avanzadas para manejar la autenticación. Exploremos algunas técnicas comunes.

Autenticación Basada en Tokens

Muchas API modernas utilizan la autenticación basada en tokens en lugar de la autenticación básica. OAuth 2.0 es un protocolo común que utiliza tokens para la autenticación y autorización.

Creemos un script que simule la autenticación basada en tokens. Cree un archivo llamado token_auth.py:

import requests
import time
import json

## Simulated token storage - in a real app, this might be a database or secure storage
TOKEN_FILE = "token.json"

def get_stored_token():
    """Retrieve the stored token if it exists."""
    try:
        with open(TOKEN_FILE, "r") as f:
            token_data = json.load(f)
            ## Check if token is expired
            if token_data.get("expires_at", 0) > time.time():
                return token_data["access_token"]
    except (FileNotFoundError, json.JSONDecodeError):
        pass
    return None

def save_token(token, expires_in=3600):
    """Save the token with expiration time."""
    token_data = {
        "access_token": token,
        "expires_at": time.time() + expires_in
    }
    with open(TOKEN_FILE, "w") as f:
        json.dump(token_data, f)

def get_new_token():
    """Simulate obtaining a new token from an authentication service."""
    ## In a real application, this would make a request to the auth server
    print("Obtaining new access token...")
    ## Simulating a token generation
    new_token = f"simulated_token_{int(time.time())}"
    save_token(new_token)
    return new_token

def make_authenticated_request(url):
    """Make a request with token authentication, refreshing if needed."""
    ## Try to get the stored token
    token = get_stored_token()

    ## If no valid token exists, get a new one
    if not token:
        token = get_new_token()

    ## Make the authenticated request
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)

    ## If unauthorized, the token might be invalid - get a new one and retry
    if response.status_code == 401:
        print("Token rejected. Getting a new token and retrying...")
        token = get_new_token()
        headers = {"Authorization": f"Bearer {token}"}
        response = requests.get(url, headers=headers)

    return response

## For testing, we'll use httpbin's bearer auth endpoint
if __name__ == "__main__":
    ## Using httpbin.org/bearer which checks for the Authorization header
    url = "https://httpbin.org/bearer"

    ## First, delete any existing token to simulate a fresh start
    try:
        import os
        os.remove(TOKEN_FILE)
    except FileNotFoundError:
        pass

    print("First request (should obtain a new token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

    print("\nSecond request (should use the stored token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

Ejecute el script:

python token_auth.py

La salida debería ser similar a:

First request (should obtain a new token):
Obtaining new access token...
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}

Second request (should use the stored token):
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}

Este script demuestra la autenticación basada en tokens con la actualización automática del token cuando un token es rechazado.

Implementación de Lógica de Reintento

A veces, las fallas de autenticación pueden ser temporales. Implementemos un mecanismo de reintento con retroceso exponencial. Cree un archivo llamado retry_auth.py:

import requests
import time
import random

def make_request_with_retry(url, auth, max_retries=3, backoff_factor=0.5):
    """
    Makes a request with retry logic and exponential backoff

    Args:
        url: URL to request
        auth: Authentication tuple (username, password)
        max_retries: Maximum number of retry attempts
        backoff_factor: Factor by which to increase the delay between retries
    """
    retries = 0

    while retries <= max_retries:
        try:
            response = requests.get(url, auth=auth, timeout=10)

            ## If successful, return the response
            if response.status_code == 200:
                return response

            ## If unauthorized, we might want to handle differently
            if response.status_code == 401:
                print(f"Attempt {retries+1}/{max_retries+1}: Authentication failed")
                ## In a real app, we might refresh tokens here or prompt for new credentials
            else:
                print(f"Attempt {retries+1}/{max_retries+1}: Failed with status code {response.status_code}")

            ## If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                return response

            ## Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

        except requests.exceptions.RequestException as e:
            print(f"Request exception: {e}")

            ## If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                raise

            ## Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

if __name__ == "__main__":
    url = "https://httpbin.org/basic-auth/user/pass"

    print("Testing with incorrect credentials:")
    response = make_request_with_retry(url, auth=("wrong_user", "wrong_pass"))
    print(f"Final status code: {response.status_code}")

    print("\nTesting with correct credentials:")
    response = make_request_with_retry(url, auth=("user", "pass"))
    print(f"Final status code: {response.status_code}")
    print(f"Response content: {response.json()}")

Ejecute el script:

python retry_auth.py

La salida debería ser similar a:

Testing with incorrect credentials:
Attempt 1/4: Authentication failed
Retrying in 0.54 seconds...
Attempt 2/4: Authentication failed
Retrying in 1.05 seconds...
Attempt 3/4: Authentication failed
Retrying in 2.08 seconds...
Attempt 4/4: Authentication failed
Maximum retry attempts reached.
Final status code: 401

Testing with correct credentials:
Final status code: 200
Response content: {'authenticated': True, 'user': 'user'}

Este script demuestra un mecanismo de reintento con retroceso exponencial, que es un patrón común al tratar con solicitudes de red que podrían fallar temporalmente.

Creación de una Sesión de Autenticación Reutilizable

Para múltiples solicitudes al mismo servicio, a menudo es más eficiente crear una sesión que mantendrá la información de autenticación. Cree un archivo llamado auth_session.py:

import requests

def create_authenticated_session(base_url, username, password):
    """
    Creates and returns a requests session with basic authentication

    Args:
        base_url: Base URL for the API
        username: Username for authentication
        password: Password for authentication

    Returns:
        A configured requests.Session object
    """
    session = requests.Session()
    session.auth = (username, password)
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })

    ## Test the authentication
    test_url = f"{base_url}/basic-auth/{username}/{password}"
    try:
        response = session.get(test_url)
        response.raise_for_status()
        print(f"Authentication successful for user: {username}")
    except requests.exceptions.HTTPError as e:
        print(f"Authentication failed: {e}")
        ## In a real app, you might raise an exception here or handle it differently

    return session

def make_authenticated_requests(session, base_url):
    """
    Makes multiple requests using the authenticated session

    Args:
        session: The authenticated requests.Session
        base_url: Base URL for the API
    """
    ## Make a request to a different endpoint
    response1 = session.get(f"{base_url}/get")
    print(f"\nRequest 1 status code: {response1.status_code}")
    print(f"Request 1 response: {response1.json()}")

    ## Make another request
    response2 = session.get(f"{base_url}/headers")
    print(f"\nRequest 2 status code: {response2.status_code}")
    print(f"Request 2 response: {response2.json()}")

if __name__ == "__main__":
    base_url = "https://httpbin.org"

    ## Create an authenticated session
    session = create_authenticated_session(base_url, "user", "pass")

    ## Use the session for multiple requests
    make_authenticated_requests(session, base_url)

Ejecute el script:

python auth_session.py

La salida debería ser similar a:

Authentication successful for user: user

Request 1 status code: 200
Request 1 response: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}

Request 2 status code: 200
Request 2 response: {'headers': {...}}

Usar una sesión tiene varias ventajas:

  1. Reutiliza la conexión TCP subyacente, lo que hace que las solicitudes posteriores sean más rápidas
  2. Incluye automáticamente las credenciales de autenticación con cada solicitud
  3. Mantiene las cookies entre solicitudes, lo cual es útil para la autenticación basada en sesiones

Manejo de Desafíos de Autenticación del Mundo Real

En aplicaciones del mundo real, a menudo se encuentran escenarios de autenticación más complejos. Exploremos algunos ejemplos prácticos y las mejores prácticas.

Implementación de una Clase de Autenticación Personalizada

La biblioteca requests le permite crear manejadores de autenticación personalizados mediante la subclase de la clase requests.auth.AuthBase. Esto es útil cuando se trabaja con API que tienen esquemas de autenticación personalizados.

Cree un archivo llamado custom_auth.py:

import requests
from requests.auth import AuthBase
import time
import hashlib

class CustomAuth(AuthBase):
    """
    Example of a custom authentication mechanism that adds a timestamp and HMAC signature
    This is similar to how many APIs authenticate requests (like AWS, Stripe, etc.)
    """
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret

    def __call__(self, request):
        ## Add timestamp to the request
        timestamp = str(int(time.time()))

        ## Add the API key and timestamp as headers
        request.headers['X-API-Key'] = self.api_key
        request.headers['X-Timestamp'] = timestamp

        ## Create a signature based on the request
        ## In a real implementation, this would include more request elements
        method = request.method
        path = request.path_url

        ## Create a simple signature (in real apps, use proper HMAC)
        signature_data = f"{method}{path}{timestamp}{self.api_secret}".encode('utf-8')
        signature = hashlib.sha256(signature_data).hexdigest()

        ## Add the signature to the headers
        request.headers['X-Signature'] = signature

        return request

def test_custom_auth():
    """Test our custom authentication with httpbin.org"""
    url = "https://httpbin.org/headers"

    ## Create our custom auth handler
    auth = CustomAuth(api_key="test_key", api_secret="test_secret")

    ## Make the request with our custom auth
    response = requests.get(url, auth=auth)

    print(f"Status Code: {response.status_code}")
    print(f"Response Headers: {response.json()}")

    ## Verify our custom headers were sent
    headers = response.json()['headers']
    for header in ['X-Api-Key', 'X-Timestamp', 'X-Signature']:
        if header.upper() in headers:
            print(f"{header} was sent: {headers[header.upper()]}")
        else:
            print(f"{header} was not found in the response")

if __name__ == "__main__":
    test_custom_auth()

Ejecute el script:

python custom_auth.py

La salida debería incluir las cabeceras personalizadas en la respuesta:

Status Code: 200
Response Headers: {'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.2', 'X-Api-Key': 'test_key', 'X-Signature': '...', 'X-Timestamp': '1623456789'}}
X-Api-Key was sent: test_key
X-Timestamp was sent: 1623456789
X-Signature was sent: ...

Este ejemplo muestra cómo implementar un esquema de autenticación personalizado que incluye una marca de tiempo y una firma, similar a lo que utilizan muchas API comerciales.

Manejo de la Limitación de Tasa y Respuestas 429

Muchas API implementan la limitación de tasa para evitar el abuso. Cuando excede el límite de tasa, el servidor normalmente responde con un código de estado 429 Too Many Requests. Creemos un script para manejar este escenario.

Cree un archivo llamado rate_limit_handler.py:

import requests
import time

def make_request_with_rate_limit_handling(url, max_retries=5):
    """
    Makes a request with handling for rate limits (429 responses)

    Args:
        url: URL to request
        max_retries: Maximum number of retry attempts
    """
    retries = 0

    while retries <= max_retries:
        response = requests.get(url)

        ## If not rate limited, return the response
        if response.status_code != 429:
            return response

        ## We got rate limited, check if we have retry information
        retry_after = response.headers.get('Retry-After')

        ## If we have retry information, wait that long
        if retry_after:
            wait_time = int(retry_after)
            print(f"Rate limited. Waiting for {wait_time} seconds as specified by Retry-After header.")
            time.sleep(wait_time)
        else:
            ## If no retry information, use exponential backoff
            wait_time = 2 ** retries
            print(f"Rate limited. No Retry-After header. Using exponential backoff: {wait_time} seconds.")
            time.sleep(wait_time)

        retries += 1

        if retries > max_retries:
            print(f"Maximum retries ({max_retries}) exceeded.")
            return response

## For demonstration, we'll simulate rate limiting using httpbin's status endpoint
if __name__ == "__main__":
    ## First request should succeed
    print("Making request to endpoint that returns 200:")
    response = make_request_with_rate_limit_handling("https://httpbin.org/status/200")
    print(f"Final status code: {response.status_code}")

    ## Simulate being rate limited initially, then succeeding
    print("\nSimulating rate limiting scenario:")
    ## We'll make 3 requests in sequence to different status codes (429, 429, 200)
    ## to simulate being rate limited twice and then succeeding
    urls = [
        "https://httpbin.org/status/429",  ## First will be rate limited
        "https://httpbin.org/status/429",  ## Second will also be rate limited
        "https://httpbin.org/status/200"   ## Third will succeed
    ]

    for i, url in enumerate(urls):
        print(f"\nRequest {i+1}:")
        response = make_request_with_rate_limit_handling(url, max_retries=1)
        print(f"Final status code: {response.status_code}")

Ejecute el script:

python rate_limit_handler.py

La salida simulará el manejo de la limitación de tasa:

Making request to endpoint that returns 200:
Final status code: 200

Simulating rate limiting scenario:

Request 1:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429

Request 2:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429

Request 3:
Final status code: 200

Esto demuestra cómo manejar la limitación de tasa respetando la cabecera Retry-After o implementando un retroceso exponencial.

Solución Completa de Manejo de Errores

Finalmente, juntemos todo en una solución integral de manejo de errores. Cree un archivo llamado complete_auth_handler.py:

import requests
import time
import random
import json
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout

class AuthenticationManager:
    """Manages authentication tokens and credentials"""

    def __init__(self, token_file="auth_token.json"):
        self.token_file = token_file
        self.token = self.load_token()

    def load_token(self):
        """Load token from file storage"""
        try:
            with open(self.token_file, "r") as f:
                token_data = json.load(f)
                ## Check if token is expired
                if token_data.get("expires_at", 0) > time.time():
                    return token_data.get("access_token")
        except (FileNotFoundError, json.JSONDecodeError):
            pass
        return None

    def save_token(self, token, expires_in=3600):
        """Save token to file storage"""
        token_data = {
            "access_token": token,
            "expires_at": time.time() + expires_in
        }
        with open(self.token_file, "w") as f:
            json.dump(token_data, f)

    def get_token(self):
        """Get a valid token, refreshing if necessary"""
        if not self.token:
            self.token = self.refresh_token()
        return self.token

    def refresh_token(self):
        """Refresh the authentication token"""
        ## In a real app, this would make a call to the auth server
        print("Getting a new authentication token...")
        new_token = f"refreshed_token_{int(time.time())}"
        self.save_token(new_token)
        return new_token

    def invalidate_token(self):
        """Invalidate the current token"""
        self.token = None
        try:
            with open(self.token_file, "w") as f:
                json.dump({}, f)
        except Exception:
            pass

class APIClient:
    """Client for making API requests with comprehensive error handling"""

    def __init__(self, base_url, auth_manager=None):
        self.base_url = base_url
        self.auth_manager = auth_manager or AuthenticationManager()
        self.session = requests.Session()

    def make_request(self, method, endpoint, **kwargs):
        """
        Make an API request with comprehensive error handling

        Args:
            method: HTTP method (get, post, etc.)
            endpoint: API endpoint to call
            **kwargs: Additional arguments to pass to requests

        Returns:
            Response object or None if all retries failed
        """
        url = f"{self.base_url}{endpoint}"
        max_retries = kwargs.pop("max_retries", 3)
        retry_backoff_factor = kwargs.pop("retry_backoff_factor", 0.5)

        ## Add authentication if we have an auth manager
        if self.auth_manager:
            token = self.auth_manager.get_token()
            headers = kwargs.get("headers", {})
            headers["Authorization"] = f"Bearer {token}"
            kwargs["headers"] = headers

        ## Make the request with retries
        for retry in range(max_retries + 1):
            try:
                request_func = getattr(self.session, method.lower())
                response = request_func(url, **kwargs)

                ## Handle different status codes
                if response.status_code == 200:
                    return response
                elif response.status_code == 401:
                    print("Unauthorized: Token may be invalid")
                    if self.auth_manager and retry < max_retries:
                        print("Refreshing authentication token...")
                        self.auth_manager.invalidate_token()
                        token = self.auth_manager.get_token()
                        kwargs.get("headers", {})["Authorization"] = f"Bearer {token}"
                        continue
                elif response.status_code == 429:
                    if retry < max_retries:
                        retry_after = response.headers.get("Retry-After")
                        if retry_after:
                            wait_time = int(retry_after)
                        else:
                            wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)

                        print(f"Rate limited. Waiting {wait_time:.2f} seconds before retry {retry + 1}/{max_retries}")
                        time.sleep(wait_time)
                        continue
                    else:
                        print("Rate limit retries exhausted")
                elif 500 <= response.status_code < 600:
                    if retry < max_retries:
                        wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                        print(f"Server error ({response.status_code}). Retrying in {wait_time:.2f} seconds...")
                        time.sleep(wait_time)
                        continue
                    else:
                        print(f"Server error retries exhausted")

                ## If we get here, we're returning the response as-is
                return response

            except ConnectionError:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Connection error. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Connection error retries exhausted")
                    raise
            except Timeout:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Request timed out. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Timeout retries exhausted")
                    raise
            except RequestException as e:
                print(f"Request failed: {e}")
                raise

        return None

    def get(self, endpoint, **kwargs):
        """Make a GET request"""
        return self.make_request("get", endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        """Make a POST request"""
        return self.make_request("post", endpoint, **kwargs)

    ## Additional methods can be added for other HTTP verbs

## Test the client with httpbin
if __name__ == "__main__":
    ## Clean any existing token files
    import os
    try:
        os.remove("auth_token.json")
    except FileNotFoundError:
        pass

    client = APIClient("https://httpbin.org")

    print("Making a GET request to /get:")
    response = client.get("/get")
    print(f"Status code: {response.status_code}")
    print(f"Response content: {response.json()}")

    print("\nSimulating an unauthorized response:")
    ## For demonstration purposes, we'll use httpbin's status endpoint to simulate a 401
    response = client.get("/status/401")
    print(f"Status code: {response.status_code}")

    print("\nSimulating a rate-limited response:")
    ## Simulate a 429 response
    response = client.get("/status/429")
    print(f"Status code: {response.status_code}")

Ejecute el script:

python complete_auth_handler.py

La salida debería demostrar el manejo de errores integral:

Getting a new authentication token...
Making a GET request to /get:
Status code: 200
Response content: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}

Simulating an unauthorized response:
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Status code: 401

Simulating a rate-limited response:
Rate limited. Waiting 0.54 seconds before retry 1/3
Rate limited. Waiting 1.04 seconds before retry 2/3
Rate limited. Waiting 2.07 seconds before retry 3/3
Rate limit retries exhausted
Status code: 429

Este ejemplo completo demuestra muchas de las mejores prácticas para manejar la autenticación y otros errores relacionados con la API en una aplicación Python lista para producción.

Resumen

En este laboratorio, ha aprendido a manejar eficazmente las respuestas no autorizadas en las solicitudes de Python. Cubrimos varios aspectos importantes:

  1. Comprensión de la Autenticación HTTP y los Códigos de Estado: Aprendió qué significan las respuestas 401 Unauthorized y cómo identificarlas en sus aplicaciones Python.

  2. Autenticación Básica: Implementó la autenticación básica utilizando tanto el parámetro auth como la clase HTTPBasicAuth, y aprendió a manejar las fallas de autenticación.

  3. Estrategias Avanzadas de Autenticación: Exploró la autenticación basada en tokens, la lógica de reintento con retroceso exponencial y la creación de sesiones autenticadas reutilizables.

  4. Desafíos de Autenticación del Mundo Real: Implementó una clase de autenticación personalizada, manejó la limitación de tasa y creó una solución integral de manejo de errores para aplicaciones listas para producción.

Estas técnicas le ayudarán a construir aplicaciones Python más robustas y resilientes que puedan manejar con elegancia las fallas de autenticación y otros errores relacionados al interactuar con las API web.

Para un aprendizaje adicional, considere explorar:

  • Flujos de autenticación OAuth 2.0
  • JWT (JSON Web Tokens) para la autenticación sin estado
  • Estrategias de rotación de claves de API
  • Almacenamiento seguro de credenciales