websocket+springboot,springboot整合websocket两种方式
00-1010一、项目中服务器的创建二。java充当客户端链接ws1。ws客户端II的配置。项目启动时需要启用配置信息并链接到ws服务3。接收服务器推送的权限过滤demo4消息。ws客户端推送消息,类似于上面的服务器。本文使用websocket来推送一些数据。对比项目,制作演示。ws的相关问题不细数,只做记录。
这个针对ws的演示的主要逻辑背景是,A服务器B:通信层生成消息并推送出去,另一个项目A充当客户端和服务器。A的客户端3360不加区别地从通信层接收这些消息,A的服务器3360根据地址ip进行订阅。用户通过订阅A的ws,记录自己的信息和项目B推送的消息,项目A收到后,通过原始订阅的逻辑和一些权限过滤条件,过滤项目B生成的消息,然后推送到用户的客户端。
00-1010先介绍一下maven仓库
建立依赖关系groupIdorg.springframework.boot/groupId工件Spring-Boot-Starter-WebSocket/工件ID/依赖关系web socket服务器
同时注意springboot开通ws服务。
类启动加@EnableScheduling
演示的简要解释
/webSocket/{id}:链接的id是商家的id。如果之前这里有过类似的拍卖,就相当于一个服务器或者商家的标识符,是客户端的标识符,表示链接到哪个拍卖室。
@ServerEndpoint:作为服务器端的评论。
包com . ghh . my project . web socket;导入cn . Hu tool . core . lang . uuid;导入com . Alibaba . fast JSON . JSON;导入org . slf4j . logger;导入org . SLF 4j . logger factory;导入org . spring framework . stereotype.component;导入javax . web socket . *;导入javax . web socket . server . path param;导入javax . web socket . server . server endpoint;导入Java . io . io exception;导入Java . util . ArrayList;导入Java . util . list;导入Java . util . map;导入Java . util . concurrent . concurrent hashmap;@ server endpoint(/web socket/{ id } )@ component public class web socket { private Logger log=Logger factory . get Logger(web socket . class);private static int online count=0;/* *创建一个映射来存储生成的ws link push */private static MapString,web socket Clients=newConcurrentHashMap();/* *创建一个映射来存储当前访问的客户端*/PrivateStatic MapString,string id map=newConcurrentHashMap();非公开会议;/* *链接到*/PrivateString ID的场景ID;/* *每个链接的唯一ID */私有字符串userNo/* * * * @ description :记录第三方访问当前项目的websocket后的信息* @ datetime : 2021/7/510336002 * @ author 3360 GH * @ params 3360[ID,Session]* @ return void */@ on open public void on open(@ path param( ID )string ID,Session Session)throwsioexception { log . info(已连接id3360 {}拍卖行,当前拍卖行编号为3360尺寸thi
s.id = id; this.session = session; // 生成一个随机序列号来存储一个id下的所有用户 this.userNo = UUID.fastUUID().toString(); addOnlineCount(); //根据随机序列号存储一个socket连接 clients.put(userNo, this); idMap.put(userNo, id); } /** * @Description: 关闭连接 * @DateTime: 2021/7/5 10:02 * @Author: GHH * @Params: [] * @Return void */ @OnClose public void onClose() throws IOException { clients.remove(userNo); idMap.remove(userNo); subOnlineCount(); } /** * @Description: 客户端发送消息调用此方法 * @DateTime: 2021/6/16 15:35 * @Author: GHH * @Params: [message] * @Return void */ @OnMessage public void onMessage(String message) throws IOException {// JSONObject jsonTo = JSONObject.parseObject(message);// String mes = (String) jsonTo.get("message");// if (!("All").equals(jsonTo.get("To"))) {// sendMessageTo(mes, jsonTo.get("To").toString());// } else {// sendMessageAll(message);// } log.info("onMessage方法成功"); } @OnError public void onError(Session session, Throwable error) { log.error("{}", error); } public static void sendMessageTo(String message, String userNo) throws IOException { // session.getBasicRemote().sendText(message); //session.getAsyncRemote().sendText(message); WebSocket webSocket = clients.get(userNo); if (webSocket != null && webSocket.session.isOpen()) { webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message)); } } /** * @Description: 推送到指定的id值的记录 * @DateTime: 2021/6/15 17:11 * @Author: GHH * @Params: [message, id] * @Return void */ public static void sendMessageToById(String message, String id) { // session.getBasicRemote().sendText(message); //session.getAsyncRemote().sendText(message); //根据id获取所有的userNo链接的用户 List<String> userNos = getUserNosById(id); for (WebSocket item : clients.values()) { //遍历链接的value值,如果当前传入的id中链接的用户包含value值,则推送。 if (userNos.contains(item.userNo)) { item.session.getAsyncRemote().sendText(message); } } } /** * @Description: 推送所有开启的信息 * @DateTime: 2021/6/15 17:13 * @Author: GHH * @Params: [message] * @Return void */ public static void sendMessageAll(String message){ for (WebSocket item : clients.values()) { item.session.getAsyncRemote().sendText(message); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocket.onlineCount++; } public static synchronized void subOnlineCount() { WebSocket.onlineCount--; } public static synchronized Map<String, WebSocket> getClients() { return clients; } /** * @Description: 根据相应场景的一些逻辑处理 * @DateTime: 2021/7/5 10:03 * @Author: GHH * @Params: [id] * @Return java.util.List<java.lang.String> */ public static List<String> getUserNosById(String id) { ArrayList<String> userNos = new ArrayList<>(); for (Map.Entry<String, String> entry : idMap.entrySet()) { if (entry.getValue().equals(id)) { userNos.add(entry.getKey()); } } return userNos; }}demo中模拟的是定时器推送,第一个参数是消息内容,第二个是推送到哪一个拍卖间或者其他业务上的内容。方法的具体内容上一段代码有详细解释,有通过id,或者发送给全部ws链接的客户端
WebSocket.sendMessageToById(""+count,2+"");
@Scheduled(cron = "*/5 * * * * ?") public void job1(){ log.info("测试生成次数:{}",count); redisTemplate.opsForValue().set("测试"+count, ""+count++); if (count%2==0){ WebSocket.sendMessageToById(""+count,2+""); }else { WebSocket.sendMessageToById(""+count,1+""); } log.info("websocket发送"+count); }
二、java充当客户端链接ws
上述是java作为ws服务端推送当前业务信息的一个demo。我们项目目前做的是一个通讯层的概念,只能够推送数据内容,却无法根据用户权限去推送不同的数据。
ws客户端的搭建,首先链接ws服务端。首先是我们另外一个服务的ws配置信息,我这边demo是模拟链接上面的ws服务
1、ws客户端的配置
package com.ghh.websocketRecive.wsMessage;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import javax.websocket.ContainerProvider;import javax.websocket.Session;import javax.websocket.WebSocketContainer;import java.net.URI;/** * @author ghh * @date 2019-08-16 16:02 */@Component@Slf4jpublic class WSClient { public static Session session; public static void startWS() { try { if (WSClient.session != null) { WSClient.session.close(); } WebSocketContainer container = ContainerProvider.getWebSocketContainer(); //设置消息大小最大为10M container.setDefaultMaxBinaryMessageBufferSize(10*1024*1024); container.setDefaultMaxTextMessageBufferSize(10*1024*1024); // 客户端,开启服务端websocket。 String uri = "ws://192.168.0.108:8082/webSocket/1"; Session session = container.connectToServer(WSHandler.class, URI.create(uri)); WSClient.session = session; } catch (Exception ex) { log.info(ex.getMessage()); } }}
2、配置信息需要在项目启动的时候去启用和链接ws服务
package com.ghh.websocketRecive;import com.ghh.websocketRecive.wsMessage.WSClient;import lombok.extern.slf4j.Slf4j;import org.mybatis.spring.annotation.MapperScan;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;import javax.annotation.PostConstruct;@Slf4j@EnableScheduling@SpringBootApplication@MapperScan("com.ghh.websocketRecive.dao")public class WebsocketReciveApplication { public static void main(String[] args) { SpringApplication.run(WebsocketReciveApplication.class, args); } @PostConstruct public void init(){ log.info("初始化应用程序");// 初始化ws,链接服务端 WSClient.startWS(); }}
3、接收服务端推送的消息进行权限过滤demo
@ClientEndpoint:作为ws的客户端注解,@OnMessage接收服务端推送的消息。
package com.ghh.websocketRecive.wsMessage;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.ghh.websocketRecive.entity.Student;import com.ghh.websocketRecive.service.UserService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.websocket.*;import java.util.Objects;import java.util.Set;import static com.ghh.websocketRecive.wsMessage.WSClient.startWS;@ClientEndpoint@Slf4j@Componentpublic class WSHandler { @Autowired RedisTemplate<String,String> redisTemplate; private static RedisTemplate<String,String> redisTemplateService; @PostConstruct public void init() { redisTemplateService=redisTemplate; } @OnOpen public void onOpen(Session session) { WSClient.session = session; } @OnMessage public void processMessage(String message) { log.info("websocketRecive接收推送消息"+message); int permission = Integer.parseInt(message)%5; //查询所有订阅的客户端的ip。 Set<String> keys = redisTemplateService.keys("ip:*"); for (String key : keys) { // 根据登录后存储的客户端ip,获取权限地址 String s = redisTemplateService.opsForValue().get(key); String[] split = s.split(","); for (String s1 : split) { //向含有推送过来的数据权限地址的客户端推送告警数据。 if (s1.equals(permission+"")){ WebSocket.sendMessageToByIp(message,key.split(":")[1]); } } } } @OnError public void processError(Throwable t) { WSClient.session = null; try { Thread.sleep(5000); startWS(); } catch (InterruptedException e) { log.error("---websocket processError InterruptedException---", e); } log.error("---websocket processError error---", t); } @OnClose public void processClose(Session session, CloseReason closeReason) { log.error(session.getId() + closeReason.toString()); } public void send(String sessionId, String message) { try { log.info("send Msg:" + message); if (Objects.nonNull(WSClient.session)) { WSClient.session.getBasicRemote().sendText(message); } else { log.info("---websocket error----"); } } catch (Exception e) { log.error("---websocket send error---", e); } }}
4、ws客户端推送消息,推送消息和上面服务端类似。
这边是根据ip
package com.ghh.websocketRecive.wsMessage;import cn.hutool.core.lang.UUID;import com.alibaba.fastjson.JSON;import com.ghh.websocketRecive.service.UserService;import lombok.Builder;import lombok.Data;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;@ServerEndpoint("/webSocket/{ip}")@Componentpublic class WebSocket { private Logger log = LoggerFactory.getLogger(WebSocket.class); private static int onlineCount = 0; private static Map<String, WebSocket> clients = new ConcurrentHashMap<>(); private Session session; /** 当前连接服务端的客户端ip */ private String ip; @Autowired RedisTemplate<String,String> redisTemplate; private static RedisTemplate<String,String> redisTemplateService; @PostConstruct public void init() { redisTemplateService = redisTemplate; } @OnOpen public void onOpen(@PathParam("ip") String ip, Session session) throws IOException { log.info("ip:{}客户端已连接:,当前客户端数量:{}", ip, onlineCount+1); this.ip = ip; this.session = session; // 接入一个websocket则生成一个随机序列号 addOnlineCount(); //根据随机序列号存储一个socket连接 clients.put(ip, this); } @OnClose public void onClose() throws IOException { clients.remove(ip); onlineCount--; subOnlineCount(); } /** * @Description: 客户端发送消息调用此方法 * @DateTime: 2021/6/16 15:35 * @Author: GHH * @Params: [message] * @Return void */ @OnMessage public void onMessage(String message) throws IOException { log.info("客户端发送消onMessage方法成功"); } @OnError public void onError(Session session, Throwable error) { log.error("{}", error); } public static void sendMessageTo(String message, String userNo) throws IOException { WebSocket webSocket = clients.get(userNo); if (webSocket != null && webSocket.session.isOpen()) { webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message)); } } /** * @Description: 推送到指定的ip值的记录 * @DateTime: 2021/6/15 17:11 * @Author: GHH * @Params: [message, id] * @Return void */ public static void sendMessageToByIp(String message, String ip) { for (WebSocket item : clients.values()) { //遍历链接的value值,如果当前传入的ip中链接的用户包含value值,则推送。 if (item.ip.equals(ip)) { item.session.getAsyncRemote().sendText(message); } } } /** * @Description: 推送所有开启的信息 * @DateTime: 2021/6/15 17:13 * @Author: GHH * @Params: [message] * @Return void */ public static void sendMessageAll(String message){ for (WebSocket item : clients.values()) { item.session.getAsyncRemote().sendText(message); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocket.onlineCount++; } public static synchronized void subOnlineCount() { WebSocket.onlineCount--; } public static synchronized Map<String, WebSocket> getClients() { return clients; }}
概述:
至此,简易的demo搭建完成,项目gitee网址:https://gitee.com/ghhNB/study.git
到此这篇关于SpringBoot整合WebSocket的客户端和服务端的实现的文章就介绍到这了,更多相关SpringBoot整合WebSocket内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。