欧美成人午夜免费全部完,亚洲午夜福利精品久久,а√最新版在线天堂,另类亚洲综合区图片小说区,亚洲欧美日韩精品色xxx

扣丁學(xué)堂淺談Java培訓(xùn)之線程池原理分析與實(shí)際運(yùn)用詳解

2018-01-15 11:24:58 1262瀏覽

今天扣丁學(xué)堂給大家介紹一下關(guān)于Java線程池原理分析與實(shí)際運(yùn)用的詳細(xì)介紹,首先在我們的開發(fā)中“池”的概念并不罕見,有數(shù)據(jù)庫(kù)連接池、線程池、對(duì)象池、常量池等等。下面我們主要針對(duì)線程池詳細(xì)介紹一下吧。



一、使用線程池的好處

1、降低資源消耗

可以重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。

2、提高響應(yīng)速度

當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。

3、提高線程的可管理性

線程是稀缺資源,如果無(wú)限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。

二、線程池的工作原理

首先我們看下當(dāng)一個(gè)新的任務(wù)提交到線程池之后,線程池是如何處理的

1、線程池判斷核心線程池里的線程是否都在執(zhí)行任務(wù)。如果不是,則創(chuàng)建一個(gè)新的工作線程來(lái)執(zhí)行任務(wù)。如果核心線程池里的線程都在執(zhí)行任務(wù),則執(zhí)行第二步。

2、線程池判斷工作隊(duì)列是否已經(jīng)滿。如果工作隊(duì)列沒(méi)有滿,則將新提交的任務(wù)存儲(chǔ)在這個(gè)工作隊(duì)列里進(jìn)行等待。如果工作隊(duì)列滿了,則執(zhí)行第三步

3、線程池判斷線程池的線程是否都處于工作狀態(tài)。如果沒(méi)有,則創(chuàng)建一個(gè)新的工作線程來(lái)執(zhí)行任務(wù)。如果已經(jīng)滿了,則交給飽和策略來(lái)處理這個(gè)任務(wù)。

三、線程池飽和策略

這里提到了線程池的飽和策略,那我們就簡(jiǎn)單介紹下有哪些飽和策略:

AbortPolicy

為Java線程池默認(rèn)的阻塞策略,不執(zhí)行此任務(wù),而且直接拋出一個(gè)運(yùn)行時(shí)異常,切記ThreadPoolExecutor.execute需要trycatch,否則程序會(huì)直接退出。

DiscardPolicy

直接拋棄,任務(wù)不執(zhí)行,空方法

DiscardOldestPolicy

從隊(duì)列里面拋棄head的一個(gè)任務(wù),并再次execute此task。

CallerRunsPolicy

在調(diào)用execute的線程里面執(zhí)行此command,會(huì)阻塞入口

用戶自定義拒絕策略(最常用)

實(shí)現(xiàn)RejectedExecutionHandler,并自己定義策略模式

1、如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。

2、如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。

3、如果無(wú)法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則在非corePool中創(chuàng)建新的線程來(lái)處理任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。

4、如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步驟的總體設(shè)計(jì)思路,是為了在執(zhí)行execute()方法時(shí),盡可能地避免獲取全局鎖(那將會(huì)是一個(gè)嚴(yán)重的可伸縮瓶頸)。在ThreadPoolExecutor完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)大于等于corePoolSize),幾乎所有的execute()方法調(diào)用都是執(zhí)行步驟2,而步驟2不需要獲取全局鎖。

四、關(guān)鍵方法源碼分析

我們看看核心方法添加到線程池方法execute的源碼如下:

//

//Executesthegiventasksometimeinthefuture.Thetask

//mayexecuteinanewthreadorinanexistingpooledthread.

//

//Ifthetaskcannotbesubmittedforexecution,eitherbecausethis

//executorhasbeenshutdownorbecauseitscapacityhasbeenreached,

//thetaskishandledbythecurrent{@codeRejectedExecutionHandler}.

//

//@paramcommandthetasktoexecute

//@throwsRejectedExecutionExceptionatdiscretionof

