【Hadoop】【Yarn】ResourceManager是如何控制主备的

举报
沙漠里的果果酱 发表于 2023/08/11 18:28:02 2023/08/11
【摘要】 【Hadoop】【Yarn】ResourceManager是如何控制主备的

ResourceManager上面所有的功能点基本都是以service形式实现的。Yarn HA管理也不例外。
在ResourceManager.serviceInit()方法中对其进行了初始化。代码片段如下:

// Excerpt from ResourceManager.serviceInit().
// yarn.resourcemanager.ha.enabled decides whether YARN runs in HA mode (default: false).
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) {
  // Presumably validates/completes the HA-related settings in conf -- see HAUtil.
  HAUtil.verifyAndSetConfiguration(this.conf);
}

// The elector must be added to the service list AFTER adminService: services
// are started in the order in which they were added, and the elector depends
// on methods of adminService, so it has to be registered after adminService.
// NOTE(review): the statements between these two HA checks are omitted from
// this excerpt (presumably that is where adminService is added).
if (this.rmContext.isHAEnabled()) {
  // If the RM is configured to use an embedded leader elector,
  // initialize the leader elector.
  // Requires yarn.resourcemanager.ha.automatic-failover.enabled=true and an
  // embedded (in-RM) elector.
  if (HAUtil.isAutomaticFailoverEnabled(conf) && HAUtil.isAutomaticFailoverEmbedded(conf)) {
    EmbeddedElector elector = createEmbeddedElector();
    // addIfService presumably registers the elector as a child service when it
    // is a Service; the elector is also published through the RM context.
    addIfService(elector);
    rmContext.setLeaderElectorService(elector);
  }
}




/**
 * Builds the leader elector that runs inside this ResourceManager.
 *
 * <p>The choice is driven by yarn.resourcemanager.ha.curator-leader-elector.enabled
 * (default: false). When it is enabled, a ZK manager is created and started
 * first and a Curator-based elector is returned. Otherwise the
 * ActiveStandbyElector-based elector is returned.
 *
 * @return the elector service to register
 * @throws IOException if creating the ZK manager fails
 */
protected EmbeddedElector createEmbeddedElector() throws IOException {
  curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
          YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
  if (!curatorEnabled) {
    // Default path: election via Hadoop's ActiveStandbyElector.
    return new ActiveStandbyElectorBasedElectorService(this);
  }
  // The ZK manager is created and started before the Curator elector is built.
  this.zkManager = createAndStartZKManager(conf);
  return new CuratorBasedElectorService(this);
}

ActiveStandbyElectorBasedElectorService.serviceInit()

/**
 * Initializes the ActiveStandbyElector-based embedded elector.
 *
 * <p>Reads the ZooKeeper quorum and election settings from {@code conf}, builds
 * the election znode path ({@code <zkBasePath>/<clusterId>}), creates the
 * ActiveStandbyElector (passing this service, which implements
 * becomeActive/becomeStandby), and makes sure the election znode exists in ZK.
 *
 * @param conf service configuration; wrapped in a YarnConfiguration if needed
 * @throws YarnRuntimeException if {@code YarnConfiguration.RM_ZK_ADDRESS} is not set
 */
protected void serviceInit(Configuration conf)
    throws Exception {
  // Work with a YarnConfiguration, wrapping a plain Configuration if necessary.
  conf = conf instanceof YarnConfiguration
      ? conf
      : new YarnConfiguration(conf);

  // The ZK quorum is mandatory for embedded automatic failover.
  String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
  if (zkQuorum == null) {
    throw new YarnRuntimeException("Embedded automatic failover " +
        "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
        " is not set");
  }

  // Node info for this RM; it is later handed to joinElection() in serviceStart().
  String rmId = HAUtil.getRMHAId(conf);
  String clusterId = YarnConfiguration.getClusterId(conf);
  localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);

  // The election znode depends only on the base path and the cluster id,
  // so RMs configured with the same cluster id share it.
  String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
      YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
  String electionZNode = zkBasePath + "/" + clusterId;

  zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
      YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

  List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
  List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);

  // RM-specific retry count; falls back to the common HA failover-controller
  // setting and then to its default.
  int maxRetryNum =
      conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
        .getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
          CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));
  // NOTE(review): the trailing `false` is presumably failFast -- confirm
  // against the ActiveStandbyElector constructor signature.
  elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout, electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
  // Recursively create the election znode in ZK, including all missing parent znodes.
  elector.ensureParentZNode();
  // Note: after notifyFatalError() this method does not return or throw;
  // execution still continues to super.serviceInit().
  if (!isParentZnodeSafe(clusterId)) {
    notifyFatalError(String.format("invalid data in znode, %s, " +
        "which may require the state store to be reformatted",
        electionZNode));
  }

  super.serviceInit(conf);
}


