Python requests 如何处理未授权响应

PythonBeginner
立即练习

介绍

在使用 Python 进行 Web API 开发时,你经常会遇到由于授权问题导致请求被拒绝的情况。本实验将指导你理解并有效地处理在使用 Python requests 库时遇到的未授权 (401) 响应。通过学习正确的错误处理技术,你将能够构建更具韧性的应用程序,从而优雅地管理身份验证失败。

理解 HTTP 授权和状态码

在深入研究如何处理未授权响应之前,了解它们是什么以及为什么会发生是很重要的。

HTTP 状态码和 401 未授权响应

HTTP 状态码是服务器响应客户端请求时发送的三位数字。这些代码被分为五类:

  • 1xx:信息性响应
  • 2xx:成功响应
  • 3xx:重定向消息
  • 4xx:客户端错误响应
  • 5xx:服务器错误响应

401 未授权 (Unauthorized) 状态码属于 4xx 类别,表示请求缺少目标资源的有效身份验证凭据。这与 403 禁止 (Forbidden) 响应不同,后者意味着服务器理解请求,但拒绝授权。

设置我们的环境

让我们首先为我们的项目创建一个目录并安装所需的软件包。

  1. 打开终端并创建一个新目录:
mkdir -p ~/project/python-auth-handling
cd ~/project/python-auth-handling
  1. 现在,让我们创建一个虚拟环境并安装 requests 包:
python -m venv venv
source venv/bin/activate
pip install requests

输出应该类似于:

