【Hadoop】【Yarn】ResourceManager是如何控制主备的
ResourceManager上面所有的功能点基本都是以service形式实现的。Yarn HA管理也不例外。
在ResourceManager.serviceInit()方法中对其进行了初始化。代码片段如下:
// yarn.resourcemanager.ha.enabled controls whether YARN runs in HA mode (default: false).
// NOTE(review): this excerpt stitches together two separate parts of
// ResourceManager.serviceInit(); code between them (e.g. adminService setup) is elided.
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) {
// In HA mode, run HAUtil's verify-and-set step on the configuration before
// any HA services are built.
HAUtil.verifyAndSetConfiguration(this.conf);
}
// The elector must be added to the service list after adminService: services
// are started in the order they were added, and the elector calls methods on
// adminService, so it has to be registered after adminService.
if (this.rmContext.isHAEnabled()) {
// If the RM is configured to use an embedded leader elector,
// initialize the leader elector.
// Requires yarn.resourcemanager.ha.automatic-failover.enabled=true and the
// embedded-elector mode.
if (HAUtil.isAutomaticFailoverEnabled(conf) && HAUtil.isAutomaticFailoverEmbedded(conf)) {
EmbeddedElector elector = createEmbeddedElector();
// Add it to the RM's service list (if it is a Service) and expose it via the RM context.
addIfService(elector);
rmContext.setLeaderElectorService(elector);
}
}
/**
 * Builds the embedded leader elector used for automatic RM failover.
 *
 * <p>The mode is chosen by yarn.resourcemanager.ha.curator-leader-elector.enabled
 * (default false) and remembered in {@code curatorEnabled}. In Curator mode a ZK
 * manager is created and started first and a Curator-based elector is returned;
 * otherwise an ActiveStandbyElector-based elector is returned.
 *
 * @return the elector service for this ResourceManager
 * @throws IOException propagated from elector / ZK manager creation
 */
protected EmbeddedElector createEmbeddedElector() throws IOException {
  curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
      YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
  if (!curatorEnabled) {
    // Default: classic ZooKeeper ActiveStandbyElector-based election.
    return new ActiveStandbyElectorBasedElectorService(this);
  }
  // Curator mode: create and start the ZK manager, then build the elector.
  this.zkManager = createAndStartZKManager(conf);
  return new CuratorBasedElectorService(this);
}
ActiveStandbyElectorBasedElectorService.serviceInit()
/**
 * Sets up the ZooKeeper-based ActiveStandbyElector for this RM.
 *
 * Reads the ZK quorum, RM HA id, cluster id, base znode path, ZK session
 * timeout, ACLs/auths and retry count from the configuration, creates the
 * elector (with this service as its callback), and makes sure the election
 * parent znode exists.
 *
 * @param conf service configuration; wrapped in a YarnConfiguration if it is not one
 * @throws YarnRuntimeException if YarnConfiguration.RM_ZK_ADDRESS is not set
 * @throws Exception propagated from elector setup or super.serviceInit()
 */
protected void serviceInit(Configuration conf)
throws Exception {
conf = conf instanceof YarnConfiguration
? conf
: new YarnConfiguration(conf);
// Embedded automatic failover cannot work without a ZK quorum.
String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkQuorum == null) {
throw new YarnRuntimeException("Embedded automatic failover " +
"is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
" is not set");
}
// Identity of this RM in the cluster; localActiveNodeInfo is the payload later
// handed to joinElection() and written into the lock znode.
String rmId = HAUtil.getRMHAId(conf);
String clusterId = YarnConfiguration.getClusterId(conf);
localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
// Election parent znode: <zkBasePath>/<clusterId>.
String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
String electionZNode = zkBasePath + "/" + clusterId;
zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);
// RM-specific retry setting, falling back to the common HA failover-controller key.
int maxRetryNum =
conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
.getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));
// `this` is passed as the elector's callback (becomeActive/becomeStandby below).
// NOTE(review): zkSessionTimeout is read as a long and narrowed to int here.
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout, electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
// Recursively create the election parent znode on ZK, including all missing ancestors.
elector.ensureParentZNode();
// Check the data already stored under the parent znode.
// NOTE(review): after notifyFatalError() execution still falls through to
// super.serviceInit(); whether startup aborts depends on notifyFatalError().
if (!isParentZnodeSafe(clusterId)) {
notifyFatalError(String.format("invalid data in znode, %s, " +
"which may require the state store to be reformatted",
electionZNode));
}
super.serviceInit(conf);
}
上述service初始化过程的主要动作,就是创建一个ActiveStandbyElector对象,并调用该对象的ensureParentZNode()方法,在zk上递归创建选举用的父znode(即zkBasePath/clusterId)。真正用来控制主备的锁节点,要到后面joinElection时才会创建。
初始化完成后,后面的步骤就是serviceStart了。
/**
 * Joins the ZooKeeper leader election with this RM's node info, then runs
 * super.serviceStart().
 */
