• Home
  • Articles
    • 日志
    • 妍小言
    • 舒小书
    • 浩然说
    • 生活日记
  • All Tags

浅析ThreadPoolExecutor

04 May 2017

Reading time ~3 minutes

JDK默认提供的三种ThreadPoolExecutor

  • FixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);

线程池中核心线程数和最大允许的线程数相同,即线程池中的线程空闲后不会被回收,除非allowCoreThreadTimeOut被设置成true.

  • SingleThreadPool
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

线程池中的线程数最多保持一个.

和FixedThreadPool(1)的区别为FixedThreadPool为ThreadPoolExecutor对象,可以调用ThreadPoolExecutor的一些方法对其进行配置。而SingleThreadPool返回的是ExecutorService的简单封装,仅暴露了ExecutorService的执行方法,无法进行配置。

  • CachedThreadPool

        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    

    ​

一个无限容量的连接池,并且空闲的线程会缓存60S。

ThreadPoolExecutor简单解析

  • 构造函数

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
  • 参数解析

    • corePoolSize:核心线程池大小,线程数如果不超过该值,线程会默认保留,就算线程处于空闲状态。
    • maximumPoolSize:线程池的最大容量,线程数的重量不能操过该值。
    • keepAliveTime:空闲线程最长保留时间。
    • unit:时间单位。
    • workQueue:接收请求的队列。
    • threadFactory:线程工厂。
    • handler:由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
  • 部分参数意义

    • 当线程数不超过核心线程池大小时,创建的线程默认都将被保留下来,就算线程为空闲状态。
    • 当线程数超过核心线程池大小,而没超过最大线程池大小时,空闲的线程会等待keepAliveTime时长后被销毁。
  • Future简单解析

    • future.get()将会被阻塞知道获取数据,当任务被取消,任务抛出异常,任务被中断时均会抛出异常。
  • 线程池增加新处理线程(woker)的时机

    public void execute(Runnable command) {
      if (command == null)
          throw new NullPointerException();
      /*
       * Proceed in 3 steps:
       *
       * 1. If fewer than corePoolSize threads are running, try to
       * start a new thread with the given command as its first
       * task.  The call to addWorker atomically checks runState and
       * workerCount, and so prevents false alarms that would add
       * threads when it shouldn't, by returning false.
       *
       * 2. If a task can be successfully queued, then we still need
       * to double-check whether we should have added a thread
       * (because existing ones died since last checking) or that
       * the pool shut down since entry into this method. So we
       * recheck state and if necessary roll back the enqueuing if
       * stopped, or start a new thread if there are none.
       *
       * 3. If we cannot queue task, then we try to add a new
       * thread.  If it fails, we know we are shut down or saturated
       * and so reject the task.
       */
      int c = ctl.get();
      if (workerCountOf(c) < corePoolSize) {
          if (addWorker(command, true))
              return;
          c = ctl.get();
      }
      if (isRunning(c) && workQueue.offer(command)) {
          int recheck = ctl.get();
          if (! isRunning(recheck) && remove(command))
              reject(command);
          else if (workerCountOf(recheck) == 0)
              addWorker(null, false);
      }
      else if (!addWorker(command, false))
          reject(command);
    }
    
      private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
      
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
      
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
      
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
      
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
  • 通过阅读java.util.concurrent.ThreadPoolExecutor#execute方法可以发现,在当前线程数小于核心线程数时,默认增加一个执行线程。在执行线程数为0时,直接增加一个执行线程。其余则仅在command插入任务队列失败后时,会尝试新增执行线程。
  • 补充:大部分情况下execute()方法仅会执行isRunning(c) && workQueue.offer(command)后就返回,即command插入任务队列成功后即返回。


ExecutorService多线程