【RocketMQ】十二、发送消息零丢失方案:RocketMQ事务消息的实现流程分析

本文主要介绍 RocketMQ发送消息零丢失方案之RocketMQ事务消息的实现流程分析

1. 解决消息丢失的第一个问题:生产者推送消息丢失

RocketMQ有一个非常强悍有力得到功能,就是事务消息,凭借这个事务级的消息机制,就可以让我们保证生产者推送出去的消息一定会成功写入MQ里,绝不会半路就搞丢了。

首先以一个订单系统为例,假设他收到了一个订单支付成功的通知之后,他必然是需要在自己的订单数据库里做一些增删改查操作的,比如更新订单状态之类的。

可能你会觉得,订单系统不就是先在自己的数据库做一些操作,然后直接发个消息到MQ去,让其他订阅这个Topic的系统去从MQ获取消息做对应的处理就可以了吗?

其实还真不是这么简单,在基于RocketMQ的事务消息机制中,我们首先要让订单系统去发送一条half消息到MQ,这个half消息本质就是一个订单支付成功的消息,只不过你可以理解为他这个消息的状态是half状态,这个时候消费者系统是看不见这个half消息的。然后我们去等待接收这个half消息写入成功的响应通知,可能现在你会觉得你没事儿先发个half消息给MQ做什么?

其实可以想一下,加入你二话不说让订单系统直接做了本地数据库的操作,然后再发送消息给MQ,结果爆出一堆异常,发现MQ挂了。这个时候,必然导致你没法通过消息通知下游消费者系统去做一些列的逻辑。

所以这里我们首先第一件事,不是让订单系统做一些数据库操作,而是先发一个half消息给MQ以及收到他的成功的响应,初步先跟MQ做了联系和沟通。意思就是确认下MQ还活着。

如果half消息写入失败了,可能你发现报错了,可能MQ就挂了,或者网络出现故障,导致half消息没有发送成功。

这个时候你的订单系统应该执行一系列的回滚操作,比如对订单状态做一个更新,把状态改成"交易关闭"等等。

如果half消息写入成功,这个时候订单系统应该在自己的数据库里执行一些增删改查的操作。

如果订单系统更新自己的数据库失败了,比如数据库的网络异常,或者数据库系统挂了,这个时候其实很简单,就是直接让订单系统发送一个rollback请求给MQ就可以了,意思就是说可以把之前发送给你的half消息给删除了,因为我自己这里都出问题了,已经没有办法继续后续的流程了。

如果订单系统成功的完成了本地的事务操作,此时就可以发送一个commit请求给MQ,要求让MQ对之前提交的half消息进行commit操作,RocketMQ对half消息进行commit之后,下游消费者系统就可以看到这个消息了。

让流程严谨一些:如果发送half消息成功了,但是没有收到响应呢?

如果我们把half消息发送给MQ了,MQ给保存下来了,但是MQ返回给我们的响应我们没有收到,此时会发生什么事情?

这个时候我们没有收到响应,可能是网络超时报错或者其他异常错误,这个时候订单系统会误以为发送half消息到MQ失败了,订单系统就直接执行回滚逻辑,把状态标记成"已关闭"。

但是这个时候MQ已经存储下来一条half消息了,那对这个消息要怎么处理?

其实RocketMQ这里有一个补偿流程, 他会去扫描自己处于half状态的消息,如果我们一直没有对这个消息执行commit/rollback操作,超过一定时间之后,他会回调你得到订单系统的一个接口,来确认到底是要commit这个消息还是rollback这个消息。

我们系统的这个接口就需要去判断这个消息是要commit还是rollback。

如果rollback或者commit发送失败了呢?

我们再假设一种场景,如果订单系统是收到了half消息写入成功的响应,同时尝试对自己数据库进行了更新,然后根据失败或者成功去执行了rollback或者commit请求,发送给MQ了,结果因为网络故障,导致rollback或者commit请求发送失败了呢?

其实这个也很简单,因为MQ里的消息一直是half状态,所以他过了一定时间会发现这个half消息有问题,会触发回调接口补偿的。

2. 事务消息机制的底层实现原理

half消息时如何对消费者不可见的?

