Python requests での未承認レスポンスの処理方法

PythonBeginner
オンラインで実践に進む

はじめに

Python で Web API を扱う際、認証の問題によりリクエストが拒否される状況に頻繁に遭遇します。この実験(Lab)では、Python の requests ライブラリを使用する際に、未承認(401)レスポンスを理解し、効果的に処理する方法を説明します。適切なエラー処理技術を学ぶことで、認証の失敗を適切に管理し、より回復力のあるアプリケーションを構築できるようになります。

HTTP 認証とステータスコードの理解

未承認レスポンスの処理に入る前に、それらが何であり、なぜ発生するのかを理解することが重要です。

HTTP ステータスコードと 401 Unauthorized レスポンス

HTTP ステータスコードは、サーバーがクライアントのリクエストに応じて送信する 3 桁の数字です。これらのコードは、次の 5 つのカテゴリに分類されます。

  • 1xx: 情報レスポンス
  • 2xx: 成功レスポンス
  • 3xx: リダイレクトメッセージ
  • 4xx: クライアントエラーレスポンス
  • 5xx: サーバーエラーレスポンス

401 Unauthorized ステータスコードは 4xx カテゴリに属し、リクエストがターゲットリソースに対する有効な認証資格情報を持っていないことを示します。これは、サーバーがリクエストを理解しているが、それを承認することを拒否する 403 Forbidden レスポンスとは異なります。

環境のセットアップ

まず、プロジェクト用のディレクトリを作成し、必要なパッケージをインストールすることから始めましょう。

  1. ターミナルを開き、新しいディレクトリを作成します。
mkdir -p ~/project/python-auth-handling
cd ~/project/python-auth-handling
  1. 次に、仮想環境を作成し、requests パッケージをインストールします。
python -m venv venv
source venv/bin/activate
pip install requests

出力は次のようになります。