Collecting requests
  Downloading requests-2.28.2-py3-none-any.whl (62 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.8/62.8 KB 1.8 MB/s eta 0:00:00
[...additional output...]
Successfully installed certifi-2023.5.7 charset-normalizer-3.1.0 idna-3.4 requests-2.28.2 urllib3-1.26.16

发出一个简单请求

现在,让我们创建一个 Python 脚本来向需要身份验证的服务发出请求。我们将使用 HTTPBin 服务,该服务提供用于测试 HTTP 请求的端点。

在 WebIDE 中创建一个名为 basic_request.py 的新文件:

import requests

def make_request():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")

    if response.status_code == 200:
        print("Request was successful.")
        print(f"Response content: {response.text}")
    elif response.status_code == 401:
        print("Unauthorized: Authentication is required and has failed.")
    else:
        print(f"Received unexpected status code: {response.status_code}")

if __name__ == "__main__":
    make_request()

保存文件并在终端中运行它:

python basic_request.py

你应该看到类似于以下的输出:

Status Code: 401
Unauthorized: Authentication is required and has failed.

这是因为我们试图访问一个需要基本身份验证的端点,但我们没有提供任何凭据。

检查响应

让我们修改我们的脚本以打印更多关于响应的详细信息。创建一个名为 examine_response.py 的新文件:

import requests

def examine_response():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")
    print(f"Headers: {response.headers}")

    ## The WWW-Authenticate header provides details about how to authenticate
    if 'WWW-Authenticate' in response.headers:
        print(f"WWW-Authenticate: {response.headers['WWW-Authenticate']}")

    ## Try to get JSON content, but it might not be JSON or might be empty
    try:
        print(f"Content: {response.json()}")
    except requests.exceptions.JSONDecodeError:
        print(f"Content (text): {response.text}")

if __name__ == "__main__":
    examine_response()

运行此脚本:

python examine_response.py

输出将包括响应头和 WWW-Authenticate 头,它告诉客户端如何进行身份验证:

Status Code: 401
Headers: {'date': '...', 'content-type': '...', 'content-length': '0', 'connection': 'close', 'server': 'gunicorn/19.9.0', 'www-authenticate': 'Basic realm="Fake Realm"', 'access-control-allow-origin': '*', 'access-control-allow-credentials': 'true'}
WWW-Authenticate: Basic realm="Fake Realm"
Content (text):

WWW-Authenticate 头的 value 为 Basic realm="Fake Realm",表明服务器期望基本身份验证。

使用基本身份验证来防止 401 错误

现在我们已经了解了导致 401 未授权响应的原因,让我们学习如何通过提供正确的身份验证凭据来防止它。

Python 请求中的基本身份验证

Python requests 库使将基本身份验证添加到你的请求变得容易。你可以:

  1. 使用包含用户名和密码的元组传递 auth 参数
  2. 使用 requests.auth 中的 HTTPBasicAuth

让我们修改我们的脚本以包含基本身份验证。创建一个名为 basic_auth.py 的新文件:

import requests
from requests.auth import HTTPBasicAuth

def make_authenticated_request():
    url = "https://httpbin.org/basic-auth/user/pass"

    ## Method 1: Using the auth parameter with a tuple
    response1 = requests.get(url, auth=("user", "pass"))

    print("Method 1: Using auth tuple")
    print(f"Status Code: {response1.status_code}")
    if response1.status_code == 200:
        print(f"Response content: {response1.json()}")
    else:
        print(f"Request failed with status code: {response1.status_code}")

    print("\n" + "-"*50 + "\n")

    ## Method 2: Using the HTTPBasicAuth class
    response2 = requests.get(url, auth=HTTPBasicAuth("user", "pass"))

    print("Method 2: Using HTTPBasicAuth class")
    print(f"Status Code: {response2.status_code}")
    if response2.status_code == 200:
        print(f"Response content: {response2.json()}")
    else:
        print(f"Request failed with status code: {response2.status_code}")

if __name__ == "__main__":
    make_authenticated_request()

运行脚本:

python basic_auth.py

你应该看到一个成功的响应,状态码为 200:

Method 1: Using auth tuple
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

--------------------------------------------------

Method 2: Using HTTPBasicAuth class
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

这两种方法都实现了相同的结果——它们向请求添加了一个 Authorization 头,其中包含 base64 编码的用户名和密码。

身份验证失败时会发生什么

让我们看看当我们提供不正确的凭据时会发生什么。创建一个名为 failed_auth.py 的新文件:

import requests

def test_failed_authentication():
    url = "https://httpbin.org/basic-auth/user/pass"

    ## Correct credentials
    response_correct = requests.get(url, auth=("user", "pass"))

    ## Incorrect password
    response_wrong_pass = requests.get(url, auth=("user", "wrong_password"))

    ## Incorrect username
    response_wrong_user = requests.get(url, auth=("wrong_user", "pass"))

    print("Correct credentials:")
    print(f"Status Code: {response_correct.status_code}")
    if response_correct.status_code == 200:
        print(f"Response content: {response_correct.json()}")

    print("\nIncorrect password:")
    print(f"Status Code: {response_wrong_pass.status_code}")

    print("\nIncorrect username:")
    print(f"Status Code: {response_wrong_user.status_code}")

if __name__ == "__main__":
    test_failed_authentication()

运行脚本:

python failed_auth.py

你应该看到类似于以下的输出:

Correct credentials:
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

Incorrect password:
Status Code: 401

Incorrect username:
Status Code: 401

用户名和密码都不正确都会导致 401 未授权响应。

处理身份验证错误

现在,让我们为身份验证失败实现错误处理。创建一个名为 handle_auth_errors.py 的新文件:

import requests
from requests.exceptions import HTTPError

def make_authenticated_request(username, password):
    url = "https://httpbin.org/basic-auth/user/pass"

    try:
        response = requests.get(url, auth=(username, password))
        response.raise_for_status()  ## Raises an HTTPError for bad responses (4xx or 5xx)

        print(f"Authentication successful!")
        print(f"Response content: {response.json()}")
        return response

    except HTTPError as e:
        if response.status_code == 401:
            print(f"Authentication failed: Invalid credentials")
            ## You might want to retry with different credentials here
            return handle_authentication_failure()
        else:
            print(f"HTTP Error occurred: {e}")
            return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

def handle_authentication_failure():
    ## In a real application, you might prompt the user for new credentials
    ## or use a token refresh mechanism
    print("Attempting to authenticate with default credentials...")
    return make_authenticated_request("user", "pass")

if __name__ == "__main__":
    ## First, try with incorrect credentials
    print("Trying with incorrect credentials:")
    make_authenticated_request("wrong_user", "wrong_pass")

    print("\n" + "-"*50 + "\n")

    ## Then, try with correct credentials
    print("Trying with correct credentials:")
    make_authenticated_request("user", "pass")

运行脚本:

python handle_auth_errors.py

输出应该显示如何处理错误:

Trying with incorrect credentials:
Authentication failed: Invalid credentials
Attempting to authenticate with default credentials...
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}

