首页 >资讯动态 > 正文

Vue3.0+Tornado6.1发布订阅模式打造异步非阻塞实时=通信聊天系统 焦点播报

2023-02-05 08:11:00 来源:刘悦技术分享

“表达欲”是人类成长史上的强大“源动力”,恩格斯早就直截了当地指出,处在蒙昧时代即低级阶段的人类,“以果实、坚果、根作为食物;音节清晰的语言的产生是这一时期的主要成就”。而在网络时代人们的表达欲往往更容易被满足,因为有聊天软件的存在。通常意义上,聊天大抵都基于两种形式:群聊和单聊。群聊或者群组聊天我们可以理解为聊天室,可以有人数上限,而单聊则可以认为是上限为2个人的特殊聊天室。


【资料图】

为了开发高质量的聊天系统,开发者应该具备客户机和服务器如何通信的基本知识。在聊天系统中,客户端可以是移动应用程序(C端)或web应用程序(B端)。客户端之间不直接通信。相反,每个客户端都连接到一个聊天服务,该服务支撑双方通信的功能。所以该服务在业务上必须支持的最基本功能:

1.能够实时接收来自其他客户端的信息。

2.能够将每条信息实时推送给收件人。

当客户端打算启动聊天时,它会使用一个或多个网络协议连接聊天服务。对于聊天服务,网络协议的选择至关重要,这里,我们选择Tornado框架内置Websocket协议的接口,简单而又方便,安装tornado6.1

pip3 install tornado==6.1

随后编写程序启动文件main.py:

import tornado.httpserverimport tornado.websocketimport tornado.ioloopimport tornado.webimport redisimport threadingimport asyncio# 用户列表users = []# websocket协议class WB(tornado.websocket.WebSocketHandler):# 跨域支持def check_origin(self,origin):return True# 开启链接def open(self):                users.append(self)# 接收消息def on_message(self,message):self.write_message(message["data"])# 断开def on_close(self):users.remove(self)# 建立torando实例app = tornado.web.Application([(r"/wb/",WB)],debug=True)if __name__ == "__main__":# 声明服务器http_server_1 = tornado.httpserver.HTTPServer(app)# 监听端口http_server_1.listen(8000)# 开启事件循环tornado.ioloop.IOLoop.instance().start()

如此,就在短时间搭建起了一套websocket协议服务,每一次有客户端发起websocket连接请求,我们都会将它添加到用户列表中,等待用户的推送或者接收信息的动作。

下面我们需要通过某种形式将消息的发送方和接收方联系起来,以达到“聊天”的目的,这里选择Redis的发布订阅模式(pubsub),以一个demo来实例说明,server.py

