如何实现线程复用?

  Java   19分钟   769浏览   0评论

引言

先说说使用线程池的好处,比如可以控制线程的数量,节省反复创建线程和销毁线程的开销等,在开发中的使用,一般来说任务量肯定是大于线程数量的,而为了防止出现OOM(Out Of Memory内存溢出),都是建议设置相对应业务的合适线程数量。那是在线程池中线程就只有那些,肯定是要做到线程的重复利用,才能执行超过线程的任务量的,那么线程池是怎么做到"线程复用"的呢?

从源码开始逐步分析,从execute()方法开始

//原子变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

public void execute(Runnable command) {
        //判断提交的Runnable 任务,如果为null,则报NullPointerException
        if (command == null)
            throw new NullPointerException();    
        int c = ctl.get();
        //判断当前线程数是否小于核心线程数,如果小于,那就调用addWorker方法新增一个Worker,也可以理解成一个线程
        if (workerCountOf(c) < corePoolSize) {
        //addWorker这个方法主要要做的事就是执行command,同时第二个参数决定以哪个界限来进行是否新增线程的判断,传入true则代表以核心线程数为判断条件
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //走到这步逻辑,则说明线程数大于等于核心线程数,或者addWorker方法调用失败了,这时就判断线程池是否是Running状态,如果是就调用offer方法提交线程任务到任务队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果线程池状态不是Running,说明线程池已经被关闭,这时就移除新提交到队列中的任务
            if (! isRunning(recheck) && remove(command))
                //执行拒绝策略
                reject(command);
             //检查下当前线程数是不是为0,为0的话就没有线程执行任务了   
            else if (workerCountOf(recheck) == 0)
                //所以就通过addWorker新建一个线程
                addWorker(null, false);
        }
        //走到这步逻辑,要么是线程池状态不是Running,说明已经关闭了,要么就是添加任务进任务队列时失败了,说明任务队列满了,这时候就该添加最大线程数了,传入false则代表以最大线程数为判断条件
        else if (!addWorker(command, false))
            //以上addWorker方法如果返回结果是false,就会执行拒绝策略了
            reject(command);
    }

以上粗略的过了下execute()方法的源码,可以发现有个addWorker()方法无处不在,同时它也有新建线程和运行任务的作用,所以重点看下

addWorker()方法源码

private boolean addWorker(Runnable firstTask, boolean core) {
        //可以当做一个标记位
        retry:
        //没有主动跳出就会无限的循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //做校验,比如rs >= SHUTDOWN说明线程池状态不正常,比如rs == SHUTDOWN说明线程池关闭了, firstTask == null说明无任务执行等,就直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //没有主动跳出就会无限的循环
            for (;;) {
                //获取当前线程数
                int wc = workerCountOf(c);
                //判断下当前线程数是不是超过规定了,是就直接返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                  //CAS操作,比较并做+1操作
                if (compareAndIncrementWorkerCount(c))
                    //失败了的话就直接跳出循环到retry那里,且不再进循环了
                    break retry;    
                c = ctl.get();  // Re-read ctl
                // 如果线程池状态被更改的话
                if (runStateOf(c) != rs)
                    //直接跳出循环到retry那里,会再次进循环
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建一个Worker对象,以Runnable实例对象firstTask为参数
            w = new Worker(firstTask);
            //拿到线程对象
            final Thread t = w.thread;
            //线程对象不为空的话
            if (t != null) {
                //定义个锁
                final ReentrantLock mainLock = this.mainLock;
                //加锁
                mainLock.lock();
                try {
                    //获取线程池状态
                    int rs = runStateOf(ctl.get());
                    //做校验,不符合则抛IllegalThreadStateException异常
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //否则把w对象加到workers中,workers是HashSet类型的对象 
                        workers.add(w);
                        int s = workers.size();
                        //比较下,再赋个值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //修改标志,说明添加成功了
                        workerAdded = true;
                    }
                } finally {
                    //解锁
                    mainLock.unlock();
                }
                //如果添加成功了
                if (workerAdded) {
                    //就调用start方法执行任务,这个start做的就是运行任务的事了
                    t.start();
                    //修改标志位
                    workerStarted = true;
                }
            }
        } finally {
            //最后处理下那些,添加成功了,但是没有开始执行的任务
            if (! workerStarted)
                //这个方法主要作用就是把以上任务移除掉
                addWorkerFailed(w);
        }
        //返回运行结果
        return workerStarted;
    }

看完这部分,其实还是没发现线程池到底是怎么做到线程复用的,不过可以发现有个关键类,接下来就看看它的相关源码:

Worker类的相关源码

//可以发现Worker实现了Runnable
private final class Worker extends AbstractQueuedSynchronizer
        implements Runnable{

       //这是Worker类的构造方法       
        Worker(Runnable firstTask) {
            //设置一下状态位state,和AQS有关
            setState(-1); // inhibit interrupts until runWorker
            //赋值
            this.firstTask = firstTask;
            //拿到线程工厂,并创建一个线程,然后很骚气的把Worker对象做为参数去做线程的初始化
            this.thread = getThreadFactory().newThread(this);
        }
}

