如何在 Python 客户端 - 服务器系统中实现身份验证

PythonBeginner
立即练习

简介

实现安全的身份验证是构建强大的 Python 客户端 - 服务器系统的关键。本教程将指导你完成在 Python 应用程序中实现身份验证的过程,从基本的用户名和密码身份验证开始,逐步过渡到基于令牌的方法。通过完成这个实验(Lab),你将了解如何使用适当的身份验证机制来保护你的 Python 应用程序。

理解身份验证基础知识并设置你的环境

身份验证是在授予对受保护资源的访问权限之前,验证用户或系统身份的过程。在客户端 - 服务器系统中,适当的身份验证确保只有授权用户才能访问敏感数据或执行某些操作。

设置你的环境

让我们从设置我们的工作环境开始。首先,为我们的项目创建一个新目录:

mkdir -p ~/project/auth_demo
cd ~/project/auth_demo

接下来,我们需要安装在本实验(Lab)中将使用的必要的 Python 包:

pip install flask flask-login requests

你应该看到类似这样的输出:

Collecting flask
  Downloading Flask-2.2.3-py3-none-any.whl (101 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 101.8/101.8 KB 6.2 MB/s eta 0:00:00
Collecting flask-login
  Downloading Flask_Login-0.6.2-py3-none-any.whl (17 kB)
Collecting requests
  Downloading requests-2.29.0-py3-none-any.whl (62 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.5/62.5 KB 5.5 MB/s eta 0:00:00
...
Successfully installed flask-2.2.3 flask-login-0.6.2 requests-2.29.0 ...

身份验证概念

在深入研究实现之前,让我们了解基本的身份验证概念:

  1. 用户名和密码身份验证:最常见的形式,用户提供凭据来验证他们的身份。

  2. 基于令牌的身份验证:成功登录后,服务器为客户端生成一个令牌,以便在后续请求中使用,从而避免每次都发送凭据。

  3. 基于会话的身份验证:服务器在成功登录后存储会话信息,并向客户端提供会话 ID。

让我们可视化一个基本的身份验证流程:

客户端                                服务器
  |                                     |
  |--- 登录请求(凭据) ---->|
  |                                     |--- 验证凭据
  |                                     |
  |<---- 身份验证响应 -------|
  |                                     |
  |--- 后续请求(带身份验证) >|
  |                                     |--- 验证身份验证
  |                                     |
  |<---- 受保护的资源响应 ---|

现在我们了解了基础知识并设置了我们的环境,让我们为我们的项目创建一个简单的文件结构:

touch ~/project/auth_demo/server.py
touch ~/project/auth_demo/client.py

在下一步中,我们将使用 Flask 在服务器端实现一个基本的用户名和密码身份验证系统,并使用一个 Python 客户端与其进行身份验证。

实现基本的用户名和密码身份验证

在这一步中,我们将实现一个使用用户名和密码的基本身份验证系统。我们将创建:

  1. 一个处理身份验证的 Flask 服务器
  2. 一个与服务器进行身份验证的 Python 客户端

创建服务器

让我们使用基本身份验证功能来实现我们的 Flask 服务器。在 VSCode 编辑器中打开 server.py 文件:

code ~/project/auth_demo/server.py

现在,添加以下代码以实现一个简单的身份验证服务器:

from flask import Flask, request, jsonify, session
import os

app = Flask(__name__)
## Generate a random secret key for session management
app.secret_key = os.urandom(24)

## Simple in-memory user database
users = {
    "alice": "password123",
    "bob": "qwerty456"
}

@app.route('/login', methods=['POST'])
def login():
    data = request.get_json()

    ## Extract username and password from the request
    username = data.get('username')
    password = data.get('password')

    ## Check if the user exists and the password is correct
    if username in users and users[username] == password:
        session['logged_in'] = True
        session['username'] = username
        return jsonify({"status": "success", "message": f"Welcome {username}!"})
    else:
        return jsonify({"status": "error", "message": "Invalid credentials"}), 401

@app.route('/protected', methods=['GET'])
def protected():
    ## Check if the user is logged in
    if session.get('logged_in'):
        return jsonify({
            "status": "success",
            "message": f"You are viewing protected content, {session.get('username')}!"
        })
    else:
        return jsonify({"status": "error", "message": "Authentication required"}), 401

@app.route('/logout', methods=['POST'])
def logout():
    ## Remove session data
    session.pop('logged_in', None)
    session.pop('username', None)
    return jsonify({"status": "success", "message": "Logged out successfully"})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

这个服务器:

  • 使用一个简单的内存字典来存储用户名和密码
  • 提供用于登录、访问受保护内容和注销的端点
  • 使用 Flask 的会话管理来维护身份验证状态

创建客户端

现在,让我们创建一个可以与我们的服务器进行身份验证的客户端。打开 client.py 文件:

code ~/project/auth_demo/client.py

添加以下代码:

import requests
import json

## Server URL
BASE_URL = "http://localhost:5000"

def login(username, password):
    """Authenticate with the server using username and password"""
    response = requests.post(
        f"{BASE_URL}/login",
        json={"username": username, "password": password}
    )

    print(f"Login response: {response.status_code}")
    print(response.json())

    ## Return the session cookies if login was successful
    return response.cookies if response.status_code == 200 else None

def access_protected(cookies=None):
    """Access a protected resource"""
    response = requests.get(f"{BASE_URL}/protected", cookies=cookies)

    print(f"Protected resource response: {response.status_code}")
    print(response.json())

    return response

def logout(cookies=None):
    """Logout from the server"""
    response = requests.post(f"{BASE_URL}/logout", cookies=cookies)

    print(f"Logout response: {response.status_code}")
    print(response.json())

    return response

if __name__ == "__main__":
    ## Test with valid credentials
    print("=== Authenticating with valid credentials ===")
    cookies = login("alice", "password123")

    if cookies:
        print("\n=== Accessing protected resource ===")
        access_protected(cookies)

        print("\n=== Logging out ===")
        logout(cookies)

    ## Test with invalid credentials
    print("\n=== Authenticating with invalid credentials ===")
    login("alice", "wrongpassword")

    ## Try to access protected resource without authentication
    print("\n=== Accessing protected resource without authentication ===")
    access_protected()

这个客户端:

  • 提供登录、访问受保护资源和注销的功能
  • 处理 cookie 以在请求之间维护会话
  • 测试有效和无效的凭据

运行身份验证系统

要查看我们的身份验证系统运行情况,我们需要同时运行服务器和客户端。首先,让我们启动服务器:

python ~/project/auth_demo/server.py

你应该看到类似这样的输出:

 * Serving Flask app 'server'
 * Debug mode: on
WARNING: This is a development server. Do not use it in a production deployment.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000

现在,通过单击终端旁边的“+”按钮打开一个新的终端选项卡,然后运行客户端:

cd ~/project/auth_demo
python client.py

你应该看到演示以下内容的输出:

  1. 使用有效凭据成功登录
  2. 使用有效身份验证访问受保护的资源
  3. 成功注销
  4. 使用不正确的凭据登录尝试失败
  5. 尝试在没有身份验证的情况下访问受保护的资源失败

此输出确认我们的基本用户名和密码身份验证系统正在正常工作。

在下一步中,我们将通过实现基于令牌的身份验证来增强此系统,以提高安全性。

实现基于令牌的身份验证

在上一步中,我们使用会话创建了一个基本的身份验证系统。虽然这适用于简单的应用程序,但基于令牌的身份验证提供了几个优势,尤其是在 API 和分布式系统中。

在基于令牌的身份验证中:

  • 服务器在成功身份验证后生成一个令牌
  • 客户端存储此令牌并将其与后续请求一起发送
  • 服务器验证令牌,而不是每次都检查凭据

让我们升级我们的系统以使用 JSON Web Tokens (JWT),这是一种流行的基于令牌的身份验证标准。

安装所需的软件包

首先,我们需要安装 PyJWT 包:

pip install pyjwt

你应该看到确认安装的输出:

Collecting pyjwt
  Downloading PyJWT-2.6.0-py3-none-any.whl (20 kB)
Installing collected packages: pyjwt
Successfully installed pyjwt-2.6.0

更新服务器以进行基于令牌的身份验证

让我们修改我们的服务器以使用 JWT 令牌而不是会话。更新 server.py 文件:

code ~/project/auth_demo/server.py

将现有代码替换为:

from flask import Flask, request, jsonify
import jwt
import datetime
import os

app = Flask(__name__)
## Secret key for signing JWT tokens
SECRET_KEY = os.urandom(24)

## Simple in-memory user database
users = {
    "alice": "password123",
    "bob": "qwerty456"
}

def generate_token(username):
    """Generate a JWT token for the authenticated user"""
    ## Token expires after 30 minutes
    expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=30)

    payload = {
        'username': username,
        'exp': expiration
    }

    ## Create the JWT token
    token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
    return token

def verify_token(token):
    """Verify the JWT token"""
    try:
        ## Decode and verify the token
        payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
        return payload
    except jwt.ExpiredSignatureError:
        ## Token has expired
        return None
    except jwt.InvalidTokenError:
        ## Invalid token
        return None

@app.route('/login', methods=['POST'])
def login():
    data = request.get_json()

    ## Extract username and password from the request
    username = data.get('username')
    password = data.get('password')

    ## Check if the user exists and the password is correct
    if username in users and users[username] == password:
        ## Generate a JWT token
        token = generate_token(username)

        return jsonify({
            "status": "success",
            "message": f"Welcome {username}!",
            "token": token
        })
    else:
        return jsonify({"status": "error", "message": "Invalid credentials"}), 401

@app.route('/protected', methods=['GET'])
def protected():
    ## Get the token from the Authorization header
    auth_header = request.headers.get('Authorization')

    if not auth_header or not auth_header.startswith('Bearer '):
        return jsonify({"status": "error", "message": "Missing or invalid token"}), 401

    ## Extract the token
    token = auth_header.split(' ')[1]

    ## Verify the token
    payload = verify_token(token)

    if payload:
        return jsonify({
            "status": "success",
            "message": f"You are viewing protected content, {payload['username']}!"
        })
    else:
        return jsonify({"status": "error", "message": "Invalid or expired token"}), 401

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

这个更新后的服务器:

  • 在成功身份验证后生成 JWT 令牌
  • 验证受保护资源的令牌
  • 具有用于登录和访问受保护内容的端点

更新客户端以进行基于令牌的身份验证

现在,让我们更新我们的客户端以使用 JWT 令牌。更新 client.py 文件:

code ~/project/auth_demo/client.py

将现有代码替换为:

import requests
import json

## Server URL
BASE_URL = "http://localhost:5000"

def login(username, password):
    """Authenticate with the server using username and password"""
    response = requests.post(
        f"{BASE_URL}/login",
        json={"username": username, "password": password}
    )

    print(f"Login response: {response.status_code}")
    data = response.json()
    print(data)

    ## Return the token if login was successful
    return data.get('token') if response.status_code == 200 else None

def access_protected(token=None):
    """Access a protected resource using JWT token"""
    headers = {}
    if token:
        headers['Authorization'] = f'Bearer {token}'

    response = requests.get(f"{BASE_URL}/protected", headers=headers)

    print(f"Protected resource response: {response.status_code}")
    print(response.json())

    return response

if __name__ == "__main__":
    ## Test with valid credentials
    print("=== Authenticating with valid credentials ===")
    token = login("alice", "password123")

    if token:
        print("\n=== Accessing protected resource with valid token ===")
        access_protected(token)

    ## Test with invalid credentials
    print("\n=== Authenticating with invalid credentials ===")
    login("alice", "wrongpassword")

    ## Try to access protected resource without token
    print("\n=== Accessing protected resource without token ===")
    access_protected()

    ## Try to access protected resource with invalid token
    print("\n=== Accessing protected resource with invalid token ===")
    access_protected("invalid.token.value")

这个更新后的客户端:

  • 在成功身份验证后检索并存储 JWT 令牌
  • 在 Authorization 标头中发送令牌以访问受保护的资源
  • 测试各种场景,包括有效和无效的身份验证尝试

运行基于令牌的身份验证系统

现在,让我们运行我们更新的基于令牌的身份验证系统。首先,使用 Ctrl+C 停止任何正在运行的服务器进程,然后启动更新后的服务器:

python ~/project/auth_demo/server.py

打开一个新的终端选项卡或使用现有的第二个选项卡来运行客户端:

cd ~/project/auth_demo
python client.py

你应该看到演示以下内容的输出:

  1. 使用有效凭据成功登录,接收 JWT 令牌
  2. 使用有效令牌访问受保护的资源
  3. 使用不正确的凭据登录尝试失败
  4. 尝试在没有令牌的情况下访问受保护的资源失败
  5. 尝试使用无效令牌访问受保护的资源失败

此输出确认我们的基于令牌的身份验证系统正在正常工作。

基于令牌的身份验证的优势

基于令牌的身份验证比基于会话的身份验证具有几个优势:

  1. 无状态(Stateless):服务器不需要存储会话信息。
  2. 可扩展性:在具有多个服务器的分布式环境中运行良好。
  3. 对移动设备友好:适用于 cookie 可能无法很好地工作的移动应用程序。
  4. 跨域:可以轻松地跨不同的域使用。
  5. 安全性:可以配置令牌过期,从而降低会话劫持的风险。

在实际应用中,你可能希望使用诸如令牌刷新、基于角色的访问控制和安全令牌存储等功能来进一步增强你的身份验证系统。

使用密码哈希增强安全性

在我们之前的实现中,我们以纯文本形式存储密码,这是一个重大的安全风险。在这一步中,我们将通过使用 bcrypt 算法实现密码哈希来增强我们的身份验证系统。

为什么要对密码进行哈希处理?

以纯文本形式存储密码会带来几个安全风险:

  • 如果你的数据库被攻破,攻击者可以立即使用密码
  • 用户经常在多个站点上重复使用密码,因此一次泄露可能会影响其他服务
  • 它违反了安全最佳实践,并可能违反数据保护法规

密码哈希通过以下方式解决了这些问题:

  • 将密码转换为无法逆转的固定长度字符串
  • 添加“盐值(salt)”以防止攻击者使用预先计算的查找表(彩虹表)
  • 确保即使两个用户使用相同的密码,他们的哈希值也会不同

安装所需的软件包

首先,我们需要安装 bcrypt 包:

pip install bcrypt

你应该看到确认安装的输出:

Collecting bcrypt
  Downloading bcrypt-4.0.1-cp36-abi3-manylinux_2_28_x86_64.whl (593 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 593.8/593.8 KB 10.5 MB/s eta 0:00:00
Installing collected packages: bcrypt
Successfully installed bcrypt-4.0.1

创建用户注册系统

让我们增强我们的服务器,以包含带有密码哈希的用户注册。更新 server.py 文件:

code ~/project/auth_demo/server.py

将现有代码替换为:

from flask import Flask, request, jsonify
import jwt
import datetime
import os
import bcrypt

app = Flask(__name__)
## Secret key for signing JWT tokens
SECRET_KEY = os.urandom(24)

## Store users as a dictionary with username as key and a dict of hashed password and roles as value
users = {}

def hash_password(password):
    """Hash a password using bcrypt"""
    ## Convert password to bytes if it's a string
    if isinstance(password, str):
        password = password.encode('utf-8')

    ## Generate a salt and hash the password
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(password, salt)

    return hashed

def check_password(password, hashed):
    """Verify a password against its hash"""
    ## Convert password to bytes if it's a string
    if isinstance(password, str):
        password = password.encode('utf-8')

    ## Check if the password matches the hash
    return bcrypt.checkpw(password, hashed)

def generate_token(username, role):
    """Generate a JWT token for the authenticated user"""
    ## Token expires after 30 minutes
    expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=30)

    payload = {
        'username': username,
        'role': role,
        'exp': expiration
    }

    ## Create the JWT token
    token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
    return token

def verify_token(token):
    """Verify the JWT token"""
    try:
        ## Decode and verify the token
        payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
        return payload
    except jwt.ExpiredSignatureError:
        ## Token has expired
        return None
    except jwt.InvalidTokenError:
        ## Invalid token
        return None

@app.route('/register', methods=['POST'])
def register():
    data = request.get_json()

    ## Extract registration data
    username = data.get('username')
    password = data.get('password')

    ## Basic validation
    if not username or not password:
        return jsonify({"status": "error", "message": "Username and password are required"}), 400

    ## Check if the username already exists
    if username in users:
        return jsonify({"status": "error", "message": "Username already exists"}), 409

    ## Hash the password and store the user
    hashed_password = hash_password(password)
    users[username] = {
        'password': hashed_password,
        'role': 'user'  ## Default role
    }

    return jsonify({
        "status": "success",
        "message": f"User {username} registered successfully"
    })

@app.route('/login', methods=['POST'])
def login():
    data = request.get_json()

    ## Extract username and password from the request
    username = data.get('username')
    password = data.get('password')

    ## Check if the user exists
    if username not in users:
        return jsonify({"status": "error", "message": "Invalid credentials"}), 401

    ## Check if the password is correct
    if check_password(password, users[username]['password']):
        ## Generate a JWT token
        token = generate_token(username, users[username]['role'])

        return jsonify({
            "status": "success",
            "message": f"Welcome {username}!",
            "token": token
        })
    else:
        return jsonify({"status": "error", "message": "Invalid credentials"}), 401

@app.route('/protected', methods=['GET'])
def protected():
    ## Get the token from the Authorization header
    auth_header = request.headers.get('Authorization')

    if not auth_header or not auth_header.startswith('Bearer '):
        return jsonify({"status": "error", "message": "Missing or invalid token"}), 401

    ## Extract the token
    token = auth_header.split(' ')[1]

    ## Verify the token
    payload = verify_token(token)

    if payload:
        return jsonify({
            "status": "success",
            "message": f"You are viewing protected content, {payload['username']}!",
            "role": payload['role']
        })
    else:
        return jsonify({"status": "error", "message": "Invalid or expired token"}), 401

@app.route('/admin', methods=['GET'])
def admin():
    ## Get the token from the Authorization header
    auth_header = request.headers.get('Authorization')

    if not auth_header or not auth_header.startswith('Bearer '):
        return jsonify({"status": "error", "message": "Missing or invalid token"}), 401

    ## Extract the token
    token = auth_header.split(' ')[1]

    ## Verify the token
    payload = verify_token(token)

    if not payload:
        return jsonify({"status": "error", "message": "Invalid or expired token"}), 401

    ## Check if user has admin role
    if payload['role'] != 'admin':
        return jsonify({"status": "error", "message": "Admin access required"}), 403

    return jsonify({
        "status": "success",
        "message": f"Welcome to the admin panel, {payload['username']}!"
    })

## Add an admin user for testing
admin_password = hash_password("admin123")
users["admin"] = {
    'password': admin_password,
    'role': 'admin'
}

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

这个增强的服务器:

  • 使用 bcrypt 进行安全密码哈希
  • 包含一个用户注册端点
  • 添加了基于角色的访问控制(管理员与普通用户)
  • 预先创建了一个管理员用户以供测试

更新客户端

现在,让我们更新我们的客户端以测试这些新功能。更新 client.py 文件:

code ~/project/auth_demo/client.py

将现有代码替换为:

import requests
import json

## Server URL
BASE_URL = "http://localhost:5000"

def register(username, password):
    """Register a new user"""
    response = requests.post(
        f"{BASE_URL}/register",
        json={"username": username, "password": password}
    )

    print(f"Registration response: {response.status_code}")
    print(response.json())

    return response.status_code == 200

def login(username, password):
    """Authenticate with the server using username and password"""
    response = requests.post(
        f"{BASE_URL}/login",
        json={"username": username, "password": password}
    )

    print(f"Login response: {response.status_code}")
    data = response.json()
    print(data)

    ## Return the token if login was successful
    return data.get('token') if response.status_code == 200 else None

def access_protected(token=None):
    """Access a protected resource using JWT token"""
    headers = {}
    if token:
        headers['Authorization'] = f'Bearer {token}'

    response = requests.get(f"{BASE_URL}/protected", headers=headers)

    print(f"Protected resource response: {response.status_code}")
    print(response.json())

    return response

def access_admin(token=None):
    """Access the admin panel using JWT token"""
    headers = {}
    if token:
        headers['Authorization'] = f'Bearer {token}'

    response = requests.get(f"{BASE_URL}/admin", headers=headers)

    print(f"Admin panel response: {response.status_code}")
    print(response.json())

    return response

if __name__ == "__main__":
    ## Test user registration
    print("=== Registering a new user ===")
    register("testuser", "testpass123")

    ## Try to register with the same username (should fail)
    print("\n=== Registering with existing username ===")
    register("testuser", "differentpass")

    ## Test regular user login and access
    print("\n=== Regular user login ===")
    user_token = login("testuser", "testpass123")

    if user_token:
        print("\n=== Regular user accessing protected resource ===")
        access_protected(user_token)

        print("\n=== Regular user trying to access admin panel ===")
        access_admin(user_token)

    ## Test admin login and access
    print("\n=== Admin login ===")
    admin_token = login("admin", "admin123")

    if admin_token:
        print("\n=== Admin accessing protected resource ===")
        access_protected(admin_token)

        print("\n=== Admin accessing admin panel ===")
        access_admin(admin_token)

这个更新后的客户端:

  • 测试用户注册功能
  • 演示基于角色的访问控制
  • 测试普通用户和管理员身份验证

运行增强的身份验证系统

现在,让我们运行我们增强的身份验证系统。首先,使用 Ctrl+C 停止任何正在运行的服务器进程,然后启动更新后的服务器:

python ~/project/auth_demo/server.py

打开一个新的终端选项卡或使用现有的第二个选项卡来运行客户端:

cd ~/project/auth_demo
python client.py

你应该看到演示以下内容的输出:

  1. 成功用户注册
  2. 使用现有用户名注册失败
  3. 普通用户登录并访问受保护的资源
  4. 普通用户尝试访问管理面板失败
  5. 管理员登录并访问受保护的资源和管理面板

此输出确认我们增强的身份验证系统(具有密码哈希和基于角色的访问控制)正在正常工作。

安全优势

我们增强的系统现在提供:

  1. 安全密码存储:密码使用 bcrypt 进行哈希处理,bcrypt 是一种缓慢的哈希函数,旨在抵抗暴力攻击。
  2. 基于角色的访问控制:不同的用户可以根据其角色拥有不同的访问级别。
  3. 基于令牌的身份验证:继续提供我们之前实现的 JWT 令牌的优势。
  4. 用户注册:允许使用安全密码存储动态创建用户。

这些增强功能使我们的身份验证系统更加强大,并且适合实际应用。

总结

在这个实验中,你已经学习了如何在 Python 客户端 - 服务器系统中实现身份验证,从基本概念到高级技术。你已经成功地:

  1. 理解了客户端 - 服务器系统中身份验证的基本概念
  2. 使用 Flask 会话实现了基本的用户名和密码身份验证
  3. 使用 JWT 通过基于令牌的身份验证增强了安全性
  4. 使用 bcrypt 添加了安全的密码哈希
  5. 为不同的用户类型实现了基于角色的访问控制

这些技能为构建安全的 Python 应用程序提供了坚实的基础。请记住,身份验证只是应用程序安全的一个方面——在生产系统中,你还应该考虑:

  • 对所有通信使用 HTTPS
  • 实施速率限制以防止暴力攻击
  • 添加日志记录和监控以进行可疑活动
  • 保持所有依赖项更新以解决安全漏洞

通过应用这些原则,你可以构建能够有效保护用户数据并保持安全运营的 Python 应用程序。