Collecting requests
  Downloading requests-2.28.2-py3-none-any.whl (62 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.8/62.8 KB 1.8 MB/s eta 0:00:00
[...additional output...]
Successfully installed certifi-2023.5.7 charset-normalizer-3.1.0 idna-3.4 requests-2.28.2 urllib3-1.26.16

簡単なリクエストの作成

次に、認証を必要とするサービスにリクエストを行う Python スクリプトを作成しましょう。HTTP リクエストをテストするためのエンドポイントを提供する HTTPBin サービスを使用します。

WebIDE で basic_request.py という名前の新しいファイルを作成します。

import requests

def make_request():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")

    if response.status_code == 200:
        print("Request was successful.")
        print(f"Response content: {response.text}")
    elif response.status_code == 401:
        print("Unauthorized: Authentication is required and has failed.")
    else:
        print(f"Received unexpected status code: {response.status_code}")

if __name__ == "__main__":
    make_request()

ファイルを保存し、ターミナルで実行します。

python basic_request.py

次のような出力が表示されます。

Status Code: 401
Unauthorized: Authentication is required and has failed.

これは、基本認証を必要とするエンドポイントにアクセスしようとしているが、資格情報を提供していないためです。

レスポンスの検証

レスポンスに関する詳細を出力するようにスクリプトを変更しましょう。examine_response.py という名前の新しいファイルを作成します。

import requests

def examine_response():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")
    print(f"Headers: {response.headers}")

    ## The WWW-Authenticate header provides details about how to authenticate
    if 'WWW-Authenticate' in response.headers:
        print(f"WWW-Authenticate: {response.headers['WWW-Authenticate']}")

    ## Try to get JSON content, but it might not be JSON or might be empty
    try:
        print(f"Content: {response.json()}")
    except requests.exceptions.JSONDecodeError:
        print(f"Content (text): {response.text}")

if __name__ == "__main__":
    examine_response()

このスクリプトを実行します。

python examine_response.py

出力には、レスポンスヘッダーと、クライアントに認証方法を伝える WWW-Authenticate ヘッダーが含まれます。

Status Code: 401
Headers: {'date': '...', 'content-type': '...', 'content-length': '0', 'connection': 'close', 'server': 'gunicorn/19.9.0', 'www-authenticate': 'Basic realm="Fake Realm"', 'access-control-allow-origin': '*', 'access-control-allow-credentials': 'true'}
WWW-Authenticate: Basic realm="Fake Realm"
Content (text):

値が Basic realm="Fake Realm"WWW-Authenticate ヘッダーは、サーバーが基本認証を期待していることを示しています。

401 エラーを防ぐための基本認証の使用

401 Unauthorized レスポンスの原因を理解したので、正しい認証資格情報を提供して、それを防ぐ方法を学びましょう。

Python Requests での基本認証

Python requests ライブラリを使用すると、リクエストに基本認証を簡単に追加できます。次のいずれかの方法を使用できます。

  1. ユーザー名とパスワードのタプルを含む auth パラメータを渡す
  2. requests.auth から HTTPBasicAuth クラスを使用する

基本認証を含めるようにスクリプトを変更しましょう。basic_auth.py という名前の新しいファイルを作成します。

import requests
from requests.auth import HTTPBasicAuth

def make_authenticated_request():
    url = "https://httpbin.org/basic-auth/user/pass"

    ## Method 1: Using the auth parameter with a tuple
    response1 = requests.get(url, auth=("user", "pass"))

    print("Method 1: Using auth tuple")
    print(f"Status Code: {response1.status_code}")
    if response1.status_code == 200:
        print(f"Response content: {response1.json()}")
    else:
        print(f"Request failed with status code: {response1.status_code}")

    print("\n" + "-"*50 + "\n")

    ## Method 2: Using the HTTPBasicAuth class
    response2 = requests.get(url, auth=HTTPBasicAuth("user", "pass"))

    print("Method 2: Using HTTPBasicAuth class")
    print(f"Status Code: {response2.status_code}")
    if response2.status_code == 200:
        print(f"Response content: {response2.json()}")
    else:
        print(f"Request failed with status code: {response2.status_code}")

if __name__ == "__main__":
    make_authenticated_request()

スクリプトを実行します。

python basic_auth.py

ステータスコード 200 で成功したレスポンスが表示されます。

Method 1: Using auth tuple
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

--------------------------------------------------

Method 2: Using HTTPBasicAuth class
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

どちらの方法も同じ結果を達成します。つまり、base64 エンコードされたユーザー名とパスワードを含む Authorization ヘッダーをリクエストに追加します。

認証が失敗した場合に何が起こるか

誤った資格情報を提供した場合に何が起こるかを見てみましょう。failed_auth.py という名前の新しいファイルを作成します。

import requests

def test_failed_authentication():
    url = "https://httpbin.org/basic-auth/user/pass"

    ## Correct credentials
    response_correct = requests.get(url, auth=("user", "pass"))

    ## Incorrect password
    response_wrong_pass = requests.get(url, auth=("user", "wrong_password"))

    ## Incorrect username
    response_wrong_user = requests.get(url, auth=("wrong_user", "pass"))

    print("Correct credentials:")
    print(f"Status Code: {response_correct.status_code}")
    if response_correct.status_code == 200:
        print(f"Response content: {response_correct.json()}")

    print("\nIncorrect password:")
    print(f"Status Code: {response_wrong_pass.status_code}")

    print("\nIncorrect username:")
    print(f"Status Code: {response_wrong_user.status_code}")

if __name__ == "__main__":
    test_failed_authentication()

スクリプトを実行します。

python failed_auth.py

次のような出力が表示されます。

Correct credentials:
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

Incorrect password:
Status Code: 401

Incorrect username:
Status Code: 401

誤ったユーザー名と誤ったパスワードの両方で、401 Unauthorized レスポンスが発生します。

認証エラーの処理

次に、認証の失敗に対するエラー処理を実装しましょう。handle_auth_errors.py という名前の新しいファイルを作成します。

import requests
from requests.exceptions import HTTPError

def make_authenticated_request(username, password):
    url = "https://httpbin.org/basic-auth/user/pass"

    try:
        response = requests.get(url, auth=(username, password))
        response.raise_for_status()  ## Raises an HTTPError for bad responses (4xx or 5xx)

        print(f"Authentication successful!")
        print(f"Response content: {response.json()}")
        return response

    except HTTPError as e:
        if response.status_code == 401:
            print(f"Authentication failed: Invalid credentials")
            ## You might want to retry with different credentials here
            return handle_authentication_failure()
        else:
            print(f"HTTP Error occurred: {e}")
            return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

def handle_authentication_failure():
    ## In a real application, you might prompt the user for new credentials
    ## or use a token refresh mechanism
    print("Attempting to authenticate with default credentials...")
    return make_authenticated_request("user", "pass")

if __name__ == "__main__":
    ## First, try with incorrect credentials
    print("Trying with incorrect credentials:")
    make_authenticated_request("wrong_user", "wrong_pass")

    print("\n" + "-"*50 + "\n")

    ## Then, try with correct credentials
    print("Trying with correct credentials:")
    make_authenticated_request("user", "pass")

スクリプトを実行します。

python handle_auth_errors.py

出力には、エラーがどのように処理されるかが表示されます。

Trying with incorrect credentials:
Authentication failed: Invalid credentials
Attempting to authenticate with default credentials...
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}

