Python requests 에서 권한 없는 응답 처리 방법

PythonBeginner
지금 연습하기

소개

Python 에서 웹 API 작업을 할 때, 인증 문제로 인해 요청이 거부되는 상황에 자주 직면하게 됩니다. 이 랩에서는 Python requests 라이브러리를 사용하여 인증되지 않은 (401) 응답을 이해하고 효과적으로 처리하는 방법을 안내합니다. 적절한 오류 처리 기술을 배우면 인증 실패를 우아하게 관리하는 더욱 탄력적인 애플리케이션을 구축할 수 있습니다.

HTTP 인증 및 상태 코드 이해

인증되지 않은 응답을 처리하기 전에, 그것이 무엇이며 왜 발생하는지 이해하는 것이 중요합니다.

HTTP 상태 코드와 401 Unauthorized 응답

HTTP 상태 코드는 서버가 클라이언트 요청에 대한 응답으로 보내는 세 자리 숫자입니다. 이러한 코드는 다섯 가지 범주로 그룹화됩니다.

  • 1xx: 정보 응답
  • 2xx: 성공적인 응답
  • 3xx: 리디렉션 메시지
  • 4xx: 클라이언트 오류 응답
  • 5xx: 서버 오류 응답

401 Unauthorized 상태 코드는 4xx 범주에 속하며, 요청에 대상 리소스에 대한 유효한 인증 자격 증명이 없음을 나타냅니다. 이는 서버가 요청을 이해하지만 권한 부여를 거부하는 403 Forbidden 응답과는 다릅니다.

환경 설정

프로젝트 디렉토리를 만들고 필요한 패키지를 설치하는 것으로 시작해 보겠습니다.

  1. 터미널을 열고 새 디렉토리를 만듭니다.
mkdir -p ~/project/python-auth-handling
cd ~/project/python-auth-handling
  1. 이제 가상 환경을 만들고 requests 패키지를 설치해 보겠습니다.
python -m venv venv
source venv/bin/activate
pip install requests

출력은 다음과 유사해야 합니다.

