Handling Real-World Authentication Challenges
In real-world applications, you will often run into more complex authentication scenarios. Let's look at some practical examples and best practices.
Implementing a Custom Authentication Class
The requests library lets you create custom authentication handlers by subclassing requests.auth.AuthBase. This is useful when you work with APIs that use their own authentication schemes.
Create a file named custom_auth.py:
import requests
from requests.auth import AuthBase
import time
import hashlib


class CustomAuth(AuthBase):
    """
    Example of a custom authentication mechanism that adds a timestamp and a
    simplified hash-based signature.
    This is similar to how many APIs authenticate requests (like AWS, Stripe, etc.)
    """

    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret

    def __call__(self, request):
        # Add timestamp to the request
        timestamp = str(int(time.time()))

        # Add the API key and timestamp as headers
        request.headers['X-API-Key'] = self.api_key
        request.headers['X-Timestamp'] = timestamp

        # Create a signature based on the request
        # In a real implementation, this would include more request elements
        method = request.method
        path = request.path_url

        # Create a simple signature (in real apps, use proper HMAC)
        signature_data = f"{method}{path}{timestamp}{self.api_secret}".encode('utf-8')
        signature = hashlib.sha256(signature_data).hexdigest()

        # Add the signature to the headers
        request.headers['X-Signature'] = signature
        return request


def test_custom_auth():
    """Test our custom authentication with httpbin.org"""
    url = "https://httpbin.org/headers"

    # Create our custom auth handler
    auth = CustomAuth(api_key="test_key", api_secret="test_secret")

    # Make the request with our custom auth
    response = requests.get(url, auth=auth)
    print(f"Status Code: {response.status_code}")
    print(f"Response Headers: {response.json()}")

    # Verify our custom headers were sent. Header names are case-insensitive,
    # so compare them in lowercase instead of relying on the server's casing.
    headers = {name.lower(): value for name, value in response.json()['headers'].items()}
    for header in ['X-Api-Key', 'X-Timestamp', 'X-Signature']:
        if header.lower() in headers:
            print(f"{header} was sent: {headers[header.lower()]}")
        else:
            print(f"{header} was not found in the response")


if __name__ == "__main__":
    test_custom_auth()
Run the script:
python custom_auth.py
The output should include the custom headers in the response:
Status Code: 200
Response Headers: {'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.2', 'X-Api-Key': 'test_key', 'X-Signature': '...', 'X-Timestamp': '1623456789'}}
X-Api-Key was sent: test_key
X-Timestamp was sent: 1623456789
X-Signature was sent: ...
This example shows how to implement a custom authentication scheme that includes a timestamp and a signature, similar to what many commercial APIs use.
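The signature in custom_auth.py is a plain SHA-256 hash over a string that contains the secret, and its own comment recommends a proper HMAC for real applications. The following is a minimal sketch of that variant using only the standard library. The function names and the newline-separated message format are assumptions made for illustration; a real API defines its own canonical string to sign.

```python
import hashlib
import hmac


def sign_request(method: str, path: str, timestamp: str, api_secret: str) -> str:
    """Return a hex-encoded HMAC-SHA256 signature over method, path and timestamp."""
    # Hypothetical message format: fields joined by newlines so that
    # ("GET", "/a1", "2") and ("GET", "/a", "12") cannot produce the same message.
    message = "\n".join([method.upper(), path, timestamp]).encode("utf-8")
    # The secret is the HMAC key; it is never concatenated into the message.
    return hmac.new(api_secret.encode("utf-8"), message, hashlib.sha256).hexdigest()


def verify_signature(expected: str, received: str) -> bool:
    """Compare two signatures in constant time to avoid timing leaks."""
    return hmac.compare_digest(expected, received)


if __name__ == "__main__":
    signature = sign_request("GET", "/headers", "1623456789", "test_secret")
    print(f"X-Signature: {signature}")
```

Inside CustomAuth.__call__, such a helper could stand in for the hashlib.sha256 call, with request.method, request.path_url and the timestamp passed as arguments.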
Handling Rate Limiting and 429 Responses
Many APIs enforce rate limits to prevent abuse. When you exceed the limit, the server typically responds with status code 429 Too Many Requests. Let's create a script that handles this scenario.
Create a file named rate_limit_handler.py:
import requests
import time


def make_request_with_rate_limit_handling(url, max_retries=5):
    """
    Makes a request with handling for rate limits (429 responses)

    Args:
        url: URL to request
        max_retries: Maximum number of retry attempts
    """
    retries = 0
    while retries <= max_retries:
        response = requests.get(url)

        # If not rate limited, return the response
        if response.status_code != 429:
            return response

        # We got rate limited, check if we have retry information
        retry_after = response.headers.get('Retry-After')

        if retry_after:
            # If we have retry information, wait that long.
            # This assumes the header holds a number of seconds; it can also be an HTTP date.
            wait_time = int(retry_after)
            print(f"Rate limited. Waiting for {wait_time} seconds as specified by Retry-After header.")
            time.sleep(wait_time)
        else:
            # If no retry information, use exponential backoff
            wait_time = 2 ** retries
            print(f"Rate limited. No Retry-After header. Using exponential backoff: {wait_time} seconds.")
            time.sleep(wait_time)

        retries += 1

    # The loop only ends without returning when every attempt was rate limited
    print(f"Maximum retries ({max_retries}) exceeded.")
    return response


# For demonstration, we'll simulate rate limiting using httpbin's status endpoint
if __name__ == "__main__":
    # First request should succeed
    print("Making request to endpoint that returns 200:")
    response = make_request_with_rate_limit_handling("https://httpbin.org/status/200")
    print(f"Final status code: {response.status_code}")

    # Simulate being rate limited initially, then succeeding
    print("\nSimulating rate limiting scenario:")

    # We'll make 3 requests in sequence to different status codes (429, 429, 200)
    # to simulate being rate limited twice and then succeeding
    urls = [
        "https://httpbin.org/status/429",  # First will be rate limited
        "https://httpbin.org/status/429",  # Second will also be rate limited
        "https://httpbin.org/status/200",  # Third will succeed
    ]
    for i, url in enumerate(urls):
        print(f"\nRequest {i+1}:")
        response = make_request_with_rate_limit_handling(url, max_retries=1)
        print(f"Final status code: {response.status_code}")
Run the script:
python rate_limit_handler.py
The output simulates rate limit handling:
Making request to endpoint that returns 200:
Final status code: 200
Simulating rate limiting scenario:
Request 1:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429
Request 2:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429
Request 3:
Final status code: 200
This shows how to handle rate limiting by honoring the Retry-After header or by falling back to exponential backoff.
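The script converts Retry-After with int(), which works when the server sends a number of seconds. The header may also carry an HTTP date, in which case int() raises ValueError. The sketch below shows one way to accept both forms and fall back to exponential backoff otherwise. The names parse_retry_after and compute_wait, as well as the base and cap parameters, are hypothetical helpers and not part of requests.

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], now: Optional[datetime] = None) -> Optional[float]:
    """Return the delay in seconds requested by a Retry-After value, or None if unusable."""
    if not value:
        return None
    value = value.strip()
    if value.isdigit():
        # Delay-seconds form, e.g. "120"
        return float(value)
    try:
        # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if retry_at.tzinfo is None:
        # Treat dates without zone information as UTC
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means no further waiting is needed
    return max(0.0, (retry_at - now).total_seconds())


def compute_wait(retry_after: Optional[str], attempt: int,
                 base: float = 1.0, cap: float = 60.0) -> float:
    """Prefer the server's hint; otherwise use capped exponential backoff with a little jitter."""
    delay = parse_retry_after(retry_after)
    if delay is not None:
        return min(delay, cap)
    return min(cap, base * (2 ** attempt)) + random.uniform(0, 0.1)


if __name__ == "__main__":
    print(compute_wait("5", attempt=0))
    print(compute_wait(None, attempt=2))
```

The cap guards against a server asking for an unreasonably long pause; whether to cap at all, and at what value, depends on the application.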
A Complete Error-Handling Solution
Finally, let's bring everything together in a comprehensive error-handling solution. Create a file named complete_auth_handler.py:
import requests
import time
import random
import json
from requests.exceptions import RequestException, ConnectionError, Timeout


class AuthenticationManager:
    """Manages authentication tokens and credentials"""

    def __init__(self, token_file="auth_token.json"):
        self.token_file = token_file
        self.token = self.load_token()

    def load_token(self):
        """Load token from file storage"""
        try:
            with open(self.token_file, "r") as f:
                token_data = json.load(f)
            # Only return the token if it has not expired yet
            if token_data.get("expires_at", 0) > time.time():
                return token_data.get("access_token")
        except (FileNotFoundError, json.JSONDecodeError):
            pass
        return None

    def save_token(self, token, expires_in=3600):
        """Save token to file storage"""
        token_data = {
            "access_token": token,
            "expires_at": time.time() + expires_in
        }
        with open(self.token_file, "w") as f:
            json.dump(token_data, f)

    def get_token(self):
        """Get a valid token, refreshing if necessary"""
        if not self.token:
            self.token = self.refresh_token()
        return self.token

    def refresh_token(self):
        """Refresh the authentication token"""
        # In a real app, this would make a call to the auth server
        print("Getting a new authentication token...")
        new_token = f"refreshed_token_{int(time.time())}"
        self.save_token(new_token)
        return new_token

    def invalidate_token(self):
        """Invalidate the current token"""
        self.token = None
        try:
            with open(self.token_file, "w") as f:
                json.dump({}, f)
        except Exception:
            pass
class APIClient:
    """Client for making API requests with comprehensive error handling"""

    def __init__(self, base_url, auth_manager=None):
        self.base_url = base_url
        self.auth_manager = auth_manager or AuthenticationManager()
        self.session = requests.Session()

    def make_request(self, method, endpoint, **kwargs):
        """
        Make an API request with comprehensive error handling

        Args:
            method: HTTP method (get, post, etc.)
            endpoint: API endpoint to call
            **kwargs: Additional arguments to pass to requests

        Returns:
            Response object or None if all retries failed
        """
        url = f"{self.base_url}{endpoint}"
        max_retries = kwargs.pop("max_retries", 3)
        retry_backoff_factor = kwargs.pop("retry_backoff_factor", 0.5)

        # Add authentication if we have an auth manager
        if self.auth_manager:
            token = self.auth_manager.get_token()
            # Copy the headers so the caller's dict is not modified
            headers = dict(kwargs.get("headers") or {})
            headers["Authorization"] = f"Bearer {token}"
            kwargs["headers"] = headers

        # Make the request with retries
        for retry in range(max_retries + 1):
            try:
                request_func = getattr(self.session, method.lower())
                response = request_func(url, **kwargs)

                # Handle different status codes
                if response.status_code == 200:
                    return response
                elif response.status_code == 401:
                    print("Unauthorized: Token may be invalid")
                    if self.auth_manager and retry < max_retries:
                        print("Refreshing authentication token...")
                        self.auth_manager.invalidate_token()
                        token = self.auth_manager.get_token()
                        kwargs["headers"]["Authorization"] = f"Bearer {token}"
                        continue
                elif response.status_code == 429:
                    if retry < max_retries:
                        retry_after = response.headers.get("Retry-After")
                        if retry_after:
                            # Assumes Retry-After is given in seconds
                            wait_time = int(retry_after)
                        else:
                            wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                        print(f"Rate limited. Waiting {wait_time:.2f} seconds before retry {retry + 1}/{max_retries}")
                        time.sleep(wait_time)
                        continue
                    else:
                        print("Rate limit retries exhausted")
                elif 500 <= response.status_code < 600:
                    if retry < max_retries:
                        wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                        print(f"Server error ({response.status_code}). Retrying in {wait_time:.2f} seconds...")
                        time.sleep(wait_time)
                        continue
                    else:
                        print("Server error retries exhausted")

                # If we get here, we're returning the response as-is
                return response
            except ConnectionError:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Connection error. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Connection error retries exhausted")
                    raise
            except Timeout:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Request timed out. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Timeout retries exhausted")
                    raise
            except RequestException as e:
                # Any other requests error is not retried
                print(f"Request failed: {e}")
                raise
        return None

    def get(self, endpoint, **kwargs):
        """Make a GET request"""
        return self.make_request("get", endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        """Make a POST request"""
        return self.make_request("post", endpoint, **kwargs)

    # Additional methods can be added for other HTTP verbs
# Test the client with httpbin
if __name__ == "__main__":
    # Clean any existing token files
    import os
    try:
        os.remove("auth_token.json")
    except FileNotFoundError:
        pass

    client = APIClient("https://httpbin.org")

    print("Making a GET request to /get:")
    response = client.get("/get")
    print(f"Status code: {response.status_code}")
    print(f"Response content: {response.json()}")

    print("\nSimulating an unauthorized response:")
    # For demonstration purposes, we'll use httpbin's status endpoint to simulate a 401
    response = client.get("/status/401")
    print(f"Status code: {response.status_code}")

    print("\nSimulating a rate-limited response:")
    # Simulate a 429 response
    response = client.get("/status/429")
    print(f"Status code: {response.status_code}")
Run the script:
python complete_auth_handler.py
The output should demonstrate the comprehensive error handling:
Making a GET request to /get:
Getting a new authentication token...
Status code: 200
Response content: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}
Simulating an unauthorized response:
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Status code: 401
Simulating a rate-limited response:
Rate limited. Waiting 0.54 seconds before retry 1/3
Rate limited. Waiting 1.04 seconds before retry 2/3
Rate limited. Waiting 2.07 seconds before retry 3/3
Rate limit retries exhausted
Status code: 429
This comprehensive example demonstrates many best practices for handling authentication failures and other API-related errors in a production-ready Python application.
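One detail worth noting: AuthenticationManager checks expires_at only when it reads the token from the file, and get_token then trusts the in-memory value until a 401 response arrives. The following sketch shows one way to re-check expiry on every access and refresh slightly early. The ExpiringToken class, its fetch callback and the margin parameter are assumptions made for illustration, not part of the script above.

```python
import time
from typing import Callable, Optional, Tuple


class ExpiringToken:
    """Caches a token in memory and refreshes it shortly before it expires."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 margin: float = 30.0,
                 clock: Callable[[], float] = time.time):
        # fetch is a hypothetical callback returning (token, expires_in_seconds);
        # in a real app it would call the auth server.
        self._fetch = fetch
        self._margin = margin
        self._clock = clock  # injectable so tests do not have to sleep
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return a token that is valid for at least `margin` more seconds."""
        if self._token is None or self._clock() >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = self._clock() + expires_in
        return self._token

    def invalidate(self) -> None:
        """Drop the cached token, for example after a 401 response."""
        self._token = None


if __name__ == "__main__":
    cache = ExpiringToken(lambda: (f"refreshed_token_{int(time.time())}", 3600.0))
    print(cache.get())
```

Refreshing a little before the stated expiry reduces the chance that a token expires while a request is in flight; the 401 retry path in APIClient remains useful for tokens that are revoked early.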