//{@codeRejectedExecutionHandler},ifthetask

//cannotbeacceptedforexecution

//@throwsNullPointerExceptionif{@codecommand}isnull

//

publicvoidexecute(Runnablecommand){

if(command==null)

thrownewNullPointerException();

//

//Proceedin3steps:

//

//1.IffewerthancorePoolSizethreadsarerunning,tryto

//startanewthreadwiththegivencommandasitsfirst

//task.ThecalltoaddWorkeratomicallychecksrunStateand

//workerCount,andsopreventsfalsealarmsthatwouldadd

//threadswhenitshouldn't,byreturningfalse.

//翻譯如下:

//判斷當(dāng)前的線程數(shù)是否小于corePoolSize如果是,使用入?yún)⑷蝿?wù)通過(guò)addWord方法創(chuàng)建一個(gè)新的線程,

//如果能完成新線程創(chuàng)建exexute方法結(jié)束,成功提交任務(wù)

//2.Ifataskcanbesuccessfullyqueued,thenwestillneed

//todouble-checkwhetherweshouldhaveaddedathread

//(becauseexistingonesdiedsincelastchecking)orthat

//thepoolshutdownsinceentryintothismethod.Sowe

//recheckstateandifnecessaryrollbacktheenqueuingif

//stopped,orstartanewthreadiftherearenone.

//翻譯如下:

//在第一步?jīng)]有完成任務(wù)提交;狀態(tài)為運(yùn)行并且能否成功加入任務(wù)到工作隊(duì)列后,再進(jìn)行一次check,如果狀態(tài)

//在任務(wù)加入隊(duì)列后變?yōu)榱朔沁\(yùn)行(有可能是在執(zhí)行到這里線程池shutdown了),非運(yùn)行狀態(tài)下當(dāng)然是需要

//reject;然后再判斷當(dāng)前線程數(shù)是否為0(有可能這個(gè)時(shí)候線程數(shù)變?yōu)榱?),如是,新增一個(gè)線程;

//3.Ifwecannotqueuetask,thenwetrytoaddanew

//thread.Ifitfails,weknowweareshutdownorsaturated

//andsorejectthetask.

//翻譯如下:

//如果不能加入任務(wù)到工作隊(duì)列,將嘗試使用任務(wù)新增一個(gè)線程,如果失敗,則是線程池已經(jīng)shutdown或者線程池

//已經(jīng)達(dá)到飽和狀態(tài),所以reject這個(gè)他任務(wù)

//

intc=ctl.get();

//工作線程數(shù)小于核心線程數(shù)

if(workerCountOf(c)<corePoolSize){

//直接啟動(dòng)新線程,true表示會(huì)再次檢查workerCount是否小于corePoolSize

if(addWorker(command,true))

return;

c=ctl.get();

}

//如果工作線程數(shù)大于等于核心線程數(shù)

//線程的的狀態(tài)未RUNNING并且隊(duì)列notfull

if(isRunning(c)&&workQueue.offer(command)){

//再次檢查線程的運(yùn)行狀態(tài),如果不是RUNNING直接從隊(duì)列中移除

intrecheck=ctl.get();

if(!isRunning(recheck)&&remove(command))

//移除成功,拒絕該非運(yùn)行的任務(wù)

reject(command);

elseif(workerCountOf(recheck)==0)

//防止了SHUTDOWN狀態(tài)下沒(méi)有活動(dòng)線程了,但是隊(duì)列里還有任務(wù)沒(méi)執(zhí)行這種特殊情況。

//添加一個(gè)null任務(wù)是因?yàn)镾HUTDOWN狀態(tài)下,線程池不再接受新任務(wù)

addWorker(null,false);

}

//如果隊(duì)列滿了或者是非運(yùn)行的任務(wù)都拒絕執(zhí)行

elseif(!addWorker(command,false))

reject(command);

}

下面我們繼續(xù)看看addWorker是如何實(shí)現(xiàn)的:

privatebooleanaddWorker(RunnablefirstTask,booleancore){

//java標(biāo)簽

retry:

//死循環(huán)

for(;;){

intc=ctl.get();

//獲取當(dāng)前線程狀態(tài)

intrs=runStateOf(c);

//Checkifqueueemptyonlyifnecessary.

//這個(gè)邏輯判斷有點(diǎn)繞可以改成

//rs>=shutdown&&(rs!=shutdown||firstTask!=null||workQueue.isEmpty())

//邏輯判斷成立可以分為以下幾種情況均不接受新任務(wù)

//1、rs>shutdown:--不接受新任務(wù)

//2、rs>=shutdown&&firstTask!=null:--不接受新任務(wù)

//3、rs>=shutdown&&workQueue.isEmppty:--不接受新任務(wù)

//邏輯判斷不成立

//1、rs==shutdown&&firstTask!=null:此時(shí)不接受新任務(wù),但是仍會(huì)執(zhí)行隊(duì)列中的任務(wù)

//2、rs==shotdown&&firstTask==null:會(huì)執(zhí)行addWork(null,false)

//防止了SHUTDOWN狀態(tài)下沒(méi)有活動(dòng)線程了,但是隊(duì)列里還有任務(wù)沒(méi)執(zhí)行這種特殊情況。

//添加一個(gè)null任務(wù)是因?yàn)镾HUTDOWN狀態(tài)下,線程池不再接受新任務(wù)

if(rs>=SHUTDOWN&&!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))

returnfalse;

//死循環(huán)

//如果線程池狀態(tài)為RUNNING并且隊(duì)列中還有需要執(zhí)行的任務(wù)

for(;;){

//獲取線程池中線程數(shù)量

intwc=workerCountOf(c);

//如果超出容量或者較大線程池容量不在接受新任務(wù)

if(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))

returnfalse;

//線程安全增加工作線程數(shù)

if(compareAndIncrementWorkerCount(c))

//跳出retry

breakretry;

c=ctl.get();//Re-readctl

//如果線程池狀態(tài)發(fā)生變化,重新循環(huán)

if(runStateOf(c)!=rs)

continueretry;

//elseCASfailedduetoworkerCountchange;retryinnerloop

}

}

//走到這里說(shuō)明工作線程數(shù)增加成功

booleanworkerStarted=false;

booleanworkerAdded=false;

Workerw=null;

try{

finalReentrantLockmainLock=this.mainLock;

w=newWorker(firstTask);

finalThreadt=w.thread;

if(t!=null){

//加鎖

mainLock.lock();

try{

//Recheckwhileholdinglock.

//BackoutonThreadFactoryfailureorif

//shutdownbeforelockacquired.

intc=ctl.get();

intrs=runStateOf(c);

//RUNNING狀態(tài)||SHUTDONW狀態(tài)下清理隊(duì)列中剩余的任務(wù)

if(rs<SHUTDOWN||

(rs==SHUTDOWN&&firstTask==null)){

//檢查線程狀態(tài)

if(t.isAlive())//precheckthattisstartable

thrownewIllegalThreadStateException();

//將新啟動(dòng)的線程添加到線程池中

workers.add(w);

//更新線程池線程數(shù)且不超過(guò)較大值

ints=workers.size();

if(s>largestPoolSize)

largestPoolSize=s;

workerAdded=true;

}

}finally{

mainLock.unlock();

}

//啟動(dòng)新添加的線程,這個(gè)線程首先執(zhí)行firstTask,然后不停的從隊(duì)列中取任務(wù)執(zhí)行

if(workerAdded){

//執(zhí)行ThreadPoolExecutor的runWoker方法

t.start();

workerStarted=true;

}

}

}finally{

//線程啟動(dòng)失敗,則從wokers中移除w并遞減wokerCount

if(!workerStarted)

//遞減wokerCount會(huì)觸發(fā)tryTerminate方法

addWorkerFailed(w);

}

returnworkerStarted;

}

addWorker之后是runWorker,第一次啟動(dòng)會(huì)執(zhí)行初始化傳進(jìn)來(lái)的任務(wù)firstTask;然后會(huì)從workQueue中取任務(wù)執(zhí)行,如果隊(duì)列為空則等待keepAliveTime這么長(zhǎng)時(shí)間。