Collecting requests
  Downloading requests-2.28.2-py3-none-any.whl (62 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.8/62.8 KB 1.8 MB/s eta 0:00:00
[...additional output...]
Successfully installed certifi-2023.5.7 charset-normalizer-3.1.0 idna-3.4 requests-2.28.2 urllib3-1.26.16

간단한 요청 만들기

이제 인증이 필요한 서비스에 요청을 보내는 Python 스크립트를 만들어 보겠습니다. HTTP 요청을 테스트하기 위한 엔드포인트를 제공하는 HTTPBin 서비스를 사용합니다.

WebIDE 에서 basic_request.py라는 새 파일을 만듭니다.

import requests

def make_request():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")

    if response.status_code == 200:
        print("Request was successful.")
        print(f"Response content: {response.text}")
    elif response.status_code == 401:
        print("Unauthorized: Authentication is required and has failed.")
    else:
        print(f"Received unexpected status code: {response.status_code}")

if __name__ == "__main__":
    make_request()

파일을 저장하고 터미널에서 실행합니다.

python basic_request.py

다음과 유사한 출력을 볼 수 있습니다.

Status Code: 401
Unauthorized: Authentication is required and has failed.

이는 기본 인증이 필요한 엔드포인트에 접근하려 하지만 자격 증명을 제공하지 않았기 때문입니다.

응답 검토

응답에 대한 자세한 내용을 출력하도록 스크립트를 수정해 보겠습니다. examine_response.py라는 새 파일을 만듭니다.

import requests

def examine_response():
    url = "https://httpbin.org/basic-auth/user/pass"
    response = requests.get(url)

    print(f"Status Code: {response.status_code}")
    print(f"Headers: {response.headers}")

    ## The WWW-Authenticate header provides details about how to authenticate
    if 'WWW-Authenticate' in response.headers:
        print(f"WWW-Authenticate: {response.headers['WWW-Authenticate']}")

    ## Try to get JSON content, but it might not be JSON or might be empty
    try:
        print(f"Content: {response.json()}")
    except requests.exceptions.JSONDecodeError:
        print(f"Content (text): {response.text}")

if __name__ == "__main__":
    examine_response()

이 스크립트를 실행합니다.

python examine_response.py

출력에는 응답 헤더와 클라이언트가 인증하는 방법을 알려주는 WWW-Authenticate 헤더가 포함됩니다.

Status Code: 401
Headers: {'date': '...', 'content-type': '...', 'content-length': '0', 'connection': 'close', 'server': 'gunicorn/19.9.0', 'www-authenticate': 'Basic realm="Fake Realm"', 'access-control-allow-origin': '*', 'access-control-allow-credentials': 'true'}
WWW-Authenticate: Basic realm="Fake Realm"
Content (text):

WWW-Authenticate 헤더의 값 Basic realm="Fake Realm"은 서버가 기본 인증을 예상한다는 것을 나타냅니다.

401 오류 방지를 위한 기본 인증 사용

이제 401 Unauthorized 응답의 원인을 이해했으므로, 올바른 인증 자격 증명을 제공하여 이를 방지하는 방법을 알아보겠습니다.

Python Requests 의 기본 인증

Python requests 라이브러리는 요청에 기본 인증을 쉽게 추가할 수 있도록 해줍니다. 다음 중 하나를 수행할 수 있습니다.

  1. 사용자 이름과 비밀번호의 튜플과 함께 auth 매개변수를 전달합니다.
  2. requests.auth에서 HTTPBasicAuth 클래스를 사용합니다.

기본 인증을 포함하도록 스크립트를 수정해 보겠습니다. basic_auth.py라는 새 파일을 만듭니다.

import requests
from requests.auth import HTTPBasicAuth

def make_authenticated_request():
    url = "https://httpbin.org/basic-auth/user/pass"

    ## Method 1: Using the auth parameter with a tuple
    response1 = requests.get(url, auth=("user", "pass"))

    print("Method 1: Using auth tuple")
    print(f"Status Code: {response1.status_code}")
    if response1.status_code == 200:
        print(f"Response content: {response1.json()}")
    else:
        print(f"Request failed with status code: {response1.status_code}")

    print("\n" + "-"*50 + "\n")

    ## Method 2: Using the HTTPBasicAuth class
    response2 = requests.get(url, auth=HTTPBasicAuth("user", "pass"))

    print("Method 2: Using HTTPBasicAuth class")
    print(f"Status Code: {response2.status_code}")
    if response2.status_code == 200:
        print(f"Response content: {response2.json()}")
    else:
        print(f"Request failed with status code: {response2.status_code}")

if __name__ == "__main__":
    make_authenticated_request()

스크립트를 실행합니다.

python basic_auth.py

상태 코드 200 으로 성공적인 응답을 볼 수 있습니다.

Method 1: Using auth tuple
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

--------------------------------------------------

Method 2: Using HTTPBasicAuth class
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

두 가지 방법 모두 동일한 결과를 얻습니다. 즉, base64 로 인코딩된 사용자 이름과 비밀번호가 포함된 Authorization 헤더를 요청에 추가합니다.

인증 실패 시 발생하는 상황

잘못된 자격 증명을 제공하면 어떻게 되는지 살펴보겠습니다. failed_auth.py라는 새 파일을 만듭니다.

import requests

def test_failed_authentication():
    url = "https://httpbin.org/basic-auth/user/pass"

    ## Correct credentials
    response_correct = requests.get(url, auth=("user", "pass"))

    ## Incorrect password
    response_wrong_pass = requests.get(url, auth=("user", "wrong_password"))

    ## Incorrect username
    response_wrong_user = requests.get(url, auth=("wrong_user", "pass"))

    print("Correct credentials:")
    print(f"Status Code: {response_correct.status_code}")
    if response_correct.status_code == 200:
        print(f"Response content: {response_correct.json()}")

    print("\nIncorrect password:")
    print(f"Status Code: {response_wrong_pass.status_code}")

    print("\nIncorrect username:")
    print(f"Status Code: {response_wrong_user.status_code}")

if __name__ == "__main__":
    test_failed_authentication()

스크립트를 실행합니다.

python failed_auth.py

다음과 유사한 출력을 볼 수 있습니다.

Correct credentials:
Status Code: 200
Response content: {'authenticated': True, 'user': 'user'}

Incorrect password:
Status Code: 401

Incorrect username:
Status Code: 401

잘못된 사용자 이름과 잘못된 비밀번호 모두 401 Unauthorized 응답을 반환합니다.

인증 오류 처리

이제 인증 실패에 대한 오류 처리를 구현해 보겠습니다. handle_auth_errors.py라는 새 파일을 만듭니다.

import requests
from requests.exceptions import HTTPError

def make_authenticated_request(username, password):
    url = "https://httpbin.org/basic-auth/user/pass"

    try:
        response = requests.get(url, auth=(username, password))
        response.raise_for_status()  ## Raises an HTTPError for bad responses (4xx or 5xx)

        print(f"Authentication successful!")
        print(f"Response content: {response.json()}")
        return response

    except HTTPError as e:
        if response.status_code == 401:
            print(f"Authentication failed: Invalid credentials")
            ## You might want to retry with different credentials here
            return handle_authentication_failure()
        else:
            print(f"HTTP Error occurred: {e}")
            return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

def handle_authentication_failure():
    ## In a real application, you might prompt the user for new credentials
    ## or use a token refresh mechanism
    print("Attempting to authenticate with default credentials...")
    return make_authenticated_request("user", "pass")

if __name__ == "__main__":
    ## First, try with incorrect credentials
    print("Trying with incorrect credentials:")
    make_authenticated_request("wrong_user", "wrong_pass")

    print("\n" + "-"*50 + "\n")

    ## Then, try with correct credentials
    print("Trying with correct credentials:")
    make_authenticated_request("user", "pass")

스크립트를 실행합니다.

python handle_auth_errors.py

출력은 오류가 처리되는 방식을 보여줍니다.

Trying with incorrect credentials:
Authentication failed: Invalid credentials
Attempting to authenticate with default credentials...
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}