上述service初始化过程的主要动作是:创建一个ActiveStandbyElector对象,并调用该对象的ensureParentZNode方法,在ZK上递归创建出选举所需的znode(用来控制主备)。
初始化完成后,后面的步骤就是serviceStart了。

/**
 * Joins the leader election with this RM's node info, then starts the
 * underlying service.
 */
@Override
protected void serviceStart() throws Exception {
  // Enter the election before delegating to the parent's start logic.
  elector.joinElection(localActiveNodeInfo);
  super.serviceStart();
}

/**
 * Leaves the election and terminates the elector's connection (when an
 * elector exists), then stops the underlying service.
 */
@Override
protected void serviceStop() throws Exception {
  /**
   * When error occurs in serviceInit(), serviceStop() can be called.
   * We need null check for the case.
   */
  if (elector != null) {
    // NOTE(review): the `false` argument presumably means "no fencing
    // needed" -- confirm against ActiveStandbyElector#quitElection.
    elector.quitElection(false);
    elector.terminateConnection();
  }
  super.serviceStop();
}

/**
 * Elector callback: transitions this ResourceManager to Active through the
 * RM admin service.
 *
 * @throws ServiceFailedException if the transition fails; the original
 *     exception is kept as the cause
 */
@Override
public void becomeActive() throws ServiceFailedException {
  // Presumably cancels a pending "disconnected" timer; its definition is not shown here.
  cancelDisconnectTimer();

  try {
    // NOTE(review): `req` is a field not shown in this excerpt -- presumably
    // the StateChangeRequestInfo used for elector-initiated transitions.
    rm.getRMContext().getRMAdminService().transitionToActive(req);
  } catch (Exception e) {
    throw new ServiceFailedException("RM could not transition to Active", e);
  }
}

/**
 * Elector callback: transitions this ResourceManager to Standby through the
 * RM admin service. Failures are logged and not rethrown.
 */
@Override
public void becomeStandby() {
  // Presumably cancels a pending "disconnected" timer; its definition is not shown here.
  cancelDisconnectTimer();

  try {
    // NOTE(review): `req` is a field not shown in this excerpt.
    rm.getRMContext().getRMAdminService().transitionToStandby(req);
  } catch (Exception e) {
    // Best effort: unlike becomeActive(), a failure here is only logged.
    LOG.error("RM could not transition to Standby", e);
  }
}

ActiveStandbyElector.joinElection()

/**
 * Requests that this elector take part in the active/standby election.
 *
 * <p>The supplied bytes are copied into {@code appData}, which is the payload
 * written into the lock znode. If this elector already wants to be in the
 * election, the call only logs a message and returns.
 *
 * @param data application data identifying this node; must not be null
 * @throws HadoopIllegalArgumentException if {@code data} is null
 */
public synchronized void joinElection(byte[] data)
    throws HadoopIllegalArgumentException {
  if (data == null) {
    throw new HadoopIllegalArgumentException("data cannot be null");
  }
  if (wantToBeInElection) {
    LOG.info("Already in election. Not re-connecting.");
    return;
  }

  // Private copy so later changes to the caller's array cannot leak in.
  appData = data.clone();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Attempting active election for " + this);
  }
  joinElectionInternal();
}





