手拉手RocketMQ基础

举报
QGS 发表于 2024/03/10 22:37:44 2024/03/10
【摘要】 手拉手RocketMQ基础:安装配置+快速入门

an

消息中间件的对比

消息中间件

ActiveMQ

RabbitMQ

RocketMQ

kafka

开发语言

java

erlang

java

scala

单击吞吐量

万级

万级

10万级

10万级

时效性

ms

us

ms

ms

可用性

高(主从架构)

高(主从架构)

非常高(主从架构)

非常高(主从架构)


消息中间件: activeMQ:java(jms协议),性能一般,吞吐量低。rabbitMQ:erlang(amqp协议),性能好,功能丰富,吞吐量一般。rocketMQ:java,性能好,吞吐量丰富,功能丰富。Kafka: scala,吞吐量最大,功能单一,大数据领域


RocketMQ安装配置

RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。

rocketmq.apache.org

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列


Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。


RocketMQ下载安装

解压rocketmq-all-4.9.4-bin-release.zip

unzip rocketmq-all-4.9.4-bin-release.zip

配置 环境变量

vim + /etc/profile


export NAMESRV_ADDR=localhost:9876


根据本地环境需求更改

vim runserver.sh


vim runbroker.sh


修改配置文件


namesrvAddr=localhost:9876

autoCreateTopicEnable=true #自动创建主题

brokerIP1=公网地址


启动

nohup sh mqnamesrv > /opt/rocketmq/namesrv.log &

nohup sh mqbroker -c ../conf/broker.conf > /opt/rocketmq/mqbroker.log &





RocketMQ Dashboard的下载与打包启动


pom.xml这个文件所在的目录下执行 Maven 打包命令

mvn clean package -Dmaven.test.skip=true


\target目录下中,运行jar

java -jar rocketmq-dashboard-1.0.0.jar


nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=8888 --rocketmq.config.nameservAddr=127.0.0.1:9876 > dashboard.log &


浏览器访问


使用

生产者(Producer)

生产者:也称为消息发布者,负责生产并发送消息至RocketMQ。


消费者(Consumer)

消费者:也称为消息订阅者,负责从RocketMQ接收并消费消息。


消息(Message)

消息:生产或消费的数据,对于RocketMQ来说,消息就是字节数组。


主机(Broker)

RocketMQ的核心,用于暂存和传输消息。


Pom.xml加入依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>


消息生产者

创建生产者

@Test
void rocketmqProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Topic","消息".getBytes());
SendResult send = producer.send(message);
System.out.println("发送状态"+send.getSendStatus());
//关闭生产者
producer.shutdown();
}


运行生产者




消息消费者


@Test
void rocketmqConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Topic","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}



RocketMQ的概念

分组(Group)

生产者:标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。主要作用用于事务消息,事务消息中如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其它producer,确认这条消息应该commit还是rollback。


消费者:标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。


消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。


主题(Topic)

标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。


区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息。


标签(Tag)

RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic的时候,如果你消费订阅的时候指定的是tagA,那么tagB的消息将不会投递。


消息队列(Message Queue)

简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。若一个Topic创建在不同的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。


无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。


每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。


偏移量(Offset)

RocketMQ中,有很多offset的概念。一般我们只关心暴露到客户端的offset。不指定的话,就是指Message Queue下面的offset。

Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset,Message queue中的max offset表示消息的最大offset。

所遇问题

错误: 无法验证 dlcdn.apache.org 的由 “/C=US/O=Let‘s Encrypt/CN=R3” 颁发的证书: 颁发的证书已经过期。要以不安全的方式连接至 dlcdn.apa

yum install -y ca-certificates


wget --no-check-certificate https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip






【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。