用Apache Rocketmq适配华为云和GaussDB
一、适配目的
本任务的主要目的是让Rocketmq支持华为云,主要是在验证Rocketmq在华为云CCE上是否正常使用。帮助拓展华为云的影响力,也方便Rocketmq的用户能够轻松上云。
二、适配任务分析
1、使用最新版Rocketmq在华为CCE集群上完成部署并配置相关topic等。
2、开发并部署Rocketmq发送消息client和接受消息client。
3、验证消息收发功能是否正常。
注:本具体任务内容内容可以参考博客附件。
二、适配过程
1、部署Rocketmq集群
查看Rocketmq官网部署方式:https://rocketmq.apache.org/zh/docs/deploymentOperations/01deploy,可以看到Rocketmq支持多种部署方式,本次采用较为复杂的多节点(集群)多副本模式-同步双写部署方式。使用华为云CCE部署相关实例(容器的yaml文件见附件)。
其中包含2个主broker,每个主broker分别有一个从broker,3个nameserver,另外还部署了2个Rocketmq proxy用于Rocketmq集群的外部访问,后续发送消息client和接受消息client就是连接该proxy进行消息收发验证,再额外部署1个dashboard用于查看消息队列的情况。(dashboard安装和使用:https://rocketmq.apache.org/zh/docs/deploymentOperations/04Dashboard)
CCE配置截图如下:
部署过程中需要的容器镜像包括rocketmq,版本为5.3.1,用于部署broker、namespace、proxy实例。另外一个是dashboard的镜像,用于部署dashboard实例,镜像如图
部署成功之后,我们在集群使用如下命令新建topic,名称是TestTopic1,消费组为TestGroup
sh mqadmin updatetopic -t TestTopic1 -c TestGroup
2、开发并部署发送消息和接受消息client
验证使用https://github.com/apache/servicecomb-fence这个工程作为基础,在工程中引入Rocketmq接入sdk:
同时,接收到消息之后需要落库,我们需要连接GaussDB,链接配置如下
开发消息发送逻辑,其中endpoint是rocketmq的proxy地址,发送的topic是TestTopic1,已经配置好了:
public String sendMsg(String msg) {
// Endpoint address, set to the Proxy address and port list, usually xxx:8080;xxx:8081
String endpoint = "192.168.0.137:8081";
// The target Topic name for message sending, which needs to be created in advance.
String topic = "TestTopic1";
RocketMqMsg mqMsg = new RocketMqMsg();
mqMsg.setMsgInfo(msg);
mqMsg.setTopic(topic);
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// When initializing Producer, communication configuration and pre-bound Topic need to be set.
Producer producer = null;
try {
producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
}catch (Exception e){
}
// Sending a normal message.
Message message = provider.newMessageBuilder()
.setTopic(topic)
// Set the message index key, which can be used to accurately find a specific message.
.setKeys("messageKey")
// Set the message Tag, used by the consumer to filter messages by specified Tag.
.setTag("messageTag")
// Message body.
.setBody(name.getBytes())
.build();
try {
// Send the message, paying attention to the sending result and catching exceptions.
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
// producer.close();
return mqMsg.getMsgId();
}
开发接受消息并落库逻辑,接受的topic是TestTopic1,消费组是TestGroup。
public String consumeMsg() {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Endpoint address, set to the Proxy address and port list, usually xxx:8080;xxx:8081
String endpoints = "192.168.0.137:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// Subscription message filtering rule, indicating subscription to all Tag messages.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Specify the consumer group the consumer belongs to, Group needs to be created in advance.
String consumerGroup = "TestGroup";
// Specify which target Topic to subscribe to, Topic needs to be created in advance.
String topic ="TestTopic1";
try {
// Initialize PushConsumer
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group.
.setConsumerGroup(consumerGroup)
// Set pre-bound subscription relationship.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Set the message listener.
.setMessageListener(messageView -> {
// Handle messages and return the consumption result.
System.out.println(messageView.getMessageId());
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
String str = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
rocketMqMsgLogicService.insert(messageView.getMessageId().toString(),
topic,str);
//info
return ConsumeResult.SUCCESS;
}).build();
Thread.sleep(Long.MAX_VALUE);
} catch (Exception e) {
logger.error(e.getMessage());
}
return "正在接受...";
}
3、部署并验证
打包工程后,上传到华为云的一台ECS上,然后启动工程中的5个服务。
访问地址后点击发送消息,会返回消息ID:
然后点击接受消息,这样就可以从rocketmq获取消息。
通过测试接口/api/resource/v1/rocketMsg/getConsumedMsgByMsgId?msgId=01FA163E156D661D44074C839500000002,可以获取对应消息ID的具体落库信息:
三、交付件
(1) DEMO仓库地址: https://gitcode.com/uiu34/opensource-demo-rocketmq-20241116/overview
(2) DEMO开发提交记录: https://gitcode.com/uiu34/opensource-demo-rocketmq-20241116/commits/detail/22c93f4366d52153c262a7dc4a23f541e85b3425?ref=master
(3) 博客的地址: https://bbs.huaweicloud.com/blogs/439805
四、华为云资源清单
1.1 CCE
产品名称 |
集群类型 |
集群版本 |
集群规模 |
云容器引擎CCE |
Turbo集群 |
V1.29 |
50 |
1.2 ECS
产品名称 |
CPU架构 |
实例类型 |
公共镜像 |
镜像版本 |
弹性云服务器 |
鲲鹏计算 |
鲲鹏通用计算增强型 |
Huawei Cloud EulerOS |
Huawei Cloud EulerOS 2.0标准版 64位 ARM版(10GiB) |
1.3 GuassDB
产品名称 |
产品类型 |
数据库引擎版本 |
内核引擎版本 |
实例类型 |
部署形态 |
云数据库GaussDB |
基础版 |
GaussDB V2.0-8.201.0 |
505.2.0
|
集中式 |
1主2备 |
- 点赞
- 收藏
- 关注作者
评论(0)