Vue3。0Tornado6。1发布订阅模式打造异步非阻塞实时通信聊天系统
"表达欲"是人类成长史上的强大"源动力",恩格斯早就直截了当地指出,处在蒙昧时代即低级阶段的人类,"以果实、坚果、根作为食物;音节清晰的语言的产生是这一时期的主要成就"。而在网络时代人们的表达欲往往更容易被满足,因为有聊天软件的存在。通常意义上,聊天大抵都基于两种形式:群聊和单聊。群聊或者群组聊天我们可以理解为聊天室,可以有人数上限,而单聊则可以认为是上限为2个人的特殊聊天室。
为了开发高质量的聊天系统,开发者应该具备客户机和服务器如何通信的基本知识。在聊天系统中,客户端可以是移动应用程序(C端)或web应用程序(B端)。客户端之间不直接通信。相反,每个客户端都连接到一个聊天服务,该服务支撑双方通信的功能。所以该服务在业务上必须支持的最基本功能:
1.能够实时接收来自其他客户端的信息。
2.能够将每条信息实时推送给收件人。
当客户端打算启动聊天时,它会使用一个或多个网络协议连接聊天服务。对于聊天服务,网络协议的选择至关重要,这里,我们选择Tornado框架内置Websocket协议的接口,简单而又方便,安装tornado6.1 pip3 install tornado==6.1
随后编写程序启动文件main.py: import tornado.httpserver import tornado.websocket import tornado.ioloop import tornado.web import redis import threading import asyncio # 用户列表 users = [] # websocket协议 class WB(tornado.websocket.WebSocketHandler): # 跨域支持 def check_origin(self,origin): return True # 开启链接 def open(self): users.append(self) # 接收消息 def on_message(self,message): self.write_message(message["data"]) # 断开 def on_close(self): users.remove(self) # 建立torando实例 app = tornado.web.Application( [ (r"/wb/",WB) ],debug=True ) if __name__ == "__main__": # 声明服务器 http_server_1 = tornado.httpserver.HTTPServer(app) # 监听端口 http_server_1.listen(8000) # 开启事件循环 tornado.ioloop.IOLoop.instance().start()
如此,就在短时间搭建起了一套websocket协议服务,每一次有客户端发起websocket连接请求,我们都会将它添加到用户列表中,等待用户的推送或者接收信息的动作。
下面我们需要通过某种形式将消息的发送方和接收方联系起来,以达到"聊天"的目的,这里选择Redis的发布订阅模式(pubsub),以一个demo来实例说明,server.py import redis r = redis.Redis() r.publish("test","hello")
随后编写 client.py: import redis r = redis.Redis() ps = r.pubsub() ps.subscribe("test") for item in ps.listen(): if item["type"] == "message": print(item["data"])
可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道(channel)发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。
频道不仅可以联系发布者和订阅者,同时,也可以利用频道进行"消息隔离",即不同频道的消息只会给订阅该频道的用户进行推送:
根据发布者订阅者逻辑,改写main.py: import tornado.httpserver import tornado.websocket import tornado.ioloop import tornado.web import redis import threading import asyncio # 用户列表 users = [] # 频道列表 channels = ["channel_1","channel_2"] # websocket协议 class WB(tornado.websocket.WebSocketHandler): # 跨域支持 def check_origin(self,origin): return True # 开启链接 def open(self): users.append(self) # 接收消息 def on_message(self,message): self.write_message(message["data"]) # 断开 def on_close(self): users.remove(self) # 基于redis监听发布者发布消息 def redis_listener(loop): asyncio.set_event_loop(loop) async def listen(): r = redis.Redis(decode_responses=True) # 声明pubsb实例 ps = r.pubsub() # 订阅聊天室频道 ps.subscribe(["channel_1","channel_2"]) # 监听消息 for message in ps.listen(): print(message) # 遍历链接上的用户 for user in users: print(user) if message["type"] == "message" and message["channel"] == user.get_cookie("channel"): user.write_message(message["data"]) future = asyncio.gather(listen()) loop.run_until_complete(future) # 接口 发布信息 class Msg(tornado.web.RequestHandler): # 重写父类方法 def set_default_headers(self): # 设置请求头信息 print("开始设置") # 域名信息 self.set_header("Access-Control-Allow-Origin","*") # 请求信息 self.set_header("Access-Control-Allow-Headers","x-requested-with") # 请求方式 self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE") # 发布信息 async def post(self): data = self.get_argument("data",None) channel = self.get_argument("channel","channel_1") print(data) # 发布 r = redis.Redis() r.publish(channel,data) return self.write("ok") # 建立torando实例 app = tornado.web.Application( [ (r"/send/",Msg), (r"/wb/",WB) ],debug=True ) if __name__ == "__main__": loop = asyncio.new_event_loop() # 单线程启动订阅者服务 threading.Thread(target=redis_listener,args=(loop,)).start() # 声明服务器 http_server_1 = tornado.httpserver.HTTPServer(app) # 监听端口 http_server_1.listen(8000) # 开启事件循环 tornado.ioloop.IOLoop.instance().start()
这里假设默认有两个频道,逻辑是这样的:由前端控制websocket链接用户选择将消息发布到那个频道上,同时每个用户通过前端cookie的设置具备频道属性,当具备频道属性的用户对该频道发布了一条消息之后,所有其他具备该频道属性的用户通过redis进行订阅后主动推送刚刚发布的消息,而频道的推送只匹配订阅该频道的用户,达到消息隔离的目的。
需要注意的一点是,通过线程启动redis订阅服务时,需要将当前的loop实例传递给协程对象,否则在订阅方法内将会获取不到websocket实例,报这个错误: IOLoop.current() doesn"t work in non-main
这是因为Tornado底层基于事件循环ioloop,而同步框架模式的Django或者Flask则没有这个问题。
下面编写前端代码,这里我们使用时下最流行的vue3.0框架,编写chat.vue: 聊天窗口
发送
这里前端在线客户端定期向状态服务器发送心跳事件。如果服务端在特定时间内(例如x秒)从客户端接收到心跳事件,则认为用户处于联机状态。否则,它将处于脱机状态,脱机后在阈值时间内可以进行重新连接的动作。同时利用vant框架的标签页可以同步切换频道,切换后将频道标识写入cookie,便于后端服务识别后匹配推送。
效果是这样的:
诚然,功能业已实现,但是如果我们处在一个高并发场景之下呢?试想一下如果一个频道有10万人同时在线,每秒有100条新消息,那么后台tornado的websocket服务推送频率是100w*10/s = 1000w/s 。
这样的系统架构如果不做负载均衡的话,很难抗住压力,那么瓶颈在哪里呢?没错,就是数据库redis,这里我们需要异步redis库aioredis的帮助: pip3 install aioredis
aioredis通过协程异步操作redis读写,避免了io阻塞问题,使消息的发布和订阅操作非阻塞。
此时,可以新建一个异步订阅服务文件main_with_aioredis.py: import asyncio import aioredis from tornado import web, websocket from tornado.ioloop import IOLoop import tornado.httpserver import async_timeout
之后主要的修改逻辑是,通过aioredis异步建立redis链接,并且异步订阅多个频道,随后通过原生协程的asyncio.create_task方法(也可以使用asyncio.ensure_future)注册订阅消费的异步任务reader: async def setup(): r = await aioredis.from_url("redis://localhost", decode_responses=True) pubsub = r.pubsub() print(pubsub) await pubsub.subscribe("channel_1","channel_2") #asyncio.ensure_future(reader(pubsub)) asyncio.create_task(reader(pubsub))
在订阅消费方法中,异步监听所订阅频道中的发布信息,同时和之前的同步方法一样,比对用户的频道属性并且进行按频道推送: async def reader(channel: aioredis.client.PubSub): while True: try: async with async_timeout.timeout(1): message = await channel.get_message(ignore_subscribe_messages=True) if message is not None: print(f"(Reader) Message Received: {message}") for user in users: if user.get_cookie("channel") == message["channel"]: user.write_message(message["data"]) await asyncio.sleep(0.01) except asyncio.TimeoutError: pass
最后,利用tornado事件循环IOLoop传递中执行回调方法,将setup方法加入到事件回调中: if __name__ == "__main__": # 监听端口 application.listen(8000) loop = IOLoop.current() loop.add_callback(setup) loop.start()
完整的异步消息发布、订阅、推送服务改造 main_aioredis.py: import asyncio import aioredis from tornado import web, websocket from tornado.ioloop import IOLoop import tornado.httpserver import async_timeout users = [] # websocket协议 class WB(tornado.websocket.WebSocketHandler): # 跨域支持 def check_origin(self,origin): return True # 开启链接 def open(self): users.append(self) # 接收消息 def on_message(self,message): self.write_message(message["data"]) # 断开 def on_close(self): users.remove(self) class Msg(web.RequestHandler): # 重写父类方法 def set_default_headers(self): # 设置请求头信息 print("开始设置") # 域名信息 self.set_header("Access-Control-Allow-Origin","*") # 请求信息 self.set_header("Access-Control-Allow-Headers","x-requested-with") # 请求方式 self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE") # 发布信息 async def post(self): data = self.get_argument("data",None) channel = self.get_argument("channel","channel_1") print(data) # 发布 r = await aioredis.from_url("redis://localhost", decode_responses=True) await r.publish(channel,data) return self.write("ok") async def reader(channel: aioredis.client.PubSub): while True: try: async with async_timeout.timeout(1): message = await channel.get_message(ignore_subscribe_messages=True) if message is not None: print(f"(Reader) Message Received: {message}") for user in users: if user.get_cookie("channel") == message["channel"]: user.write_message(message["data"]) await asyncio.sleep(0.01) except asyncio.TimeoutError: pass async def setup(): r = await aioredis.from_url("redis://localhost", decode_responses=True) pubsub = r.pubsub() print(pubsub) await pubsub.subscribe("channel_1","channel_2") #asyncio.ensure_future(reader(pubsub)) asyncio.create_task(reader(pubsub)) application = web.Application([ (r"/send/",Msg), (r"/wb/", WB), ],debug=True) if __name__ == "__main__": # 监听端口 application.listen(8000) loop = IOLoop.current() loop.add_callback(setup) loop.start()
从程序设计角度上讲,充分利用了协程的异步执行思想,更加地丝滑流畅。
结语:实践操作来看,Redis发布订阅模式,非常契合这种实时(websocket)通信聊天系统的场景,但是发布的消息如果没有对应的频道或者消费者,消息则会被丢弃,假如我们在生产环境在消费的时候,突然断网,导致其中一个订阅者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在,所以如果想要保证系统的健壮性,还需要其他服务来设计高可用的实时存储方案,不过那就是另外一个故事了,最后奉上项目地址,与众乡亲同飨:https://github.com/zcxey2911/tornado_redis_vue3_chatroom
散户必看这八种形态的股票一定要清仓卖出,刻不容缓股票市场千变万化,上一秒不知道下一秒是什么的走势,天地板,地天板时有发生,俗话说会买的是徒弟,会卖的是师傅。下面由老梁分析一下八种必卖的形态走势,文章有点长,希望大家认真看完,对大
从人口年龄结构看河南富士康VS印度富士康前言最近河南富士康的负面新闻引起较大关注,而印度富士康已有能力生产最新型的iphone手机,有人担心富士康会不会考虑逐步将河南产能转移到印度。本文从人口年龄结构角度分析,河南对富士
一道风景线丨补强筋骨!植物园新增钢结构栈道,可远眺浮山湾建筑群编者按青岛人有着深深的木栈道情结。在美丽的海滨木栈道上漫步是青岛市民最惬意的一种休闲。如今,随着青岛城市更新进程的逐步推进,公园城市建设取得明显成效。一些海滨木栈道得到了维护与更新
华中科技大学精准调节Zn(002)沉积,实现长寿命水系锌金属电池httpsdoi。org10。1021acsenergylett。2c02359背景为了满足电气设备和大规模储能的高能量和高安全性的要求,人们在追求下一代可充电电池方面做出了巨大努
中国矿业大学发表新型金属锡LG复合材料,用于锂离子电池成果简介本文,中国矿业大学朱俊生等研究人员在DiamRelatMater期刊发表名为Selfassemblydesignofnoveltinlignitederivedgraphe
往事如茶,我敬你一杯秋去冬来,树上黄叶渐渐凋落,目之所及也日趋萧条。疫情当前,被封在家里许久许久了,仿佛快忘了自由出行的日子。偶然间收到久未联系的一个故友,忽然发来的信息,问我最近可好,万般心绪如开闸
红尘作伴人啊,只有好好的爱自己了,人生的道路上就会越来越好。曾经的时候,总以为能讨得上他人的喜欢了,自己就能有幸福的生活。所以总是拼命的付出了许多,哪怕是自己为此并不是快乐,却也觉得无所谓
心动何必远方!2022韶关星级乡村旅游民宿名单来了,私享高颜值草坪庭院,度假必去2022年度韶关市星级乡村旅游民宿名单公布,15家民宿被评定为韶关市星级乡村旅游民宿,四星级2家,三星级13家!以后外出游玩又有新的好玩舒适的民宿啦,快来和小编一起瞧瞧吧01hr品
从再见爱人2学到的那些道理周末有空,重新刷了一把再见爱人2,感触有点多,今天来写一写。一艾威和Lisa再见爱人2里面,艾威就是假设了一个不容侵犯的一件事情不能打麻将,会影响Lisa的身体健康。所以一遇到打麻
主销车型成绩下滑,大众在华地位不再稳固?在11月各品牌零售销量榜中,我们能看到国产品牌的纷纷崛起,比亚迪,吉利,长安等车企都进入了厂商零售榜的前五名,这无疑令每一名国人感到自豪骄傲。但是,这样的成绩并不是让所有人都感到开
中国慕课数量已逾6万门记者从近日在线上召开的2022世界慕课与在线教育大会上获悉,慕课发展十年来,中国在线教育日新月异,中国慕课数量已经达到6。19万门,注册用户超过3。7亿人。本次大会以教育数字化引领
从3899元跌至2999元,12GB256GB120W,从高端市场跌至中端市场智能手机的功能非常强大,我们的生活也越来越离不开手机了,可以说如今手机已经涵盖了我们工作社交以及娱乐等方方面面,在这种情况下,一边玩手机一边充电也成为了常态,在手机电池技术以及无线
双芯拍摄人像有多给力?与iPhone13对比,看出国产手机真实水平现在有些手机的自拍镜头,人像拍摄都能够呈现出最真实的一面,而且人物妆容和整体型态也更贴近于自然。比如苹果的iPhone手机在自拍方面都强调真实,甚至成为自拍达人的首选。不过,在对比
美妆品牌如何推广?B站美妆品牌内容营销商业起飞介绍与美妆生活相关的内容,是研究年轻人偏好趋势的重要切口。年轻人通过内容来获取信息做出判断,也在内容观看和互动中透露自己的需求。追溯美妆消费从种草攻略到下单的路径,越来越多年轻人在B站
搭乘神十三回来的太空种子怎么样了?4月16日神舟十三号飞船顺利完成全部既定任务返回地球跟随航天员一同返航的还有12000颗种子如今,一个多月过去了这些种子有啥变化?太空种子茁壮成长在内蒙古,工作人员正在培育太空种子
华为将在数博会上推出智能数据集成平台5月26日,2022中国国际大数据产业博览会将在线上举办。华为作为连续八次参加数博会的熟面孔,将在线上企业展示平台推出华为iData智能数据集成平台,全面展现华为在行业数字化转型领
星链的威胁与中美的太空竞争人人能科普,处处有新知中美两国都明白,对低地球轨道的控制将越来越依赖于对地球和月球之间空间的控制,而那取决于对月球的控制。但马斯克美国或中国的成功或胜利也将取决于他们各自充分利用其
最新情况搭乘神十三回来的太空种子怎么样了?新闻页台海网4月16日神舟十三号飞船顺利完成全部既定任务返回地球跟随航天员一同返航的还有12000颗种子如今,一个多月过去了这些种子有啥变化?太空种子茁壮成长在内蒙古,工作人员正在
搭乘神十三回来的太空种子怎么样了?4月16日神舟十三号飞船顺利完成全部既定任务返回地球跟随航天员一同返航的还有12000颗种子如今,一个多月过去了这些种子有啥变化?太空种子茁壮成长在内蒙古,工作人员正在培育太空种子
搭乘神十三回来的太空种子怎么样了?最新情况来源央视新闻4月16日神舟十三号飞船顺利完成全部既定任务返回地球跟随航天员一同返航的还有12000颗种子如今,一个多月过去了这些种子有啥变化?太空种子茁壮成长在内蒙古,工作人员正在
在这世界的尽头,谁又分得清现实与梦幻TraveltoIceland。去冰岛旅行需要理由吗?这个在北极圈附近的岛国这几年已经成为国外境外旅游的新兴热门目的地。永恒夜空中绚烂的极光空灵梦幻的水晶冰洞数之不尽的瀑布浩渺无垠
太美了,一起来一场梦幻之旅吧天气随手拍翔安香山公园50亩的马鞭草开好了。微风吹动下,马鞭草摇曳生姿,勾勒出迷人且梦幻的紫色花海。走进花海中,马鞭草的高度能到达成人的腰部,可以挡住胖妹妹的肉肉,让胖妹妹们也能有