@Override
protected void serviceStart() throws Exception {
elector.joinElection(localActiveNodeInfo);
super.serviceStart();
}
/**
 * Leaves the election and calls the elector's terminateConnection() (only if
 * the elector was created), then runs super.serviceStop().
 */
@Override
protected void serviceStop() throws Exception {
/**
* When error occurs in serviceInit(), serviceStop() can be called.
* We need null check for the case.
*/
if (elector != null) {
elector.quitElection(false);
elector.terminateConnection();
}
super.serviceStop();
}
/**
 * Elector callback: transitions this RM to Active through the RM admin service.
 *
 * The disconnect timer is cancelled first (presumably the one armed when the
 * ZK connection was lost — TODO confirm).
 *
 * @throws ServiceFailedException wrapping any exception from the transition
 */
@Override
public void becomeActive() throws ServiceFailedException {
cancelDisconnectTimer();
try {
rm.getRMContext().getRMAdminService().transitionToActive(req);
} catch (Exception e) {
throw new ServiceFailedException("RM could not transition to Active", e);
}
}
/**
 * Elector callback: transitions this RM to Standby through the RM admin service.
 *
 * Best-effort: cancels the disconnect timer, and any exception from the
 * transition is logged and swallowed, so this method never throws.
 */
@Override
public void becomeStandby() {
cancelDisconnectTimer();
try {
rm.getRMContext().getRMAdminService().transitionToStandby(req);
} catch (Exception e) {
LOG.error("RM could not transition to Standby", e);
}
}
ActiveStandbyElector.joinElection()
/**
 * Enters the leader election, publishing {@code data} as this node's payload.
 *
 * <p>A repeated call while already in the election only logs and returns. The
 * payload is copied so later changes to the caller's array are not seen.
 *
 * @param data application data to store in the lock znode; must not be null
 * @throws HadoopIllegalArgumentException if {@code data} is null
 */
public synchronized void joinElection(byte[] data)
    throws HadoopIllegalArgumentException {
  if (data == null) {
    throw new HadoopIllegalArgumentException("data cannot be null");
  }
  if (wantToBeInElection) {
    LOG.info("Already in election. Not re-connecting.");
    return;
  }
  // Defensive copy of the caller's payload.
  appData = data.clone();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Attempting active election for " + this);
  }
  joinElectionInternal();
}
/**
 * Starts (or restarts) an election attempt by asynchronously creating the lock znode.
 *
 * <p>Re-establishes the ZooKeeper session first when there is no client; if that
 * fails, reports a fatal error and does not join.
 */
private void joinElectionInternal() {
  Preconditions.checkState(appData != null,
      "trying to join election without any app data");
  // No client yet: try to reconnect, and give up fatally if that fails.
  if (zkClient == null && !reEstablishSession()) {
    fatalError("Failed to reEstablish connection with ZooKeeper");
    return;
  }
  // Fresh attempt: reset the create-retry counter and record the intent to lead.
  createRetryCount = 0;
  wantToBeInElection = true;
  createLockNodeAsync();
}
/**
 * Asynchronously creates the lock znode (zkLockFilePath) holding appData.
 *
 * The node is EPHEMERAL, so ZooKeeper removes it when the creating session
 * ends. The result is delivered to this object's callback, with zkClient as
 * the context object.
 */
