本文选自北京大学出版社的《分布式中间件核心原理与RocketMQ最佳实践》一书
本书:分布式中间件核心原理与RocketMQ实战技术一本通:实战案例+操作步骤+执行效果图,手把手教你吃透分布式中间件技术,轻松实现从小白到大牛的职业跃迁。
1,事务消息代码实现
package com.huc.rocketmq.transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
/**
* @author liumeng
*/
public class TransactionProducer {
public static void main(String[] args)
throws MQClientException, UnsupportedEncodingException {
// 这里是一个自定义的接收RocketMQ回调的监听接口
TransactionListener transactionListener = new TransactionListenerImpl();
// 创建支持事务消息的Producer,并指定生产者组
TransactionMQProducer producer =
new TransactionMQProducer("testTransactionGroup");
// 指定一个线程池,用于处理RocketMQ回调请求的
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("testThread");
return thread;
}
}
);
// 给事务消息生产者设置线程池
producer.setExecutorService(executorService);
// 给事务消息生产者设置回调接口
producer.setTransactionListener(transactionListener);
// 启动生产者
producer.start();
// 构造一条订单支付成功的消息
Message message = new Message(
"PayOrderSuccessTopic",
"testTag",
"testKey",
"订单支付消息".getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 将消息作为half消息发送出去
try {
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
} catch (Exception e) {
// half消息发送失败
// 订单系统执行回滚逻辑,比如退款、关闭订单
}
}
}
package com.huc.rocketmq.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionListenerImpl implements TransactionListener {
/**
* 如果half消息发送成功了,就会回调这个方法,执行本地事务
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 执行订单本地业务,并根据结构返回commit/rollback
try {
// 本地事务执行成功,返回commit
return LocalTransactionState.COMMIT_MESSAGE;
}catch (Exception e){
// 本地事务执行失败,返回rollback,作废half消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 如果没有正确返回commit或rollback,会执行此方法
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 查询本地事务是否已经成功执行了,再次根据结果返回commit/rollback
try {
// 本地事务执行成功,返回commit
return LocalTransactionState.COMMIT_MESSAGE;
}catch (Exception e){
// 本地事务执行失败,返回rollback,作废half消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
到这里事务消息的代码我们就完成了,但是我相信小伙伴们不会满足于仅仅使用原生代码实现,那接下来我们就用Spring Boot重写编写一次相同的逻辑。
使用Spring Boot项目后,我们还是先准备一个消息的实体类TranMessage,代码如下:
package com.huc.rocketmq.transaction.spring;
/**
* 事务消息实体
*/
public class TranMessage {
public static final String TOPIC = "Tran";
/**
* 编号
*/
private Integer id;
public TranMessage setId(Integer id) {
this.id = id;
return this;
}
public Integer getId() {
return id;
}
@Override
public String toString() {
return "TranMessage{" +
"id=" + id +
'}';
}
}
然后我们编写事务消息的生产者TranProducer:
package com.huc.rocketmq.transaction.spring;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class TranProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult sendMessageInTransaction(Integer id) {
// 创建TranMessage消息
Message<TranMessage> message = MessageBuilder
.withPayload(new TranMessage().setId(id)).build();
// 发送事务消息
return rocketMQTemplate.sendMessageInTransaction(TranMessage.TOPIC,
message,id);
}
}
同样的,我们需要编写一个回调监听的实现类,用于自定义处理本地事务,返回commit或者rollback消息。代码如下:
package com.huc.rocketmq.transaction.spring;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
// 注解中可以指定线程池参数
@RocketMQTransactionListener(corePoolSize=2,maximumPoolSize=5)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行订单本地业务,并根据结构返回commit/rollback
try {
// 本地事务执行成功,返回commit
return RocketMQLocalTransactionState.COMMIT;
}catch (Exception e){
// 本地事务执行失败,返回rollback,作废half消息
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 查询本地事务是否已经成功执行了,再次根据结果返回commit/rollback
try {
// 本地事务执行成功,返回commit
return RocketMQLocalTransactionState.COMMIT;
}catch (Exception e){
// 本地事务执行失败,返回rollback,作废half消息
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
2,顺序消息代码实现
有关消息乱序的出现原因以及解决方案我们已经在8.4.3小节中讲解过了,小伙伴们可以去复习一下,本节我们将直接讨论代码的实现,首先还是使用原生代码实现。
producer.send(
msg,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg; // 根据订单id选择发送的queue
long index = orderId % mqs.size();// 用订单id于MessageQueue的数量取模
return mqs.get((int) index); // 返回一个运算后固定的MessageQueue
}
},
orderId, // 传入订单id
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
}
);
在发送消息时增加一个MessageQueueSelector,就可以实现统一订单id的消息一直会发送到同一个MessageQueue之中,可以解决消息乱序问题。
接着我们来看消费者部分的代码实现,主要代码如下:
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
try {
// 对有序的消息进行顺序处理
for (MessageExt t : msgs) {
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
// 如果消息处理出错,返回一个状态,暂停一会儿再来处理这批消息。
return
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
这里面要注意的是我们注册的监听器是MessageListenerOrderly,这个监听器为了保证顺序消费,Consumer会对每一个ConsumerQueue只使用一个线程来处理消息,如果使用了多线程,是无法避免消息乱序的。
至此原生代码的实现已经完成了,Spring Boot的代码原理也是一样的。
消息实体的代码我们就省略了,直接看生产者的代码,如下:
package com.huc.rocketmq.order.spring;
import com.huc.rocketmq.spring.DemoMessage;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSend(Integer id) {
// 创建 DemoMessage 消息
DemoMessage message = new DemoMessage();
message.setId(id);
// 同步发送消息
return rocketMQTemplate.syncSendOrderly(DemoMessage.TOPIC,
message,String.valueOf(id));
}
public void asyncSend(Integer id, SendCallback callback) {
// 创建 DemoMessage 消息
DemoMessage message = new DemoMessage();
message.setId(id);
// 异步发送消息
rocketMQTemplate.asyncSendOrderly(DemoMessage.TOPIC,
message,String.valueOf(id),callback);
}
public void onewaySend(Integer id) {
// 创建 DemoMessage 消息
DemoMessage message = new DemoMessage();
message.setId(id);
// oneway 发送消息
rocketMQTemplate.sendOneWayOrderly(DemoMessage.TOPIC,
message,String.valueOf(id));
}
}
以上代码中可以看出,每个发送方法中都调用了对应的Orderly方法,并传入了一个id值,默认根据id值采用SelectMessageQueueByHash策略来选择MessageQueue。
接下来我们继续看消费者代码的实现。
package com.huc.rocketmq.order.spring;
import com.huc.rocketmq.spring.DemoMessage;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
topic = DemoMessage.TOPIC,
consumerGroup = "demo-consumer-group-" + DemoMessage.TOPIC,
consumeMode = ConsumeMode.ORDERLY // 设置为顺序消费
)
public class OrderConsumer implements RocketMQListener<DemoMessage> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(DemoMessage message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
可以看到消费者代码改动很小,只需要在@RocketMQMessageListener注解中新增consumeMode = ConsumeMode.ORDERLY,就可以指定顺序消费了,小伙伴们可以大胆的猜测它的实现原理,和我们的原生代码实现的方式是相同的。
3,消息过滤代码实现
// 构建消息对象
Message msg = new Message(
topic, //这里指定的是topic
"A",//这里存放的Tag 消费者会根据tag进行消息过滤
message.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 我们还可以设置一些用户自定义的属性
msg.putUserProperty("name","value");
消费者在消费数据时就可以根据tag和属性进行过滤了,比如下边的写法:
// 订阅test Topic , 第二个参数是通过tag过滤,意思是过滤出tag为A或B的消息
consumer.subscribe("test", "A||B");
对应到spring boot中的实现也很简单,生产者部分关键代码如下:
// 创建 DemoMessage 消息
Message<DemoMessage> message = MessageBuilder
.withPayload(new DemoMessage().setId(id))
.setHeader(MessageConst.PROPERTY_TAGS,"A")// 设置消息的tag
.build();
消费者过滤的主要代码如下:
@RocketMQMessageListener(
topic = DemoMessage.TOPIC,
consumerGroup = "demo-consumer-group-" + DemoMessage.TOPIC,
selectorExpression = "A||B" // 通过tag过滤
)
消费者部分只要在@RocketMQMessageListener注解中增加selectorExpression属性就可以了。
4,延时消息代码实现
图1 放弃支付流程
图2 延时消息放弃支付流程
// 构建消息对象
Message msg = new Message(
topic, //这里指定延时消息的topic
message.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 指定延时级别为3
msg.setDelayTimeLevel(3);
producer.send(msg);
可以看到最核心的内容就是msg.setDelayTimeLevel(3),设置了延迟级别。
RocketMQ支持的延迟级别有18个,这个我们之前已经介绍过了,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
所以设置为3代表10s后消息可以被消费者消费。
消费者的代码这里就不演示了,没有什么特殊的写法。
下面我们来看一下Spring Boot的生产者代码实现:
// 创建 DemoMessage 消息
Message<DemoMessage> message = MessageBuilder
.withPayload(new DemoMessage().setId(id))
.build();
// 同步发送消息
return rocketMQTemplate.syncSend(DemoMessage.TOPIC,
message,
30*1000,
3);// 此处设置的就是延时级别
声明:本文选自北京大学出版社的《分布式中间件核心原理与RocketMQ最佳实践》一书,略有修改,经出版社授权刊登于此。
编辑推荐
本次福利将送出《分布式中间件核心原理与RocketMQ最佳实践》* 5本,超高中奖率(参与人少,多期中奖率超过90%)
为避免撸羊毛的用户参与抽奖,让更多真实用户获得社区福利,现在把抽奖方式切换到 spring4all.com 网上通过积分参与。
本次抽奖地址:http://spring4all.com/forum-post/2112.html