redis的消息发布、订阅服务与websocket结合推送消息给多个客户端

前言

多拓展一下了解的东西,主动学习比较感兴趣的websocket和消息队列相关概念。然后,就发现了redis竟然也可以进行消息的传递与分发、还有订阅某类消息的机制,这很像站点广播、全局或者局部通知的功能。对此产生了不少兴趣,决定建个demo,尝试一下。

一、环境介绍

本次编写demo程序,参考的博客地址如下:

[Websocket技术的Java实现(上篇)]
(https://blog.csdn.net/KeepStruggling/article/details/105543449)Websocket技术的Java实现(下篇)

建议先理解websocket的概念:

MDN的websocket的概念廖雪峰的官方网站的websocket说明

pom依赖:


    org.springframework.boot
    spring-boot-starter-web
    2.5.4


    org.springframework.boot
    spring-boot-starter-data-redis
    2.5.4


    
    org.springframework.boot
    spring-boot-starter-websocket
    2.5.4


    
    org.springframework.boot
    spring-boot-starter-thymeleaf
    2.5.4

二、redis的序列化配置

可以存中文的key和value

    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(factory);

        //key序列化方式;(不然会出现乱码;),但是如果方法上有Long等非String类型的话,会报类型转换错误;
        //所以在没有自己定义key生成策略的时候,以下这个代码建议不要这么写,可以不配置或者自己实现ObjectRedisSerializer
        //JdkSerializationRedisSerializer序列化方式 是没办法直接可视化存储数据的;
        //Jackson2JsonRedisSerializer序列化方式 有一个缺点,存储字符串时,可能会给字符串额外地加一对双引号,如""abc"";
        //实测推荐StringRedisSerializer序列化方式;
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        return redisTemplate;
    }

    @Override
    @Bean
    public KeyGenerator keyGenerator() {
        return new SimpleKeyGenerator() {

            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getSimpleName());//获得类的简单类名,不包括前缀包路径
                sb.append(".").append(method.getName());

                StringBuilder paramsSb = new StringBuilder();
                for (Object param : params) {
                    // 如果不指定,默认生成包含到键值中
                    if (param != null) {
                        paramsSb.append(param);
                    }
                }

                if (paramsSb.length() > 0) {
                    sb.append("_").append(paramsSb);
                }
                return sb.toString();
            }

        };

    }

三、redis的消息监听设置

主要是监听到某个渠道有了消息之后,怎么处理。

import java.io.IOException;
import java.nio.charset.Charset;

import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import com.feng.test.config.websocket.WebSocketServer;

/**
 * @className: RedisMessageListener.java
 * @author: 
 * @version: 1.0.0
 * @description:redis消息监听:得到消息后的处理方法 onMessage()
 * @date: 2022/8/23
 */
