RocketMQ入门

举报
赵KK日常技术记录 发表于 2023/06/24 20:25:06 2023/06/24
【摘要】 RocketMQ是一个分布式消息队列系统,它最初由阿里巴巴开发并开源。RocketMQ具有高可靠性、高吞吐量、高扩展性和良好的可维护性等特点,被广泛应用于金融、电商、物流、游戏等领域。本文将介绍如何入门使用RocketMQ。一、安装RocketMQ1.下载RocketMQ安装包从官网下载最新版本的RocketMQ安装包,地址:[http://rocketmq.apache.org/relea...
RocketMQ是一个分布式消息队列系统,它最初由阿里巴巴开发并开源。RocketMQ具有高可靠性、高吞吐量、高扩展性和良好的可维护性等特点,被广泛应用于金融、电商、物流、游戏等领域。

本文将介绍如何入门使用RocketMQ。

一、安装RocketMQ

1.下载RocketMQ安装包

从官网下载最新版本的RocketMQ安装包,地址:[http://rocketmq.apache.org/release\_notes/release-notes-4.9.0/](http://rocketmq.apache.org/release_notes/release-notes-4.9.0/)

2.解压安装包

将下载的安装包解压到指定目录下,如:/usr/local/rocketmq

3.配置环境变量

在/etc/profile文件中添加如下配置:

```
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=PATH:ROCKETMQ_HOME/bin
```

执行source /etc/profile命令使配置生效。

4.启动RocketMQ

进入RocketMQ安装目录下的bin目录,执行如下命令启动RocketMQ:

```
sh mqnamesrv

sh mqbroker -n localhost:9876
```

二、使用RocketMQ

1.创建主题

在RocketMQ中,消息发送和接收需要指定主题(topic)。创建主题可以使用RocketMQ提供的命令行工具mqadmin,命令如下:

```
sh mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t myTopic
```

2.发送消息

使用RocketMQ提供的Java API发送消息,代码如下:

```
public class Producer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic、Tag和消息体
Message msg = new Message("myTopic" /* Topic /, "TagA" / Tag /, "Hello RocketMQ" .getBytes(RemotingHelper.DEFAULT_CHARSET) / Message body */);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 打印发送结果
System.out.printf("%s%n", sendResult);
// 关闭Producer实例
producer.shutdown();
}
}
```

3.消费消息

使用RocketMQ提供的Java API消费消息,代码如下:

```
public class Consumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("myTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
```

以上代码中,消费者使用了DefaultMQPushConsumer类,它是RocketMQ提供的一个消息推送消费者,可以通过注册消息监听器来消费消息。

四、总结

本文介绍了如何安装和使用RocketMQ,包括创建主题、发送消息和消费消息。RocketMQ具有高可靠性、高吞吐量、高扩展性和良好的可维护性等特点,是一个非常优秀的消息队列系统。
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。