--------------------------------------------------

Trying with correct credentials:
Authentication successful!
Response content: {'authenticated': True, 'user': 'user'}

이 스크립트는 다음과 같은 간단한 오류 처리 패턴을 보여줍니다.

  1. 제공된 자격 증명으로 요청을 시도합니다.
  2. raise_for_status()를 사용하여 4xx/5xx 응답에 대한 예외를 발생시킵니다.
  3. 401 오류를 특히 재시도 메커니즘으로 처리합니다.
  4. 다른 유형의 오류를 적절하게 처리합니다.

고급 인증 처리 전략

실제 애플리케이션에서는 인증을 처리하기 위해 더 고급 전략이 필요한 경우가 많습니다. 몇 가지 일반적인 기술을 살펴보겠습니다.

토큰 기반 인증

많은 최신 API 는 기본 인증 대신 토큰 기반 인증을 사용합니다. OAuth 2.0 은 인증 및 권한 부여에 토큰을 사용하는 일반적인 프로토콜입니다.

토큰 기반 인증을 시뮬레이션하는 스크립트를 만들어 보겠습니다. token_auth.py라는 파일을 만듭니다.

import requests
import time
import json

## Simulated token storage - in a real app, this might be a database or secure storage
TOKEN_FILE = "token.json"

def get_stored_token():
    """Retrieve the stored token if it exists."""
    try:
        with open(TOKEN_FILE, "r") as f:
            token_data = json.load(f)
            ## Check if token is expired
            if token_data.get("expires_at", 0) > time.time():
                return token_data["access_token"]
    except (FileNotFoundError, json.JSONDecodeError):
        pass
    return None

def save_token(token, expires_in=3600):
    """Save the token with expiration time."""
    token_data = {
        "access_token": token,
        "expires_at": time.time() + expires_in
    }
    with open(TOKEN_FILE, "w") as f:
        json.dump(token_data, f)

def get_new_token():
    """Simulate obtaining a new token from an authentication service."""
    ## In a real application, this would make a request to the auth server
    print("Obtaining new access token...")
    ## Simulating a token generation
    new_token = f"simulated_token_{int(time.time())}"
    save_token(new_token)
    return new_token