--------------------------------------------------

Trying with correct credentials:
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}

此脚本演示了一种简单的错误处理模式,其中我们:

  1. 尝试使用提供的凭据发出请求
  2. 使用 raise_for_status() 为 4xx/5xx 响应引发异常
  3. 专门处理 401 错误,并使用重试机制
  4. 适当地处理其他类型的错误

高级身份验证处理策略

在实际应用中,你通常需要更高级的策略来处理身份验证。让我们探讨一些常见的技术。

基于令牌的身份验证

许多现代 API 使用基于令牌的身份验证,而不是基本身份验证。OAuth 2.0 是一种常用协议,它使用令牌进行身份验证和授权。

让我们创建一个模拟基于令牌的身份验证的脚本。创建一个名为 token_auth.py 的文件:

import requests
import time
import json

## Simulated token storage - in a real app, this might be a database or secure storage
TOKEN_FILE = "token.json"

def get_stored_token():
    """Retrieve the stored token if it exists."""
    try:
        with open(TOKEN_FILE, "r") as f:
            token_data = json.load(f)
            ## Check if token is expired
            if token_data.get("expires_at", 0) > time.time():
                return token_data["access_token"]
    except (FileNotFoundError, json.JSONDecodeError):
        pass
    return None

def save_token(token, expires_in=3600):
    """Save the token with expiration time."""
    token_data = {
        "access_token": token,
        "expires_at": time.time() + expires_in
    }
    with open(TOKEN_FILE, "w") as f:
        json.dump(token_data, f)

def get_new_token():
    """Simulate obtaining a new token from an authentication service."""
    ## In a real application, this would make a request to the auth server
    print("Obtaining new access token...")
    ## Simulating a token generation
    new_token = f"simulated_token_{int(time.time())}"
    save_token(new_token)
    return new_token

def make_authenticated_request(url):
    """Make a request with token authentication, refreshing if needed."""
    ## Try to get the stored token
    token = get_stored_token()

    ## If no valid token exists, get a new one
    if not token:
        token = get_new_token()

    ## Make the authenticated request
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)

    ## If unauthorized, the token might be invalid - get a new one and retry
    if response.status_code == 401:
        print("Token rejected. Getting a new token and retrying...")
        token = get_new_token()
        headers = {"Authorization": f"Bearer {token}"}
        response = requests.get(url, headers=headers)

    return response

## For testing, we'll use httpbin's bearer auth endpoint
if __name__ == "__main__":
    ## Using httpbin.org/bearer which checks for the Authorization header
    url = "https://httpbin.org/bearer"

    ## First, delete any existing token to simulate a fresh start
    try:
        import os
        os.remove(TOKEN_FILE)
    except FileNotFoundError:
        pass

    print("First request (should obtain a new token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

    print("\nSecond request (should use the stored token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

运行脚本:

python token_auth.py

输出应该类似于:

First request (should obtain a new token):
Obtaining new access token...
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}

Second request (should use the stored token):
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}

此脚本演示了基于令牌的身份验证,并在令牌被拒绝时自动刷新令牌。

实现重试逻辑

有时,身份验证失败可能是暂时的。让我们实现一个带有指数退避的重试机制。创建一个名为 retry_auth.py 的文件:

import requests
import time
import random

