国产成人精品18p,天天干成人网,无码专区狠狠躁天天躁,美女脱精光隐私扒开免费观看

Java多線(xiàn)程之ReentrantReadWriteLock源碼解析

發(fā)布時(shí)間:2021-07-17 21:51 來(lái)源:腳本之家 閱讀:0 作者:Java與大數據進(jìn)階 欄目: 編程語(yǔ)言 歡迎投稿:712375056

目錄

        一、介紹

        1.1 ReentrantReadWriteLock

        ReentrantReadWriteLock 是一個(gè)讀寫(xiě)鎖,允許多個(gè)讀或者一個(gè)寫(xiě)線(xiàn)程在執行。

        內部的 Sync 繼承自 AQS,這個(gè) Sync 包含一個(gè)共享讀鎖 ReadLock 和一個(gè)獨占寫(xiě)鎖 WriteLock。

        該鎖可以設置公平和非公平,默認非公平。

        一個(gè)持有寫(xiě)鎖的線(xiàn)程可以獲取讀鎖。如果該線(xiàn)程先持有寫(xiě)鎖,再持有讀鎖并釋放寫(xiě)鎖,稱(chēng)為鎖降級。

        WriteLock支持Condition并且與ReentrantLock語(yǔ)義一致,而ReadLock則不能使用Condition,否則拋出UnsupportedOperationException異常。

        public class ReentrantReadWriteLock implements ReadWriteLock {
            /** 讀鎖 */
            private final ReentrantReadWriteLock.ReadLock readerLock;
            /** 寫(xiě)鎖 */
            private final ReentrantReadWriteLock.WriteLock writerLock;
            /** 持有的AQS子類(lèi)對象 */
            final Sync sync;
        
            abstract static class Sync extends AbstractQueuedSynchronizer {}
        
            static final class NonfairSync extends Sync {}
        
            static final class FairSync extends Sync {}
        
            public static class ReadLock implements Lock {}
        
            public static class WriteLock implements Lock {}
          
            //默認非公平
            public ReentrantReadWriteLock() {
                this(false);
            }
        
            public ReentrantReadWriteLock(boolean fair) {
                sync = fair ? new FairSync() : new NonfairSync();
                readerLock = new ReadLock(this);
                writerLock = new WriteLock(this);
            }
        
            public static class ReadLock implements Lock {
            	private final Sync sync;
                protected ReadLock(ReentrantReadWriteLock lock) {
                    sync = lock.sync;
                }
            }
        
            public static class WriteLock implements Lock {
            	private final Sync sync;
                protected WriteLock(ReentrantReadWriteLock lock) {
                    sync = lock.sync;
                }
            }
        
        }
        

        1.2 state

        Sync 繼承了 AQS,其中有一個(gè) int 的成員變量 state,int 共32位,這里將其視為兩部分,高16位表示讀的數量,低16位表示寫(xiě)的數量,這里的數量表示線(xiàn)程重入后的總數量。

        abstract static class Sync extends AbstractQueuedSynchronizer {
          	//繼承的一個(gè)int的成員變量,將其拆分為高16位和低16位
            //private volatile int state;
            static final int SHARED_SHIFT   = 16;
          	//讀一次,鎖增加的值
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        
            //讀的數量
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            //寫(xiě)的數量
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
        }
        

        1.3 HoldCounter

        讀鎖使用了一個(gè) ThreadLocal<HoldCounter> 讓每個(gè)線(xiàn)程有一個(gè)線(xiàn)程私有的 HoldCounter,HoldCounter包含一個(gè)線(xiàn)程 id 以及讀重入的次數。

        查找對應線(xiàn)程的HoldCounter 其實(shí)只用一個(gè) ThreadLocalHoldCounter 也足夠了。這里為了加快查詢(xún),用了兩個(gè)額外的緩存,即 cachedHoldCounter、firstReaderfirstReaderHoldCount(后兩個(gè)組合起來(lái)相當于一個(gè) HoldCounter)。

        在讀鎖的相關(guān)操作中,先檢查 firstReader 是否為當前線(xiàn)程,否則檢查 cachedHoldCounter 內部的線(xiàn)程是否為當前線(xiàn)程,如果失敗最后會(huì )通過(guò) readHolds 來(lái)獲取當前線(xiàn)程的 HoldCounter。

        static final class HoldCounter {
            int count = 0;
            // 使用線(xiàn)程id,而不是線(xiàn)程的引用。這樣可以防止垃圾不被回收
            final long tid = getThreadId(Thread.currentThread());
        }
        
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
        //使用的ThreadLocal
        private transient ThreadLocalHoldCounter readHolds;
        //一個(gè)緩存
        private transient HoldCounter cachedHoldCounter;
        //組合起來(lái)相當于一個(gè)緩存
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;
        

        二、讀鎖

        2.1 讀鎖的獲取

        下面講解 tryAcquireSharedtryReadLock,tryReadLock 是一種直接搶占的非公平獲取,和 tryAcquireShared 中的非公平獲取有所不同。

        2.1.1 tryAcquireShared

        根據注釋

        1.檢查是否存在其他線(xiàn)程持有的寫(xiě)鎖,是的話(huà)失敗,返回 -1;

        2.判斷在當前公平狀態(tài)下能否讀,以及是否超過(guò)讀的最大數量,滿(mǎn)足條件則嘗試 CAS 修改狀態(tài),讓 state 加一個(gè)單位的讀 SHARED_UNIT;修改成功后會(huì )根據三種情況,即首次讀、firstReader 是當前線(xiàn)程,以及其他情況分別進(jìn)行處理,成功,返回1;

        3.前面未返回結果,會(huì )執行 fullTryAcquireShared。

        可以將該方法視為 fullTryAcquireShared 的一次快速?lài)L試,如果嘗試失敗,會(huì )在 fullTryAcquireShared 的自旋中一直執行,直到返回成功或者失敗。

        //ReadLock
        public void lock() {
            sync.acquireShared(1);
        }  
        //AQS
        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        } 
        //Sync
        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
          	// 如果寫(xiě)的數量不是0,且寫(xiě)線(xiàn)程不是當前線(xiàn)程,失敗
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
          	// 獲取讀的個(gè)數
            int r = sharedCount(c);
          	// 如果當前線(xiàn)程想要讀,沒(méi)有被堵塞
          	// 當前讀的數量未超過(guò)最大允許的讀的個(gè)數
          	// CAS執行成功
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
              	// 第一次讀,修改firstReader和firstReaderHoldCount 
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                  // 如果當前線(xiàn)程正好是firstReader
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                  // 其他情況,經(jīng)過(guò)一系列處理后,使得rh為當前線(xiàn)程的HoldCounter
                  // 對rh的記數加一
                } else {
                    HoldCounter rh = cachedHoldCounter;
                  	// 如果cached為null或者不是當前線(xiàn)程
                    if (rh == null || rh.tid != getThreadId(current))
                      	// 從readHolds中g(shù)et,并修改cached
                        cachedHoldCounter = rh = readHolds.get();
                  	// 如果cached不是null,但記數為null
                  	// 這種情況表示當前線(xiàn)程的HoldCounter已經(jīng)被刪除,即為null,
                  	// 但cached仍然保留著(zhù)null之前的那個(gè)HoldCounter,
                  	// 為了方便,直接將cached設置給ThreadLocal即可
                    else if (rh.count == 0)
                        readHolds.set(rh);
                  	//執行到這里,rh表示當前線(xiàn)程的HoldCounter,記數加1
                    rh.count++;
                }
                return 1;
            }
          	// 前面未返回結果,執行第三步
            return fullTryAcquireShared(current);
        }
        

        2.1.2 fullTryAcquireShared

        在上述的簡(jiǎn)單嘗試 tryAcquireShared 未能確定結果后,執行第三步 fullTryAcquireShared 自旋來(lái)不斷嘗試獲取讀鎖,直到成功獲取鎖返回1,或者滿(mǎn)足相應條件認定失敗返回-1。

        1.其他線(xiàn)程持有寫(xiě)鎖,失敗

        2.當前線(xiàn)程讀的嘗試滿(mǎn)足堵塞條件表示當前線(xiàn)程排在其他線(xiàn)程后面,且當前線(xiàn)程沒(méi)有持有鎖即非重入的情況,失敗

        3.其他情況則不斷自旋CAS,達到最大讀的數量會(huì )拋出異常,其他情況在成功后返回1。

        final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                  	// 存在其他線(xiàn)程持有寫(xiě)鎖,返回-1
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                  //沒(méi)有寫(xiě)鎖,且該線(xiàn)程排在其他線(xiàn)程后面,應該被堵塞
                  //如果已經(jīng)持有讀鎖,此次獲取是重入,可以執行else if 之后的操作;
                  //否則,會(huì )被堵塞,返回-1。
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                  	//檢查firstReader
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                              	//執行到下一步rh是cached或者readHolds.get(),檢查rh
                                rh = readHolds.get();
                              	//在get時(shí),如果不存在,會(huì )產(chǎn)生一個(gè)新的HoldCounter
                              	//記數為0表示不是重入鎖,會(huì )刪除讓其重新為null
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                      	//返回失敗
                        if (rh.count == 0)
                            return -1;
                    }
                }
              	//達到最大值,不允許繼續增加
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
              	//和2.1.1中相似
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
        

        2.1.3 readerShouldBlock

        該方法返回當前線(xiàn)程請求獲得讀鎖是否應該被堵塞,在公平鎖和非公平鎖中的實(shí)現不同

        在公平鎖中,返回在排隊的隊列中當前線(xiàn)程之前是否存在其他線(xiàn)程,是的話(huà)返回 true,當前線(xiàn)程在隊列頭部或者隊列為空返回 false。

        // FairSync
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
        // AQS
        public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
        

        在非公平鎖中,隊列中存在兩個(gè)節點(diǎn),且第二個(gè)節點(diǎn)是獨占的寫(xiě)節點(diǎn),會(huì )返回 true,使得新來(lái)的讀線(xiàn)程堵塞。

        這種方式只能在第二個(gè)節點(diǎn)是請求寫(xiě)鎖的情況下返回 true,避免寫(xiě)鎖的無(wú)限等待;如果寫(xiě)鎖的請求節點(diǎn)在隊列的其他位置,返回 false,不影響新來(lái)的讀線(xiàn)程獲取讀鎖。

        如果不按照這種方式處理,而按照隊列中的順序進(jìn)行處理,則只要存在其他線(xiàn)程在讀,每次來(lái)一個(gè)新的線(xiàn)程請求讀鎖,總是成功,寫(xiě)鎖會(huì )一直等待下去。

        // NonfairSync
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
        // AQS
        final boolean apparentlyFirstQueuedIsExclusive() {
            Node h, s;
            return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
        }
        

        2.1.4 tryReadLock

        fullTryAcquireShared 有相似之處,該方法總是直接去搶占鎖,直到其他線(xiàn)程獲取寫(xiě)鎖返回失敗,或者當前當前線(xiàn)程獲取讀鎖返回成功。

        //ReadLock
        public boolean tryLock() {
            return sync.tryReadLock();
        }
        //Sync
        final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }
        

        2.2 讀鎖的釋放

        tryReleaseShared 在 if/else 中實(shí)現了通過(guò) first/cached/readHolds 獲取相應的 HoldCounter,并修改其中的記數,記數為0則刪除;在 for 中,不斷自旋實(shí)現 CAS 修改狀態(tài) c,如果修改后的狀態(tài)為0,表示讀寫(xiě)鎖全部釋放,返回 true,否則是 false。

        // ReadLockpublic void unlock() {    sync.releaseShared(1);}// AQSpublic final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}// Syncprotected final boolean tryReleaseShared(int unused) {    Thread current = Thread.currentThread();  	// 先檢查 firstReader是否是當前線(xiàn)程    if (firstReader == current) {        // assert firstReaderHoldCount > 0;        if (firstReaderHoldCount == 1)            firstReader = null;        else            firstReaderHoldCount--;      //否則,處理 cached/readHolds中的HoldCounter    } else {        HoldCounter rh = cachedHoldCounter;        if (rh == null || rh.tid != getThreadId(current))            rh = readHolds.get();        int count = rh.count;        if (count <= 1) {            readHolds.remove();            if (count <= 0)                throw unmatchedUnlockException();        }        --rh.count;    }  	//自旋修改 state    for (;;) {        int c = getState();        int nextc = c - SHARED_UNIT;        if (compareAndSetState(c, nextc))            // Releasing the read lock has no effect on readers,            // but it may allow waiting writers to proceed if            // both read and write locks are now free.          	//只有讀寫(xiě)鎖均釋放干凈,才返回true            return nextc == 0;    }}
        

        三、寫(xiě)鎖

        3.1 寫(xiě)鎖的獲取

        下面講解 tryAcquiretryWriteLock,tryWriteLock 是一種非公平的獲取。

        3.1.1 tryAcquire

        根據注釋?zhuān)瑃ryAcquire 分為三步

        1.如果讀記數非0,或者寫(xiě)記數非0且寫(xiě)線(xiàn)程不是當前線(xiàn)程,失敗

        2.寫(xiě)鎖的獲取應該被堵塞或者CAS失敗,失敗

        3.其他情況,寫(xiě)重入和新來(lái)的寫(xiě)線(xiàn)程,均成功

        //WriteLockpublic void lock() {    sync.acquire(1);}//AQSpublic final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}//Syncprotected final boolean tryAcquire(int acquires) {    /*     * Walkthrough:     * 1. If read count nonzero or write count nonzero     *    and owner is a different thread, fail.     * 2. If count would saturate, fail. (This can only     *    happen if count is already nonzero.)     * 3. Otherwise, this thread is eligible for lock if     *    it is either a reentrant acquire or     *    queue policy allows it. If so, update state     *    and set owner.     */    Thread current = Thread.currentThread();    int c = getState();    int w = exclusiveCount(c);  	//c分為兩部分,寫(xiě)和讀    if (c != 0) {        // (Note: if c != 0 and w == 0 then shared count != 0)      	// c非0,w是0,則讀記數非0 || 獨占的寫(xiě)線(xiàn)程不是當前線(xiàn)程      	// 返回 false        if (w == 0 || current != getExclusiveOwnerThread())            return false;        if (w + exclusiveCount(acquires) > MAX_COUNT)            throw new Error("Maximum lock count exceeded");        // Reentrant acquire      	// 重入的情況        setState(c + acquires);        return true;    }  	// 寫(xiě)應該被堵塞或者CAS失敗,返回false    if (writerShouldBlock() ||        !compareAndSetState(c, c + acquires))        return false;  	// 非重入,在CAS成功后,設定獨占寫(xiě)線(xiàn)程為當前線(xiàn)程,返回true    setExclusiveOwnerThread(current);    return true;}
        

        3.1.2 writerShouldBlock

        在公平鎖中,檢查隊列前面是否有其他線(xiàn)程在排隊,在非公平鎖中,總是返回false,即總是不堵塞。

        //FairSyncfinal boolean writerShouldBlock() {    return hasQueuedPredecessors();}//NonfairSyncfinal boolean writerShouldBlock() {    return false; // writers can always barge}
        

        3.1.3 tryWriteLock

        tryAcquire 在非公平鎖的寫(xiě)法基本一樣。

        final boolean tryWriteLock() {    Thread current = Thread.currentThread();    int c = getState();    if (c != 0) {        int w = exclusiveCount(c);        if (w == 0 || current != getExclusiveOwnerThread())            return false;        if (w == MAX_COUNT)            throw new Error("Maximum lock count exceeded");    }    if (!compareAndSetState(c, c + 1))        return false;    setExclusiveOwnerThread(current);    return true;}
        

        3.2 寫(xiě)鎖的釋放

        tryRelease 中,修改相應的狀態(tài),如果修改后寫(xiě)鎖記數為0,則返回 true。

        //WriteLockpublic void unlock() {    sync.release(1);}//AQSpublic final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}//Syncprotected final boolean tryRelease(int releases) {  	// 首先檢查當前線(xiàn)程是否持有寫(xiě)鎖    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    int nextc = getState() - releases;  	// 根據修改后的寫(xiě)記數來(lái)確定free    boolean free = exclusiveCount(nextc) == 0;  	// 此時(shí),寫(xiě)鎖完全釋放,設定寫(xiě)獨占線(xiàn)程為null    if (free)        setExclusiveOwnerThread(null);    setState(nextc);  	// 返回 free    return free;}
        

        四、鎖降級

        如果一個(gè)線(xiàn)程已經(jīng)持有寫(xiě)鎖,再去獲取讀鎖并釋放寫(xiě)鎖,這個(gè)過(guò)程稱(chēng)為鎖降級。

        持有寫(xiě)鎖的時(shí)候去獲取讀鎖,只有該持有寫(xiě)鎖的線(xiàn)程能夠成功獲取讀鎖,然后再釋放寫(xiě)鎖,保證此時(shí)當前線(xiàn)程是有讀鎖的;如果有寫(xiě)鎖,先釋放寫(xiě)鎖,再獲取讀鎖,可能暫時(shí)不能獲取讀鎖,會(huì )在隊列中排隊等待。

        到此這篇關(guān)于Java基礎之ReentrantReadWriteLock源碼解析的文章就介紹到這了,更多相關(guān)Java ReentrantReadWriteLock源碼解析內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

        免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。

        又污又爽又黄的网站| 色丁狠狠桃花久久综合网| 亚洲国产一区二区三区亚瑟| 久久人人妻人人做人人爽| 麻豆国产97在线 | 欧洲| 真人床震高潮全部视频免费|