实现了Runnable接口,那必然是要重写run方法的,看看Worker类的run方法:

  public void run() {
            runWorker(this);
        }

看看runWorker方法源码:

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        //设置为null,帮助gc回收
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //这步做的通过取Worker对象的firstTask或者通过getTask方法从工作队列中获取待执行的任务,只要不为null,就一直会循环
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //然后直接调用task的run方法来执行具体的任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

分析到这里,关键出现在task.run()上,首先我们知道task是Runnable类型的,直接调用run方法的话,JVM是不会帮我们去生成新线程的,就像和调用普通方法一样,所以一个线程始终都会在whlie循环的逻辑中不断的被重复利用,然后去取Worker对象的firstTask或者通过getTask方法从工作队列中获取待执行的任务,再直接调用Runnable类型的run方法执行任务。

总结

简单的从源码来总结下,执行任务先调用的是t.start()方法,这个t是个Thread对象,而这个Thread对象则是从Worker对象里获得的,在Worker在做初始化时就会赋值Thread,同时Worker初始化Thread对象时又是以自己作为参数来完成,而Worker对象又是个实现了Runnable接口的类,那Worker对象就肯定有自己的run方法,所以t.start()方法真正意义上调用的是Worker对象中重写的run方法,而这个Worker对象中的run方法里没有所谓的本地start方法,JVM自然不会再创建新的线程,而是把它当普通方法一样执行。再加上whlie循环体,这样就做到了Worker对象新建的线程始终都会在一个大循环里,而这个线程会反复的获取任务,接着执行任务,直到任务都执行完毕,这就是线程池实现“线程复用”的原理

