死磕 java同步系列之StampedLock源碼解析

問題

(1)StampedLock是什麼?

(2)StampedLock具有什麼特性?

(3)StampedLock是否支持可重入?

(4)StampedLock與ReentrantReadWriteLock的對比?

簡介

StampedLock是java8中新增的類,它是一個更加高效的讀寫鎖的實現,而且它不是基於AQS來實現的,它的內部自成一片邏輯,讓我們一起來學習吧。

StampedLock具有三種模式:寫模式、讀模式、樂觀讀模式。

ReentrantReadWriteLock中的讀和寫都是一種悲觀鎖的體現,StampedLock加入了一種新的模式——樂觀讀,它是指當樂觀讀時假定沒有其它線程修改數據,讀取完成后再檢查下版本號有沒有變化,沒有變化就讀取成功了,這種模式更適用於讀多寫少的場景。

使用方法

讓我們通過下面的例子了解一下StampedLock三種模式的使用方法:

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        // 獲取寫鎖,返回一個版本號(戳)
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            // 釋放寫鎖,需要傳入上面獲取的版本號
            sl.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        // 樂觀讀
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        // 驗證版本號是否有變化
        if (!sl.validate(stamp)) {
            // 版本號變了,樂觀讀轉悲觀讀
            stamp = sl.readLock();
            try {
                // 重新讀取x、y的值
                currentX = x;
                currentY = y;
            } finally {
                // 釋放讀鎖,需要傳入上面獲取的版本號
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    void moveIfAtOrigin(double newX, double newY) {
        // 獲取悲觀讀鎖
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                // 轉為寫鎖
                long ws = sl.tryConvertToWriteLock(stamp);
                // 轉換成功
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                }
                else {
                    // 轉換失敗
                    sl.unlockRead(stamp);
                    // 獲取寫鎖
                    stamp = sl.writeLock();
                }
            }
        } finally {
            // 釋放鎖
            sl.unlock(stamp);
        }
    }
}

從上面的例子我們可以與ReentrantReadWriteLock進行對比:

(1)寫鎖的使用方式基本一對待;

(2)讀鎖(悲觀)的使用方式可以進行升級,通過tryConvertToWriteLock()方式可以升級為寫鎖;

(3)樂觀讀鎖是一種全新的方式,它假定數據沒有改變,樂觀讀之後處理完業務邏輯再判斷版本號是否有改變,如果沒改變則樂觀讀成功,如果有改變則轉化為悲觀讀鎖重試;

下面我們一起來學習它的源碼是怎麼實現的。

源碼分析

主要內部類

static final class WNode {
    // 前一個節點
    volatile WNode prev;
    // 后一個節點
    volatile WNode next;
    // 讀線程所用的鏈表(實際是一個棧結果)
    volatile WNode cowait;    // list of linked readers
    // 阻塞的線程
    volatile Thread thread;   // non-null while possibly parked
    // 狀態
    volatile int status;      // 0, WAITING, or CANCELLED
    // 讀模式還是寫模式
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}

隊列中的節點,類似於AQS隊列中的節點,可以看到它組成了一個雙向鏈表,內部維護着阻塞的線程。

主要屬性

// 一堆常量
// 讀線程的個數佔有低7位
private static final int LG_READERS = 7;
// 讀線程個數每次增加的單位
private static final long RUNIT = 1L;
// 寫線程個數所在的位置
private static final long WBIT  = 1L << LG_READERS;  // 128 = 1000 0000
// 讀線程個數所在的位置
private static final long RBITS = WBIT - 1L;  // 127 = 111 1111
// 最大讀線程個數
private static final long RFULL = RBITS - 1L;  // 126 = 111 1110
// 讀線程個數和寫線程個數的掩碼
private static final long ABITS = RBITS | WBIT;  // 255 = 1111 1111
// 讀線程個數的反數,高25位全部為1
private static final long SBITS = ~RBITS;  // -128 = 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000

// state的初始值
private static final long ORIGIN = WBIT << 1;  // 256 = 1 0000 0000
// 隊列的頭節點
private transient volatile WNode whead;
// 隊列的尾節點
private transient volatile WNode wtail;
// 存儲着當前的版本號,類似於AQS的狀態變量state
private transient volatile long state;

通過屬性可以看到,這是一個類似於AQS的結構,內部同樣維護着一個狀態變量state和一個CLH隊列。

構造方法

public StampedLock() {
    state = ORIGIN;
}

state的初始值為ORIGIN(256),它的二進制是 1 0000 0000,也就是初始版本號。

writeLock()方法

獲取寫鎖。

