手搭手RocketMQ发送消息
消息中间件的对比
消息中间件 |
ActiveMQ |
RabbitMQ |
RocketMQ |
kafka |
开发语言 |
java |
erlang |
java |
scala |
单击吞吐量 |
万级 |
万级 |
10万级 |
10万级 |
时效性 |
ms |
us |
ms |
ms |
可用性 |
高(主从架构) |
高(主从架构) |
非常高(主从架构) |
非常高(主从架构) |
消息中间件: activeMQ:java(jms协议),性能一般,吞吐量低。rabbitMQ:erlang(amqp协议),性能好,功能丰富,吞吐量一般。rocketMQ:java,性能好,吞吐量丰富,功能丰富。Kafka: scala,吞吐量最大,功能单一,大数据领域
RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。
rocketmq.apache.org
Broker:经纪人(经理人)
Topic主题:消息区分,分类,虚拟结构
Queue:消息队列
Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
发送消息
发送同步消息
同步消息发送后会用一个返回值,也就是MQ服务器接收到消息返回的一个确认,这种方式非常安全,但是性能就没那么高,而在MQ集群中,也是要等到所有的从机都复制了消息以后才会返回,这种方式适合重要消息的场景
@Test
void rocketmqProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Topic","消息".getBytes());
SendResult send = producer.send(message);
System.out.println("发送状态"+send.getSendStatus());
//关闭生产者
producer.shutdown();
}
@Test
void rocketmqConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Topic","*");
//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送异步消息
异步消息常用在对响应时间敏感的业务场景,发送端不能容忍长时间等待Broker的响应。发送完后会有一个异步消息通知
@Test
void aysncProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("aysncTopic","异步消息".getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功"+sendResult);
}
@Override
public void onException(Throwable throwable) {
System.err.println("发送失败"+throwable);
}
});
System.out.println("执行了");
//关闭生产者
//producer.shutdown();
//挂起当前jvm
System.in.read();
}
@Test
void aysncConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("aysncTopic","*");
//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送单向消息
单向消息发送这种方式不关心发送结果的场景,这种方式吞吐量大,但存在消息丢失的风险。使用案例:日志信息发送
@Test
void OnewayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("Oneway");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Oneway","单向消息
".getBytes());
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}
@Test
void OnewayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Oneway","*");
//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送延时消息
发送延时消息,顾名思义。场景:比如淘宝商城下单后,并未支付,有30分钟未支付订单状态
@Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("delay","延迟消息".getBytes());
//设置延时 根据官方延时等级
message.setDelayTimeLevel(2);
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}
@Test
void delayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("delay","*");
//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送批量消息
批量消息:可以一次性发送一组消息
@Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
//批量消息
List<Message> messages = Arrays.asList(
new Message("delay","批量消息1".getBytes()),
new Message("delay","批量消息2".getBytes()),
new Message("delay","批量消息3".getBytes())
);
//发送消息
producer.send(messages);
//关闭生产者
producer.shutdown();
}
发送顺序消息
发送的消息要保证消息是一定有序的,顺序消息,发送到同一个队列
实体类
@Data
@AllArgsConstructor
public class MessageM {
private int userID;
private String desc;
}
顺序消息,发送到同一个队列
private List<MessageM> messageMs = Arrays.asList(
new MessageM(1,"下单"),
new MessageM(1,"付款"),
new MessageM(1,"配送"),
new MessageM(2,"下单"),
new MessageM(2,"付款"),
new MessageM(2,"配送")
);
@Test
void orderProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.211.131:9876");
//启动
producer.start();
messageMs.forEach(messageM -> {
//创建消息
Message message = new Message("orderMsg",messageM.toString().getBytes());
//发送顺序消息,发送到同一个队列
try {
//相同的userID去相同的队列
producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//选择队列
int hashCode = o.toString().hashCode();
int i = hashCode % list.size();
return list.get(i);
}
},
messageM.getUserID());
} catch (MQClientException e) {
throw new RuntimeException(e);
} catch (RemotingException e) {
throw new RuntimeException(e);
} catch (MQBrokerException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
//关闭生产者
producer.shutdown();
}
@Test
void orderConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.211.131:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("orderMsg","*");
//设置监听器(一直,异步回调方式)
//MessageListenerConcurrently 并发模式,多线程
//MessageListenerOrderly 顺序模式,单线程
consumer.registerMessageListener(new MessageListenerOrderly() {
//消费方法
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
System.out.println("当前线程ID"+Thread.currentThread().getId());
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送带标签消息
RocketMQ提供消息过滤功能,可根据业务逻辑区分,带有A标签的被A消费,带有B标签的被B消费
@Test
void TagProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("TagGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
//批量消息
Message message1 = new Message("tagTopic", "tagA", "tag标签内容A".getBytes());
Message message2 = new Message("tagTopic", "tagB", "tag标签内容B".getBytes());
//发送消息
producer.send(message1);
producer.send(message2);
//关闭生产者
producer.shutdown();
}
@Test
void TagAConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("tagTopic","tagA");
//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
@Test
void TagBConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("tagTopic","tagB");
//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
所遇问题
错误: 无法验证 dlcdn.apache.org 的由 “/C=US/O=Let‘s Encrypt/CN=R3” 颁发的证书: 颁发的证书已经过期。要以不安全的方式连接至 dlcdn.apa
yum install -y ca-certificates
wget --no-check-certificate https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
RocketMq 报错 :No route info of this topic : test-topic
conf目录下的broker.conf添加
#自动创建主题
autoCreateTopicEnable=true
打包时可能出现的问题maven [INFO] No proxies configured [INFO] No proxy was configured, downloading directly
经过过排查后发现,是少了yarn-v1.22.10
https://github.com/yarnpkg/yarn/releases/download/v1.22.10/yarn-v1.22.10.tar.gz
- 点赞
- 收藏
- 关注作者
评论(0)