finalvoidrunWorker(Workerw){

Threadwt=Thread.currentThread();

Runnabletask=w.firstTask;

w.firstTask=null;

//允許中斷

w.unlock();//allowinterrupts

booleancompletedAbruptly=true;

try{

//如果getTask返回null那么getTask中會(huì)將workerCount遞減,如果異常了這個(gè)遞減操作會(huì)在processWorkerExit中處理

while(task!=null||(task=getTask())!=null){

w.lock();

//Ifpoolisstopping,ensurethreadisinterrupted;

//ifnot,ensurethreadisnotinterrupted.This

//requiresarecheckinsecondcasetodealwith

//shutdownNowracewhileclearinginterrupt

if((runStateAtLeast(ctl.get(),STOP)||

(Thread.interrupted()&&

runStateAtLeast(ctl.get(),STOP)))&&

!wt.isInterrupted())

wt.interrupt();

try{

beforeExecute(wt,task);

Throwablethrown=null;

try{

task.run();

}catch(RuntimeExceptionx){

thrown=x;throwx;

}catch(Errorx){

thrown=x;throwx;

}catch(Throwablex){

thrown=x;thrownewError(x);

}finally{

afterExecute(task,thrown);

}

}finally{

task=null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly=false;

}finally{

processWorkerExit(w,completedAbruptly);

}

}

我們看下getTask是如何執(zhí)行的:

privateRunnablegetTask(){

booleantimedOut=false;//Didthelastpoll()timeout?

//死循環(huán)

retry:for(;;){

//獲取線程池狀態(tài)

intc=ctl.get();

intrs=runStateOf(c);

//Checkifqueueemptyonlyifnecessary.

//1.rs>SHUTDOWN所以rs至少等于STOP,這時(shí)不再處理隊(duì)列中的任務(wù)

//2.rs=SHUTDOWN所以rs>=STOP肯定不成立,這時(shí)還需要處理隊(duì)列中的任務(wù)除非隊(duì)列為空

//這兩種情況都會(huì)返回null讓runWoker退出while循環(huán)也就是當(dāng)前線程結(jié)束了,所以必須要decrement

if(rs>=SHUTDOWN&&(rs>=STOP||workQueue.isEmpty())){

//遞減workerCount值

decrementWorkerCount();

returnnull;

}

//標(biāo)記從隊(duì)列中取任務(wù)時(shí)是否設(shè)置超時(shí)時(shí)間

booleantimed;//Areworkerssubjecttoculling?

//1.RUNING狀態(tài)

//2.SHUTDOWN狀態(tài),但隊(duì)列中還有任務(wù)需要執(zhí)行

for(;;){

intwc=workerCountOf(c);

//1.corethread允許被超時(shí),那么超過(guò)corePoolSize的的線程必定有超時(shí)

//2.allowCoreThreadTimeOut==false&&wc>

//corePoolSize時(shí),一般都是這種情況,corethread即使空閑也不會(huì)被回收,只要超過(guò)的線程才會(huì)

timed=allowCoreThreadTimeOut||wc>corePoolSize;

//從addWorker可以看到一般wc不會(huì)大于maximumPoolSize,所以更關(guān)心后面半句的情形:

//1.timedOut==false第一次執(zhí)行循環(huán),從隊(duì)列中取出任務(wù)不為null方法返回或者

//poll出異常了重試

//2.timeOut==true&&timed==

//false:看后面的代碼workerQueue.poll超時(shí)時(shí)timeOut才為true,

//并且timed要為false,這兩個(gè)條件相悖不可能同時(shí)成立(既然有超時(shí)那么timed肯定為true)

//所以超時(shí)不會(huì)繼續(xù)執(zhí)行而是returnnull結(jié)束線程。

if(wc<=maximumPoolSize&&!(timedOut&&timed))

break;

//workerCount遞減,結(jié)束當(dāng)前thread

if(compareAndDecrementWorkerCount(c))

returnnull;

c=ctl.get();//Re-readctl

//需要重新檢查線程池狀態(tài),因?yàn)樯鲜霾僮鬟^(guò)程中線程池可能被SHUTDOWN

if(runStateOf(c)!=rs)

continueretry;

//elseCASfailedduetoworkerCountchange;retryinnerloop

}

try{

//1.以指定的超時(shí)時(shí)間從隊(duì)列中取任務(wù)

//2.corethread沒(méi)有超時(shí)

Runnabler=timed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();

if(r!=null)

returnr;

timedOut=true;//超時(shí)

}catch(InterruptedExceptionretry){

timedOut=false;//線程被中斷重試

}

}

}

下面我們看下processWorkerExit是如何工作的:

privatevoidprocessWorkerExit(Workerw,booleancompletedAbruptly){

//正常的話再runWorker的getTask方法workerCount已經(jīng)被減一了

if(completedAbruptly)

decrementWorkerCount();

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

//累加線程的completedTasks

completedTaskCount+=w.completedTasks;

//從線程池中移除超時(shí)或者出現(xiàn)異常的線程

workers.remove(w);

}finally{

mainLock.unlock();

}

//嘗試停止線程池

tryTerminate();

intc=ctl.get();

//runState為RUNNING或SHUTDOWN

if(runStateLessThan(c,STOP)){

//線程不是異常結(jié)束

if(!completedAbruptly){

//線程池最小空閑數(shù),允許corethread超時(shí)就是0,否則就是corePoolSize

intmin=allowCoreThreadTimeOut?0:corePoolSize;

//如果min==0但是隊(duì)列不為空要保證有1個(gè)線程來(lái)執(zhí)行隊(duì)列中的任務(wù)

if(min==0&&!workQueue.isEmpty())

min=1;

//線程池還不為空那就不用擔(dān)心了

if(workerCountOf(c)>=min)

return;//replacementnotneeded

}

//1.線程異常退出

//2.線程池為空,但是隊(duì)列中還有任務(wù)沒(méi)執(zhí)行,看addWoker方法對(duì)這種情況的處理

addWorker(null,false);

}

}

tryTerminate

processWorkerExit方法中會(huì)嘗試調(diào)用tryTerminate來(lái)終止線程池。這個(gè)方法在任何可能導(dǎo)致線程池終止的動(dòng)作后執(zhí)行:比如減少wokerCount或SHUTDOWN狀態(tài)下從隊(duì)列中移除任務(wù)。

finalvoidtryTerminate(){

for(;;){

intc=ctl.get();

//以下狀態(tài)直接返回:

//1.線程池還處于RUNNING狀態(tài)

//2.SHUTDOWN狀態(tài)但是任務(wù)隊(duì)列非空

//3.runState>=TIDYING線程池已經(jīng)停止了或在停止了

if(isRunning(c)||runStateAtLeast(c,TIDYING)||(runStateOf(c)==SHUTDOWN&&!workQueue.isEmpty()))

return;

//只能是以下情形會(huì)繼續(xù)下面的邏輯:結(jié)束線程池。

//1.SHUTDOWN狀態(tài),這時(shí)不再接受新任務(wù)而且任務(wù)隊(duì)列也空了

//2.STOP狀態(tài),當(dāng)調(diào)用了shutdownNow方法

//workerCount不為0則還不能停止線程池,而且這時(shí)線程都處于空閑等待的狀態(tài)

//需要中斷讓線程“醒”過(guò)來(lái),醒過(guò)來(lái)的線程才能繼續(xù)處理shutdown的信號(hào)。

if(workerCountOf(c)!=0){//Eligibletoterminate

//runWoker方法中w.unlock就是為了可以被中斷,getTask方法也處理了中斷。

//ONLY_ONE:這里只需要中斷1個(gè)線程去處理shutdown信號(hào)就可以了。

interruptIdleWorkers(ONLY_ONE);

return;

}

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

//進(jìn)入TIDYING狀態(tài)

if(ctl.compareAndSet(c,ctlOf(TIDYING,0))){

try{

//子類重載:一些資源清理工作

terminated();

}finally{

//TERMINATED狀態(tài)

ctl.set(ctlOf(TERMINATED,0));

//繼續(xù)awaitTermination

termination.signalAll();

}

return;

}

}finally{

mainLock.unlock();

}

//elseretryonfailedCAS

}

}