def make_request_with_retry(url, auth, max_retries=3, backoff_factor=0.5):
    """
    Makes a request with retry logic and exponential backoff

    Args:
        url: URL to request
        auth: Authentication tuple (username, password)
        max_retries: Maximum number of retry attempts
        backoff_factor: Factor by which to increase the delay between retries
    """
    retries = 0

    while retries <= max_retries:
        try:
            response = requests.get(url, auth=auth, timeout=10)

            ## If successful, return the response
            if response.status_code == 200:
                return response

            ## If unauthorized, we might want to handle differently
            if response.status_code == 401:
                print(f"Attempt {retries+1}/{max_retries+1}: Authentication failed")
                ## In a real app, we might refresh tokens here or prompt for new credentials
            else:
                print(f"Attempt {retries+1}/{max_retries+1}: Failed with status code {response.status_code}")

            ## If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                return response

            ## Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

        except requests.exceptions.RequestException as e:
            print(f"Request exception: {e}")

            ## If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                raise

            ## Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

if __name__ == "__main__":
    url = "https://httpbin.org/basic-auth/user/pass"

    print("Testing with incorrect credentials:")
    response = make_request_with_retry(url, auth=("wrong_user", "wrong_pass"))
    print(f"Final status code: {response.status_code}")

    print("\nTesting with correct credentials:")
    response = make_request_with_retry(url, auth=("user", "pass"))
    print(f"Final status code: {response.status_code}")
    print(f"Response content: {response.json()}")

运行脚本:

python retry_auth.py

输出应该类似于:

Testing with incorrect credentials:
Attempt 1/4: Authentication failed
Retrying in 0.54 seconds...
Attempt 2/4: Authentication failed
Retrying in 1.05 seconds...
Attempt 3/4: Authentication failed
Retrying in 2.08 seconds...
Attempt 4/4: Authentication failed
Maximum retry attempts reached.
Final status code: 401

Testing with correct credentials:
Final status code: 200
Response content: {'authenticated': True, 'user': 'user'}

此脚本演示了带有指数退避的重试机制,这是一种处理可能暂时失败的网络请求的常见模式。

创建可重用的身份验证会话

对于对同一服务的多个请求,创建会话以维护身份验证信息通常更有效。创建一个名为 auth_session.py 的文件:

import requests

def create_authenticated_session(base_url, username, password):
    """
    Creates and returns a requests session with basic authentication

    Args:
        base_url: Base URL for the API
        username: Username for authentication
        password: Password for authentication

    Returns:
        A configured requests.Session object
    """
    session = requests.Session()
    session.auth = (username, password)
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })

    ## Test the authentication
    test_url = f"{base_url}/basic-auth/{username}/{password}"
    try:
        response = session.get(test_url)
        response.raise_for_status()
        print(f"Authentication successful for user: {username}")
    except requests.exceptions.HTTPError as e:
        print(f"Authentication failed: {e}")
        ## In a real app, you might raise an exception here or handle it differently

    return session

def make_authenticated_requests(session, base_url):
    """
    Makes multiple requests using the authenticated session

    Args:
        session: The authenticated requests.Session
        base_url: Base URL for the API
    """
    ## Make a request to a different endpoint
    response1 = session.get(f"{base_url}/get")
    print(f"\nRequest 1 status code: {response1.status_code}")
    print(f"Request 1 response: {response1.json()}")

    ## Make another request
    response2 = session.get(f"{base_url}/headers")
    print(f"\nRequest 2 status code: {response2.status_code}")
    print(f"Request 2 response: {response2.json()}")

if __name__ == "__main__":
    base_url = "https://httpbin.org"

    ## Create an authenticated session
    session = create_authenticated_session(base_url, "user", "pass")

    ## Use the session for multiple requests
    make_authenticated_requests(session, base_url)

运行脚本:

python auth_session.py

输出应该类似于:

Authentication successful for user: user

Request 1 status code: 200
Request 1 response: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}

Request 2 status code: 200
Request 2 response: {'headers': {...}}

使用会话有几个优点:

  1. 它重用底层 TCP 连接,使后续请求更快
  2. 它会自动在每个请求中包含身份验证凭据
  3. 它在请求之间维护 cookie,这对于基于会话的身份验证很有用

处理真实的身份验证挑战