private void createLockNodeAsync() {
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
this, zkClient);
}
以上就是YARN在zk上创建用于控制HA的锁节点(以CreateMode.EPHEMERAL临时节点的方式创建)的整个过程。
仅仅把锁节点创建出来当然还不够。要做到服务不中断,主节点挂掉之后,备节点必须能第一时间升为主。由于锁节点是EPHEMERAL(临时)节点,主节点的zk会话失效后该节点会被zk自动删除;因此各RM需要借助zk的watcher机制持续监听这个锁节点的状态:一旦发现它不存在,就立即尝试重新创建,并将创建成功的ResourceManager切换为Active。
ActiveStandbyElector.WatcherWithClientRef.process()里面会对zk监控节点的事件进行处理。
那么问题来了:watcher是在哪里初始化的?process又是在哪里被调用的?
1-初始化
ActiveStandbyElectorBasedElectorService.serviceInit()方法中创建的elector对象就是ActiveStandbyElector对象,它的构造方法里会创建一个watcher对象。
WatcherWithClientRef实现了Zookeeper中的Watcher接口,用来监听znode。
ActiveStandbyElectorBasedElectorService.serviceInit()
---ActiveStandbyElector.ActiveStandbyElector()
------ActiveStandbyElector.createConnection()
---------ActiveStandbyElector.connectToZooKeeper()// 先创建watcher,再获取新的zookeeper客户端实例
------------new WatcherWithClientRef()
即每一次连接ZK的时候都要创建一个watcher,并且调用watcher.waitForZKConnectionEvent()方法,用来等待zk返回的事件。
接收到zk的事件之后会回调watcher的process方法。
ZooKeeper.ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
---ZooKeeper.ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly)
------ZooKeeper.ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider)
---------ZooKeeper.ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)
------------createConnection()
/**
 * Creates a ZooKeeper client handle and starts connecting.
 *
 * Records the client config (a new ZKClientConfig when none is given), installs
 * {@code watcher} as the watch manager's default watcher, parses the connect
 * string (including its chroot path), builds the ClientCnxn, adds any
 * configured custom auth info, and finally starts the connection.
 *
 * @param connectString server list, parsed by ConnectStringParser (chroot path extracted from it)
 * @param sessionTimeout session timeout handed to the ClientCnxn
 * @param watcher default watcher, e.g. ActiveStandbyElector.WatcherWithClientRef for the RM
 * @param canBeReadOnly whether the connection may be read-only
 * @param aHostProvider provider of the server addresses to connect to
 * @param clientConfig client configuration; may be null
 * @throws IOException if the connection cannot be created
 */
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
// The watch manager is set up before createConnection() because the
// connection is built around it.
watchManager = defaultWatchManager();
// This is the watcher passed in when the ZK client was created; for the RM
// its type is ActiveStandbyElector.WatcherWithClientRef.
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
// Add custom Auth Info to the connection if configured
addCustomAuthInfos();
// Start the ClientCnxn. ClientCnxn is not itself a thread, but it holds two
// thread objects (the send thread and the event thread); this starts both.
cnxn.start();
}
// @VisibleForTesting
/**
 * Creates the ClientCnxn that performs this handle's network I/O and event delivery.
 *
 * <p>The supplied {@code zooKeeper} and {@code watcher} arguments are forwarded
 * as-is, instead of silently substituting {@code this} and the
 * {@code watchManager} field. That way an override or a test can inject a
 * different handle or watch manager. The constructor passes exactly
 * {@code this} and {@code watchManager}, so its behaviour is unchanged.
 *
 * @param chrootPath chroot path extracted from the connect string
 * @param hostProvider provider of server addresses
 * @param sessionTimeout session timeout for the connection
 * @param zooKeeper the client handle that owns the connection
 * @param watcher watch manager used to materialize watchers for events
 * @param clientCnxnSocket socket implementation for the connection
 * @param canBeReadOnly whether the connection may be read-only
 * @return the new, not yet started, connection
 * @throws IOException if the connection cannot be created
 */
protected ClientCnxn createConnection(
    String chrootPath,
    HostProvider hostProvider,
    int sessionTimeout,
    ZooKeeper zooKeeper,
    ClientWatchManager watcher,
    ClientCnxnSocket clientCnxnSocket,
    boolean canBeReadOnly) throws IOException {
  return new ClientCnxn(
      chrootPath,
      hostProvider,
      sessionTimeout,
      zooKeeper,
      watcher,
      clientCnxnSocket,
      canBeReadOnly);
}
/**
 * Creates a client connection. It is inert until start() launches its send
 * and event threads.
 *
 * <p>The send and event threads are stored in the instance fields. The
 * original excerpt assigned them to {@code final} locals, which discarded them
 * and left the fields that start() uses null.
 *
 * @param chrootPath chroot path of the client
 * @param hostProvider provider of server addresses; its size() divides the session timeout
 * @param sessionTimeout requested session timeout
 * @param zooKeeper owning client handle; its client config is read here
 * @param watcher watch manager used to materialize watchers for events
 * @param clientCnxnSocket socket implementation used by the send thread
 * @param sessionId id of the session to resume
 * @param sessionPasswd password of the session to resume
 * @param canBeReadOnly whether the connection may be read-only
 * @throws IOException if the connection cannot be set up
 */