def make_authenticated_request(url):
    """Make a request with token authentication, refreshing if needed."""
    ## Try to get the stored token
    token = get_stored_token()

    ## If no valid token exists, get a new one
    if not token:
        token = get_new_token()

    ## Make the authenticated request
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)

    ## If unauthorized, the token might be invalid - get a new one and retry
    if response.status_code == 401:
        print("Token rejected. Getting a new token and retrying...")
        token = get_new_token()
        headers = {"Authorization": f"Bearer {token}"}
        response = requests.get(url, headers=headers)

    return response

## For testing, we'll use httpbin's bearer auth endpoint
if __name__ == "__main__":
    ## Using httpbin.org/bearer which checks for the Authorization header
    url = "https://httpbin.org/bearer"

    ## First, delete any existing token to simulate a fresh start
    try:
        import os
        os.remove(TOKEN_FILE)
    except FileNotFoundError:
        pass

    print("First request (should obtain a new token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

    print("\nSecond request (should use the stored token):")
    response = make_authenticated_request(url)
    print(f"Status code: {response.status_code}")
    print(f"Response: {response.json()}")

스크립트를 실행합니다.

python token_auth.py

출력은 다음과 유사해야 합니다.

First request (should obtain a new token):
Obtaining new access token...
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}

Second request (should use the stored token):
Status code: 200
Response: {'authenticated': True, 'token': 'simulated_token_1623456789'}

이 스크립트는 토큰이 거부될 때 자동 토큰 새로 고침을 사용하는 토큰 기반 인증을 보여줍니다.

재시도 로직 구현

때로는 인증 실패가 일시적일 수 있습니다. 지수 백오프 (exponential backoff) 를 사용하여 재시도 메커니즘을 구현해 보겠습니다. retry_auth.py라는 파일을 만듭니다.

import requests
import time
import random

def make_request_with_retry(url, auth, max_retries=3, backoff_factor=0.5):
    """
    Makes a request with retry logic and exponential backoff

    Args:
        url: URL to request
        auth: Authentication tuple (username, password)
        max_retries: Maximum number of retry attempts
        backoff_factor: Factor by which to increase the delay between retries
    """
    retries = 0

    while retries <= max_retries:
        try:
            response = requests.get(url, auth=auth, timeout=10)

            ## If successful, return the response
            if response.status_code == 200:
                return response

            ## If unauthorized, we might want to handle differently
            if response.status_code == 401:
                print(f"Attempt {retries+1}/{max_retries+1}: Authentication failed")
                ## In a real app, we might refresh tokens here or prompt for new credentials
            else:
                print(f"Attempt {retries+1}/{max_retries+1}: Failed with status code {response.status_code}")

            ## If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                return response

            ## Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

        except requests.exceptions.RequestException as e:
            print(f"Request exception: {e}")

            ## If we've reached max retries, give up
            if retries == max_retries:
                print("Maximum retry attempts reached.")
                raise

            ## Calculate delay with exponential backoff and jitter
            delay = backoff_factor * (2 ** retries) + random.uniform(0, 0.1)
            print(f"Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
            retries += 1

if __name__ == "__main__":
    url = "https://httpbin.org/basic-auth/user/pass"

    print("Testing with incorrect credentials:")
    response = make_request_with_retry(url, auth=("wrong_user", "wrong_pass"))
    print(f"Final status code: {response.status_code}")

    print("\nTesting with correct credentials:")
    response = make_request_with_retry(url, auth=("user", "pass"))
    print(f"Final status code: {response.status_code}")
    print(f"Response content: {response.json()}")

스크립트를 실행합니다.

python retry_auth.py

출력은 다음과 유사해야 합니다.

Testing with incorrect credentials:
Attempt 1/4: Authentication failed
Retrying in 0.54 seconds...
Attempt 2/4: Authentication failed
Retrying in 1.05 seconds...
Attempt 3/4: Authentication failed
Retrying in 2.08 seconds...
Attempt 4/4: Authentication failed
Maximum retry attempts reached.
Final status code: 401

Testing with correct credentials:
Final status code: 200
Response content: {'authenticated': True, 'user': 'user'}

이 스크립트는 지수 백오프를 사용하는 재시도 메커니즘을 보여줍니다. 이는 일시적으로 실패할 수 있는 네트워크 요청을 처리할 때 일반적인 패턴입니다.

재사용 가능한 인증 세션 만들기

동일한 서비스에 대한 여러 요청의 경우, 인증 정보를 유지하는 세션을 만드는 것이 더 효율적인 경우가 많습니다. auth_session.py라는 파일을 만듭니다.

import requests

def create_authenticated_session(base_url, username, password):
    """
    Creates and returns a requests session with basic authentication

    Args:
        base_url: Base URL for the API
        username: Username for authentication
        password: Password for authentication

    Returns:
        A configured requests.Session object
    """
    session = requests.Session()
    session.auth = (username, password)
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })

    ## Test the authentication
    test_url = f"{base_url}/basic-auth/{username}/{password}"
    try:
        response = session.get(test_url)
        response.raise_for_status()
        print(f"Authentication successful for user: {username}")
    except requests.exceptions.HTTPError as e:
        print(f"Authentication failed: {e}")
        ## In a real app, you might raise an exception here or handle it differently

    return session

