源码剖析ThreadPoolExecutor线程池及阻塞队列

举报
努力的小雨 发表于 2023/12/28 09:57:17 2023/12/28
【摘要】 本文章对ThreadPoolExecutor线程池的底层源码进行分析,线程池如何起到了线程复用、又是如何进行维护我们的线程任务的呢?我们直接进入正题:首先我们看一下ThreadPoolExecutor类的源码 1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoo...

本文章对ThreadPoolExecutor线程池的底层源码进行分析,线程池如何起到了线程复用、又是如何进行维护我们的线程任务的呢?我们直接进入正题:

首先我们看一下ThreadPoolExecutor类的源码

 1 public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue, 
 6                               ThreadFactory threadFactory,
 7                               RejectedExecutionHandler handler) { //拒绝策略
 8         if (corePoolSize < 0 ||
 9             maximumPoolSize <= 0 ||
10             maximumPoolSize < corePoolSize ||
11             keepAliveTime < 0)
12             throw new IllegalArgumentException();
13         if (workQueue == null || threadFactory == null || handler == null)
14             throw new NullPointerException();
15         this.acc = System.getSecurityManager() == null ?
16                 null :
17                 AccessController.getContext();
18         //核心线程
19         this.corePoolSize = corePoolSize;
20         //最大线程数
21         this.maximumPoolSize = maximumPoolSize;
22         //阻塞队列,即今天主题
23         this.workQueue = workQueue;
24         //超时时间
25         this.keepAliveTime = unit.toNanos(keepAliveTime);
26         this.threadFactory = threadFactory;
27         //拒绝策略
28         this.handler = handler;
29     }

这是我们线程池实例化的时候的参数,其实最大的实用性来说,就是核心线程与最大线程数的设定,这个完全靠个人经验,并没有一个真正意义上的公式可以适用所有的业务场景,这里博主为大家找了一篇关于设定线程数的文章:

https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

我们的线程池初始化好后,我们自己会调用excute方法来让线程池运行我们的线程任务,那我们就先来看看这个方法的实现:

 1 public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         /*
 5          * 第一步:工作线程是否小于核心线程数量,如果是添加work中,worker其实也是一个线程,只不过它内部操作的是我们的上传的任务
 6          * 第二步:如果大于核心线程数量,添加到worker队列中,每一个不同的队列offer的实现方法也是不一样的,今天我们主要探讨这个
 7          * 第三步:阻塞队列被塞满了,需要创建新的非核心线程数量worker线程去处理我们的任务,创建worker线程失败了会触发拒绝策略,默认抛异常
 8          */
 9         int c = ctl.get();
10         if (workerCountOf(c) < corePoolSize) {
11             if (addWorker(command, true))
12                 return;
13             c = ctl.get();
14         }
15         if (isRunning(c) && workQueue.offer(command)) {
16             int recheck = ctl.get();
17             if (! isRunning(recheck) && remove(command))
18                 reject(command);
19             else if (workerCountOf(recheck) == 0)
20                 addWorker(null, false);
21         }
22         else if (!addWorker(command, false))
23             reject(command);
24     }
25     

我们看到当任务调用的时候,会执行addworker,那么worker是个什么东西呢?我们来看看它的构造实例:我们看一下worker类,就发现其实worker也是一个线程

 1 private final class Worker
 2         extends AbstractQueuedSynchronizer
 3         implements Runnable
 4     {
 5     
 6     ......
 7     
 8     Worker(Runnable firstTask) {
 9             setState(-1); // inhibit interrupts until runWorker
10             this.firstTask = firstTask;
11             this.thread = getThreadFactory().newThread(this);
12         }
13 
14         /** 覆盖执行run方法
15           */
16         public void run() {
17             runWorker(this);
18         }
19     ......
20     
21     }

