- 資訊首頁(yè) > 開(kāi)發(fā)技術(shù) >
- Java并發(fā)編程之ConcurrentLinkedQueue源碼詳解
并編程中,一般需要用到安全的隊列,如果要自己實(shí)現安全隊列,可以使用2種方式:
方式1:加鎖,這種實(shí)現方式就是我們常說(shuō)的阻塞隊列。
方式2:使用循環(huán)CAS算法實(shí)現,這種方式實(shí)現隊列稱(chēng)之為非阻塞隊列。
從點(diǎn)到面, 下面我們來(lái)看下非阻塞隊列經(jīng)典實(shí)現類(lèi):ConcurrentLinkedQueue (JDK1.8版)
ConcurrentLinkedQueue 是一個(gè)基于鏈接節點(diǎn)的無(wú)界線(xiàn)程安全的隊列。當我們添加一個(gè)元素的時(shí)候,它會(huì )添加到隊列的尾部,當我們獲取一個(gè)元素時(shí),它會(huì )返回隊列頭部的元素。它采用了“wait-free”算法來(lái)實(shí)現,用CAS實(shí)現了非阻塞的線(xiàn)程安全隊列。當多個(gè)線(xiàn)程共享訪(fǎng)問(wèn)一個(gè)公共 collection 時(shí),ConcurrentLinkedQueue 是一個(gè)恰當的選擇。此隊列不允許使用 null 元素,因為移除元素時(shí)實(shí)際是將節點(diǎn)中item置為null,如果元素本身為null,則跟刪除有沖突
我們首先看一下ConcurrentLinkedQueue的類(lèi)圖結構先,好有一個(gè)內部邏輯有一個(gè)大概的印象,如下圖所示:
主要屬性head節點(diǎn),tail節點(diǎn)
// 鏈表頭節點(diǎn) private transient volatile Node<E> head; // 鏈表尾節點(diǎn) private transient volatile Node<E> tail;
主要內部類(lèi)Node
類(lèi)Node在static方法里獲取到item和next的內存偏移量,之后通過(guò)casItem和casNext更改item值和next節點(diǎn)
private static class Node<E> { volatile E item; volatile Node<E> next; /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext. */ Node(E item) { //將item存放在本節點(diǎn)的itemOffset偏移量位置的內存里 UNSAFE.putObject(this, itemOffset, item);//設置this對象的itemoffset位置 } //更新item值 boolean casItem(E cmp, E val) { //this對象的itemoffset位置存放的值如果和期望值cmp相等,則替換為val return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { //this對象的nextOffset位置存入val UNSAFE.putOrderedObject(this, nextOffset, val); } //更新next節點(diǎn)值 boolean casNext(Node<E> cmp, Node<E> val) { //this對象的nextOffset位置存放的值如果和期望值cmp相等,則替換為val return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; //當前節點(diǎn)存放的item的內存偏移量 private static final long itemOffset; //當前節點(diǎn)的next節點(diǎn)的內存偏移量 private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
concurrentlinkedqueue同樣在static方法里獲取到head和tail的內存偏移量:之后通過(guò)casHead和casTail更改head節點(diǎn)和tail節點(diǎn)值
static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentLinkedQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } } private boolean casTail(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); } private boolean casHead(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); }
//無(wú)參構造函數,head=tail=new Node<E>(null)=空節點(diǎn) //初始一個(gè)為空的ConcurrentLinkedQueue,此時(shí)head和tail都指向一個(gè)item為null的節點(diǎn) public ConcurrentLinkedQueue() { // 初始化頭尾節點(diǎn) head = tail = new Node<E>(null); } //集合構造函數:就是將集合中的元素挨個(gè)鏈起來(lái) public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode);//可以理解為一種懶加載, 將t的next值設置為newNode t = newNode; } } if (h == null) h = t = new Node<E>(null); head = h; tail = t; } private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); } //putObjectVolatile的內存非立即可見(jiàn)版本, //寫(xiě)后結果并不會(huì )被其他線(xiàn)程看到,通常是幾納秒后被其他線(xiàn)程看到,這個(gè)時(shí)間比較短,所以代價(jià)可以接收 void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
獲取到當前尾節點(diǎn)p=tail:
public boolean offer (E e){ //先檢查元素是否為null,是null則拋出異常 不是null,則構造新節點(diǎn)準備入隊 checkNotNull(e); final Node<E> newNode = new Node<E>(e); //初始p指針和t指針都指向尾節點(diǎn),p指針用來(lái)向隊列后面推移,t指針用來(lái)判斷尾節點(diǎn)是否改變 Node<E> t = tail, p = t; for (; ; ) { Node<E> q = p.next; if (q == null) {//p.next為null,則代表p為尾節點(diǎn),則將p.next指向新節點(diǎn) // p is last node if (p.casNext(null, newNode)) { /** * 如果p!=t,即p向后推移了,t沒(méi)動(dòng),則此時(shí)同時(shí)將tail更新 * 不符合條件不更新tail,這里可以看出并不是每入隊一個(gè)節點(diǎn)都會(huì )更新tail的 * 而此時(shí)真正的尾節點(diǎn)其實(shí)是newNode了,所以tail不一定是真正的尾節點(diǎn), * tail的更新具有滯后性,這樣設計提高了入隊的效率,不用每入隊一個(gè),更新一次 *尾節點(diǎn) */ if (p != t) casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) /** * 如果p.next和p相等,這種情況是出隊時(shí)的一種哨兵節點(diǎn)代表已被遺棄刪除, * 那就是有線(xiàn)程在一直刪除節點(diǎn),刪除到了p.next 那此時(shí)如果有線(xiàn)程已經(jīng)更新了tail,那就從p指向tail再開(kāi)始繼續像后推移 * 如果始終沒(méi)有線(xiàn)程更新tail,則p指針從head開(kāi)始向后推移 * * p從head開(kāi)始推移的原因:tail沒(méi)有更新,以前的tail肯定在哨兵節點(diǎn)的前面(因為此循環(huán)是從tail向后推移到哨兵節點(diǎn)的), * 而head節點(diǎn)一定在哨兵節點(diǎn)的后面(出隊時(shí)只有更新了head節點(diǎn),才會(huì )把前面部分的某個(gè)節點(diǎn)置為哨兵節點(diǎn)) * 此時(shí)其實(shí)是一種tail在head之前,但實(shí)際上tail已經(jīng)無(wú)用了,哨兵之前的節點(diǎn)都無(wú)用了, * 等著(zhù)其他線(xiàn)程入隊時(shí)更新尾節點(diǎn)tail,此時(shí)的tail才有用所以從head開(kāi)始,從head開(kāi)始可以找到任何節點(diǎn) * */ p = (t != (t = tail)) ? t : head; else /** * p.next和p不相等時(shí),此時(shí)p應該向后推移到p.next,即p=p.next, * 如果next一直不為null一直定位不到尾節點(diǎn),會(huì )一直next, * 但是中間會(huì )優(yōu)先判斷tail是否已更新,如果tail已更新則p直接從tail向后推移即可。就沒(méi)必要一直next了。 */ // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } }
poll出隊:
獲取到當前頭節點(diǎn)p=head:如果成功設置了item為null,即p.catItem(item,null),
如果此時(shí)被其他線(xiàn)程搶走消費了,此時(shí)需要p=p.next,向后繼續爭搶消費,直到成功執行p.catItem(item,null),此時(shí)檢查p是不是head節點(diǎn),如果不是更新p.next為頭結點(diǎn)
public E poll() { restartFromHead: for (;;) { // p節點(diǎn)表示首節點(diǎn),即需要出隊的節點(diǎn) for (Node<E> h = head, p = h, q;;) { E item = p.item; // 如果p節點(diǎn)的元素不為null,則通過(guò)CAS來(lái)設置p節點(diǎn)引用的元素為null,如果成功則返回p節點(diǎn)的元素 if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. // 如果p != h,則更新head if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 如果頭節點(diǎn)的元素為空或頭節點(diǎn)發(fā)生了變化,這說(shuō)明頭節點(diǎn)已經(jīng)被另外一個(gè)線(xiàn)程修改了。 // 那么獲取p節點(diǎn)的下一個(gè)節點(diǎn),如果p節點(diǎn)的下一節點(diǎn)為null,則表明隊列已經(jīng)空了 else if ((q = p.next) == null) { // 更新頭結點(diǎn) updateHead(h, p); return null; } // p == q,則使用新的head重新開(kāi)始 else if (p == q) continue restartFromHead; // 如果下一個(gè)元素不為空,則將頭節點(diǎn)的下一個(gè)節點(diǎn)設置成頭節點(diǎn) else p = q; } } }
offer:
找到尾節點(diǎn),將新節點(diǎn)鏈入到尾節點(diǎn)后面,tail.next=newNode,
由于多線(xiàn)程操作,所以拿到p=tail后cas操作執行p.next=newNode可能由于被其他線(xiàn)程搶去而執行不成功,此時(shí)需要p=p.next向后遍歷,直到找到p.next=null的目標節點(diǎn)。繼續嘗試向其后面添加元素,添加成功后檢查p是否是tail,如果不是tail,則更新tail=p,添加不成功繼續向后next遍歷
poll:
獲取到當前頭節點(diǎn)p=head:如果成功設置了item為null,即p.catItem(item,null),
如果此時(shí)被其他線(xiàn)程搶走消費了,此時(shí)需要p=p.next,向后繼續爭搶消費,直到成功執行p.catItem(item,null),此時(shí)檢查p是不是head節點(diǎn),如果不是更新頭結點(diǎn)head=p.next(因為p已經(jīng)刪除了)
更新tail和head:
不是每次添加都更新tail,而是間隔一次更新一次(head也是一樣道理):第一個(gè)搶到的線(xiàn)程拿到tail執行成功tail.next=newNode1此時(shí)不更新tail,那么第二個(gè)線(xiàn)程再執行成功添加p.next=newNode2會(huì )判斷出p是newNode1而不是tail,所以就更新tail為newNode2。
tail節點(diǎn)不總是最后一個(gè),head節點(diǎn)不總是第一個(gè)設計初衷:
讓tail節點(diǎn)永遠作為隊列的尾節點(diǎn),這樣實(shí)現代碼量非常少,而且邏輯非常清楚和易懂。但是這么做有個(gè)缺點(diǎn)就是每次都需要使用循環(huán)CAS更新tail節點(diǎn)。如果能減少CAS更新tail節點(diǎn)的次數,就能提高入隊的效率。
在JDK 1.7的實(shí)現中,doug lea使用hops變量來(lái)控制并減少tail節點(diǎn)的更新頻率,并不是每次節點(diǎn)入隊后都將 tail節點(diǎn)更新成尾節點(diǎn),而是當tail節點(diǎn)和尾節點(diǎn)的距離大于等于常量HOPS的值(默認等于1)時(shí)才更新tail節點(diǎn),tail和尾節點(diǎn)的距離越長(cháng)使用CAS更新tail節點(diǎn)的次數就會(huì )越少,但是距離越長(cháng)帶來(lái)的負面效果就是每次入隊時(shí)定位尾節點(diǎn)的時(shí)間就越長(cháng),因為循環(huán)體需要多循環(huán)一次來(lái)定位出尾節點(diǎn),但是這樣仍然能提高入隊的效率,因為從本質(zhì)上來(lái)看它通過(guò)增加對volatile變量的讀操作來(lái)減少了對volatile變量的寫(xiě)操作,而對volatile變量的寫(xiě)操作開(kāi)銷(xiāo)要遠遠大于讀操作,所以入隊效率會(huì )有所提升。
在JDK 1.8的實(shí)現中,tail的更新時(shí)機是通過(guò)p和t是否相等來(lái)判斷的,其實(shí)現結果和JDK 1.7相同,即當tail節點(diǎn)和尾節點(diǎn)的距離大于等于1時(shí),更新tail。
到此這篇關(guān)于Java并發(fā)編程之ConcurrentLinkedQueue源碼詳解的文章就介紹到這了,更多相關(guān)Java ConcurrentLinkedQueue源碼內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自互聯(lián)網(wǎng)轉載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權請聯(lián)系QQ:712375056 進(jìn)行舉報,并提供相關(guān)證據,一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容。
Copyright ? 2009-2021 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 珠海市特網(wǎng)科技有限公司 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站