Umgang mit unautorisierten Antworten in Python-Anfragen

PythonPythonBeginner
Jetzt üben

💡 Dieser Artikel wurde von AI-Assistenten übersetzt. Um die englische Version anzuzeigen, können Sie hier klicken

Einführung

Bei der Arbeit mit Web-APIs in Python werden Sie oft auf Situationen stoßen, in denen Ihre Anfragen aufgrund von Autorisierungsproblemen abgelehnt werden. Dieses Lab führt Sie durch das Verständnis und die effektive Handhabung von nicht autorisierten (401) Antworten bei der Verwendung der Python requests Bibliothek. Durch das Erlernen geeigneter Fehlerbehandlungstechniken können Sie widerstandsfähigere Anwendungen erstellen, die Authentifizierungsfehler elegant verwalten.

Verständnis von HTTP-Autorisierung und Statuscodes

Bevor wir uns mit der Behandlung von nicht autorisierten Antworten befassen, ist es wichtig zu verstehen, was sie sind und warum sie auftreten.

HTTP-Statuscodes und die 401 Unauthorized Response

HTTP-Statuscodes sind dreistellige Zahlen, die Server als Antwort auf Client-Anfragen senden. Diese Codes sind in fünf Kategorien unterteilt:

  • 1xx: Informative Antworten
  • 2xx: Erfolgreiche Antworten
  • 3xx: Weiterleitungsmeldungen
  • 4xx: Client-Fehlerantworten
  • 5xx: Server-Fehlerantworten

Der 401 Unauthorized Statuscode gehört zur 4xx-Kategorie und gibt an, dass die Anfrage keine gültigen Authentifizierungsnachweise für die Zielressource enthält. Dies unterscheidet sich von einer 403 Forbidden-Antwort, die bedeutet, dass der Server die Anfrage versteht, aber deren Autorisierung verweigert.

Einrichten unserer Umgebung

Beginnen wir damit, ein Verzeichnis für unser Projekt zu erstellen und die erforderlichen Pakete zu installieren.

  1. Öffnen Sie das Terminal und erstellen Sie ein neues Verzeichnis:
mkdir -p ~/project/python-auth-handling
cd ~/project/python-auth-handling
  1. Erstellen wir nun eine virtuelle Umgebung und installieren das requests-Paket:
python -m venv venv
source venv/bin/activate
pip install requests

Die Ausgabe sollte in etwa so aussehen:

