手搭手RocketMQ发送消息

举报
QGS 发表于 2024/03/13 22:07:27 2024/03/13
【摘要】 手搭手RocketMQ发送消息

消息中间件的对比

消息中间件

ActiveMQ

RabbitMQ

RocketMQ

kafka

开发语言

java

erlang

java

scala

单击吞吐量

万级

万级

10万级

10万级

时效性

ms

us

ms

ms

可用性

高(主从架构)

高(主从架构)

非常高(主从架构)

非常高(主从架构)


消息中间件: activeMQ:java(jms协议),性能一般,吞吐量低。rabbitMQ:erlang(amqp协议),性能好,功能丰富,吞吐量一般。rocketMQ:java,性能好,吞吐量丰富,功能丰富。Kafka: scala,吞吐量最大,功能单一,大数据领域



RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。

rocketmq.apache.org

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列


Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。


发送消息


发送同步消息

同步消息发送后会用一个返回值,也就是MQ服务器接收到消息返回的一个确认,这种方式非常安全,但是性能就没那么高,而在MQ集群中,也是要等到所有的从机都复制了消息以后才会返回,这种方式适合重要消息的场景


@Test
void rocketmqProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Topic","消息".getBytes());
SendResult send = producer.send(message);
System.out.println("发送状态"+send.getSendStatus());
//关闭生产者
producer.shutdown();
}

@Test
void rocketmqConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Topic","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}



发送异步消息

异步消息常用在对响应时间敏感的业务场景,发送端不能容忍长时间等待Broker的响应。发送完后会有一个异步消息通知



@Test
void aysncProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("aysncTopic","异步消息".getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功"+sendResult);
}

@Override
public void onException(Throwable throwable) {
System.err.println("发送失败"+throwable);
}
});
System.out.println("执行了");
//关闭生产者
//producer.shutdown();
//挂起当前jvm
System.in.read();
}

@Test
void aysncConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("aysncTopic","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}


发送单向消息

单向消息发送这种方式不关心发送结果的场景,这种方式吞吐量大,但存在消息丢失的风险。使用案例:日志信息发送

@Test
void OnewayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("Oneway");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Oneway","单向消息

".getBytes());
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}

@Test
void OnewayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Oneway","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}


发送延时消息

发送延时消息,顾名思义。场景:比如淘宝商城下单后,并未支付,有30分钟未支付订单状态


@Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("delay","延迟消息".getBytes());
//设置延时 根据官方延时等级
message.setDelayTimeLevel(2);
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}

@Test
void delayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("delay","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}


发送批量消息

批量消息:可以一次性发送一组消息

@Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
//批量消息
List<Message> messages = Arrays.asList(
new Message("delay","批量消息1".getBytes()),
new Message("delay","批量消息2".getBytes()),
new Message("delay","批量消息3".getBytes())
);
//发送消息
producer.send(messages);
//关闭生产者
producer.shutdown();
}



发送顺序消息

发送的消息要保证消息是一定有序的,顺序消息,发送到同一个队列


实体类

@Data
@AllArgsConstructor
public class MessageM {
private int userID;
private String desc;
}

顺序消息,发送到同一个队列

private List<MessageM> messageMs = Arrays.asList(
new MessageM(1,"下单"),
new MessageM(1,"付款"),
new MessageM(1,"配送"),
new MessageM(2,"下单"),
new MessageM(2,"付款"),
new MessageM(2,"配送")
);

@Test
void orderProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.211.131:9876");
//启动
producer.start();

messageMs.forEach(messageM -> {
//创建消息
Message message = new Message("orderMsg",messageM.toString().getBytes());
//发送顺序消息,发送到同一个队列
try {
//相同的userID去相同的队列
producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//选择队列
int hashCode = o.toString().hashCode();
int i = hashCode % list.size();
return list.get(i);
}
},
messageM.getUserID());
} catch (MQClientException e) {
throw new RuntimeException(e);
} catch (RemotingException e) {
throw new RuntimeException(e);
} catch (MQBrokerException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
//关闭生产者
producer.shutdown();
}


@Test
void orderConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.211.131:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("orderMsg","*");

//设置监听器(一直,异步回调方式)
//MessageListenerConcurrently 并发模式,多线程
//MessageListenerOrderly 顺序模式,单线程
consumer.registerMessageListener(new MessageListenerOrderly() {
//消费方法
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
System.out.println("当前线程ID"+Thread.currentThread().getId());
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}


发送带标签消息

RocketMQ提供消息过滤功能,可根据业务逻辑区分,带有A标签的被A消费,带有B标签的被B消费

@Test
void TagProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("TagGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
//批量消息
Message message1 = new Message("tagTopic", "tagA", "tag标签内容A".getBytes());
Message message2 = new Message("tagTopic", "tagB", "tag标签内容B".getBytes());
//发送消息
producer.send(message1);
producer.send(message2);
//关闭生产者
producer.shutdown();
}

@Test
void TagAConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("tagTopic","tagA");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

@Test
void TagBConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("tagTopic","tagB");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}



所遇问题

错误: 无法验证 dlcdn.apache.org 的由 “/C=US/O=Let‘s Encrypt/CN=R3” 颁发的证书: 颁发的证书已经过期。要以不安全的方式连接至 dlcdn.apa

yum install -y ca-certificates


wget --no-check-certificate https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip



RocketMq 报错 :No route info of this topic : test-topic

conf目录下的broker.conf添加

#自动创建主题

autoCreateTopicEnable=true



打包时可能出现的问题maven [INFO] No proxies configured [INFO] No proxy was configured, downloading directly

经过过排查后发现,是少了yarn-v1.22.10


https://github.com/yarnpkg/yarn/releases/download/v1.22.10/yarn-v1.22.10.tar.gz





【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。