Eventmesh适配OBS开源开发任务总结

举报
yd_255530874 发表于 2025/01/24 17:29:32 2025/01/24
227 0 0
【摘要】 EventMesh 是以事件驱动为核心的基础服务,作为动态的插件式云原生基础服务层,将应用程序和中间件层分离,并提供了灵活,可靠和快速的事件分发能力,同时可以对事件进行管理,可以作为应用进程的连接层,提供企业实现其数字化转型目标所需的全套应用进程间通信模式,本文提供了使用OBS存储消息内容的方法
简介

EventMesh 是以事件驱动为核心的基础服务,EventMesh 之于微服务与 Service Mesh 具有同等的定位。EventMesh 作为动态的插件式云原生基础服务层,将应用程序和中间件层分离,并提供了灵活,可靠和快速的事件分发能力,同时可以对事件进行管理,可以作为应用进程的连接层,提供企业实现其数字化转型目标所需的全套应用进程间通信模式。

环境准备
  1. 下载Gradle 8的最新版本
  2. 设置Gradle的有关环境变量
  3. 在IDEA中修改和gradle有关的配置
开发OBS存储插件过程
  1. 创建obs模块
  2. 再build.gradle中引入依赖,主要依赖如下
    implementation project(":eventmesh-common")
    implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
    implementation "com.lmax:disruptor"
    
    // obs
    implementation 'com.huaweicloud:esdk-obs-java-bundle:3.24.12'
    
    // auxiliary serialize
    api 'io.cloudevents:cloudevents-json-jackson'
    
  3. 开发消息生产者
  • 定义消息对象:
    @Data
    public class CloudMessageEvent {
    //消息id,作为obs objectkey的一部分
    private String msgId;
    //话题
    private String topic;
    
    public void clear() {
      msgId = null;
      topic = null;
    }
    }
    
  • 发送消息,部门代码如下:
    创建Disruptor
    private Disruptor<CloudMessageEvent> createDisruptor() {
      CloudMessageEventFactory cloudMessageEventFactory = new CloudMessageEventFactory();
      int bufferSize = 1024 * 1024;
      //实例化 Disruptor
      Disruptor<CloudMessageEvent> cloudMessageEventDisruptor = new Disruptor<>(
          cloudMessageEventFactory,
          bufferSize,
          new CloudMessageEventThreadFactory(),
          // 单生产者
          ProducerType.SINGLE,
          // 阻塞等待策略
          new BlockingWaitStrategy());
      cloudMessageEventDisruptor.handleEventsWith(new CloudMessageEventHandler());
      cloudMessageEventDisruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
      return cloudMessageEventDisruptor;
    }
    
    发送消息
    public SendResult publish(CloudEvent cloudEvent) {
      Preconditions.checkNotNull(cloudEvent);
      SendResult sendResult = new SendResult();
      try {
        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
        log.info("ObsProducer save cloudevent to OBS======>");
        obsBroker.putMessage(uuid, cloudEvent);
    
        RingBuffer<CloudMessageEvent> ringBuffer = disruptor.getRingBuffer();
        CloudMessageEventProducer producer = new CloudMessageEventProducer(ringBuffer);
        producer.onData(uuid, cloudEvent.getSubject());
    
        sendResult.setTopic(cloudEvent.getSubject());
        sendResult.setMessageId(uuid);
      } catch (Exception e) {
        log.error("failed to add event to ringBuffer for : e = {},{}", e, e.getMessage());
      }
      return sendResult;
    }
    public void putMessage(String messageId, CloudEvent message) {
      log.info("CloudEvent message class:{}", message.getClass().getName());
      String topicName = message.getSubject();
      createTopic(topicName);
      topicMultimap.put(topicName, 1);
      //保存消息到obs
      try {
        PutObjectRequest putObjectRequest = new PutObjectRequest();
        putObjectRequest.setBucketName(obsProperties.getBucket());
        putObjectRequest.setObjectKey(topicName + "/" + messageId + ".json");
        putObjectRequest.setInput(new ByteArrayInputStream(JSON.toJSONBytes(toCloudEventModel(message), StandardCharsets.UTF_8)));
        putObjectRequest.setAcl(AccessControlList.REST_CANNED_PUBLIC_READ);
        obsClient.putObject(putObjectRequest);
      } catch (ObsException e) {
        log.error(e.getMessage(), e);
      }
    }
    
  • 由于CloudEvent的实现类没有提供set方法,需要自定义类来保存信息
    自定义消息对象
    @Data
    public class CloudEventModel {
      private Map<String, Object> attributes;
      private byte[] data;
      private Map<String, Object> extensions;
      private String id;
      private URI source;
      private String specVersion;
      private String subject;
      private String type;
      private String dataContentType;
      private URI dataSchema;
      private OffsetDateTime time;
    }
    
    消息转换
    //将CloudEvent转化为自定义消息模型
    private CloudEventModel toCloudEventModel(CloudEvent cloudEvent) {
      CloudEventModel cloudEventModel = new CloudEventModel();
      cloudEventModel.setId(cloudEvent.getId());
      cloudEventModel.setSubject(cloudEvent.getSubject());
      cloudEventModel.setData(cloudEvent.getData().toBytes());
      cloudEventModel.setSource(cloudEvent.getSource());
      cloudEventModel.setType(cloudEvent.getType());
      Map<String, Object> extensions = new HashMap<>();
      if (!CollectionUtils.isEmpty(cloudEvent.getExtensionNames())) {
        for (String entry : cloudEvent.getExtensionNames()) {
          extensions.put(entry, cloudEvent.getExtension(entry) == null ? "" : cloudEvent.getExtension(entry));
        }
      }
      cloudEventModel.setExtensions(extensions);
      Map<String, Object> attributes = new HashMap<>();
      if (!CollectionUtils.isEmpty(cloudEvent.getAttributeNames())) {
        for (String entry : cloudEvent.getAttributeNames()) {
          attributes.put(entry, cloudEvent.getAttribute(entry) == null ? "" : cloudEvent.getAttribute(entry));
        }
      }
      cloudEventModel.setAttributes(attributes);
      cloudEventModel.setTime(cloudEvent.getTime());
      cloudEventModel.setDataSchema(cloudEvent.getDataSchema());
      cloudEventModel.setDataContentType(cloudEvent.getDataContentType());
      return cloudEventModel;
    }
    //将自定义消息模型转化为CloudEvent
    public CloudEvent toCloudEvent(CloudEventModel cloudEventModel) {
      io.cloudevents.core.v1.CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1()
        .withId(UUID.randomUUID().toString())
        .withSubject(cloudEventModel.getSubject())
        .withSource(cloudEventModel.getSource())
        .withDataContentType(cloudEventModel.getDataContentType())
        .withType(cloudEventModel.getType())
        .withTime(cloudEventModel.getTime())
        .withData(cloudEventModel.getData());
    	if (!ObjectUtils.isEmpty(cloudEventModel.getExtensions())) {
      Iterator<Map.Entry<String, Object>> iterator = cloudEventModel.getExtensions().entrySet().iterator();
      	  while (iterator.hasNext()) {
            Map.Entry<String, Object> entry = iterator.next();
            Object value = entry.getValue();
            if (value instanceof Integer) {
              cloudEventBuilder.withExtension(entry.getKey(), (Integer) value);
            } else if (value instanceof Number) {
              cloudEventBuilder.withExtension(entry.getKey(), (Number) value);
            } else if (value instanceof Boolean) {
              cloudEventBuilder.withExtension(entry.getKey(), (Boolean) value);
            } else if (value instanceof OffsetDateTime) {
              cloudEventBuilder.withExtension(entry.getKey(), (OffsetDateTime) value);
            } else if (value instanceof URI) {
              cloudEventBuilder.withExtension(entry.getKey(), (URI) value);
            } else {
              cloudEventBuilder.withExtension(entry.getKey(), (String) value);
            }
        }
     }
     return cloudEventBuilder.build();
    }
    
  1. 开发消息消费者,部门代码如下
  • 注册监听者
    public void registerEventListener(EventListener listener) {
      log.info("ObsConsumer registerEventListener======>");
      obsBroker.setListener(listener);
    }
    
  • 获取消息
    EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
        @Override
        public void commit(EventMeshAction action) {
          switch (action) {
            case CommitMessage:
              // update offset
              log.info("CloudMessageEventHandler message commit, topic: {}, current messageId:{}", topicName, msgId);
              break;
            case ManualAck:
              // update offset
              log.info("CloudMessageEventHandler message ack, topic: {}, current offset:{}", topicName, msgId);
              break;
            case ReconsumeLater:
            default:
          }
        }
      };
      log.info("CloudMessageEventHandler EventListener will consume event======>");
      //
      EventListener listener = obsBroker.getListener();
      //obj从obs中获取的消息对象
      listener.consume(obsBroker.toCloudEvent((CloudEventModel) obj), consumeContext);
      //删除obs存储的消息
      log.info("CloudMessageEventHandler Delete obs object:{}======>", objectKey);
      obsClient.deleteObject(obsProperties.getBucket(), objectKey);
    
添加配置
  1. 插件配置

    采用eventmesh提供的spi机制,加载对应的类
  2. OBS配置
打包
  • gradle clean jar dist -x test -x checkstyleMain -x javaDoc && gradle installPlugin && gradle tar
在ECS上部署 eventmesh
  1. 购买ECS并绑定EIP,同时放开对应的服务端口
  2. 上传压缩包并解压
  3. 修改配置:

    修改eventMesh.storage.plugin.type的值为obs
  4. 给脚本添加执行权限:
    chmod u+x ./bin/start.sh
    chmod u+x ./bin/stop.sh
  5. 启动 eventmesh
    ./bin/start.sh &
运行测试程序

1.修改配置

修改 eventmesh.ip 的值为部署好的eventmesh服务的ip

  1. 在IDEA中运行:org.apache.eventmesh.http.demo.sub.SpringBootDemoApplication
  2. 在IDEA中运行:org.apache.eventmesh.http.demo.pub.eventmeshmessage.AsyncPublishInstance
    运行结束后查看日志,日志位置: logs/eventmesh.out
  3. 查看OBS桶的对象的变化,运行结束后OBS中因该没有对象存在
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。