Collecting requests
  Downloading requests-2.28.2-py3-none-any.whl (62 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.8/62.8 KB 1.8 MB/s eta 0:00:00
[...additional output...]
Successfully installed certifi-2023.5.7 charset-normalizer-3.1.0 idna-3.4 requests-2.28.2 urllib3-1.26.16

Erstellen einer einfachen Anfrage

Erstellen wir nun ein Python-Skript, um eine Anfrage an einen Dienst zu stellen, der eine Authentifizierung erfordert. Wir verwenden den HTTPBin-Dienst, der Endpunkte zum Testen von HTTP-Anfragen bereitstellt.

Erstellen Sie im WebIDE eine neue Datei namens basic_request.py:

import requests

def make_request():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")

    if response.status_code == 200:
        print("Request was successful.")
        print(f"Response content: {response.text}")
    elif response.status_code == 401:
        print("Unauthorized: Authentication is required and has failed.")
    else:
        print(f"Received unexpected status code: {response.status_code}")

if __name__ == "__main__":
    make_request()

Speichern Sie die Datei und führen Sie sie im Terminal aus:

python basic_request.py

Sie sollten eine Ausgabe ähnlich der folgenden sehen:

Status Code: 401
Unauthorized: Authentication is required and has failed.

Dies liegt daran, dass wir versuchen, auf einen Endpunkt zuzugreifen, der eine einfache Authentifizierung erfordert, aber keine Anmeldeinformationen angegeben haben.

Untersuchen der Antwort

Ändern wir unser Skript, um weitere Details über die Antwort auszugeben. Erstellen Sie eine neue Datei namens examine_response.py:

import requests

def examine_response():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")
    print(f"Headers: {response.headers}")

    ## The WWW-Authenticate header provides details about how to authenticate
    if 'WWW-Authenticate' in response.headers:
        print(f"WWW-Authenticate: {response.headers['WWW-Authenticate']}")

    ## Try to get JSON content, but it might not be JSON or might be empty
    try:
        print(f"Content: {response.json()}")
    except requests.exceptions.JSONDecodeError:
        print(f"Content (text): {response.text}")

if __name__ == "__main__":
    examine_response()

Führen Sie dieses Skript aus:

python examine_response.py

Die Ausgabe enthält die Antwort-Header und den WWW-Authenticate-Header, der dem Client mitteilt, wie er sich authentifizieren soll:

Status Code: 401
Headers: {'date': '...', 'content-type': '...', 'content-length': '0', 'connection': 'close', 'server': 'gunicorn/19.9.0', 'www-authenticate': 'Basic realm="Fake Realm"', 'access-control-allow-origin': '*', 'access-control-allow-credentials': 'true'}
WWW-Authenticate: Basic realm="Fake Realm"
Content (text):

Der WWW-Authenticate-Header mit dem Wert Basic realm="Fake Realm" gibt an, dass der Server eine einfache Authentifizierung erwartet.

Verwendung der einfachen Authentifizierung zur Vermeidung von 401-Fehlern

Nachdem wir nun verstanden haben, was eine 401 Unauthorized-Antwort verursacht, wollen wir lernen, wie wir diese verhindern können, indem wir die richtigen Authentifizierungsnachweise angeben.

Einfache Authentifizierung in Python-Anfragen

Die Python requests-Bibliothek macht es einfach, Ihren Anfragen eine einfache Authentifizierung hinzuzufügen. Sie können entweder:

  1. Den auth-Parameter mit einem Tupel aus Benutzername und Passwort übergeben
  2. Die HTTPBasicAuth-Klasse aus requests.auth verwenden

Ändern wir unser Skript, um eine einfache Authentifizierung einzubeziehen. Erstellen Sie eine neue Datei namens basic_auth.py:

import requests
from requests.auth import HTTPBasicAuth

def make_authenticated_request():
    url = "https://httpbin.org/basic-auth/user/pass"

    ## Method 1: Using the auth parameter with a tuple
    response1 = requests.get(url, auth=("user", "pass"))

    print("Method 1: Using auth tuple")
    print(f"Status Code: {response1.status_code}")
    if response1.status_code == 200:
        print(f"Response content: {response1.json()}")
    else:
        print(f"Request failed with status code: {response1.status_code}")

    print("\n" + "-"*50 + "\n")

    ## Method 2: Using the HTTPBasicAuth class
    response2 = requests.get(url, auth=HTTPBasicAuth("user", "pass"))

    print("Method 2: Using HTTPBasicAuth class")
    print(f"Status Code: {response2.status_code}")
    if response2.status_code == 200:
        print(f"Response content: {response2.json()}")
    else:
        print(f"Request failed with status code: {response2.status_code}")

if __name__ == "__main__":
    make_authenticated_request()

Führen Sie das Skript aus:

python basic_auth.py

Sie sollten eine erfolgreiche Antwort mit dem Statuscode 200 sehen:

Method 1: Using auth tuple
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

--------------------------------------------------

Method 2: Using HTTPBasicAuth class
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

Beide Methoden erzielen das gleiche Ergebnis - sie fügen der Anfrage einen Authorization-Header mit dem base64-codierten Benutzernamen und Passwort hinzu.

Was passiert, wenn die Authentifizierung fehlschlägt

Sehen wir uns an, was passiert, wenn wir falsche Anmeldeinformationen angeben. Erstellen Sie eine neue Datei namens failed_auth.py:

import requests

def test_failed_authentication():
    url = "https://httpbin.org/basic-auth/user/pass"

    ## Correct credentials
    response_correct = requests.get(url, auth=("user", "pass"))

    ## Incorrect password
    response_wrong_pass = requests.get(url, auth=("user", "wrong_password"))

    ## Incorrect username
    response_wrong_user = requests.get(url, auth=("wrong_user", "pass"))

    print("Correct credentials:")
    print(f"Status Code: {response_correct.status_code}")
    if response_correct.status_code == 200:
        print(f"Response content: {response_correct.json()}")

    print("\nIncorrect password:")
    print(f"Status Code: {response_wrong_pass.status_code}")

    print("\nIncorrect username:")
    print(f"Status Code: {response_wrong_user.status_code}")

if __name__ == "__main__":
    test_failed_authentication()

Führen Sie das Skript aus:

python failed_auth.py

Sie sollten eine Ausgabe ähnlich der folgenden sehen:

Correct credentials:
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

Incorrect password:
Status Code: 401

Incorrect username:
Status Code: 401

Sowohl ein falscher Benutzername als auch ein falsches Passwort führen zu einer 401 Unauthorized-Antwort.

Umgang mit Authentifizierungsfehlern

Implementieren wir nun die Fehlerbehandlung für Authentifizierungsfehler. Erstellen Sie eine neue Datei namens handle_auth_errors.py:

import requests
from requests.exceptions import HTTPError

def make_authenticated_request(username, password):
    url = "https://httpbin.org/basic-auth/user/pass"

    try:
        response = requests.get(url, auth=(username, password))
        response.raise_for_status()  ## Raises an HTTPError for bad responses (4xx or 5xx)

        print(f"Authentication successful!")
        print(f"Response content: {response.json()}")
        return response

    except HTTPError as e:
        if response.status_code == 401:
            print(f"Authentication failed: Invalid credentials")
            ## You might want to retry with different credentials here
            return handle_authentication_failure()
        else:
            print(f"HTTP Error occurred: {e}")
            return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

def handle_authentication_failure():
    ## In a real application, you might prompt the user for new credentials
    ## or use a token refresh mechanism
    print("Attempting to authenticate with default credentials...")
    return make_authenticated_request("user", "pass")

if __name__ == "__main__":
    ## First, try with incorrect credentials
    print("Trying with incorrect credentials:")
    make_authenticated_request("wrong_user", "wrong_pass")

    print("\n" + "-"*50 + "\n")

    ## Then, try with correct credentials
    print("Trying with correct credentials:")
    make_authenticated_request("user", "pass")

Führen Sie das Skript aus:

python handle_auth_errors.py

Die Ausgabe sollte zeigen, wie der Fehler behandelt wird:

Trying with incorrect credentials:
Authentication failed: Invalid credentials
Attempting to authenticate with default credentials...
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}