这次我们来看一下addworker是怎么操作的:

 1 private boolean addWorker(Runnable firstTask, boolean core) {
 2         retry:
 3         for (;;) {
 4             int c = ctl.get();
 5             int rs = runStateOf(c);
 6 
 7             // Check if queue empty only if necessary.
 8             if (rs >= SHUTDOWN &&
 9                 ! (rs == SHUTDOWN &&
10                    firstTask == null &&
11                    ! workQueue.isEmpty()))
12                 return false;
13 
14             for (;;) {
15                 int wc = workerCountOf(c);
16                 if (wc >= CAPACITY ||
17                     //不允许创建大于最大核心线程数的任务
18                     wc >= (core ? corePoolSize : maximumPoolSize))
19                     return false;
20                 if (compareAndIncrementWorkerCount(c))
21                     break retry;
22                 c = ctl.get();  // Re-read ctl
23                 if (runStateOf(c) != rs)
24                     continue retry;
25                 // else CAS failed due to workerCount change; retry inner loop
26             }
27         }
28 
29         boolean workerStarted = false;
30         boolean workerAdded = false;
31         Worker w = null;
32         try {
33             //主要的创建worker过程是在这里
34             w = new Worker(firstTask);
35             final Thread t = w.thread;
36             if (t != null) {
37                 final ReentrantLock mainLock = this.mainLock;
38                 mainLock.lock();
39                 try {
40                     // Recheck while holding lock.
41                     // Back out on ThreadFactory failure or if
42                     // shut down before lock acquired.
43                     int rs = runStateOf(ctl.get());
44 
45                     if (rs < SHUTDOWN ||
46                         (rs == SHUTDOWN && firstTask == null)) {
47                         if (t.isAlive()) // precheck that t is startable
48                             throw new IllegalThreadStateException();
49                         workers.add(w);
50                         int s = workers.size();
51                         if (s > largestPoolSize)
52                             largestPoolSize = s;
53                         workerAdded = true;
54                     }
55                 } finally {
56                     mainLock.unlock();
57                 }
58                 if (workerAdded) {
59                     //此处调用的是worker线程的start方法,并没有直接调用我们的 任务
60                     //上面我们看worker的run方法了,里面调用的 是runWorker,那我们看看runWorker方法就可以了
61                     t.start();
62                     workerStarted = true;
63                 }
64             }
65         } finally {
66             if (! workerStarted)
67                 addWorkerFailed(w);
68         }
69         return workerStarted;
70     }
71     

到这里添加完毕后,我们在看看它是是如何执行我们的线程的,来看看runworker方法实现:

 1 final void runWorker(Worker w) {
 2         Thread wt = Thread.currentThread();
 3         Runnable task = w.firstTask;
 4         w.firstTask = null;
 5         w.unlock(); // allow interrupts
 6         boolean completedAbruptly = true;
 7         try {
 8             //这里体现的是线程的复用,复用的是worker线程,每处理一个线程都会getTask()从队列中取一个任务进行处理
 9             while (task != null || (task = getTask()) != null) {
10                 w.lock();
11                 // If pool is stopping, ensure thread is interrupted;
12                 // if not, ensure thread is not interrupted.  This
13                 // requires a recheck in second case to deal with
14                 // shutdownNow race while clearing interrupt
15                 if ((runStateAtLeast(ctl.get(), STOP) ||
16                      (Thread.interrupted() &&
17                       runStateAtLeast(ctl.get(), STOP))) &&
18                     !wt.isInterrupted())
19                     wt.interrupt();
20                 try {
21                     beforeExecute(wt, task);
22                     Throwable thrown = null;
23                     try {
24                         //直接调用我们任务的run方法,我们任务虽然是继承了runable,但是并没有调用start方法
25                         //其实我们的线程放入线程池中,并不是让我们的线程运行,仅仅是定义了一个方法体,
26                         //真正运行的是被线程池管理的worker线程
27                         task.run();
28                     } catch (RuntimeException x) {
29                         thrown = x; throw x;
30                     } catch (Error x) {
31                         thrown = x; throw x;
32                     } catch (Throwable x) {
33                         thrown = x; throw new Error(x);
34                     } finally {
35                         afterExecute(task, thrown);
36                     }
37                 } finally {
38                     task = null;
39                     w.completedTasks++;
40                     w.unlock();
41                 }
42             }
43             completedAbruptly = false;
44         } finally {
45             //回收线程,释放资源
46             processWorkerExit(w, completedAbruptly);
47         }
48     }