--------------------------------------------------

Trying with correct credentials:
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}

このスクリプトは、次のような単純なエラー処理パターンを示しています。

  1. 提供された資格情報でリクエストを試みます
  2. raise_for_status() を使用して、4xx/5xx レスポンスに対して例外を発生させます
  3. 401 エラーを具体的に処理し、再試行メカニズムを使用します
  4. 他のエラーの種類を適切に処理します

高度な認証処理戦略

実際のアプリケーションでは、認証を処理するために、より高度な戦略が必要になることがよくあります。いくつかの一般的な手法を探ってみましょう。

トークンベース認証

多くの最新の API は、基本認証の代わりにトークンベース認証を使用しています。OAuth 2.0 は、認証と認可にトークンを使用する一般的なプロトコルです。

トークンベース認証をシミュレートするスクリプトを作成しましょう。token_auth.py という名前のファイルを作成します。

import requests
import time
import json

## Simulated token storage - in a real app, this might be a database or secure storage
TOKEN_FILE = "token.json"

def get_stored_token():
    """Retrieve the stored token if it exists."""
    try:
        with open(TOKEN_FILE, "r") as f:
            token_data = json.load(f)
            ## Check if token is expired
            if token_data.get("expires_at", 0) > time.time():
                return token_data["access_token"]
    except (FileNotFoundError, json.JSONDecodeError):
        pass
    return None

def save_token(token, expires_in=3600):
    """Save the token with expiration time."""
    token_data = {
        "access_token": token,
        "expires_at": time.time() + expires_in
    }
    with open(TOKEN_FILE, "w") as f:
        json.dump(token_data, f)

def get_new_token():
    """Simulate obtaining a new token from an authentication service."""
    ## In a real application, this would make a request to the auth server
    print("Obtaining new access token...")
    ## Simulating a token generation
    new_token = f"simulated_token_{int(time.time())}"
    save_token(new_token)
    return new_token

def make_authenticated_request(url):
    """Make a request with token authentication, refreshing if needed."""
    ## Try to get the stored token
    token = get_stored_token()

    ## If no valid token exists, get a new one
    if not token:
        token = get_new_token()

    ## Make the authenticated request
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)

    ## If unauthorized, the token might be invalid - get a new one and retry
    if response.status_code == 401:
        print("Token rejected. Getting a new token and retrying...")
        token = get_new_token()
        headers = {"Authorization": f"Bearer {token}"}
        response = requests.get(url, headers=headers)

    return response

## For testing, we'll use httpbin's bearer auth endpoint
if __name__ == "__main__":
    ## Using httpbin.org/bearer which checks for the Authorization header
    url = "https://httpbin.org/bearer"

    ## First, delete any existing token to simulate a fresh start
    try:
        import os
        os.remove(TOKEN_FILE)
    except FileNotFoundError:
        pass

    print("First request (should obtain a new token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

    print("\nSecond request (should use the stored token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

スクリプトを実行します。

python token_auth.py

出力は次のようになります。

First request (should obtain a new token):
Obtaining new access token...
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}

Second request (should use the stored token):
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}

このスクリプトは、トークンが拒否された場合に自動的にトークンを更新するトークンベース認証を示しています。

リトライロジックの実装

場合によっては、認証の失敗が一時的なものになることがあります。指数バックオフを使用してリトライメカニズムを実装しましょう。retry_auth.py という名前のファイルを作成します。

import requests
import time
import random