--------------------------------------------------

Trying with correct credentials:
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}

Dieses Skript demonstriert ein einfaches Fehlerbehandlungsmuster, bei dem wir:

  1. Versuchen, die Anfrage mit den angegebenen Anmeldeinformationen zu stellen
  2. raise_for_status() verwenden, um eine Ausnahme für 4xx/5xx-Antworten auszulösen
  3. Den 401-Fehler speziell behandeln, mit einem Wiederholungsmechanismus
  4. Andere Arten von Fehlern angemessen behandeln

Erweiterte Strategien zur Authentifizierungsbehandlung

In realitätsnahen Anwendungen benötigen Sie oft erweiterte Strategien zur Handhabung der Authentifizierung. Lassen Sie uns einige gängige Techniken untersuchen.

Token-basierte Authentifizierung

Viele moderne APIs verwenden eine tokenbasierte Authentifizierung anstelle der einfachen Authentifizierung. OAuth 2.0 ist ein gängiges Protokoll, das Tokens für die Authentifizierung und Autorisierung verwendet.

Erstellen wir ein Skript, das eine tokenbasierte Authentifizierung simuliert. Erstellen Sie eine Datei namens token_auth.py:

import requests
import time
import json

## Simulated token storage - in a real app, this might be a database or secure storage
TOKEN_FILE = "token.json"

def get_stored_token():
    """Retrieve the stored token if it exists."""
    try:
        with open(TOKEN_FILE, "r") as f:
            token_data = json.load(f)
            ## Check if token is expired
            if token_data.get("expires_at", 0) > time.time():
                return token_data["access_token"]
    except (FileNotFoundError, json.JSONDecodeError):
        pass
    return None

