之前记住这个图,就自认为理解了事务消息,然而在生产中真的用的事务消息时,发现还是有很多地方理解不到位的,这里再好好学习,深入理解一下。
比如我们本地执行一个事务,然后还有一个其他系统的事务,我们想保证2个模块或者系统的一致性,单机事务是不可能保证的,那么就可以使用RocketMQ事务消息。
消费端也需要最好重试机制,来确保最终一致性。
首先我们先了解下事务消息的生命周期,也就是我们常说的半消息
- 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
- 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
- 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
- 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
我们再回到最上面那张图,来拆开看下
但还有两种情况,第4步的消息发送失败,或者本地事务执行还没有明确返回成功或者发生了异常,或者还在执行中,都会导致第4步无法正常进行,导致消息无法正常提交。
RocktMQ引入了本地事务回查的机制
既然生产端由于各种原因没有通知到MQ结果,那么由MQ自己来回查状态
根据回查结果来决定投递消息还是删除消息。
这里还有2个注意点
事务回查本身肯定是有性能消耗的,因此生产端尽可能保证本地事务结果不要返回未知;
另一点就是,回查事务结果要比本地事务执行晚一点,如果真的查询到还在执行的本地事务,要返回unknown。尽量保证本地事务不要太重耗时太久,也可以稍微把第一次回查的时间调晚一点。
这些参数也都是可以支持配置的。
最后,线上的代码就没法贴了,贴一个RocketMQ官方的事务消息代码
本地事务结果回查:
private static boolean checkOrderById(String orderId) {
return true;
}
本地事务执行:
private static boolean doLocalTransaction() {
return true;
}
生产端发送事务消息:
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilder();
//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// 错误的消息,直接返回Rollback。
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
//开启事务分支。
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
//事务分支开启失败,直接退出。
return;
}
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
.addProperty("OrderId", "xxx")
//消息体。
.setBody("messageBody".getBytes())
.build();
//发送半事务消息
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
//半事务消息发送失败,事务可以直接退出并回滚。
return;
}
/**
* 执行本地事务,并确定本地事务结果。
* 1. 如果本地事务提交成功,则提交消息事务。
* 2. 如果本地事务提交失败,则回滚消息事务。
* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
}
}
上面的代码也可以看到,对于本地事务中间态或者未知异常,直接不处理即可,等待MQ回查状态。
说点什么
您将是第一位评论人!