public ClientCnxn(
    String chrootPath,
    HostProvider hostProvider,
    int sessionTimeout,
    ZooKeeper zooKeeper,
    ClientWatchManager watcher,
    ClientCnxnSocket clientCnxnSocket,
    long sessionId,
    byte[] sessionPasswd,
    boolean canBeReadOnly) throws IOException {
  this.zooKeeper = zooKeeper;
  this.watcher = watcher;
  this.sessionId = sessionId;
  this.sessionPasswd = sessionPasswd;
  this.sessionTimeout = sessionTimeout;
  this.hostProvider = hostProvider;
  this.chrootPath = chrootPath;
  // Per-server connect timeout: the session timeout split across all servers.
  // NOTE(review): an empty host list would make this divide by zero.
  connectTimeout = sessionTimeout / hostProvider.size();
  readTimeout = sessionTimeout * 2 / 3;
  readOnly = canBeReadOnly;
  // Keep the threads in the instance fields: start() launches these objects.
  this.sendThread = new SendThread(clientCnxnSocket);
  this.eventThread = new EventThread();
  this.clientConfig = zooKeeper.getClientConfig();
  initRequestTimeout();
  this.bindingHelper = new ClientBindingHelper(clientConfig);
  clientCnxnSocket.setBindingHelper(this.bindingHelper);
  this.principalProvider = new ServerPrincipalProvider(bindingHelper, clientConfig);
}
ClientCnxn.SendThread.run()
---ClientCnxn.queueEvent()//将事件添加到waitingEvents队列中;
ClientCnxn.EventThread.run()
---ClientCnxn.processEvent()//处理waitingEvents队列中的事件;
/**
 * Enqueues a ZooKeeper event, together with the watchers it should fire, for
 * the event thread.
 *
 * <p>A connection-state event (type None) that repeats the current session
 * state is dropped. Otherwise the session state is updated, and the target
 * watchers are taken from {@code materializedWatchers} (copied) or, when that
 * is null, materialized by the watch manager.
 *
 * @param event the event to deliver
 * @param materializedWatchers pre-computed watchers, or null to compute them
 */
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
  final boolean redundantStateEvent =
      event.getType() == EventType.None && sessionState == event.getState();
  if (redundantStateEvent) {
    return;
  }
  sessionState = event.getState();
  final Set<Watcher> targets;
  if (materializedWatchers != null) {
    targets = new HashSet<Watcher>(materializedWatchers);
  } else {
    // Ask the watch manager which watchers this event triggers.
    targets = watcher.materialize(event.getState(), event.getType(), event.getPath());
  }
  // The (watchers, event) pair is processed later by the event thread.
  waitingEvents.add(new WatcherSetEventPair(targets, event));
}
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
if (watcher != null && pair.event != null) {
watcher.process(pair.event);
}
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}
}
ActiveStandbyElector.WatcherWithClientRef.process()
/**
 * Watcher callback, invoked from the ZooKeeper client's event thread.
 *
 * Counts down hasReceivedEvent (presumably what waitForZKConnectionEvent()
 * waits on), waits up to zkSessionTimeout ms on hasSetZooKeeper, then forwards
 * the event to ActiveStandbyElector.processWatchEvent(zk, event). Any Throwable,
 * including an InterruptedException from the wait, is reported via fatalError().
 * NOTE(review): the thread's interrupt flag is not restored on interruption.
 *
 * @param event the ZooKeeper event to handle
 */