def save_token(token, expires_in=3600):
    """Save the token with expiration time."""
    token_data = {
        "access_token": token,
        "expires_at": time.time() + expires_in
    }
    with open(TOKEN_FILE, "w") as f:
        json.dump(token_data, f)

def get_new_token():
    """Simulate obtaining a new token from an authentication service."""
    ## In a real application, this would make a request to the auth server
    print("Obtaining new access token...")
    ## Simulating a token generation
    new_token = f"simulated_token_{int(time.time())}"
    save_token(new_token)
    return new_token

def make_authenticated_request(url):
    """Make a request with token authentication, refreshing if needed."""
    ## Try to get the stored token
    token = get_stored_token()

    ## If no valid token exists, get a new one
    if not token:
        token = get_new_token()

    ## Make the authenticated request
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)

    ## If unauthorized, the token might be invalid - get a new one and retry
    if response.status_code == 401:
        print("Token rejected. Getting a new token and retrying...")
        token = get_new_token()
        headers = {"Authorization": f"Bearer {token}"}
        response = requests.get(url, headers=headers)

    return response

## For testing, we'll use httpbin's bearer auth endpoint
if __name__ == "__main__":
    ## Using httpbin.org/bearer which checks for the Authorization header
    url = "https://httpbin.org/bearer"

    ## First, delete any existing token to simulate a fresh start
    try:
        import os
        os.remove(TOKEN_FILE)
    except FileNotFoundError:
        pass

    print("First request (should obtain a new token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

    print("\nSecond request (should use the stored token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

Führen Sie das Skript aus:

python token_auth.py

Die Ausgabe sollte in etwa so aussehen:

First request (should obtain a new token):
Obtaining new access token...
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}

Second request (should use the stored token):
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}

Dieses Skript demonstriert eine tokenbasierte Authentifizierung mit automatischer Token-Aktualisierung, wenn ein Token abgelehnt wird.

Implementierung der Wiederholungslogik

Manchmal können Authentifizierungsfehler vorübergehend sein. Implementieren wir einen Wiederholungsmechanismus mit exponentiellem Backoff. Erstellen Sie eine Datei namens retry_auth.py:

import requests
import time
import random

def make_request_with_retry(url, auth, max_retries=3, backoff_factor=0.5):
    """
    Makes a request with retry logic and exponential backoff

    Args:
        url: URL to request
        auth: Authentication tuple (username, password)
        max_retries: Maximum number of retry attempts
        backoff_factor: Factor by which to increase the delay between retries
    """
    retries = 0

    while retries <= max_retries:
        try:
            response = requests.get(url, auth=auth, timeout=10)

            ## If successful, return the response
            if response.status_code == 200:
                return response

            ## If unauthorized, we might want to handle differently
            if response.status_code == 401:
                print(f"Attempt {retries+1}/{max_retries+1}: Authentication failed")
                ## In a real app, we might refresh tokens here or prompt for new credentials
            else:
                print(f"Attempt {retries+1}/{max_retries+1}: Failed with status code {response.status_code}")

            ## If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                return response

            ## Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

        except requests.exceptions.RequestException as e:
            print(f"Request exception: {e}")

            ## If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                raise

            ## Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

if __name__ == "__main__":
    url = "https://httpbin.org/basic-auth/user/pass"

    print("Testing with incorrect credentials:")
    response = make_request_with_retry(url, auth=("wrong_user", "wrong_pass"))
    print(f"Final status code: {response.status_code}")

    print("\nTesting with correct credentials:")
    response = make_request_with_retry(url, auth=("user", "pass"))
    print(f"Final status code: {response.status_code}")
    print(f"Response content: {response.json()}")

Führen Sie das Skript aus:

python retry_auth.py

Die Ausgabe sollte in etwa so aussehen:

Testing with incorrect credentials:
Attempt 1/4: Authentication failed
Retrying in 0.54 seconds...
Attempt 2/4: Authentication failed
Retrying in 1.05 seconds...
Attempt 3/4: Authentication failed
Retrying in 2.08 seconds...
Attempt 4/4: Authentication failed
Maximum retry attempts reached.
Final status code: 401