上面提到过,对于MQ中的half消息,在没有commit操作之前,下游消费者是看不到他的,没办法去消费这条消息,那这个half消息是如何做到不给下游消费者系统看到的呢?

先举个例子,订单系统发送了一个half状态的订单支付消息到"OrderPaySuccessTopic"里去,这是一个Topic,然后下游红包系统也是订阅了这个"OrderPaySuccessTopic",从里面获取消息的。其实你写入一个Topic,最终是定位到这个Topic的某个MessageQueue,然后定位到一台Broker机器上去,然后写入的是Broker上的CommitLog文件,同时将消费索引写入到MessageQueue对应的ConsumeQueue文件。

如果你写入一条half消息到OrderPaySuccessTopic里去,会定位到这个Topic的一个MessageQueue,然后定位到一个Broker机器上去,接着按理说消息会写入CommitLog,同时消息的offset会写入MessageQueue对应的ConsumeQueue,这个ConsumeQueue是属于OrderPaySuccessTopic的,然后下游红包系统按理说会从这个ConsumeQueue里获取到你写入的这个half消息。

但实际上红包系统却没有办法看到这条消息,其本质原因就是RocketMQ一旦发现你发送的是一个half消息,他不会把整个half消息的offset写入OrderPaySuccessTopic的ConsumeQueue里去,他会把这条half消息写入到自己内部的"RMQ_SYS_TRANS_HALF_TOPIC"这个Topic对应的一个ConsumeQueue里去。所以下游的红包系统没有办法看到这个half消息。

在什么情况下订单系统会收到half消息成功的响应?

结合上面的内容,可以了解到,必须是要half消息进入到RocketMQ内部的RMQ_SYS_TRANS_HALF_TOPIC的ConsumeQueue文件了,此时就会认为half消息写入成功了,然后就会返回响应给订单系统。

这个时候,一旦你的订单系统收到这个half消息写入成功的响应,必然就知道这个half消息已经在RocketMQ的内部了。

假如因为各种问题,没有执行rollback或者commit会怎么样?

其实这个时候他会在后台有定时任务,定时任务会去扫描RMQ_SYS_TRANS_HALF_TOPIC中的half消息,如果你超过一定时间还是half消息,rocketMQ会回调订单系统的接口,让你判断这个half消息是要rollback还是commit.

如果执行rollback操作的话,如何标记消息回滚?

假设我们的订单系统执行了rollback请求,那么此时就需要对消息进行回滚,但是RocketMQ会把这条half消息从磁盘文件中删除吗?

显然是不是的,因为RocketMQ都是顺序的把消息写入磁盘文件的,所以在这里如果你执行rollback,他的本质就是用一个OP操作来标记half消息的状态。

RocketMQ内部有一个OP_TOPIC,此时可以写一条rollback op记录到这个Topic,标记某个half消息时rollback了。

另外,假如你一直没有执行commit/rollback,RocketMQ会回调订单系统的接口去判断half消息的状态,但是他最多就会回调15次,如果15次之后你都没有办法告诉他half消息的状态,他就自动把这条half消息标记成rollback

如果执行commit操作,如何让消息对下游消费者可见?

在执行commit操作之后,RocketMQ就会在OP_TOPIC里写入一条记录,标记half消息已经是commit状态了,接着需要把放在RMQ_SYS_TRANS_HALF_TOPIC中的half消息给写入到OrderPaySuccessTopic的ConsumeQueue里去,这样下游消费系统就可以消费这条消息了。

3. 为什么解决发送消息零丢失方案,一定要使用事务消息方案?

之前提到发送消息的时候,可能存在消息的丢失,也就是说可能消息根本就没有进入到MQ就丢了,然后没有解释过多的东西就直接切入了RocketMQ事务消息的方案,其实通过RocketMQ事务消息机制的研究,现在可以确信一点,如果使用事务消息机制去发送消息到MQ,一定是可以保证消息必然发送到MQ的,不会丢。

但是这个事务消息机制其实挺复杂的,先得发送half消息,然后还得发送rollback/commit的请求,要是中间有点什么问题,MQ还得回调你的接口。

我们真的有必要使用这么复杂的机制去确保消息到达MQ,而且绝对不会丢吗?毕竟这么复杂的机制完全有可能导致整体性能比较差,而且吞吐量比较低,是否有更加简单的方法来确保消息一定可以到达MQ呢?

