如何实现线程复用?

  Java   19分钟   363浏览   0评论

引言

先说说使用线程池的好处,比如可以控制线程的数量,节省反复创建线程和销毁线程的开销等,在开发中的使用,一般来说任务量肯定是大于线程数量的,而为了防止出现OOM(Out Of Memory内存溢出),都是建议设置相对应业务的合适线程数量。那是在线程池中线程就只有那些,肯定是要做到线程的重复利用,才能执行超过线程的任务量的,那么线程池是怎么做到"线程复用"的呢?

从源码开始逐步分析,从execute()方法开始

//原子变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

public void execute(Runnable command) {
        //判断提交的Runnable 任务,如果为null,则报NullPointerException
        if (command == null)
            throw new NullPointerException();    
        int c = ctl.get();
        //判断当前线程数是否小于核心线程数,如果小于,那就调用addWorker方法新增一个Worker,也可以理解成一个线程
        if (workerCountOf(c) < corePoolSize) {
        //addWorker这个方法主要要做的事就是执行command,同时第二个参数决定以哪个界限来进行是否新增线程的判断,传入true则代表以核心线程数为判断条件
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //走到这步逻辑,则说明线程数大于等于核心线程数,或者addWorker方法调用失败了,这时就判断线程池是否是Running状态,如果是就调用offer方法提交线程任务到任务队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果线程池状态不是Running,说明线程池已经被关闭,这时就移除新提交到队列中的任务
            if (! isRunning(recheck) && remove(command))
                //执行拒绝策略
                reject(command);
             //检查下当前线程数是不是为0,为0的话就没有线程执行任务了   
            else if (workerCountOf(recheck) == 0)
                //所以就通过addWorker新建一个线程
                addWorker(null, false);
        }
        //走到这步逻辑,要么是线程池状态不是Running,说明已经关闭了,要么就是添加任务进任务队列时失败了,说明任务队列满了,这时候就该添加最大线程数了,传入false则代表以最大线程数为判断条件
        else if (!addWorker(command, false))
            //以上addWorker方法如果返回结果是false,就会执行拒绝策略了
            reject(command);
    }

以上粗略的过了下execute()方法的源码,可以发现有个addWorker()方法无处不在,同时它也有新建线程和运行任务的作用,所以重点看下

addWorker()方法源码

private boolean addWorker(Runnable firstTask, boolean core) {
        //可以当做一个标记位
        retry:
        //没有主动跳出就会无限的循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //做校验,比如rs >= SHUTDOWN说明线程池状态不正常,比如rs == SHUTDOWN说明线程池关闭了, firstTask == null说明无任务执行等,就直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //没有主动跳出就会无限的循环
            for (;;) {
                //获取当前线程数
                int wc = workerCountOf(c);
                //判断下当前线程数是不是超过规定了,是就直接返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                  //CAS操作,比较并做+1操作
                if (compareAndIncrementWorkerCount(c))
                    //失败了的话就直接跳出循环到retry那里,且不再进循环了
                    break retry;    
                c = ctl.get();  // Re-read ctl
                // 如果线程池状态被更改的话
                if (runStateOf(c) != rs)
                    //直接跳出循环到retry那里,会再次进循环
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建一个Worker对象,以Runnable实例对象firstTask为参数
            w = new Worker(firstTask);
            //拿到线程对象
            final Thread t = w.thread;
            //线程对象不为空的话
            if (t != null) {
                //定义个锁
                final ReentrantLock mainLock = this.mainLock;
                //加锁
                mainLock.lock();
                try {
                    //获取线程池状态
                    int rs = runStateOf(ctl.get());
                    //做校验,不符合则抛IllegalThreadStateException异常
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //否则把w对象加到workers中,workers是HashSet类型的对象 
                        workers.add(w);
                        int s = workers.size();
                        //比较下,再赋个值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //修改标志,说明添加成功了
                        workerAdded = true;
                    }
                } finally {
                    //解锁
                    mainLock.unlock();
                }
                //如果添加成功了
                if (workerAdded) {
                    //就调用start方法执行任务,这个start做的就是运行任务的事了
                    t.start();
                    //修改标志位
                    workerStarted = true;
                }
            }
        } finally {
            //最后处理下那些,添加成功了,但是没有开始执行的任务
            if (! workerStarted)
                //这个方法主要作用就是把以上任务移除掉
                addWorkerFailed(w);
        }
        //返回运行结果
        return workerStarted;
    }

看完这部分,其实还是没发现线程池到底是怎么做到线程复用的,不过可以发现有个关键类,接下来就看看它的相关源码:

Worker类的相关源码

//可以发现Worker实现了Runnable
private final class Worker extends AbstractQueuedSynchronizer
        implements Runnable{

       //这是Worker类的构造方法       
        Worker(Runnable firstTask) {
            //设置一下状态位state,和AQS有关
            setState(-1); // inhibit interrupts until runWorker
            //赋值
            this.firstTask = firstTask;
            //拿到线程工厂,并创建一个线程,然后很骚气的把Worker对象做为参数去做线程的初始化
            this.thread = getThreadFactory().newThread(this);
        }
}

实现了Runnable接口,那必然是要重写run方法的,看看Worker类的run方法:

  public void run() {
            runWorker(this);
        }

看看runWorker方法源码:

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        //设置为null,帮助gc回收
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //这步做的通过取Worker对象的firstTask或者通过getTask方法从工作队列中获取待执行的任务,只要不为null,就一直会循环
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //然后直接调用task的run方法来执行具体的任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

分析到这里,关键出现在task.run()上,首先我们知道task是Runnable类型的,直接调用run方法的话,JVM是不会帮我们去生成新线程的,就像和调用普通方法一样,所以一个线程始终都会在whlie循环的逻辑中不断的被重复利用,然后去取Worker对象的firstTask或者通过getTask方法从工作队列中获取待执行的任务,再直接调用Runnable类型的run方法执行任务。

总结

简单的从源码来总结下,执行任务先调用的是t.start()方法,这个t是个Thread对象,而这个Thread对象则是从Worker对象里获得的,在Worker在做初始化时就会赋值Thread,同时Worker初始化Thread对象时又是以自己作为参数来完成,而Worker对象又是个实现了Runnable接口的类,那Worker对象就肯定有自己的run方法,所以t.start()方法真正意义上调用的是Worker对象中重写的run方法,而这个Worker对象中的run方法里没有所谓的本地start方法,JVM自然不会再创建新的线程,而是把它当普通方法一样执行。再加上whlie循环体,这样就做到了Worker对象新建的线程始终都会在一个大循环里,而这个线程会反复的获取任务,接着执行任务,直到任务都执行完毕,这就是线程池实现“线程复用”的原理

如果你觉得文章对你有帮助,那就请作者喝杯咖啡吧☕
微信
支付宝
  0 条评论