Testing with correct credentials:
Final status code: 200
Response content: {'authenticated': True, 'user': 'user'}

Dieses Skript demonstriert einen Wiederholungsmechanismus mit exponentiellem Backoff, der ein gängiges Muster bei der Verarbeitung von Netzwerkanfragen ist, die vorübergehend fehlschlagen können.

Erstellen einer wiederverwendbaren Authentifizierungssitzung

Für mehrere Anfragen an denselben Dienst ist es oft effizienter, eine Sitzung zu erstellen, die die Authentifizierungsinformationen beibehält. Erstellen Sie eine Datei namens auth_session.py:

import requests

def create_authenticated_session(base_url, username, password):
    """
    Creates and returns a requests session with basic authentication

    Args:
        base_url: Base URL for the API
        username: Username for authentication
        password: Password for authentication

    Returns:
        A configured requests.Session object
    """
    session = requests.Session()
    session.auth = (username, password)
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })

    ## Test the authentication
    test_url = f"{base_url}/basic-auth/{username}/{password}"
    try:
        response = session.get(test_url)
        response.raise_for_status()
        print(f"Authentication successful for user: {username}")
    except requests.exceptions.HTTPError as e:
        print(f"Authentication failed: {e}")
        ## In a real app, you might raise an exception here or handle it differently

    return session

def make_authenticated_requests(session, base_url):
    """
    Makes multiple requests using the authenticated session

    Args:
        session: The authenticated requests.Session
        base_url: Base URL for the API
    """
    ## Make a request to a different endpoint
    response1 = session.get(f"{base_url}/get")
    print(f"\nRequest 1 status code: {response1.status_code}")
    print(f"Request 1 response: {response1.json()}")

    ## Make another request
    response2 = session.get(f"{base_url}/headers")
    print(f"\nRequest 2 status code: {response2.status_code}")
    print(f"Request 2 response: {response2.json()}")

if __name__ == "__main__":
    base_url = "https://httpbin.org"

    ## Create an authenticated session
    session = create_authenticated_session(base_url, "user", "pass")

    ## Use the session for multiple requests
    make_authenticated_requests(session, base_url)

Führen Sie das Skript aus:

python auth_session.py

Die Ausgabe sollte in etwa so aussehen:

Authentication successful for user: user

Request 1 status code: 200
Request 1 response: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}

Request 2 status code: 200
Request 2 response: {'headers': {...}}

Die Verwendung einer Sitzung hat mehrere Vorteile:

  1. Sie verwendet die zugrunde liegende TCP-Verbindung wieder, wodurch nachfolgende Anfragen schneller werden
  2. Sie enthält automatisch Authentifizierungsnachweise bei jeder Anfrage
  3. Sie verwaltet Cookies über Anfragen hinweg, was für die sitzungsbasierte Authentifizierung nützlich ist

Umgang mit realen Authentifizierungsherausforderungen

In realitätsnahen Anwendungen stoßen Sie oft auf komplexere Authentifizierungsszenarien. Lassen Sie uns einige praktische Beispiele und Best Practices untersuchen.

Implementierung einer benutzerdefinierten Authentifizierungs-Klasse

Die requests-Bibliothek ermöglicht es Ihnen, benutzerdefinierte Authentifizierungs-Handler zu erstellen, indem Sie die Klasse requests.auth.AuthBase unterklassifizieren. Dies ist nützlich, wenn Sie mit APIs arbeiten, die über benutzerdefinierte Authentifizierungsschemata verfügen.

Erstellen Sie eine Datei namens custom_auth.py:

import requests
from requests.auth import AuthBase
import time
import hashlib

class CustomAuth(AuthBase):
    """
    Example of a custom authentication mechanism that adds a timestamp and HMAC signature
    This is similar to how many APIs authenticate requests (like AWS, Stripe, etc.)
    """
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret

    def __call__(self, request):
        ## Add timestamp to the request
        timestamp = str(int(time.time()))

        ## Add the API key and timestamp as headers
        request.headers['X-API-Key'] = self.api_key
        request.headers['X-Timestamp'] = timestamp

        ## Create a signature based on the request
        ## In a real implementation, this would include more request elements
        method = request.method
        path = request.path_url

        ## Create a simple signature (in real apps, use proper HMAC)
        signature_data = f"{method}{path}{timestamp}{self.api_secret}".encode('utf-8')
        signature = hashlib.sha256(signature_data).hexdigest()

        ## Add the signature to the headers
        request.headers['X-Signature'] = signature

        return request

