- 資訊首頁(yè) > 開(kāi)發(fā)技術(shù) >
- Java線(xiàn)程池有哪些拒絕策略
這期內容當中小編將會(huì )給大家帶來(lái)有關(guān)Java線(xiàn)程池有哪些拒絕策略,文章內容豐富且以專(zhuān)業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
池化設計思想
池話(huà)設計應該不是一個(gè)新名詞。我們常見(jiàn)的如java線(xiàn)程池、jdbc連接池、連接池等就是這類(lèi)設計的代表實(shí)現。
這種設計會(huì )初始預設資源,解決的問(wèn)題就是抵消每次獲取資源的消耗,如創(chuàng )建線(xiàn)程的開(kāi)銷(xiāo),獲取遠程連接的開(kāi)銷(xiāo)等。就好比你去食堂打飯,打飯的大媽會(huì )先把飯盛好幾份放那里,你來(lái)了就直接拿著(zhù)飯盒加菜即可,不用再臨時(shí)又盛飯又打菜,效率就高了。
除了初始化資源,池化設計還包括如下這些特征:池子的初始值、池子的活躍值、池子的最大值等,這些特征可以直接映射到j(luò )ava線(xiàn)程池和數據庫連接池的成員屬性中。
線(xiàn)程池觸發(fā)拒絕策略的時(shí)機
和數據源連接池不一樣,線(xiàn)程池除了初始大小和池子最大值,還多了一個(gè)阻塞隊列來(lái)緩沖。
數據源連接池一般請求的連接數超過(guò)連接池的最大值的時(shí)候就會(huì )觸發(fā)拒絕策略,策略一般是阻塞等待設置的時(shí)間或者直接拋異常。
如圖,想要了解線(xiàn)程池什么時(shí)候觸發(fā)拒絕粗略,需要明確上面三個(gè)參數的具體含義,是這三個(gè)參數總體協(xié)調的結果,而不是簡(jiǎn)單的超過(guò)最大線(xiàn)程數就會(huì )觸發(fā)線(xiàn)程拒絕粗略,當提交的任務(wù)數大于corePoolSize時(shí),會(huì )優(yōu)先放到隊列緩沖區,只有填滿(mǎn)了緩沖區后,才會(huì )判斷當前運行的任務(wù)是否大于maxPoolSize,小于時(shí)會(huì )新建線(xiàn)程處理。大于時(shí)就觸發(fā)了拒絕策略。
總結就是:當前提交任務(wù)數大于(maxPoolSize + queueCapacity)時(shí)就會(huì )觸發(fā)線(xiàn)程池的拒絕策略了。
JDK內置4種線(xiàn)程池拒絕策略
拒絕策略接口定義
在分析JDK自帶的線(xiàn)程池拒絕策略前,先看下JDK定義的 拒絕策略接口,如下:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
接口定義很明確,當觸發(fā)拒絕策略時(shí),線(xiàn)程池會(huì )調用你設置的具體的策略,將當前提交的任務(wù)以及線(xiàn)程池實(shí)例本身傳遞給你處理,具體作何處理,不同場(chǎng)景會(huì )有不同的考慮,下面看JDK為我們內置了哪些實(shí)現:
CallerRunsPolicy(調用者運行策略)
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
功能:當觸發(fā)拒絕策略時(shí),只要線(xiàn)程池沒(méi)有關(guān)閉,就由提交任務(wù)的當前線(xiàn)程處理。
使用場(chǎng)景:一般在不允許失敗的、對性能要求不高、并發(fā)量較小的場(chǎng)景下使用,因為線(xiàn)程池一般情況下不會(huì )關(guān)閉,也就是提交的任務(wù)一定會(huì )被運行,但是由于是調用者線(xiàn)程自己執行的,當多次提交任務(wù)時(shí),就會(huì )阻塞后續任務(wù)執行,性能和效率自然就慢了。
AbortPolicy(中止策略)
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
功能:當觸發(fā)拒絕策略時(shí),直接拋出拒絕執行的異常,中止策略的意思也就是打斷當前執行流程
使用場(chǎng)景:這個(gè)就沒(méi)有特殊的場(chǎng)景了,但是一點(diǎn)要正確處理拋出的異常。
ThreadPoolExecutor中默認的策略就是AbortPolicy,ExecutorService接口的系列ThreadPoolExecutor因為都沒(méi)有顯示的設置拒絕策略,所以默認的都是這個(gè)。
但是請注意,ExecutorService中的線(xiàn)程池實(shí)例隊列都是無(wú)界的,也就是說(shuō)把內存撐爆了都不會(huì )觸發(fā)拒絕策略。當自己自定義線(xiàn)程池實(shí)例時(shí),使用這個(gè)策略一定要處理好觸發(fā)策略時(shí)拋的異常,因為他會(huì )打斷當前的執行流程。
DiscardPolicy(丟棄策略)
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
功能:直接靜悄悄的丟棄這個(gè)任務(wù),不觸發(fā)任何動(dòng)作
使用場(chǎng)景:如果你提交的任務(wù)無(wú)關(guān)緊要,你就可以使用它 。因為它就是個(gè)空實(shí)現,會(huì )悄無(wú)聲息的吞噬你的的任務(wù)。所以這個(gè)策略基本上不用了
DiscardOldestPolicy(棄老策略)
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
功能:如果線(xiàn)程池未關(guān)閉,就彈出隊列頭部的元素,然后嘗試執行
使用場(chǎng)景:這個(gè)策略還是會(huì )丟棄任務(wù),丟棄時(shí)也是毫無(wú)聲息,但是特點(diǎn)是丟棄的是老的未執行的任務(wù),而且是待執行優(yōu)先級較高的任務(wù)。
基于這個(gè)特性,我能想到的場(chǎng)景就是,發(fā)布消息,和修改消息,當消息發(fā)布出去后,還未執行,此時(shí)更新的消息又來(lái)了,這個(gè)時(shí)候未執行的消息的版本比現在提交的消息版本要低就可以被丟棄了。因為隊列中還有可能存在消息版本更低的消息會(huì )排隊執行,所以在真正處理消息的時(shí)候一定要做好消息的版本比較。
第三方實(shí)現的拒絕策略
dubbo中的線(xiàn)程拒絕策略
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); private final String threadName; private final URL url; private static volatile long lastPrintTime = 0; private static Semaphore guard = new Semaphore(1); public AbortPolicyWithReport(String threadName, URL url) { this.threadName = threadName; this.url = url; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); dumpJStack(); throw new RejectedExecutionException(msg); } private void dumpJStack() { //省略實(shí)現 } }
可以看到,當dubbo的工作線(xiàn)程觸發(fā)了線(xiàn)程拒絕后,主要做了三個(gè)事情,原則就是盡量讓使用者清楚觸發(fā)線(xiàn)程拒絕策略的真實(shí)原因。
1)輸出了一條警告級別的日志,日志內容為線(xiàn)程池的詳細設置參數,以及線(xiàn)程池當前的狀態(tài),還有當前拒絕任務(wù)的一些詳細信息??梢哉f(shuō),這條日志,使用dubbo的有過(guò)生產(chǎn)運維經(jīng)驗的或多或少是見(jiàn)過(guò)的,這個(gè)日志簡(jiǎn)直就是日志打印的典范,其他的日志打印的典范還有spring。得益于這么詳細的日志,可以很容易定位到問(wèn)題所在
2)輸出當前線(xiàn)程堆棧詳情,這個(gè)太有用了,當你通過(guò)上面的日志信息還不能定位問(wèn)題時(shí),案發(fā)現場(chǎng)的dump線(xiàn)程上下文信息就是你發(fā)現問(wèn)題的救命稻草。
3)繼續拋出拒絕執行異常,使本次任務(wù)失敗,這個(gè)繼承了JDK默認拒絕策略的特性
擴展閱讀:Dubbo 面試18問(wèn),你能接得住嗎?
Netty中的線(xiàn)程池拒絕策略
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler { NewThreadRunsPolicy() { super(); } public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { final Thread t = new Thread(r, "Temporary task executor"); t.start(); } catch (Throwable e) { throw new RejectedExecutionException( "Failed to start a new thread", e); } } }
Netty中的實(shí)現很像JDK中的CallerRunsPolicy,舍不得丟棄任務(wù)。不同的是,CallerRunsPolicy是直接在調用者線(xiàn)程執行的任務(wù)。而 Netty是新建了一個(gè)線(xiàn)程來(lái)處理的。
所以,Netty的實(shí)現相較于調用者執行策略的使用面就可以擴展到支持高效率高性能的場(chǎng)景了。但是也要注意一點(diǎn),Netty的實(shí)現里,在創(chuàng )建線(xiàn)程時(shí)未做任何的判斷約束,也就是說(shuō)只要系統還有資源就會(huì )創(chuàng )建新的線(xiàn)程來(lái)處理,直到new不出新的線(xiàn)程了,才會(huì )拋創(chuàng )建線(xiàn)程失敗的異常。推薦:什么是Netty?
activeMq中的線(xiàn)程池拒絕策略
new RejectedExecutionHandler() { @Override public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { try { executor.getQueue().offer(r, 60, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); } throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); } });
activeMq中的策略屬于最大努力執行任務(wù)型,當觸發(fā)拒絕策略時(shí),在嘗試一分鐘的時(shí)間重新將任務(wù)塞進(jìn)任務(wù)隊列,當一分鐘超時(shí)還沒(méi)成功時(shí),就拋出異常
pinpoint中的線(xiàn)程池拒絕策略
public class RejectedExecutionHandlerChain implements RejectedExecutionHandler { private final RejectedExecutionHandler[] handlerChain; public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) { Objects.requireNonNull(chain, "handlerChain must not be null"); RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]); return new RejectedExecutionHandlerChain(handlerChain); } private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) { this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null"); } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) { rejectedExecutionHandler.rejectedExecution(r, executor); } } }
pinpoint的拒絕策略實(shí)現很有特點(diǎn),和其他的實(shí)現都不同。他定義了一個(gè)拒絕策略鏈,包裝了一個(gè)拒絕策略列表,當觸發(fā)拒絕策略時(shí),會(huì )將策略鏈中的rejectedExecution依次執行一遍。
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自互聯(lián)網(wǎng)轉載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權請聯(lián)系QQ:712375056 進(jìn)行舉報,并提供相關(guān)證據,一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容。
Copyright ? 2009-2021 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 珠海市特網(wǎng)科技有限公司 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站