shutdown這個(gè)方法會(huì)將runState置為SHUTDOWN,會(huì)終止所有空閑的線程。shutdownNow方法將runState置為STOP。和shutdown方法的區(qū)別,這個(gè)方法會(huì)終止所有的線程。主要區(qū)別在于shutdown調(diào)用的是interruptIdleWorkers這個(gè)方法,而shutdownNow實(shí)際調(diào)用的是Worker類的interruptIfStarted方法:

他們的實(shí)現(xiàn)如下:

publicvoidshutdown(){

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

checkShutdownAccess();

//線程池狀態(tài)設(shè)為SHUTDOWN,如果已經(jīng)至少是這個(gè)狀態(tài)那么則直接返回

advanceRunState(SHUTDOWN);

//注意這里是中斷所有空閑的線程:runWorker中等待的線程被中斷→進(jìn)入processWorkerExit→

//tryTerminate方法中會(huì)保證隊(duì)列中剩余的任務(wù)得到執(zhí)行。

interruptIdleWorkers();

onShutdown();//hookforScheduledThreadPoolExecutor

}finally{

mainLock.unlock();

}

tryTerminate();

}

publicListshutdownNow(){

Listtasks;

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

checkShutdownAccess();

//STOP狀態(tài):不再接受新任務(wù)且不再執(zhí)行隊(duì)列中的任務(wù)。

advanceRunState(STOP);

//中斷所有線程

interruptWorkers();

//返回隊(duì)列中還沒(méi)有被執(zhí)行的任務(wù)。

tasks=drainQueue();

}

