关于RocketMQ

举报
Mr.Z事顺意 发表于 2023/02/23 18:07:20 2023/02/23
【摘要】 一、消息轨迹:消息轨迹简单来说就是日志,其把消息的生产、存储、消费等所有的访问和操作日志。1、消息轨迹的引入目的在项目中存在发送方与消费方相互“扯皮”的情况:    发送方说消息已经发送成功,而消费方说没有消费到。    这时我们就希望能记录一条消息的流转轨迹,即:消息是由哪个IP发送的?什么时候发送的?是被哪个消费者消费的?2、如何使用消息轨迹1> 修改Broker服务端配置,设置 tra...

一、消息轨迹:

消息轨迹简单来说就是日志,其把消息的生产、存储、消费等所有的访问和操作日志。
1、消息轨迹的引入目的

在项目中存在发送方与消费方相互“扯皮”的情况:

    发送方说消息已经发送成功,而消费方说没有消费到。
    这时我们就希望能记录一条消息的流转轨迹,即:消息是由哪个IP发送的?什么时候发送的?是被哪个消费者消费的?

2、如何使用消息轨迹

1> 修改Broker服务端配置,设置 traceTopicEnable=true;

    表示在Broker上创建名为RMQ_SYS_TRACE_TOPIC的topic,队

    列个数为1。所有的msgTrace信息默认都存储在这个topic中。

2> Producer中开启消息轨迹;

      public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)

        boolean类型的入参enableMsgTrace设置为true,表示启用消息轨迹追踪,默认为false。

      public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)

        String类型的入参customizedTraceTopic,表示用于记录消息轨迹的topic,不设置默认为RMQ_SYS_TRACE_TOPIC。

3> Consuemr中开启消息轨迹;

      public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)

        boolean类型的入参enableMsgTrace设置为true,表示启用消息轨迹追踪,默认为false。

如果启用了消息轨迹,在消息发送时尽量为消息指定Key属性,以便在RocketMQ-Console中对消息进行高性能的查询。

public class TraceProducer {
    public static void main(String[] args) throws Exception {
        // 第二个参数TRUE,表示开启MsgTrace
        DefaultMQProducer producer = new DefaultMQProducer("saint-test", true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setMaxMessageSize(1024 * 1024 * 10);
        producer.start();

        Message msg = new Message("test-topic-trace",null, "key-trace", "trace-2".getBytes(StandardCharsets.UTF_8));
        SendResult send = producer.send(msg);

        System.out.println("sendResult: " + send);

        // 关闭生产者
        producer.shutdown();
        System.out.println("已经停机");
    }
}

3、消息轨迹实现原理
1)消息轨迹数据结构

1> 消息轨迹主体内容采用TraceContext类存储;

public class TraceContext implements Comparable<TraceContext> {

    private TraceType traceType;
    private long timeStamp = System.currentTimeMillis();
    private String regionId = "";
    private String regionName = "";
    private String groupName = "";
    private int costTime = 0;
    private boolean isSuccess = true;
    private String requestId = MessageClientIDSetter.createUniqID();
    private int contextCode = 0;
    private List<TraceBean> traceBeans;

        traceType:跟踪类型,可选值为Pub(消息发送)、SubBefore(消息拉取到客户端,在执行业务定义的消费逻辑之前)、SubAfter(消费后)。

        timeStamp:当前时间戳。

        regionId:Broker所在的区域ID,取自BrokerConfig#regionId()。

        groupName:组名称,traceType为Pub时表示生产者组的名称,traceType为subBefore或subAfter时表示消费组名称。

        requestId:在traceType为subBefore、subAfter时使用,消

        费端的请求ID。

        contextCode:消费状态码,可选值为SUCCESS、TIME_OUT、EXCEPTION、RETURNNULL、FAILED。

2> 针对消息信息采用TraceBean类维护;

public class TraceBean {
    private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
    private String topic = "";
    private String msgId = "";
    private String offsetMsgId = "";
    private String tags = "";
    private String keys = "";
    private String storeHost = LOCAL_ADDRESS;
    private String clientHost = LOCAL_ADDRESS;
    private long storeTime;
    private int retryTimes;
    private int bodyLength;
    private MessageType msgType;


}


        topic:消息主题
        msgId:消息唯一ID
        offsetMsgId:消息偏移量ID,该ID中包含了Broker的IP以及偏移量
        tags:消息标志
        keys:消息索引key,根据该key可快速检索消息
        storeHost:跟踪类型为Pub时存储该消息的Broker服务器IP,跟踪类型为subBefore、subAfter时存储消费者IP
        bodyLength:消息体的长度
        msgType:消息的类型,可选值为Normal_Msg(普通消息)、Trans_Msg_Half(预提交消息)、Trans_msg_Commit(提交消息)、Delay_Msg(延迟消息)

2)轨迹消息存储

RocketMQ选择将消息轨迹数据当作一条消息,存储在Broker服务器中。

