11Nacos配置中心之客户端长轮询
11Nacos配置中心之客户端长轮询
客户端长轮询定时任务是在NacosFactory的createConfigService构建ConfigService对象实例的时候启动的
createConfigService
public static ConfigService createConfigService(String serverAddr) throws NacosException {
return ConfigFactory.createConfigService(serverAddr);
}
public class ConfigFactory {
/**
* Create Config
*
* @param properties init param
* @return ConfigService
* @throws NacosException Exception
*/
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
/**
* Create Config
*
* @param serverAddr serverList
* @return Config
* @throws ConfigService Exception
*/
public static ConfigService createConfigService(String serverAddr) throws NacosException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
return createConfigService(properties);
}
}
- 通过Class.forName加载NacosConfigService类
- 使用反射来完成NacosConfigService类的实例化
NacosConfigService构造
NacosConfigService构造方法:
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty("encode");
if (StringUtils.isBlank(encodeTmp)) {
this.encode = "UTF-8";
} else {
this.encode = encodeTmp.trim();
}
this.initNamespace(properties);
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
它的构造方法中:
- 初始化HttpAgent,使用了装饰器模式,实际工作的类是ServerHttpAgent,MetricsHttpAgent内部也调用了ServerHttpAgent的方法,增加监控统计信息
- ClientWorker是客户端的工作类,agent作为参数传入ClientWorker,用agent做一些远程调用
ClientWorker构造
ClientWorker的构造函数:
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
构造方法中:
- 构建定时调度的线程池,第一个线程池executor只拥有一个核心线程,每隔10s执行一次checkConfigInfo()方法,功能就是每10ms检查一次配置信息
- 第二个线程池executorService只完成了初始化,后续用于客户端的定时长轮询功能。
checkConfigInfo()方法:
public void checkConfigInfo() {
int listenerSize = cacheMap.get().size();
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
这个方法的主要功能就是检查配置信息是否发送变化,
- 获取监听个数
- 分配长轮询任务数,向上取整
- 判断长轮询任务数是否比当前长轮询任务数大,如果大的话创建指定就创建线程达到所需的任务数的线程数量,如果不比当前任务数就把求得长轮询任务数赋值给当前长轮询任务数
cacheMap用来存储监听变更的缓存集合,key是根据dataID/group/tenant拼接的值。Value是对应的存储在Nacos服务器上的配置文件的内容。
默认情况下每个长轮询LongPollingRunnable任务处理3000个监听配置集,超过3000个启动多个LongPollingRunnable执行。
LongPollingRunnable
LongPollingRunnable是一个线程,我们可以直接找到LongPollingRunnable里面的run方法
class LongPollingRunnable implements Runnable {
private int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// check server config
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
LongPollingRunnable类的run()方法中:
- 遍历CacheData,检查本地配置,根据taskId对cacheMap进行数据分割,通过checkLocalConfig方法检查本地配置,本地在${user}\naocs\config\目录下缓存一份服务端的配置信息,checkLocalConfig将内存中的数据和本地磁盘数据比较,不一致说明数据发生了变化,需要触发事件通知。
- 执行checkUpdateDataIds方法在服务端建立长轮询机制,通过长轮询检查数据变更。
- 遍历变更数据集合changedGroupKeys,调用getServerConfig方法,根据dataId,group,tenant去服务端读取对应的配置信息并保存到本地文件中。
- 继续定时执行当前线程
checkUpdateDataIds()方法
checkUpdateDataIds()方法基于长连接方式监听服务端配置的变化,最后根据变化数据的key去服务端获取最新数据。
checkUpdateDataIds中调用checkUpdateConfigStr
/**
*
*/
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
List<String> headers = new ArrayList<String>(2);
headers.add("Long-Pulling-Timeout");
headers.add("" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.add("Long-Pulling-Timeout-No-Hangup");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
agent.getEncode(), timeout);
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.content);
} else {
setHealthServer(false);
LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
}
} catch (IOException e) {
setHealthServer(false);
LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
throw e;
}
return Collections.emptyList();
}
这个方法的作用就是从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的,保证不返回NULL
checkUpdateConfigStr()方法中通过agent.httpPost调用/v1/cs/configs/listener接口实现长轮询请求。长轮询请求是实现层面只是设置了一个比较长的超时时间,默认30s。如果服务端的数据发生变更,客户端会收到HttpResult。服务端返回的是存在数据变更的dataId, group, tenant。获得这些信息后,在LongPollingRunnable的run方法中调用getServerConfig方法从Nacos服务器中读取具体的配置内容。
getServerConfig
从Nacos服务器中读取具体的配置内容:
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = Arrays.asList("dataId", dataId, "group", group);
} else {
params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
}
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (IOException e) {
String message = String.format(
"[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
dataId, group, tenant);
LOGGER.error(message, e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
switch (result.code) {
case HttpURLConnection.HTTP_OK:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
return result.content;
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return null;
case HttpURLConnection.HTTP_CONFLICT: {
LOGGER.error(
"[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
+ "tenant={}", agent.getName(), dataId, group, tenant);
throw new NacosException(NacosException.CONFLICT,
"data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
case HttpURLConnection.HTTP_FORBIDDEN: {
LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId,
group, tenant);
throw new NacosException(result.code, result.content);
}
default: {
LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId,
group, tenant, result.code);
throw new NacosException(result.code,
"http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
}
}
总结
现在我们知道Nacos配置中心的客户端做了哪些事了,客户端创建NacosConfigService实例,它的构造方法中创建了ClientWorker对象,ClientWorker中就设定了定时线程每隔10秒执行一次checkConfigInfo()方法来检查配置信息是否变更,使用的线程是LongPollingRunnable,它的run()方法中的逻辑就是调用checkUpdateDataIds()方法检查是否数据变更,本质是调用服务端的/v1/cs/configs/listener接口来实现的
- 点赞
- 收藏
- 关注作者
评论(0)