finally{

mainLock.unlock();

}

tryTerminate();

returntasks;

}

privatevoidinterruptIdleWorkers(booleanonlyOne){

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

for(Workerw:workers){

Threadt=w.thread;

//w.tryLock能獲取到鎖,說(shuō)明該線程沒(méi)有在運(yùn)行,因?yàn)閞unWorker中執(zhí)行任務(wù)會(huì)先lock,

//因此保證了中斷的肯定是空閑的線程。

if(!t.isInterrupted()&&w.tryLock()){

try{

t.interrupt();

}catch(SecurityExceptionignore){

}finally{

w.unlock();

}

}

if(onlyOne)

break;

}

}

finally{

mainLock.unlock();

}

}

voidinterruptIfStarted(){

Threadt;

//初始化時(shí)state==-1

if(getState()>=0&&(t=thread)!=null&&!t.isInterrupted()){

try{

t.interrupt();

}catch(SecurityExceptionignore){

}

}

}

五、線程池的使用

線程池的創(chuàng)建

我們可以通過(guò)ThreadPoolExecutor來(lái)創(chuàng)建一個(gè)線程池

/**

*@paramcorePoolSize線程池基本大小,核心線程池大小,活動(dòng)線程小于corePoolSize則直接創(chuàng)建,大于等于則先加到workQueue中,

*隊(duì)列滿了才創(chuàng)建新的線程。當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程,

*等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法,

*線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程。

*@parammaximumPoolSize較大線程數(shù),超過(guò)就reject;線程池允許創(chuàng)建的較大線程數(shù)。如果隊(duì)列滿了,

*并且已創(chuàng)建的線程數(shù)小于較大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)

*@paramkeepAliveTime

*線程池的工作線程空閑后,保持存活的時(shí)間。所以,如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短,可以調(diào)大時(shí)間,提高線程的利用率

*@paramunit線程活動(dòng)保持時(shí)間的單位):可選的單位有天(DAYS)、小時(shí)(HOURS)、分鐘(MINUTES)、

*毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)

*@paramworkQueue工作隊(duì)列,線程池中的工作線程都是從這個(gè)工作隊(duì)列源源不斷的獲取任務(wù)進(jìn)行執(zhí)行

*/

