SpringBootVueWebsocket实现服务器端向客户端主动发送消息
概述
本文通过一个实际的场景来介绍在前后端分离的项目中通过 WebSocket 来实现服务器端主动向客户端发送消息的应用。主要内容如下 WebSocket 是什么 服务器端 向 客户端 主动发送消息的案例说明 SpringBoot 后端中 Websocket 的配置和使用 后端 Websocket 实现原理 Vue 前端 Websocket 的配置和使用 WebSocket 是什么
Websocket 是一种在单个 TCP 连接上进行全双工通信的协议。WebSocket 连接成功后,服务端与客户端可以双向通信。在需要消息推送的场景,Websocket 相对于轮询能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。
具体如下特点 与 HTTP 协议有着良好的兼容性。默认端口也是 80 和 443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。 依赖于 TCP 协议 数据格式比较轻量,性能开销小,通信高效。 可以发送文本,也可以发送二进制数据。 没有同源限制,客户端可以与任意服务器通信。 协议标识符是 ws(如果加密,则为 wss),服务器网址就是 URL 服务器端 向 客户端 主动发送消息的案例说明
在客户端的列表数据中有个 status 字段,服务器端需要花费较长的时间进行处理,处理完成后才会更新对应数据的 status 字段值,通过 Websocket 的处理流程如下: 前端页面列表数据加载后,初始化一组 Websocket 客户端对象 服务器端 接收到 前端数据状态的查询请求 服务器端 每隔一段时间查询一下数据库,然后返回给客户端 客户端 根据返回的数据状态,再更新页面数据 后端 SpringBoot 中 Websocket 的配置和使用Maven 依赖 org.springframework.boot spring-boot-starter-websocket 配置
通过注入 ServerEndpointExporter 类,用于在项目启动的时候自动将使用了 @ServerEndpoint 注解声明的 Websocket endpoint 注册到 WebSocketContainer 中。 package com.ckjava.config; /** * Function: * * @author chenkui 2022/4/6 17:55 */ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebSocketConfig { /** * 注入 ServerEndpointExporter, * 这个 bean 会自动注册使用了 @ServerEndpoint 注解声明的 Websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } } 通过@ServerEndpoint注解标注实现类通过在类上增加 @ServerEndpoint 和 @Component 注解,用于标注 Websocket 的实现类 通过 ConcurrentHashMap 管理多个客户端的 Session 通过 ScheduledExecutorService 和 init 方法实现定时对客户端进行消息发送 @OnOpen 标注的方法,用于接收客户端的 连接请求,其中的 @PathParam 用于接收 url 中的参数 @OnClose 标注的方法,用于接收客户端的 关闭请求。 @OnMessage标注的方法,用于接收客户端的 消息。 @OnError标注的方法,用于错误处理 package com.ckjava.websocket; import com.ckjava.xutils.JsonUtils; import com.ckjava.entity.TSysPubEntity; import com.ckjava.service.TSysPubService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** * Function: * * @author chenkui 2022/4/6 18:00 */ @Slf4j @Component @ServerEndpoint("/websocket/{pubId}") public class PubStatusWS { /** * 每个数据对应一个 session */ private static final ConcurrentHashMap sessionPool = new ConcurrentHashMap<>(); /** * 定时任务的线程池 */ private static final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1); @Resource private TSysPubService pubService; /** * 定时从数据库中获取数据的最新状态,然后返回给前端 */ @PostConstruct public void init() { scheduledExecutorService.scheduleAtFixedRate(() -> { sessionPool.forEach((pubId, session) -> { final TSysPubEntity entity = pubService.findById(pubId); // 返回最新的状态 session.getAsyncRemote().sendText(JsonUtils.toJSONString(entity)); log.debug(String.format("websocket消息 server send to pubId %s, text:%s", session.getId(), entity.getPubStatus())); }); }, 10, 30, TimeUnit.SECONDS); } @OnOpen public void onOpen(final Session session, final EndpointConfig endpointConfig, @PathParam(value = "pubId") final Integer pubId) { sessionPool.put(pubId, session); log.info(String.format("【websocket消息】 pubId:%s 加入连接,当前总数为:%s", pubId, sessionPool.size())); } @OnClose public void onClose(final Session session, final CloseReason closeReason) { final AtomicReference atomicReference = new AtomicReference<>(); sessionPool.forEach((pubId, s) -> { if (Objects.equals(s.getId(), session.getId())) { atomicReference.set(pubId); } }); sessionPool.remove(atomicReference.get()); log.info(String.format("【websocket消息】pubId:%s 连接断开,当前总数为:%s", atomicReference.get(), sessionPool.size())); } @OnMessage public void onMessage(final Session session, final String message) { log.error(String.format("【websocket消息】收到客户端 pubId:%s 消息:%s", session.getId(), message)); } @OnError public void onError(final Session session, final Throwable throwable) { log.error(String.format("【websocket消息】pubId:%s 出现异常:%s", session.getId(), throwable)); } /** * 向客户端群发消息 * @param message 文本消息 */ public void sendAllMessage(final String message) { sessionPool.forEach((pubId, session) -> { session.getAsyncRemote().sendText(message); }); } /** * 向某个客户端发送消息 * * @param pubId 客户端id * @param message 文本消息 */ public void sendOneMessage(final Integer pubId, final String message) { final Session session = sessionPool.get(pubId); if (session != null) { session.getAsyncRemote().sendText(message); } } } 后端 Websocket 实现原理
为什么增加一个 ServerEndpointExporter Bean,并通过在一个类上增加 @ServerEndpoint 和 @Component 注解就可以实现服务器端 Websocket 功能,这里简单解析一下。 ServerEndpointExporter 的核心方法ServerEndpointExporter 实现了 spring 中的 SmartInitializingSingleton 接口,并重写了 afterSingletonsInstantiated 方法,具体如下 @Override public void afterSingletonsInstantiated() { registerEndpoints(); } 在 registerEndpoints 方法中可以发现,通过 ApplicationContext 中的 getBeanNamesForAnnotation 方法,从 spring 的 ioc 容器中获取含有 @ServerEndpoint 注解的类。 /** * Actually register the endpoints. Called by {@link #afterSingletonsInstantiated()}. */ protected void registerEndpoints() { Set> endpointClasses = new LinkedHashSet<>(); if (this.annotatedEndpointClasses != null) { endpointClasses.addAll(this.annotatedEndpointClasses); } ApplicationContext context = getApplicationContext(); if (context != null) { String[] endpointBeanNames = context.getBeanNamesForAnnotation(ServerEndpoint.class); for (String beanName : endpointBeanNames) { endpointClasses.add(context.getType(beanName)); } } for (Class<?> endpointClass : endpointClasses) { registerEndpoint(endpointClass); } if (context != null) { Map endpointConfigMap = context.getBeansOfType(ServerEndpointConfig.class); for (ServerEndpointConfig endpointConfig : endpointConfigMap.values()) { registerEndpoint(endpointConfig); } } } 在 registerEndpoint 方法中,通过 ServerContainer 的 addEndpoint 方法,最终将 endpoint 实现类注册到 ServerContainer 中。 private void registerEndpoint(Class<?> endpointClass) { ServerContainer serverContainer = getServerContainer(); Assert.state(serverContainer != null, "No ServerContainer set. Most likely the server"s own WebSocket ServletContainerInitializer " + "has not run yet. Was the Spring ApplicationContext refreshed through a " + "org.springframework.web.context.ContextLoaderListener, " + "i.e. after the ServletContext has been fully initialized?"); try { if (logger.isDebugEnabled()) { logger.debug("Registering @ServerEndpoint class: " + endpointClass); } serverContainer.addEndpoint(endpointClass); } catch (DeploymentException ex) { throw new IllegalStateException("Failed to register @ServerEndpoint class: " + endpointClass, ex); } } ServerContainer
java 定义了一套 javax.servlet-api, 一个 HttpServlet 就是一个 HTTP 服务。java websocket 并非基于 servlet-api 简单扩展, 而是新定义了一套 javax.websocket-api。
一个 websocket 服务对应一个 Endpoint。与 ServletContext 对应, websocket-api 也定义了 WebSocketContainer, 而编程方式注册 websocket 的接口是继承自 WebSocketContainer 的 ServerContainer。
一个 websocket 可以接受并管理多个连接, 因此可被视作一个 server。主流 servlet 容器都支持 websocket, 如 tomcat, jetty 等。看 ServerContainer api 文档, 可从 ServletContext attribute 找到 ServerContainer。 Vue 前端 Websocket 的配置和使用在 created 方法中调用了 getPageData 方法,用于接收到列表数据后,通过 initWs 方法 给 每个数据 id 初始化一个 WebSocket 客户端 通过 onerror 事件绑定 出现异常 时候的回调方法 通过 onopen 事件绑定 连接成功 的回调方法 通过 onmessage 事件绑定 接收到服务器端消息 的回调方法,这里收到消息后,再更新前端的数据 通过 onclose 事件绑定 关闭连接 的回调方法 在 unmounted 方法中调用了 onbeforeunload 方法,用于关闭所有的 WebSocket 连接