多拓展一下了解的东西,主动学习比较感兴趣的websocket和消息队列相关概念。然后,就发现了redis竟然也可以进行消息的传递与分发、还有订阅某类消息的机制,这很像站点广播、全局或者局部通知的功能。对此产生了不少兴趣,决定建个demo,尝试一下。
本次编写demo程序,参考的博客地址如下:
[Websocket技术的Java实现(上篇)]
(https://blog.csdn.net/KeepStruggling/article/details/105543449)Websocket技术的Java实现(下篇)建议先理解websocket的概念:
MDN的websocket的概念廖雪峰的官方网站的websocket说明
pom依赖:
org.springframework.boot
spring-boot-starter-web
2.5.4
org.springframework.boot
spring-boot-starter-data-redis
2.5.4
org.springframework.boot
spring-boot-starter-websocket
2.5.4
org.springframework.boot
spring-boot-starter-thymeleaf
2.5.4
可以存中文的key和value
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(factory);
//key序列化方式;(不然会出现乱码;),但是如果方法上有Long等非String类型的话,会报类型转换错误;
//所以在没有自己定义key生成策略的时候,以下这个代码建议不要这么写,可以不配置或者自己实现ObjectRedisSerializer
//JdkSerializationRedisSerializer序列化方式 是没办法直接可视化存储数据的;
//Jackson2JsonRedisSerializer序列化方式 有一个缺点,存储字符串时,可能会给字符串额外地加一对双引号,如""abc"";
//实测推荐StringRedisSerializer序列化方式;
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
@Override
@Bean
public KeyGenerator keyGenerator() {
return new SimpleKeyGenerator() {
@Override
public Object generate(Object target, Method method, Object... params) {
StringBuilder sb = new StringBuilder();
sb.append(target.getClass().getSimpleName());//获得类的简单类名,不包括前缀包路径
sb.append(".").append(method.getName());
StringBuilder paramsSb = new StringBuilder();
for (Object param : params) {
// 如果不指定,默认生成包含到键值中
if (param != null) {
paramsSb.append(param);
}
}
if (paramsSb.length() > 0) {
sb.append("_").append(paramsSb);
}
return sb.toString();
}
};
} 主要是监听到某个渠道有了消息之后,怎么处理。
import java.io.IOException;
import java.nio.charset.Charset;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import com.feng.test.config.websocket.WebSocketServer;
/**
* @className: RedisMessageListener.java
* @author:
* @version: 1.0.0
* @description:redis消息监听:得到消息后的处理方法 onMessage()
* @date: 2022/8/23
*/
@Configuration
public class RedisMessageListener implements MessageListener {
public void onMessage(Message message, byte[] bytes) {
// 消息体
System.out.println(message.getBody());
String body = new String(message.getBody(), Charset.forName("UTF-8"));
// 传播渠道名称
String topic = new String(bytes);
System.out.println("消息:" + body + ",渠道:" + topic);
try {
//此处是将redis监听到的消息,进行了websocket连接群发消息到浏览器端
WebSocketServer.sendInfo(body, null);
} catch (IOException e) {
e.printStackTrace();
}
}
}主要是设置如何监听消息,以及设置监听多个途径的消息,针对不同途径的消息可以有不同的处理逻辑,同一个途径的消息也可以有多个不同的处理逻辑。
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RedisConnectionFactory redisConnectionFactory;
//引入一个redis的消息监听器
@Autowired
private MessageListener redisMessageListener;
//线程池
private ThreadPoolTaskScheduler taskScheduler;
//创建线程池
@Bean
public ThreadPoolTaskScheduler initPoolTaskScheduler() {
if (taskScheduler != null) {
return taskScheduler;
}
taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(20);
return taskScheduler;
}
/**
* @description:创建发布redis消息监听器的容器,
* 一个监听容器可以设置监听多个渠道的消息,并进行个性化的监听处理逻辑
* @author: feng
* @return RedisMessageListenerContainer
*/
@Bean
public RedisMessageListenerContainer initContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
//设置连接
container.setConnectionFactory(redisConnectionFactory);
//设置线程池
container.setTaskExecutor(initPoolTaskScheduler());
//定义监听的渠道
Topic a = new ChannelTopic("topic1");
Topic b = new ChannelTopic("topic2");
//使用监听器监听redis的消息
container.addMessageListener(redisMessageListener,a);
container.addMessageListener(redisMessageListener,b);
return container;
}这一步,框架封装的太厉害,仍然有很多东西不是很明白,比如,容器结合了线程池之后,究竟是如何多线程监听消息变化的?
在引入websocket的依赖后,需要配置一番,才可以使用websocket。
首先,配置类:
@Configuration
public class WebSocketConfig {
/**
* 如果使用Springboot默认内置的tomcat容器,则必须注入ServerEndpoint的bean;
* 如果使用外置的web容器,则不需要提供ServerEndpointExporter,下面的注入可以注解掉
* 本次使用springboot内置的tomcat容器,不注释掉下面的bean
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}再者,要定义一个websocket的连接路径(应该也可以多个路径):
@Component
@ServerEndpoint("/ws/{sid}")
public class WebSocketServer {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
public WebSocketServer() {
}
// 在线连接数统计
private static int onlineCount = 0;
// 声明某个连接的session
private Session session;
// 存放无数个socket连接
private static ConcurrentHashMap websocketMap = new ConcurrentHashMap();
// 声明sid
private String sid;
/**
* 建立socket的方法调用
*
* @param session
* @param sid
*/
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
// 赋值当前的连接session
this.session = session;
this.sid = sid;
if (websocketMap.get(this.sid) == null) {
// 存放本次socket连接
websocketMap.put(sid, this);
// 增加一个在线数
addOnlineCount();
log.info("新的socket连接出现并监听:" + sid + ",当前在线人数:" + getOnlineCount());
}
}
/**
* 连接关闭时调用的方法
*/
@OnClose
public void onClose() {
if (websocketMap.get(this.sid) != null) {
websocketMap.remove(this.sid);
subOnlineCount();
log.info("连接" + this.sid + "断开了");
}
}
/**
* socket连接时,收到消息时的处理方法
*
* @param message
* @param session
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("连接收到消息:" + message);
// 如果message有字符
if (StringUtils.hasLength(message)) {
// 实际业务连接成千上万,应该用多线程处理
for (WebSocketServer server : websocketMap.values()) {
try {
server.sendMessage(message);
log.info("websocket群发消息-sid=" + server.sid + ":" + message);
} catch (Exception e) {
}
}
}
}
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误了");
error.printStackTrace();
}
/**
* 获取在线连接数
*
* @return
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* 增加在线连接数
*
* @return
*/
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
/**
* 减少在线连接数
*
* @return
*/
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
/**
* 实现服务器主动推送消息
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群发自定义消息(用set会方便些)
*/
public static void sendInfo(String message, @PathParam(value = "sid") String sid) throws IOException {
log.info("推送消息到sid:" + sid + ",推送内容:" + message);
if (StringUtils.hasLength(message)) {
for (WebSocketServer server : websocketMap.values()) {
try {
// sid为null时群发,不为null则只发一个
if (sid == null) {
server.sendMessage(message);
} else if (server.sid.equals(sid)) {
server.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
continue;
}
}
}
}
}
websocket通讯
【userId】:
【toUserId】:
【toUserId内容】:
【操作】:
【操作】:
@Autowired
private RedisTemplate redisTemplate;
@RequestMapping(value = "/getMessageData")
public ResultData getMessageData(String message) {
ObjectMapper om = new ObjectMapper();
try {
Map map = om.readValue(message, HashMap.class);
redisTemplate.opsForValue().set("a", message);
redisTemplate.opsForValue().set("b", map.toString());
redisTemplate.convertAndSend("topic1", map);
System.out.println(map);
} catch (Exception e) {
e.printStackTrace();
}
return ResultData.success("操作成功");
} 测试接口:
接口发送要通知的消息测试
观察日志输出:
后端控制台输出
在看一下页面上建立websocket后,已经将redis发布的消息,从服务器端推送到了浏览器端:
浏览器接收到通知
至此,基本上这个demo已经实现了最初的目的。但是,这只能说技术思路上,可行,如果应用到生产系统上,必然需要考虑websocket连接数过多的问题、群发消息的遍历问题等等。
websocket之间互发消息
最后总结一下,从这个demo中,可以看出来框架已经做了很多的封装,服务端和浏览器的ip与端口不对,但是其中的互相建立连接的过程在后端已经被封装了,这样的优点是易于使用,缺点嘛,就是只会使用,但是稍一有新的业务需求,便不知道如何变通了。希望同行者能明白demo只是简单的参考,其中的协议原理才是知识的本质。
| 留言与评论(共有 0 条评论) “” |