def make_authenticated_requests(session, base_url):
    """
    Makes multiple requests using the authenticated session

    Args:
        session: The authenticated requests.Session
        base_url: Base URL for the API
    """
    ## Make a request to a different endpoint
    response1 = session.get(f"{base_url}/get")
    print(f"\nRequest 1 status code: {response1.status_code}")
    print(f"Request 1 response: {response1.json()}")

    ## Make another request
    response2 = session.get(f"{base_url}/headers")
    print(f"\nRequest 2 status code: {response2.status_code}")
    print(f"Request 2 response: {response2.json()}")

if __name__ == "__main__":
    base_url = "https://httpbin.org"

    ## Create an authenticated session
    session = create_authenticated_session(base_url, "user", "pass")

    ## Use the session for multiple requests
    make_authenticated_requests(session, base_url)

스크립트를 실행합니다.

python auth_session.py

출력은 다음과 유사해야 합니다.

Authentication successful for user: user

Request 1 status code: 200
Request 1 response: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}

Request 2 status code: 200
Request 2 response: {'headers': {...}}

세션을 사용하면 다음과 같은 몇 가지 이점이 있습니다.

  1. 기본 TCP 연결을 재사용하여 후속 요청 속도를 높입니다.
  2. 각 요청에 자동으로 인증 자격 증명을 포함합니다.
  3. 요청 간에 쿠키를 유지합니다. 이는 세션 기반 인증에 유용합니다.

실제 환경에서의 인증 문제 처리

실제 애플리케이션에서는 더 복잡한 인증 시나리오를 자주 접하게 됩니다. 몇 가지 실용적인 예시와 모범 사례를 살펴보겠습니다.

사용자 정의 인증 클래스 구현

requests 라이브러리를 사용하면 requests.auth.AuthBase 클래스를 서브클래싱하여 사용자 정의 인증 핸들러를 만들 수 있습니다. 이는 사용자 정의 인증 체계를 가진 API 를 사용할 때 유용합니다.

custom_auth.py라는 파일을 만듭니다.

import requests
from requests.auth import AuthBase
import time
import hashlib

class CustomAuth(AuthBase):
    """
    Example of a custom authentication mechanism that adds a timestamp and HMAC signature
    This is similar to how many APIs authenticate requests (like AWS, Stripe, etc.)
    """
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret

    def __call__(self, request):
        ## Add timestamp to the request
        timestamp = str(int(time.time()))

        ## Add the API key and timestamp as headers
        request.headers['X-API-Key'] = self.api_key
        request.headers['X-Timestamp'] = timestamp

        ## Create a signature based on the request
        ## In a real implementation, this would include more request elements
        method = request.method
        path = request.path_url

        ## Create a simple signature (in real apps, use proper HMAC)
        signature_data = f"{method}{path}{timestamp}{self.api_secret}".encode('utf-8')
        signature = hashlib.sha256(signature_data).hexdigest()

        ## Add the signature to the headers
        request.headers['X-Signature'] = signature

        return request