在实际应用中,你经常会遇到更复杂的身份验证场景。让我们探讨一些实际的例子和最佳实践。

实现自定义身份验证类

requests 库允许你通过子类化 requests.auth.AuthBase 类来创建自定义身份验证处理程序。这在使用具有自定义身份验证方案的 API 时非常有用。

创建一个名为 custom_auth.py 的文件:

import requests
from requests.auth import AuthBase
import time
import hashlib

class CustomAuth(AuthBase):
    """
    Example of a custom authentication mechanism that adds a timestamp and HMAC signature
    This is similar to how many APIs authenticate requests (like AWS, Stripe, etc.)
    """
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret

    def __call__(self, request):
        ## Add timestamp to the request
        timestamp = str(int(time.time()))

        ## Add the API key and timestamp as headers
        request.headers['X-API-Key'] = self.api_key
        request.headers['X-Timestamp'] = timestamp

        ## Create a signature based on the request
        ## In a real implementation, this would include more request elements
        method = request.method
        path = request.path_url

        ## Create a simple signature (in real apps, use proper HMAC)
        signature_data = f"{method}{path}{timestamp}{self.api_secret}".encode('utf-8')
        signature = hashlib.sha256(signature_data).hexdigest()

        ## Add the signature to the headers
        request.headers['X-Signature'] = signature

        return request

def test_custom_auth():
    """Test our custom authentication with httpbin.org"""
    url = "https://httpbin.org/headers"

    ## Create our custom auth handler
    auth = CustomAuth(api_key="test_key", api_secret="test_secret")

    ## Make the request with our custom auth
    response = requests.get(url, auth=auth)

    print(f"Status Code: {response.status_code}")
    print(f"Response Headers: {response.json()}")

    ## Verify our custom headers were sent
    headers = response.json()['headers']
    for header in ['X-Api-Key', 'X-Timestamp', 'X-Signature']:
        if header.upper() in headers:
            print(f"{header} was sent: {headers[header.upper()]}")
        else:
            print(f"{header} was not found in the response")

if __name__ == "__main__":
    test_custom_auth()

运行脚本:

python custom_auth.py

输出应该在响应中包含自定义标头:

Status Code: 200
Response Headers: {'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.2', 'X-Api-Key': 'test_key', 'X-Signature': '...', 'X-Timestamp': '1623456789'}}
X-Api-Key was sent: test_key
X-Timestamp was sent: 1623456789
X-Signature was sent: ...

此示例展示了如何实现自定义身份验证方案,该方案包括时间戳和签名,类似于许多商业 API 使用的方式。

处理速率限制和 429 响应

许多 API 实施速率限制以防止滥用。当你超过速率限制时,服务器通常会使用 429 Too Many Requests 状态代码进行响应。让我们创建一个脚本来处理这种情况。

创建一个名为 rate_limit_handler.py 的文件:

import requests
import time

def make_request_with_rate_limit_handling(url, max_retries=5):
    """
    Makes a request with handling for rate limits (429 responses)

    Args:
        url: URL to request
        max_retries: Maximum number of retry attempts
    """
    retries = 0

    while retries <= max_retries:
        response = requests.get(url)

        ## If not rate limited, return the response
        if response.status_code != 429:
            return response

        ## We got rate limited, check if we have retry information
        retry_after = response.headers.get('Retry-After')

        ## If we have retry information, wait that long
        if retry_after:
            wait_time = int(retry_after)
            print(f"Rate limited. Waiting for {wait_time} seconds as specified by Retry-After header.")
            time.sleep(wait_time)
        else:
            ## If no retry information, use exponential backoff
            wait_time = 2 ** retries
            print(f"Rate limited. No Retry-After header. Using exponential backoff: {wait_time} seconds.")
            time.sleep(wait_time)

        retries += 1

        if retries > max_retries:
            print(f"Maximum retries ({max_retries}) exceeded.")
            return response