public void process(WatchedEvent event) {
hasReceivedEvent.countDown();
try {
// On timeout only log; the event is still forwarded, and processWatchEvent()
// performs its own stale-client check.
if (!hasSetZooKeeper.await(zkSessionTimeout, TimeUnit.MILLISECONDS)) {
LOG.debug("Event received with stale zk");
}
ActiveStandbyElector.this.processWatchEvent(zk, event);
} catch (Throwable t) {
fatalError(
"Failed to process watcher event " + event + ": " +
StringUtils.stringifyException(t));
}
}
ActiveStandbyElector.processWatchEvent()
/**
 * interface implementation of Zookeeper watch events (connection and node),
 * proxied by {@link WatcherWithClientRef}.
 *
 * Events from a stale client are ignored. Connection-state events (type None)
 * update zkConnectionState:
 * - SyncConnected after a disconnect resumes monitoring of the lock node.
 * - Disconnected enters neutral mode.
 * - Expired enters neutral mode and rejoins the election.
 * - SaslAuthenticated is only logged.
 * - Any other state is fatal.
 *
 * Node events that carry a path drive the election:
 * - NodeDeleted re-joins the election, with an ACTIVE node entering neutral
 *   mode first.
 * - NodeDataChanged and other node events re-check the lock node.
 *
 * A non-None event without a path is treated as a fatal error.
 *
 * @param zk the client the event was delivered on, used for the stale check
 * @param event the ZooKeeper event to handle
 */
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
Event.EventType eventType = event.getType();
// Ignore events coming from a client this elector no longer uses (per isStaleClient).
if (isStaleClient(zk)) return;
if (LOG.isDebugEnabled()) {
LOG.debug("Watcher event type: " + eventType + " with state:"
+ event.getState() + " for path:" + event.getPath()
+ " connectionState: " + zkConnectionState
+ " for " + this);
}
if (eventType == Event.EventType.None) {
// the connection state has changed
switch (event.getState()) {
case SyncConnected:
LOG.info("Session connected.");
// if the listener was asked to move to safe state then it needs to
// be undone
ConnectionState prevConnectionState = zkConnectionState;
zkConnectionState = ConnectionState.CONNECTED;
// Only a reconnect after a disconnect (while still wanting to be
// elected) needs to re-check the lock node.
if (prevConnectionState == ConnectionState.DISCONNECTED &&
wantToBeInElection) {
monitorActiveStatus();
}
break;
case Disconnected:
LOG.info("Session disconnected. Entering neutral mode...");
// ask the app to move to safe state because zookeeper connection
// is not active and we dont know our state
zkConnectionState = ConnectionState.DISCONNECTED;
enterNeutralMode();
break;
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
LOG.info("Session expired. Entering neutral mode and rejoining...");
enterNeutralMode();
reJoinElection(0);
break;
case SaslAuthenticated:
LOG.info("Successfully authenticated to ZooKeeper using SASL.");
break;
default:
fatalError("Unexpected Zookeeper watch event state: "
+ event.getState());
break;
}
return;
}
// a watch on lock path in zookeeper has fired. so something has changed on
// the lock. ideally we should check that the path is the same as the lock
// path but trusting zookeeper for now
String path = event.getPath();
if (path != null) {
switch (eventType) {
case NodeDeleted:
// Lock node is gone: an ACTIVE node steps down to neutral mode first,
// then every node (active or standby) tries to re-create the lock.
if (state == State.ACTIVE) {
enterNeutralMode();
}
joinElectionInternal();
break;
case NodeDataChanged:
// Re-check the lock node (and re-register the watch on it).
monitorActiveStatus();
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
}
monitorActiveStatus();
}
return;
}
// some unexpected error has occurred
fatalError("Unexpected watch error from Zookeeper");
}
ActiveStandbyElector.monitorActiveStatus()
/**
 * Re-checks the lock znode asynchronously.
 *
 * Requires that this elector wants to be in the election (checked with a
 * Java assert). Resets the stat retry counter before issuing the check.
 */
private void monitorActiveStatus() {
assert wantToBeInElection;
if (LOG.isDebugEnabled()) {
LOG.debug("Monitoring active leader for " + this);
}
statRetryCount = 0;
monitorLockNodeAsync();
}
/**
 * Issues an asynchronous exists() on the lock znode, leaving {@code watcher}
 * registered on it.
 *
 * <p>If a monitor request is already marked pending for the current client, the
 * new request is ignored. Otherwise the pending flag and client are recorded,
 * and the result is delivered to this object's callback with zkClient as context.
 */
private void monitorLockNodeAsync() {
  final boolean duplicateRequest =
      monitorLockNodePending && monitorLockNodeClient == zkClient;
  if (duplicateRequest) {
    LOG.info("Ignore duplicate monitor lock-node request.");
    return;
  }
  monitorLockNodePending = true;
  monitorLockNodeClient = zkClient;
  zkClient.exists(zkLockFilePath, watcher, this, zkClient);
}
只要收到ha znode的NodeDataChanged等事件,就会调用monitorActiveStatus(),再通过zkClient.exists()检查该znode是否依然存在,并在它上面重新注册watcher(zk的watcher是一次性的,触发后需要重新注册)。之后一旦该znode被删除,zk就会推送NodeDeleted事件。
每次事件到来之后,都会调用Watcher接口实现类的process方法进行处理。
WatcherWithClientRef implements Watcher
因此,NodeDeleted事件到了之后又会将上面的调用过程执行一遍,本次处理的是NodeDeleted事件。
// NodeDeleted branch of processWatchEvent(): the lock znode has been deleted.
case NodeDeleted:
// An ACTIVE node first steps down into neutral mode...
if (state == State.ACTIVE) {
enterNeutralMode();
}
// ...then every node, active or standby, re-joins the election, i.e. tries to
// re-create the lock znode.
joinElectionInternal();
break;
也就是说:如果当前节点处于active状态,会先进入中立(neutral)模式;随后无论当前节点原本是主还是备,都会调用joinElectionInternal()重新加入选举,也就是尝试重新创建锁节点来抢主。
- 点赞
- 收藏
- 关注作者
评论(0)