最近在开发的过程中遇到几个需求都是要在多久分钟后发送提醒消息,时间可配置。同事说可以用定时任务去做。但产品要求及时发出去不要有误差时间,所以这个方案不行。所以想到用Redison提供延迟队列来解决,同时项目中已经存在了redis。所以封装成一个starter方便每个服务引用
Springboot官方提供的starter以spring-boot-starter-xxx的方式命名,同时官方建议自定义的starter使用xxx-spring-boot-starter命名规则,以区分官方默认的的starter。所以我们这将starter名字定义delay-queue-spring-boot-starter
在pom文件中引入下面依赖
org.springframework.boot
spring-boot-configuration-processor
true
org.springframework.boot
spring-boot-autoconfigure
2.3.4.RELEASE
org.redisson
redisson-spring-boot-starter
3.15.6
这个RedissonDelayQueueAutoConfiguration类是我们自动的配置类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.youth.delay.queue.service.RedissonDelayQueueAutoConfiguration定接口用来给消息处队列后调用
/**
* 队列事件监听接口,需要实现这个方法
*
* @param
*/
public interface RedisDelayedQueueListener {
/**
* 执行方法
*
* @param t
*/
void invoke(T t);
}
@SuppressWarnings("all")
@ConditionalOnBean(value = RedissonDelayQueueMark.class)
@Configuration
public class RedissonDelayQueueAutoConfiguration {
private static final Logger logger = LoggerFactory.getLogger(RedissonDelayQueueAutoConfiguration.class);
static {
logger.info("Redisson延迟队列生效");
}
@Bean
public RedisDelayedQueue redisDelayedQueue(RedissonClient redissonClient){
return new RedisDelayedQueue(redissonClient);
}
@Bean
public RedisDelayedQueueInit redisDelayedQueueInit(RedissonClient redissonClient){
return new RedisDelayedQueueInit(redissonClient);
}
}如果上面类被springboot加载进入容器,那么会成功注入我们两个bean。一个是RedisDelayedQueue,其作用是用来发送消息进队列,源码如下
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class RedisDelayedQueue {
private static final Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);
private final RedissonClient redissonClient;
public RedisDelayedQueue(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
* 添加队列
*
* @param t DTO传输类,消息本体
* @param delay 时间数量,时间段,比如10分钟后执行
* @param timeUnit 时间单位,时段段的单位
* @param queueName 队列名字,消费处队列的的消费类的名字,比如A.class.getName()
* @param 泛型
*/
public void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
delayedQueue.destroy();
logger.info("添加队列成功:{},delay:{},timeUnit:{}", queueName, delay, timeUnit);
}
} 另一个类是RedisDelayedQueueInit,其作用主要是去扫描实现了接口的RedisDelayedQueueListener的类。源码如下
/**
* 初始化队列监听
*/
@SuppressWarnings("all")
public class RedisDelayedQueueInit implements ApplicationContextAware{
private static final Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class);
private final RedissonClient redissonClient;
//此处可以定义成自己的线程池,不然有内存溢出的风险
private final ExecutorService executor = Executors.newFixedThreadPool(2);
public RedisDelayedQueueInit(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
* 获取应用上下文并获取相应的接口实现类
*
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
logger.info("延迟队列接口RedisDelayedQueueListener的实现类数量={}",map.size());
for (Map.Entry item : map.entrySet()) {
String listenerName = item.getValue().getClass().getName();
startThread(listenerName, item.getValue());
}
}
/**
* 启动线程获取队列*
*
* @param queueName queueName
* @param redisDelayedQueueListener 任务回调监听
* @param 泛型
* @return
*/
private void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(queueName);
this.executor.submit(() -> {
//noinspection InfiniteLoopStatement
while (true) {
//解决项目重新启动并不会消费之前队列里的消息
RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
try {
T t = blockingFairQueue.take();
logger.info("监听队列线程{},获取到值:{}", queueName, JSON.toJSONString(t));
//出队列调用具体业务逻辑实现类
redisDelayedQueueListener.invoke(t);
} catch (Exception e) {
Thread.currentThread().interrupt();
// logger.info("监听队列线程错误,", e);
}
delayedQueue.destroy();
}
});
}
}
1、pom.xml文件引入依赖
com.youthcool
delay-queue-spring-boot-starter
0.0.1-SNAPSHOT
2、发送延迟消息
下面的消息是在发送后10秒钟后执行
@Resource
private RedisDelayedQueue redisDelayedQueue;
@GetMapping("/test")
public Boolean test(){
String seqId = UUID.randomUUID().toString().replace("-", "");
OaFingerReqDTO oaFingerReqDTO=new OaFingerReqDTO();
oaFingerReqDTO.setSeqId(seqId);
oaFingerReqDTO.setEmployeeNo("111489");
oaFingerReqDTO.setKaoQinDate(DateTime.now().toDateStr());
redisDelayedQueue.addQueue(oaFingerReqDTO,10, TimeUnit.SECONDS,TestDelayQueue.class.getName());
return true;
}3、消费者,监听出队列的消息
@Component
@Slf4j
public class TestDelayQueue implements RedisDelayedQueueListener {
@Override
public void invoke(OaFingerReqDTO oaFingerReqDTO) {
log.info("我出队列了={}", JSON.toJSONString(oaFingerReqDTO));
}
} | 留言与评论(共有 0 条评论) “” |