多線程高併發編程(11) — 非阻塞算法實現ConcurrentLinkedQueue源碼分析

  一.背景

  要實現對隊列的安全訪問,有兩種方式:阻塞算法和非阻塞算法。阻塞算法的實現是使用一把鎖(出隊和入隊同一把鎖ArrayBlockingQueue)和兩把鎖(出隊和入隊各一把鎖LinkedBlockingQueue)來實現;非阻塞算法使用自旋+CAS實現。

  
阻塞,顧名思義:當我們的生產者向隊列中生產數據時,若隊列已滿,那麼生產線程會暫停下來,直到隊列中有可以存放數據的地方,才會繼續工作;而當我們的消費者向隊列中獲取數據時,若隊列為空,則消費者線程會暫停下來,直到容器中有元素出現,才能進行獲取操作。
  那麼對於非阻塞來說,非阻塞隊列的執行並不會被阻塞,無論是消費者的出隊,還是生產者的入隊。同時對於入隊和出隊,使用了向後推進策略(重新尋找頭或尾節點)保證併發的數據一致性。

  今天來探究下使用非阻塞算法來實現的線程安全隊列ConcurrentLinkedQueue,它是一個基於鏈接節點的無界線程安全隊列,採用先進先出的規則對節點進行排序,當我們添加一個元素的時候,它會添加到隊列的尾部,當我們獲取一個元素時,它會返回隊列頭部的元素。它採用了“wait-free”算法(即CAS算法)來實現。即當入隊時,插入的元素依次向後延伸,形成鏈表;而出隊時,則從鏈表的第一個元素開始獲取,依次遞增。

  ConcurrentLinkedQueue的類圖結構:

   從類圖中可以看到,ConcurrentLinkedQueue由head和tail節點組成,每個節點Node由節點元素item和指向下一個節點的引用next組成,節點與節點之間通過next關聯起來組成一張鏈表結構的隊列。

  二.源碼解析

  1. 構造方法

        private static class Node<E> {
            volatile E item;//元素
            volatile Node<E> next;//下一節點
    
            Node(E item) {//添加元素
                UNSAFE.putObject(this, itemOffset, item);
            }
    
            boolean casItem(E cmp, E val) {//cas修改元素
                return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }
    
            void lazySetNext(Node<E> val) {//添加節點
                UNSAFE.putOrderedObject(this, nextOffset, val);
            }
    
            boolean casNext(Node<E> cmp, Node<E> val) {//cas修改節點
                return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }
    
            private static final sun.misc.Unsafe UNSAFE;
            private static final long itemOffset;
            private static final long nextOffset;
    
            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = Node.class;
                    //獲得元素的偏移位置
                    itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                    //獲得下一節點的偏移位置
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }
        //頭節點
        private transient volatile Node<E> head;
        //尾節點
        private transient volatile Node<E> tail;
        public ConcurrentLinkedQueue() {
            //默認情況下head節點存儲的元素為空,tail節點等於head節點。
            head = tail = new Node<E>(null);
        }
        public ConcurrentLinkedQueue(Collection<? extends E> c) {
            Node<E> h = null, t = null;
            //遍歷集合
            for (E e : c) {
                checkNotNull(e);//檢查是否為空,如果為空拋出空指針異常
                //創建節點和將元素存儲到節點中
                Node<E> newNode = new Node<E>(e);
                if (h == null)//頭節點為空
                    h = t = newNode;//頭和尾節點是創建的節點
                else {
                    t.lazySetNext(newNode);//添加節點
                    t = newNode;//修改尾節點的標識
                }
            }
            //如果集合沒有元素,設置隊列的頭尾節點為空
            if (h == null)
                h = t = new Node<E>(null);
            head = h;//更新隊列的頭節點標識
            tail = t;//更新隊列的尾節點標識
        }
        private static void checkNotNull(Object v) {
            if (v == null)
                throw new NullPointerException();
        }    
  2. 入隊add:

    • 入隊操作主要做兩件事情,第一是將入隊節點設置成當前隊列尾節點的下一個節點;第二是更新tail節點,如果tail節點的next節點不為空,則將入隊節點設置成tail節點,如果tail節點的next節點為空,則將入隊節點設置成tail的next節點,所以tail節點不總是尾節點

    • 上面的分析讓我們從單線程入隊的角度來理解入隊過程,但是多個線程同時進行入隊情況就變得更加複雜,因為可能會出現其他線程插隊的情況。如果有一個線程正在入隊,那麼它必須先獲取尾節點,然後設置尾節點的下一個節點為入隊節點,但這時可能有另外一個線程插隊了,那麼隊列的尾節點就會發生變化,這時當前線程要暫停入隊操作,然後重新獲取尾節點。

    • 源碼解析:從下面可以看出,入隊永遠是返回true,所以不要通過返回值判斷是否入隊成功

      public boolean add(E e) {
              return offer(e);
          }
          public boolean offer(E e) {
              checkNotNull(e);//檢查是否為空
              //創建入隊節點,將元素添加到節點中
              final Node<E> newNode = new Node<E>(e);
              //自旋隊列CAS直到入隊成功
              // 1、根據tail節點定位出尾節點(last node);2、將新節點置為尾節點的下一個節點;3、casTail更新尾節點
              for (Node<E> t = tail, p = t;;) {
                  //p是尾節點,q得到尾節點的next
                  Node<E> q = p.next;
                  //如果q為空
                  if (q == null) {
                      //p是last node,將尾節點的next修改為創建的節點
                      if (p.casNext(null, newNode)) {
                          //p在遍歷後會變化,因此需要判斷,如果不相等即p != t = tail,表示t(= tail)不是尾節點,則將入隊節點設置成tail節點,更新失敗了也沒關係,因為失敗了表示有其他線程成功更新了tail節點
                          if (p != t)
                              casTail(t, newNode);//入隊節點更新為尾節點,允許失敗,因此t= tail並不總是尾節點
                          return true;//結束
                      }
                  }
                  //重新獲取head節點:多線程操作時,輪詢后p有可能等於q,此時,就需要對p重新賦值
                  //(多線程自引用的情況,只有offer()和poll()交替執行時會出現)
                  else if (p == q)
                      //因為併發下可能tail被改了,如果被改了,則使用新的t,否則跳轉到head,從鏈表頭重新輪詢,因為從head開始所有的節點都可達
                      p = (t != (t = tail)) ? t : head;//運行到這裏再繼續自旋遍歷
                  else
                      /**
                       * 尋找尾節點,同樣,當t不等於p時,說明p在上面被重新賦值了,並且tail也被別的線程改了,則使用新的tail,否則循環檢查p的下個節點
                       *  (多offer()情況下會出現)
                       * p=condition?result1:result2
                       *  滿足result1的場景為 :
                       *      獲取尾節點tail的快照已經過時了(其他線程更新了新的尾節點tail),直接跳轉到當前獲得的最新尾節點的地方
                       *  滿足result2的場景為:
                       *      多線程同時操作offer(),執行p.casNext(null, newNode)CAS成功后,未更新尾節點(未執行casTail(t, newNode)方法:兩種原因 1是未滿足前置條件if判斷 2是CAS更新失敗),直接找next節點
                       */
                      p = (p != t && t != (t = tail)) ? t : q;//運行到這裏再繼續自旋遍歷
              }
          }
    1. debug斷點測試案例:

      public static void main(String[] args) throws IndexOutOfBoundsException {
              ConcurrentLinkedQueue c = new ConcurrentLinkedQueue();
              new Thread(()->{
                  int i;
                  for(i=0;i<10;){
                      c.offer(i++);
                      Object poll = c.poll();//註釋或取消進行測試
                      System.out.println(Thread.currentThread().getName()+":"+poll);
                  }
              }).start();
              new Thread(()->{
                  int i;
                  for(i=200;i<210;){
                      c.offer(i++);
                      Object poll = c.poll();//註釋或取消進行測試
                      System.out.println(Thread.currentThread().getName()+":"+poll);
                  }
              }).start();
           }
    2. tail多線程的更新情況:通過p和t是否相等來判斷

  3. 出隊poll:

    • 從上圖可知,並不是每次出隊時都更新head節點,當head節點里有元素時,直接彈出head節點里的元素,而不會更新head節點。只有當head節點里沒有元素時,出隊操作才會更新head節點。採用這種方式也是為了減少使用CAS更新head節點的消耗,從而提高出隊效率。
    • 源碼解析:

      public E poll() {
              restartFromHead:
              //自旋
              for (;;) {
                  //獲得頭節點
                  for (Node<E> h = head, p = h, q;;) {
                      E item = p.item;//獲得頭節點元素
                      //如果頭節點元素不為null並且cas刪除頭節點元素成功
                      if (item != null && p.casItem(item, null)) {
                          //p被修改了
                          if (p != h) // hop two nodes at a time
                              // 如果p 的next 屬性不是null ,將 p 作為頭節點,而 q 將會消失
                              updateHead(h, ((q = p.next) != null) ? q : p);
                          return item;
                      }
                      //如果頭節點的元素為空或頭節點發生了變化,這說明頭節點已經被另外一個線程修改了。
                      // 那麼獲取p節點的下一個節點,如果p節點的下一節點為null,則表明隊列已經空了
                      // 如果 p(head) 的 next 節點 q 也是null,則表示沒有數據了,返回null,則將 head 設置為null
                      // 注意:updateHead 方法最後還會將原有的 head 作為自己 next 節點,方便offer 連接。
                      else if ((q = p.next) == null) {
                          updateHead(h, p);
                          return null;
                      }
                      //如果 p == q,說明別的線程取出了 head,並將 head 更新了。就需要重新開始獲取head節點
                      else if (p == q)
                          continue restartFromHead;
                      // 如果下一個元素不為空,則將頭節點的下一個節點設置成頭節點
                      else
                          p = q;
                  }
              }
          }
          final void updateHead(Node<E> h, Node<E> p) {
              if (h != p && casHead(h, p))
                  // 將舊的頭結點h的next域指向為h
                  h.lazySetNext(h);
          }
  4. 入隊和出隊操作中,都有p == q的情況,在下面這種情況中:

    • 在彈出一個節點之後,tail節點有一條指向自己的虛線,這是什麼意思呢?在poll()方法中,移除元素之後,會調用updateHead方法,其中有h.lazySetNext(h),可以看到,在更新完head之後,會將舊的頭結點h的next域指向為h,上圖中所示的虛線也就表示這個節點的自引用
    • 如果這時,再有一個線程來添加元素,通過tail獲取的next節點則仍然是它本身,這就出現了p == q的情況,出現該種情況之後,則會觸發執行head的更新,將p節點重新指向為head,所有“活着”的節點(指未刪除節點),都能從head通過遍歷可達,這樣就能通過head成功獲取到尾節點,然後添加元素了。

  5. 獲取首部元素peek:

    • 從圖中可以看到,peek操作會改變head指向,執行peek()方法后head會指向第一個具有非空元素的節點。
    • 源碼解析:
      // 獲取鏈表的首部元素(只讀取而不移除)
          public E peek() {
              restartFromHead:
              //自旋
              for (;;) {
                  for (Node<E> h = head, p = h, q;;) {
                      //獲得頭節點元素
                      E item = p.item;
                      //頭節點元素不為空或頭節點下一節點為空(表示鏈表只有一個節點)
                      if (item != null || (q = p.next) == null) {
                          updateHead(h, p);//更新頭節點標識
                          return item;
                      }
                      /如果 p == q,說明別的線程取出了 head,並將 head 更新了。就需要重新開始獲取head節點
                      else if (p == q)
                          continue restartFromHead;
                      // 如果下一個元素不為空,則將頭節點的下一個節點設置成頭節點
                      else
                          p = q;
                  }
              }
          }
  6. 判斷隊列是否為空isEmpty:

        public boolean isEmpty() {
                return first() == null;
        }
        Node<E> first() {
            restartFromHead:
            for (;;) {
                for (Node<E> h = head, p = h, q;;) {
                    //頭節點是否有元素
                    boolean hasItem = (p.item != null);
                    //頭節點有元素或當前鏈表只有一個節點
                    if (hasItem || (q = p.next) == null) {
                        updateHead(h, p);
                        return hasItem ? p : null;//頭節點有值返回節點,否則返回null
                    }
                    else if (p == q)
                        continue restartFromHead;
                    else
                        p = q;
                }
            }
        }
  7. 獲取個數size:在併發環境中,其結果可能不精確,因為整個過程都沒有加鎖,所以從調用size方法到返回結果期間有可能增刪元素,導致統計的元素個數不精確。【在隊列元素很多的時候,size()方法十分消耗性能和時間,只是單純的判斷隊列為空使用isEmpty()即可!!!】

        public int size() {
            int count = 0;
            // first()獲取第一個具有非空元素的節點,若不存在,返回null
            // succ(p)方法獲取p的後繼節點,若p == p的後繼節點,則返回head
            for (Node<E> p = first(); p != null; p = succ(p))
                //節點有元素數量+1
                if (p.item != null)
                    if (++count == Integer.MAX_VALUE)
                        break;
            return count;
        }
        //取下一節點
        final Node<E> succ(Node<E> p) {
            Node<E> next = p.next;
            //若p == p的後繼節點(自引用情況下會出現),則返回head
            return (p == next) ? head : next;
        }
    • 探討:為何 ConcurrentLinkedQueue 中需要遍歷鏈表來獲取 size 而不適用一個原子變量呢?

      • 這是因為使用原子變量保存隊列元素個數需要保證入隊出隊操作和操作原子變量是原子操作,而ConcurrentLinkedQueue 是使用 CAS 無鎖算法的,所以無法做到這個。
  8. 判斷元素是否包含contains:該方法和size方法類似,有可能返回錯誤結果,比如調用該方法時,元素還在隊列裏面,但是遍歷過程中,該元素被刪除了,那麼就會返回false。

        public boolean contains(Object o) {
            if (o == null) return false;
            for (Node<E> p = first(); p != null; p = succ(p)) {
                E item = p.item;
                // 若找到匹配節點,則返回true
                if (item != null && o.equals(item))
                    return true;
            }
            return false;
        }
  9. 刪除元素remove:

        public boolean remove(Object o) {
            //刪除的元素不能為null,
            if (o != null) {
                Node<E> next, pred = null;
                //遍歷,開始獲得頭節點,
                for (Node<E> p = first(); p != null; pred = p, p = next) {
                    boolean removed = false;//刪除的標識
                    E item = p.item;//節點元素
                    if (item != null) {
                        //節點的元素不等於要刪除的元素,獲取下一節點進行遍歷循環操作
                        if (!o.equals(item)) {
                            next = succ(p);//將當前遍歷的節點移到下一節點
                            continue;
                        }
                        //節點元素等於刪除元素,CAS將節點元素置為null
                        removed = p.casItem(item, null);
                    }
                    next = succ(p);//獲取刪除節點的下一節點,
                    //有前節點和後置節點
                    if (pred != null && next != null) // unlink
                        pred.casNext(p, next);//刪除當前節點,即當前節點移除出隊列
                    if (removed)//元素刪除了返回true
                        return true;
                }
            }
            return false;
        }

  三.總結

  • 使用 CAS 原子指令來處理對數據的併發訪問,這是非阻塞算法得以實現的基礎。
  • head/tail 並非總是指向隊列的頭 / 尾節點,也就是說允許隊列處於不一致狀態。 這個特性把入隊 / 出隊時,原本需要一起原子化執行的兩個步驟分離開來,從而縮小了入隊 / 出隊時需要原子化更新值的範圍到唯一變量。這是非阻塞算法得以實現的關鍵。
  • 由於隊列有時會處於不一致狀態。為此,ConcurrentLinkedQueue 使用三個不變式來維護非阻塞算法的正確性。
  • 以批處理方式來更新 head/tail,從整體上減少入隊 / 出隊操作的開銷。
  • 為了有利於垃圾收集,隊列使用特有的 head 更新機制;為了確保從已刪除節點向後遍歷,可到達所有的非刪除節點,隊列使用了特有的向後推進策略。

  四.參考

  • https://blog.csdn.net/qq_38293564/article/details/80798310
  • https://www.ibm.com/developerworks/cn/java/j-lo-concurrent/index.html

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

您可能也會喜歡…