def test_custom_auth():
    """Test our custom authentication with httpbin.org"""
    url = "https://httpbin.org/headers"

    ## Create our custom auth handler
    auth = CustomAuth(api_key="test_key", api_secret="test_secret")

    ## Make the request with our custom auth
    response = requests.get(url, auth=auth)

    print(f"Status Code: {response.status_code}")
    print(f"Response Headers: {response.json()}")

    ## Verify our custom headers were sent
    headers = response.json()['headers']
    for header in ['X-Api-Key', 'X-Timestamp', 'X-Signature']:
        if header.upper() in headers:
            print(f"{header} was sent: {headers[header.upper()]}")
        else:
            print(f"{header} was not found in the response")

if __name__ == "__main__":
    test_custom_auth()

Führen Sie das Skript aus:

python custom_auth.py

Die Ausgabe sollte die benutzerdefinierten Header in der Antwort enthalten:

Status Code: 200
Response Headers: {'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.2', 'X-Api-Key': 'test_key', 'X-Signature': '...', 'X-Timestamp': '1623456789'}}
X-Api-Key was sent: test_key
X-Timestamp was sent: 1623456789
X-Signature was sent: ...

Dieses Beispiel zeigt, wie man ein benutzerdefiniertes Authentifizierungsschema implementiert, das einen Zeitstempel und eine Signatur enthält, ähnlich wie es viele kommerzielle APIs verwenden.

Umgang mit Ratenbegrenzung und 429-Antworten

Viele APIs implementieren eine Ratenbegrenzung, um Missbrauch zu verhindern. Wenn Sie die Ratenbegrenzung überschreiten, antwortet der Server in der Regel mit dem Statuscode 429 Too Many Requests. Erstellen wir ein Skript, um dieses Szenario zu behandeln.

Erstellen Sie eine Datei namens rate_limit_handler.py:

import requests
import time

def make_request_with_rate_limit_handling(url, max_retries=5):
    """
    Makes a request with handling for rate limits (429 responses)

    Args:
        url: URL to request
        max_retries: Maximum number of retry attempts
    """
    retries = 0

    while retries <= max_retries:
        response = requests.get(url)

        ## If not rate limited, return the response
        if response.status_code != 429:
            return response

        ## We got rate limited, check if we have retry information
        retry_after = response.headers.get('Retry-After')

        ## If we have retry information, wait that long
        if retry_after:
            wait_time = int(retry_after)
            print(f"Rate limited. Waiting for {wait_time} seconds as specified by Retry-After header.")
            time.sleep(wait_time)
        else:
            ## If no retry information, use exponential backoff
            wait_time = 2 ** retries
            print(f"Rate limited. No Retry-After header. Using exponential backoff: {wait_time} seconds.")
            time.sleep(wait_time)

        retries += 1

        if retries > max_retries:
            print(f"Maximum retries ({max_retries}) exceeded.")
            return response

## For demonstration, we'll simulate rate limiting using httpbin's status endpoint
if __name__ == "__main__":
    ## First request should succeed
    print("Making request to endpoint that returns 200:")
    response = make_request_with_rate_limit_handling("https://httpbin.org/status/200")
    print(f"Final status code: {response.status_code}")

    ## Simulate being rate limited initially, then succeeding
    print("\nSimulating rate limiting scenario:")
    ## We'll make 3 requests in sequence to different status codes (429, 429, 200)
    ## to simulate being rate limited twice and then succeeding
    urls = [
        "https://httpbin.org/status/429",  ## First will be rate limited
        "https://httpbin.org/status/429",  ## Second will also be rate limited
        "https://httpbin.org/status/200"   ## Third will succeed
    ]

    for i, url in enumerate(urls):
        print(f"\nRequest {i+1}:")
        response = make_request_with_rate_limit_handling(url, max_retries=1)
        print(f"Final status code: {response.status_code}")

