RocketMQ 第六章:手把手教老婆实现-批量消息生产者和消费者
【摘要】
技术活,该赏关注+一键三连(点赞,评论,收藏)再看,养成好习惯
RocketMQ使用教程相关系列 目录
目录
第一节:介绍
限制:
第二节:批量消息-生产者和消息者步骤说明
批量消息生产者代码实现步骤
批量消息消费者代码实现步骤
第三节:批量消息生产者-小于4MB
效果:
第四节:批量消息消费者
效果...
-
技术活,该赏
-
关注+一键三连(点赞,评论,收藏)再看,养成好习惯
目录
第一节:介绍
批量发送消息能显著提高传递小消息的性能。
限制:
- 应该有相同的topic,相同的waitStoreMsgOK
- 不能是延时消息
- 这一批消息的总大小不应超过4MB(默认配置:DefaultMQProducer的maxMessageSize参数,可在broker*.properties配置文件中修改)。
第二节:批量消息-生产者和消息者步骤说明
批量消息生产者代码实现步骤
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象集合,指定主题Topic、Tag和消息体
5.发送集合消息
6.关闭生产者producer
批量消息消费者代码实现步骤
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
注意:消费者的 Topic 和 Tag 需要和生产者保持一致
第三节:批量消息生产者-小于4MB
-
public class Producer {
-
-
public static void main(String[] args) throws Exception {
-
// 1.创建消息生产者producer,并制定生产者组名
-
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_batch_group");
-
// 2.指定Nameserver地址
-
producer.setNamesrvAddr("192.168.88.131:9876");
-
// 3.启动producer
-
producer.start();
-
System.out.println("生产者启动");
-
List<Message> msgs = new ArrayList<Message>();
-
-
// 4.创建消息对象,指定主题Topic、Tag和消息体
-
/**
-
* 参数一:消息主题Topic
-
* 参数二:消息Tag
-
* 参数三:消息内容
-
*/
-
for (int i = 0; i < 20; i++) {
-
Message msg = new Message("Topic_batch_demo", "Tag_batch_demo", ("Hello 虚竹,这是批量消息" + i).getBytes());
-
msgs.add(msg);
-
}
-
-
// 5.发送消息
-
SendResult result = producer.send(msgs);
-
// 发送状态
-
SendStatus status = result.getSendStatus();
-
-
System.out.println("发送结果:" + result);
-
-
// 线程睡1秒
-
TimeUnit.SECONDS.sleep(1);
-
-
// 6.关闭生产者producer
-
producer.shutdown();
-
System.out.println("生产者关闭");
-
}
-
-
}
效果:
第四节:批量消息消费者
-
public class Consumer {
-
-
public static void main(String[] args) throws Exception {
-
//1.创建消费者Consumer,制定消费者组名
-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
-
//2.指定Nameserver地址
-
consumer.setNamesrvAddr("192.168.88.131:9876");
-
-
//消息拉取最大条数
-
consumer.setConsumeMessageBatchMaxSize(2);
-
//3.订阅主题Topic和Tag
-
consumer.subscribe("Topic_batch_demo", "*");
-
-
//4.设置回调函数,处理消息
-
consumer.registerMessageListener(new MessageListenerOrderly() {
-
//接受消息内容
-
@Override
-
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
-
for (MessageExt msg : msgs) {
-
-
try {
-
//获取主题
-
String topic = msg.getTopic();
-
//获取标签
-
String tags = msg.getTags();
-
//获取信息
-
byte[] body = msg.getBody();
-
String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
-
System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+
-
",result"+ result);
-
-
} catch (UnsupportedEncodingException e) {
-
e.printStackTrace();
-
//重试
-
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-
}
-
}
-
return ConsumeOrderlyStatus.SUCCESS;
-
}
-
});
-
//5.启动消费者consumer
-
consumer.start();
-
}
-
}
效果:
第五节:批量消息生产者-大于4MB
消息拆分工具类
要发送大于4MB的消息,我们只需要把大的消息分裂成若干个小的消息。首先创建一个拆分消息的工具类
-
public class ListSplitter implements Iterator<List<Message>> {
-
private int SIZE_LIMIT = 1024 * 1024 * 4;
-
private final List<Message> messages;
-
private int currIndex;
-
-
public ListSplitter(List<Message> messages) {
-
this.messages = messages;
-
}
-
-
public ListSplitter(List<Message> messages, DefaultMQProducer mqProducer) {
-
this.messages = messages;
-
this.SIZE_LIMIT = mqProducer.getMaxMessageSize();
-
}
-
-
@Override
-
public boolean hasNext() {
-
return currIndex < messages.size();
-
}
-
-
@Override
-
public List<Message> next() {
-
int nextIndex = currIndex;
-
int totalSize = 0;
-
for (; nextIndex < messages.size(); nextIndex++) {
-
Message message = messages.get(nextIndex);
-
int tmpSize = message.getTopic().length() + message.getBody().length;
-
Map<String, String> properties = message.getProperties();
-
for (Map.Entry<String, String> entry : properties.entrySet()) {
-
tmpSize += entry.getKey().length() + entry.getValue().length();
-
}
-
// 增加日志的开销20字节
-
tmpSize = tmpSize + 20;
-
-
if (tmpSize > SIZE_LIMIT) {
-
// 单个消息超过了最大的限制
-
// 忽略,否则会阻塞分裂的进程
-
if (nextIndex - currIndex == 0) {
-
// 假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
-
nextIndex++;
-
}
-
break;
-
}
-
if (tmpSize + totalSize > SIZE_LIMIT) {
-
break;
-
} else {
-
totalSize += tmpSize;
-
}
-
-
}
-
List<Message> subList = messages.subList(currIndex, nextIndex);
-
currIndex = nextIndex;
-
return subList;
-
}
-
-
@Override
-
public void remove() {
-
-
}
-
-
}
生产者
-
public class ProducerMoreThan4M {
-
-
public static void main(String[] args) throws Exception {
-
// 1.创建消息生产者producer,并制定生产者组名
-
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_batch_group");
-
// 2.指定Nameserver地址
-
producer.setNamesrvAddr("192.168.88.131:9876");
-
// 3.启动producer
-
producer.start();
-
System.out.println("生产者启动");
-
List<Message> msgs = new ArrayList<Message>();
-
-
// 4.创建消息对象,指定主题Topic、Tag和消息体
-
/**
-
* 参数一:消息主题Topic
-
* 参数二:消息Tag
-
* 参数三:消息内容
-
*/
-
for (int i = 0; i < 20; i++) {
-
Message msg = new Message("Topic_batch_demo", "Tag_batch_demo", ("Hello 虚竹,这是批量消息" + i).getBytes());
-
msgs.add(msg);
-
}
-
// 5.发送消息
-
// 发送批量消息:把大的消息分裂成若干个小的消息
-
ListSplitter splitter = new ListSplitter(msgs, producer);
-
while (splitter.hasNext()) {
-
try {
-
List<Message> listItem = splitter.next();
-
SendResult result = producer.send(listItem);
-
System.out.println("发送结果:" + result);
-
} catch (Exception e) {
-
e.printStackTrace();
-
// 处理error
-
}
-
}
-
-
// 线程睡1秒
-
TimeUnit.SECONDS.sleep(1);
-
-
// 6.关闭生产者producer
-
producer.shutdown();
-
System.out.println("生产者关闭");
-
}
-
-
}
效果:
第六节:批量消息消费者-大于4MB
跟小于4MB的消费者代码一模一样,就不水字数了。
效果:
文章来源: xiaoxuzhu.blog.csdn.net,作者:小虚竹,版权归原作者所有,如需转载,请联系作者。
原文链接:xiaoxuzhu.blog.csdn.net/article/details/115600192
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)