def test_custom_auth():
    """Test our custom authentication with httpbin.org"""
    url = "https://httpbin.org/headers"

    ## Create our custom auth handler
    auth = CustomAuth(api_key="test_key", api_secret="test_secret")

    ## Make the request with our custom auth
    response = requests.get(url, auth=auth)

    print(f"Status Code: {response.status_code}")
    print(f"Response Headers: {response.json()}")

    ## Verify our custom headers were sent
    headers = response.json()['headers']
    for header in ['X-Api-Key', 'X-Timestamp', 'X-Signature']:
        if header.upper() in headers:
            print(f"{header} was sent: {headers[header.upper()]}")
        else:
            print(f"{header} was not found in the response")

if __name__ == "__main__":
    test_custom_auth()

스크립트를 실행합니다.

python custom_auth.py

출력에는 응답에 사용자 정의 헤더가 포함되어야 합니다.

Status Code: 200
Response Headers: {'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.28.2', 'X-Api-Key': 'test_key', 'X-Signature': '...', 'X-Timestamp': '1623456789'}}
X-Api-Key was sent: test_key
X-Timestamp was sent: 1623456789
X-Signature was sent: ...

이 예제는 많은 상업용 API 에서 사용하는 것과 유사하게 타임스탬프와 서명을 포함하는 사용자 정의 인증 체계를 구현하는 방법을 보여줍니다.

Rate Limiting 및 429 응답 처리

많은 API 는 남용을 방지하기 위해 rate limiting 을 구현합니다. rate limit 을 초과하면 서버는 일반적으로 429 Too Many Requests 상태 코드로 응답합니다. 이 시나리오를 처리하는 스크립트를 만들어 보겠습니다.

rate_limit_handler.py라는 파일을 만듭니다.

import requests
import time

def make_request_with_rate_limit_handling(url, max_retries=5):
    """
    Makes a request with handling for rate limits (429 responses)

    Args:
        url: URL to request
        max_retries: Maximum number of retry attempts
    """
    retries = 0

    while retries <= max_retries:
        response = requests.get(url)

        ## If not rate limited, return the response
        if response.status_code != 429:
            return response

        ## We got rate limited, check if we have retry information
        retry_after = response.headers.get('Retry-After')

        ## If we have retry information, wait that long
        if retry_after:
            wait_time = int(retry_after)
            print(f"Rate limited. Waiting for {wait_time} seconds as specified by Retry-After header.")
            time.sleep(wait_time)
        else:
            ## If no retry information, use exponential backoff
            wait_time = 2 ** retries
            print(f"Rate limited. No Retry-After header. Using exponential backoff: {wait_time} seconds.")
            time.sleep(wait_time)

        retries += 1

        if retries > max_retries:
            print(f"Maximum retries ({max_retries}) exceeded.")
            return response

## For demonstration, we'll simulate rate limiting using httpbin's status endpoint
if __name__ == "__main__":
    ## First request should succeed
    print("Making request to endpoint that returns 200:")
    response = make_request_with_rate_limit_handling("https://httpbin.org/status/200")
    print(f"Final status code: {response.status_code}")

    ## Simulate being rate limited initially, then succeeding
    print("\nSimulating rate limiting scenario:")
    ## We'll make 3 requests in sequence to different status codes (429, 429, 200)
    ## to simulate being rate limited twice and then succeeding
    urls = [
        "https://httpbin.org/status/429",  ## First will be rate limited
        "https://httpbin.org/status/429",  ## Second will also be rate limited
        "https://httpbin.org/status/200"   ## Third will succeed
    ]

    for i, url in enumerate(urls):
        print(f"\nRequest {i+1}:")
        response = make_request_with_rate_limit_handling(url, max_retries=1)
        print(f"Final status code: {response.status_code}")

스크립트를 실행합니다.

python rate_limit_handler.py

출력은 rate limiting 처리를 시뮬레이션합니다.

Making request to endpoint that returns 200:
Final status code: 200

Simulating rate limiting scenario:

Request 1:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429

