RocketMQ msgId与offsetMsgId释疑(实战篇)
本篇详细介绍消息发送、消息消费、RocketMQ queryMsgById 命令以及 rocketmq-console 等使用场景中究竟是用的哪一个ID。
1、抛出问题
1.1 从消息发送看消息ID
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); Message msg = new Message("TestTopic" /* Topic */,null /* Tag */, ("Hello RocketMQ test1" ).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); producer.shutdown(); } catch (Throwable e) { e.printStackTrace(); } }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
执行效果如图所示:
即消息发送会返回 msgId 与 offsetMsgId。
1.2 从消息消费看消息ID
package org.apache.rocketmq.example.quickstart;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TestTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("MessageExt msg.getMsgId():" + msgs.get(0).getMsgId()); System.out.println("-------------------分割线-----------------"); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
执行效果如图所示:
不知道大家是否有注意到,调用 msgs.get(0).getMsgId()返回的msgId 与直接输出msgs中的 msgId 不一样,那这又是为什么呢?答案在本文的第二部分有详细分析。
2、消息ID释疑
从消息发送的结果可以得知,RocketMQ 发送的返回结果会返回msgId 与 offsetMsgId,那这两个 msgId 分别是代表什么呢?
- msgId:该ID 是消息发送者在消息发送时会首先在客户端生成,全局唯一,在 RocketMQ 中该 ID 还有另外的一个叫法:uniqId,无不体现其全局唯一性。
- offsetMsgId:消息偏移ID,该 ID 记录了消息所在集群的物理地址,主要包含所存储 Broker 服务器的地址( IP 与端口号)以及所在commitlog 文件的物理偏移量。
2.1 msgId 即全局唯一 ID 构建规则
从这张图可以看出,msgId确实是客户端生成的,接下来我们详细分析一下其生成算法。
MessageClientIDSetter#createUniqID
public static String createUniqID() { StringBuilder sb = new StringBuilder(LEN * 2); sb.append(FIX_STRING); // @1 sb.append(UtilAll.bytes2string(createUniqIDBuffer())); // @2 return sb.toString();
}
- 1
- 2
- 3
- 4
- 5
- 6
一个 uniqID 的构建主要分成两个部分:FIX_STRING 与唯一 ID 生成算法,顾名思义,FIX_STRING 就是一个客户端固定一个前缀,那接下来先看一下固定字符串的生成规则。
2.1.1 FIX_STRING
MessageClientIDSetter静态代码块
static { byte[] ip; try { ip = UtilAll.getIP(); } catch (Exception e) { ip = createFakeIP(); } LEN = ip.length + 2 + 4 + 4 + 2; ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4); tempBuffer.position(0); tempBuffer.put(ip); tempBuffer.position(ip.length); tempBuffer.putInt(UtilAll.getPid()); tempBuffer.position(ip.length + 2); tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); FIX_STRING = UtilAll.bytes2string(tempBuffer.array()); setStartTime(System.currentTimeMillis()); COUNTER = new AtomicInteger(0);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
从这里可以看出 FIX_STRING 的主要由:客户端的IP、进程ID、加载 MessageClientIDSetter 的类加载器的 hashcode。
2.1.2 唯一性算法
msgId 的唯一性算法由 MessageClientIDSetter 的createUniqIDBuffer 方法实现。
private static byte[] createUniqIDBuffer() { ByteBuffer buffer = ByteBuffer.allocate(4 + 2); long current = System.currentTimeMillis(); if (current >= nextStartTime) { setStartTime(current); } buffer.position(0); buffer.putInt((int) (System.currentTimeMillis() - startTime)); buffer.putShort((short) COUNTER.getAndIncrement()); return buffer.array();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
可以得出 msgId 的后半段主要由:当前时间与系统启动时间的差值,以及自增序号。
2.2 offsetMsgId构建规则
在消息 Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的文件,然后会再次生成一个id,代码中虽然也叫 msgId,其实这里就是我们常说的 offsetMsgId,即记录了消息的物理偏移量,故我们重点来看一下其具体生成规则:
MessageDecoder#createMessageId
public static String createMessageId(final ByteBuffer input , final ByteBuffer addr, final long offset) {
input.flip(); int msgIDLength = addr.limit() == 8 ? 16 : 28; input.limit(msgIDLength); input.put(addr); input.putLong(offset); return UtilAll.bytes2string(input.array());
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
首先结合该方法的调用上下文,先解释一下该方法三个入参的含义:
- ByteBuffer input
用来存放 offsetMsgId 的字节缓存区( NIO 相关的基础知识) - ByteBuffer addr
当前 Broker 服务器的 IP 地址与端口号,即通过解析 offsetMsgId 从而得到消息服务器的地址信息。 - long offset
消息的物理偏移量。
即构成 offsetMsgId 的组成部分:Broker 服务器的 IP 与端口号、消息的物理偏移量。
温馨提示:即在 RocketMQ中,只需要提供 offsetMsgId,可用不必知道该消息所属的topic信息即可查询该条消息的内容。
2.3 消息发送与消息消费返回的消息ID信息
消息发送时会在 SendSesult中返回 msgId、offsetMsgId,在了解了这个两个 ID 的含义时则问题不大,接下来重点介绍一下消息消费时返回的 msgId 到底是哪一个。
在消息消费时,我们更加希望因为 msgId (即客户端生成的全局唯一性ID),因为该全局性 ID 非常方便实现消费端的幂等。
在本文的1.2节我们也提到一个现象,为什么如下图代码中输出的 msgId 会不一样呢?
在客户端返回的 msg 信息,其最终返回的对象是 MessageClientExt ,继承自 MessageExt。
那我们接下来分别看一下其 getMsgId() 方法与 toString 方法即可。
@Override
public String getMsgId() { String uniqID = MessageClientIDSetter.getUniqID(this); if (uniqID == null) { return this.getOffsetMsgId(); } else { return uniqID; }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
原来在调用 MessageClientExt 中的 getMsgId 方法时,如果消息的属性中存在其唯一ID,则返回消息的全局唯一ID,否则返回消息的 offsetMsgId。
而 MessageClientExt 方法并没有重写 MessageExt 的 toString 方法,其实现如图所示:
故返回的是 MessageExt中 的 msgId,该 msgId 存放的是offsetMsgId,所以才造成了困扰。
温馨提示:如果消息消费失败需要重试,RocketMQ 的做法是将消息重新发送到 Broker 服务器,此时全局 msgId 是不会发送变化的,但该消息的 offsetMsgId 会发送变化,因为其存储在服务器中的位置发生了变化。
3、实践经验
在回答了消息发送与消息消费关于msgId与offsetMsgId的困扰后,再来介绍一下如果根据msgId去查询消息。
想必大家对 rocketmq-console ,那在消息查找界面,展示的消息列表中返回的 msgId 又是哪一个呢?
这里的 Message ID 返回的是消息的全局唯一ID。
其实 RokcetMQ 也提供了 queryMsgById 命令来查看消息的内容,不过这里的 msgId 是 offsetMsgId,我们首先将全局唯一ID传入命令,其执行效果如下:
发现报错,那我们将 offsetMsgId 传入其执行效果如图所示:
但在 rocketmq-console 的根据消息ID去查找消息,无论传入哪个msgId,下图该功能都能返回正确的结果:
这是因为 rocketmq-console 做了兼容,首先将传入的 msgId 用 queryMsgById 该命令去查,如果报错,则当成 uniqID(全局ID)去查,首先全局ID会存储在消息的属性中,并会创建 Hash 索引,即可用通过 indexfile 快速定位到该条消息。
关于 RocketMQ 消息ID的相关问题就介绍到这里了,希望能得到您的认可,帮忙点个赞,谢谢。
见文如面,我是威哥,热衷于成体系剖析JAVA主流中间件,关注公众号『中间件兴趣圈』,回复专栏可获取成体系专栏导航,回复资料可以获取笔者的学习思维导图。
文章来源: blog.csdn.net,作者:中间件兴趣圈,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/prestigeding/article/details/104739950
- 点赞
- 收藏
- 关注作者
评论(0)