はじめに
Python で Web API を扱う際、認証の問題によりリクエストが拒否される状況に頻繁に遭遇します。この実験(Lab)では、Python の requests ライブラリを使用する際に、未承認(401)レスポンスを理解し、効果的に処理する方法を説明します。適切なエラー処理技術を学ぶことで、認証の失敗を適切に管理し、より回復力のあるアプリケーションを構築できるようになります。
Python で Web API を扱う際、認証の問題によりリクエストが拒否される状況に頻繁に遭遇します。この実験(Lab)では、Python の requests ライブラリを使用する際に、未承認(401)レスポンスを理解し、効果的に処理する方法を説明します。適切なエラー処理技術を学ぶことで、認証の失敗を適切に管理し、より回復力のあるアプリケーションを構築できるようになります。
未承認レスポンスの処理に入る前に、それらが何であり、なぜ発生するのかを理解することが重要です。
HTTP ステータスコードは、サーバーがクライアントのリクエストに応じて送信する 3 桁の数字です。これらのコードは、次の 5 つのカテゴリに分類されます。
401 Unauthorized ステータスコードは 4xx カテゴリに属し、リクエストがターゲットリソースに対する有効な認証資格情報を持っていないことを示します。これは、サーバーがリクエストを理解しているが、それを承認することを拒否する 403 Forbidden レスポンスとは異なります。
まず、プロジェクト用のディレクトリを作成し、必要なパッケージをインストールすることから始めましょう。
mkdir -p ~/project/python-auth-handling
cd ~/project/python-auth-handling
requests パッケージをインストールします。python -m venv venv
source venv/bin/activate
pip install requests
出力は次のようになります。
Collecting requests
Downloading requests-2.28.2-py3-none-any.whl (62 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.8/62.8 KB 1.8 MB/s eta 0:00:00
[...additional output...]
Successfully installed certifi-2023.5.7 charset-normalizer-3.1.0 idna-3.4 requests-2.28.2 urllib3-1.26.16
次に、認証を必要とするサービスにリクエストを行う Python スクリプトを作成しましょう。HTTP リクエストをテストするためのエンドポイントを提供する HTTPBin サービスを使用します。
WebIDE で basic_request.py という名前の新しいファイルを作成します。
import requests
def make_request():
url = "https://httpbin.org/basic-auth/user/pass"
response = requests.get(url)
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
print("Request was successful.")
print(f"Response content: {response.text}")
elif response.status_code == 401:
print("Unauthorized: Authentication is required and has failed.")
else:
print(f"Received unexpected status code: {response.status_code}")
if __name__ == "__main__":
make_request()
ファイルを保存し、ターミナルで実行します。
python basic_request.py
次のような出力が表示されます。
Status Code: 401
Unauthorized: Authentication is required and has failed.
これは、基本認証を必要とするエンドポイントにアクセスしようとしているが、資格情報を提供していないためです。
レスポンスに関する詳細を出力するようにスクリプトを変更しましょう。examine_response.py という名前の新しいファイルを作成します。
import requests
def examine_response():
url = "https://httpbin.org/basic-auth/user/pass"
response = requests.get(url)
print(f"Status Code: {response.status_code}")
print(f"Headers: {response.headers}")
## The WWW-Authenticate header provides details about how to authenticate
if 'WWW-Authenticate' in response.headers:
print(f"WWW-Authenticate: {response.headers['WWW-Authenticate']}")
## Try to get JSON content, but it might not be JSON or might be empty
try:
print(f"Content: {response.json()}")
except requests.exceptions.JSONDecodeError:
print(f"Content (text): {response.text}")
if __name__ == "__main__":
examine_response()
このスクリプトを実行します。
python examine_response.py
出力には、レスポンスヘッダーと、クライアントに認証方法を伝える WWW-Authenticate ヘッダーが含まれます。
Status Code: 401
Headers: {'date': '...', 'content-type': '...', 'content-length': '0', 'connection': 'close', 'server': 'gunicorn/19.9.0', 'www-authenticate': 'Basic realm="Fake Realm"', 'access-control-allow-origin': '*', 'access-control-allow-credentials': 'true'}
WWW-Authenticate: Basic realm="Fake Realm"
Content (text):
値が Basic realm="Fake Realm" の WWW-Authenticate ヘッダーは、サーバーが基本認証を期待していることを示しています。
401 Unauthorized レスポンスの原因を理解したので、正しい認証資格情報を提供して、それを防ぐ方法を学びましょう。
Python requests ライブラリを使用すると、リクエストに基本認証を簡単に追加できます。次のいずれかの方法を使用できます。
auth パラメータを渡すrequests.auth から HTTPBasicAuth クラスを使用する基本認証を含めるようにスクリプトを変更しましょう。basic_auth.py という名前の新しいファイルを作成します。
import requests
from requests.auth import HTTPBasicAuth
def make_authenticated_request():
url = "https://httpbin.org/basic-auth/user/pass"
## Method 1: Using the auth parameter with a tuple
response1 = requests.get(url, auth=("user", "pass"))
print("Method 1: Using auth tuple")
print(f"Status Code: {response1.status_code}")
if response1.status_code == 200:
print(f"Response content: {response1.json()}")
else:
print(f"Request failed with status code: {response1.status_code}")
print("\n" + "-"*50 + "\n")
## Method 2: Using the HTTPBasicAuth class
response2 = requests.get(url, auth=HTTPBasicAuth("user", "pass"))
print("Method 2: Using HTTPBasicAuth class")
print(f"Status Code: {response2.status_code}")
if response2.status_code == 200:
print(f"Response content: {response2.json()}")
else:
print(f"Request failed with status code: {response2.status_code}")
if __name__ == "__main__":
make_authenticated_request()
スクリプトを実行します。
python basic_auth.py
ステータスコード 200 で成功したレスポンスが表示されます。
Method 1: Using auth tuple
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}
--------------------------------------------------
Method 2: Using HTTPBasicAuth class
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}
どちらの方法も同じ結果を達成します。つまり、base64 エンコードされたユーザー名とパスワードを含む Authorization ヘッダーをリクエストに追加します。
誤った資格情報を提供した場合に何が起こるかを見てみましょう。failed_auth.py という名前の新しいファイルを作成します。
import requests
def test_failed_authentication():
url = "https://httpbin.org/basic-auth/user/pass"
## Correct credentials
response_correct = requests.get(url, auth=("user", "pass"))
## Incorrect password
response_wrong_pass = requests.get(url, auth=("user", "wrong_password"))
## Incorrect username
response_wrong_user = requests.get(url, auth=("wrong_user", "pass"))
print("Correct credentials:")
print(f"Status Code: {response_correct.status_code}")
if response_correct.status_code == 200:
print(f"Response content: {response_correct.json()}")
print("\nIncorrect password:")
print(f"Status Code: {response_wrong_pass.status_code}")
print("\nIncorrect username:")
print(f"Status Code: {response_wrong_user.status_code}")
if __name__ == "__main__":
test_failed_authentication()
スクリプトを実行します。
python failed_auth.py
次のような出力が表示されます。
Correct credentials:
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}
Incorrect password:
Status Code: 401
Incorrect username:
Status Code: 401
誤ったユーザー名と誤ったパスワードの両方で、401 Unauthorized レスポンスが発生します。
次に、認証の失敗に対するエラー処理を実装しましょう。handle_auth_errors.py という名前の新しいファイルを作成します。
import requests
from requests.exceptions import HTTPError
def make_authenticated_request(username, password):
url = "https://httpbin.org/basic-auth/user/pass"
try:
response = requests.get(url, auth=(username, password))
response.raise_for_status() ## Raises an HTTPError for bad responses (4xx or 5xx)
print(f"Authentication successful!")
print(f"Response content: {response.json()}")
return response
except HTTPError as e:
if response.status_code == 401:
print(f"Authentication failed: Invalid credentials")
## You might want to retry with different credentials here
return handle_authentication_failure()
else:
print(f"HTTP Error occurred: {e}")
return None
except Exception as e:
print(f"An error occurred: {e}")
return None
def handle_authentication_failure():
## In a real application, you might prompt the user for new credentials
## or use a token refresh mechanism
print("Attempting to authenticate with default credentials...")
return make_authenticated_request("user", "pass")
if __name__ == "__main__":
## First, try with incorrect credentials
print("Trying with incorrect credentials:")
make_authenticated_request("wrong_user", "wrong_pass")
print("\n" + "-"*50 + "\n")
## Then, try with correct credentials
print("Trying with correct credentials:")
make_authenticated_request("user", "pass")
スクリプトを実行します。
python handle_auth_errors.py
出力には、エラーがどのように処理されるかが表示されます。
Trying with incorrect credentials:
Authentication failed: Invalid credentials
Attempting to authenticate with default credentials...
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}
--------------------------------------------------
Trying with correct credentials:
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}
このスクリプトは、次のような単純なエラー処理パターンを示しています。
raise_for_status() を使用して、4xx/5xx レスポンスに対して例外を発生させます実際のアプリケーションでは、認証を処理するために、より高度な戦略が必要になることがよくあります。いくつかの一般的な手法を探ってみましょう。
多くの最新の API は、基本認証の代わりにトークンベース認証を使用しています。OAuth 2.0 は、認証と認可にトークンを使用する一般的なプロトコルです。
トークンベース認証をシミュレートするスクリプトを作成しましょう。token_auth.py という名前のファイルを作成します。
import requests
import time
import json
## Simulated token storage - in a real app, this might be a database or secure storage
TOKEN_FILE = "token.json"
def get_stored_token():
"""Retrieve the stored token if it exists."""
try:
with open(TOKEN_FILE, "r") as f:
token_data = json.load(f)
## Check if token is expired
if token_data.get("expires_at", 0) > time.time():
return token_data["access_token"]
except (FileNotFoundError, json.JSONDecodeError):
pass
return None
def save_token(token, expires_in=3600):
"""Save the token with expiration time."""
token_data = {
"access_token": token,
"expires_at": time.time() + expires_in
}
with open(TOKEN_FILE, "w") as f:
json.dump(token_data, f)
def get_new_token():
"""Simulate obtaining a new token from an authentication service."""
## In a real application, this would make a request to the auth server
print("Obtaining new access token...")
## Simulating a token generation
new_token = f"simulated_token_{int(time.time())}"
save_token(new_token)
return new_token
def make_authenticated_request(url):
"""Make a request with token authentication, refreshing if needed."""
## Try to get the stored token
token = get_stored_token()
## If no valid token exists, get a new one
if not token:
token = get_new_token()
## Make the authenticated request
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
## If unauthorized, the token might be invalid - get a new one and retry
if response.status_code == 401:
print("Token rejected. Getting a new token and retrying...")
token = get_new_token()
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
return response
## For testing, we'll use httpbin's bearer auth endpoint
if __name__ == "__main__":
## Using httpbin.org/bearer which checks for the Authorization header
url = "https://httpbin.org/bearer"
## First, delete any existing token to simulate a fresh start
try:
import os
os.remove(TOKEN_FILE)
except FileNotFoundError:
pass
print("First request (should obtain a new token):")
response = make_authenticated_request(url)
print(f"Status code: {response.status_code}")
print(f"Response: {response.json()}")
print("\nSecond request (should use the stored token):")
response = make_authenticated_request(url)
print(f"Status code: {response.status_code}")
print(f"Response: {response.json()}")
スクリプトを実行します。
python token_auth.py
出力は次のようになります。
First request (should obtain a new token):
Obtaining new access token...
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}
Second request (should use the stored token):
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}
このスクリプトは、トークンが拒否された場合に自動的にトークンを更新するトークンベース認証を示しています。
場合によっては、認証の失敗が一時的なものになることがあります。指数バックオフを使用してリトライメカニズムを実装しましょう。retry_auth.py という名前のファイルを作成します。
import requests
import time
import random
def make_request_with_retry(url, auth, max_retries=3, backoff_factor=0.5):
"""
Makes a request with retry logic and exponential backoff
Args:
url: URL to request
auth: Authentication tuple (username, password)
max_retries: Maximum number of retry attempts
backoff_factor: Factor by which to increase the delay between retries
"""
retries = 0
while retries <= max_retries:
try:
response = requests.get(url, auth=auth, timeout=10)
## If successful, return the response
if response.status_code == 200:
return response
## If unauthorized, we might want to handle differently
if response.status_code == 401:
print(f"Attempt {retries+1}/{max_retries+1}: Authentication failed")
## In a real app, we might refresh tokens here or prompt for new credentials
else:
print(f"Attempt {retries+1}/{max_retries+1}: Failed with status code {response.status_code}")
## If we've reached max retries, give up
if retries == max_retries:
print("Maximum retry attempts reached.")
return response
## Calculate delay with exponential backoff and jitter
delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
print(f"Retrying in {delay:.2f} seconds...")
time.sleep(delay)
retries += 1
except requests.exceptions.RequestException as e:
print(f"Request exception: {e}")
## If we've reached max retries, give up
if retries == max_retries:
print("Maximum retry attempts reached.")
raise
## Calculate delay with exponential backoff and jitter
delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
print(f"Retrying in {delay:.2f} seconds...")
time.sleep(delay)
retries += 1
if __name__ == "__main__":
url = "https://httpbin.org/basic-auth/user/pass"
print("Testing with incorrect credentials:")
response = make_request_with_retry(url, auth=("wrong_user", "wrong_pass"))
print(f"Final status code: {response.status_code}")
print("\nTesting with correct credentials:")
response = make_request_with_retry(url, auth=("user", "pass"))
print(f"Final status code: {response.status_code}")
print(f"Response content: {response.json()}")
スクリプトを実行します。
python retry_auth.py
出力は次のようになります。
Testing with incorrect credentials:
Attempt 1/4: Authentication failed
Retrying in 0.54 seconds...
Attempt 2/4: Authentication failed
Retrying in 1.05 seconds...
Attempt 3/4: Authentication failed
Retrying in 2.08 seconds...
Attempt 4/4: Authentication failed
Maximum retry attempts reached.
Final status code: 401
Testing with correct credentials:
Final status code: 200
Response content: {'authenticated': True, 'user': 'user'}
このスクリプトは、指数バックオフを使用したリトライメカニズムを示しています。これは、一時的に失敗する可能性のあるネットワークリクエストを処理する場合の一般的なパターンです。
同じサービスへの複数のリクエストの場合、認証情報を維持するセッションを作成する方が効率的であることがよくあります。auth_session.py という名前のファイルを作成します。
import requests
def create_authenticated_session(base_url, username, password):
"""
Creates and returns a requests session with basic authentication
Args:
base_url: Base URL for the API
username: Username for authentication
password: Password for authentication
Returns:
A configured requests.Session object
"""
session = requests.Session()
session.auth = (username, password)
session.headers.update({
'User-Agent': 'MyApp/1.0',
'Accept': 'application/json'
})
## Test the authentication
test_url = f"{base_url}/basic-auth/{username}/{password}"
try:
response = session.get(test_url)
response.raise_for_status()
print(f"Authentication successful for user: {username}")
except requests.exceptions.HTTPError as e:
print(f"Authentication failed: {e}")
## In a real app, you might raise an exception here or handle it differently
return session
def make_authenticated_requests(session, base_url):
"""
Makes multiple requests using the authenticated session
Args:
session: The authenticated requests.Session
base_url: Base URL for the API
"""
## Make a request to a different endpoint
response1 = session.get(f"{base_url}/get")
print(f"\nRequest 1 status code: {response1.status_code}")
print(f"Request 1 response: {response1.json()}")
## Make another request
response2 = session.get(f"{base_url}/headers")
print(f"\nRequest 2 status code: {response2.status_code}")
print(f"Request 2 response: {response2.json()}")
if __name__ == "__main__":
base_url = "https://httpbin.org"
## Create an authenticated session
session = create_authenticated_session(base_url, "user", "pass")
## Use the session for multiple requests
make_authenticated_requests(session, base_url)
スクリプトを実行します。
python auth_session.py
出力は次のようになります。
Authentication successful for user: user
Request 1 status code: 200
Request 1 response: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}
Request 2 status code: 200
Request 2 response: {'headers': {...}}
セッションを使用することには、いくつかの利点があります。
実際のアプリケーションでは、より複雑な認証シナリオに遭遇することがよくあります。いくつかの実用的な例とベストプラクティスを探ってみましょう。
requests ライブラリを使用すると、requests.auth.AuthBase クラスをサブクラス化することにより、カスタム認証ハンドラーを作成できます。これは、カスタム認証スキームを持つ API を使用する場合に役立ちます。
custom_auth.py という名前のファイルを作成します。
import requests
from requests.auth import AuthBase
import time
import hashlib
class CustomAuth(AuthBase):
"""
Example of a custom authentication mechanism that adds a timestamp and HMAC signature
This is similar to how many APIs authenticate requests (like AWS, Stripe, etc.)
"""
def __init__(self, api_key, api_secret):
self.api_key = api_key
self.api_secret = api_secret
def __call__(self, request):
## Add timestamp to the request
timestamp = str(int(time.time()))
## Add the API key and timestamp as headers
request.headers['X-API-Key'] = self.api_key
request.headers['X-Timestamp'] = timestamp
## Create a signature based on the request
## In a real implementation, this would include more request elements
method = request.method
path = request.path_url
## Create a simple signature (in real apps, use proper HMAC)
signature_data = f"{method}{path}{timestamp}{self.api_secret}".encode('utf-8')
signature = hashlib.sha256(signature_data).hexdigest()
## Add the signature to the headers
request.headers['X-Signature'] = signature
return request
def test_custom_auth():
"""Test our custom authentication with httpbin.org"""
url = "https://httpbin.org/headers"
## Create our custom auth handler
auth = CustomAuth(api_key="test_key", api_secret="test_secret")
## Make the request with our custom auth
response = requests.get(url, auth=auth)
print(f"Status Code: {response.status_code}")
print(f"Response Headers: {response.json()}")
## Verify our custom headers were sent
headers = response.json()['headers']
for header in ['X-Api-Key', 'X-Timestamp', 'X-Signature']:
if header.upper() in headers:
print(f"{header} was sent: {headers[header.upper()]}")
else:
print(f"{header} was not found in the response")
if __name__ == "__main__":
test_custom_auth()
スクリプトを実行します。
python custom_auth.py
出力には、レスポンス内のカスタムヘッダーが含まれます。
Status Code: 200
Response Headers: {'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.2', 'X-Api-Key': 'test_key', 'X-Signature': '...', 'X-Timestamp': '1623456789'}}
X-Api-Key was sent: test_key
X-Timestamp was sent: 1623456789
X-Signature was sent: ...
この例は、多くの商用 API が使用しているものと同様に、タイムスタンプと署名を含むカスタム認証スキームを実装する方法を示しています。
多くの API は、不正使用を防ぐためにレート制限を実装しています。レート制限を超えると、サーバーは通常、429 Too Many Requests ステータスコードで応答します。このシナリオを処理するスクリプトを作成しましょう。
rate_limit_handler.py という名前のファイルを作成します。
import requests
import time
def make_request_with_rate_limit_handling(url, max_retries=5):
"""
Makes a request with handling for rate limits (429 responses)
Args:
url: URL to request
max_retries: Maximum number of retry attempts
"""
retries = 0
while retries <= max_retries:
response = requests.get(url)
## If not rate limited, return the response
if response.status_code != 429:
return response
## We got rate limited, check if we have retry information
retry_after = response.headers.get('Retry-After')
## If we have retry information, wait that long
if retry_after:
wait_time = int(retry_after)
print(f"Rate limited. Waiting for {wait_time} seconds as specified by Retry-After header.")
time.sleep(wait_time)
else:
## If no retry information, use exponential backoff
wait_time = 2 ** retries
print(f"Rate limited. No Retry-After header. Using exponential backoff: {wait_time} seconds.")
time.sleep(wait_time)
retries += 1
if retries > max_retries:
print(f"Maximum retries ({max_retries}) exceeded.")
return response
## For demonstration, we'll simulate rate limiting using httpbin's status endpoint
if __name__ == "__main__":
## First request should succeed
print("Making request to endpoint that returns 200:")
response = make_request_with_rate_limit_handling("https://httpbin.org/status/200")
print(f"Final status code: {response.status_code}")
## Simulate being rate limited initially, then succeeding
print("\nSimulating rate limiting scenario:")
## We'll make 3 requests in sequence to different status codes (429, 429, 200)
## to simulate being rate limited twice and then succeeding
urls = [
"https://httpbin.org/status/429", ## First will be rate limited
"https://httpbin.org/status/429", ## Second will also be rate limited
"https://httpbin.org/status/200" ## Third will succeed
]
for i, url in enumerate(urls):
print(f"\nRequest {i+1}:")
response = make_request_with_rate_limit_handling(url, max_retries=1)
print(f"Final status code: {response.status_code}")
スクリプトを実行します。
python rate_limit_handler.py
出力は、レート制限の処理をシミュレートします。
Making request to endpoint that returns 200:
Final status code: 200
Simulating rate limiting scenario:
Request 1:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429
Request 2:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429
Request 3:
Final status code: 200
これは、Retry-After ヘッダーを尊重するか、指数バックオフを実装することにより、レート制限を処理する方法を示しています。
最後に、すべてを包括的なエラー処理ソリューションにまとめましょう。complete_auth_handler.py という名前のファイルを作成します。
import requests
import time
import random
import json
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
class AuthenticationManager:
"""Manages authentication tokens and credentials"""
def __init__(self, token_file="auth_token.json"):
self.token_file = token_file
self.token = self.load_token()
def load_token(self):
"""Load token from file storage"""
try:
with open(self.token_file, "r") as f:
token_data = json.load(f)
## Check if token is expired
if token_data.get("expires_at", 0) > time.time():
return token_data.get("access_token")
except (FileNotFoundError, json.JSONDecodeError):
pass
return None
def save_token(self, token, expires_in=3600):
"""Save token to file storage"""
token_data = {
"access_token": token,
"expires_at": time.time() + expires_in
}
with open(self.token_file, "w") as f:
json.dump(token_data, f)
def get_token(self):
"""Get a valid token, refreshing if necessary"""
if not self.token:
self.token = self.refresh_token()
return self.token
def refresh_token(self):
"""Refresh the authentication token"""
## In a real app, this would make a call to the auth server
print("Getting a new authentication token...")
new_token = f"refreshed_token_{int(time.time())}"
self.save_token(new_token)
return new_token
def invalidate_token(self):
"""Invalidate the current token"""
self.token = None
try:
with open(self.token_file, "w") as f:
json.dump({}, f)
except Exception:
pass
class APIClient:
"""Client for making API requests with comprehensive error handling"""
def __init__(self, base_url, auth_manager=None):
self.base_url = base_url
self.auth_manager = auth_manager or AuthenticationManager()
self.session = requests.Session()
def make_request(self, method, endpoint, **kwargs):
"""
Make an API request with comprehensive error handling
Args:
method: HTTP method (get, post, etc.)
endpoint: API endpoint to call
**kwargs: Additional arguments to pass to requests
Returns:
Response object or None if all retries failed
"""
url = f"{self.base_url}{endpoint}"
max_retries = kwargs.pop("max_retries", 3)
retry_backoff_factor = kwargs.pop("retry_backoff_factor", 0.5)
## Add authentication if we have an auth manager
if self.auth_manager:
token = self.auth_manager.get_token()
headers = kwargs.get("headers", {})
headers["Authorization"] = f"Bearer {token}"
kwargs["headers"] = headers
## Make the request with retries
for retry in range(max_retries + 1):
try:
request_func = getattr(self.session, method.lower())
response = request_func(url, **kwargs)
## Handle different status codes
if response.status_code == 200:
return response
elif response.status_code == 401:
print("Unauthorized: Token may be invalid")
if self.auth_manager and retry < max_retries:
print("Refreshing authentication token...")
self.auth_manager.invalidate_token()
token = self.auth_manager.get_token()
kwargs.get("headers", {})["Authorization"] = f"Bearer {token}"
continue
elif response.status_code == 429:
if retry < max_retries:
retry_after = response.headers.get("Retry-After")
if retry_after:
wait_time = int(retry_after)
else:
wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
print(f"Rate limited. Waiting {wait_time:.2f} seconds before retry {retry + 1}/{max_retries}")
time.sleep(wait_time)
continue
else:
print("Rate limit retries exhausted")
elif 500 <= response.status_code < 600:
if retry < max_retries:
wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
print(f"Server error ({response.status_code}). Retrying in {wait_time:.2f} seconds...")
time.sleep(wait_time)
continue
else:
print(f"Server error retries exhausted")
## If we get here, we're returning the response as-is
return response
except ConnectionError:
if retry < max_retries:
wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
print(f"Connection error. Retrying in {wait_time:.2f} seconds...")
time.sleep(wait_time)
else:
print("Connection error retries exhausted")
raise
except Timeout:
if retry < max_retries:
wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
print(f"Request timed out. Retrying in {wait_time:.2f} seconds...")
time.sleep(wait_time)
else:
print("Timeout retries exhausted")
raise
except RequestException as e:
print(f"Request failed: {e}")
raise
return None
def get(self, endpoint, **kwargs):
"""Make a GET request"""
return self.make_request("get", endpoint, **kwargs)
def post(self, endpoint, **kwargs):
"""Make a POST request"""
return self.make_request("post", endpoint, **kwargs)
## Additional methods can be added for other HTTP verbs
## Test the client with httpbin
if __name__ == "__main__":
## Clean any existing token files
import os
try:
os.remove("auth_token.json")
except FileNotFoundError:
pass
client = APIClient("https://httpbin.org")
print("Making a GET request to /get:")
response = client.get("/get")
print(f"Status code: {response.status_code}")
print(f"Response content: {response.json()}")
print("\nSimulating an unauthorized response:")
## For demonstration purposes, we'll use httpbin's status endpoint to simulate a 401
response = client.get("/status/401")
print(f"Status code: {response.status_code}")
print("\nSimulating a rate-limited response:")
## Simulate a 429 response
response = client.get("/status/429")
print(f"Status code: {response.status_code}")
スクリプトを実行します。
python complete_auth_handler.py
出力は、包括的なエラー処理を示しています。
Getting a new authentication token...
Making a GET request to /get:
Status code: 200
Response content: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}
Simulating an unauthorized response:
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Status code: 401
Simulating a rate-limited response:
Rate limited. Waiting 0.54 seconds before retry 1/3
Rate limited. Waiting 1.04 seconds before retry 2/3
Rate limited. Waiting 2.07 seconds before retry 3/3
Rate limit retries exhausted
Status code: 429
この包括的な例は、本番環境対応の Python アプリケーションで認証やその他の API 関連のエラーを処理するための多くのベストプラクティスを示しています。
この実験では、Python のリクエストで未承認のレスポンスを効果的に処理する方法を学びました。いくつかの重要な側面をカバーしました。
HTTP 認証とステータスコードの理解: 401 Unauthorized レスポンスの意味と、Python アプリケーションでそれらを識別する方法を学びました。
基本認証: auth パラメータと HTTPBasicAuth クラスの両方を使用して基本認証を実装し、認証の失敗を処理する方法を学びました。
高度な認証戦略: トークンベース認証、指数バックオフによるリトライロジック、および再利用可能な認証済みセッションの作成について検討しました。
実際の認証における課題: カスタム認証クラスを実装し、レート制限を処理し、本番環境対応のアプリケーション向けの包括的なエラー処理ソリューションを作成しました。
これらのテクニックは、Web API と対話する際に、認証の失敗やその他の関連エラーを適切に処理できる、より堅牢で回復力のある Python アプリケーションを構築するのに役立ちます。
さらに学習するには、以下を検討してください。