Request 2:
Rate limited. No Retry-After header. Using exponential backoff: 1 seconds.
Rate limited. No Retry-After header. Using exponential backoff: 2 seconds.
Maximum retries (1) exceeded.
Final status code: 429

Request 3:
Final status code: 200

이는 Retry-After 헤더를 준수하거나 지수 백오프를 구현하여 rate limiting 을 처리하는 방법을 보여줍니다.

완전한 오류 처리 솔루션

마지막으로 모든 것을 종합하여 포괄적인 오류 처리 솔루션을 만들어 보겠습니다. complete_auth_handler.py라는 파일을 만듭니다.

import requests
import time
import random
import json
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout

class AuthenticationManager:
    """Manages authentication tokens and credentials"""

    def __init__(self, token_file="auth_token.json"):
        self.token_file = token_file
        self.token = self.load_token()

    def load_token(self):
        """Load token from file storage"""
        try:
            with open(self.token_file, "r") as f:
                token_data = json.load(f)
                ## Check if token is expired
                if token_data.get("expires_at", 0) > time.time():
                    return token_data.get("access_token")
        except (FileNotFoundError, json.JSONDecodeError):
            pass
        return None

    def save_token(self, token, expires_in=3600):
        """Save token to file storage"""
        token_data = {
            "access_token": token,
            "expires_at": time.time() + expires_in
        }
        with open(self.token_file, "w") as f:
            json.dump(token_data, f)

    def get_token(self):
        """Get a valid token, refreshing if necessary"""
        if not self.token:
            self.token = self.refresh_token()
        return self.token

    def refresh_token(self):
        """Refresh the authentication token"""
        ## In a real app, this would make a call to the auth server
        print("Getting a new authentication token...")
        new_token = f"refreshed_token_{int(time.time())}"
        self.save_token(new_token)
        return new_token

    def invalidate_token(self):
        """Invalidate the current token"""
        self.token = None
        try:
            with open(self.token_file, "w") as f:
                json.dump({}, f)
        except Exception:
            pass

