用了Redison提供的延迟队列解决方案后,同学们都早点下班了

前言

最近在开发的过程中遇到几个需求都是要在多久分钟后发送提醒消息,时间可配置。同事说可以用定时任务去做。但产品要求及时发出去不要有误差时间,所以这个方案不行。所以想到用Redison提供延迟队列来解决,同时项目中已经存在了redis。所以封装成一个starter方便每个服务引用

原理流程图

一、自定义starter的命名规则

Springboot官方提供的starter以spring-boot-starter-xxx的方式命名,同时官方建议自定义的starter使用xxx-spring-boot-starter命名规则,以区分官方默认的的starter。所以我们这将starter名字定义delay-queue-spring-boot-starter

二、新建一个maven项目

在pom文件中引入下面依赖

    
        
            org.springframework.boot
            spring-boot-configuration-processor
            true
        
        
            org.springframework.boot
            spring-boot-autoconfigure
            2.3.4.RELEASE
        
        
            org.redisson
            redisson-spring-boot-starter
            3.15.6
        
    

三、在项目resources下新建一个META-INF文件夹,并在其目录下新建一个spring.factories文件,写入下面内容

这个RedissonDelayQueueAutoConfiguration类是我们自动的配置类

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
 com.youth.delay.queue.service.RedissonDelayQueueAutoConfiguration


定接口用来给消息处队列后调用

/**
 * 队列事件监听接口,需要实现这个方法
 *
 * @param 
 */
public interface RedisDelayedQueueListener {
    /**
     * 执行方法
     *
     * @param t
     */
    void invoke(T t);
}

四、自动装配类RedissonDelayQueueAutoConfiguration

@SuppressWarnings("all")
    @ConditionalOnBean(value = RedissonDelayQueueMark.class)
    @Configuration
    public class RedissonDelayQueueAutoConfiguration {
        private static final Logger logger = LoggerFactory.getLogger(RedissonDelayQueueAutoConfiguration.class);
        static {
            logger.info("Redisson延迟队列生效");
        }
        @Bean
        public RedisDelayedQueue redisDelayedQueue(RedissonClient redissonClient){
            return new RedisDelayedQueue(redissonClient);
        }
        @Bean
        public RedisDelayedQueueInit redisDelayedQueueInit(RedissonClient redissonClient){
            return new RedisDelayedQueueInit(redissonClient);
        }
    }

如果上面类被springboot加载进入容器,那么会成功注入我们两个bean。一个是RedisDelayedQueue,其作用是用来发送消息进队列,源码如下

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;


public class RedisDelayedQueue {

    private static final Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);
    private final RedissonClient redissonClient;

    public RedisDelayedQueue(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }


    /**
     * 添加队列
     *
     * @param t        DTO传输类,消息本体
     * @param delay    时间数量,时间段,比如10分钟后执行
     * @param timeUnit 时间单位,时段段的单位
     * @param queueName 队列名字,消费处队列的的消费类的名字,比如A.class.getName()
     * @param       泛型
     */
    public  void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
        RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(t, delay, timeUnit);
        delayedQueue.destroy();
        logger.info("添加队列成功:{},delay:{},timeUnit:{}", queueName, delay, timeUnit);
    }

}

另一个类是RedisDelayedQueueInit,其作用主要是去扫描实现了接口的RedisDelayedQueueListener的类。源码如下

/**
 * 初始化队列监听
 */
@SuppressWarnings("all")
public class RedisDelayedQueueInit  implements ApplicationContextAware{

    private static final Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class);

    private final RedissonClient redissonClient;
    //此处可以定义成自己的线程池,不然有内存溢出的风险
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    public RedisDelayedQueueInit(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * 获取应用上下文并获取相应的接口实现类
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
        logger.info("延迟队列接口RedisDelayedQueueListener的实现类数量={}",map.size());
        for (Map.Entry item : map.entrySet()) {
            String listenerName = item.getValue().getClass().getName();
            startThread(listenerName, item.getValue());
        }
    }

    /**
     * 启动线程获取队列*
     *
     * @param queueName                 queueName
     * @param redisDelayedQueueListener 任务回调监听
     * @param                        泛型
     * @return
     */
    private  void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
        RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        this.executor.submit(() -> {
            //noinspection InfiniteLoopStatement
            while (true) {
                //解决项目重新启动并不会消费之前队列里的消息
                RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
                try {
                    T t = blockingFairQueue.take();
                    logger.info("监听队列线程{},获取到值:{}", queueName, JSON.toJSONString(t));
                    //出队列调用具体业务逻辑实现类
                    redisDelayedQueueListener.invoke(t);
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
//                    logger.info("监听队列线程错误,", e);
                }
                delayedQueue.destroy();
            }
        });

    }

}

五、SpringBoot项目如何使用

1、pom.xml文件引入依赖

        
            com.youthcool
            delay-queue-spring-boot-starter
            0.0.1-SNAPSHOT
        

2、发送延迟消息

下面的消息是在发送后10秒钟后执行

    @Resource
    private RedisDelayedQueue redisDelayedQueue;
    @GetMapping("/test")
    public Boolean test(){
        String seqId = UUID.randomUUID().toString().replace("-", "");
        OaFingerReqDTO oaFingerReqDTO=new OaFingerReqDTO();
        oaFingerReqDTO.setSeqId(seqId);
        oaFingerReqDTO.setEmployeeNo("111489");
        oaFingerReqDTO.setKaoQinDate(DateTime.now().toDateStr());
        redisDelayedQueue.addQueue(oaFingerReqDTO,10, TimeUnit.SECONDS,TestDelayQueue.class.getName());
        return true;
    }

3、消费者,监听出队列的消息

@Component
@Slf4j
public class TestDelayQueue implements RedisDelayedQueueListener {

    @Override
    public void invoke(OaFingerReqDTO oaFingerReqDTO) {
        log.info("我出队列了={}", JSON.toJSONString(oaFingerReqDTO));
    }
}

测试结果

#头条创作挑战赛#

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章