def make_request_with_retry(url, auth, max_retries=3, backoff_factor=0.5):
    """
    Makes a request with retry logic and exponential backoff

    Args:
        url: URL to request
        auth: Authentication tuple (username, password)
        max_retries: Maximum number of retry attempts
        backoff_factor: Factor by which to increase the delay between retries
    """
    retries = 0

    while retries <= max_retries:
        try:
            response = requests.get(url, auth=auth, timeout=10)

            ## If successful, return the response
            if response.status_code == 200:
                return response

            ## If unauthorized, we might want to handle differently
            if response.status_code == 401:
                print(f"Attempt {retries+1}/{max_retries+1}: Authentication failed")
                ## In a real app, we might refresh tokens here or prompt for new credentials
            else:
                print(f"Attempt {retries+1}/{max_retries+1}: Failed with status code {response.status_code}")

            ## If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                return response

            ## Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

        except requests.exceptions.RequestException as e:
            print(f"Request exception: {e}")

            ## If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                raise

            ## Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

if __name__ == "__main__":
    url = "https://httpbin.org/basic-auth/user/pass"

    print("Testing with incorrect credentials:")
    response = make_request_with_retry(url, auth=("wrong_user", "wrong_pass"))
    print(f"Final status code: {response.status_code}")

    print("\nTesting with correct credentials:")
    response = make_request_with_retry(url, auth=("user", "pass"))
    print(f"Final status code: {response.status_code}")
    print(f"Response content: {response.json()}")

スクリプトを実行します。

python retry_auth.py

出力は次のようになります。

Testing with incorrect credentials:
Attempt 1/4: Authentication failed
Retrying in 0.54 seconds...
Attempt 2/4: Authentication failed
Retrying in 1.05 seconds...
Attempt 3/4: Authentication failed
Retrying in 2.08 seconds...
Attempt 4/4: Authentication failed
Maximum retry attempts reached.
Final status code: 401

Testing with correct credentials:
Final status code: 200
Response content: {'authenticated': True, 'user': 'user'}

このスクリプトは、指数バックオフを使用したリトライメカニズムを示しています。これは、一時的に失敗する可能性のあるネットワークリクエストを処理する場合の一般的なパターンです。

再利用可能な認証セッションの作成

同じサービスへの複数のリクエストの場合、認証情報を維持するセッションを作成する方が効率的であることがよくあります。auth_session.py という名前のファイルを作成します。

import requests

def create_authenticated_session(base_url, username, password):
    """
    Creates and returns a requests session with basic authentication

    Args:
        base_url: Base URL for the API
        username: Username for authentication
        password: Password for authentication

    Returns:
        A configured requests.Session object
    """
    session = requests.Session()
    session.auth = (username, password)
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })

    ## Test the authentication
    test_url = f"{base_url}/basic-auth/{username}/{password}"
    try:
        response = session.get(test_url)
        response.raise_for_status()
        print(f"Authentication successful for user: {username}")
    except requests.exceptions.HTTPError as e:
        print(f"Authentication failed: {e}")
        ## In a real app, you might raise an exception here or handle it differently

    return session

def make_authenticated_requests(session, base_url):
    """
    Makes multiple requests using the authenticated session

    Args:
        session: The authenticated requests.Session
        base_url: Base URL for the API
    """
    ## Make a request to a different endpoint
    response1 = session.get(f"{base_url}/get")
    print(f"\nRequest 1 status code: {response1.status_code}")
    print(f"Request 1 response: {response1.json()}")

    ## Make another request
    response2 = session.get(f"{base_url}/headers")
    print(f"\nRequest 2 status code: {response2.status_code}")
    print(f"Request 2 response: {response2.json()}")

if __name__ == "__main__":
    base_url = "https://httpbin.org"

    ## Create an authenticated session
    session = create_authenticated_session(base_url, "user", "pass")

    ## Use the session for multiple requests
    make_authenticated_requests(session, base_url)

スクリプトを実行します。

python auth_session.py

出力は次のようになります。

Authentication successful for user: user

Request 1 status code: 200
Request 1 response: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}

Request 2 status code: 200
Request 2 response: {'headers': {...}}

