【Hadoop】【Yarn】Hadoop中ShutdownHook的使用
Hadoop中所有的进程的主线程都会被添加到ShutdownHook中。其作用是当进程的jvm退出可以让进程以一种比较优雅的方式退出。
根据注册的shudownhook(线程)的顺序来决定以一种特定的顺序退出。下面是java doc中对shutdown hook官方解释。
A shutdown hook is simply an initialized but unstarted thread. When the virtual machine begins its shutdown sequence it will start all registered shutdown hooks in some unspecified order and let them run concurrently. When all the hooks have finished it will then run all uninvoked finalizers if finalization-on-exit has been enabled. Finally, the virtual machine will halt. Note that daemon threads will continue to run during the shutdown sequence, as will non-daemon threads if shutdown was initiated by invoking the exit method.
Once the shutdown sequence has begun it can be stopped only by invoking the halt method, which forcibly terminates the virtual machine.
使用方式也很简单:
nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook, SHUTDOWN_HOOK_PRIORITY);
这里以NodeManager为例,启动的时候会以NodeManager(即上面的this)创建一个shutdownhook线程,然后添加到ShutdownHookManager.hooks集合中。
那么问题来了,上面说了shutdownhook是jvm的机制,到现在为止都会业务层面的东西,也没有看到哪里和jvm的shutdownhook扯上关系啊?
其实一切都在Hadoop的ShutdownHookManager类的定义中。该类有下面一段静态声明:
static {
try {
Runtime.getRuntime().addShutdownHook(
new Thread() {
@Override
public void run() {
//shutdown注册的hook线程
int timeoutCount = MGR.executeShutdown();
}
}
);
}
}
ShutdownHoookManager.executeShutdown()
int executeShutdown() {
int timeouts = 0;
for (HookEntry entry: getShutdownHooksInOrder()) {
Future<?> future = EXECUTOR.submit(entry.getHook());
try {
future.get(entry.getTimeout(), entry.getTimeUnit());
} catch (TimeoutException ex) {
timeouts++;
future.cancel(true);
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
getSimpleName() + "' timeout, " + ex.toString(), ex);
} catch (Throwable ex) {
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
getSimpleName() + "' failed, " + ex.toString(), ex);
}
}
return timeouts;
}
这里的逻辑其实很简单就是把上面注册的hooks 线程全部拿出来然后遍历运行。
按照注册钩子时的优先级进行遍历,优先级高的先执行。
List<HookEntry> getShutdownHooksInOrder() {
List<HookEntry> list;
synchronized (hooks) {
list = new ArrayList<>(hooks);
}
Collections.sort(list, new Comparator<HookEntry>() {
//reversing comparison so highest priority hooks are first
@Override
public int compare(HookEntry o1, HookEntry o2) {
return o2.priority - o1.priority;
}
});
return list;
}
每一个hook都是一个线程,其实shutdownhook就是依次执行hook线程。
即CompositeService.CompositeServiceShutdownHook.run()
public void run() {
ServiceOperations.stopQuietly(compositeService);
}
ServiceOperation.stopQuietly()
public static Exception stopQuietly(Logger log, Service service) {
try {
stop(service);
} catch (Exception e) {
log.warn("When stopping the service {}", service.getName(), e);
return e;
}
return null;
}
public static void stop(Service service) {
if (service != null) {
service.stop();
}
}
以NodeManager为例,其实就是NodeManager的stop方法。NodeManager本身没有stop方法,就会调用父类
AbstractService.stop()
---NodeManager.serviceStop()
protected void serviceStop() throws Exception {
if (isStopping.getAndSet(true)) {
return;
}
try {
super.serviceStop();
DefaultMetricsSystem.shutdown();
NMAuditLogger.logSuccess(getUserName(), "nmShutdown", "NodeManager");
if (null != context) {
context.getContainerExecutor().stop();
// Cleanup ResourcePluginManager
ResourcePluginManager rpm = context.getResourcePluginManager();
if (rpm != null) {
rpm.cleanup();
}
}
} finally {
// YARN-3641: NM's services stop get failed shouldn't block the
// release of NMLevelDBStore.
stopRecoveryStore();
}
}
- 点赞
- 收藏
- 关注作者
评论(0)