publicThreadPoolExecutor(intcorePoolSize,

intmaximumPoolSize,

longkeepAliveTime,

TimeUnitunit,

BlockingQueueworkQueue){

//threadFactory用于設(shè)置創(chuàng)建線程的工廠,可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字

this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,

Executors.defaultThreadFactory(),defaultHandler);

}

向線程池提交任務(wù)

可以使用兩個(gè)方法向線程池提交任務(wù),分別為execute()和submit()方法。execute()方法用于提交不需要返回值的任務(wù),所以無(wú)法判斷任務(wù)是否被線程池執(zhí)行成功。通過(guò)以下代碼可知execute()方法輸入的任務(wù)是一個(gè)Runnable類的實(shí)例。

threadsPool.execute(newRunnable(){

@Override

publicvoidrun(){

}

});

submit()方法用于提交需要返回值的任務(wù)。線程池會(huì)返回一個(gè)future類型的對(duì)象,通過(guò)這個(gè)future對(duì)象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過(guò)future的get()方法來(lái)獲取返回值,get()方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成,而使用get(longtimeout,TimeUnitunit)方法則會(huì)阻塞當(dāng)前線程一段時(shí)間后立即返回,這時(shí)候有可能任務(wù)沒(méi)有執(zhí)行完。

六、關(guān)閉線程池

可以通過(guò)調(diào)用線程池的shutdown或shutdownNow方法來(lái)關(guān)閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個(gè)調(diào)用線程的interrupt方法來(lái)中斷線程,所以無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止。但是它們存在一定的區(qū)別,shutdownNow首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,而shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線程。

只要調(diào)用了這兩個(gè)關(guān)閉方法中的任意一個(gè),isShutdown方法就會(huì)返回true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會(huì)返回true。至于應(yīng)該調(diào)用哪一種方法來(lái)關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown方法來(lái)關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法。

七、合理的配置線程池

要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來(lái)分析。

1、任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。

2、任務(wù)的優(yōu)先級(jí):高、中和低。

3、任務(wù)的執(zhí)行時(shí)間:長(zhǎng)、中和短。

4、任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接。

性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)應(yīng)配置盡可能小的線程,如配置Ncpu+1個(gè)線程的線程池。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*Ncpu?;旌闲偷娜蝿?wù),如果可以拆分,將其拆分成一個(gè)CPU密集型任務(wù)和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大,則沒(méi)必要進(jìn)行分解??梢酝ㄟ^(guò)Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù)。優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue來(lái)處理。它可以讓優(yōu)先級(jí)高的任務(wù)先執(zhí)行

如果一直有優(yōu)先級(jí)高的任務(wù)提交到隊(duì)列里,那么優(yōu)先級(jí)低的任務(wù)可能永遠(yuǎn)不能執(zhí)行。執(zhí)行時(shí)間不同的任務(wù)可以交給不同規(guī)模的線程池來(lái)處理,或者可以使用優(yōu)先級(jí)隊(duì)列,讓執(zhí)行時(shí)間短的任務(wù)先執(zhí)行。依賴數(shù)據(jù)庫(kù)連接池的任務(wù),因?yàn)榫€程提交SQL后需要等待數(shù)據(jù)庫(kù)返回結(jié)果,等待的時(shí)間越長(zhǎng),則CPU空閑時(shí)間就越長(zhǎng),那么線程數(shù)應(yīng)該設(shè)置得越大,這樣才能更好地利用CPU。