能不能基于重试机制来确保消息到达MQ?

想到这里,可能内心已经有一个想法了,就是我们之前觉得发消息到MQ,无非就是觉得可能半路上消息给丢失了,然后消息根本没有进入到MQ中,我们也没做什么额外的措施,就导致消息找不回来了。

那么我们先搞清楚一个问题,我们发送消息到MQ,然后我们可以等待MQ返回响应给我们,在什么样的情况下,MQ会返回响应给我们呢?

答案是显而易见的,就是MQ收到消息之后写入本地磁盘文件了,当然这个时候可能仅仅是写入os cache中,但是只要他写入自己本地存储了,就会返回响应给我们。

那么只要我们在代码中发送消息到MQ之后,同步等待MQ返回响应给我们,一直等待,如果半路有网络异常或者MQ内部异常,我们肯定会收到一个异常,比如网络错误,或者请求超时之类的。

如果我们再收到异常之后,就认为消息发送MQ失败了,然后再次重试尝试发送消息到MQ,接着再次同步等待MQ返回响应给我们,这样反复重试,是否可以确保消息一定会到达MQ?

理论上似乎存在一些短暂网络异常的场景下,我们是可以通过不停的重试去保证消息到达MQ的,因为如果短时间网络异常了,消息一直没法发送,我们只要不停的重试,网络一旦恢复了,消息就可以发送到MQ了。

如果要是反复重试多次发现一直没有办法把消息投递到MQ,此时我们就可以直接让订单系统回滚之前的流程,判定本次订单支付交易失败了。

看起来这个简单的同步发送消息+反复重试的方案,也可以做到保证消息一定可以投递到MQ中。

但是如果是在比较复杂的订单业务场景中,仅仅采用同步发消息+反复重试多次的方案去确保消息绝对投递到MQ中,似乎还是不够的。

先执行订单本地事务,还是先发消息到MQ?

如果我们先执行本地事务,接着再发送消息到MQ,看起来伪代码可能是这样的:

try {
    // 执行订单本地事务
    orderService.finishOrderPay();
    // 发送消息到MQ去
    producer.sendMessage();
} catch (Exception e) {
    // 如果发送消息失败了,进行重试
    for(int i=0;i < 3; i++) {
        // 重试发送消息
    }
    // 如果多次重试发送消息之后,还是不行
    // 回滚本地订单事务
    orderService.rollbackOrderPay();
}

上面这段代码看起来似乎天衣无缝,先执行订单本地事务,接着发送消息到MQ,如果本地事务执行失败了,则不会继续发送消息到MQ了。

如果订单事务执行成功了,发送MQ失败了,自动进行几次重试,重试如果一直失败,就回滚订单。

但是这里有一个问题,假设你刚执行完订单本地事务,结果还没等到你发送消息到MQ,结果你的订单系统突然崩溃了,这就导致你的订单状态可能已经修改为了"已完成",但是消息却没有发送到MQ中去,这就是这个方案最大的隐患。

如果出现这种场景,那你的多次重试发送MQ之类的代码根本没有机会执行,而且订单本地事务还已经执行成功了,你的消息还没发送出去,红包系统没机会派发红包,必然导致用户支付成功了,结果看不到自己的红包。

把订单本地事务和重试发送MQ消息放到一个事务代码中

接着考虑下一个问题,这时候可能有一个新的想法,如果把订单本地事务代码和发送MQ消息的代码放到一个事务代码中呢?

@Transactional
public void payOrderSuccess() {
    try {
        // 执行订单本地事务
        orderService.finishOrderPay();
        // 发送消息到MQ去
        producer.sendMessage();
    } catch (Exception e) {
        // 如果发送消息失败了,进行重试
        for(int i=0;i < 3; i++) {
            // 重试发送消息
        }
        // 如果多次重试发送消息之后,还是不行
        // 抛出异常,回滚本地事务
        throw new XXXException();
    }
}

上面这个代码看起来似乎解决了我们的问题,就是在这个方法上加入事务,在这个事务方法中,我们哪怕执行了orderService.finishOrderPay(),但是其实也仅仅执行了一些增删改查的SQL语句,还没提交订单本地事务。