Führen Sie das Skript aus:

python rate_limit_handler.py

Die Ausgabe simuliert die Behandlung der Ratenbegrenzung:

Making request to endpoint that returns 200:
Final status code: 200

Simulating rate limiting scenario:

Request 1:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429

Request 2:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429

Request 3:
Final status code: 200

Dies zeigt, wie man die Ratenbegrenzung behandelt, indem man den Retry-After-Header berücksichtigt oder einen exponentiellen Backoff implementiert.

Komplettlösung zur Fehlerbehandlung

Zum Schluss wollen wir alles in einer umfassenden Fehlerbehandlungslösung zusammenführen. Erstellen Sie eine Datei namens complete_auth_handler.py:

import requests
import time
import random
import json
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout

class AuthenticationManager:
    """Manages authentication tokens and credentials"""

    def __init__(self, token_file="auth_token.json"):
        self.token_file = token_file
        self.token = self.load_token()

    def load_token(self):
        """Load token from file storage"""
        try:
            with open(self.token_file, "r") as f:
                token_data = json.load(f)
                ## Check if token is expired
                if token_data.get("expires_at", 0) > time.time():
                    return token_data.get("access_token")
        except (FileNotFoundError, json.JSONDecodeError):
            pass
        return None

    def save_token(self, token, expires_in=3600):
        """Save token to file storage"""
        token_data = {
            "access_token": token,
            "expires_at": time.time() + expires_in
        }
        with open(self.token_file, "w") as f:
            json.dump(token_data, f)

    def get_token(self):
        """Get a valid token, refreshing if necessary"""
        if not self.token:
            self.token = self.refresh_token()
        return self.token

    def refresh_token(self):
        """Refresh the authentication token"""
        ## In a real app, this would make a call to the auth server
        print("Getting a new authentication token...")
        new_token = f"refreshed_token_{int(time.time())}"
        self.save_token(new_token)
        return new_token

    def invalidate_token(self):
        """Invalidate the current token"""
        self.token = None
        try:
            with open(self.token_file, "w") as f:
                json.dump({}, f)
        except Exception:
            pass

class APIClient:
    """Client for making API requests with comprehensive error handling"""

    def __init__(self, base_url, auth_manager=None):
        self.base_url = base_url
        self.auth_manager = auth_manager or AuthenticationManager()
        self.session = requests.Session()

    def make_request(self, method, endpoint, **kwargs):
        """
        Make an API request with comprehensive error handling

        Args:
            method: HTTP method (get, post, etc.)
            endpoint: API endpoint to call
            **kwargs: Additional arguments to pass to requests

        Returns:
            Response object or None if all retries failed
        """
        url = f"{self.base_url}{endpoint}"
        max_retries = kwargs.pop("max_retries", 3)
        retry_backoff_factor = kwargs.pop("retry_backoff_factor", 0.5)

        ## Add authentication if we have an auth manager
        if self.auth_manager:
            token = self.auth_manager.get_token()
            headers = kwargs.get("headers", {})
            headers["Authorization"] = f"Bearer {token}"
            kwargs["headers"] = headers

        ## Make the request with retries
        for retry in range(max_retries + 1):
            try:
                request_func = getattr(self.session, method.lower())
                response = request_func(url, **kwargs)

                ## Handle different status codes
                if response.status_code == 200:
                    return response
                elif response.status_code == 401:
                    print("Unauthorized: Token may be invalid")
                    if self.auth_manager and retry < max_retries:
                        print("Refreshing authentication token...")
                        self.auth_manager.invalidate_token()
                        token = self.auth_manager.get_token()
                        kwargs.get("headers", {})["Authorization"] = f"Bearer {token}"
                        continue
                elif response.status_code == 429:
                    if retry < max_retries:
                        retry_after = response.headers.get("Retry-After")
                        if retry_after:
                            wait_time = int(retry_after)
                        else:
                            wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)

                        print(f"Rate limited. Waiting {wait_time:.2f} seconds before retry {retry + 1}/{max_retries}")
                        time.sleep(wait_time)
                        continue
                    else:
                        print("Rate limit retries exhausted")
                elif 500 <= response.status_code < 600:
                    if retry < max_retries:
                        wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                        print(f"Server error ({response.status_code}). Retrying in {wait_time:.2f} seconds...")
                        time.sleep(wait_time)
                        continue
                    else:
                        print(f"Server error retries exhausted")

                ## If we get here, we're returning the response as-is
                return response

            except ConnectionError:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Connection error. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Connection error retries exhausted")
                    raise
            except Timeout:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Request timed out. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Timeout retries exhausted")
                    raise
            except RequestException as e:
                print(f"Request failed: {e}")
                raise

        return None

    def get(self, endpoint, **kwargs):
        """Make a GET request"""
        return self.make_request("get", endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        """Make a POST request"""
        return self.make_request("post", endpoint, **kwargs)

    ## Additional methods can be added for other HTTP verbs

## Test the client with httpbin
if __name__ == "__main__":
    ## Clean any existing token files
    import os
    try:
        os.remove("auth_token.json")
    except FileNotFoundError:
        pass

    client = APIClient("https://httpbin.org")

    print("Making a GET request to /get:")
    response = client.get("/get")
    print(f"Status code: {response.status_code}")
    print(f"Response content: {response.json()}")

    print("\nSimulating an unauthorized response:")
    ## For demonstration purposes, we'll use httpbin's status endpoint to simulate a 401
    response = client.get("/status/401")
    print(f"Status code: {response.status_code}")

    print("\nSimulating a rate-limited response:")
    ## Simulate a 429 response
    response = client.get("/status/429")
    print(f"Status code: {response.status_code}")

Führen Sie das Skript aus:

python complete_auth_handler.py

Die Ausgabe sollte die umfassende Fehlerbehandlung demonstrieren:

Getting a new authentication token...
Making a GET request to /get:
Status code: 200
Response content: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}