## For demonstration, we'll simulate rate limiting using httpbin's status endpoint
if __name__ == "__main__":
    ## First request should succeed
    print("Making request to endpoint that returns 200:")
    response = make_request_with_rate_limit_handling("https://httpbin.org/status/200")
    print(f"Final status code: {response.status_code}")

    ## Simulate being rate limited initially, then succeeding
    print("\nSimulating rate limiting scenario:")
    ## We'll make 3 requests in sequence to different status codes (429, 429, 200)
    ## to simulate being rate limited twice and then succeeding
    urls = [
        "https://httpbin.org/status/429",  ## First will be rate limited
        "https://httpbin.org/status/429",  ## Second will also be rate limited
        "https://httpbin.org/status/200"   ## Third will succeed
    ]

    for i, url in enumerate(urls):
        print(f"\nRequest {i+1}:")
        response = make_request_with_rate_limit_handling(url, max_retries=1)
        print(f"Final status code: {response.status_code}")

运行脚本:

python rate_limit_handler.py

输出将模拟处理速率限制:

Making request to endpoint that returns 200:
Final status code: 200

Simulating rate limiting scenario:

Request 1:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429

Request 2:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429

Request 3:
Final status code: 200

这演示了如何通过遵守 Retry-After 标头或实现指数退避来处理速率限制。

完整的错误处理解决方案

最后,让我们将所有内容整合到一个全面的错误处理解决方案中。创建一个名为 complete_auth_handler.py 的文件:

import requests
import time
import random
import json
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout

class AuthenticationManager:
    """Manages authentication tokens and credentials"""

    def __init__(self, token_file="auth_token.json"):
        self.token_file = token_file
        self.token = self.load_token()

    def load_token(self):
        """Load token from file storage"""
        try:
            with open(self.token_file, "r") as f:
                token_data = json.load(f)
                ## Check if token is expired
                if token_data.get("expires_at", 0) > time.time():
                    return token_data.get("access_token")
        except (FileNotFoundError, json.JSONDecodeError):
            pass
        return None

    def save_token(self, token, expires_in=3600):
        """Save token to file storage"""
        token_data = {
            "access_token": token,
            "expires_at": time.time() + expires_in
        }
        with open(self.token_file, "w") as f:
            json.dump(token_data, f)

    def get_token(self):
        """Get a valid token, refreshing if necessary"""
        if not self.token:
            self.token = self.refresh_token()
        return self.token

    def refresh_token(self):
        """Refresh the authentication token"""
        ## In a real app, this would make a call to the auth server
        print("Getting a new authentication token...")
        new_token = f"refreshed_token_{int(time.time())}"
        self.save_token(new_token)
        return new_token

    def invalidate_token(self):
        """Invalidate the current token"""
        self.token = None
        try:
            with open(self.token_file, "w") as f:
                json.dump({}, f)
        except Exception:
            pass

