08、Java多线程:ThreadPoolExecutor、RejectedExecutionHandler

ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService

1、 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法;
2、 每个ThreadPoolExecutor还维护着一些基本的统计数据,如完成的任务数;
3、 一个ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用Executors工厂方法配置;

构造方法

用给定的初始参数创建新的 ThreadPoolExecutor。

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

构造方法参数解释:

corePoolSize

池中所保存的线程数,包括空闲线程。

maximumPoolSize

池中允许的最大线程数。

keepAliveTime

当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

unit

keepAliveTime 参数的时间单位。

workQueue

执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。

threadFactory

执行程序创建新线程时使用的工厂。
默认DefaultThreadFactory,创建普通的优先级为5且非守护的线程。

handler

由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
默认AbortPolicy,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。

常见方法

void execute(Runnable command)

在将来某个时间执行给定任务。可以在新线程中或者在现有池线程中执行该任务。 如果无法将任务提交执行,或者因为此执行程序已关闭,或者因为已达到其容量,则该任务由当前 RejectedExecutionHandler 处理。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

其中通过Thread t = threadFactory.newThread(w)将command实例化成线程。 workers.add( new Worker(command))将command放入到HashSet workers存储的工作线程集合中,command执行完毕后 workers.remove(w);

void shutdown()

按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。如果已经关闭,则调用没有其他作用。

public void shutdown() {
	SecurityManager security = System.getSecurityManager();
	if (security != null)
            security.checkPermission(shutdownPerm);

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (security != null) { // Check if caller can modify our threads
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }

            int state = runState;
            if (state < SHUTDOWN)
                runState = SHUTDOWN;

            try {
                for (Worker w : workers) {
                    w.interruptIfIdle();
                }
            } catch (SecurityException se) { // Try to back out
                runState = state;
                // tryTerminate() here would be a no-op
                throw se;
            }

            tryTerminate(); // Terminate now if pool and queue empty
        } finally {
            mainLock.unlock();
        }
}

shutdownNow

尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。
并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。

public List<Runnable> shutdownNow() {
	SecurityManager security = System.getSecurityManager();
	if (security != null)
            security.checkPermission(shutdownPerm);

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (security != null) { // Check if caller can modify our threads
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }

            int state = runState;
            if (state < STOP)
                runState = STOP;

            try {
                for (Worker w : workers) {
                    w.interruptNow();
                }
            } catch (SecurityException se) { // Try to back out
                runState = state;
                // tryTerminate() here would be a no-op
                throw se;
            }

            List<Runnable> tasks = drainQueue();
            tryTerminate(); // Terminate now if pool and queue empty
            return tasks;
        } finally {
            mainLock.unlock();
        }
    }

shutdown及shutdownNow关闭任务的实现均是通过 Thread.interrupt() 取消任务,所以无法响应中断的任何任务可能永远无法终止。

poolSize与构造函数中几个参数的关系

poolSize:当前运行的线程。

1、 新任务提交时,若poolSize<corePoolSize,创建新线程来处理请求,即使其他辅助线程是空闲的;
2、 若poolSize>corePoolSize,且poolSize<maximumPoolSize,workQueue未满,放入workQueue中,等待线程池中任务调度执行;
3、 如果运行的线程多于corePoolSize而少于maximumPoolSize,则仅当队列满时才创建新线程;
4、 若workQueue已满且运行线程等于maximumPoolSize时,新提交任务由RejectedExecutionHandler处理;
5、 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程;
6、 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭;

排队

所有BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:

1、 如果运行的线程少于corePoolSize,则Executor始终首选添加新的线程,而不进行排队;
2、 如果运行的线程等于或多于corePoolSize,但又小于maximumPoolSize,则Executor始终首选将请求加入队列,而不添加新的线程;
3、 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,由RejectedExecutionHandler处理;

排队有三种通用策略:

1、 直接提交工作队列的默认选项是SynchronousQueue,它将任务直接提交给线程而不保持它们;
2、 无界队列使用无界队列将导致在所有corePoolSize线程都忙时新任务在队列中等待;
3、 有界队列当使用有限的maximumPoolSizes时,有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制;

当Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法 execute(java.lang.Runnable) 中提交的新任务将被 拒绝,由RejectedExecutionHandler处理。

从源码看出,AbstractExecutorService提供submit、newTaskFor、invokeAny的默认实现,而ThreadPoolExecutor主要关注提交任务的处理,execute及shutdown。

通俗地理解:提交任务由AbstractExecutorService负责,处理提交请求及提交后线程任务的执行由ThreadPoolExecutor负责。

利用ReentrantLock保证线程同步安全, mainLock.newCondition()实现线程阻塞与唤醒。

其实这其中经常有大家忽视的两点:

(1)** 线程池初始化时,alive线程数多少?**

这里就要看你如何使用线程池,默认线程池中没有任何线程,在execute or submit时才创建线程执行任务。

ThreadPoolExecutor还有一种显式创建:prestartCoreThread()创建一个核心线程 or 创建所有核心线程数prestartAllCoreThreads()。

//显式创建单个核心线程
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);
}
//显式创建所有核心线程
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}

其实在内嵌的tomcat-embed-core核心包中,已经显式创建所有核心线程:

 

(2)** 线程池执行完任务后,又是如何保证核心线程处于alive状态的?**

线程池每次添加成功任务时,都会初始化一个Worker。

//ThreadPoolExecutorb内部工作任务,对excute or submit 的任务进行了包装
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);//本行作用相当于new Thread(firstTask)
}     

 

t.start()实际运行的是Worker类中run()->runWorker()->getTask()

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // 判断workers是否需要剔除,即是否需要保证core thread alive
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                    //上述代码就保证了core thread是带有超时时间,还是一直阻塞等待任务
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

RejectedExecutionHandler

public interface RejectedExecutionHandler

无法由ThreadPoolExecutor 执行的任务的处理程序。

public interface RejectedExecutionHandler {

    /**
     * 当 execute 不能接受某个任务时,可以由 ThreadPoolExecutor 调用的方法。
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

ThreadPoolExecutor定义了四种:

1、 ThreadPoolExecutor.AbortPolicy:拒绝并抛出RejectedExecutionException;
2、 ThreadPoolExecutor.CallerRunsPolicy:拒绝但在调用者的线程中直接执行该任务;
3、 ThreadPoolExecutor.DiscardPolicy:拒绝但不做任何动作;
4、 ThreadPoolExecutor.DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程);