@Configuration
public class RedisMessageListener implements MessageListener {
    public void onMessage(Message message, byte[] bytes) {
        // 消息体
        System.out.println(message.getBody());
        String body = new String(message.getBody(), Charset.forName("UTF-8"));
        // 传播渠道名称
        String topic = new String(bytes);
        System.out.println("消息:" + body + ",渠道:" + topic);
        try {
            //此处是将redis监听到的消息,进行了websocket连接群发消息到浏览器端
            WebSocketServer.sendInfo(body, null);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

四、redis的消息的监听容器

主要是设置如何监听消息,以及设置监听多个途径的消息,针对不同途径的消息可以有不同的处理逻辑,同一个途径的消息也可以有多个不同的处理逻辑。

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    //引入一个redis的消息监听器
    @Autowired
    private MessageListener redisMessageListener;


    //线程池
    private ThreadPoolTaskScheduler taskScheduler;

    //创建线程池
    @Bean
    public ThreadPoolTaskScheduler initPoolTaskScheduler() {
        if (taskScheduler != null) {
            return taskScheduler;
        }

        taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(20);
        return taskScheduler;
    }

    /**
     * @description:创建发布redis消息监听器的容器,
     * 一个监听容器可以设置监听多个渠道的消息,并进行个性化的监听处理逻辑
     * @author: feng
     * @return RedisMessageListenerContainer
     */    
    @Bean
    public RedisMessageListenerContainer initContainer() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        //设置连接
        container.setConnectionFactory(redisConnectionFactory);
        //设置线程池
        container.setTaskExecutor(initPoolTaskScheduler());
        //定义监听的渠道
        Topic a = new ChannelTopic("topic1");
        Topic b = new ChannelTopic("topic2");
        //使用监听器监听redis的消息
        container.addMessageListener(redisMessageListener,a);
        container.addMessageListener(redisMessageListener,b);
        return container;
    }

这一步,框架封装的太厉害,仍然有很多东西不是很明白,比如,容器结合了线程池之后,究竟是如何多线程监听消息变化的?

五、承接上面websocket群发消息

在引入websocket的依赖后,需要配置一番,才可以使用websocket。
首先,配置类:

@Configuration
public class WebSocketConfig {
    /**
     * 如果使用Springboot默认内置的tomcat容器,则必须注入ServerEndpoint的bean;
     * 如果使用外置的web容器,则不需要提供ServerEndpointExporter,下面的注入可以注解掉
     * 本次使用springboot内置的tomcat容器,不注释掉下面的bean
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

再者,要定义一个websocket的连接路径(应该也可以多个路径):

@Component
@ServerEndpoint("/ws/{sid}")
public class WebSocketServer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

    public WebSocketServer() {

    }

    // 在线连接数统计
    private static int onlineCount = 0;

    // 声明某个连接的session
    private Session session;

    // 存放无数个socket连接
    private static ConcurrentHashMap websocketMap = new ConcurrentHashMap();

    // 声明sid
    private String sid;

    /**
     * 建立socket的方法调用
     * 
     * @param session
     * @param sid
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("sid") String sid) {
        // 赋值当前的连接session
        this.session = session;
        this.sid = sid;
        if (websocketMap.get(this.sid) == null) {
            // 存放本次socket连接
            websocketMap.put(sid, this);
            // 增加一个在线数
            addOnlineCount();

            log.info("新的socket连接出现并监听:" + sid + ",当前在线人数:" + getOnlineCount());
        }

    }

    /**
     * 连接关闭时调用的方法
     */
    @OnClose
    public void onClose() {
        if (websocketMap.get(this.sid) != null) {
            websocketMap.remove(this.sid);
            subOnlineCount();
            log.info("连接" + this.sid + "断开了");
        }
    }

    /**
     * socket连接时,收到消息时的处理方法
     * 
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("连接收到消息:" + message);
        // 如果message有字符
        if (StringUtils.hasLength(message)) {
            // 实际业务连接成千上万,应该用多线程处理
            for (WebSocketServer server : websocketMap.values()) {
                try {
                    server.sendMessage(message);
                    log.info("websocket群发消息-sid=" + server.sid + ":" + message);
                } catch (Exception e) {

                }
            }
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误了");
        error.printStackTrace();
    }

    /**
     * 获取在线连接数
     * 
     * @return
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 增加在线连接数
     * 
     * @return
     */
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    /**
     * 减少在线连接数
     * 
     * @return
     */
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

    /**
     * 实现服务器主动推送消息
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * 群发自定义消息(用set会方便些)
     */
    public static void sendInfo(String message, @PathParam(value = "sid") String sid) throws IOException {
        log.info("推送消息到sid:" + sid + ",推送内容:" + message);
        if (StringUtils.hasLength(message)) {
            for (WebSocketServer server : websocketMap.values()) {
                try {
                    // sid为null时群发,不为null则只发一个
                    if (sid == null) {
                        server.sendMessage(message);
                    } else if (server.sid.equals(sid)) {
                        server.sendMessage(message);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    continue;
                }
            }
        }
    }
}

六、建立一个前端界面测试一下





    
    websocket通讯





    

【userId】:

【toUserId】:

【toUserId内容】:

【操作】:

【操作】:

七、编写一个接口测试redis推送订阅消息

    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping(value = "/getMessageData")
    public ResultData getMessageData(String message) {
        ObjectMapper om = new ObjectMapper();
        try {
            Map map = om.readValue(message, HashMap.class);
            redisTemplate.opsForValue().set("a", message);
            redisTemplate.opsForValue().set("b", map.toString());
            redisTemplate.convertAndSend("topic1", map);
            System.out.println(map);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return ResultData.success("操作成功");
    }

测试接口:


接口发送要通知的消息测试


观察日志输出:


后端控制台输出


在看一下页面上建立websocket后,已经将redis发布的消息,从服务器端推送到了浏览器端:


浏览器接收到通知


至此,基本上这个demo已经实现了最初的目的。但是,这只能说技术思路上,可行,如果应用到生产系统上,必然需要考虑websocket连接数过多的问题、群发消息的遍历问题等等。

八、测试两个websocket之间互发消息

websocket之间互发消息

最后总结一下,从这个demo中,可以看出来框架已经做了很多的封装,服务端和浏览器的ip与端口不对,但是其中的互相建立连接的过程在后端已经被封装了,这样的优点是易于使用,缺点嘛,就是只会使用,但是稍一有新的业务需求,便不知道如何变通了。希望同行者能明白demo只是简单的参考,其中的协议原理才是知识的本质。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章