Manejo de Desafíos de Autenticación del Mundo Real
En aplicaciones del mundo real, a menudo se encuentran escenarios de autenticación más complejos. Exploremos algunos ejemplos prácticos y las mejores prácticas.
Implementación de una Clase de Autenticación Personalizada
La biblioteca requests le permite crear manejadores de autenticación personalizados mediante la subclase de la clase requests.auth.AuthBase. Esto es útil cuando se trabaja con API que tienen esquemas de autenticación personalizados.
Cree un archivo llamado custom_auth.py:
import requests
from requests.auth import AuthBase
import time
import hashlib
class CustomAuth(AuthBase):
"""
Example of a custom authentication mechanism that adds a timestamp and HMAC signature
This is similar to how many APIs authenticate requests (like AWS, Stripe, etc.)
"""
def __init__(self, api_key, api_secret):
self.api_key = api_key
self.api_secret = api_secret
def __call__(self, request):
## Add timestamp to the request
timestamp = str(int(time.time()))
## Add the API key and timestamp as headers
request.headers['X-API-Key'] = self.api_key
request.headers['X-Timestamp'] = timestamp
## Create a signature based on the request
## In a real implementation, this would include more request elements
method = request.method
path = request.path_url
## Create a simple signature (in real apps, use proper HMAC)
signature_data = f"{method}{path}{timestamp}{self.api_secret}".encode('utf-8')
signature = hashlib.sha256(signature_data).hexdigest()
## Add the signature to the headers
request.headers['X-Signature'] = signature
return request
def test_custom_auth():
"""Test our custom authentication with httpbin.org"""
url = "https://httpbin.org/headers"
## Create our custom auth handler
auth = CustomAuth(api_key="test_key", api_secret="test_secret")
## Make the request with our custom auth
response = requests.get(url, auth=auth)
print(f"Status Code: {response.status_code}")
print(f"Response Headers: {response.json()}")
## Verify our custom headers were sent
headers = response.json()['headers']
for header in ['X-Api-Key', 'X-Timestamp', 'X-Signature']:
if header.upper() in headers:
print(f"{header} was sent: {headers[header.upper()]}")
else:
print(f"{header} was not found in the response")
if __name__ == "__main__":
test_custom_auth()
Ejecute el script:
python custom_auth.py
La salida debería incluir las cabeceras personalizadas en la respuesta:
Status Code: 200
Response Headers: {'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.2', 'X-Api-Key': 'test_key', 'X-Signature': '...', 'X-Timestamp': '1623456789'}}
X-Api-Key was sent: test_key
X-Timestamp was sent: 1623456789
X-Signature was sent: ...
Este ejemplo muestra cómo implementar un esquema de autenticación personalizado que incluye una marca de tiempo y una firma, similar a lo que utilizan muchas API comerciales.
Manejo de la Limitación de Tasa y Respuestas 429
Muchas API implementan la limitación de tasa para evitar el abuso. Cuando excede el límite de tasa, el servidor normalmente responde con un código de estado 429 Too Many Requests. Creemos un script para manejar este escenario.
Cree un archivo llamado rate_limit_handler.py:
import requests
import time
def make_request_with_rate_limit_handling(url, max_retries=5):
"""
Makes a request with handling for rate limits (429 responses)
Args:
url: URL to request
max_retries: Maximum number of retry attempts
"""
retries = 0
while retries <= max_retries:
response = requests.get(url)
## If not rate limited, return the response
if response.status_code != 429:
return response
## We got rate limited, check if we have retry information
retry_after = response.headers.get('Retry-After')
## If we have retry information, wait that long
if retry_after:
wait_time = int(retry_after)
print(f"Rate limited. Waiting for {wait_time} seconds as specified by Retry-After header.")
time.sleep(wait_time)
else:
## If no retry information, use exponential backoff
wait_time = 2 ** retries
print(f"Rate limited. No Retry-After header. Using exponential backoff: {wait_time} seconds.")
time.sleep(wait_time)
retries += 1
if retries > max_retries:
print(f"Maximum retries ({max_retries}) exceeded.")
return response
## For demonstration, we'll simulate rate limiting using httpbin's status endpoint
if __name__ == "__main__":
## First request should succeed
print("Making request to endpoint that returns 200:")
response = make_request_with_rate_limit_handling("https://httpbin.org/status/200")
print(f"Final status code: {response.status_code}")
## Simulate being rate limited initially, then succeeding
print("\nSimulating rate limiting scenario:")
## We'll make 3 requests in sequence to different status codes (429, 429, 200)
## to simulate being rate limited twice and then succeeding
urls = [
"https://httpbin.org/status/429", ## First will be rate limited
"https://httpbin.org/status/429", ## Second will also be rate limited
"https://httpbin.org/status/200" ## Third will succeed
]
for i, url in enumerate(urls):
print(f"\nRequest {i+1}:")
response = make_request_with_rate_limit_handling(url, max_retries=1)
print(f"Final status code: {response.status_code}")
Ejecute el script:
python rate_limit_handler.py
La salida simulará el manejo de la limitación de tasa:
Making request to endpoint that returns 200:
Final status code: 200
Simulating rate limiting scenario:
Request 1:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429
Request 2:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429
Request 3:
Final status code: 200
Esto demuestra cómo manejar la limitación de tasa respetando la cabecera Retry-After o implementando un retroceso exponencial.
Solución Completa de Manejo de Errores
Finalmente, juntemos todo en una solución integral de manejo de errores. Cree un archivo llamado complete_auth_handler.py:
import requests
import time
import random
import json
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
class AuthenticationManager:
"""Manages authentication tokens and credentials"""
def __init__(self, token_file="auth_token.json"):
self.token_file = token_file
self.token = self.load_token()
def load_token(self):
"""Load token from file storage"""
try:
with open(self.token_file, "r") as f:
token_data = json.load(f)
## Check if token is expired
if token_data.get("expires_at", 0) > time.time():
return token_data.get("access_token")
except (FileNotFoundError, json.JSONDecodeError):
pass
return None
def save_token(self, token, expires_in=3600):
"""Save token to file storage"""
token_data = {
"access_token": token,
"expires_at": time.time() + expires_in
}
with open(self.token_file, "w") as f:
json.dump(token_data, f)
def get_token(self):
"""Get a valid token, refreshing if necessary"""
if not self.token:
self.token = self.refresh_token()
return self.token
def refresh_token(self):
"""Refresh the authentication token"""
## In a real app, this would make a call to the auth server
print("Getting a new authentication token...")
new_token = f"refreshed_token_{int(time.time())}"
self.save_token(new_token)
return new_token
def invalidate_token(self):
"""Invalidate the current token"""
self.token = None
try:
with open(self.token_file, "w") as f:
json.dump({}, f)
except Exception:
pass
class APIClient:
"""Client for making API requests with comprehensive error handling"""
def __init__(self, base_url, auth_manager=None):
self.base_url = base_url
self.auth_manager = auth_manager or AuthenticationManager()
self.session = requests.Session()
def make_request(self, method, endpoint, **kwargs):
"""
Make an API request with comprehensive error handling
Args:
method: HTTP method (get, post, etc.)
endpoint: API endpoint to call
**kwargs: Additional arguments to pass to requests
Returns:
Response object or None if all retries failed
"""
url = f"{self.base_url}{endpoint}"
max_retries = kwargs.pop("max_retries", 3)
retry_backoff_factor = kwargs.pop("retry_backoff_factor", 0.5)
## Add authentication if we have an auth manager
if self.auth_manager:
token = self.auth_manager.get_token()
headers = kwargs.get("headers", {})
headers["Authorization"] = f"Bearer {token}"
kwargs["headers"] = headers
## Make the request with retries
for retry in range(max_retries + 1):
try:
request_func = getattr(self.session, method.lower())
response = request_func(url, **kwargs)
## Handle different status codes
if response.status_code == 200:
return response
elif response.status_code == 401:
print("Unauthorized: Token may be invalid")
if self.auth_manager and retry < max_retries:
print("Refreshing authentication token...")
self.auth_manager.invalidate_token()
token = self.auth_manager.get_token()
kwargs.get("headers", {})["Authorization"] = f"Bearer {token}"
continue
elif response.status_code == 429:
if retry < max_retries:
retry_after = response.headers.get("Retry-After")
if retry_after:
wait_time = int(retry_after)
else:
wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
print(f"Rate limited. Waiting {wait_time:.2f} seconds before retry {retry + 1}/{max_retries}")
time.sleep(wait_time)
continue
else:
print("Rate limit retries exhausted")
elif 500 <= response.status_code < 600:
if retry < max_retries:
wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
print(f"Server error ({response.status_code}). Retrying in {wait_time:.2f} seconds...")
time.sleep(wait_time)
continue
else:
print(f"Server error retries exhausted")
## If we get here, we're returning the response as-is
return response
except ConnectionError:
if retry < max_retries:
wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
print(f"Connection error. Retrying in {wait_time:.2f} seconds...")
time.sleep(wait_time)
else:
print("Connection error retries exhausted")
raise
except Timeout:
if retry < max_retries:
wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
print(f"Request timed out. Retrying in {wait_time:.2f} seconds...")
time.sleep(wait_time)
else:
print("Timeout retries exhausted")
raise
except RequestException as e:
print(f"Request failed: {e}")
raise
return None
def get(self, endpoint, **kwargs):
"""Make a GET request"""
return self.make_request("get", endpoint, **kwargs)
def post(self, endpoint, **kwargs):
"""Make a POST request"""
return self.make_request("post", endpoint, **kwargs)
## Additional methods can be added for other HTTP verbs
## Test the client with httpbin
if __name__ == "__main__":
## Clean any existing token files
import os
try:
os.remove("auth_token.json")
except FileNotFoundError:
pass
client = APIClient("https://httpbin.org")
print("Making a GET request to /get:")
response = client.get("/get")
print(f"Status code: {response.status_code}")
print(f"Response content: {response.json()}")
print("\nSimulating an unauthorized response:")
## For demonstration purposes, we'll use httpbin's status endpoint to simulate a 401
response = client.get("/status/401")
print(f"Status code: {response.status_code}")
print("\nSimulating a rate-limited response:")
## Simulate a 429 response
response = client.get("/status/429")
print(f"Status code: {response.status_code}")
Ejecute el script:
python complete_auth_handler.py
La salida debería demostrar el manejo de errores integral:
Getting a new authentication token...
Making a GET request to /get:
Status code: 200
Response content: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}
Simulating an unauthorized response:
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Status code: 401
Simulating a rate-limited response:
Rate limited. Waiting 0.54 seconds before retry 1/3
Rate limited. Waiting 1.04 seconds before retry 2/3
Rate limited. Waiting 2.07 seconds before retry 3/3
Rate limit retries exhausted
Status code: 429
Este ejemplo completo demuestra muchas de las mejores prácticas para manejar la autenticación y otros errores relacionados con la API en una aplicación Python lista para producción.