Eventmesh适配OBS开源开发任务总结
【摘要】 EventMesh 是以事件驱动为核心的基础服务,作为动态的插件式云原生基础服务层,将应用程序和中间件层分离,并提供了灵活,可靠和快速的事件分发能力,同时可以对事件进行管理,可以作为应用进程的连接层,提供企业实现其数字化转型目标所需的全套应用进程间通信模式,本文提供了使用OBS存储消息内容的方法
简介
EventMesh 是以事件驱动为核心的基础服务,EventMesh 之于微服务与 Service Mesh 具有同等的定位。EventMesh 作为动态的插件式云原生基础服务层,将应用程序和中间件层分离,并提供了灵活,可靠和快速的事件分发能力,同时可以对事件进行管理,可以作为应用进程的连接层,提供企业实现其数字化转型目标所需的全套应用进程间通信模式。
环境准备
- 下载Gradle 8的最新版本
- 设置Gradle的有关环境变量
- 在IDEA中修改和gradle有关的配置
开发OBS存储插件过程
- 创建obs模块
- 再build.gradle中引入依赖,主要依赖如下
implementation project(":eventmesh-common") implementation project(":eventmesh-storage-plugin:eventmesh-storage-api") implementation "com.lmax:disruptor" // obs implementation 'com.huaweicloud:esdk-obs-java-bundle:3.24.12' // auxiliary serialize api 'io.cloudevents:cloudevents-json-jackson'
- 开发消息生产者
- 定义消息对象:
@Data public class CloudMessageEvent { //消息id,作为obs objectkey的一部分 private String msgId; //话题 private String topic; public void clear() { msgId = null; topic = null; } }
- 发送消息,部门代码如下:
创建Disruptor
发送消息private Disruptor<CloudMessageEvent> createDisruptor() { CloudMessageEventFactory cloudMessageEventFactory = new CloudMessageEventFactory(); int bufferSize = 1024 * 1024; //实例化 Disruptor Disruptor<CloudMessageEvent> cloudMessageEventDisruptor = new Disruptor<>( cloudMessageEventFactory, bufferSize, new CloudMessageEventThreadFactory(), // 单生产者 ProducerType.SINGLE, // 阻塞等待策略 new BlockingWaitStrategy()); cloudMessageEventDisruptor.handleEventsWith(new CloudMessageEventHandler()); cloudMessageEventDisruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler()); return cloudMessageEventDisruptor; }
public SendResult publish(CloudEvent cloudEvent) { Preconditions.checkNotNull(cloudEvent); SendResult sendResult = new SendResult(); try { String uuid = UUID.randomUUID().toString().replaceAll("-", ""); log.info("ObsProducer save cloudevent to OBS======>"); obsBroker.putMessage(uuid, cloudEvent); RingBuffer<CloudMessageEvent> ringBuffer = disruptor.getRingBuffer(); CloudMessageEventProducer producer = new CloudMessageEventProducer(ringBuffer); producer.onData(uuid, cloudEvent.getSubject()); sendResult.setTopic(cloudEvent.getSubject()); sendResult.setMessageId(uuid); } catch (Exception e) { log.error("failed to add event to ringBuffer for : e = {},{}", e, e.getMessage()); } return sendResult; } public void putMessage(String messageId, CloudEvent message) { log.info("CloudEvent message class:{}", message.getClass().getName()); String topicName = message.getSubject(); createTopic(topicName); topicMultimap.put(topicName, 1); //保存消息到obs try { PutObjectRequest putObjectRequest = new PutObjectRequest(); putObjectRequest.setBucketName(obsProperties.getBucket()); putObjectRequest.setObjectKey(topicName + "/" + messageId + ".json"); putObjectRequest.setInput(new ByteArrayInputStream(JSON.toJSONBytes(toCloudEventModel(message), StandardCharsets.UTF_8))); putObjectRequest.setAcl(AccessControlList.REST_CANNED_PUBLIC_READ); obsClient.putObject(putObjectRequest); } catch (ObsException e) { log.error(e.getMessage(), e); } }
- 由于CloudEvent的实现类没有提供set方法,需要自定义类来保存信息
自定义消息对象
消息转换@Data public class CloudEventModel { private Map<String, Object> attributes; private byte[] data; private Map<String, Object> extensions; private String id; private URI source; private String specVersion; private String subject; private String type; private String dataContentType; private URI dataSchema; private OffsetDateTime time; }
//将CloudEvent转化为自定义消息模型 private CloudEventModel toCloudEventModel(CloudEvent cloudEvent) { CloudEventModel cloudEventModel = new CloudEventModel(); cloudEventModel.setId(cloudEvent.getId()); cloudEventModel.setSubject(cloudEvent.getSubject()); cloudEventModel.setData(cloudEvent.getData().toBytes()); cloudEventModel.setSource(cloudEvent.getSource()); cloudEventModel.setType(cloudEvent.getType()); Map<String, Object> extensions = new HashMap<>(); if (!CollectionUtils.isEmpty(cloudEvent.getExtensionNames())) { for (String entry : cloudEvent.getExtensionNames()) { extensions.put(entry, cloudEvent.getExtension(entry) == null ? "" : cloudEvent.getExtension(entry)); } } cloudEventModel.setExtensions(extensions); Map<String, Object> attributes = new HashMap<>(); if (!CollectionUtils.isEmpty(cloudEvent.getAttributeNames())) { for (String entry : cloudEvent.getAttributeNames()) { attributes.put(entry, cloudEvent.getAttribute(entry) == null ? "" : cloudEvent.getAttribute(entry)); } } cloudEventModel.setAttributes(attributes); cloudEventModel.setTime(cloudEvent.getTime()); cloudEventModel.setDataSchema(cloudEvent.getDataSchema()); cloudEventModel.setDataContentType(cloudEvent.getDataContentType()); return cloudEventModel; } //将自定义消息模型转化为CloudEvent public CloudEvent toCloudEvent(CloudEventModel cloudEventModel) { io.cloudevents.core.v1.CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) .withSubject(cloudEventModel.getSubject()) .withSource(cloudEventModel.getSource()) .withDataContentType(cloudEventModel.getDataContentType()) .withType(cloudEventModel.getType()) .withTime(cloudEventModel.getTime()) .withData(cloudEventModel.getData()); if (!ObjectUtils.isEmpty(cloudEventModel.getExtensions())) { Iterator<Map.Entry<String, Object>> iterator = cloudEventModel.getExtensions().entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, Object> entry = iterator.next(); Object value = entry.getValue(); if (value instanceof Integer) { cloudEventBuilder.withExtension(entry.getKey(), (Integer) value); } else if (value instanceof Number) { cloudEventBuilder.withExtension(entry.getKey(), (Number) value); } else if (value instanceof Boolean) { cloudEventBuilder.withExtension(entry.getKey(), (Boolean) value); } else if (value instanceof OffsetDateTime) { cloudEventBuilder.withExtension(entry.getKey(), (OffsetDateTime) value); } else if (value instanceof URI) { cloudEventBuilder.withExtension(entry.getKey(), (URI) value); } else { cloudEventBuilder.withExtension(entry.getKey(), (String) value); } } } return cloudEventBuilder.build(); }
- 开发消息消费者,部门代码如下
- 注册监听者
public void registerEventListener(EventListener listener) { log.info("ObsConsumer registerEventListener======>"); obsBroker.setListener(listener); }
- 获取消息
EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() { @Override public void commit(EventMeshAction action) { switch (action) { case CommitMessage: // update offset log.info("CloudMessageEventHandler message commit, topic: {}, current messageId:{}", topicName, msgId); break; case ManualAck: // update offset log.info("CloudMessageEventHandler message ack, topic: {}, current offset:{}", topicName, msgId); break; case ReconsumeLater: default: } } }; log.info("CloudMessageEventHandler EventListener will consume event======>"); // EventListener listener = obsBroker.getListener(); //obj从obs中获取的消息对象 listener.consume(obsBroker.toCloudEvent((CloudEventModel) obj), consumeContext); //删除obs存储的消息 log.info("CloudMessageEventHandler Delete obs object:{}======>", objectKey); obsClient.deleteObject(obsProperties.getBucket(), objectKey);
添加配置
- 插件配置
采用eventmesh提供的spi机制,加载对应的类 - OBS配置
打包
- gradle clean jar dist -x test -x checkstyleMain -x javaDoc && gradle installPlugin && gradle tar
在ECS上部署 eventmesh
- 购买ECS并绑定EIP,同时放开对应的服务端口
- 上传压缩包并解压
- 修改配置:
修改eventMesh.storage.plugin.type的值为obs - 给脚本添加执行权限:
chmod u+x ./bin/start.sh
chmod u+x ./bin/stop.sh - 启动 eventmesh
./bin/start.sh &
运行测试程序
1.修改配置
修改 eventmesh.ip 的值为部署好的eventmesh服务的ip
- 在IDEA中运行:org.apache.eventmesh.http.demo.sub.SpringBootDemoApplication
- 在IDEA中运行:org.apache.eventmesh.http.demo.pub.eventmeshmessage.AsyncPublishInstance
运行结束后查看日志,日志位置: logs/eventmesh.out - 查看OBS桶的对象的变化,运行结束后OBS中因该没有对象存在
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)