/**
 * Starts (or restarts) this node's attempt to acquire the lock znode.
 *
 * <p>If there is no ZooKeeper client yet, the session is re-established
 * first. If that fails, a fatal error is reported and the method returns
 * without joining.
 *
 * @throws IllegalStateException if {@code appData} has not been set
 */
private void joinElectionInternal() {
  Preconditions.checkState(appData != null,
      "trying to join election without any app data");
  if (zkClient == null) {
    if (!reEstablishSession()) {
      fatalError("Failed to reEstablish connection with ZooKeeper");
      return;
    }
  }

  // Fresh retry budget for this create attempt; mark that we want to be in the election.
  createRetryCount = 0;
  wantToBeInElection = true;
  // Fire the asynchronous creation of the lock znode.
  createLockNodeAsync();
}




/**
 * Asynchronously creates the lock znode ({@code zkLockFilePath}) as an
 * EPHEMERAL node that holds {@code appData}.
 *
 * <p>{@code this} is passed as the async callback and {@code zkClient} as the
 * callback context. The result handling is not shown in this excerpt.
 */
private void createLockNodeAsync() {
  zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
      this, zkClient);
}

以上就是Yarn在ZK上创建用于控制HA的锁节点(CreateMode.EPHEMERAL类型的临时节点)的整个过程。

仅仅创建节点当然还不够:要做到服务不中断,就必须在主节点挂掉之后,让备节点能够第一时间升为主。因此显然需要一个线程持续不断地监控ZK上这个HA节点的状态:如果该节点不存在,则立即创建,并将当前的ResourceManager变成Active。

ActiveStandbyElector.WatcherWithClientRef.process()里面会对zk监控节点的事件进行处理。
那么问题来了:watcher是在哪里初始化的?process又是在哪里被调用的?
1-初始化
       ActiveStandbyElectorBasedElectorService.serviceInit()方法中的elector对象即ActiveStandbyElector对象,其构造方法会(间接地)创建一个watcher对象。
       WatcherWithClientRef实现了ZooKeeper中的Watcher接口,用来监听znode。
       ActiveStandbyElectorBasedElectorService.serviceInit()
       ---ActiveStandbyElector.ActiveStandbyElector()
       ------ActiveStandbyElector.createConnection()
       ---------ActiveStandbyElector.connectToZooKeeper()// Get a new zookeeper client instance
       ------------new WatcherWithClientRef()

即每一次连接ZK的时候都要创建一个watcher,并且调用watcher.waitForZKConnectionEvent()方法,用来等待zk返回的事件。
接收到zk的事件之后会回调watcher的process方法。


ZooKeeper.ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
---ZooKeeper.ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly)
------ZooKeeper.ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider)
---------ZooKeeper.ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)
------------createConnection()

/**
 * Creates a ZooKeeper client handle and starts its connection.
 *
 * <p>The supplied {@code watcher} becomes the default watcher of the client's
 * watch manager. The connection object is built by createConnection(), custom
 * auth infos are added, and cnxn.start() is called.
 *
 * @param connectString comma-separated server list, optionally with a chroot suffix
 * @param sessionTimeout requested session timeout
 * @param watcher default watcher that receives events for this client
 * @param canBeReadOnly whether the client may operate in read-only mode
 * @param aHostProvider source of server addresses
 * @param clientConfig client configuration; a new ZKClientConfig is used when null
 * @throws IOException if creating the connection fails
 */
public ZooKeeper(
    String connectString,
    int sessionTimeout,
    Watcher watcher,
    boolean canBeReadOnly,
    HostProvider aHostProvider,
    ZKClientConfig clientConfig) throws IOException {
    LOG.info(
        "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
        connectString,
        sessionTimeout,
        watcher);

    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    watchManager = defaultWatchManager();
    // This is the watcher passed in when the ZK client was created; in the RM,
    // for example, it is an ActiveStandbyElector.WatcherWithClientRef.
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    hostProvider = aHostProvider;

    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this,
        watchManager,
        getClientCnxnSocket(),
        canBeReadOnly);
    // Add custom Auth Info to the connection if configured
    addCustomAuthInfos();
    // Start ClientCnxn. ClientCnxn is not itself a thread, but it holds two
    // thread objects (SendThread and EventThread); start() launches both.
    cnxn.start();
}

