- 資訊首頁(yè) > 開(kāi)發(fā)技術(shù) > 編程語(yǔ)言 >
- Java多線(xiàn)程之ReentrantReadWriteLock源碼解析
ReentrantReadWriteLock 是一個(gè)讀寫(xiě)鎖,允許多個(gè)讀或者一個(gè)寫(xiě)線(xiàn)程在執行。
內部的 Sync 繼承自 AQS,這個(gè) Sync 包含一個(gè)共享讀鎖 ReadLock 和一個(gè)獨占寫(xiě)鎖 WriteLock。
該鎖可以設置公平和非公平,默認非公平。
一個(gè)持有寫(xiě)鎖的線(xiàn)程可以獲取讀鎖。如果該線(xiàn)程先持有寫(xiě)鎖,再持有讀鎖并釋放寫(xiě)鎖,稱(chēng)為鎖降級。
WriteLock支持Condition并且與ReentrantLock語(yǔ)義一致,而ReadLock則不能使用Condition,否則拋出UnsupportedOperationException異常。
public class ReentrantReadWriteLock implements ReadWriteLock { /** 讀鎖 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 寫(xiě)鎖 */ private final ReentrantReadWriteLock.WriteLock writerLock; /** 持有的AQS子類(lèi)對象 */ final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {} static final class NonfairSync extends Sync {} static final class FairSync extends Sync {} public static class ReadLock implements Lock {} public static class WriteLock implements Lock {} //默認非公平 public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public static class ReadLock implements Lock { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } } public static class WriteLock implements Lock { private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } } }
Sync 繼承了 AQS,其中有一個(gè) int 的成員變量 state,int 共32位,這里將其視為兩部分,高16位表示讀的數量,低16位表示寫(xiě)的數量,這里的數量表示線(xiàn)程重入后的總數量。
abstract static class Sync extends AbstractQueuedSynchronizer { //繼承的一個(gè)int的成員變量,將其拆分為高16位和低16位 //private volatile int state; static final int SHARED_SHIFT = 16; //讀一次,鎖增加的值 static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //讀的數量 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //寫(xiě)的數量 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
讀鎖使用了一個(gè) ThreadLocal<HoldCounter>
讓每個(gè)線(xiàn)程有一個(gè)線(xiàn)程私有的 HoldCounter
,HoldCounter
包含一個(gè)線(xiàn)程 id 以及讀重入的次數。
查找對應線(xiàn)程的HoldCounter
其實(shí)只用一個(gè) ThreadLocalHoldCounter
也足夠了。這里為了加快查詢(xún),用了兩個(gè)額外的緩存,即 cachedHoldCounter
、firstReader
和 firstReaderHoldCount
(后兩個(gè)組合起來(lái)相當于一個(gè) HoldCounter
)。
在讀鎖的相關(guān)操作中,先檢查 firstReader
是否為當前線(xiàn)程,否則檢查 cachedHoldCounter
內部的線(xiàn)程是否為當前線(xiàn)程,如果失敗最后會(huì )通過(guò) readHolds
來(lái)獲取當前線(xiàn)程的 HoldCounter
。
static final class HoldCounter { int count = 0; // 使用線(xiàn)程id,而不是線(xiàn)程的引用。這樣可以防止垃圾不被回收 final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } //使用的ThreadLocal private transient ThreadLocalHoldCounter readHolds; //一個(gè)緩存 private transient HoldCounter cachedHoldCounter; //組合起來(lái)相當于一個(gè)緩存 private transient Thread firstReader = null; private transient int firstReaderHoldCount;
下面講解 tryAcquireShared
和 tryReadLock
,tryReadLock
是一種直接搶占的非公平獲取,和 tryAcquireShared
中的非公平獲取有所不同。
根據注釋
1.檢查是否存在其他線(xiàn)程持有的寫(xiě)鎖,是的話(huà)失敗,返回 -1;
2.判斷在當前公平狀態(tài)下能否讀,以及是否超過(guò)讀的最大數量,滿(mǎn)足條件則嘗試 CAS 修改狀態(tài),讓 state 加一個(gè)單位的讀 SHARED_UNIT;修改成功后會(huì )根據三種情況,即首次讀、firstReader 是當前線(xiàn)程,以及其他情況分別進(jìn)行處理,成功,返回1;
3.前面未返回結果,會(huì )執行 fullTryAcquireShared
。
可以將該方法視為 fullTryAcquireShared
的一次快速?lài)L試,如果嘗試失敗,會(huì )在 fullTryAcquireShared
的自旋中一直執行,直到返回成功或者失敗。
//ReadLock public void lock() { sync.acquireShared(1); } //AQS public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } //Sync protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); // 如果寫(xiě)的數量不是0,且寫(xiě)線(xiàn)程不是當前線(xiàn)程,失敗 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 獲取讀的個(gè)數 int r = sharedCount(c); // 如果當前線(xiàn)程想要讀,沒(méi)有被堵塞 // 當前讀的數量未超過(guò)最大允許的讀的個(gè)數 // CAS執行成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 第一次讀,修改firstReader和firstReaderHoldCount if (r == 0) { firstReader = current; firstReaderHoldCount = 1; // 如果當前線(xiàn)程正好是firstReader } else if (firstReader == current) { firstReaderHoldCount++; // 其他情況,經(jīng)過(guò)一系列處理后,使得rh為當前線(xiàn)程的HoldCounter // 對rh的記數加一 } else { HoldCounter rh = cachedHoldCounter; // 如果cached為null或者不是當前線(xiàn)程 if (rh == null || rh.tid != getThreadId(current)) // 從readHolds中g(shù)et,并修改cached cachedHoldCounter = rh = readHolds.get(); // 如果cached不是null,但記數為null // 這種情況表示當前線(xiàn)程的HoldCounter已經(jīng)被刪除,即為null, // 但cached仍然保留著(zhù)null之前的那個(gè)HoldCounter, // 為了方便,直接將cached設置給ThreadLocal即可 else if (rh.count == 0) readHolds.set(rh); //執行到這里,rh表示當前線(xiàn)程的HoldCounter,記數加1 rh.count++; } return 1; } // 前面未返回結果,執行第三步 return fullTryAcquireShared(current); }
在上述的簡(jiǎn)單嘗試 tryAcquireShared
未能確定結果后,執行第三步 fullTryAcquireShared
自旋來(lái)不斷嘗試獲取讀鎖,直到成功獲取鎖返回1,或者滿(mǎn)足相應條件認定失敗返回-1。
1.其他線(xiàn)程持有寫(xiě)鎖,失敗
2.當前線(xiàn)程讀的嘗試滿(mǎn)足堵塞條件表示當前線(xiàn)程排在其他線(xiàn)程后面,且當前線(xiàn)程沒(méi)有持有鎖即非重入的情況,失敗
3.其他情況則不斷自旋CAS,達到最大讀的數量會(huì )拋出異常,其他情況在成功后返回1。
final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { // 存在其他線(xiàn)程持有寫(xiě)鎖,返回-1 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. //沒(méi)有寫(xiě)鎖,且該線(xiàn)程排在其他線(xiàn)程后面,應該被堵塞 //如果已經(jīng)持有讀鎖,此次獲取是重入,可以執行else if 之后的操作; //否則,會(huì )被堵塞,返回-1。 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly //檢查firstReader if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { //執行到下一步rh是cached或者readHolds.get(),檢查rh rh = readHolds.get(); //在get時(shí),如果不存在,會(huì )產(chǎn)生一個(gè)新的HoldCounter //記數為0表示不是重入鎖,會(huì )刪除讓其重新為null if (rh.count == 0) readHolds.remove(); } } //返回失敗 if (rh.count == 0) return -1; } } //達到最大值,不允許繼續增加 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //和2.1.1中相似 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
該方法返回當前線(xiàn)程請求獲得讀鎖是否應該被堵塞,在公平鎖和非公平鎖中的實(shí)現不同
在公平鎖中,返回在排隊的隊列中當前線(xiàn)程之前是否存在其他線(xiàn)程,是的話(huà)返回 true,當前線(xiàn)程在隊列頭部或者隊列為空返回 false。
// FairSync final boolean readerShouldBlock() { return hasQueuedPredecessors(); } // AQS public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
在非公平鎖中,隊列中存在兩個(gè)節點(diǎn),且第二個(gè)節點(diǎn)是獨占的寫(xiě)節點(diǎn),會(huì )返回 true,使得新來(lái)的讀線(xiàn)程堵塞。
這種方式只能在第二個(gè)節點(diǎn)是請求寫(xiě)鎖的情況下返回 true,避免寫(xiě)鎖的無(wú)限等待;如果寫(xiě)鎖的請求節點(diǎn)在隊列的其他位置,返回 false,不影響新來(lái)的讀線(xiàn)程獲取讀鎖。
如果不按照這種方式處理,而按照隊列中的順序進(jìn)行處理,則只要存在其他線(xiàn)程在讀,每次來(lái)一個(gè)新的線(xiàn)程請求讀鎖,總是成功,寫(xiě)鎖會(huì )一直等待下去。
// NonfairSync final boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ return apparentlyFirstQueuedIsExclusive(); } // AQS final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
和 fullTryAcquireShared
有相似之處,該方法總是直接去搶占鎖,直到其他線(xiàn)程獲取寫(xiě)鎖返回失敗,或者當前當前線(xiàn)程獲取讀鎖返回成功。
//ReadLock public boolean tryLock() { return sync.tryReadLock(); } //Sync final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }
tryReleaseShared
在 if/else 中實(shí)現了通過(guò) first/cached/readHolds 獲取相應的 HoldCounter,并修改其中的記數,記數為0則刪除;在 for 中,不斷自旋實(shí)現 CAS 修改狀態(tài) c,如果修改后的狀態(tài)為0,表示讀寫(xiě)鎖全部釋放,返回 true,否則是 false。
// ReadLockpublic void unlock() { sync.releaseShared(1);}// AQSpublic final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}// Syncprotected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 先檢查 firstReader是否是當前線(xiàn)程 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; //否則,處理 cached/readHolds中的HoldCounter } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //自旋修改 state for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. //只有讀寫(xiě)鎖均釋放干凈,才返回true return nextc == 0; }}
下面講解 tryAcquire
和 tryWriteLock
,tryWriteLock
是一種非公平的獲取。
根據注釋?zhuān)瑃ryAcquire 分為三步
1.如果讀記數非0,或者寫(xiě)記數非0且寫(xiě)線(xiàn)程不是當前線(xiàn)程,失敗
2.寫(xiě)鎖的獲取應該被堵塞或者CAS失敗,失敗
3.其他情況,寫(xiě)重入和新來(lái)的寫(xiě)線(xiàn)程,均成功
//WriteLockpublic void lock() { sync.acquire(1);}//AQSpublic final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}//Syncprotected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //c分為兩部分,寫(xiě)和讀 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // c非0,w是0,則讀記數非0 || 獨占的寫(xiě)線(xiàn)程不是當前線(xiàn)程 // 返回 false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 重入的情況 setState(c + acquires); return true; } // 寫(xiě)應該被堵塞或者CAS失敗,返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 非重入,在CAS成功后,設定獨占寫(xiě)線(xiàn)程為當前線(xiàn)程,返回true setExclusiveOwnerThread(current); return true;}
在公平鎖中,檢查隊列前面是否有其他線(xiàn)程在排隊,在非公平鎖中,總是返回false,即總是不堵塞。
//FairSyncfinal boolean writerShouldBlock() { return hasQueuedPredecessors();}//NonfairSyncfinal boolean writerShouldBlock() { return false; // writers can always barge}
和 tryAcquire
在非公平鎖的寫(xiě)法基本一樣。
final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true;}
在 tryRelease
中,修改相應的狀態(tài),如果修改后寫(xiě)鎖記數為0,則返回 true。
//WriteLockpublic void unlock() { sync.release(1);}//AQSpublic final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}//Syncprotected final boolean tryRelease(int releases) { // 首先檢查當前線(xiàn)程是否持有寫(xiě)鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 根據修改后的寫(xiě)記數來(lái)確定free boolean free = exclusiveCount(nextc) == 0; // 此時(shí),寫(xiě)鎖完全釋放,設定寫(xiě)獨占線(xiàn)程為null if (free) setExclusiveOwnerThread(null); setState(nextc); // 返回 free return free;}
如果一個(gè)線(xiàn)程已經(jīng)持有寫(xiě)鎖,再去獲取讀鎖并釋放寫(xiě)鎖,這個(gè)過(guò)程稱(chēng)為鎖降級。
持有寫(xiě)鎖的時(shí)候去獲取讀鎖,只有該持有寫(xiě)鎖的線(xiàn)程能夠成功獲取讀鎖,然后再釋放寫(xiě)鎖,保證此時(shí)當前線(xiàn)程是有讀鎖的;如果有寫(xiě)鎖,先釋放寫(xiě)鎖,再獲取讀鎖,可能暫時(shí)不能獲取讀鎖,會(huì )在隊列中排隊等待。
到此這篇關(guān)于Java基礎之ReentrantReadWriteLock源碼解析的文章就介紹到這了,更多相關(guān)Java ReentrantReadWriteLock源碼解析內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。
Copyright ? 2009-2022 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 特網(wǎng)科技 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 百度云 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站