「Spring Boot 集成应用」RocketMQ的集成用法(下)

1. RocketMQ集成之异步发送

异步发送能够提升发送效率, 适合高并发场景下使用, 基于RocketMQ集成之普通消息发送做改造:

1、增加异步发送接口

com.mirson.spring.boot.mq.rocket.basic.provider.RocketMqProviderContorller

    /**     * 异步发送消息     * @return     */    @GetMapping("/asyncSendString")    public String asyncSendString() {        for(int i=0; i<10; i++) {            String msg = "seq number: " + i;            final String seq = String.valueOf(i);            // 异步方式发送            rocketMQTemplate.asyncSend(RabbitMqConfig.TOPIC, msg, new SendCallback() {                public void onSuccess(SendResult sendResult) {                    // 发送成功回调处理                    log.info("seq number:  " + seq + ", send result: " + sendResult.getSendStatus());                }                public void onException(Throwable e) {                    // 发送异常回调处理                    log.error(e.getMessage(), e);                }            });        }        return "async send success";    }
  • 增加asyncSendString异步发送接口, 连续发送十条有序消息, 调用rocketMQTemplate的asyncSend方法, 实现异步发送, SendCallback内部提供了onSuccess与onException两个回调方法,可以针对性的做相应业务处理。
  • 同步方式发送, 能保证消息有序传递, 这里采用异步发送,不能保证消息能够有序接收, 在实际使用中, 要结合具体的业务场景使用。


2、测试验证

  • 调用异步发送接口


异步方式发送十条消息。

  • 监听器日志

十条消息全部接收成功, 注意消息的发送顺序, 与订阅的接收顺序, 没有保持一致;异步方式能够提升发送效率, 但缺点是不能保障消息的有序消费,在实际使用中, 可以结合同步锁来使用, 比如可以根据账户ID加锁, 因每个账户数据具有独立性, 这样可以提升消息的传递发送效率, 又能保障每个账户接收到的数据是有序的。

2. RocketMQ集成之ACL权限控制

ACL是Access Control List简称, 意为访问控制列表, 是RocketMQ4.4新加入的功能。加入ACL能够通过权限管理控制消息队列, 针对不同角色用户分配不同的队列操作权限, 便于权限管控, 提升消息队列数据的安全性。

1、ACL基本处理流程

2、创建rocketmq-acl工程

3、工程配置

application.yml

server:  port: 12615spring:  application:    name: rocketmq-acl# RocketMQ配置rocketmq:  name-server: 10.10.20.15:9876  # 生产者配置  producer:    group: basic-group    # 权限信息    access-key: rocketmq2    secret-key: 12345678  # 消费者配置  consumer:    # 权限信息    accessKey: rocketmq2    secret-key: 12345678

注意producer与consumer, 都需配置权限信息, accessKey相当于用户名, secret-key相当于密码。

这些信息不是随便填写, 要与RocketMQ服务配置文件保持一致。

4、定义消息监听器

com.mirson.spring.boot.mq.rocket.acl.consume.StrSpringMessageConsumer

@Service@RocketMQMessageListener(        topic = RabbitMqConfig.TOPIC_SPRING_MESSAGE,        consumerGroup = RabbitMqConfig.CONSUME_GROUP_SPRING_MESSAGE//        accessKey = "RocketMQ", // 不需再填写, 会自动从配置文件中读取//        secretKey = "12345678" // 不需再填写, 会自动从配置文件中读取)@Log4j2public class StrSpringMessageConsumer implements RocketMQListener {    @Override    public void onMessage(String str) {        log.info("StrSpringMessageConsumer => receive str: " + str);    }}

通过RocketMQMessageListener注解, 也可以配置accessKey与secretKey信息, 但在工程配置文件中我们已经填好, 系统启动会自动读取, 可以不用再填写。

5、定义发送接口

com.mirson.spring.boot.mq.rocket.acl.provider.RocketMqProviderContorller

/**     * 发送RocketMQ Spring Message封装消息     * @return     */    @GetMapping("/sendSpringMessage")    public String sendSpringMessage() {        String msg = "random number: " + RandomUtils.nextInt(0, 100);        // Send Spring Message With String        SendResult result = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC_SPRING_MESSAGE , MessageBuilder.withPayload(msg).build());        log.info("send result: " + result.getSendStatus());        return msg;    }

发送一个Spring Message封装的消息, 调用rocketMQTemplate的syncSend方法发送数据, 无需加额外参数。

6、RabiitMQ服务端用户权限设置

测试之前, 先要确保RocketMQ服务器开启了ACL验证功能。

  • 开启ACL验证

修改%RABBITMQ_HOME%/conf/broker.conf文件, 末尾增加:

#开启ACL权限控制功能aclEnable=true
  • 分配用户权限

修改%RABBITMQ_HOME%/conf/plain_acl.yml文件:

globalWhiteRemoteAddresses:accounts:- accessKey: RocketMQ  secretKey: 12345678  whiteRemoteAddress:  admin: false  defaultTopicPerm: DENY  defaultGroupPerm: SUB  topicPerms:  - topicA=DENY  - topicB=PUB|SUB  - topicC=SUB  groupPerms:  # the group should convert to retry topic  - groupA=DENY  - groupB=PUB|SUB  - groupC=SUB - accessKey: rocketmq2  secretKey: 12345678  whiteRemoteAddress: 192.168.1.*  # if it is admin, it could access all resources  admin: true  defaultTopicPerm: PUB|SUB  defaultGroupPerm: SUB  topicPerms:  - topicA=DENY  - topicB=PUB|SUB  - topic_acl_spring_message=PUB|SUB  - topic_acl_transaction_spring_message=PUB|SUB  - topicC=SUB  groupPerms:  # the group should convert to retry topic  - groupA=DENY  - groupB=PUB|SUB  - groupC=SUB  - group_acl_spring_message=PUB|SUB  - group_acl_transaction_spring_message=PUB|SUB
  • 默认该配置文件下会有两个用户, RocketMQ与rocketmq2, 这里要修改rocketmq2的权限。
  • rocketmq2用户虽然具有admin权限, 但是Rocketmq的ACL处理源码仍要读取topicPerms属性配置,否则会报错, 这里追加我们用于ACL测试的相关TOPIC与GROUP, 确保rocketmq2用户拥有测试时的所有权限, RocketMQ用户则无权限:
defaultTopicPerm: PUB|SUB  defaultGroupPerm: SUB  topicPerms:  - topicA=DENY  - topicB=PUB|SUB  - topic_acl_spring_message=PUB|SUB  - topic_acl_transaction_spring_message=PUB|SUB  - topicC=SUB  groupPerms:  # the group should convert to retry topic  - groupA=DENY  - groupB=PUB|SUB  - groupC=SUB  - group_acl_spring_message=PUB|SUB  - group_acl_transaction_spring_message=PUB|SUB
  • 权限控制参数说明

字段

取值

含义

globalWhiteRemoteAddresses

;192.168..*;192.168.0.1

全局IP白名单

accessKey

字符串

Access Key 用户名

secretKey

字符串

Secret Key 密码

whiteRemoteAddress

;192.168..*;192.168.0.1

用户IP白名单

admin

true;false

是否管理员账户

defaultTopicPerm

DENY;PUB;SUB;PUB|SUB

默认的Topic权限

defaultGroupPerm

DENY;PUB;SUB;PUB|SUB

默认的ConsumerGroup权限

topicPerms

topic=权限

各个Topic的权限

groupPerms

group=权限

各个ConsumerGroup的权限

7、测试验证

  • 权限分配好后, 重启RabbitMQ服务, 要确保读取的是我们修改的配置文件。

1)启动NameServer

nohup bin/mqnamesrv >/dev/null 2>&1 &

2) 启动Broker

sh bin/mqbroker -n 127.0.0.1:9876 -c /usr/local/rocketmq4.4/conf/broker.conf &

3) 关闭Broker

bin/mqshutdown broker

4) 关闭name server