/**
 * Creates the {@link ClientCnxn} that carries this client's session.
 *
 * <p>The method is protected so that tests can override how the connection is built.
 *
 * @param chrootPath chroot path parsed from the connect string
 * @param hostProvider source of server addresses to connect to
 * @param sessionTimeout requested session timeout
 * @param zooKeeper the client handle that owns the connection
 * @param watcher watch manager used to materialize watchers for events
 * @param clientCnxnSocket socket implementation used by the connection
 * @param canBeReadOnly whether the client may operate in read-only mode
 * @return a new connection that has not been started yet
 * @throws IOException if the connection object cannot be created
 */
// @VisibleForTesting
protected ClientCnxn createConnection(
    String chrootPath,
    HostProvider hostProvider,
    int sessionTimeout,
    ZooKeeper zooKeeper,
    ClientWatchManager watcher,
    ClientCnxnSocket clientCnxnSocket,
    boolean canBeReadOnly) throws IOException {
    // Forward the arguments the caller actually supplied. Silently
    // substituting `this`/`watchManager` would make the zooKeeper and watcher
    // parameters dead and defeat callers or overrides that pass their own.
    // The constructor passes `this` and `watchManager`, so its behaviour is unchanged.
    return new ClientCnxn(
        chrootPath,
        hostProvider,
        sessionTimeout,
        zooKeeper,
        watcher,
        clientCnxnSocket,
        canBeReadOnly);
}




/**
 * Creates the client-side connection state for a ZooKeeper handle.
 *
 * <p>Stores the session parameters and derives the per-host connect timeout
 * and the read timeout from {@code sessionTimeout}. It also creates the two
 * worker threads ({@code SendThread} and {@code EventThread}). The threads are
 * only created here; {@code start()} launches them later.
 *
 * @param chrootPath chroot path parsed from the connect string
 * @param hostProvider source of server addresses; its size divides the connect timeout
 * @param sessionTimeout requested session timeout
 * @param zooKeeper owning client handle; also supplies the client config
 * @param watcher watch manager used to materialize watchers for events
 * @param clientCnxnSocket socket implementation handed to the SendThread
 * @param sessionId session id to use
 * @param sessionPasswd session password to use
 * @param canBeReadOnly whether the client may operate in read-only mode
 * @throws IOException as declared for connection setup
 */
public ClientCnxn(
    String chrootPath,
    HostProvider hostProvider,
    int sessionTimeout,
    ZooKeeper zooKeeper,
    ClientWatchManager watcher,
    ClientCnxnSocket clientCnxnSocket,
    long sessionId,
    byte[] sessionPasswd,
    boolean canBeReadOnly) throws IOException {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    // Per-host connect timeout: the session timeout divided across all hosts.
    connectTimeout = sessionTimeout / hostProvider.size();
    // Read timeout: two thirds of the session timeout.
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;

    // Store the worker threads in the instance fields. Declaring them as local
    // variables would shadow the fields and discard the new threads when the
    // constructor returns, leaving start() nothing to launch.
    // NOTE(review): assumes ClientCnxn declares sendThread/eventThread fields
    // (not shown in this excerpt).
    this.sendThread = new SendThread(clientCnxnSocket);
    this.eventThread = new EventThread();
    this.clientConfig = zooKeeper.getClientConfig();
    initRequestTimeout();
    this.bindingHelper = new ClientBindingHelper(clientConfig);
    clientCnxnSocket.setBindingHelper(this.bindingHelper);
    this.principalProvider = new ServerPrincipalProvider(bindingHelper, clientConfig);
}

ClientCnxn.SendThread.run()
---ClientCnxn.queueEvent()//将事件添加到waitingEvents队列中;

ClientCnxn.EventThread.run()
---ClientCnxn.processEvent()//处理waitingEvents队列中的事件;

