在日常开发中,延时任务是一个无法规避的话题。也存在各种不同的方案,比如:
当然,在所有的方案中,分布式延时队列方案是最佳方法,当然也是最复杂的方案。
在延时任务这个场景,分布式延时队列方案 是最优策略,所以,不少公司制定了相关规范,只能使用 RocketMQ 实现延时调度。
系统的稳定性有了一定的保障,但操作的复杂性抛给了下面的研发人员。
从一个 Leader 角度,我一直认为“只定规范,不提供工具,是极度不负责任的表现”。
期望框架能够提供:
框架基于 RocketMQ 进行构建,请自行完成 RocketMQ 的搭建。
我们使用 rocketmq starter 完成基本配置。
首先,在 pom 中增加 rocketmq starter 依赖,具体如下:
org.apache.rocketmq
rocketmq-spring-boot-starter
2.2.1
其次,在 application.yml 中添加 rocketmq 配置,具体如下:
rocketmq:
name-server: http://127.0.0.1:9876
producer:
group: async-demo
其中,name-server 根据具体情况进行配置。
配置完成,可以在项目中:
为了方便与 spring-boot 项目集成,lego 提供 lego-starter,以完成快速接入。
在 pom 中增加 starter,具体如下:
com.geekhalo.lego
lego-starter
0.1.5-delay_task-SNAPSHOT
其中,自动配置机制将完成:
我们只需在方法上添加 @DelayBasedRocketMQ 注解,完成基础配置,该方法便具备延时处理的能力。具体如下:
@DelayBasedRocketMQ(
topic = "${cancelOrder.delay.topic}",
tag = "delayCancelOrder",
consumerGroup = "${cancelOrder.delay.consumerGroup1}",
delayLevel = 2
)
public void delayCancelOrder(Long orderId, String reason){
DelayTask delayTask = new DelayTask(orderId, reason, null);
log.info("Run Cancel Order {}", delayTask);
this.tasks.add(delayTask);
}
@DelayBasedRocketMQ 定义如下:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DelayBasedRocketMQ {
/**
* RocketMQ topic
* @return
*/
String topic();
/**
* Tag
* @return
*/
String tag() default "*";
/**
* 延迟级别
* @return
*/
int delayLevel() default -1;
/**
* 延迟时间 SpEL 表达式
* @return
*/
String delayLevelSpEl() default "0";
/**
* nameServer 配置
* @return
*/
String nameServer() default "${rocketmq.name-server:}";
/**
* 消费者组信息
* @return
*/
String consumerGroup();
/**
* 消费者运行的 profile,主要用于发送和消费分离的场景
* @return
*/
String consumerProfile() default "";
}
在 application 文件中增加相关配置,具体如下:
cancelOrder:
delay:
topic: delay-task-test-topic
consumerGroup1: delay-cancel-order-group1
consumerGroup2: delay-cancel-order-group2
写一个简单的单测,代码如下:
@Test
void delayCancelOrder() throws Exception{
Long orderId = RandomUtils.nextLong();
String reason = "超时自动取消";
this.delayService.delayCancelOrder(orderId, reason);
Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks()));
TimeUnit.SECONDS.sleep(4);
Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks()));
TimeUnit.SECONDS.sleep(6);
Assertions.assertTrue(CollectionUtils.isNotEmpty(this.delayService.getTasks()));
}
运行单测,日志如下:
10:16:09.951 [ main] c.g.l.core.delay.DelayMethodInterceptor : success to sent Delay Task to RocketMQ for [7973190633030104064, 超时自动取消]
10:16:14.964 [MessageThread_1] com.geekhalo.lego.delay.DelayService : Run Cancel Order DelayService.DelayTask(orderId=7973190633030104064, reason=超时自动取消, timeOutLevel=null)
10:16:14.964 [MessageThread_1] c.g.l.c.s.AbstractConsumerContainer : consume message 2408820718E04140899ECD9401476CBB137358644D46308D59D10000, cost: 8 ms
为了方便,对部分日志进行简化,但不影响分析结果。
从运行日志可以得出:
从日志上看,完全符合设计预期。
不同的场景需要不同的延时级别,在 @DelayBasedRocketMQ 直接指定死,不方便业务扩展。
如果需要动态指定延时级别,可以使用 @DelayBasedRocketMQ 的 delayLevelSpEl ,通过 SpEL 从上下文中读取配置,具体如下:
@DelayBasedRocketMQ(
topic = "${cancelOrder.delay.topic}",
tag = "delayCancelOrderForTimeout",
consumerGroup = "${cancelOrder.delay.consumerGroup2}",
delayLevelSpEl = "#timeOutLevel"
)
public void delayCancelOrderForTimeout(Long orderId, String reason, int timeOutLevel){
DelayTask delayTask = new DelayTask(orderId, reason, timeOutLevel);
log.info("Run Cancel Order {}", delayTask);
this.tasks.add(delayTask);
}
其中,delayLevelSpEl = "#timeOutLevel" 含义为,将参数 timeOutLevel 的值作为 延时级别。
编写单元测试用例,具体如下:
@Test
void delayCancelOrder_DelayTime() throws Exception{
Long orderId = RandomUtils.nextLong();
String reason = "超时自动取消";
this.delayService.delayCancelOrderForTimeout(orderId, reason, 3);
Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks()));
TimeUnit.SECONDS.sleep(9);
Assertions.assertFalse(CollectionUtils.isNotEmpty(this.delayService.getTasks()));
TimeUnit.SECONDS.sleep(11);
Assertions.assertTrue(CollectionUtils.isNotEmpty(this.delayService.getTasks()));
}
运行测试用例,观察日志如下:
10:27:56.257 [ main] c.g.l.core.delay.DelayMethodInterceptor : success to sent Delay Task to RocketMQ for [844282856080074752, 超时自动取消, 3]
10:28:06.281 [MessageThread_1] com.geekhalo.lego.delay.DelayService : Run Cancel Order DelayService.DelayTask(orderId=844282856080074752, reason=超时自动取消, timeOutLevel=3)
10:28:06.282 [MessageThread_1] c.g.l.c.s.AbstractConsumerContainer : consume message 2408820718E04140899ECD9401476CBB13D358644D46309820DB0000, cost: 13 ms
从日志上可见,延时时间已经成功调整为 10秒(10:27:56 发送任务,10:28:06 接收到任务),符合设计预期。
为了更好的对资源进行隔离,有时需要单独部署一组集群,用于处理后台任务。
为支持该模式,@DelayBasedRocketMQ提供了 consumerProfile 配置,用于指定 Consumer 在哪个 profile 下执行,如果不设置,则对环境不进行任何要求。
整体架构
在方法上添加注解后,框架自动完成:
核心流程如下:
项目仓库地址:https://gitee.com/litao851025/lego
项目文档地址:https://gitee.com/litao851025/lego/wikis/support/delayBasedRocketMQ
留言与评论(共有 0 条评论) “” |