用Apache Rocketmq适配华为云和GaussDB

举报
zhanwei 发表于 2024/11/18 18:07:38 2024/11/18
【摘要】 一、适配目的本任务的主要目的是让Rocketmq支持华为云,主要是在验证Rocketmq在华为云CCE上是否正常使用。帮助拓展华为云的影响力,也方便Rocketmq的用户能够轻松上云。二、适配任务1、使用最新版Rocketmq在华为CCE集群上完成部署并配置相关topic等。2、开发并部署Rocketmq发送消息client和接受消息client。3、验证消息收发功能是否正常。注:本具体任务...

一、适配目的

本任务的主要目的是让Rocketmq支持华为云,主要是在验证Rocketmq在华为云CCE上是否正常使用。帮助拓展华为云的影响力,也方便Rocketmq的用户能够轻松上云

二、适配任务分析

1、使用最新版Rocketmq在华为CCE集群上完成部署并配置相关topic等。

2、开发并部署Rocketmq发送消息client和接受消息client。

3、验证消息收发功能是否正常。

注:本具体任务内容内容可以参考博客附件。

二、适配过程

1、部署Rocketmq集群

查看Rocketmq官网部署方式:https://rocketmq.apache.org/zh/docs/deploymentOperations/01deploy,可以看到Rocketmq支持多种部署方式,本次采用较为复杂的多节点(集群)多副本模式-同步双写部署方式。使用华为云CCE部署相关实例(容器的yaml文件见附件)。

其中包含2个主broker,每个主broker分别有一个从broker,3个nameserver,另外还部署了2个Rocketmq proxy用于Rocketmq集群的外部访问,后续发送消息client和接受消息client就是连接该proxy进行消息收发验证,再额外部署1个dashboard用于查看消息队列的情况。(dashboard安装和使用:https://rocketmq.apache.org/zh/docs/deploymentOperations/04Dashboard)

CCE配置截图如下:

部署过程中需要的容器镜像包括rocketmq,版本为5.3.1,用于部署broker、namespace、proxy实例。另外一个是dashboard的镜像,用于部署dashboard实例,镜像如图

部署成功之后,我们在集群使用如下命令新建topic,名称是TestTopic1,消费组为TestGroup

sh mqadmin updatetopic -t TestTopic1 -c TestGroup

2、开发并部署发送消息和接受消息client

验证使用https://github.com/apache/servicecomb-fence这个工程作为基础,在工程中引入Rocketmq接入sdk:

同时,接收到消息之后需要落库,我们需要连接GaussDB,链接配置如下

开发消息发送逻辑,其中endpoint是rocketmq的proxy地址,发送的topic是TestTopic1,已经配置好了:

  public String sendMsg(String msg) {

      // Endpoint address, set to the Proxy address and port list, usually xxx:8080;xxx:8081
      String endpoint = "192.168.0.137:8081";
      // The target Topic name for message sending, which needs to be created in advance.
      String topic = "TestTopic1";
      RocketMqMsg mqMsg = new RocketMqMsg();
      mqMsg.setMsgInfo(msg);
      mqMsg.setTopic(topic);
      ClientServiceProvider provider = ClientServiceProvider.loadService();
      ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
      ClientConfiguration configuration = builder.build();
      // When initializing Producer, communication configuration and pre-bound Topic need to be set.
      Producer producer = null;
      try {
          producer = provider.newProducerBuilder()
                  .setTopics(topic)
                  .setClientConfiguration(configuration)
                  .build();
      }catch (Exception e){

      }

      // Sending a normal message.
      Message message = provider.newMessageBuilder()
              .setTopic(topic)
              // Set the message index key, which can be used to accurately find a specific message.
              .setKeys("messageKey")
              // Set the message Tag, used by the consumer to filter messages by specified Tag.
              .setTag("messageTag")
              // Message body.
              .setBody(name.getBytes())
              .build();
      try {
          // Send the message, paying attention to the sending result and catching exceptions.
          SendReceipt sendReceipt = producer.send(message);

          logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());

      } catch (ClientException e) {
          logger.error("Failed to send message", e);
      }
      // producer.close();
      return mqMsg.getMsgId();
  }

开发接受消息并落库逻辑,接受的topic是TestTopic1,消费组是TestGroup。

public String consumeMsg() {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // Endpoint address, set to the Proxy address and port list, usually xxx:8080;xxx:8081
        String endpoints = "192.168.0.137:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
              .setEndpoints(endpoints)
              .build();
        // Subscription message filtering rule, indicating subscription to all Tag messages.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Specify the consumer group the consumer belongs to, Group needs to be created in advance.
        String consumerGroup = "TestGroup";
        // Specify which target Topic to subscribe to, Topic needs to be created in advance.
        String topic ="TestTopic1";

    try {
        // Initialize PushConsumer
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Set the consumer group.
                .setConsumerGroup(consumerGroup)
                // Set pre-bound subscription relationship.
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // Set the message listener.
                .setMessageListener(messageView -> {
                    // Handle messages and return the consumption result.
                    System.out.println(messageView.getMessageId());
                    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());

                    String str = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
                    rocketMqMsgLogicService.insert(messageView.getMessageId().toString(),
                            topic,str);
                    //info
                    return ConsumeResult.SUCCESS;
                }).build();
        Thread.sleep(Long.MAX_VALUE);
    } catch (Exception e) {
        logger.error(e.getMessage());
    }
    return "正在接受...";
  }

3、部署并验证

打包工程后,上传到华为云的一台ECS上,然后启动工程中的5个服务。

访问地址后点击发送消息,会返回消息ID:

然后点击接受消息,这样就可以从rocketmq获取消息。

通过测试接口/api/resource/v1/rocketMsg/getConsumedMsgByMsgId?msgId=01FA163E156D661D44074C839500000002,可以获取对应消息ID的具体落库信息:

三、交付件

(1) DEMO仓库地址: https://gitcode.com/uiu34/opensource-demo-rocketmq-20241116/overview

(2) DEMO开发提交记录: https://gitcode.com/uiu34/opensource-demo-rocketmq-20241116/commits/detail/22c93f4366d52153c262a7dc4a23f541e85b3425?ref=master

(3) 博客的地址: https://bbs.huaweicloud.com/blogs/439805

四、华为云资源清单

1.1 CCE

产品名称

集群类型

集群版本

集群规模

云容器引擎CCE

Turbo集群

V1.29

50

1.2 ECS

产品名称

CPU架构

实例类型

公共镜像

镜像版本

弹性云服务器

鲲鹏计算

鲲鹏通用计算增强型

Huawei Cloud EulerOS

Huawei Cloud EulerOS 2.0标准版 64ARM(10GiB)

1.3 GuassDB

产品名称

产品类型

数据库引擎版本

内核引擎版本

实例类型

部署形态

云数据库GaussDB

基础版

GaussDB V2.0-8.201.0


505.2.0

集中式

12

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。