RocketMQ提供了两种方式来定义消息轨迹存储的topic:

    系统默认topic:如果Broker的traceTopicEnable配置项设为true,表示在该Broker上创建名为RMQ_SYS_TRACE_TOPIC的topic,队列个数为1,默认该值为false。
    自定义topic:在创建消息生产者或消息消费者时,可以通过参数自定义用于记录消息轨迹的topic名称;
        注意:RokcetMQ控制台(rocketmq-console)中只支持配置一个消息轨迹topic,建议使用系统默认的topic。

另外:为了避免消息轨迹的数据与正常的业务数据混在一起,官方建议单独使用一个Broker用于开启消息轨迹跟踪。消息轨迹数据只会发送到这一台Broker服务器上,不影响原业务Broker的负载压力。

三、总结

消息轨迹其实就是记录消息从发送 到 存储 再到 消费,整个消息生命周期中的一些关键信息,比如:谁发送的、发送耗时多久、消息保存在哪了、谁消费了、消费耗时多久。

在RocketMQ中的实现方式也很简单,在消息发送/消费前后基于钩子函数,做before()、after()逻辑,进而记录消息轨迹信息。

特别注意:storeTime并不是真实的消息存储时间,而是一个估算值,取自:客户端发送消息耗时的一半。

消息轨迹功能涉及到的关键类:

    AsyncTraceDispatcher:负责异步发送消息轨迹到Broker。
    ConsumeMessageHook:消费消息钩子函数
    SendMessageHook:生产消息钩子函数
    TraceContext、TraceBean:两者一起用于表述消息轨迹

最后,一些实战注意事项:

1、为什么要发送个half消息?有什么用?
这个half消息是在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的。那这个消息的 作用更多的体现在确认RocketMQ的服务是否正常。相当于嗅探下RocketMQ服务是否正常。

2.half消息如果写入失败了怎么办?
如果没有half消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给MQ。这时候写 入消息到MQ如果失败就会非常尴尬了。而half消息如果写入失败,我们就可以认为MQ的服务是有问题的,这时,就不能通知下游服务了。我们可以在下单时给订单一个状态标记,然后等待MQ服务正常后 再进行补偿操作,等MQ服务正常后重新下单通知下游服务。

3.订单系统写数据库失败了怎么办?
这个问题我们同样比较下没有使用事务消息机制时会怎么办?如果没有使用事务消息,我们只能判断下 单失败,抛出了异常,那就不往MQ发消息了,这样至少保证不会对下游服务进行错误的通知。但是这 样的话,如果过一段时间数据库恢复过来了,这个消息就无法再次发送了。当然,也可以设计另外的补 偿机制,例如将订单数据缓存起来,再启动一个线程定时尝试往数据库写。

而如果使用事务消息机制, 就可以有一种更优雅的方案。
如果下单时,写数据库失败(可能是数据库崩了,需要等一段时间才能恢复)。那我们可以另外找个地方 把订单消息先缓存起来(Redis、文本或者其他方式),然后给RocketMQ返回一个UNKNOWN状态。这样 RocketMQ就会过一段时间来回查事务状态。我们就可以在回查事务状态时再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完整正常的下单,再继续后面的业务。这样这个订单的消息 就不会因为数据库临时崩了而丢失。

4.half消息写入成功后RocketMQ挂了怎么办?
我们需要注意下,在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQ的Broker主动发起的。也就是说如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务。这时,我们就可以将订单一直标记为"新下单"的状态。而等RocketMQ恢复后,只要存储的消息没有丢失,RocketMQ就会再次继续状态回查的流程。

5.下单成功后如何优雅的等待支付成功?
在订单场景下,通常会要求下单完成后,客户在一定时间内,例如10分钟,内完成订单支付,支付完成 后才会通知下游服务进行进一步的营销补偿。
如果不用事务消息,那通常会怎么办?
简单的方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超过 时间的订单回收。这种方式显然是有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个 不小的压力。
那更进一步的方案是什么呢?是不是就可以使用RocketMQ提供的延迟消息机制。往MQ发一个延迟1分 钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知。 而如果没有支付,就再发一个延迟1分钟的消息。终在第十个消息时把订单回收。这个方案就不用对 全部的订单表进行扫描,而只需要每次处理一个单独的订单消息。
那如果使用上了事务消息呢?我们就可以用事务消息的状态回查机制来替代定时的任务。在下单时,给 Broker返回一个UNKNOWN的未知状态。而在状态回查的方法中去查询订单的支付状态。这样整个业 务逻辑就会简单很多。我们只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时 间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。

6、事务消息机制的作用
整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务的业 务的分布式事务一致性问题。而事务一致性问题一直以来都是一个非常复杂的问题。而RocketMQ的事 务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件的事 务一致性,而对下游服务的事务并没有保证。但是即便如此,也是分布式事务的一个很好的降级方案。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。