如果你觉得文章对你有帮助,那就请作者喝杯咖啡吧☕
微信
支付宝
😀
😃
😄
😁
😆
😅
🤣
😂
🙂
🙃
😉
😊
😇
🥰
😍
🤩
😘
😗
☺️
😚
😙
🥲
😋
😛
😜
🤪
😝
🤑
🤗
🤭
🫢
🫣
🤫
🤔
🤨
😐
😑
😶
😏
😒
🙄
😬
😮‍💨
🤤
😪
😴
😷
🤒
🤕
🤢
🤮
🤧
🥵
🥶
🥴
😵
😵‍💫
🤯
🥳
🥺
😠
😡
🤬
🤯
😈
👿
💀
☠️
💩
👻
👽
👾
🤖
😺
😸
😹
😻
😼
😽
🙀
😿
😾
👋
🤚
🖐️
✋️
🖖
🫱
🫲
🫳
🫴
🫷
🫸
👌
🤌
🤏
✌️
🤞
🫰
🤟
🤘
🤙
👈️
👉️
👆️
🖕
👇️
☝️
🫵
👍️
👎️
✊️
👊
🤛
🤜
👏
🙌
👐
🤲
🤝
🙏
✍️
💅
🤳
💪
🦾
🦿
🦵
🦶
👂
🦻
👃
👶
👧
🧒
👦
👩
🧑
👨
👩‍🦱
👨‍🦱
👩‍🦰
👨‍🦰
👱‍♀️
👱‍♂️
👩‍🦳
👨‍🦳
👩‍🦲
👨‍🦲
🧔‍♀️
🧔‍♂️
👵
🧓
👴
👲
👳‍♀️
👳‍♂️
🧕
👮‍♀️
👮‍♂️
👷‍♀️
👷‍♂️
💂‍♀️
💂‍♂️
🕵️‍♀️
🕵️‍♂️
👩‍⚕️
👨‍⚕️
👩‍🌾
👨‍🌾
👩‍🍳
👨‍🍳
🐶
🐱
🐭
🐹
🐰
🦊
🐻
🐼
🐨
🐯
🦁
🐮
🐷
🐸
🐵
🐔
🐧
🐦
🦅
🦉
🐴
🦄
🐝
🪲
🐞
🦋
🐢
🐍
🦖
🦕
🐬
🦭
🐳
🐋
🦈
🐙
🦑
🦀
🦞
🦐
🐚
🐌
🦋
🐛
🦟
🪰
🪱
🦗
🕷️
🕸️
🦂
🐢
🐍
🦎
🦖
🦕
🐊
🐢
🐉
🦕
🦖
🐘
🦏
🦛
🐪
🐫
🦒
🦘
🦬
🐃
🐂
🐄
🐎
🐖
🐏
🐑
🐐
🦌
🐕
🐩
🦮
🐕‍🦺
🐈
🐈‍⬛
🐓
🦃
🦚
🦜
🦢
🦩
🕊️
🐇
🦝
🦨
🦡
🦫
🦦
🦥
🐁
🐀
🐿️
🦔
🌵
🎄
🌲
🌳
🌴
🌱
🌿
☘️
🍀
🎍
🎋
🍃
🍂
🍁
🍄
🌾
💐
🌷
🌹
🥀
🌺
🌸
🌼
🌻
🌞
🌝
🌛
🌜
🌚
🌕
🌖
🌗
🌘
🌑
🌒
🌓
🌔
🌙
🌎
🌍
🌏
🪐
💫
🌟
🔥
💥
☄️
☀️
🌤️
🌥️
🌦️
🌧️
⛈️
🌩️
🌨️
❄️
☃️
🌬️
💨
💧
💦
🌊
🍇
🍈
🍉
🍊
🍋
🍌
🍍
🥭
🍎
🍏
🍐
🍑
🍒
🍓
🥝
🍅
🥥
🥑
🍆
🥔
🥕
🌽
🌶️
🥒
🥬
🥦
🧄
🧅
🍄
🥜
🍞
🥐
🥖
🥨
🥯
🥞
🧇
🧀
🍖
🍗
🥩
🥓
🍔
🍟
🍕
🌭
🥪
🌮
🌯
🥙
🧆
🥚
🍳
🥘
🍲
🥣
🥗
🍿
🧈
🧂
🥫
🍱
🍘
🍙
🍚
🍛
🍜
🍝
🍠
🍢
🍣
🍤
🍥
🥮
🍡
🥟
🥠
🥡
🦪
🍦
🍧
🍨
🍩
🍪
🎂
🍰
🧁
🥧
🍫
🍬
🍭
🍮
🍯
🍼
🥛
🍵
🍶
🍾
🍷
🍸
🍹
🍺
🍻
🥂
🥃
🥤
🧃
🧉
🧊
🗺️
🏔️
⛰️
🌋
🏕️
🏖️
🏜️
🏝️
🏞️
🏟️
🏛️
🏗️
🏘️
🏙️
🏚️
🏠
🏡
🏢
🏣
🏤
🏥
🏦
🏨
🏩
🏪
🏫
🏬
🏭
🏯
🏰
💒
🗼
🗽
🕌
🛕
🕍
⛩️
🕋
🌁
🌃
🏙️
🌄
🌅
🌆
🌇
🌉
🎠
🎡
🎢
💈
🎪
🚂
🚃
🚄
🚅
🚆
🚇
🚈
🚉
🚊
🚝
🚞
🚋
🚌
🚍
🚎
🚐
🚑
🚒
🚓
🚔
🚕
🚖
🚗
🚘
🚙
🚚
🚛
🚜
🏎️
🏍️
🛵
🦽
🦼
🛺
🚲
🛴
🛹
🚏
🛣️
🛤️
🛢️
🚨
🚥
🚦
🚧
🛶
🚤
🛳️
⛴️
🛥️
🚢
✈️
🛩️
🛫
🛬
🪂
💺
🚁
🚟
🚠
🚡
🛰️
🚀
🛸
🧳
📱
💻
⌨️
🖥️
🖨️
🖱️
🖲️
💽
💾
📀
📼
🔍
🔎
💡
🔦
🏮
📔
📕
📖
📗
📘
📙
📚
📓
📒
📃
📜
📄
📰
🗞️
📑
🔖
🏷️
💰
💴
💵
💶
💷
💸
💳
🧾
✉️
📧
📨
📩
📤
📥
📦
📫
📪
📬
📭
📮
🗳️
✏️
✒️
🖋️
🖊️
🖌️
🖍️
📝
📁
📂
🗂️
📅
📆
🗒️
🗓️
📇
📈
📉
📊
📋
📌
📍
📎
🖇️
📏
📐
✂️
🗃️
🗄️
🗑️
🔒
🔓
🔏
🔐
🔑
🗝️
🔨
🪓
⛏️
⚒️
🛠️
🗡️
⚔️
🔫
🏹
🛡️
🔧
🔩
⚙️
🗜️
⚗️
🧪
🧫
🧬
🔬
🔭
📡
💉
🩸
💊
🩹
🩺
🚪
🛏️
🛋️
🪑
🚽
🚿
🛁
🧴
🧷
🧹
🧺
🧻
🧼
🧽
🧯
🛒
🚬
⚰️
⚱️
🗿
🏧
🚮
🚰
🚹
🚺
🚻
🚼
🚾
🛂
🛃
🛄
🛅
⚠️
🚸
🚫
🚳
🚭
🚯
🚱
🚷
📵
🔞
☢️
☣️
❤️
🧡
💛
💚
💙
💜
🖤
💔
❣️
💕
💞
💓
💗
💖
💘
💝
💟
☮️
✝️
☪️
🕉️
☸️
✡️
🔯
🕎
☯️
☦️
🛐
🆔
⚛️
🉑
☢️
☣️
📴
📳
🈶
🈚
🈸
🈺
🈷️
✴️
🆚
💮
🉐
㊙️
㊗️
🈴
🈵
🈹
🈲
🅰️
🅱️
🆎
🆑
🅾️
🆘
🛑
💢
💯
💠
♨️
🚷
🚯
🚳
🚱
🔞
📵
🚭
‼️
⁉️
🔅
🔆
🔱
⚜️
〽️
⚠️
🚸
🔰
♻️
🈯
💹
❇️
✳️
🌐
💠
Ⓜ️
🌀
💤
🏧
🚾
🅿️
🈳
🈂️
🛂
🛃
🛄
🛅
  0 条评论