还不懂Java线程池实现原理,看这一篇文章就够了(java线程池作用及使用方式)

  本篇文章为你整理了还不懂Java线程池实现原理,看这一篇文章就够了(java线程池作用及使用方式)的详细内容,包含有java线程池实战 java线程池作用及使用方式 java线程池工作原理 java中线程池是如何实现的 还不懂Java线程池实现原理,看这一篇文章就够了,希望能帮助你了解 还不懂Java线程池实现原理,看这一篇文章就够了。

   线程池无论是工作还是面试都是必备的技能,但是很多人对于线程池的实现原理却一知半解,并不了解线程池内部的工作原理,今天一灯就带大家一块剖析线程池底层实现原理。

  
线程池无论是工作还是面试都是必备的技能,但是很多人对于线程池的实现原理却一知半解,并不了解线程池内部的工作原理,今天一灯就带大家一块剖析线程池底层实现原理。

  1. 为什么要使用线程池

  使用线程池通常由以下两个原因:

  
使用线程池可以更容易管理线程,线程池可以动态管理线程个数、具有阻塞队列、定时周期执行任务、环境隔离等。

  
threadPoolExecutor.execute(() - {

   System.out.println(Thread.currentThread().getName() + " 关注公众号:一灯架构");

   // 3. 关闭线程池

   threadPoolExecutor.shutdown();

  

 

 

  输出结果:

  

pool-1-thread-2 关注公众号:一灯架构

 

  pool-1-thread-1 关注公众号:一灯架构

  pool-1-thread-3 关注公众号:一灯架构

  

 

  线程池的使用非常简单:

  调用new ThreadPoolExecutor()构造方法,指定核心参数,创建线程池。

  调用execute()方法提交Runnable任务

  使用结束后,调用shutdown()方法,关闭线程池。

  再看一下线程池构造方法中核心参数的作用。

  3. 线程池核心参数

  线程池共有七大核心参数:

  
corePoolSize 核心线程数

  当往线程池中提交任务,会创建线程去处理任务,直到线程数达到corePoolSize,才会往阻塞队列中添加任务。默认情况下,空闲的核心线程并不会被回收,除非配置了allowCoreThreadTimeOut=true。

  
maximumPoolSize 最大线程数

  当线程池中的线程数达到corePoolSize,阻塞队列又满了之后,才会继续创建线程,直到达到maximumPoolSize,另外空闲的非核心线程会被回收。

  
keepAliveTime 线程存活时间

  非核心线程的空闲时间达到了keepAliveTime,将会被回收。

  
TimeUnit 时间单位

  线程存活时间的单位,默认是TimeUnit.MILLISECONDS(毫秒),可选择的有:

  TimeUnit.NANOSECONDS(纳秒)

  TimeUnit.MICROSECONDS(微秒)

  TimeUnit.MILLISECONDS(毫秒)

  TimeUnit.SECONDS(秒)

  TimeUnit.MINUTES(分钟)

  TimeUnit.HOURS(小时)

  TimeUnit.DAYS(天)

  
workQueue 阻塞队列

  当线程池中的线程数达到corePoolSize,再提交的任务就会放到阻塞队列的等待,默认使用的是LinkedBlockingQueue,可选择的有:

  LinkedBlockingQueue(基于链表实现的阻塞队列)

  ArrayBlockingQueue(基于数组实现的阻塞队列)

  SynchronousQueue(只有一个元素的阻塞队列)

  PriorityBlockingQueue(实现了优先级的阻塞队列)

  DelayQueue(实现了延迟功能的阻塞队列)

  
threadFactory 线程创建工厂

  用来创建线程的工厂,默认的是Executors.defaultThreadFactory(),可选择的还有Executors.privilegedThreadFactory()实现了线程优先级。当然也可以自定义线程创建工厂,创建线程的时候最好指定线程名称,便于排查问题。

  
RejectedExecutionHandler 拒绝策略

  当线程池中的线程数达到maximumPoolSize,阻塞队列也满了之后,再往线程池中提交任务,就会触发执行拒绝策略,默认的是AbortPolicy(直接终止,抛出异常),可选择的有:

  AbortPolicy(直接终止,抛出异常)

  DiscardPolicy(默默丢弃,不抛出异常)

  DiscardOldestPolicy(丢弃队列中最旧的任务,执行当前任务)

  CallerRunsPolicy(返回给调用者执行)

  
线程池的工作原理,简单理解如下:

  当往线程池中提交任务的时候,会先判断线程池中线程数是否是核心线程数,如果小于,会创建核心线程并执行任务。

  如果线程数大于核心线程数,会判断阻塞队列是否已满,如果没有满,会把任务添加到阻塞队列中等待调度执行。

  如果阻塞队列已满,会判断线程数是否小于最大线程数,如果小于,会继续创建最大线程数并执行任务。

  如果线程数大于最大线程数,会执行拒绝策略,然后结束。

  5. 线程池源码剖析

  5.1 线程池的属性

  

public class ThreadPoolExecutor extends AbstractExecutorService {

 

   // 线程池的控制状态,Integer长度是32位,前3位用来存储线程池状态,后29位用来存储线程数量

   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

   // 线程个数所占的位数

   private static final int COUNT_BITS = Integer.SIZE - 3;

   // 线程池的最大容量,2^29-1,约5亿个线程

   private static final int CAPACITY = (1 COUNT_BITS) - 1;

   // 独占锁,用来控制多线程下的并发操作

   private final ReentrantLock mainLock = new ReentrantLock();

   // 工作线程的集合

   private final HashSet Worker workers = new HashSet ();

   // 等待条件,用来响应中断

   private final Condition termination = mainLock.newCondition();

   // 是否允许回收核心线程

   private volatile boolean allowCoreThreadTimeOut;

   // 线程数的历史峰值

   private int largestPoolSize;

   * 以下是线程池的七大核心参数

   private volatile int corePoolSize;

   private volatile int maximumPoolSize;

   private volatile long keepAliveTime;

   private final BlockingQueue Runnable workQueue;

   private volatile ThreadFactory threadFactory;

   private volatile RejectedExecutionHandler handler;

  

 

  线程池的控制状态ctl用来存储线程池状态和线程个数,前3位用来存储线程池状态,后29位用来存储线程数量。

  设计者多聪明,用一个变量存储了两块内容。

  5.2 线程池状态

  线程池共有5种状态:

  
调用shutdownNow方法后处于该状态,不再新任务,并中断所有线程,丢弃阻塞队列中所有任务。

  
 

  5.3 execute源码

  看一下往线程池中提交任务的源码,这是线程池的核心逻辑:

  

// 往线程池中提交任务

 

  public void execute(Runnable command) {

   // 1. 判断提交的任务是否为null

   if (command == null)

   throw new NullPointerException();

   int c = ctl.get();

   // 2. 判断线程数是否小于核心线程数

   if (workerCountOf(c) corePoolSize) {

   // 3. 把任务包装成worker,添加到worker集合中

   if (addWorker(command, true))

   return;

   c = ctl.get();

   // 4. 判断如果线程数不小于corePoolSize,并且可以添加到阻塞队列

   if (isRunning(c) workQueue.offer(command)) {

   // 5. 重新检查线程池状态,如果线程池不是运行状态,就移除刚才添加的任务,并执行拒绝策略

   int recheck = ctl.get();

   if (!isRunning(recheck) remove(command))

   reject(command);

   // 6. 判断如果线程数是0,就创建非核心线程(任务是null,会从阻塞队列中拉取任务)

   else if (workerCountOf(recheck) == 0)

   addWorker(null, false);

   // 7. 如果添加阻塞队列失败,就创建一个Worker

   else if (!addWorker(command, false))

   // 8. 如果创建Worker失败说明已经达到最大线程数了,则执行拒绝策略

   reject(command);

  

 

  execute方法的逻辑也很简单,最终就是调用addWorker方法,把任务添加到worker集合中,再看一下addWorker方法的源码:

  

// 添加worker

 

  private boolean addWorker(Runnable firstTask, boolean core) {

   retry:

   for (; ; ) {

   int c = ctl.get();

   int rs = runStateOf(c);

   // 1. 检查是否允许提交任务

   if (rs = SHUTDOWN

   !(rs == SHUTDOWN

   firstTask == null

   !workQueue.isEmpty()))

   return false;

   // 2. 使用死循环保证添加线程成功

   for (; ; ) {

   int wc = workerCountOf(c);

   // 3. 校验线程数是否超过容量限制

   if (wc = CAPACITY

   wc = (core ? corePoolSize : maximumPoolSize))

   return false;

   // 4. 使用CAS修改线程数

   if (compareAndIncrementWorkerCount(c))

   break retry;

   c = ctl.get();

   // 5. 如果线程池状态变了,则从头再来

   if (runStateOf(c) != rs)

   continue retry;

   boolean workerStarted = false;

   boolean workerAdded = false;

   Worker w = null;

   try {

   // 6. 把任务和新线程包装成一个worker

   w = new Worker(firstTask);

   final Thread t = w.thread;

   if (t != null) {

   // 7. 加锁,控制并发

   final ReentrantLock mainLock = this.mainLock;

   mainLock.lock();

   try {

   // 8. 再次校验线程池状态是否异常

   int rs = runStateOf(ctl.get());

   if (rs SHUTDOWN

   (rs == SHUTDOWN firstTask == null)) {

   // 9. 如果线程已经启动,就抛出异常

   if (t.isAlive())

   throw new IllegalThreadStateException();

   // 10. 添加到worker集合中

   workers.add(w);

   int s = workers.size();

   // 11. 记录线程数历史峰值

   if (s largestPoolSize)

   largestPoolSize = s;

   workerAdded = true;

   } finally {

   mainLock.unlock();

   if (workerAdded) {

   // 12. 启动线程

   t.start();

   workerStarted = true;

   } finally {

   if (!workerStarted)

   addWorkerFailed(w);

   return workerStarted;

  

 

  方法虽然很长,但是逻辑很清晰。就是把任务和线程包装成worker,添加到worker集合,并启动线程。

  5.4 worker源码

  再看一下worker类的结构:

  

private final class Worker

 

   extends AbstractQueuedSynchronizer

   implements Runnable {

   // 工作线程

   final Thread thread;

   // 任务

   Runnable firstTask;

   // 创建worker,并创建一个新线程(用来执行任务)

   Worker(Runnable firstTask) {

   setState(-1);

   this.firstTask = firstTask;

   this.thread = getThreadFactory().newThread(this);

  

 

  5.5 runWorker源码

  再看一下run方法的源码:

  

// 线程执行入口

 

  public void run() {

   runWorker(this);

  // 线程运行核心方法

  final void runWorker(Worker w) {

   Thread wt = Thread.currentThread();

   Runnable task = w.firstTask;

   w.firstTask = null;

   w.unlock();

   boolean completedAbruptly = true;

   try {

   // 1. 如果当前worker中任务是null,就从阻塞队列中获取任务

   while (task != null (task = getTask()) != null) {

   // 加锁,保证thread不被其他线程中断(除非线程池被中断)

   w.lock();

   // 2. 校验线程池状态,是否需要中断当前线程

   if ((runStateAtLeast(ctl.get(), STOP)

   (Thread.interrupted()

   runStateAtLeast(ctl.get(), STOP)))

   !wt.isInterrupted())

   wt.interrupt();

   try {

   beforeExecute(wt, task);

   Throwable thrown = null;

   try {

   // 3. 执行run方法

   task.run();

   } catch (RuntimeException x) {

   thrown = x;

   throw x;

   } catch (Error x) {

   thrown = x;

   throw x;

   } catch (Throwable x) {

   thrown = x;

   throw new Error(x);

   } finally {

   afterExecute(task, thrown);

   } finally {

   task = null;

   w.completedTasks++;

   // 解锁

   w.unlock();

   completedAbruptly = false;

   } finally {

   // 4. 从worker集合删除当前worker

   processWorkerExit(w, completedAbruptly);

  

 

  runWorker方法逻辑也很简单,就是不断从阻塞队列中拉取任务并执行。

  再看一下从阻塞队列中拉取任务的逻辑:

  

// 从阻塞队列中拉取任务

 

  private Runnable getTask() {

   boolean timedOut = false;

   for (; ; ) {

   int c = ctl.get();

   int rs = runStateOf(c);

   // 1. 如果线程池已经停了,或者阻塞队列是空,就回收当前线程

   if (rs = SHUTDOWN (rs = STOP workQueue.isEmpty())) {

   decrementWorkerCount();

   return null;

   int wc = workerCountOf(c);

   // 2. 再次判断是否需要回收线程

   boolean timed = allowCoreThreadTimeOut wc corePoolSize;

   if ((wc maximumPoolSize (timed timedOut))

   (wc 1 workQueue.isEmpty())) {

   if (compareAndDecrementWorkerCount(c))

   return null;

   continue;

   try {

   // 3. 从阻塞队列中拉取任务

   Runnable r = timed ?

   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

   workQueue.take();

   if (r != null)

   return r;

   timedOut = true;

   } catch (InterruptedException retry) {

   timedOut = false;

  

 

  6. 总结

  今天带大家一块详细剖析了Java线程池的实现原理,是不是非常简单?

  几百行的方法虽然看着复杂,令人头疼,只要由浅入深的梳理清理业务逻辑,源码读起来也是小菜一碟。

  我是「一灯架构」,如果本文对你有帮助,欢迎各位小伙伴点赞、评论和关注,感谢各位老铁,我们下期见

  以上就是还不懂Java线程池实现原理,看这一篇文章就够了(java线程池作用及使用方式)的详细内容,想要了解更多 还不懂Java线程池实现原理,看这一篇文章就够了的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: