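Introduction
When you build against web APIs in Python, requests are often rejected because of authorization problems. This lab walks you through understanding and handling Unauthorized (401) responses when using the Python requests library. With proper error-handling techniques, you can build more resilient applications that manage authentication failures gracefully.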
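Before looking at how to handle unauthorized responses, it helps to understand what they are and why they occur.
HTTP status codes are three-digit numbers that a server sends in response to a client request. They fall into five classes: 1xx (informational), 2xx (success), 3xx (redirection), 4xx (client error), and 5xx (server error).
The 401 Unauthorized status code belongs to the 4xx class and indicates that the request lacks valid authentication credentials for the target resource. This differs from a 403 Forbidden response, which means the server understood the request but refuses to authorize it.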
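As a quick illustration of the status classes described above, the following sketch maps a status code to its class using the first digit and looks up the standard reason phrase in Python's built-in http.HTTPStatus enum. The helper name describe_status is hypothetical and is not part of requests.

```python
from http import HTTPStatus


def describe_status(code: int) -> str:
    """Return the code, its standard reason phrase, and its class."""
    classes = {
        1: "informational",
        2: "success",
        3: "redirection",
        4: "client error",
        5: "server error",
    }
    category = classes.get(code // 100, "unknown")
    try:
        phrase = HTTPStatus(code).phrase
    except ValueError:
        # The code is not one of the registered codes known to the stdlib
        phrase = "non-standard code"
    return f"{code} {phrase} ({category})"


for code in (200, 401, 403, 503):
    print(describe_status(code))
```

Both 401 and 403 land in the client error class; the difference lies in whether the missing piece is authentication (401) or permission (403).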
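Let's start by creating a directory for our project and installing the required package.
mkdir -p ~/project/python-auth-handling
cd ~/project/python-auth-handling
Now create a virtual environment, activate it, and install the requests package:
python -m venv venv
source venv/bin/activate
pip install requests
The output should look similar to this: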
Collecting requests
Downloading requests-2.28.2-py3-none-any.whl (62 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.8/62.8 KB 1.8 MB/s eta 0:00:00
[...additional output...]
Successfully installed certifi-2023.5.7 charset-normalizer-3.1.0 idna-3.4 requests-2.28.2 urllib3-1.26.16
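Now let's create a Python script that sends a request to a service requiring authentication. We will use HTTPBin, a service that provides endpoints for testing HTTP requests.
Create a new file named basic_request.py in the WebIDE: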
import requests


def make_request():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")

    if response.status_code == 200:
        print("Request was successful.")
        print(f"Response content: {response.text}")
    elif response.status_code == 401:
        print("Unauthorized: Authentication is required and has failed.")
    else:
        print(f"Received unexpected status code: {response.status_code}")


if __name__ == "__main__":
    make_request()
Save the file and run it in the terminal:
python basic_request.py
You should see output similar to the following:
Status Code: 401
Unauthorized: Authentication is required and has failed.
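This happens because we tried to access an endpoint that requires Basic authentication without supplying any credentials.
Let's modify the script to print more details about the response. Create a new file named examine_response.py: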
import requests


def examine_response():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")
    print(f"Headers: {response.headers}")

    # The WWW-Authenticate header provides details about how to authenticate
    if 'WWW-Authenticate' in response.headers:
        print(f"WWW-Authenticate: {response.headers['WWW-Authenticate']}")

    # Try to get JSON content, but it might not be JSON or might be empty
    try:
        print(f"Content: {response.json()}")
    except requests.exceptions.JSONDecodeError:
        print(f"Content (text): {response.text}")


if __name__ == "__main__":
    examine_response()
Run this script:
python examine_response.py
The output includes the response headers, among them the WWW-Authenticate header, which tells the client how to authenticate:
Status Code: 401
Headers: {'date': '...', 'content-type': '...', 'content-length': '0', 'connection': 'close', 'server': 'gunicorn/19.9.0', 'www-authenticate': 'Basic realm="Fake Realm"', 'access-control-allow-origin': '*', 'access-control-allow-credentials': 'true'}
WWW-Authenticate: Basic realm="Fake Realm"
Content (text):
The WWW-Authenticate header has the value Basic realm="Fake Realm", which indicates that the server expects Basic authentication.
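To act on the challenge programmatically, a client can split the header value into the scheme and its parameters. The following is a minimal sketch, not a complete parser: the function name parse_www_authenticate is hypothetical, and it assumes a single challenge with simple key="value" or key=value parameters (no escaped quotes, no multiple challenges in one header).

```python
import re


def parse_www_authenticate(value: str) -> tuple:
    """Split a single-challenge WWW-Authenticate value into (scheme, params)."""
    scheme, _, rest = value.strip().partition(" ")
    # Each match is (key, quoted_value, bare_value); exactly one value is set
    pairs = re.findall(r'(\w+)=(?:"([^"]*)"|([^\s,]+))', rest)
    params = {key.lower(): quoted or bare for key, quoted, bare in pairs}
    return scheme, params


scheme, params = parse_www_authenticate('Basic realm="Fake Realm"')
print(scheme)           # Basic
print(params["realm"])  # Fake Realm
```

A client could use the scheme to decide whether it can authenticate at all, for example falling back to an error message when the server asks for a scheme it does not support.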
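Now that we understand what causes a 401 Unauthorized response, let's learn how to prevent it by supplying the correct authentication credentials.
The Python requests library makes it easy to add Basic authentication to a request. You can either pass a (username, password) tuple as the auth parameter or use the HTTPBasicAuth class from requests.auth.
Let's modify our script to include Basic authentication. Create a new file named basic_auth.py: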
import requests
from requests.auth import HTTPBasicAuth


def make_authenticated_request():
    url = "https://httpbin.org/basic-auth/user/pass"

    # Method 1: Using the auth parameter with a tuple
    response1 = requests.get(url, auth=("user", "pass"))
    print("Method 1: Using auth tuple")
    print(f"Status Code: {response1.status_code}")
    if response1.status_code == 200:
        print(f"Response content: {response1.json()}")
    else:
        print(f"Request failed with status code: {response1.status_code}")

    print("\n" + "-"*50 + "\n")

    # Method 2: Using the HTTPBasicAuth class
    response2 = requests.get(url, auth=HTTPBasicAuth("user", "pass"))
    print("Method 2: Using HTTPBasicAuth class")
    print(f"Status Code: {response2.status_code}")
    if response2.status_code == 200:
        print(f"Response content: {response2.json()}")
    else:
        print(f"Request failed with status code: {response2.status_code}")


if __name__ == "__main__":
    make_authenticated_request()
Run the script:
python basic_auth.py
You should see a successful response with status code 200:
Method 1: Using auth tuple
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}
--------------------------------------------------
Method 2: Using HTTPBasicAuth class
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}
Both methods achieve the same result: they add an Authorization header to the request that contains the base64-encoded username and password.
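To make the header concrete, here is a small sketch that builds the same kind of value by hand using only the standard library. The helper name basic_auth_header is hypothetical, and UTF-8 is assumed for the credentials; requests constructs this header for you, so you would not normally write this yourself.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of an Authorization header for Basic authentication."""
    credentials = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(credentials).decode("ascii")


print(basic_auth_header("user", "pass"))  # Basic dXNlcjpwYXNz
```

Base64 is an encoding, not encryption: anyone who sees the header can decode the credentials, which is why Basic authentication should only be used over HTTPS.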
Let's see what happens when we supply incorrect credentials. Create a new file named failed_auth.py:
import requests


def test_failed_authentication():
    url = "https://httpbin.org/basic-auth/user/pass"

    # Correct credentials
    response_correct = requests.get(url, auth=("user", "pass"))

    # Incorrect password
    response_wrong_pass = requests.get(url, auth=("user", "wrong_password"))

    # Incorrect username
    response_wrong_user = requests.get(url, auth=("wrong_user", "pass"))

    print("Correct credentials:")
    print(f"Status Code: {response_correct.status_code}")
    if response_correct.status_code == 200:
        print(f"Response content: {response_correct.json()}")

    print("\nIncorrect password:")
    print(f"Status Code: {response_wrong_pass.status_code}")

    print("\nIncorrect username:")
    print(f"Status Code: {response_wrong_user.status_code}")


if __name__ == "__main__":
    test_failed_authentication()
Run the script:
python failed_auth.py
You should see output similar to the following:
Correct credentials:
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}
Incorrect password:
Status Code: 401
Incorrect username:
Status Code: 401
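An incorrect username and an incorrect password both result in a 401 Unauthorized response.
Now let's implement error handling for authentication failures. Create a new file named handle_auth_errors.py: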
import requests
from requests.exceptions import HTTPError


def make_authenticated_request(username, password):
    url = "https://httpbin.org/basic-auth/user/pass"

    try:
        response = requests.get(url, auth=(username, password))
        response.raise_for_status()  # Raises an HTTPError for bad responses (4xx or 5xx)
        print("Authentication successful!")
        print(f"Response content: {response.json()}")
        return response
    except HTTPError as e:
        # Read the status from the exception itself rather than from a
        # variable assigned inside the try block
        if e.response.status_code == 401:
            print("Authentication failed: Invalid credentials")
            # You might want to retry with different credentials here
            return handle_authentication_failure()
        else:
            print(f"HTTP Error occurred: {e}")
            return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None


def handle_authentication_failure():
    # In a real application, you might prompt the user for new credentials
    # or use a token refresh mechanism
    print("Attempting to authenticate with default credentials...")
    return make_authenticated_request("user", "pass")


if __name__ == "__main__":
    # First, try with incorrect credentials
    print("Trying with incorrect credentials:")
    make_authenticated_request("wrong_user", "wrong_pass")

    print("\n" + "-"*50 + "\n")

    # Then, try with correct credentials
    print("Trying with correct credentials:")
    make_authenticated_request("user", "pass")
Run the script:
python handle_auth_errors.py
The output should show how the error is handled:
Trying with incorrect credentials:
Authentication failed: Invalid credentials
Attempting to authenticate with default credentials...
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}
--------------------------------------------------
Trying with correct credentials:
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}
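This script demonstrates a simple error-handling pattern in which we call raise_for_status() to raise an exception for 4xx/5xx responses, handle the 401 case separately from other HTTP errors, and retry with fallback credentials.
In real applications, you usually need more advanced strategies for handling authentication. Let's explore some common techniques.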
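One caveat with the fallback in handle_auth_errors.py: the handler calls make_authenticated_request again, so if the fallback credentials were also rejected, the two functions would keep calling each other. A bounded alternative is to try each candidate credential pair at most once. The sketch below illustrates the idea without any network access; try_credentials and fake_send are hypothetical names, and fake_send stands in for a real request that returns a status code.

```python
from typing import Callable, Iterable, Optional, Tuple

Credentials = Tuple[str, str]


def try_credentials(
    send: Callable[[Credentials], int],
    candidates: Iterable[Credentials],
) -> Optional[Credentials]:
    """Return the first credential pair that does not produce a 401, else None."""
    for creds in candidates:
        status = send(creds)
        if status != 401:
            return creds
        print(f"Credentials for {creds[0]!r} were rejected (401)")
    return None


def fake_send(creds: Credentials) -> int:
    """Stand-in for a real request: only ("user", "pass") is accepted."""
    return 200 if creds == ("user", "pass") else 401


accepted = try_credentials(fake_send, [("wrong_user", "wrong_pass"), ("user", "pass")])
print(f"Accepted: {accepted}")
```

Because the loop visits each candidate once, the number of requests is bounded by the length of the candidate list, whatever the server answers.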
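Many modern APIs use token-based authentication instead of Basic authentication. OAuth 2.0 is a widely used protocol that relies on tokens for authentication and authorization.
Let's create a script that simulates token-based authentication. Create a file named token_auth.py: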
import json
import os
import time

import requests

# Simulated token storage - in a real app, this might be a database or secure storage
TOKEN_FILE = "token.json"


def get_stored_token():
    """Retrieve the stored token if it exists."""
    try:
        with open(TOKEN_FILE, "r") as f:
            token_data = json.load(f)
            # Check if token is expired
            if token_data.get("expires_at", 0) > time.time():
                return token_data["access_token"]
    except (FileNotFoundError, json.JSONDecodeError):
        pass
    return None


def save_token(token, expires_in=3600):
    """Save the token with expiration time."""
    token_data = {
        "access_token": token,
        "expires_at": time.time() + expires_in
    }
    with open(TOKEN_FILE, "w") as f:
        json.dump(token_data, f)


def get_new_token():
    """Simulate obtaining a new token from an authentication service."""
    # In a real application, this would make a request to the auth server
    print("Obtaining new access token...")
    # Simulating a token generation
    new_token = f"simulated_token_{int(time.time())}"
    save_token(new_token)
    return new_token


def make_authenticated_request(url):
    """Make a request with token authentication, refreshing if needed."""
    # Try to get the stored token
    token = get_stored_token()

    # If no valid token exists, get a new one
    if not token:
        token = get_new_token()

    # Make the authenticated request
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)

    # If unauthorized, the token might be invalid - get a new one and retry
    if response.status_code == 401:
        print("Token rejected. Getting a new token and retrying...")
        token = get_new_token()
        headers = {"Authorization": f"Bearer {token}"}
        response = requests.get(url, headers=headers)

    return response


# For testing, we'll use httpbin's bearer auth endpoint
if __name__ == "__main__":
    # Using httpbin.org/bearer which checks for the Authorization header
    url = "https://httpbin.org/bearer"

    # First, delete any existing token to simulate a fresh start
    try:
        os.remove(TOKEN_FILE)
    except FileNotFoundError:
        pass

    print("First request (should obtain a new token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

    print("\nSecond request (should use the stored token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")
Run the script:
python token_auth.py
The output should look similar to this:
First request (should obtain a new token):
Obtaining new access token...
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}
Second request (should use the stored token):
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}
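This script demonstrates token-based authentication, with the token refreshed automatically when it is rejected. Because httpbin's /bearer endpoint accepts any bearer token, the refresh-and-retry branch is not exercised in this run.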
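The stored-token check in token_auth.py compares expires_at against the current time, so a token that is about to expire can still be sent and then rejected while the request is in flight. One common refinement is to treat a token as expired slightly early. The following sketch shows the idea; the function name token_is_usable and the 30-second margin are assumptions chosen for illustration, not values taken from any particular API.

```python
import time
from typing import Optional


def token_is_usable(token_data: dict, now: Optional[float] = None,
                    margin: float = 30.0) -> bool:
    """True if a token is present and will not expire within `margin` seconds."""
    if now is None:
        now = time.time()
    has_token = bool(token_data.get("access_token"))
    return has_token and token_data.get("expires_at", 0) - margin > now


stored = {"access_token": "simulated_token", "expires_at": 1000.0}
print(token_is_usable(stored, now=900.0))  # True: 100 seconds left
print(token_is_usable(stored, now=980.0))  # False: inside the 30-second margin
```

Passing now explicitly keeps the check easy to test; in application code you would normally omit it and let the function read the clock.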
Sometimes an authentication failure is temporary. Let's implement a retry mechanism with exponential backoff. Create a file named retry_auth.py:
import random
import time

import requests


def make_request_with_retry(url, auth, max_retries=3, backoff_factor=0.5):
    """
    Makes a request with retry logic and exponential backoff

    Args:
        url: URL to request
        auth: Authentication tuple (username, password)
        max_retries: Maximum number of retry attempts
        backoff_factor: Factor by which to increase the delay between retries
    """
    retries = 0

    while retries <= max_retries:
        try:
            response = requests.get(url, auth=auth, timeout=10)

            # If successful, return the response
            if response.status_code == 200:
                return response

            # If unauthorized, we might want to handle differently
            if response.status_code == 401:
                print(f"Attempt {retries+1}/{max_retries+1}: Authentication failed")
                # In a real app, we might refresh tokens here or prompt for new credentials
            else:
                print(f"Attempt {retries+1}/{max_retries+1}: Failed with status code {response.status_code}")

            # If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                return response

            # Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

        except requests.exceptions.RequestException as e:
            print(f"Request exception: {e}")

            # If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                raise

            # Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1


if __name__ == "__main__":
    url = "https://httpbin.org/basic-auth/user/pass"

    print("Testing with incorrect credentials:")
    response = make_request_with_retry(url, auth=("wrong_user", "wrong_pass"))
    print(f"Final status code: {response.status_code}")

    print("\nTesting with correct credentials:")
    response = make_request_with_retry(url, auth=("user", "pass"))
    print(f"Final status code: {response.status_code}")
    print(f"Response content: {response.json()}")
Run the script:
python retry_auth.py
The output should look similar to this:
Testing with incorrect credentials:
Attempt 1/4: Authentication failed
Retrying in 0.54 seconds...
Attempt 2/4: Authentication failed
Retrying in 1.05 seconds...
Attempt 3/4: Authentication failed
Retrying in 2.08 seconds...
Attempt 4/4: Authentication failed
Maximum retry attempts reached.
Final status code: 401
Testing with correct credentials:
Final status code: 200
Response content: {'authenticated': True, 'user': 'user'}
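This script demonstrates a retry mechanism with exponential backoff, a common pattern for network requests that may fail temporarily. Note that retrying a 401 with unchanged credentials, as in the first test, produces the same result on every attempt; in practice the retry helps only when the credentials or token are refreshed between attempts.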
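The delay formula used in retry_auth.py, backoff_factor * (2 ** retries) plus a small random jitter, can be isolated into a helper so the schedule is easy to inspect and test. This is a minimal sketch; the name backoff_delays and the rng parameter are assumptions added here so that the random part can be replaced or switched off.

```python
import random


def backoff_delays(backoff_factor=0.5, max_retries=3, jitter=0.1, rng=random.random):
    """Return the delay before each retry: factor * 2**n plus up to `jitter` seconds."""
    return [backoff_factor * (2 ** n) + jitter * rng() for n in range(max_retries)]


# With jitter disabled the schedule is deterministic
print(backoff_delays(jitter=0))  # [0.5, 1.0, 2.0]
```

The jitter spreads out retries from many clients that failed at the same moment, so they do not all hit the server again at exactly the same time.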
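When you make multiple requests to the same service, it is usually more efficient to create a session that holds the authentication information. Create a file named auth_session.py: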
import requests


def create_authenticated_session(base_url, username, password):
    """
    Creates and returns a requests session with basic authentication

    Args:
        base_url: Base URL for the API
        username: Username for authentication
        password: Password for authentication

    Returns:
        A configured requests.Session object
    """
    session = requests.Session()
    session.auth = (username, password)
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })

    # Test the authentication
    test_url = f"{base_url}/basic-auth/{username}/{password}"
    try:
        response = session.get(test_url)
        response.raise_for_status()
        print(f"Authentication successful for user: {username}")
    except requests.exceptions.HTTPError as e:
        print(f"Authentication failed: {e}")
        # In a real app, you might raise an exception here or handle it differently

    return session


def make_authenticated_requests(session, base_url):
    """
    Makes multiple requests using the authenticated session

    Args:
        session: The authenticated requests.Session
        base_url: Base URL for the API
    """
    # Make a request to a different endpoint
    response1 = session.get(f"{base_url}/get")
    print(f"\nRequest 1 status code: {response1.status_code}")
    print(f"Request 1 response: {response1.json()}")

    # Make another request
    response2 = session.get(f"{base_url}/headers")
    print(f"\nRequest 2 status code: {response2.status_code}")
    print(f"Request 2 response: {response2.json()}")


if __name__ == "__main__":
    base_url = "https://httpbin.org"

    # Create an authenticated session
    session = create_authenticated_session(base_url, "user", "pass")

    # Use the session for multiple requests
    make_authenticated_requests(session, base_url)
Run the script:
python auth_session.py
The output should look similar to this:
Authentication successful for user: user
Request 1 status code: 200
Request 1 response: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}
Request 2 status code: 200
Request 2 response: {'headers': {...}}
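Using a session has several advantages: credentials and default headers are set once and sent with every request, cookies persist across requests, and the underlying connection is reused, which reduces latency.
In real applications, you will often run into more complex authentication scenarios. Let's look at some practical examples and best practices.
The requests library lets you create custom authentication handlers by subclassing requests.auth.AuthBase. This is useful when working with APIs that use a custom authentication scheme.
Create a file named custom_auth.py: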
import hashlib
import time

import requests
from requests.auth import AuthBase


class CustomAuth(AuthBase):
    """
    Example of a custom authentication mechanism that adds a timestamp and a
    SHA-256 signature. Real APIs (like AWS, Stripe, etc.) use schemes of this
    shape, typically with a proper HMAC.
    """
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret

    def __call__(self, request):
        # Add timestamp to the request
        timestamp = str(int(time.time()))

        # Add the API key and timestamp as headers
        request.headers['X-API-Key'] = self.api_key
        request.headers['X-Timestamp'] = timestamp

        # Create a signature based on the request
        # In a real implementation, this would include more request elements
        method = request.method
        path = request.path_url

        # Create a simple signature (in real apps, use proper HMAC)
        signature_data = f"{method}{path}{timestamp}{self.api_secret}".encode('utf-8')
        signature = hashlib.sha256(signature_data).hexdigest()

        # Add the signature to the headers
        request.headers['X-Signature'] = signature

        return request


def test_custom_auth():
    """Test our custom authentication with httpbin.org"""
    url = "https://httpbin.org/headers"

    # Create our custom auth handler
    auth = CustomAuth(api_key="test_key", api_secret="test_secret")

    # Make the request with our custom auth
    response = requests.get(url, auth=auth)

    print(f"Status Code: {response.status_code}")
    print(f"Response Headers: {response.json()}")

    # Verify our custom headers were sent. httpbin echoes header names in
    # title case (X-Api-Key), so look them up with that exact spelling.
    headers = response.json()['headers']
    for header in ['X-Api-Key', 'X-Timestamp', 'X-Signature']:
        if header in headers:
            print(f"{header} was sent: {headers[header]}")
        else:
            print(f"{header} was not found in the response")


if __name__ == "__main__":
    test_custom_auth()
Run the script:
python custom_auth.py
The output should include the custom headers in the response:
Status Code: 200
Response Headers: {'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.2', 'X-Api-Key': 'test_key', 'X-Signature': '...', 'X-Timestamp': '1623456789'}}
X-Api-Key was sent: test_key
X-Timestamp was sent: 1623456789
X-Signature was sent: ...
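This example shows how to implement a custom authentication scheme that includes a timestamp and a signature, similar to the approach used by many commercial APIs.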
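The comment in custom_auth.py notes that real applications should use a proper HMAC rather than hashing the secret together with the message. The sketch below shows what that could look like with the standard library's hmac module. The function names and the choice of signed fields (method, path, timestamp, separated by newlines) are assumptions for illustration; a real API defines exactly which fields are signed and in what order.

```python
import hashlib
import hmac


def sign_request(secret: str, method: str, path: str, timestamp: str) -> str:
    """Return a hex HMAC-SHA256 signature over the method, path, and timestamp."""
    message = f"{method}\n{path}\n{timestamp}".encode("utf-8")
    return hmac.new(secret.encode("utf-8"), message, hashlib.sha256).hexdigest()


def verify_signature(secret: str, method: str, path: str, timestamp: str,
                     signature: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    expected = sign_request(secret, method, path, timestamp)
    return hmac.compare_digest(expected, signature)


sig = sign_request("test_secret", "GET", "/headers", "1623456789")
print(verify_signature("test_secret", "GET", "/headers", "1623456789", sig))   # True
print(verify_signature("other_secret", "GET", "/headers", "1623456789", sig))  # False
```

Inside CustomAuth.__call__, the signature line could call a helper like sign_request instead of hashlib.sha256 directly. On the verifying side, hmac.compare_digest avoids leaking information through comparison timing.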
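Many APIs enforce rate limits to prevent abuse. When you exceed the limit, the server typically responds with a 429 Too Many Requests status code. Let's create a script that handles this situation.
Create a file named rate_limit_handler.py: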
import time

import requests


def make_request_with_rate_limit_handling(url, max_retries=5):
    """
    Makes a request with handling for rate limits (429 responses)

    Args:
        url: URL to request
        max_retries: Maximum number of retry attempts
    """
    retries = 0

    while retries <= max_retries:
        response = requests.get(url)

        # If not rate limited, return the response
        if response.status_code != 429:
            return response

        # We got rate limited, check if we have retry information
        retry_after = response.headers.get('Retry-After')

        # If the header holds a number of seconds, wait that long.
        # (Retry-After may also be an HTTP-date; that form falls through
        # to exponential backoff here.)
        if retry_after and retry_after.isdigit():
            wait_time = int(retry_after)
            print(f"Rate limited. Waiting for {wait_time} seconds as specified by Retry-After header.")
            time.sleep(wait_time)
        else:
            # If no usable retry information, use exponential backoff
            wait_time = 2 ** retries
            print(f"Rate limited. No Retry-After header. Using exponential backoff: {wait_time} seconds.")
            time.sleep(wait_time)

        retries += 1

    if retries > max_retries:
        print(f"Maximum retries ({max_retries}) exceeded.")

    return response


# For demonstration, we'll simulate rate limiting using httpbin's status endpoint
if __name__ == "__main__":
    # First request should succeed
    print("Making request to endpoint that returns 200:")
    response = make_request_with_rate_limit_handling("https://httpbin.org/status/200")
    print(f"Final status code: {response.status_code}")

    # Simulate being rate limited
    print("\nSimulating rate limiting scenario:")
    # Each URL always returns the same status, so the two 429 URLs
    # exhaust their retries and only the last URL succeeds
    urls = [
        "https://httpbin.org/status/429",  # First will be rate limited
        "https://httpbin.org/status/429",  # Second will also be rate limited
        "https://httpbin.org/status/200"   # Third will succeed
    ]

    for i, url in enumerate(urls):
        print(f"\nRequest {i+1}:")
        response = make_request_with_rate_limit_handling(url, max_retries=1)
        print(f"Final status code: {response.status_code}")
Run the script:
python rate_limit_handler.py
The output simulates rate-limit handling:
Making request to endpoint that returns 200:
Final status code: 200
Simulating rate limiting scenario:
Request 1:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429
Request 2:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429
Request 3:
Final status code: 200
This demonstrates how to handle rate limiting by honoring the Retry-After header or by falling back to exponential backoff.
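The Retry-After header can carry either a delay in seconds or an HTTP-date, so converting it with int() alone fails on the date form. The following sketch handles both forms using only the standard library. The function name retry_after_seconds and the optional now parameter are assumptions added for illustration and testability.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(value, now=None):
    """Convert a Retry-After value to seconds to wait, or None if unparseable."""
    if value is None:
        return None
    value = value.strip()
    if value.isdigit():
        # Delay-seconds form, e.g. "120"
        return float(value)
    try:
        # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    # A date in the past means no wait is needed
    return max(0.0, (when - now).total_seconds())


reference = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(retry_after_seconds("120"))                                       # 120.0
print(retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT", reference))  # 60.0
```

When the function returns None, a caller can fall back to exponential backoff, as the script above does when the header is missing.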
Finally, let's bring everything together into a comprehensive error-handling solution. Create a file named complete_auth_handler.py:
import json
import os
import random
import time

import requests
from requests.exceptions import ConnectionError, RequestException, Timeout


class AuthenticationManager:
    """Manages authentication tokens and credentials"""

    def __init__(self, token_file="auth_token.json"):
        self.token_file = token_file
        self.token = self.load_token()

    def load_token(self):
        """Load token from file storage"""
        try:
            with open(self.token_file, "r") as f:
                token_data = json.load(f)
                # Check if token is expired
                if token_data.get("expires_at", 0) > time.time():
                    return token_data.get("access_token")
        except (FileNotFoundError, json.JSONDecodeError):
            pass
        return None

    def save_token(self, token, expires_in=3600):
        """Save token to file storage"""
        token_data = {
            "access_token": token,
            "expires_at": time.time() + expires_in
        }
        with open(self.token_file, "w") as f:
            json.dump(token_data, f)

    def get_token(self):
        """Get a valid token, refreshing if necessary"""
        if not self.token:
            self.token = self.refresh_token()
        return self.token

    def refresh_token(self):
        """Refresh the authentication token"""
        # In a real app, this would make a call to the auth server
        print("Getting a new authentication token...")
        new_token = f"refreshed_token_{int(time.time())}"
        self.save_token(new_token)
        return new_token

    def invalidate_token(self):
        """Invalidate the current token"""
        self.token = None
        try:
            with open(self.token_file, "w") as f:
                json.dump({}, f)
        except OSError:
            pass


class APIClient:
    """Client for making API requests with comprehensive error handling"""

    def __init__(self, base_url, auth_manager=None):
        self.base_url = base_url
        self.auth_manager = auth_manager or AuthenticationManager()
        self.session = requests.Session()

    def make_request(self, method, endpoint, **kwargs):
        """
        Make an API request with comprehensive error handling

        Args:
            method: HTTP method (get, post, etc.)
            endpoint: API endpoint to call
            **kwargs: Additional arguments to pass to requests

        Returns:
            The last Response received; connection errors and timeouts
            are re-raised once retries are exhausted
        """
        url = f"{self.base_url}{endpoint}"
        max_retries = kwargs.pop("max_retries", 3)
        retry_backoff_factor = kwargs.pop("retry_backoff_factor", 0.5)

        # Add authentication if we have an auth manager
        if self.auth_manager:
            token = self.auth_manager.get_token()
            headers = kwargs.get("headers", {})
            headers["Authorization"] = f"Bearer {token}"
            kwargs["headers"] = headers

        # Make the request with retries
        for retry in range(max_retries + 1):
            try:
                request_func = getattr(self.session, method.lower())
                response = request_func(url, **kwargs)

                # Handle different status codes
                if response.status_code == 200:
                    return response
                elif response.status_code == 401:
                    print("Unauthorized: Token may be invalid")
                    if self.auth_manager and retry < max_retries:
                        print("Refreshing authentication token...")
                        self.auth_manager.invalidate_token()
                        token = self.auth_manager.get_token()
                        kwargs.get("headers", {})["Authorization"] = f"Bearer {token}"
                        continue
                elif response.status_code == 429:
                    if retry < max_retries:
                        retry_after = response.headers.get("Retry-After")
                        # Only the delay-seconds form is honored here; an
                        # HTTP-date value falls back to exponential backoff
                        if retry_after and retry_after.isdigit():
                            wait_time = int(retry_after)
                        else:
                            wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                        print(f"Rate limited. Waiting {wait_time:.2f} seconds before retry {retry + 1}/{max_retries}")
                        time.sleep(wait_time)
                        continue
                    else:
                        print("Rate limit retries exhausted")
                elif 500 <= response.status_code < 600:
                    if retry < max_retries:
                        wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                        print(f"Server error ({response.status_code}). Retrying in {wait_time:.2f} seconds...")
                        time.sleep(wait_time)
                        continue
                    else:
                        print("Server error retries exhausted")

                # If we get here, we're returning the response as-is
                return response

            except ConnectionError:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Connection error. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Connection error retries exhausted")
                    raise
            except Timeout:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Request timed out. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Timeout retries exhausted")
                    raise
            except RequestException as e:
                print(f"Request failed: {e}")
                raise

        return None

    def get(self, endpoint, **kwargs):
        """Make a GET request"""
        return self.make_request("get", endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        """Make a POST request"""
        return self.make_request("post", endpoint, **kwargs)

    # Additional methods can be added for other HTTP verbs


# Test the client with httpbin
if __name__ == "__main__":
    # Clean any existing token files
    try:
        os.remove("auth_token.json")
    except FileNotFoundError:
        pass

    client = APIClient("https://httpbin.org")

    print("Making a GET request to /get:")
    response = client.get("/get")
    print(f"Status code: {response.status_code}")
    print(f"Response content: {response.json()}")

    print("\nSimulating an unauthorized response:")
    # For demonstration purposes, we'll use httpbin's status endpoint to simulate a 401
    response = client.get("/status/401")
    print(f"Status code: {response.status_code}")

    print("\nSimulating a rate-limited response:")
    # Simulate a 429 response
    response = client.get("/status/429")
    print(f"Status code: {response.status_code}")
Run the script:
python complete_auth_handler.py
The output should demonstrate the comprehensive error handling:
Making a GET request to /get:
Getting a new authentication token...
Status code: 200
Response content: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}
Simulating an unauthorized response:
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Status code: 401
Simulating a rate-limited response:
Rate limited. Waiting 0.54 seconds before retry 1/3
Rate limited. Waiting 1.04 seconds before retry 2/3
Rate limited. Waiting 2.07 seconds before retry 3/3
Rate limit retries exhausted
Status code: 429
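This comprehensive example demonstrates many best practices for handling authentication and other API-related errors in production-ready Python applications.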
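The branching inside APIClient.make_request can be summarized as a small decision function, which makes the retry policy easier to read and to test without a network. This is a simplified sketch of that logic, not a replacement for it: the name next_action and the returned labels are hypothetical.

```python
def next_action(status_code: int, attempt: int, max_retries: int = 3) -> str:
    """Decide what to do with a response, given the zero-based attempt number."""
    retries_left = attempt < max_retries
    if status_code == 401 and retries_left:
        # Token may be stale: refresh it and send the request again
        return "refresh_token_and_retry"
    if (status_code == 429 or 500 <= status_code < 600) and retries_left:
        # Rate limit or server error: wait (Retry-After or backoff), then retry
        return "wait_and_retry"
    # Success, a non-retryable error, or retries exhausted
    return "return_response"


for status, attempt in [(200, 0), (401, 0), (401, 3), (429, 1), (503, 3), (404, 0)]:
    print(status, attempt, next_action(status, attempt))
```

Keeping the policy in one pure function like this is a design choice: the request loop only has to carry out the returned action, and the policy can be changed or unit-tested in isolation.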
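In this lab, you learned how to handle unauthorized responses effectively in Python requests. We covered several important areas:
Understanding HTTP authentication and status codes: you learned what a 401 Unauthorized response means and how to recognize it in your Python applications.
Basic authentication: you implemented Basic authentication with the auth parameter and the HTTPBasicAuth class, and learned how to handle authentication failures.
Advanced authentication strategies: you explored token-based authentication, retry logic with exponential backoff, and reusable authenticated sessions.
Real-world authentication challenges: you implemented a custom authentication class, handled rate limiting, and built a comprehensive error-handling solution for production-ready applications.
These techniques will help you build more robust and resilient Python applications that handle authentication failures and related errors gracefully when interacting with web APIs.
For further learning, consider exploring: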