etcd适配华为云&Gaussdb
最近在开源社区做贡献,发现etcd不能连接Gaussdb进行存储配置信息。于是产生了给etcd适配Gaussdb的想法。现在记录一下步骤和当时遇到的问题。
首先展示一下我的成果,喜欢的朋友,点个star吧。
有问题可以联系我:sweetwuxiaomei@qq.com
servicecomb etcd修改仓库
git信息:https://github.com/SweetWuXiaoMei/servicecomb-java-chassis
分支:dev_czd_etcd
demo地址仓库:
git信息:https://gitcode.com/chenzhida/opensource-demo-etcd-241112/overview
分支:dev_etcd
步骤1:fork 仓库
demo主要使用的是servicecomb,目前servicecomb还没有支持etcd,所以先fork servicecomb添加etcd支持
servicecomb源仓库:https://github.com/apache/servicecomb-java-chassis
步骤2:分析并理解servicecomb仓库的源码结构并开发
对于servicecomb的支持,主要是服务发现和动态配置两大块,分别是service-registry(服务发现)和dynamic-config(动态配置)。
首先对于service-registry模块新增一个etcd模块,在servicecomb中已经对于服务注册发现已经留下了接口,我们只需要实现这些接口接口,注册接口是Registration,发现接口是Discovery。
服务发现大致的思路是,首先通过spring boot自动发现机制加载EtcdConfiguration,这里会加载EtcdDiscovery和EtcdRegistration,然后实现其中的接口行了。
EtcdDiscovery主要的方法实现功能如下:
findServiceInstances:查询所有的实例信息
findServices:发现所有的服务名称
init:初始化etcd需要的路径信息
run:获取etcd实例
EtcdRegistration主要的方法实现的功能如下:
init: 初始化etcd需要的路径信息
run:获取etcd实例并开启定时获取实例的任务
重点介绍一下EtcdDiscovery.findServiceInstances的逻辑,代码如下:
public List<EtcdDiscoveryInstance> findServiceInstances(String application, String serviceName) {
String prefixPath = basePath + "/" + application + "/" + serviceName;
watchMap.computeIfAbsent(prefixPath, serName -> {
Watch watchClient = client.getWatchClient();
try {
ByteSequence prefixByteSeq = ByteSequence.from(prefixPath, Charset.defaultCharset());
watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(),
resp -> watchNode(application, serviceName, prefixPath));
} catch (Exception e) {
LOGGER.error("Failed to add watch", e);
}
return watchClient;
});
List<KeyValue> endpointKv = getValuesByPrefix(prefixPath);
return convertServiceInstanceList(endpointKv);
}
private void watchNode(String application, String serviceName, String prefixPath) {
CompletableFuture<GetResponse> getFuture = client.getKVClient()
.get(ByteSequence.from(prefixPath, StandardCharsets.UTF_8),
GetOption.builder().withPrefix(ByteSequence.from(prefixPath, StandardCharsets.UTF_8)).build());
getFuture.thenAcceptAsync(response -> {
List<EtcdDiscoveryInstance> discoveryInstanceList = convertServiceInstanceList(response.getKvs());
instanceChangedListener.onInstanceChanged(name(), application, serviceName, discoveryInstanceList);
}).exceptionally(e -> {
LOGGER.error("watchNode error", e);
return null;
});
}
private List<KeyValue> getValuesByPrefix(String prefix) {
CompletableFuture<GetResponse> getFuture = client.getKVClient()
.get(ByteSequence.from(prefix, StandardCharsets.UTF_8),
GetOption.builder().withPrefix(ByteSequence.from(prefix, StandardCharsets.UTF_8)).build());
GetResponse response = MuteExceptionUtil.builder().withLog("get kv by prefix error")
.executeCompletableFuture(getFuture);
return response.getKvs();
}
代码功能解释
findServiceInstances 方法
构建前缀路径:根据 application 和 serviceName 构建 prefixPath。
添加监听器:使用 watchMap 确保每个 prefixPath 只有一个监听器,通过 client.getWatchClient() 添加监听器。
获取键值对:调用 getValuesByPrefix 获取前缀路径下的所有键值对。
转换实例列表:调用 convertServiceInstanceList 将键值对转换为 EtcdDiscoveryInstance 列表。
watchNode 方法
获取键值对:通过 client.getKVClient().get 获取前缀路径下的所有键值对。
处理响应:异步处理响应,将键值对转换为 EtcdDiscoveryInstance 列表,并调用 instanceChangedListener.onInstanceChanged 通知实例变化。
异常处理:记录异常日志。
getValuesByPrefix 方法
获取键值对:通过 client.getKVClient().get 获取前缀路径下的所有键值对。
处理响应:使用 MuteExceptionUtil.builder 处理异步响应,记录异常日志并返回键值对列表。
流程图如下:
重点介绍一下EtcdRegistration.run的逻辑,代码如下:
@Override
public void run() {
client = Client.builder().endpoints(etcdRegistryProperties.getConnectString())
.build();
keyPath = basePath + "/" +
BootStrapProperties.readApplication(environment) + "/" +
BootStrapProperties.readServiceName(environment) + "/" +
registrationId.getInstanceId();
String valueJson = MuteExceptionUtil.builder().withLog("to json, key:{}, value:{}", keyPath, etcdInstance)
.executeFunction(JsonUtils::writeValueAsString, etcdInstance);
register(ByteSequence.from(keyPath, Charset.defaultCharset()),
ByteSequence.from(valueJson, Charset.defaultCharset()));
}
public void register(ByteSequence key, ByteSequence value) {
Lease leaseClient = client.getLeaseClient();
leaseId = MuteExceptionUtil.builder().withLog("get lease id, key:{}, value:{}", keyPath, etcdInstance)
.executeCompletableFuture(leaseClient.grant(60)).getID();
KV kvClient = client.getKVClient();
PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
CompletableFuture<PutResponse> putResponse = kvClient.put(key, value, putOption);
putResponse.thenRun(() -> {
executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(
() -> MuteExceptionUtil.builder().withLog("reRegister, {}, {}", keyPath, etcdInstance)
.executeFunction(leaseClient::keepAliveOnce, leaseId), 0, 5, TimeUnit.SECONDS);
});
}
详细步骤
初始化客户端:使用Client.builder()构建Etcd客户端,并设置连接端点。
生成键路径:根据环境配置生成键路径keyPath,格式为basePath/application/serviceName/instanceId。
序列化实例信息:使用MuteExceptionUtil.builder()构建异常处理工具,调用JsonUtils.writeValueAsString将etcdInstance对象转换为JSON字符串。
注册服务:调用register方法,传入生成的键和值。
获取租约ID:使用MuteExceptionUtil.builder()构建异常处理工具,调用leaseClient.grant(60)获取租约ID。
设置键值对:使用kvClient.put将键值对写入Etcd,并设置租约ID。
定期续租:创建单线程定时任务,每5秒调用leaseClient.keepAliveOnce续租。
流程图如下:
下面介绍一下dynamic-config下的config-etcd模块
通过Java的spi机制读取DynamicPropertiesSource,实现自动读取。
实现DynamicPropertiesSource的核心方法
重点解释一下EtcdClient的方法,代码如下:
public void refreshEtcdConfig() throws Exception {
getClient();
String env = BootStrapProperties.readServiceEnvironment(environment);
if (StringUtils.isEmpty(env)) {
env = EtcdConfig.ZOOKEEPER_DEFAULT_ENVIRONMENT;
}
addEnvironmentConfig(env);
addApplicationConfig(env);
addServiceConfig(env);
addVersionConfig(env);
addTagConfig(env);
refreshConfigItems();
}
private void addTagConfig(String env) throws Exception {
if (StringUtils.isEmpty(etcdConfig.getInstanceTag())) {
return;
}
String path = String.format(PATH_TAG, env,
BootStrapProperties.readApplication(environment),
BootStrapProperties.readServiceName(environment),
BootStrapProperties.readServiceVersion(environment),
etcdConfig.getInstanceTag());
ByteSequence prefixByteSeq = ByteSequence.from(path, Charset.defaultCharset());
Watch watchClient = client.getWatchClient();
watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(),
resp -> new Thread(new GetDataRunable(tagData, this, path)).start());
this.tagData = parseData(path);
}
代码解释
refreshEtcdConfig 方法
获取客户端:调用 getClient() 方法初始化 Etcd 客户端。
读取环境变量:从 environment 中读取服务环境配置 env,如果为空则使用默认值 ZOOKEEPER_DEFAULT_ENVIRONMENT。
添加配置:依次调用 addEnvironmentConfig、addApplicationConfig、addServiceConfig、addVersionConfig 和 addTagConfig 方法,分别添加环境、应用、服务、版本和标签配置。
刷新配置项:调用 refreshConfigItems 方法,合并所有配置项并更新。
addTagConfig 方法
检查实例标签:如果 etcdConfig.getInstanceTag() 为空,则直接返回。
构建路径:使用 String.format 构建标签配置的路径 path。
创建前缀字节序列:将路径转换为 ByteSequence。
设置监听:使用 client.getWatchClient() 设置监听器,当路径下的数据发生变化时,启动新线程执行 GetDataRunable。
解析数据:调用 parseData(path) 方法解析路径下的配置数据,并存储在 tagData 中。
流程图如下:
步骤3:提交pr的准备工作
提交给开源的代码对质量要求很高,servicecomb里面集成了多项插件进行检查,他们是通过maven去配置的。主要使用下面的命令,并分别做了解释。
这些 Maven 命令分别用于构建项目、跳过特定检查、执行质量检测、生成 Docker 镜像等。以下是每条命令的详细说明:
- mvn -B -Pit install -DskipTests -Dspotbugs.skip=true checkstyle:check
- -B: 以“批处理模式”运行 Maven,适合在 CI/CD 环境中自动运行时使用,避免用户交互。
- -Pit: 激活名为 it 的 Maven Profile(配置不同的依赖、插件等)。
- install: 编译项目、运行测试(如果不跳过)、并将构建的 JAR 文件放入本地 Maven 仓库。
- -DskipTests: 跳过测试阶段(不运行测试用例)。
- -Dspotbugs.skip=true: 跳过 SpotBugs 检查(SpotBugs 是 Java 静态代码分析工具)。
- checkstyle:check: 运行 Checkstyle 插件进行代码格式检查,以确保代码遵循项目的样式约定。
- linelint check
- linelint 是一种文件格式检查工具,通常用于检查文件中的空行、行尾空格等常见的格式问题。
- check: 这个命令执行 linelint 工具中的“检查”任务。
- mvn apache-rat:check -B -Pit,release
- apache-rat:check: 使用 Apache RAT(Release Audit Tool)插件检查项目的 License 和版权声明,确保符合开源项目的合规要求。
- -B: 批处理模式。
- -Pit,release: 激活 it 和 release 两个 Profile,使得该命令在这些特定的环境配置下运行。
- mvn -B -Pit -DskipTests clean verify spotbugs:spotbugs
- -B: 批处理模式。
- -Pit: 激活 it Profile。servicecomb进行demo测试的自定义profile
- -DskipTests: 跳过测试。
- clean: 清理项目的构建目录,删除所有之前的编译和打包结果。
- verify: 验证项目是否符合测试、检查规则。包括单元测试、集成测试、代码质量检查等步骤。
- spotbugs:spotbugs: 使用 SpotBugs 插件进行代码质量分析,检查代码中的潜在 bug 和性能问题。
- mvn clean install -Dcheckstyle.skip -Dspotbugs.skip=true -B -Pdocker -Pjacoco -Pit -Pcoverage
- clean: 清理构建目录。
- install: 编译并打包项目,将构建的文件安装到本地仓库。
- -Dcheckstyle.skip: 跳过 Checkstyle 检查。
- -Dspotbugs.skip=true: 跳过 SpotBugs 检查。
- -B: 批处理模式。
- -Pdocker -Pjacoco -Pit -Pcoverage: 激活 docker、jacoco、it 和 coverage 四个 Profile。
- docker Profile 可能用于打包 Docker 镜像。
- jacoco Profile 配置了 JaCoCo 插件,用于生成代码覆盖率报告。
- coverage Profile 配置代码覆盖率相关插件或设置。
步骤4:在huawei demo中添加Etcd支持
只需要在需要注册中心和配置中心的地方添加以下依赖即可
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>registry-etcd</artifactId>
<version>3.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>config-etcd</artifactId>
<version>3.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.5</version>
<scope>runtime</scope>
</dependency>
并且下面的配置:
servicecomb:
service:
application: scb-fence
name: resource-server
version: 0.0.1
registry:
etcd:
connectString: http://127.0.0.1:2379
config:
etcd:
connectString: http://127.0.0.1:2379
步骤5:购买华为云
步骤6:测试结果
修改动作:
修改前:
修改后:
- 点赞
- 收藏
- 关注作者
评论(0)