功能实现
经过上述分析,聊天室的整个流程已经清晰。现在让我们开始实现这个聊天室的功能。
在 ~/project
目录下创建一个名为 app.py
的文件,并输入以下源代码:
import datetime
import flask
import redis
app = flask.Flask("labex-sse-chat")
app.secret_key = "labex"
app.config["DEBUG"] = True
r = redis.StrictRedis()
## 主页路由函数
@app.route("/")
def home():
## 如果用户未登录,则重定向到登录页面
if "user" not in flask.session:
return flask.redirect("/login")
user = flask.session["user"]
return flask.render_template("index.html", user=user)
## 消息生成器
def event_stream():
## 创建一个发布-订阅系统
pubsub = r.pubsub()
## 使用发布-订阅系统的subscribe方法订阅一个频道
pubsub.subscribe("chat")
for message in pubsub.listen():
data = message["data"]
if type(data) == bytes:
yield "data: {}\n\n".format(data.decode())
## 登录函数,首次访问需要登录
@app.route("/login", methods=["GET", "POST"])
def login():
if flask.request.method == "POST":
## 将用户名存储在会话字典中,然后重定向到主页
flask.session["user"] = flask.request.form["user"]
return flask.redirect("/")
return flask.render_template("login.html")
## 使用POST方法接收JavaScript发送的数据
@app.route("/post", methods=["POST"])
def post():
message = flask.request.form["message"]
user = flask.session.get("user", "anonymous")
now = datetime.datetime.now().replace(microsecond=0).time()
r.publish("chat", "[{}] {}: {}\n".format(now.isoformat(), user, message))
return flask.Response(status=204)
## 事件流接口
@app.route("/stream")
def stream():
## 此路由函数的返回对象必须是text/event-stream类型
return flask.Response(event_stream(), mimetype="text/event-stream")
## 运行Flask应用程序
app.run()
在上述代码中,我们使用Flask的会话功能来存储用户登录信息,使用Redis的发布-订阅功能来收发消息,并使用SSE来实现消息推送。
这里:
event_stream
函数是一个消息生成器,它不断从Redis中检索消息并推送给客户端。
stream
函数是一个事件流接口,它返回一个 text/event-stream
类型的对象,即SSE事件流。
post
函数是一个接口,它使用POST方法接收JavaScript发送的数据。它将接收到的数据发布到Redis中的 chat
频道。
login
函数是一个登录函数,首次访问需要登录。登录成功后,用户名存储在会话字典中,然后重定向到主页。
home
函数是主页路由函数。如果用户未登录,它将重定向到登录页面。如果用户已登录,它将渲染 index.html
模板。