【RocketMQ】十四、Consumer消息零丢失方案:手动提交offset + 自动故障转移
本文主要介绍 RocketMQ Consumer消息零丢失方案:手动提交offset + 自动故障转移
通过前面的方案,我们可以保证消息一定会达到MQ中,也确保了MQ中的消息不会丢失,只要做到这一点,我们就可以保证下游消费者系统一定可以获取到消息,但是即使下游消费者获取了消息,这条消息数据就一定不会丢失吗?
答案是未必的,假设下游消费者系统已经获取了消息,但是消息目前还在他的内存里,还没有执行业务逻辑,此时他就直接提交了这条消息的offset到broker去说自己已经处理过了,然后这个时候下游消费者系统突然就宕机了,内存里的消息没有了,业务逻辑也没有执行,结果broker已经收到他提交的消息offset了,还以为他已经处理完这个消息了。等消费者系统重启后,就不会再次消费这个消息了,还是会出现数据丢失。
consumer.registerMessageListener(
new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
在RocketMQ的消费者中会注册一个监听器,就是上面代码中的MessageListenerConcurrently这个东西,当你的消费者获取到一批消息之后,就会回调你得到这个监听器函数,让你来处理这一批消息。
然后当你处理完毕之后,你才会返回consumeConcurrentlyStatus.CONSUME_SUCCESS作为消费成功的示意,告诉RocketMQ,这批消息我已经处理完毕了。
对于RocketMQ而言,其实只要你的消费者系统是在这个监听器的函数中先处理一批消息,基于这批消息都处理完业务逻辑,然后返回了哪个消费成功的状态,接着才会提交这批消息的offset到broker去。
所以在这个情况下,如果你对一批消息处理完毕了,然后再提交消息的offset给broker,接着消费者系统宕机了,此时是不会丢失消息的。
那么如果消费者系统获取了一批消息之后,还没处理完,也就没返回consumeConcurrentlyStatus.CONSUME_SUCCESS这个状态呢,自然没提交这批消息的offset给broker,此时消费者宕机了,会怎么样呢?
在这种情况下,你没有对这批消息提交offset给broker,broker不会认为你已经处理完了,此时消费者的一台机器宕机了,他其实会感知到你的一个consumer已经挂了,接着会把没处理完的消息交给其他机器去处理,所以在这种情况下,消息也绝对不会丢失的。
需要警惕的地方:不能异步消费消息
我们不能在代码中对消息进行异步处理,比如开启了子线程去处理这批消息,然后启动线程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态了。
如果用这种方式处理的话,那就可能出现开启的子线程还没处理完消息呢,已经返回CONSUME_SUCCESS状态了,就可能提交这批消息的offset给broker了,认为已经处理结束了。