public long writeLock() {
    long s, next;
    // ABITS = 255 = 1111 1111
    // WBITS = 128 = 1000 0000
    // state與ABITS如果等於0,嘗試原子更新state的值加WBITS
    // 如果成功則返回更新的值,如果失敗調用acquireWrite()方法
    return ((((s = state) & ABITS) == 0L &&
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}

我們以state等於初始值為例,則state & ABITS的結果為:

此時state為初始狀態,與ABITS與運算后的值為0,所以執行後面的CAS方法,s + WBITS的值為384 = 1 1000 0000。

到這裏我們大膽猜測:state的高24位存儲的是版本號,低8位存儲的是是否有加鎖,第8位存儲的是寫鎖,低7位存儲的是讀鎖被獲取的次數,而且如果只有第8位存儲寫鎖的話,那麼寫鎖只能被獲取一次,也就不可能重入了。

到底我們猜測的對不對呢,走着瞧^^

我們接着來分析acquireWrite()方法:

(手機橫屏看源碼更方便)

private long acquireWrite(boolean interruptible, long deadline) {
    // node為新增節點,p為尾節點(即將成為node的前置節點)
    WNode node = null, p;
    
    // 第一次自旋——入隊
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        // 再次嘗試獲取寫鎖
        if ((m = (s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;
        }
        else if (spins < 0)
            // 如果自旋次數小於0,則計算自旋的次數
            // 如果當前有寫鎖獨佔且隊列無元素,說明快輪到自己了
            // 就自旋就行了,如果自旋完了還沒輪到自己才入隊
            // 則自旋次數為SPINS常量
            // 否則自旋次數為0
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            // 當自旋次數大於0時,當前這次自旋隨機減一次自旋次數
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        }
        else if ((p = wtail) == null) {
            // 如果隊列未初始化,新建一個空節點並初始化頭節點和尾節點
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            // 如果新增節點還未初始化,則新建之,並賦值其前置節點為尾節點
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            // 如果尾節點有變化,則更新新增節點的前置節點為新的尾節點
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            // 嘗試更新新增節點為新的尾節點成功,則退出循環
            p.next = node;
            break;
        }
    }

    // 第二次自旋——阻塞並等待喚醒
    for (int spins = -1;;) {
        // h為頭節點,np為新增節點的前置節點,pp為前前置節點,ps為前置節點的狀態
        WNode h, np, pp; int ps;
        // 如果頭節點等於前置節點,說明快輪到自己了
        if ((h = whead) == p) {
            if (spins < 0)
                // 初始化自旋次數
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                // 增加自旋次數
                spins <<= 1;
            
            // 第三次自旋,不斷嘗試獲取寫鎖
            for (int k = spins;;) { // spin at head
                long s, ns;
                if (((s = state) & ABITS) == 0L) {
                    if (U.compareAndSwapLong(this, STATE, s,
                                             ns = s + WBIT)) {
                        // 嘗試獲取寫鎖成功,將node設置為新頭節點並清除其前置節點(gc)
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                }
                // 隨機立減自旋次數,當自旋次數減為0時跳出循環再重試
                else if (LockSupport.nextSecondarySeed() >= 0 &&
                         --k <= 0)
                    break;
            }
        }
        else if (h != null) { // help release stale waiters
            // 這段代碼很難進來,是用於協助喚醒讀節點的
            // 我是這麼調試進來的:
            // 起三個寫線程,兩個讀線程
            // 寫線程1獲取鎖不要釋放
            // 讀線程1獲取鎖,讀線程2獲取鎖(會阻塞)
            // 寫線程2獲取鎖(會阻塞)
            // 寫線程1釋放鎖,此時會喚醒讀線程1
            // 在讀線程1裏面先不要喚醒讀線程2
            // 寫線程3獲取鎖,此時就會走到這裏來了
            WNode c; Thread w;
            // 如果頭節點的cowait鏈表(棧)不為空,喚醒裏面的所有節點
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        
        // 如果頭節點沒有變化
        if (whead == h) {
            // 如果尾節點有變化,則更新
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                // 如果尾節點狀態為0,則更新成WAITING
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                // 如果尾節點狀態為取消,則把它從鏈表中刪除
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                // 有超時時間的處理
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 已超時,剔除當前節點
                    return cancelWaiter(node, node, false);
                // 當前線程
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                // 把node的線程指向當前線程
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)
                    // 阻塞當前線程
                    U.park(false, time);  // 等同於LockSupport.park()
                    
                // 當前節點被喚醒后,清除線程
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                // 如果中斷了,取消當前節點
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

這裏對acquireWrite()方法做一個總結,這個方法裏面有三段自旋邏輯:

第一段自旋——入隊:

(1)如果頭節點等於尾節點,說明沒有其它線程排隊,那就多自旋一會,看能不能嘗試獲取到寫鎖;

(2)否則,自旋次數為0,直接讓其入隊;

第二段自旋——阻塞並等待被喚醒 + 第三段自旋——不斷嘗試獲取寫鎖:

(1)第三段自旋在第二段自旋內部;

(2)如果頭節點等於前置節點,那就進入第三段自旋,不斷嘗試獲取寫鎖;

(3)否則,嘗試喚醒頭節點中等待着的讀線程;

(4)最後,如果當前線程一直都沒有獲取到寫鎖,就阻塞當前線程並等待被喚醒;

這麼一大段邏輯看着比較鬧心,其實真正分解下來還是比較簡單的,無非就是自旋,把很多狀態的處理都糅合到一個for循環裏面處理了。

unlockWrite()方法

釋放寫鎖。

public void unlockWrite(long stamp) {
    WNode h;
    // 檢查版本號對不對
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    // 這行代碼實際有兩個作用:
    // 1. 更新版本號加1
    // 2. 釋放寫鎖
    // stamp + WBIT實際會把state的第8位置為0,也就相當於釋放了寫鎖
    // 同時會進1,也就是高24位整體加1了
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    // 如果頭節點不為空,並且狀態不為0,調用release方法喚醒它的下一個節點
    if ((h = whead) != null && h.status != 0)
        release(h);
}
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 將其狀態改為0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 如果頭節點的下一個節點為空或者其狀態為已取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 從尾節點向前遍歷找到一個可用的節點
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 喚醒q節點所在的線程
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

寫鎖的釋放過程比較簡單:

(1)更改state的值,釋放寫鎖;

(2)版本號加1;

(3)喚醒下一個等待着的節點;

readLock()方法

獲取讀鎖。

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    // 沒有寫鎖佔用,並且讀鎖被獲取的次數未達到最大值
    // 嘗試原子更新讀鎖被獲取的次數加1
    // 如果成功直接返回,如果失敗調用acquireRead()方法
    return ((whead == wtail && (s & ABITS) < RFULL &&
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}

獲取讀鎖的時候先看看現在有沒有其它線程佔用着寫鎖,如果沒有的話再檢測讀鎖被獲取的次數有沒有達到最大,如果沒有的話直接嘗試獲取一次讀鎖,如果成功了直接返回版本號,如果沒成功就調用acquireRead()排隊。

下面我們一起來看看acquireRead()方法,這又是一個巨長無比的方法,請保持耐心,我們一步步來分解:

(手機橫屏看源碼更方便)

private long acquireRead(boolean interruptible, long deadline) {
    // node為新增節點,p為尾節點
    WNode node = null, p;
    // 第一段自旋——入隊
    for (int spins = -1;;) {
        // 頭節點
        WNode h;
        // 如果頭節點等於尾節點
        // 說明沒有排隊的線程了,快輪到自己了,直接自旋不斷嘗試獲取讀鎖
        if ((h = whead) == (p = wtail)) {
            // 第二段自旋——不斷嘗試獲取讀鎖
            for (long m, s, ns;;) {
                // 嘗試獲取讀鎖,如果成功了直接返回版本號
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                    // 如果讀線程個數達到了最大值,會溢出,返回的是0
                    return ns;
                else if (m >= WBIT) {
                    // m >= WBIT表示有其它線程先一步獲取了寫鎖
                    if (spins > 0) {
                        // 隨機立減自旋次數
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    }
                    else {
                        // 如果自旋次數為0了,看看是否要跳出循環
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        // 設置自旋次數
                        spins = SPINS;
                    }
                }
            }
        }
        // 如果尾節點為空,初始化頭節點和尾節點
        if (p == null) { // initialize queue
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            // 如果新增節點為空,初始化之
            node = new WNode(RMODE, p);
        else if (h == p || p.mode != RMODE) {
            // 如果頭節點等於尾節點或者尾節點不是讀模式
            // 當前節點入隊
            if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }
        else if (!U.compareAndSwapObject(p, WCOWAIT,
                                         node.cowait = p.cowait, node))
            // 接着上一個elseif,這裏肯定是尾節點為讀模式了
            // 將當前節點加入到尾節點的cowait中,這是一個棧
            // 上面的CAS成功了是不會進入到這裏來的
            node.cowait = null;
        else {
            // 第三段自旋——阻塞當前線程並等待被喚醒
            for (;;) {
                WNode pp, c; Thread w;
                // 如果頭節點不為空且其cowait不為空,協助喚醒其中等待的讀線程
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    U.unpark(w);
                // 如果頭節點等待前前置節點或者等於前置節點或者前前置節點為空
                // 這同樣說明快輪到自己了
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    // 第四段自旋——又是不斷嘗試獲取鎖
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + RUNIT) :
                            (m < WBIT &&
                             (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    } while (m < WBIT); // 只有當前時刻沒有其它線程佔有寫鎖就不斷嘗試
                }
                // 如果頭節點未曾改變且前前置節點也未曾改
                // 阻塞當前線程
                if (whead == h && p.prev == pp) {
                    long time;
                    // 如果前前置節點為空,或者頭節點等於前置節點,或者前置節點已取消
                    // 從第一個for自旋開始重試
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    // 超時檢測
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        // 如果超時了,取消當前節點
                        return cancelWaiter(node, p, false);
                    
                    // 當前線程
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    // 設置進node中
                    node.thread = wt;
                    // 檢測之前的條件未曾改變
                    if ((h != pp || (state & ABITS) == WBIT) &&
                        whead == h && p.prev == pp)
                        // 阻塞當前線程並等待被喚醒
                        U.park(false, time);
                    
                    // 喚醒之後清除線程
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    // 如果中斷了,取消當前節點
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }
    
    // 只有第一個讀線程會走到下面的for循環處,參考上面第一段自旋中有一個break,當第一個讀線程入隊的時候break出來的
    
    // 第五段自旋——跟上面的邏輯差不多,只不過這裏單獨搞一個自旋針對第一個讀線程
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 如果頭節點等於尾節點,說明快輪到自己了
        // 不斷嘗試獲取讀鎖
        if ((h = whead) == p) {
            // 設置自旋次數
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
                
            // 第六段自旋——不斷嘗試獲取讀鎖
            for (int k = spins;;) { // spin at head
                long m, s, ns;
                // 不斷嘗試獲取讀鎖
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    // 獲取到了讀鎖
                    WNode c; Thread w;
                    whead = node;
                    node.prev = null;
                    // 喚醒當前節點中所有等待着的讀線程
                    // 因為當前節點是第一個讀節點,所以它是在隊列中的,其它讀節點都是掛這個節點的cowait棧中的
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT,
                                                   c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                    // 返回版本號
                    return ns;
                }
                // 如果當前有其它線程佔有着寫鎖,並且沒有自旋次數了,跳出當前循環
                else if (m >= WBIT &&
                         LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        }
        else if (h != null) {
            // 如果頭節點不等待尾節點且不為空且其為讀模式,協助喚醒裏面的讀線程
            WNode c; Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        
        // 如果頭節點未曾變化
        if (whead == h) {
            // 更新前置節點及其狀態等
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                // 第一個讀節點即將進入阻塞
                long time;
                // 超時設置
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 如果超時了取消當前節點
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p)
                    // 阻塞第一個讀節點並等待被喚醒
                    U.park(false, time);
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

讀鎖的獲取過程比較艱辛,一共有六段自旋,Oh my god,讓我們來大致地分解一下:

(1)讀節點進來都是先判斷是頭節點如果等於尾節點,說明快輪到自己了,就不斷地嘗試獲取讀鎖,如果成功了就返回;

(2)如果頭節點不等於尾節點,這裏就會讓當前節點入隊,這裏入隊又分成了兩種;

(3)一種是首個讀節點入隊,它是會排隊到整個隊列的尾部,然後跳出第一段自旋;

(4)另一種是非第一個讀節點入隊,它是進入到首個讀節點的cowait棧中,所以更確切地說應該是入棧;

(5)不管是入隊還入棧后,都會再次檢測頭節點是不是等於尾節點了,如果相等,則會再次不斷嘗試獲取讀鎖;

(6)如果頭節點不等於尾節點,那麼才會真正地阻塞當前線程並等待被喚醒;

(7)上面說的首個讀節點其實是連續的讀線程中的首個,如果是兩個讀線程中間夾了一個寫線程,還是老老實實的排隊。

自旋,自旋,自旋,旋轉的木馬,讓我忘了傷^^

unlockRead()方法

釋放讀鎖。

public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        // 檢查版本號
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        // 讀線程個數正常
        if (m < RFULL) {
            // 釋放一次讀鎖
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                // 如果讀鎖全部都釋放了,且頭節點不為空且狀態不為0,喚醒它的下一個節點
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)
            // 讀線程個數溢出檢測
            break;
    }
}

private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 將其狀態改為0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 如果頭節點的下一個節點為空或者其狀態為已取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 從尾節點向前遍歷找到一個可用的節點
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 喚醒q節點所在的線程
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

讀鎖釋放的過程就比較簡單了,將state的低7位減1,當減為0的時候說明完全釋放了讀鎖,就喚醒下一個排隊的線程。

tryOptimisticRead()方法

樂觀讀。

public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

如果沒有寫鎖,就返回state的高25位,這裏把寫所在位置一起返回了,是為了後面檢測數據有沒有被寫過。

validate()方法

檢測樂觀讀版本號是否變化。

public boolean validate(long stamp) {
    // 強制加入內存屏障,刷新數據
    U.loadFence();
    return (stamp & SBITS) == (state & SBITS);
}

檢測兩者的版本號是否一致,與SBITS與操作保證不受讀操作的影響。

變異的CLH隊列

StampedLock中的隊列是一種變異的CLH隊列,圖解如下:

總結

StampedLock的源碼解析到這裏就差不多了,讓我們來總結一下:

(1)StampedLock也是一種讀寫鎖,它不是基於AQS實現的;

(2)StampedLock相較於ReentrantReadWriteLock多了一種樂觀讀的模式,以及讀鎖轉化為寫鎖的方法;

(3)StampedLock的state存儲的是版本號,確切地說是高24位存儲的是版本號,寫鎖的釋放會增加其版本號,讀鎖不會;

(4)StampedLock的低7位存儲的讀鎖被獲取的次數,第8位存儲的是寫鎖被獲取的次數;

(5)StampedLock不是可重入鎖,因為只有第8位標識寫鎖被獲取了,並不能重複獲取;

(6)StampedLock中獲取鎖的過程使用了大量的自旋操作,對於短任務的執行會比較高效,長任務的執行會浪費大量CPU;

(7)StampedLock不能實現條件鎖;

彩蛋

StampedLock與ReentrantReadWriteLock的對比?

答:StampedLock與ReentrantReadWriteLock作為兩種不同的讀寫鎖方式,彤哥大致歸納了它們的異同點:

(1)兩者都有獲取讀鎖、獲取寫鎖、釋放讀鎖、釋放寫鎖的方法,這是相同點;

(2)兩者的結構基本類似,都是使用state + CLH隊列;

(3)前者的state分成三段,高24位存儲版本號、低7位存儲讀鎖被獲取的次數、第8位存儲寫鎖被獲取的次數;

(4)後者的state分成兩段,高16位存儲讀鎖被獲取的次數,低16位存儲寫鎖被獲取的次數;

(5)前者的CLH隊列可以看成是變異的CLH隊列,連續的讀線程只有首個節點存儲在隊列中,其它的節點存儲的首個節點的cowait棧中;

(6)後者的CLH隊列是正常的CLH隊列,所有的節點都在這個隊列中;

(7)前者獲取鎖的過程中有判斷首尾節點是否相同,也就是是不是快輪到自己了,如果是則不斷自旋,所以適合執行短任務;

(8)後者獲取鎖的過程中非公平模式下會做有限次嘗試;

(9)前者只有非公平模式,一上來就嘗試獲取鎖;

(10)前者喚醒讀鎖是一次性喚醒連續的讀鎖的,而且其它線程還會協助喚醒;

(11)後者是一個接着一個地喚醒的;

(12)前者有樂觀讀的模式,樂觀讀的實現是通過判斷state的高25位是否有變化來實現的;

(13)前者各種模式可以互轉,類似tryConvertToXxx()方法;

(14)前者寫鎖不可重入,後者寫鎖可重入;

(15)前者無法實現條件鎖,後者可以實現條件鎖;

差不多就這麼多吧,如果你還能想到,也歡迎補充哦^^

推薦閱讀

1、死磕 java同步系列之開篇

2、死磕 java魔法類之Unsafe解析

3、死磕 java同步系列之JMM(Java Memory Model)

4、死磕 java同步系列之volatile解析

5、死磕 java同步系列之synchronized解析

6、死磕 java同步系列之自己動手寫一個鎖Lock

7、死磕 java同步系列之AQS起篇

8、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

9、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

10、死磕 java同步系列之ReentrantLock VS synchronized

11、死磕 java同步系列之ReentrantReadWriteLock源碼解析

12、死磕 java同步系列之Semaphore源碼解析

13、死磕 java同步系列之CountDownLatch源碼解析

14、死磕 java同步系列之AQS終篇

歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢遊源碼的海洋。

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

您可能也會喜歡…