为什么java线程池的submit的不抛出异常

举报
经典鸡翅 发表于 2022/02/17 22:09:03 2022/02/17
【摘要】 前言 大家好,这里是经典鸡翅,最近有人问我线程池的execute和submit,有的抛出异常,有的不抛出异常,这里鸡翅老哥给大家整理下。通过源码跟踪跟大家讲解。 两个方法 我们先用最普通的方式定义一个线程池。 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecut...

前言

大家好,这里是经典鸡翅,最近有人问我线程池的execute和submit,有的抛出异常,有的不抛出异常,这里鸡翅老哥给大家整理下。通过源码跟踪跟大家讲解。

两个方法

我们先用最普通的方式定义一个线程池。

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024), new ThreadPoolExecutor.AbortPolicy());
 

线程池有两种执行任务的方式,一种是execute方法,一种是submit方法。

我们引入一个最简单的例子。执行一个有异常的方法。


  
  1. @Test
  2. public void test() throws ExecutionException, InterruptedException {
  3. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024), new ThreadPoolExecutor.AbortPolicy());
  4. threadPoolExecutor.execute(() -> {
  5. testThread();
  6. });
  7. }
  8. private int testThread() {
  9. int i = 1 / 0;
  10. return i;
  11. }

如上的一段代码,由于1/0的存在,必定是会出现异常的,我们执行后,看到结果,是有异常的。符合我们的预期。


  
  1. Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
  2. at com.hq.hqframe.textUtil.TextUtilTest.testThread(TextUtilTest.java:104)
  3. at com.hq.hqframe.textUtil.TextUtilTest.lambda$test$5(TextUtilTest.java:99)
  4. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  5. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  6. at java.lang.Thread.run(Thread.java:748)

如果我们使用submit提交任务呢?


  
  1. @Test
  2. public void test() throws ExecutionException, InterruptedException {
  3. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024), new ThreadPoolExecutor.AbortPolicy());
  4. threadPoolExecutor.submit(() -> {
  5. testThread();
  6. });
  7. }
  8. private int testThread() {
  9. int i = 1 / 0;
  10. return i;
  11. }

执行代码我们发现,控制台并没有打印出任何异常。异常被吞了。这就是execute和submit的第一个区别。

为什么submit不会抛出异常呢?

我们开始跟踪一下,为什么submit不会抛出异常。先看submit的第一个方法。


  
  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null);
  4. execute(ftask);
  5. return ftask;
  6. }

将task进行包装为FutureTask。这里面的task是runnable的,我们将其包装成callable形式的。通过runnableadapter。


  
  1. static final class RunnableAdapter<T> implements Callable<T> {
  2. final Runnable task;
  3. final T result;
  4. RunnableAdapter(Runnable task, T result) {
  5. this.task = task;
  6. this.result = result;
  7. }
  8. public T call() {
  9. task.run();
  10. return result;
  11. }
  12. }

包装后,执行这个task。


  
  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. /*
  5. * Proceed in 3 steps:
  6. *
  7. * 1. If fewer than corePoolSize threads are running, try to
  8. * start a new thread with the given command as its first
  9. * task. The call to addWorker atomically checks runState and
  10. * workerCount, and so prevents false alarms that would add
  11. * threads when it shouldn't, by returning false.
  12. *
  13. * 2. If a task can be successfully queued, then we still need
  14. * to double-check whether we should have added a thread
  15. * (because existing ones died since last checking) or that
  16. * the pool shut down since entry into this method. So we
  17. * recheck state and if necessary roll back the enqueuing if
  18. * stopped, or start a new thread if there are none.
  19. *
  20. * 3. If we cannot queue task, then we try to add a new
  21. * thread. If it fails, we know we are shut down or saturated
  22. * and so reject the task.
  23. */
  24. int c = ctl.get();
  25. if (workerCountOf(c) < corePoolSize) {
  26. if (addWorker(command, true))
  27. return;
  28. c = ctl.get();
  29. }
  30. if (isRunning(c) && workQueue.offer(command)) {
  31. int recheck = ctl.get();
  32. if (! isRunning(recheck) && remove(command))
  33. reject(command);
  34. else if (workerCountOf(recheck) == 0)
  35. addWorker(null, false);
  36. }
  37. else if (!addWorker(command, false))
  38. reject(command);
  39. }

我们直接将目光聚焦到addWorker,这里面干了一个什么事呢?重点建造了一个Worker类。把当前的task传入进去。Worker类里面把自己的task作为一个线程赋值。


  
  1. Worker(Runnable firstTask) {
  2. setState(-1); // inhibit interrupts until runWorker
  3. this.firstTask = firstTask;
  4. this.thread = getThreadFactory().newThread(this);
  5. }

回到addWorker处,我们可以看到start方法,这个方法,实际调用的就是线程的run方法。也就是我们worker类里面自己实现的run方法。


  
  1. public void run() {
  2. runWorker(this);
  3. }

再看下runworker方法,我们发现里面继续调用了task.run方法。


  
  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {
  9. w.lock();
  10. // If pool is stopping, ensure thread is interrupted;
  11. // if not, ensure thread is not interrupted. This
  12. // requires a recheck in second case to deal with
  13. // shutdownNow race while clearing interrupt
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. beforeExecute(wt, task);
  21. Throwable thrown = null;
  22. try {
  23. task.run();
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x);
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. task = null;
  35. w.completedTasks++;
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false;
  40. } finally {
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

我们就可以重新追踪到futureTask。


  
  1. public void run() {
  2. if (state != NEW ||
  3. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  4. null, Thread.currentThread()))
  5. return;
  6. try {
  7. Callable<V> c = callable;
  8. if (c != null && state == NEW) {
  9. V result;
  10. boolean ran;
  11. try {
  12. result = c.call();
  13. ran = true;
  14. } catch (Throwable ex) {
  15. result = null;
  16. ran = false;
  17. setException(ex);
  18. }
  19. if (ran)
  20. set(result);
  21. }
  22. } finally {
  23. // runner must be non-null until state is settled to
  24. // prevent concurrent calls to run()
  25. runner = null;
  26. // state must be re-read after nulling runner to prevent
  27. // leaked interrupts
  28. int s = state;
  29. if (s >= INTERRUPTING)
  30. handlePossibleCancellationInterrupt(s);
  31. }

此时我们发现,异常已经被捕获了,并且调用了setexception方法。把我们的异常信息给了outcome。


  
  1. protected void setException(Throwable t) {
  2. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  3. outcome = t;
  4. UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
  5. finishCompletion();
  6. }
  7. }

到此我们就明白了为什么submit没有抛出异常了!

文章来源: blog.csdn.net,作者:经典鸡翅,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/hanqing456/article/details/122293430

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。