import redisr = redis.Redis()r.publish(\"test\","hello")

随后编写 client.py:

import redisr = redis.Redis()ps = r.pubsub()ps.subscribe("test")  for item in ps.listen():     if item["type"] == "message":        print(item["data"])

可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道(channel)发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。

频道不仅可以联系发布者和订阅者,同时,也可以利用频道进行“消息隔离”,即不同频道的消息只会给订阅该频道的用户进行推送:

根据发布者订阅者逻辑,改写main.py:

import tornado.httpserverimport tornado.websocketimport tornado.ioloopimport tornado.webimport redisimport threadingimport asyncio# 用户列表users = []# 频道列表channels = [\"channel_1\",\"channel_2\"]# websocket协议class WB(tornado.websocket.WebSocketHandler):# 跨域支持def check_origin(self,origin):return True# 开启链接def open(self):users.append(self)# 接收消息def on_message(self,message):self.write_message(message["data"])# 断开def on_close(self):users.remove(self)# 基于redis监听发布者发布消息def redis_listener(loop):asyncio.set_event_loop(loop)async def listen(): r = redis.Redis(decode_responses=True)# 声明pubsb实例ps = r.pubsub()# 订阅聊天室频道ps.subscribe([\"channel_1\",\"channel_2\"])# 监听消息for message in ps.listen():print(message)# 遍历链接上的用户for user in users:print(user)if message[\"type\"] == \"message\" and message[\"channel\"] == user.get_cookie(\"channel\"):user.write_message(message[\"data\"])future = asyncio.gather(listen())loop.run_until_complete(future)# 接口  发布信息class Msg(tornado.web.RequestHandler):# 重写父类方法def set_default_headers(self):# 设置请求头信息print(\"开始设置\")# 域名信息self.set_header(\"Access-Control-Allow-Origin\",\"*\")# 请求信息self.set_header(\"Access-Control-Allow-Headers\",\"x-requested-with\")# 请求方式self.set_header(\"Access-Control-Allow-Methods\",\"POST,GET,PUT,DELETE\")# 发布信息async def post(self):data = self.get_argument(\"data\",None)channel = self.get_argument(\"channel\",\"channel_1\")print(data)# 发布r = redis.Redis()r.publish(channel,data)return self.write(\"ok\")# 建立torando实例app = tornado.web.Application([(r"/send/",Msg),(r"/wb/",WB)],debug=True)if __name__ == "__main__":loop = asyncio.new_event_loop()# 单线程启动订阅者服务threading.Thread(target=redis_listener,args=(loop,)).start()# 声明服务器http_server_1 = tornado.httpserver.HTTPServer(app)# 监听端口http_server_1.listen(8000)# 开启事件循环tornado.ioloop.IOLoop.instance().start()

这里假设默认有两个频道,逻辑是这样的:由前端控制websocket链接用户选择将消息发布到那个频道上,同时每个用户通过前端cookie的设置具备频道属性,当具备频道属性的用户对该频道发布了一条消息之后,所有其他具备该频道属性的用户通过redis进行订阅后主动推送刚刚发布的消息,而频道的推送只匹配订阅该频道的用户,达到消息隔离的目的。

需要注意的一点是,通过线程启动redis订阅服务时,需要将当前的loop实例传递给协程对象,否则在订阅方法内将会获取不到websocket实例,报这个错误:

IOLoop.current() doesn"t work in non-main

这是因为Tornado底层基于事件循环ioloop,而同步框架模式的Django或者Flask则没有这个问题。

下面编写前端代码,这里我们使用时下最流行的vue3.0框架,编写chat.vue:

这里前端在线客户端定期向状态服务器发送心跳事件。如果服务端在特定时间内(例如x秒)从客户端接收到心跳事件,则认为用户处于联机状态。否则,它将处于脱机状态,脱机后在阈值时间内可以进行重新连接的动作。同时利用vant框架的标签页可以同步切换频道,切换后将频道标识写入cookie,便于后端服务识别后匹配推送。

效果是这样的:

诚然,功能业已实现,但是如果我们处在一个高并发场景之下呢?试想一下如果一个频道有10万人同时在线,每秒有100条新消息,那么后台tornado的websocket服务推送频率是100w*10/s = 1000w/s 。

这样的系统架构如果不做负载均衡的话,很难抗住压力,那么瓶颈在哪里呢?没错,就是数据库redis,这里我们需要异步redis库aioredis的帮助:

pip3 install aioredis

aioredis通过协程异步操作redis读写,避免了io阻塞问题,使消息的发布和订阅操作非阻塞。

此时,可以新建一个异步订阅服务文件main_with_aioredis.py:

import asyncioimport aioredisfrom tornado import web, websocketfrom tornado.ioloop import IOLoopimport tornado.httpserverimport async_timeout

之后主要的修改逻辑是,通过aioredis异步建立redis链接,并且异步订阅多个频道,随后通过原生协程的asyncio.create_task方法(也可以使用asyncio.ensure_future)注册订阅消费的异步任务reader:

async def setup():    r = await aioredis.from_url(\"redis://localhost\", decode_responses=True)    pubsub = r.pubsub()    print(pubsub)    await pubsub.subscribe(\"channel_1\",\"channel_2\")    #asyncio.ensure_future(reader(pubsub))    asyncio.create_task(reader(pubsub))

在订阅消费方法中,异步监听所订阅频道中的发布信息,同时和之前的同步方法一样,比对用户的频道属性并且进行按频道推送:

async def reader(channel: aioredis.client.PubSub):    while True:        try:            async with async_timeout.timeout(1):                message = await channel.get_message(ignore_subscribe_messages=True)                if message is not None:                    print(f\"(Reader) Message Received: {message}\")                    for user in users:                        if user.get_cookie(\"channel\") == message[\"channel\"]:                            user.write_message(message[\"data\"])                        await asyncio.sleep(0.01)        except asyncio.TimeoutError:            pass

最后,利用tornado事件循环IOLoop传递中执行回调方法,将setup方法加入到事件回调中:

if __name__ == "__main__":    # 监听端口    application.listen(8000)    loop = IOLoop.current()    loop.add_callback(setup)    loop.start()

完整的异步消息发布、订阅、推送服务改造 main_aioredis.py:

import asyncioimport aioredisfrom tornado import web, websocketfrom tornado.ioloop import IOLoopimport tornado.httpserverimport async_timeoutusers = []# websocket协议class WB(tornado.websocket.WebSocketHandler):    # 跨域支持    def check_origin(self,origin):        return True    # 开启链接    def open(self):        users.append(self)    # 接收消息    def on_message(self,message):        self.write_message(message["data"])    # 断开    def on_close(self):        users.remove(self)class Msg(web.RequestHandler):    # 重写父类方法    def set_default_headers(self):        # 设置请求头信息        print(\"开始设置\")        # 域名信息        self.set_header(\"Access-Control-Allow-Origin\",\"*\")        # 请求信息        self.set_header(\"Access-Control-Allow-Headers\",\"x-requested-with\")        # 请求方式        self.set_header(\"Access-Control-Allow-Methods\",\"POST,GET,PUT,DELETE\")    # 发布信息    async def post(self):        data = self.get_argument(\"data\",None)        channel = self.get_argument(\"channel\",\"channel_1\")        print(data)        # 发布        r = await aioredis.from_url(\"redis://localhost\", decode_responses=True)        await r.publish(channel,data)        return self.write(\"ok\")async def reader(channel: aioredis.client.PubSub):    while True:        try:            async with async_timeout.timeout(1):                message = await channel.get_message(ignore_subscribe_messages=True)                if message is not None:                    print(f\"(Reader) Message Received: {message}\")                    for user in users:                        if user.get_cookie(\"channel\") == message[\"channel\"]:                            user.write_message(message[\"data\"])                        await asyncio.sleep(0.01)        except asyncio.TimeoutError:            passasync def setup():    r = await aioredis.from_url(\"redis://localhost\", decode_responses=True)    pubsub = r.pubsub()    print(pubsub)    await pubsub.subscribe(\"channel_1\",\"channel_2\")    #asyncio.ensure_future(reader(pubsub))    asyncio.create_task(reader(pubsub))application = web.Application([    (r"/send/",Msg),    (r"/wb/", WB),],debug=True)    if __name__ == "__main__":    # 监听端口    application.listen(8000)    loop = IOLoop.current()    loop.add_callback(setup)    loop.start()

从程序设计角度上讲,充分利用了协程的异步执行思想,更加地丝滑流畅。

结语:实践操作来看,Redis发布订阅模式,非常契合这种实时(websocket)通信聊天系统的场景,但是发布的消息如果没有对应的频道或者消费者,消息则会被丢弃,假如我们在生产环境在消费的时候,突然断网,导致其中一个订阅者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在,所以如果想要保证系统的健壮性,还需要其他服务来设计高可用的实时存储方案,不过那就是另外一个故事了,最后奉上项目地址,与众乡亲同飨:https://github.com/zcxey2911/tornado_redis_vue3_chatroom

标签: 监听端口 聊天系统 重新连接

x 广告
x 广告

Copyright ©   2015-2022 现在劳务网版权所有  备案号:粤ICP备18023326号-5   联系邮箱:855 729 8@qq.com