/**
 * Queues an event, together with the watchers that must receive it, on
 * {@code waitingEvents}. The event thread later delivers it.
 *
 * <p>A connection-state event (type {@code None}) that repeats the current
 * {@code sessionState} is dropped. Otherwise {@code sessionState} is updated.
 * When the caller already resolved the watchers, a copy of that set is used;
 * otherwise the watch manager materializes them from the event.
 */
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    final boolean connectionStateEvent = event.getType() == EventType.None;
    if (connectionStateEvent && event.getState() == sessionState) {
        // Nothing new to report about the session.
        return;
    }
    sessionState = event.getState();

    final Set<Watcher> targets = (materializedWatchers != null)
        ? new HashSet<Watcher>(materializedWatchers)
        : watcher.materialize(event.getState(), event.getType(), event.getPath());

    // Delivery happens asynchronously once the pair is taken off the queue.
    waitingEvents.add(new WatcherSetEventPair(targets, event));
}


private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    if (watcher != null && pair.event != null) {
                        watcher.process(pair.event);
                    }
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
     }
}

ActiveStandbyElector.WatcherWithClientRef.process()

/**
 * Entry point for every event ZooKeeper delivers to this watcher.
 *
 * <p>Counts down {@code hasReceivedEvent} and waits up to
 * {@code zkSessionTimeout} milliseconds for {@code hasSetZooKeeper}. It then
 * forwards the event to the enclosing ActiveStandbyElector's
 * processWatchEvent(). Any throwable is reported through fatalError().
 */
public void process(WatchedEvent event) {
  // Signal that an event has arrived (presumably awaited by waitForZKConnectionEvent()).
  hasReceivedEvent.countDown();
  try {
    // Wait for the zk reference to be set; on timeout, only log and process anyway.
    if (!hasSetZooKeeper.await(zkSessionTimeout, TimeUnit.MILLISECONDS)) {
      LOG.debug("Event received with stale zk");
    }
    ActiveStandbyElector.this.processWatchEvent(zk, event);
  } catch (Throwable t) {
    // NOTE(review): this also catches InterruptedException from await(); the
    // thread's interrupt flag is not restored here.
    fatalError(
        "Failed to process watcher event " + event + ": " +
        StringUtils.stringifyException(t));
  }
}

ActiveStandbyElector.processWatchEvent()

/**
 * interface implementation of Zookeeper watch events (connection and node),
 * proxied by {@link WatcherWithClientRef}.
 *
 * <p>Connection-state events (type {@code None}) update
 * {@code zkConnectionState}:
 * <ul>
 *   <li>A reconnect after a disconnect re-checks the lock znode, provided we still want to be in the election.</li>
 *   <li>A disconnect enters neutral mode.</li>
 *   <li>An expired session enters neutral mode and rejoins the election.</li>
 * </ul>
 * Node events on a path either re-check the lock znode or re-contend for it.
 * A non-connection event without a path is treated as a fatal error.
 *
 * @param zk the client the event was delivered for; events for a stale client are ignored
 * @param event the ZooKeeper event
 */
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
  Event.EventType eventType = event.getType();
  // Presumably drops events from a ZooKeeper client other than the current one.
  if (isStaleClient(zk)) return;
  if (LOG.isDebugEnabled()) {
    LOG.debug("Watcher event type: " + eventType + " with state:"
        + event.getState() + " for path:" + event.getPath()
        + " connectionState: " + zkConnectionState
        + " for " + this);
  }

  if (eventType == Event.EventType.None) {
    // the connection state has changed
    switch (event.getState()) {
    case SyncConnected:
      LOG.info("Session connected.");
      // if the listener was asked to move to safe state then it needs to
      // be undone
      ConnectionState prevConnectionState = zkConnectionState;
      zkConnectionState = ConnectionState.CONNECTED;
      // Only a DISCONNECTED -> CONNECTED transition (while still wanting to
      // be in the election) triggers a re-check of the lock znode.
      if (prevConnectionState == ConnectionState.DISCONNECTED &&
          wantToBeInElection) {
        monitorActiveStatus();
      }
      break;
    case Disconnected:
      LOG.info("Session disconnected. Entering neutral mode...");

      // ask the app to move to safe state because zookeeper connection
      // is not active and we dont know our state
      zkConnectionState = ConnectionState.DISCONNECTED;
      enterNeutralMode();
      break;
    case Expired:
      // the connection got terminated because of session timeout
      // call listener to reconnect
      LOG.info("Session expired. Entering neutral mode and rejoining...");
      enterNeutralMode();
      // NOTE(review): the 0 argument is presumably a delay before rejoining -- confirm.
      reJoinElection(0);
      break;
    case SaslAuthenticated:
      LOG.info("Successfully authenticated to ZooKeeper using SASL.");
      break;
    default:
      fatalError("Unexpected Zookeeper watch event state: "
          + event.getState());
      break;
    }

    return;
  }

  // a watch on lock path in zookeeper has fired. so something has changed on
  // the lock. ideally we should check that the path is the same as the lock
  // path but trusting zookeeper for now
  String path = event.getPath();
  if (path != null) {
    switch (eventType) {
    case NodeDeleted:
      // Lock znode is gone: an ACTIVE elector first drops to neutral mode,
      // then in every case we try to re-create the lock and rejoin.
      if (state == State.ACTIVE) {
        enterNeutralMode();
      }
      joinElectionInternal();
      break;
    case NodeDataChanged:
      // Re-check the lock znode (and re-register the watch on it).
      monitorActiveStatus();
      break;
    default:
      if (LOG.isDebugEnabled()) {
        LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
      }
      monitorActiveStatus();
    }

    return;
  }

  // some unexpected error has occurred
  fatalError("Unexpected watch error from Zookeeper");
}