セッションを使用することには、いくつかの利点があります。

  1. 基盤となる TCP 接続を再利用するため、後続のリクエストが高速化されます
  2. 各リクエストに自動的に認証資格情報が含まれます
  3. リクエスト間で Cookie を維持します。これは、セッションベースの認証に役立ちます

実際の認証における課題への対処

実際のアプリケーションでは、より複雑な認証シナリオに遭遇することがよくあります。いくつかの実用的な例とベストプラクティスを探ってみましょう。

カスタム認証クラスの実装

requests ライブラリを使用すると、requests.auth.AuthBase クラスをサブクラス化することにより、カスタム認証ハンドラーを作成できます。これは、カスタム認証スキームを持つ API を使用する場合に役立ちます。

custom_auth.py という名前のファイルを作成します。

import requests
from requests.auth import AuthBase
import time
import hashlib

class CustomAuth(AuthBase):
    """
    Example of a custom authentication mechanism that adds a timestamp and HMAC signature
    This is similar to how many APIs authenticate requests (like AWS, Stripe, etc.)
    """
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret

    def __call__(self, request):
        ## Add timestamp to the request
        timestamp = str(int(time.time()))

        ## Add the API key and timestamp as headers
        request.headers['X-API-Key'] = self.api_key
        request.headers['X-Timestamp'] = timestamp

        ## Create a signature based on the request
        ## In a real implementation, this would include more request elements
        method = request.method
        path = request.path_url

        ## Create a simple signature (in real apps, use proper HMAC)
        signature_data = f"{method}{path}{timestamp}{self.api_secret}".encode('utf-8')
        signature = hashlib.sha256(signature_data).hexdigest()

        ## Add the signature to the headers
        request.headers['X-Signature'] = signature

        return request

def test_custom_auth():
    """Test our custom authentication with httpbin.org"""
    url = "https://httpbin.org/headers"

    ## Create our custom auth handler
    auth = CustomAuth(api_key="test_key", api_secret="test_secret")

    ## Make the request with our custom auth
    response = requests.get(url, auth=auth)

    print(f"Status Code: {response.status_code}")
    print(f"Response Headers: {response.json()}")

    ## Verify our custom headers were sent
    headers = response.json()['headers']
    for header in ['X-Api-Key', 'X-Timestamp', 'X-Signature']:
        if header.upper() in headers:
            print(f"{header} was sent: {headers[header.upper()]}")
        else:
            print(f"{header} was not found in the response")

if __name__ == "__main__":
    test_custom_auth()

スクリプトを実行します。

python custom_auth.py

出力には、レスポンス内のカスタムヘッダーが含まれます。

Status Code: 200
Response Headers: {'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.2', 'X-Api-Key': 'test_key', 'X-Signature': '...', 'X-Timestamp': '1623456789'}}
X-Api-Key was sent: test_key
X-Timestamp was sent: 1623456789
X-Signature was sent: ...

この例は、多くの商用 API が使用しているものと同様に、タイムスタンプと署名を含むカスタム認証スキームを実装する方法を示しています。

レート制限と 429 レスポンスの処理

多くの API は、不正使用を防ぐためにレート制限を実装しています。レート制限を超えると、サーバーは通常、429 Too Many Requests ステータスコードで応答します。このシナリオを処理するスクリプトを作成しましょう。

rate_limit_handler.py という名前のファイルを作成します。

import requests
import time

def make_request_with_rate_limit_handling(url, max_retries=5):
    """
    Makes a request with handling for rate limits (429 responses)

    Args:
        url: URL to request
        max_retries: Maximum number of retry attempts
    """
    retries = 0

    while retries <= max_retries:
        response = requests.get(url)

        ## If not rate limited, return the response
        if response.status_code != 429:
            return response

        ## We got rate limited, check if we have retry information
        retry_after = response.headers.get('Retry-After')

        ## If we have retry information, wait that long
        if retry_after:
            wait_time = int(retry_after)
            print(f"Rate limited. Waiting for {wait_time} seconds as specified by Retry-After header.")
            time.sleep(wait_time)
        else:
            ## If no retry information, use exponential backoff
            wait_time = 2 ** retries
            print(f"Rate limited. No Retry-After header. Using exponential backoff: {wait_time} seconds.")
            time.sleep(wait_time)

        retries += 1

        if retries > max_retries:
            print(f"Maximum retries ({max_retries}) exceeded.")
            return response