如果发送消息失败了,而且多次重试还不奏效,则抛出异常会自动回滚本地事务。

如果你刚执行了 orderServicec.finishOrderPay(),结果订单系统直接崩溃了,此时订单本地事务会回滚,因为根本没提交过。

但是对于这个方案,还是非常的不理想,原因就出在那个MQ多次重试的地方,假设用户支付成功了,然后支付系统回调通知你的订单系统说有一笔订单支付成功了,这个时候你的订单系统卡在多次重试MQ的代码那里,可能耗时了好几秒中,此时回调通知及的系统早就等不及可能都超时异常了。

而且把重试MQ的代码放在这个逻辑里,可能导致订单系统的这个接口性能很差。

保证业务系统一致性的最佳方案:基于RocketMQ的事务消息机制

综合来看,真正要保证消息一定投递到MQ,同时保证业务系统之间数据完全一致,业内最佳的方案还是基于RocketMQ的事务消息机制。

4. 用支付后发红包的案例场景,分析RocketMQ事物消息的代码实现细节

4.1. 发送half事务消息出去

public class TransactionProducer {
    public static void main(String[] args) {
        // 这个东西就是用来接收RocketMQ回调的一个监听接口
        // 这里会实现执行订单本地事务,commit、rollback、回调查询等逻辑
        TransactionListener = transactionListener = new TransactionListener();
        // 下面这个就是创建一个支持事务消息的Producer
        // 对于这个producer,还得指定一个生产者分组
        TransactionMQProducer producer = new TransactionMQProducer("testProducerGroup");
        // 下面这个是指定了一个线程池,里面会包含一些线程
        // 这个线程池里面的线程就是用来处理RocketMQ回调你的请求的
        ExecutorService executorService = new ThreadPoolExecutor(
            2,5,100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("testThread");
                    return thred;
                }
            }
        );
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        
        // 构造一条订单支付成功的消息
        Message msg = new Message(
            "PayOrderSuccessTopic",
            "TestTag",
            "TestKey",
            ("订单支付消息").getBytes(RemotingHelper.DEFAULT_CHARSET)
        );
        // 将消息作为half消息发送出去
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    }
}

4.2 假如half消息发送失败,或者没有收到half消息响应怎么办?

此时我们应该会在执行 producer.sendMessageInTransaction(msg, null); 的时候收到一个异常,发现消息发送失败了。

所以可以用下面的代码去关注half消息发送失败的问题:

try {
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
} catch (Exception e) {
    // half消息发送失败,执行回滚逻辑
}

那如果一直没有收到half消息发送成功的通知呢? 我们可以把发送出去的half消息放在内存里,或者写入本地磁盘文件,后台开启一个线程去检查,如果一个half消息超过一段时间没有收到响应,就自动触发回滚逻辑。

4.3 如果half消息成功了,如何执行本地事务

刚才代码中有一个TransactionListener,这个类也是我们自己定义的:

public class TransactionListenerImpl implements TransactionListener {
    // 如果half消息发送成功了
    // 就会在这里回调你的这个函数,就可以执行本地事务了
    @Override
    public LocalTransactionState executeLocalTransaction (
        Message msg, Object art) {
        // 执行订单本地事务
        // 接着根据本地一系列操作的执行结果,去选择执行commit或者rollback
        try {
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch(Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

4.4 如果没有返回commit或者rollback,如何进行回调?

public class TransactionListenerImpl implements TransactionListener {
    // 如果half消息发送成功了
    // 就会在这里回调你的这个函数,就可以执行本地事务了
    @Override
    public LocalTransactionState executeLocalTransaction (
        Message msg, Object art) {
        // 执行订单本地事务
        // 接着根据本地一系列操作的执行结果,去选择执行commit或者rollback
        try {
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch(Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    // 如果因为各种原因,没有返回commit或者rollback
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 查询本地事务是否执行成功了
        Integer state = localTrans.get(msg.getTransactionId());
        if (state != null) {
            switch (state) {
                case 0: return LocalTransactionState.UNKNOW;
                case 1: return LocalTransactionState.COMMIT_MESSAGE;
                case 2: return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}