class APIClient:
    """Client for making API requests with comprehensive error handling"""

    def __init__(self, base_url, auth_manager=None):
        self.base_url = base_url
        self.auth_manager = auth_manager or AuthenticationManager()
        self.session = requests.Session()

    def make_request(self, method, endpoint, **kwargs):
        """
        Make an API request with comprehensive error handling

        Args:
            method: HTTP method (get, post, etc.)
            endpoint: API endpoint to call
            **kwargs: Additional arguments to pass to requests

        Returns:
            Response object or None if all retries failed
        """
        url = f"{self.base_url}{endpoint}"
        max_retries = kwargs.pop("max_retries", 3)
        retry_backoff_factor = kwargs.pop("retry_backoff_factor", 0.5)

        ## Add authentication if we have an auth manager
        if self.auth_manager:
            token = self.auth_manager.get_token()
            headers = kwargs.get("headers", {})
            headers["Authorization"] = f"Bearer {token}"
            kwargs["headers"] = headers

        ## Make the request with retries
        for retry in range(max_retries + 1):
            try:
                request_func = getattr(self.session, method.lower())
                response = request_func(url, **kwargs)

                ## Handle different status codes
                if response.status_code == 200:
                    return response
                elif response.status_code == 401:
                    print("Unauthorized: Token may be invalid")
                    if self.auth_manager and retry < max_retries:
                        print("Refreshing authentication token...")
                        self.auth_manager.invalidate_token()
                        token = self.auth_manager.get_token()
                        kwargs.get("headers", {})["Authorization"] = f"Bearer {token}"
                        continue
                elif response.status_code == 429:
                    if retry < max_retries:
                        retry_after = response.headers.get("Retry-After")
                        if retry_after:
                            wait_time = int(retry_after)
                        else:
                            wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)

                        print(f"Rate limited. Waiting {wait_time:.2f} seconds before retry {retry + 1}/{max_retries}")
                        time.sleep(wait_time)
                        continue
                    else:
                        print("Rate limit retries exhausted")
                elif 500 <= response.status_code < 600:
                    if retry < max_retries:
                        wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                        print(f"Server error ({response.status_code}). Retrying in {wait_time:.2f} seconds...")
                        time.sleep(wait_time)
                        continue
                    else:
                        print(f"Server error retries exhausted")

                ## If we get here, we're returning the response as-is
                return response

            except ConnectionError:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Connection error. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Connection error retries exhausted")
                    raise
            except Timeout:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Request timed out. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Timeout retries exhausted")
                    raise
            except RequestException as e:
                print(f"Request failed: {e}")
                raise

        return None

    def get(self, endpoint, **kwargs):
        """Make a GET request"""
        return self.make_request("get", endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        """Make a POST request"""
        return self.make_request("post", endpoint, **kwargs)

    ## Additional methods can be added for other HTTP verbs

## Test the client with httpbin
if __name__ == "__main__":
    ## Clean any existing token files
    import os
    try:
        os.remove("auth_token.json")
    except FileNotFoundError:
        pass

    client = APIClient("https://httpbin.org")

    print("Making a GET request to /get:")
    response = client.get("/get")
    print(f"Status code: {response.status_code}")
    print(f"Response content: {response.json()}")

    print("\nSimulating an unauthorized response:")
    ## For demonstration purposes, we'll use httpbin's status endpoint to simulate a 401
    response = client.get("/status/401")
    print(f"Status code: {response.status_code}")

    print("\nSimulating a rate-limited response:")
    ## Simulate a 429 response
    response = client.get("/status/429")
    print(f"Status code: {response.status_code}")

运行脚本:

python complete_auth_handler.py

输出应该演示全面的错误处理:

Getting a new authentication token...
Making a GET request to /get:
Status code: 200
Response content: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}

Simulating an unauthorized response:
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Status code: 401

Simulating a rate-limited response:
Rate limited. Waiting 0.54 seconds before retry 1/3
Rate limited. Waiting 1.04 seconds before retry 2/3
Rate limited. Waiting 2.07 seconds before retry 3/3
Rate limit retries exhausted
Status code: 429

这个全面的例子演示了在生产就绪的 Python 应用程序中处理身份验证和其他与 API 相关的错误的许多最佳实践。

总结

在这个实验中,你已经学会了如何在 Python 的请求中有效地处理未授权的响应。我们涵盖了几个重要的方面:

  1. 理解 HTTP 身份验证和状态码:你了解了 401 未授权响应的含义,以及如何在你的 Python 应用程序中识别它们。

  2. 基本身份验证:你使用 auth 参数和 HTTPBasicAuth 类实现了基本身份验证,并学习了如何处理身份验证失败。

  3. 高级身份验证策略:你探索了基于令牌的身份验证、带有指数退避的重试逻辑,以及创建可重用的已验证会话。

  4. 真实的身份验证挑战:你实现了一个自定义身份验证类,处理了速率限制,并为生产就绪的应用程序创建了一个全面的错误处理解决方案。

这些技术将帮助你构建更强大、更具弹性的 Python 应用程序,这些应用程序在与 Web API 交互时,可以优雅地处理身份验证失败和其他相关错误。

为了进一步学习,请考虑探索:

  • OAuth 2.0 身份验证流程
  • 用于无状态身份验证的 JWT (JSON Web Tokens)
  • API 密钥轮换策略
  • 安全凭据存储