## For demonstration, we'll simulate rate limiting using httpbin's status endpoint
if __name__ == "__main__":
    ## First request should succeed
    print("Making request to endpoint that returns 200:")
    response = make_request_with_rate_limit_handling("https://httpbin.org/status/200")
    print(f"Final status code: {response.status_code}")

    ## Simulate being rate limited initially, then succeeding
    print("\nSimulating rate limiting scenario:")
    ## We'll make 3 requests in sequence to different status codes (429, 429, 200)
    ## to simulate being rate limited twice and then succeeding
    urls = [
        "https://httpbin.org/status/429",  ## First will be rate limited
        "https://httpbin.org/status/429",  ## Second will also be rate limited
        "https://httpbin.org/status/200"   ## Third will succeed
    ]

    for i, url in enumerate(urls):
        print(f"\nRequest {i+1}:")
        response = make_request_with_rate_limit_handling(url, max_retries=1)
        print(f"Final status code: {response.status_code}")

スクリプトを実行します。

python rate_limit_handler.py

出力は、レート制限の処理をシミュレートします。

Making request to endpoint that returns 200:
Final status code: 200

Simulating rate limiting scenario:

Request 1:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429

Request 2:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429

Request 3:
Final status code: 200

これは、Retry-After ヘッダーを尊重するか、指数バックオフを実装することにより、レート制限を処理する方法を示しています。

完全なエラー処理ソリューション

最後に、すべてを包括的なエラー処理ソリューションにまとめましょう。complete_auth_handler.py という名前のファイルを作成します。

import requests
import time
import random
import json
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout

class AuthenticationManager:
    """Manages authentication tokens and credentials"""

    def __init__(self, token_file="auth_token.json"):
        self.token_file = token_file
        self.token = self.load_token()

    def load_token(self):
        """Load token from file storage"""
        try:
            with open(self.token_file, "r") as f:
                token_data = json.load(f)
                ## Check if token is expired
                if token_data.get("expires_at", 0) > time.time():
                    return token_data.get("access_token")
        except (FileNotFoundError, json.JSONDecodeError):
            pass
        return None

    def save_token(self, token, expires_in=3600):
        """Save token to file storage"""
        token_data = {
            "access_token": token,
            "expires_at": time.time() + expires_in
        }
        with open(self.token_file, "w") as f:
            json.dump(token_data, f)

    def get_token(self):
        """Get a valid token, refreshing if necessary"""
        if not self.token:
            self.token = self.refresh_token()
        return self.token

    def refresh_token(self):
        """Refresh the authentication token"""
        ## In a real app, this would make a call to the auth server
        print("Getting a new authentication token...")
        new_token = f"refreshed_token_{int(time.time())}"
        self.save_token(new_token)
        return new_token

    def invalidate_token(self):
        """Invalidate the current token"""
        self.token = None
        try:
            with open(self.token_file, "w") as f:
                json.dump({}, f)
        except Exception:
            pass

