springBoot集成websocket实时消息推送

程序浅谈 后端 一天前

springBoot集成websocket实时消息推送

springBoot集成websocket实时消息推送

WebSocket是一种在Web应用程序中实现双向通信的协议。它允许在客户端和服务器之间建立持久性的连接,并支持双向数据传输,实现了实时、低延迟的通信。

📍常见的消息推送方法

  1. WebSocket:通过使用WebSocket协议,可以在Java后端实现双向通信,从而实现消息的实时推送。你可以使用Java中的WebSocket API或者使用开源库如Tomcat的WebSocket支持、Spring WebSocket等来实现。
  2. Server-Sent Events (SSE):SSE是一种基于HTTP的轻量级服务器推送技术,它允许服务器向客户端单向推送消息。在Java后端,你可以使用Servlet或者基于框架的实现来支持SSE。
  3. 消息队列:通过使用消息队列如RabbitMQ、ActiveMQ或者Kafka等,Java后端可以将消息发布到消息队列中,然后客户端通过订阅消息队列来获取实时消息推送。
  4. 短轮询(Long Polling):即浏览器定时向服务器发送请求,以此来更新数据的方法。如下图所示,原理就是客户端不断地向服务端发请求,如果服务端数据有更新,服务端就把数据发送回来,客户端就能接收到新数据了
  5. 长轮询(Long Polling):虽然不同于实时推送,但长轮询是一种模拟实时推送的技术。在Java后端,你可以实现长轮询机制来达到类似实时推送的效果。以上是一些常见的Java后端实现消息实时推送提醒的方法。每种方法都有其适用的场景和特点

📍引入依赖xml

代码解读
复制代码
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>

📍WebSocketConfig配置类java

代码解读
复制代码
package com.yxsd.cnooc.data.wb; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Component public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }

📍WebSocketServer服务器类java

代码解读
复制代码
package com.yxsd.cnooc.data.service; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; /** * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端, * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端 */ @Component @ServerEndpoint("/websocket/{userId}") public class WebSocketTest { private static ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketTest>> userwebSocketMap = new ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketTest>>(); private static ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<String, Integer>(); private String userId; /* * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 连接建立成功调用的方法 * * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据 */ @OnOpen public void onOpen(Session session, @PathParam("userId") final String userId) { this.session = session; this.userId = userId; System.out.println("session:"+session); System.out.println("userId:"+userId); if (!exitUser(userId)) { initUserInfo(userId); } else { CopyOnWriteArraySet<WebSocketTest> webSocketTestSet = getUserSocketSet(userId); webSocketTestSet.add(this); userCountIncrease(userId); } System.out.println("有" + userId + "新连接加入!当前在线人数为" + getCurrUserCount(userId)); } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { CopyOnWriteArraySet<WebSocketTest> webSocketTestSet = userwebSocketMap.get(userId); //从set中删除 webSocketTestSet.remove(this); //在线数减1 userCountDecrement(userId); System.out.println("有一连接关闭!当前在线人数为" + getCurrUserCount(userId)); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 * @param session 可选的参数 */ @OnMessage public void onMessage(String message, Session session) { CopyOnWriteArraySet<WebSocketTest> webSocketSet = userwebSocketMap.get(userId); /* System.out.println("来自客户端" + userId + "的消息:" + message); //群发消息 for (WebSocketTest item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); continue; } }*/ } /** * 发生错误时调用 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { System.out.println("发生错误"); error.printStackTrace(); } /** * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。 * * @param message * @throws IOException */ public void sendMessage(String message) throws IOException { System.out.println("服务端推送" + userId + "的消息:" + message); this.session.getAsyncRemote().sendText(message); } /** * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。 我是在有代办消息时 调用此接口 向指定用户发送消息 * * @param message * @throws IOException */ public void sendMessage(String userId,String message) throws IOException { System.out.println("服务端推送" + userId + "的消息:" + message); CopyOnWriteArraySet<WebSocketTest> webSocketSet = userwebSocketMap.get(userId); //群发消息 for (WebSocketTest item : webSocketSet) { try { item.session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); continue; } } } public boolean exitUser(String userId) { return userwebSocketMap.containsKey(userId); } public CopyOnWriteArraySet<WebSocketTest> getUserSocketSet(String userId) { return userwebSocketMap.get(userId); } public void userCountIncrease(String userId) { if (count.containsKey(userId)) { count.put(userId, count.get(userId) + 1); } } public void userCountDecrement(String userId) { if (count.containsKey(userId)) { count.put(userId, count.get(userId) - 1); } } public void removeUserConunt(String userId) { count.remove(userId); } public Integer getCurrUserCount(String userId) { return count.get(userId); } private void initUserInfo(String userId) { CopyOnWriteArraySet<WebSocketTest> webSocketTestSet = new CopyOnWriteArraySet<WebSocketTest>(); webSocketTestSet.add(this); userwebSocketMap.put(userId, webSocketTestSet); count.put(userId, 1); } }

📍前端实现代码

  • 方法一:使用在线websocket客户端

    客户端页面使用 www.jsons.cn/websocket/ 与项目建立连接,充当web客户端

  • 方法二:自己编写客户端html

代码解读
复制代码
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> </body> <script> let webSocket = null; // 创建一个变量 if ('WebSocket' in window){ // 判断当前的浏览器是否支持WebSocket // 如果支持则创建一个WebSocket赋值给刚才创建的变量 // 后面的路径实际上就是一次请求,但是这里用的是WebSocket协议 // 记住这个地方后面详细讲到怎么写 webSocket = new WebSocket('ws://localhost:8080/webSocket'); }else{ // 如果不兼容则弹框,该浏览器不支持 alert('该浏览器不支持') } /** * 当WebSocket创建连接(初始化)会触发该方法 */ webSocket.onopen = function (event){ console.log('建立连接') // 这个代表在浏览器打印日志,跟Java的System.out.println()意思一致 } /** * 当WebSocket关闭时候会触发该方法 */ webSocket.onclose = function (event){ console.log('关闭连接') // 同上 } /** * 当WebSocket接受消息会触发该方法 */ webSocket.onmessage = function (event){ console.log('收到消息:'+event.data) } /** * 当WebSocket连接出错触发该方法 */ webSocket.onerror = function (event){ console.log('websocket发生错误'); } /** * 页面关闭,WebSocket关闭 */ window.onbeforeunload = function (){ webSocket.close(); } </script> </html>

转载来源:https://juejin.cn/post/7300876243952631859

Apipost 私有化火热进行中

评论