建議使用有界隊(duì)列。有界隊(duì)列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點(diǎn)兒,比如幾千。有時(shí)候我們系統(tǒng)里后臺(tái)任務(wù)線程池的隊(duì)列和線程池全滿了,不斷拋出拋棄任務(wù)的異常,通過(guò)排查發(fā)現(xiàn)是數(shù)據(jù)庫(kù)出現(xiàn)了問(wèn)題,導(dǎo)致執(zhí)行SQL變得非常緩慢,因?yàn)楹笈_(tái)任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫(kù)查詢和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞,任務(wù)積壓在線程池里。如果當(dāng)時(shí)我們?cè)O(shè)置成無(wú)界隊(duì)列,那么線程池的隊(duì)列就會(huì)越來(lái)越多,有可能會(huì)撐滿內(nèi)存,導(dǎo)致整個(gè)系統(tǒng)不可用,而不只是后臺(tái)任務(wù)出現(xiàn)問(wèn)題。當(dāng)然,我們的系統(tǒng)所有的任務(wù)是用單獨(dú)的服務(wù)器部署的,我們使用不同規(guī)模的線程池完成不同類型的任務(wù),但是出現(xiàn)這樣問(wèn)題時(shí)也會(huì)影響到其他任務(wù)。

八、線程池的監(jiān)控

如果在系統(tǒng)中大量使用線程池,則有必要對(duì)線程池進(jìn)行監(jiān)控,方便在出現(xiàn)問(wèn)題時(shí),可以根據(jù)線程池的使用狀況快速定位問(wèn)題??梢酝ㄟ^(guò)線程池提供的參數(shù)進(jìn)行監(jiān)控,在監(jiān)控線程池的時(shí)候可以使用以下屬性:

taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量。

completedTaskCount:線程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量,小于或等于taskCount。

largestPoolSize:線程池里曾經(jīng)創(chuàng)建過(guò)的較大線程數(shù)量。通過(guò)這個(gè)數(shù)據(jù)可以知道線程池是否曾經(jīng)滿過(guò)。如該數(shù)值等于線程池的較大大小,則表示線程池曾經(jīng)滿過(guò)。

getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,線程池里的線程不會(huì)自動(dòng)銷毀,所以這個(gè)大小只增不減。

getActiveCount:獲取活動(dòng)的線程數(shù)。

通過(guò)擴(kuò)展線程池進(jìn)行監(jiān)控。可以通過(guò)繼承線程池來(lái)自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,也可以在任務(wù)執(zhí)行前、執(zhí)行后和線程池關(guān)閉前執(zhí)行一些代碼來(lái)進(jìn)行監(jiān)控。例如,監(jiān)控任務(wù)的平均執(zhí)行時(shí)間、較大執(zhí)行時(shí)間和最小執(zhí)行時(shí)間等。

以上就是扣丁學(xué)堂關(guān)于Java線程池原理分析與實(shí)際運(yùn)用的具體內(nèi)容介紹,扣丁學(xué)堂是互聯(lián)網(wǎng)IT技術(shù)內(nèi)容目前提供的熱門開發(fā)課程有JavaEE、全棧HTML5、PHP、Python人工智能+全棧、UI、大數(shù)據(jù)、VR/AR、Linux云計(jì)算、軟件測(cè)試、安卓、iOS等課程??鄱W(xué)堂不僅有專業(yè)的老師和課程體系,還有大量的零基礎(chǔ)JavaEE培訓(xùn)視頻教程供學(xué)員觀看學(xué)習(xí),想要學(xué)習(xí)的話就抓緊時(shí)間行動(dòng)吧??鄱W(xué)堂JAVA學(xué)習(xí)交流群:670348138。






【關(guān)注微信公眾號(hào)獲取更多的學(xué)習(xí)資料】



查看更多關(guān)于“Java開發(fā)資訊的相關(guān)文章>>

標(biāo)簽: JavaEE視頻教程 JavaEE培訓(xùn) JavaEE開發(fā)工程師 Java培訓(xùn)

熱門專區(qū)

暫無(wú)熱門資訊

課程推薦

微信
微博
15311698296

全國(guó)免費(fèi)咨詢熱線

郵箱:codingke@1000phone.com

官方群:148715490

北京千鋒互聯(lián)科技有限公司版權(quán)所有   北京市海淀區(qū)寶盛北里西區(qū)28號(hào)中關(guān)村智誠(chéng)科創(chuàng)大廈4層
京ICP備2021002079號(hào)-2   Copyright ? 2017 - 2022
返回頂部 返回頂部