这个时候,大家应该就解决了一个问题就是,线程池如何体现的线程复用,就在gettask那里体现的,复用的就是worker线程,好了,这个时候不仅worker创建完成了,并且直接调用start方法,让自己开始运行起来,执行本次添加的任务,但是细心的小伙伴会看到参数传入的核心线程为true,那么此时也仅仅启动了核心线程,那么超过核心线程数的就应该加入到队列中,那么有什么队列供我们选择呢?

所以我们接下来看BlockingQueue的offer、poll(如果设置超时时间)、take方法,BlockingQueue有很多实现类,我们主要看以下几个:
  ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue、DelayQueue五种BlockingQueue;

我们首先来说一说ArrayBlockingQueue,我们看看其构造函数

 1 public ArrayBlockingQueue(int capacity) {
 2 //必须指定队列大小,默认非公平锁
 3         this(capacity, false);
 4     }
 5 
 6 public ArrayBlockingQueue(int capacity, boolean fair) {
 7         if (capacity <= 0)
 8             throw new IllegalArgumentException();
 9         this.items = new Object[capacity];
10         lock = new ReentrantLock(fair);
11         notEmpty = lock.newCondition();
12         notFull =  lock.newCondition();
13     }
public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {//超过队列大小,将不继续存放,返回false创建worker线程
            if (count == items.length)
                return false;
            else {
                //数组后添加任务元素,使用到了循环数组的算法
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                //等待时间,如果超过默认1000微妙,则将阻塞当前线程,等待添加任务时将其唤醒或者等待超时
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                //等待被唤醒,无超时时间
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

ArrayBlockingQueue讲解完了,再来看看LinkedBlockingQueue:三个方法对于任务的存放与取出与ArrayBlockingQueue并无太大差别,我们就不做太多的讲解,简单说一下,主要的就是节点阻塞队列与数组阻塞队列所用到的锁机制不一样,主要是因为数组是一个对象,而节点操作的则是对头与队尾节点,所以用到了两个taskLock与putLock锁,两者在对于队列的取与添加并不会产生冲突

public LinkedBlockingQueue() {
       //链表类型虽然不用指定大小队列,但是默认时int的最大值,实际场景下会引发内存溢出问题
        this(Integer.MAX_VALUE);
    }

 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

PriorityBlockingQueue,总体来说,是具有优先级考虑的任务队列,因为任务需要实现comparator接口,先看看其构造器吧;

public PriorityBlockingQueue() {
//默认11长度大小,无比较器
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

public PriorityBlockingQueue(int initialCapacity) {
//可指定长度大小
        this(initialCapacity, null);
    }

public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
//可指定比较器
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
//在这里队列的长度不再固定,而是实现了自动扩展
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
//默认与不默认都会使用comparator接口让数组进行比较,使优先级高的在数组最前面
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
//每次取出任务的时候都会进行优先级比较,放到数组的第一个
            while ( (result = dequeue()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
//同理,也会进行优先级比较,虽然每次都比较但是在添加任务元素的时候已经是排好序的了
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

SynchronousQueue是一个比较特殊的队列,固定长度为1,并且可以说是实时进行任务运行,并且必须已经有worker任务结束在获取其他任务的时候才会在队列中添加任务元素,否则一直为返回null,非常适合在一个请求需要同时拉去多个服务的场景;

    public SynchronousQueue() {
        this(false);
    }

 public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

并且是上面提到的offer、take、poll三个方法的操作都是一个transfer方法进行操作的,我们主要看一下这两个类在这个方法上有哪些区别;首先看一下结构简单的TransferQueue;

//存数据的时候,参数为transfer(e, true, 0)
//取数据的时候,参数为transfer(null, false, 0)
E transfer(E e, boolean timed, long nanos) {
//作者认为这个方法主要做了一下几件事情:
//1、根据传进来的任务参数,判断当前是取数据还是放数据
//2、如果是存数据,判断当前队列是否是空队列,如果是则返回false,线程池则会创建新的worker线程,不是空队列则会唤醒进行获取任务的worker线程并返回数据
//3、如果是拿数据,判断是否是空队列,则新创建节点并阻塞当前线程等待放数据时唤醒,不是空队列(换一种说法就是已经有一个线程正在等待获取任务),抛出头结点,继续往下走,这里有个疑问,抛出去后该节点的线程一直在等待,无法被唤醒了
QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

不知道大家看到这里是否有疑问,为什么当多个worker线程开始获取任务时,已经等待的节点会被抛出去,只会给队列留下最新的一个等待节点,其他节点根本不会再被唤醒了,其实这也是我的疑问,不知道大家有没有注意到,希望高手们可以给一个解释;下面再看一个TransferStack,这个是默认线程池队列初始化中使用的节点类型,这个比较好理解,也通俗易懂一点;我们看看其源码:

E transfer(E e, boolean timed, long nanos) {
//该实现类跟上一个有一些区别,但是总体逻辑差不多,也是分以下几步:
//第一步:根据传进来的任务参数判断是请求数据,还是存入数据
//第二步:如果是存入数据,空节点时直接返回null,让线程池创建worker线程运行任务,如果有等待节点,那么存入当前任务数据,并且再移除存入的数据节点和等待的节点,等待的节点此时会被赋值存入的任务并被唤醒
//第三步:如果是取出数据,空节点时存入等待数据节点并阻塞当前线程,如果不是空节点,已经有了等待节点,那么将会除去等待节点并唤醒,还会除去当前钢加入的等待节点,使当前节点队列还是保持null
 SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

接下来我们再来讨论一下DelayQueue,这个队列就和PriorityQueue有些关联,具体关联在哪里呢?我们看看它 的源码;

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    //内部使用的是PriorityQueue作为存储对象,但是该DelayQueue为任务元素指定了比较器,就是时间比较器
    private final PriorityQueue<E> q = new PriorityQueue<E>();
}
//比较器是这个,定义了Comparable<Delayed>,查看时间是否到达或超过了指定时间
public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

剩下的offer、poll、take我就不讲解了,去元素的时候就是多判断了一步,是否超过或到达指定时间,否则将会使当前线程进行等待剩余的时间,而不是自旋

最后总结一下线程池以及使用到的队列原理:

线程池为何会比自己创建线程更加高效、方便,第一点就是线程池已经帮我们封装好了并且对线程进行了管理,比如生产者消费者模式,使我们的线程池高效的利用CPU进行处理任务,也可以对我们的业务场景来看使用哪个队列;第二点是线程池帮我们把线程进行了复用,而不是处理完一个任务就丢弃一个线程;

队列中ArrayBlockingQueue与LinkedBlockingQueue,处理节点存储类型不一样,一个是数组,一个是节点类型,还有LinkedBlockingQueue使用到了存储锁与取锁,两者操作并不冲突,而ArrayBlockingQueue则使用了一个排它锁,取数据与存数据用的是一把锁。

而PriorityBlockingQueue和DelayQueue,虽然内部都使用了PriorityQueue作为存储介质,但是PriorityBlockingQueue不会强制要求你使用哪一种比较器,而DelayQueue必须使用时间比较器更加局限也明确了任务的类型。

最后说一下SynchronousQueue,该队列比其他队列特殊一点,该队列是同步类型的队列,就是说队列不存储任务数据,而是必须有正在获取的等待节点才会让数据暂时放入队列中然后立马取出,或者不会放入队列,而是替换到等待队列中的任务并唤醒等待队列,从而到达任务不会被存储在队列中。也就是说不会缓冲任务放入队列中,更像是实时任务取出并处理。

好了,我们的学习也到此结束了,并且最后提示大家使用线程池的时候,最好自己定义线程池参数,而不是使用Executors使用默认参数来创建线程。就是因为写的队列长度过大,会导致程序崩溃

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。