bin/mqshutdown namesrv

  • 使用RocketMQ用户,发送数据, 预期应该是无权限。

修改配置文件, 启动服务:

# RocketMQ配置rocketmq:  name-server: 10.10.20.15:9876  # 生产者配置  producer:    group: basic-group    # 权限信息    access-key: RocketMQ    secret-key: 12345678  # 消费者配置  consumer:    # 权限信息    accessKey: RocketMQ    secret-key: 12345678
  • 访问发送接口

http://127.0.0.1:12615/sendSpringMessage

出现异常, 查看控制台日志:

没有该主题topic_acl_spring_message的操作权限, ACL正常生效。

  • 使用rocketmq2用户,发送数据,预期是可以正常发送与接收

修改配置文件:

# RocketMQ配置rocketmq:  name-server: 10.10.20.15:9876  # 生产者配置  producer:    group: basic-group    # 权限信息    access-key: rocketmq2    secret-key: 12345678  # 消费者配置  consumer:    # 权限信息    accessKey: rocketmq2    secret-key: 12345678
  • 调用发送接口

查看控制台监听日志:

通过ACL权限控制, 能够正常发送与接收队列数据。

3. RocketMQ集成之Transaction事务消息

假设场景,用户进行转账, 先扣除自身的账户金额, 再发送消息通知, 增加对方的账户金额, 在发送消息通知的过程中如果失败该如何处理? 为了解决本地事务执行与消息发送的原子性问题, RocketMQ推出了Transaction事务消息(并非分布式事务解决方案, 但可以基于此功能,与补偿机制实现一套方案)。 具体处理机制:

1、仍采用ACL机制, 基于rocketmq-acl工程改造实现

2、增加接收监听器

com.mirson.spring.boot.mq.rocket.acl.consume.StringTransactionConsumer

@Service@RocketMQMessageListener(        topic = RabbitMqConfig.TOPIC_SPRING_TRANSACTION_MESSAGE,        consumerGroup = RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE)@Log4j2public class StringTransactionConsumer implements RocketMQListener {    @Override    public void onMessage(String message) {        log.info("StringTransactionConsumer => receive transaction str: " + message);    }}

这里订阅的GROUP为RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE。

3、自定义事务监听器

com.mirson.spring.boot.mq.rocket.acl.config.TransactionListener

@RocketMQTransactionListener(        txProducerGroup = RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE)@Log4j2public class TransactionListener implements RocketMQLocalTransactionListener {    private AtomicInteger transactionIndex = new AtomicInteger(0);    private ConcurrentHashMap localTrans = new ConcurrentHashMap();    @Override    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {        String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);        int value = transactionIndex.getAndIncrement();        int status = value % 3;        localTrans.put(transId, status);        if (status == 0) {             return RocketMQLocalTransactionState.COMMIT;        }        if (status == 1) {            log.info("    # ROLLBACK # Simulating %s related local transaction exec failed! {}", new String((byte[])msg.getPayload()));            return RocketMQLocalTransactionState.ROLLBACK;        }        return RocketMQLocalTransactionState.UNKNOWN;    }    @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {        String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);        RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;        Integer status = localTrans.get(transId);        if (null != status) {            switch (status) {                case 0:                    retState = RocketMQLocalTransactionState.UNKNOWN;                    break;                case 1:                    retState = RocketMQLocalTransactionState.COMMIT;                    break;                case 2:                    retState = RocketMQLocalTransactionState.COMMIT;                    break;            }        }        return retState;    }}
  • 通过注解RocketMQTransactionListener实现自定义事务监听器, txProducerGroup要与上面监听器配置的Group一致,如果接收监听器不指定Group, 将采用RocketMQ默认的事务控制处理器。
  • 这里监听器的主要作用, 控制消息的提交与回滚, 通过取模计算, 将结果为1的数据进行回滚并打印。

4、定义发送接口

    /**     * 发送RocketMQ Transaction 事务消息     * @return     */    @GetMapping("/sendTransactionMessage")    public String sendTransactionMessage() {        for (int i = 0; i < 10; i++)        {            Message msg = MessageBuilder.withPayload("seq number " + i).                setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();            SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE,                RabbitMqConfig.TOPIC_SPRING_TRANSACTION_MESSAGE, msg, null);            log.info("seq " + i  + " send result: " + sendResult.getSendStatus());         }        return "send transaction message success.";    }

定义sendTransactionMessage接口发送事务消息, 这里连续发送十条事务消息,调用rocketMQTemplate 的sendMessageInTransaction方法, 指定配置的组别与主题信息。

5、测试验证

发送十条事务消息, 在事务监听器里面, 有部分数据会出现回滚, 下面验证, 监听器是否正常接收确认的消息, 能否接收到回滚的消息。

  • 调用接口

http://127.0.0.1:12615/sendTransactionMessage


  • 查看接收日志结果

可以看到, 成功发送了十条数据, 有4条数据出现回滚, 监听器打印接收了6条数据, 验证成功。

4. 总结

这里全面的讲解RocketMQ技术点, 相对较多, 也可以看出RocketMQ功能比较丰富, 有较好的扩展性,灵活性,适用各种业务场景, 不仅可以与Spring Boot 集成, 还可以支持Spring Cloud Stream 在微服务中应用。RocketMQ还支持消息轨迹跟踪, 异步顺序发送, 并发消费等, 更多功能大家可以再深入研究, 能够更好的适应生产项目对不同场景的使用要求。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章