Welcome everyone

正确理解事务消息

分布式 汪明鑫 620浏览 0评论

之前记住这个图,就自认为理解了事务消息,然而在生产中真的用的事务消息时,发现还是有很多地方理解不到位的,这里再好好学习,深入理解一下。

RocketMQ可以帮助我们实现最终一致性

比如我们本地执行一个事务,然后还有一个其他系统的事务,我们想保证2个模块或者系统的一致性,单机事务是不可能保证的,那么就可以使用RocketMQ事务消息。

消费端也需要最好重试机制,来确保最终一致性。

首先我们先了解下事务消息的生命周期,也就是我们常说的半消息

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

我们再回到最上面那张图,来拆开看下

正常流程下,生产端发送半消息,然后执行本地事务,本地事务成功则半消息commit, 本地事务失败则半消息rollback。

但还有两种情况,第4步的消息发送失败,或者本地事务执行还没有明确返回成功或者发生了异常,或者还在执行中,都会导致第4步无法正常进行,导致消息无法正常提交。

RocktMQ引入了本地事务回查的机制

既然生产端由于各种原因没有通知到MQ结果,那么由MQ自己来回查状态

根据回查结果来决定投递消息还是删除消息。

这里还有2个注意点

事务回查本身肯定是有性能消耗的,因此生产端尽可能保证本地事务结果不要返回未知;

另一点就是,回查事务结果要比本地事务执行晚一点,如果真的查询到还在执行的本地事务,要返回unknown。尽量保证本地事务不要太重耗时太久,也可以稍微把第一次回查的时间调晚一点。

这些参数也都是可以支持配置的。

最后,线上的代码就没法贴了,贴一个RocketMQ官方的事务消息代码

本地事务结果回查:

 private static boolean checkOrderById(String orderId) {
        return true;
    }

本地事务执行:

private static boolean doLocalTransaction() {
        return true;
    }

生产端发送事务消息:

public static void main(String[] args) throws ClientException {
        ClientServiceProvider provider = new ClientServiceProvider();
        MessageBuilder messageBuilder = new MessageBuilder();
        //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
        Producer producer = provider.newProducerBuilder()
                .setTransactionChecker(messageView -> {
                    /**
                     * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
                     * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
                     */
                    final String orderId = messageView.getProperties().get("OrderId");
                    if (Strings.isNullOrEmpty(orderId)) {
                        // 错误的消息,直接返回Rollback。
                        return TransactionResolution.ROLLBACK;
                    }
                    return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
                })
                .build();
        //开启事务分支。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            //事务分支开启失败,直接退出。
            return;
        }
        Message message = messageBuilder.setTopic("topic")
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
                .addProperty("OrderId", "xxx")
                //消息体。
                .setBody("messageBody".getBytes())
                .build();
        //发送半事务消息
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            //半事务消息发送失败,事务可以直接退出并回滚。
            return;
        }
        /**
         * 执行本地事务,并确定本地事务结果。
         * 1. 如果本地事务提交成功,则提交消息事务。
         * 2. 如果本地事务提交失败,则回滚消息事务。
         * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        }
    }

上面的代码也可以看到,对于本地事务中间态或者未知异常,直接不处理即可,等待MQ回查状态。

转载请注明:汪明鑫的个人博客 » 正确理解事务消息

喜欢 (0)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz