深入线程池原理

举报
海风极客 发表于 2022/10/16 22:08:22 2022/10/16
【摘要】 1 翻一翻ThreadPoolExecutor 1.1 类关系图 1.2 成员变量—线程池状态(生命周期)private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static fi...

1 翻一翻ThreadPoolExecutor

1.1 类关系图

在这里插入图片描述

1.2 成员变量—线程池状态(生命周期)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

线程池的五种状态(生命周期):

  • RUNNING:正在运行,接受新任务和处理排队的任务

  • SHUTDOWN:准备关闭,不接受新任务,但处理排队的任务

  • STOP:停止,不接受新任务,不处理排队的任务,中断正在进行的任务

  • TIDYING:整理,所有的任务都已经终止,workerCount为0,转换到TIDYING状态的线程将运行terminated()钩子方法

  • TERMINATED:终止, terminated()已经完成这些值之间的数字顺序很重要,允许顺序比较。 runState随时间单调地增加,但不需要触及每个状态

1.3 四个构造方法

/**
 * 使用默认ThreadFactory和Handler
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

/**
 * 使用默认Handler
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

/**
 * 使用默认ThreadFactory
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

/**
 * 真正执行的构造方法
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

线程池的七大参数:

  • corePoolSize:核心线程数,核心池大小是保持活动的最小工作线程数(不允许超时等),除非设置了allowCoreThreadTimeOut,在这种情况下,最小值为零。

  • maximumPoolSize:最大线程数,实际的最大值在内部是由CAPACITY限制的。

  • keepAliveTime:等待工作的空闲线程的超时(默认以纳秒计)。 当存在超过corePoolSize或allowCoreThreadTimeOut时,线程使用此超时。 否则他们将永远等待新的工作。

  • unit:等待工作的空闲线程的超时(默认以纳秒计)的单位。

  • workQueue:用于保存任务并将其传递给工作线程的队列。 我们不要求workQueue.poll()返回null必然意味着workQueue.isEmpty(),所以只依赖isEmpty来查看队列是否为空(例如,当决定是否从SHUTDOWN转换到TIDYING时,我们必须这样做)。 这适用于特殊用途的队列,如DelayQueues,允许poll()返回null,即使稍后当延迟过期时它可能返回非null。

  • threadFactory:工厂的新线程。所有线程都是使用这个工厂创建的(通过方法addWorker)。所有调用者都必须做好addWorker失败的准备,这可能反映了系统或用户限制线程数量的策略。即使没有将其视为错误,创建线程失败也可能导致新任务被拒绝或现有任务仍卡在队列中。我们进一步保留池不变量,甚至在遇到OutOfMemoryError之类的错误时,这些错误可能在尝试创建线程时抛出。这类错误相当常见,因为需要在Thread中分配本机堆栈。启动时,用户将希望执行清理池关闭来清理。可能会有足够的内存可用来完成清理代码,而不会遇到另一个OutOfMemoryError。

  • handler:在执行中饱和或关闭时调用的处理程序。

1.4 execute方法

/**
 * 在将来某个时候执行给定的任务。 任务可以在新线程中执行,也可以在现有的池线程中执行。  
 *
 * 如果由于该执行器已关闭或已达到其容量,任务无法提交执行,则由当前处理   
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * 分三步进行:
     * 1. 如果运行的线程少于corePoolSize,则尝试使用给定命令作为其第一个任务启动一个新线程。 addWorker的调      *    用会自动检查runState和workerCount,从而通过返回false来防止在不应该添加线程时产生的错误警报。  
     * 2. 如果一个任务可以成功排队,那么我们仍然需要再次检查我们是否应该添加一个线程(因为自上次检查后,现有的      *    已经死亡)或进入此方法后池关闭。 所以我们重新检查状态,如有必要则回滚正在排队的if停止,或启动一个新      *    的线程,如果没有。
     * 3. 如果不能对任务进行排队,则尝试添加一个新的线程。如果它失败了,我们知道我们被关闭或饱和了所以拒绝这个      *    任务。
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

2 看一看线程池的底层原理

2.1 线程的状态(生命周期)转换

  • ctl:主线程池的状态,用AtomicInteger原子类表示,

  • RUNNING:初始化时默认为RUNNING状态

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
  • SHUTDOWN:shutdown()方法,线程池不能够接受新的任务,它会等待所有任务执行完毕;

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
  • STOP:shutdownNow()方法线程池不能接受新的任务,并且会去尝试终止正在执行的任务

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    
  • TIDYING:

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    
  • TERMINATED:

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
    

2.2 线程池的阻塞队列

在这里插入图片描述
由此可见BlockingQueue是一个继承与Queue的接口,而他的具体实现最常用的无非一下几种
在这里插入图片描述

  • ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
  • DelayQueue:一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。不能将null元素放置到这种队列中。
  • LinkedBlockingDuque:LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作
  • LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法。
  • PriorityBlockingQueue:一个支持优先级的无界阻塞队列,直到系统资源耗尽。默认情况下元素采用自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue也是基于最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。
  • SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

2.3 线程池的执行过程

2.3.1 调用execute方法,参数为Runnable接口

用到了可重入锁和双重检查

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果运行的线程少于corePoolSize,则尝试使用给定命令作为其第一个任务启动一个新线程
    if (workerCountOf(c) < corePoolSize) {
        // 调用addWorker
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果不能对任务进行排队,则尝试添加一个新的线程。如果失败,拒绝这个任务。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
2.3.2 addWorker加入任务到线程池
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        //查看状态
        int c = ctl.get();
        //查看正在运行的任务
        int rs = runStateOf(c);
        // 只在必要时检查队列是否为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            //worker数量
            int wc = workerCountOf(c);
            //判断:核心线程判断是否超过核心线程数,非核心线程判断是否超过最大线程数,超过则返回FALSE
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //新任务是否启动
    boolean workerStarted = false;
    //新任务是否已添加
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 保持锁定时重新检查,在ThreadFactory失败时退出  ,在获得锁之前关闭  
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //预先检查t是可启动的
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //添加任务异常
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

2.4 Worker的类结构

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * 这个类永远不会被序列化,但是我们提供了一个serialVersionUID来抑制javac警告。 
     */
    private static final long serialVersionUID = 6138294804551838833L;

    final Thread thread;
    
    Runnable firstTask;

    volatile long completedTasks;

    /**
     * 使用给定的第一个任务和ThreadFactory中的线程创建。
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // 在runWorker之前禁止中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** 将主运行循环委托给外部的runWorker  */
    public void run() {
        runWorker(this);
    }

    // 锁方法 
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

2.5 线程池的拒绝策略

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

3 线程池的使用

[推荐文章](线程池的使用

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。