ActiveStandbyElector.monitorActiveStatus()

/**
 * Re-checks the lock znode, and re-registers the watch on it, for an
 * elector that wants to be in the election.
 */
private void monitorActiveStatus() {
  // Precondition, checked only when assertions are enabled.
  assert wantToBeInElection;
  if (LOG.isDebugEnabled()) {
    LOG.debug("Monitoring active leader for " + this);
  }
  // Presumably the retry counter for the exists() check; reset for a new round.
  statRetryCount = 0;
  monitorLockNodeAsync();
}

/**
 * Issues an asynchronous exists() on the lock znode and registers
 * {@code watcher} on it, so that later changes such as deletion are reported.
 *
 * <p>If a request is already pending for the current client, no duplicate is sent.
 */
private void monitorLockNodeAsync() {
  // Skip if an exists() issued with the current client is still outstanding.
  if (monitorLockNodePending && monitorLockNodeClient == zkClient) {
    LOG.info("Ignore duplicate monitor lock-node request.");
    return;
  }
  // Remember which client the pending request belongs to. The flag is
  // presumably cleared in the async callback (not shown).
  monitorLockNodePending = true;
  monitorLockNodeClient = zkClient;
  // `this` is the async callback and zkClient the callback context.
  zkClient.exists(zkLockFilePath,
      watcher, this,
      zkClient);
}


只要检测到HA znode的数据变化事件(NodeDataChanged),就会通过monitorActiveStatus()调用exists()检查该znode是否依然存在,并重新在其上注册watcher。一旦该znode被删除(例如主节点的ZK会话失效导致临时节点被清除),ZK就会向watcher推送NodeDeleted事件。
每次事件到来之后,都会调用Watcher接口实现类的process方法进行处理。
WatcherWithClientRef implements Watcher

因此,NodeDeleted事件到了之后又会将上面的调用过程执行一遍,本次处理的是NodeDeleted事件。

case NodeDeleted:
  // The lock znode was deleted. An ACTIVE elector first enters neutral mode;
  // in every case the elector then tries to re-create the lock and rejoin.
  if (state == State.ACTIVE) {
    enterNeutralMode();
  }
  joinElectionInternal();
  break;

如果当前节点处于active状态,会先进入中立模式(enterNeutralMode)。随后无论主备,都会调用joinElectionInternal()重新尝试创建锁节点,加入选举抢主。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0)

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。