class APIClient:
    """Client for making API requests with comprehensive error handling"""

    def __init__(self, base_url, auth_manager=None):
        self.base_url = base_url
        self.auth_manager = auth_manager or AuthenticationManager()
        self.session = requests.Session()

    def make_request(self, method, endpoint, **kwargs):
        """
        Make an API request with comprehensive error handling

        Args:
            method: HTTP method (get, post, etc.)
            endpoint: API endpoint to call
            **kwargs: Additional arguments to pass to requests

        Returns:
            Response object or None if all retries failed
        """
        url = f"{self.base_url}{endpoint}"
        max_retries = kwargs.pop("max_retries", 3)
        retry_backoff_factor = kwargs.pop("retry_backoff_factor", 0.5)

        ## Add authentication if we have an auth manager
        if self.auth_manager:
            token = self.auth_manager.get_token()
            headers = kwargs.get("headers", {})
            headers["Authorization"] = f"Bearer {token}"
            kwargs["headers"] = headers

        ## Make the request with retries
        for retry in range(max_retries + 1):
            try:
                request_func = getattr(self.session, method.lower())
                response = request_func(url, **kwargs)

                ## Handle different status codes
                if response.status_code == 200:
                    return response
                elif response.status_code == 401:
                    print("Unauthorized: Token may be invalid")
                    if self.auth_manager and retry < max_retries:
                        print("Refreshing authentication token...")
                        self.auth_manager.invalidate_token()
                        token = self.auth_manager.get_token()
                        kwargs.get("headers", {})["Authorization"] = f"Bearer {token}"
                        continue
                elif response.status_code == 429:
                    if retry < max_retries:
                        retry_after = response.headers.get("Retry-After")
                        if retry_after:
                            wait_time = int(retry_after)
                        else:
                            wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)

                        print(f"Rate limited. Waiting {wait_time:.2f} seconds before retry {retry + 1}/{max_retries}")
                        time.sleep(wait_time)
                        continue
                    else:
                        print("Rate limit retries exhausted")
                elif 500 <= response.status_code < 600:
                    if retry < max_retries:
                        wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                        print(f"Server error ({response.status_code}). Retrying in {wait_time:.2f} seconds...")
                        time.sleep(wait_time)
                        continue
                    else:
                        print(f"Server error retries exhausted")

                ## If we get here, we're returning the response as-is
                return response

            except ConnectionError:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Connection error. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Connection error retries exhausted")
                    raise
            except Timeout:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Request timed out. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Timeout retries exhausted")
                    raise
            except RequestException as e:
                print(f"Request failed: {e}")
                raise

        return None

    def get(self, endpoint, **kwargs):
        """Make a GET request"""
        return self.make_request("get", endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        """Make a POST request"""
        return self.make_request("post", endpoint, **kwargs)

    ## Additional methods can be added for other HTTP verbs

## Test the client with httpbin
if __name__ == "__main__":
    ## Clean any existing token files
    import os
    try:
        os.remove("auth_token.json")
    except FileNotFoundError:
        pass

    client = APIClient("https://httpbin.org")

    print("Making a GET request to /get:")
    response = client.get("/get")
    print(f"Status code: {response.status_code}")
    print(f"Response content: {response.json()}")

    print("\nSimulating an unauthorized response:")
    ## For demonstration purposes, we'll use httpbin's status endpoint to simulate a 401
    response = client.get("/status/401")
    print(f"Status code: {response.status_code}")

    print("\nSimulating a rate-limited response:")
    ## Simulate a 429 response
    response = client.get("/status/429")
    print(f"Status code: {response.status_code}")

スクリプトを実行します。

python complete_auth_handler.py

出力は、包括的なエラー処理を示しています。

Getting a new authentication token...
Making a GET request to /get:
Status code: 200
Response content: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}

Simulating an unauthorized response:
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Status code: 401

Simulating a rate-limited response:
Rate limited. Waiting 0.54 seconds before retry 1/3
Rate limited. Waiting 1.04 seconds before retry 2/3
Rate limited. Waiting 2.07 seconds before retry 3/3
Rate limit retries exhausted
Status code: 429

この包括的な例は、本番環境対応の Python アプリケーションで認証やその他の API 関連のエラーを処理するための多くのベストプラクティスを示しています。

まとめ

この実験では、Python のリクエストで未承認のレスポンスを効果的に処理する方法を学びました。いくつかの重要な側面をカバーしました。

  1. HTTP 認証とステータスコードの理解: 401 Unauthorized レスポンスの意味と、Python アプリケーションでそれらを識別する方法を学びました。

  2. 基本認証: auth パラメータと HTTPBasicAuth クラスの両方を使用して基本認証を実装し、認証の失敗を処理する方法を学びました。

  3. 高度な認証戦略: トークンベース認証、指数バックオフによるリトライロジック、および再利用可能な認証済みセッションの作成について検討しました。

  4. 実際の認証における課題: カスタム認証クラスを実装し、レート制限を処理し、本番環境対応のアプリケーション向けの包括的なエラー処理ソリューションを作成しました。

これらのテクニックは、Web API と対話する際に、認証の失敗やその他の関連エラーを適切に処理できる、より堅牢で回復力のある Python アプリケーションを構築するのに役立ちます。

さらに学習するには、以下を検討してください。

  • OAuth 2.0 認証フロー
  • ステートレス認証のための JWT (JSON Web Token)
  • API キーローテーション戦略
  • 安全な資格情報の保存