class APIClient:
    """Client for making API requests with comprehensive error handling"""

    def __init__(self, base_url, auth_manager=None):
        self.base_url = base_url
        self.auth_manager = auth_manager or AuthenticationManager()
        self.session = requests.Session()

    def make_request(self, method, endpoint, **kwargs):
        """
        Make an API request with comprehensive error handling

        Args:
            method: HTTP method (get, post, etc.)
            endpoint: API endpoint to call
            **kwargs: Additional arguments to pass to requests

        Returns:
            Response object or None if all retries failed
        """
        url = f"{self.base_url}{endpoint}"
        max_retries = kwargs.pop("max_retries", 3)
        retry_backoff_factor = kwargs.pop("retry_backoff_factor", 0.5)

        ## Add authentication if we have an auth manager
        if self.auth_manager:
            token = self.auth_manager.get_token()
            headers = kwargs.get("headers", {})
            headers["Authorization"] = f"Bearer {token}"
            kwargs["headers"] = headers

        ## Make the request with retries
        for retry in range(max_retries + 1):
            try:
                request_func = getattr(self.session, method.lower())
                response = request_func(url, **kwargs)

                ## Handle different status codes
                if response.status_code == 200:
                    return response
                elif response.status_code == 401:
                    print("Unauthorized: Token may be invalid")
                    if self.auth_manager and retry < max_retries:
                        print("Refreshing authentication token...")
                        self.auth_manager.invalidate_token()
                        token = self.auth_manager.get_token()
                        kwargs.get("headers", {})["Authorization"] = f"Bearer {token}"
                        continue
                elif response.status_code == 429:
                    if retry < max_retries:
                        retry_after = response.headers.get("Retry-After")
                        if retry_after:
                            wait_time = int(retry_after)
                        else:
                            wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)

                        print(f"Rate limited. Waiting {wait_time:.2f} seconds before retry {retry + 1}/{max_retries}")
                        time.sleep(wait_time)
                        continue
                    else:
                        print("Rate limit retries exhausted")
                elif 500 <= response.status_code < 600:
                    if retry < max_retries:
                        wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                        print(f"Server error ({response.status_code}). Retrying in {wait_time:.2f} seconds...")
                        time.sleep(wait_time)
                        continue
                    else:
                        print(f"Server error retries exhausted")

                ## If we get here, we're returning the response as-is
                return response

            except ConnectionError:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Connection error. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Connection error retries exhausted")
                    raise
            except Timeout:
                if retry < max_retries:
                    wait_time = retry_backoff_factor * (2 ** retry) + random.uniform(0, 0.1)
                    print(f"Request timed out. Retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Timeout retries exhausted")
                    raise
            except RequestException as e:
                print(f"Request failed: {e}")
                raise

        return None

    def get(self, endpoint, **kwargs):
        """Make a GET request"""
        return self.make_request("get", endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        """Make a POST request"""
        return self.make_request("post", endpoint, **kwargs)

    ## Additional methods can be added for other HTTP verbs

## Test the client with httpbin
if __name__ == "__main__":
    ## Clean any existing token files
    import os
    try:
        os.remove("auth_token.json")
    except FileNotFoundError:
        pass

    client = APIClient("https://httpbin.org")

    print("Making a GET request to /get:")
    response = client.get("/get")
    print(f"Status code: {response.status_code}")
    print(f"Response content: {response.json()}")

    print("\nSimulating an unauthorized response:")
    ## For demonstration purposes, we'll use httpbin's status endpoint to simulate a 401
    response = client.get("/status/401")
    print(f"Status code: {response.status_code}")

    print("\nSimulating a rate-limited response:")
    ## Simulate a 429 response
    response = client.get("/status/429")
    print(f"Status code: {response.status_code}")

스크립트를 실행합니다.

python complete_auth_handler.py

출력은 포괄적인 오류 처리를 보여줍니다.

Getting a new authentication token...
Making a GET request to /get:
Status code: 200
Response content: {'args': {}, 'headers': {...}, 'origin': '...', 'url': 'https://httpbin.org/get'}

Simulating an unauthorized response:
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Refreshing authentication token...
Getting a new authentication token...
Unauthorized: Token may be invalid
Status code: 401

Simulating a rate-limited response:
Rate limited. Waiting 0.54 seconds before retry 1/3
Rate limited. Waiting 1.04 seconds before retry 2/3
Rate limited. Waiting 2.07 seconds before retry 3/3
Rate limit retries exhausted
Status code: 429

이 포괄적인 예제는 프로덕션 준비가 된 Python 애플리케이션에서 인증 및 기타 API 관련 오류를 처리하기 위한 많은 모범 사례를 보여줍니다.

요약

이 Lab 에서는 Python requests 에서 권한이 없는 응답을 효과적으로 처리하는 방법을 배웠습니다. 몇 가지 중요한 측면을 다루었습니다.

  1. HTTP 인증 및 상태 코드 이해: 401 Unauthorized 응답의 의미와 Python 애플리케이션에서 이를 식별하는 방법을 배웠습니다.

  2. 기본 인증: auth 매개변수와 HTTPBasicAuth 클래스를 모두 사용하여 기본 인증을 구현하고 인증 실패를 처리하는 방법을 배웠습니다.

  3. 고급 인증 전략: 토큰 기반 인증, 지수 백오프를 사용한 재시도 로직, 재사용 가능한 인증 세션 만들기를 살펴보았습니다.

  4. 실제 환경에서의 인증 문제: 사용자 정의 인증 클래스를 구현하고, rate limiting 을 처리했으며, 프로덕션 준비가 된 애플리케이션을 위한 포괄적인 오류 처리 솔루션을 만들었습니다.

이러한 기술은 웹 API 와 상호 작용할 때 인증 실패 및 기타 관련 오류를 적절하게 처리할 수 있는 더 강력하고 탄력적인 Python 애플리케이션을 구축하는 데 도움이 될 것입니다.

추가 학습을 위해 다음을 고려해 보세요.

  • OAuth 2.0 인증 흐름
  • 상태 비저장 인증을 위한 JWT (JSON Web Tokens)
  • API 키 순환 전략
  • 안전한 자격 증명 저장소