日期:
来源:阿里开发者收集编辑:杜沁园(悬衡)
阿里妹导读
引入消息队列可以帮助我们解耦业务逻辑,提升性能,让主链路更加清晰。但是消息链路的代码腐化和一致性问题也给业务带来了很多困扰,本文阐述了钉钉审批消息链路重构的设计和解决方案。注:Metaq 是阿里 RocketMQ 消息队列的内网版本。
概述
图中只有顶点处存在失败,其他都是 Sunfire 的自动连线,不存在失败。
消息链路的美好幻想与残酷现实
美好幻想
主业务链路作为 Producer 发出消息 数十个甚至更多 Consumer 订阅该消息,分别执行自己快速且幂等的原子逻辑
残酷现实
逻辑相互影响,修改风险高; 链路脆弱,容易中断,一个调用失败,后续所有逻辑将不会执行; 没有重试:大泥球无法做到原子和幂等,整体重试代价太大,所以直接异步执行放弃重试 ; 消息队列引以为傲的重试功能反而会成为故障的温床,导致雪崩。
为什么不直接把大泥球拆分成前面的多个 Consumer 呢?这确实也是一种方案,但是对于大泥球 Consumer,可能会拆出几十个 Consumer,这会导致非常严重的读扩散。举个例子,审批单发起的消息中只含有审批单的 id,内容需要从数据库反查,原本在“大泥球”中,只需要查询一次就复用,而拆分后可能要多查几十次。这还只是众多扩散问题的其中一个,如果为了治理大泥球,却加重了扩散问题,就得不偿失了。
朴素的拆分想法
public interface ProcessEventListener extends EventListener {/*** 审批单发起事件*/void onProcessInstanceStart(InstanceEventContext instanceEventContext);/*** 审批单结束事件*/void onProcessInstanceFinish(InstanceEventContext instanceEventContext);/*** 审批任务生成事件*/void onTaskActivated(TaskEventContext taskEventContext);// 省略其他事件// ...}
// 接到审批单发起消息InstanceEventContext instanceEventContext = buildContext();for (handler : handlers) {try {handler.onProcessInstanceStart(instanceEventContext);} catch (Exception e) {// 打印监控日志等等// ...}}
某个处理器因为网络超时失败了,如何重试?:我们仅仅是将逻辑拆开了,执行的时候还是一串 “大泥球”,如果仅仅依靠消息队列本身机制,要重试只能一起重试,这显然无法满足诉求; 如何高性能地构建庞大的统一上下文(即 Context 参数):为了满足众多处理器对数据查询的诉求,需要提供庞大的上下文,除了性能风险外,也是代码腐败的温床
精准重试
Ignore:非重要业务,失败就算了,不需要重试; Concurrent:在另外线程池中并发执行,也不会重试。适合一些容易影响后续执行的长耗时的处理器; Retry Now:立即重试。会将任务放到本地的一个延迟队列中,100~500ms 后重试。适合时效性比较强的处理器; Retry Later:重投消息,精确重试失败的处理器,遵从 Metaq 的重投延迟,前三次重试分别是 1s 5s 10s ,因此不适合时效性强的处理器; 为了方便使用,我们将这些策略做成了注解的形式,比如审批抄送时效性没这么强,可以使用 Retry Later 策略:
public class CcListener implements ProcessEventListener {// Retry Later 策略, 最多重试两次@Policy(value = PolicyType.RETRY_LATER, retry = 2)@Overridepublic void onProcessInstanceStart(InstanceEventContext instanceEventContext) {//... 逻辑省略}}
再比如审批待办同步,是时效性比较强的,间隔太久容易有时效性问题,可以使用 Retry Now 策略:
public class SyncTodoTaskListener implements ProcessEventListener {// Retry Now 策略, 最多重试三次@Policy(value = PolicyType.RETRY_NOW, retry = 3)@Overridepublic void onProcessInstanceStart(InstanceEventContext instanceEventContext) {//... 逻辑省略}}前三种策略都比较好理解,就不多说了。下文重点讨论最后一种策略是如何实现的。
{// 总体重试次数, 第一次重试(第 0 次代表正常执行)"globalCnt": 1,// 每个处理器的执行状态"cntMap": {// handler1 第 1 次重试, 读取该属性可以判断 handler1 是否还有重试机会"handler1": 1,// -1 表示 handler2 已经执行成功, 不需要再执行"handler2": -1,// -2 表示 handler3 已经彻底执行失败(一般是超过了设置的最大重试次数), 不需要再执行"handler3": -2}}
Message message = new Message();// 重投到专门的重试主题message.setTopic("my-retry-topic");// 消息体保持不变message.setBody(preBody);// nextCnt 是重试的次数// 设置 DelayTimeLevel 能够让重投有一定的延时message.setDelayTimeLevel(nextCnt);// 将本次执行状态存储到 user property 中message.putUserProperty("RETRY_STORE", "{\"globalCnt\":1,\"cntMap\":{\"handler1\":1,\"handler2\":-1,\"handler3\":-2}}");// 发送消息mqPublishService.send(message);
图中的重投都是指往 topic 再发一遍消息,而不是 Metaq 自身的重投机制。
其中有一些细节问题需要注意,比如某次发布中上线了一个新的处理器,而发布机器刚好接到了一个重试消息,如何才能避免新处理器被意外执行呢?我们之前记录的执行状态 cntMap 中的所有 key 就相当于首次执行时的处理器快照,我们重试时只执行 cntMap 中存在的 key 就好了。
统一上下文
为了满足所有处理器的需求,上下文往往会很庞大,因此构建性能差。 外部无法感知处理器内部需要使用上下文的哪些字段,只能一股脑地将所有字段都填充好,传递进去,而且内部很有可能一个字段都不使用,白白损耗了性能。 上下文中存在一些幽灵字段,在某个处理器中设置进去,又在某几个处理器中读取,也就是它有时候为 null,有时候又有值,维护难度巨大,从中取个值都要战战兢兢。 读扩散问题:每个处理器都去读相同的数据,导致链路数十倍的读扩散。
懒加载
public class User {// 用户 idprivate Long uid;// 用户的部门,为了保持示例简单,这里就用普通的字符串// 需要远程调用 通讯录系统 获得private final Lazy<String> department;public User(Long uid, Lazy<String> department) {this.uid = uid;this.department = department;}public Long getUid() {return uid;}public String getDepartment() {return department.get();}}// 构建 User 实体Long uid = 1L;User user = new User(uid, Lazy.of(() -> departmentService.getDepartment(uid)));// 使用 User 实体,部门属性用起来和普通属性一样user.getDepartment();Lazy 框架的具体实现可以参考另一篇文章 利用惰性写出高性能且抽象的代码。
朴素的懒加载思想在依赖服务正常的情况下能够大大减少读扩散和提升性能,但是在依赖服务大规模异常时,还是会造成读扩散,进一步加大依赖服务的压力:
// 通过用户获得部门Lazy<String> departmentLazy = Lazy.of(() -> departmentService.getDepartment(uid));// 通过部门获得主管// department -> supervisorLazy<Long> supervisorLazy = departmentLazy.map(department -> SupervisorService.getSupervisor(department));
在层次加载结构中,一旦某个节点被求值,路径上所有的属性都会被缓存,因为这种对象能够自动地优化性能。以 利用惰性写出高性能且抽象的代码 这篇文章中 User 实体为例:
基于业务的上下文设计
审批单实例上下文 审批单发起消息 审批单结束消息 审批单撤销消息 ... 活动上下文 活动开始消息 活动结束消息 ... 任务上下文 任务开始消息 任务结束消息 任务取消消息 ...
如何防止雪崩
rebalance 导致重试次数归 0; 消费者执行超时,重试次数从 3 开始继续重投;
private boolean isDuplicateMessage(Message msg) {try {Integer consumedCount = ltairManager.incr("dingflow_mq_consume_" + msg.getMsgId() + "_" + msg.getReconsumeTimes(), 1, 30);if (consumedCount != null && consumedCount > 1) {return true;}} catch (Throwable throwable) {// 打印错误日志// ...}return false;}
不处理重试消息:reconsumeTimes 大于 0 直接返回,不处理重试消息。因为我们不依赖 Metaq 层面的重试机制。
Show Me The Code: 框架编码设计
监控与感知
如图,OpenEventCallbackListener 处理器因为 NPE 问题,失败量突然大幅度增加。
总结
将消息链路拆分成多个处理器; 利用 Metaq 的 UserProperty 存储每次处理器的执行状态,精准重试失败的处理器; 用懒加载机制构建统一上下文,提升构建性能,降低读扩散,最终设计出最符合业务的上下文;