Simulating an unauthorized response:
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Status code: 401

Simulating a rate-limited response:
Rate limited. Waiting 0.54 seconds before retry 1/3
Rate limited. Waiting 1.04 seconds before retry 2/3
Rate limited. Waiting 2.07 seconds before retry 3/3
Rate limit retries exhausted
Status code: 429

Dieses umfassende Beispiel demonstriert viele Best Practices für die Behandlung von Authentifizierungs- und anderen API-bezogenen Fehlern in einer produktionsreifen Python-Anwendung.

Zusammenfassung

In diesem Lab haben Sie gelernt, wie Sie unautorisierte Antworten in Python-Anfragen effektiv behandeln können. Wir haben verschiedene wichtige Aspekte behandelt:

  1. Verständnis der HTTP-Authentifizierung und Statuscodes: Sie haben gelernt, was 401 Unauthorized-Antworten bedeuten und wie Sie diese in Ihren Python-Anwendungen identifizieren können.

  2. Basic Authentication: Sie haben die Basic Authentication sowohl mit dem auth-Parameter als auch mit der HTTPBasicAuth-Klasse implementiert und gelernt, wie man Authentifizierungsfehler behandelt.

  3. Erweiterte Authentifizierungsstrategien: Sie haben die tokenbasierte Authentifizierung, die Wiederholungslogik mit exponentiellem Backoff und das Erstellen wiederverwendbarer authentifizierter Sitzungen untersucht.

  4. Reale Authentifizierungsherausforderungen: Sie haben eine benutzerdefinierte Authentifizierungs-Klasse implementiert, die Ratenbegrenzung behandelt und eine umfassende Fehlerbehandlungslösung für produktionsbereite Anwendungen erstellt.

Diese Techniken helfen Ihnen dabei, robustere und widerstandsfähigere Python-Anwendungen zu erstellen, die Authentifizierungsfehler und andere verwandte Fehler bei der Interaktion mit Web-APIs problemlos behandeln können.

Für weiteres Lernen sollten Sie Folgendes in Betracht ziehen:

  • OAuth 2.0 Authentifizierungsabläufe
  • JWT (JSON Web Tokens) für zustandslose Authentifizierung
  • Strategien zur API-Schlüsselrotation
  • Sichere Anmeldeinformationsspeicherung