您現在的位置是:首頁 > 單機遊戲首頁單機遊戲

Java 中悲觀鎖的底層實現機制

簡介用文字描述 acquire(int arg) 方法的呼叫流程:首先呼叫自定義 AQS 實現的 tryAcquire(int arg) 方法,該方法的作用是嘗試獲取資源:如果獲取資源成功,則直接從 acquire(int arg) 方法返回如

int to time什麼意思

介紹 AQS

AQS(AbstractQueuedSynchronizer)是 Java 併發包中,實現各種同步元件的基礎。比如

各種鎖:ReentrantLock、ReadWriteLock、StampedLock

各種執行緒同步工具類:CountDownLatch、CyclicBarrier、Semaphore

執行緒池中的 Worker

Lock 介面的實現基本都是透過聚合了一個 AQS 的子類來完成執行緒訪問控制的。

Doug Lea 曾經介紹過 AQS 的設計初衷。從原理上,一種同步元件往往是可以利用其他的元件實現的,例如可以使用 Semaphore 實現互斥鎖。但是,對某種同步元件的傾向,會導致複雜、晦澀的實現邏輯,所以,他選擇了將基礎的同步相關操作抽象在 AbstractQueuedSynchronizer 中,利用 AQS 為我們構建同步元件提供了範本。

如何使用 AQS

利用 AQS 實現一個同步元件,我們至少要實現兩類基本的方法,分別是:

獲取資源,需要實現 tryAcquire(int arg) 方法

釋放資源,需要實現 tryRelease(int arg) 方法

如果需要共享式獲取 / 釋放資源,需要實現對應的 tryAcquireShared(int arg)、tryReleaseShared(int arg)

AQS 使用的是模板方法設計模式。AQS 方法的修飾符很有規律,其中,使用 protected 修飾的方法為抽象方法,通常需要子類去實現,從而實現不同的同步元件;使用 public 修飾的方法基本可以認為是模板方法,不建議子類直接覆蓋。

透過呼叫 AQS 的 acquire(int arg) 方法可以獲取資源,該方法會呼叫 protected 修飾的 tryAcquire(int arg) 方法,因此我們需要在 AQS 的子類中實現 tryAcquire(int arg),tryAcquire(int arg) 方法的作用是:獲取資源。

當前執行緒獲取資源並執行了相應邏輯之後,就需要釋放資源,使得後續節點能夠繼續獲取資源。透過呼叫 AQS 的 release(int arg) 方法可以釋放資源,該方法會呼叫 protected 修飾的 tryRelease(int arg) 方法,因此我們需要在 AQS 的子類中實現 tryRelease(int arg),tryRelease(int arg) 方法的作用是:釋放資源。

AQS 的實現原理

從實現角度分析 AQS 是如何完成執行緒訪問控制。

AQS 的實現原理可以從 同步阻塞佇列、獲取資源時的執行流程、釋放資源時的執行流程 這 3 個方面介紹。

同步阻塞佇列

AQS 依賴內部的同步阻塞佇列(一個 FIFO 雙向佇列)來完成資源的管理。

同步阻塞佇列的工作機制

節點:同步阻塞佇列中的節點(Node)用來儲存獲取資源失敗的執行緒引用、等待狀態以及前驅和後繼節點,沒有成功獲取資源的執行緒將會成為節點加入同步阻塞佇列的尾部,同時會阻塞當前執行緒(Java 執行緒處於 WAITING 狀態,釋放 CPU 的使用權)。

首節點:同步阻塞佇列遵循 FIFO(先進先出),首節點是獲取資源成功的節點,首節點的執行緒在釋放資源時,將會喚醒後繼節點,使其再次嘗試獲取資源,而後繼節點將會在獲取資源成功時將自己設定為首節點。

Java 中悲觀鎖的底層實現機制

等待狀態

獲取資源、釋放資源的執行流程,結論先行:

在獲取資源時,獲取資源失敗的執行緒都會被加入到同步阻塞佇列中,並在佇列中進行自旋;移出佇列(或停止自旋)的條件是前驅節點為頭節點且成功獲取了資源。

在釋放資源時,AQS 呼叫 tryRelease(int arg) 方法釋放資源,然後喚醒頭節點的後繼節點。

獲取資源

下面來介紹獲取資源時的執行流程。

呼叫 AQS 的 acquire(int arg) 方法可以獲取資源。

acquire(int arg) 方法是獨佔式獲取資源,它呼叫流程如下圖所示。

Java 中悲觀鎖的底層實現機制

用文字描述 acquire(int arg) 方法的呼叫流程:首先呼叫自定義 AQS 實現的 tryAcquire(int arg) 方法,該方法的作用是嘗試獲取資源:

如果獲取資源成功,則直接從 acquire(int arg) 方法返回

如果獲取資源失敗,則構造節點,並將該節點加入到同步阻塞佇列的尾部,最後呼叫 acquireQueued(Node node,int arg) 方法,使得該節點以“死迴圈”的方式嘗試獲取資源。只有當前節點的前驅節點是頭節點,才能嘗試獲取資源。

如果當前節點的前驅節點是頭節點,並且獲取資源成功,則設定當前節點為頭節點,並從 acquireQueued(Node node,int arg) 方法返回

如果當前節點的前驅節點不是頭節點 或者 獲取資源失敗,則阻塞當前執行緒,執行緒被喚醒後繼續執行該迴圈操作

acquireQueued(Node node,int arg) 方法的呼叫過程也被稱為“自旋過程”。

自旋是什麼意思是呢?我的理解就是:自旋就是一個死迴圈,迴圈執行某個操作序列,直到滿足某個條件才退出迴圈。

/** * Acquires in exclusive mode, ignoring interrupts。 Implemented * by invoking at least once {@link #tryAcquire}, * returning on success。 Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success。 This method can be used * to implement method {@link Lock#lock}。 * * @param arg the acquire argument。 This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like。 */public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node。EXCLUSIVE), arg))selfInterrupt();}

acquire(int arg) 的主要邏輯是:

首先呼叫自定義 AQS 實現的 tryAcquire(int arg) 方法,該方法保證執行緒安全的獲取資源:

如果獲取資源成功,則直接從 acquire(int arg) 方法返回

如果獲取資源失敗,則構造同步節點(獨佔式 Node。EXCLUSIVE,同一時刻只能有一個執行緒成功獲取資源)並透過 addWaiter(Node node) 方法將該節點加入到同步阻塞佇列的尾部,最後呼叫 acquireQueued(Node node,int arg) 方法,使得該節點以“死迴圈”的方式獲取資源。如果獲取不到則阻塞節點中的執行緒,而被阻塞執行緒的喚醒主要依靠 前驅節點的出隊 或 阻塞執行緒被中斷 來實現。

/** * Acquires in exclusive uninterruptible mode for thread already in * queue。 Used by condition wait methods as well as acquire。 * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node。predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p。next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

在 acquireQueued(final Node node,int arg) 方法中,當前執行緒在“死迴圈”中嘗試獲取資源,而只有前驅節點是頭節點才能夠嘗試獲取資源,這是為什麼?原因有兩個,如下。

第一,頭節點是成功獲取到資源的節點,而頭節點的執行緒釋放了資源之後,將會喚醒其後繼節點,後繼節點的執行緒被喚醒後需要檢查自己的前驅節點是否是頭節點。

第二,維護同步阻塞佇列的 FIFO 原則。

釋放資源

當前執行緒獲取資源並執行了相應邏輯之後,就需要釋放資源,使得後續節點能夠繼續獲取資源。

下面來介紹釋放資源時的執行流程。

透過呼叫 AQS 的 release(int arg) 方法可以釋放資源,該方法在釋放資源之後,會喚醒頭節點的後繼節點,進而使後繼節點重新嘗試獲取資源。

/** * Releases in exclusive mode。 Implemented by unblocking one or * more threads if {@link #tryRelease} returns true。 * This method can be used to implement method {@link Lock#unlock}。 * * @param arg the release argument。 This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like。 * @return the value returned from {@link #tryRelease} */public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h。waitStatus != 0)unparkSuccessor(h);return true;}return false;}

release(int arg) 方法執行時,會喚醒頭節點的後繼節點執行緒, unparkSuccessor(Node node) 方法使用 LockSupport#unpark() 方法來喚醒處於等待狀態的執行緒。

共享式 獲取 & 釋放 資源

上面講的是獨佔式獲取 / 釋放 資源。

共享式獲取與獨佔式獲取最主要的區別在於:同一時刻能否有多個執行緒同時獲取到資源。以檔案的讀寫為例,如果一個程式在對檔案進行讀操作,那麼這一時刻對於該檔案的寫操作均被阻塞,而讀操作能夠同時進行。寫操作要求對資源的獨佔式訪問,而讀操作可以是共享式訪問。

共享式訪問資源時,其他共享式的訪問均被允許,獨佔式訪問被阻塞

獨佔式訪問資源時,同一時刻其他訪問均被阻塞

共享式獲取資源

呼叫 AQS 的 acquireShared(int arg) 方法可以共享式地獲取資源。

在 acquireShared(int arg) 方法中,AQS 呼叫 tryAcquireShared(int arg) 方法嘗試獲取資源, tryAcquireShared(int arg) 方法返回值為 int 型別,當返回值 >= 0 時,表示能夠獲取到資源。

可以看到,在 doAcquireShared(int arg) 方法的自旋過程中,如果當前節點的前驅為頭節點時,才能嘗試獲取資源,如果獲取資源成功(返回值 >= 0),則設定當前節點為頭節點,並從自旋過程中退出。

Java 中悲觀鎖的底層實現機制

共享式釋放資源

呼叫 releaseShared(int arg) 方法可以釋放資源。該方法在釋放資源之後,會喚醒頭節點的後繼節點,進而使後繼節點重新嘗試獲取資源。

對於能夠支援多個執行緒同時訪問的併發元件(比如 Semaphore),它和獨佔式主要區別在於 tryReleaseShared(int arg) 方法必須確保資源安全釋放,因為釋放資源的操作會同時來自多個執行緒。 確保資源安全釋放一般是透過迴圈和 CAS 來保證的。

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}

獨佔式超時獲取資源

呼叫 AQS 的 doAcquireNanos(int arg,long nanosTimeout) 方法可以超時獲取資源,即在指定的時間段內獲取資源,如果獲取資源成功則返回 true,否則返回 false。

該方法提供了傳統 Java 同步操作(比如 synchronized 關鍵字)所不具備的特性。

在分析該方法的實現前,先介紹一下響應中斷的獲取資源過程。

在 Java 5 之前,當一個執行緒獲取不到鎖而被阻塞在 synchronized 之外時,對該執行緒進行中斷操作,此時該執行緒的中斷標誌位會被修改,但執行緒依舊會阻塞在 synchronized 上,等待著獲取鎖。

在 Java 5 中,AQS 提供了 acquireInterruptibly(int arg) 方法,這個方法在等待獲取資源時,如果當前執行緒被中斷,會立刻返回,並丟擲 InterruptedException。

acquire(int arg) 方法對中斷不敏感,也就是由於執行緒獲取資源失敗後進入同步阻塞佇列中,後續對執行緒進行中斷操作時,執行緒不會從同步阻塞佇列中移出。

超時獲取資源過程可以被視作響應中斷獲取資源過程的“增強版”,doAcquireNanos(int arg,long nanosTimeout) 方法在支援響應中斷的基礎上,增加了超時獲取的特性。

針對超時獲取,主要需要計算出需要睡眠的時間間隔 nanosTimeout,為了防止過早通知, nanosTimeout 計算公式為:nanosTimeout -= now - lastTime,其中 now 為當前喚醒時間, lastTime 為上次喚醒時間,如果 nanosTimeout 大於 0 則表示超時時間未到,需要繼續睡眠 nanosTimeout 納秒,反之,表示已經超時。

Java 中悲觀鎖的底層實現機制

該方法在自旋過程中,當節點的前驅節點為頭節點時嘗試獲取資源,如果成功獲取資源則從該方法返回,這個過程和獨佔式同步獲取的過程類似,但是在獲取資源失敗的處理上有所不同。

如果當前執行緒獲取資源失敗,則判斷是否超時(nanosTimeout 小於等於 0 表示已經超時),如果沒有超時,則重新計算超時間隔 nanosTimeout,然後使當前執行緒等待 nanosTimeout 納秒(當已到設定的超時時間,該執行緒會從 LockSupport。parkNanos(Object blocker,long nanos)方法返回)。

如果 nanosTimeout 小於等於 spinForTimeoutThreshold(1000 納秒)時,將不會使該執行緒進行超時等待,而是進入快速的自旋過程。原因在於,非常短的超時等待無法做到十分精確,如果這時再進行超時等待,相反會讓 nanosTimeout 的超時從整體上表現得反而不精確。因此,在超時非常短的場景下,AQS 會進入無條件的快速自旋。

獨佔式超時獲取資源的流程如下所示。

Java 中悲觀鎖的底層實現機制

從圖中可以看出,獨佔式超時獲取資源 doAcquireNanos(int arg,long nanosTimeout) 和獨佔式獲取資源 acquire(int args)在流程上非常相似,其主要區別在於:未獲取到資源時的處理邏輯。

acquire(int args) 在未獲取到資源時,將會使當前執行緒一直處於等待狀態,而 doAcquireNanos(int arg,long nanosTimeout) 會使當前執行緒等待 nanosTimeout 納秒,如果當前執行緒在 nanosTimeout 納秒內沒有獲取到資源,將會從等待邏輯中自動返回。

Condition 的實現原理

技術是為了解決問題而生的,透過 Condition 我們可以實現等待 / 通知功能。

ConditionObject 是 AQS 的內部類。每個 Condition 物件都包含著一個條件等待佇列,這個條件等待佇列是 Condition 物件實現等待 / 通知功能的關鍵。

下面我們分析 Condition 的實現原理,主要包括:條件等待佇列、等待 和 通知。

下面提到的 Condition 如果不加說明均指的是 ConditionObject。

條件等待佇列

Condition 依賴內部的條件等待佇列(一個 FIFO 雙向佇列)來實現等待 / 通知功能。

條件等待佇列的工作機制

節點:條件等待佇列中的每個節點(Node)都包含一個執行緒引用,該執行緒就是在 Condition 物件上等待的執行緒,如果一個執行緒呼叫了 Condition。await()方法,那麼該執行緒將會釋放資源、構造成為節點加入條件等待佇列的尾部,同時執行緒狀態變為等待狀態。

事實上,條件等待佇列中的節點定義複用了 AQS 節點的定義,也就是說,同步阻塞佇列和條件等待佇列中節點型別都是 AQS 的靜態內部類 AbstractQueuedSynchronizer。Node。

在 Object 的監視器模型上,一個物件擁有一個同步阻塞佇列和一個條件等待佇列,而併發包中的 Lock(更確切地說是 AQS)擁有一個同步阻塞佇列和多個條件等待佇列。

等待

下面來介紹讓執行緒等待的執行流程。

呼叫 Condition 的 await() 方法(或者以 await 開頭的方法),將會使當前執行緒釋放資源、構造成為節點加入條件等待佇列的尾部,同時執行緒狀態變為等待狀態。

如果從佇列(同步阻塞佇列和條件等待佇列)的角度看 await()方法,當呼叫 await() 方法時,相當於同步阻塞佇列的首節點(獲取到鎖的節點)移動到 Condition 的條件等待佇列中。並且同步阻塞佇列的首節點並不會直接加入條件等待佇列,而是透過 addConditionWaiter() 方法把當前執行緒構造成一個新的節點,將其加入條件等待佇列中。

/** * Implements interruptible condition wait。 *

    *
  1. If current thread is interrupted, throw InterruptedException。 *
  2. Save lock state returned by {@link #getState}。 *
  3. Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails。 *
  4. Block until signalled or interrupted。 *
  5. Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument。 *
  6. If interrupted while blocked in step 4, throw InterruptedException。 *
*/public final void await() throws InterruptedException { if (Thread。interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport。park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node。nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}

通知

下面來介紹喚醒等待執行緒的執行流程。

呼叫 Condition 的 signal() 方法,將會喚醒在條件等待佇列中等待時間最長的節點(首節點),在喚醒節點之前,會將當前節點從條件等待佇列移動到同步阻塞佇列中。

條件等待佇列中的節點被喚醒後,被喚醒的執行緒以“死迴圈”的方式嘗試獲取資源。成功獲取資源之後,被喚醒的執行緒將從先前呼叫的 await() 方法返回。

如果被喚醒的執行緒不是透過其他執行緒呼叫 Condition。signal() 方法喚醒,而是對等待執行緒進行中斷,則會丟擲InterruptedException。

被喚醒的執行緒,將從 await() 方法中的 while 迴圈中退出(isOnSyncQueue(Node node) 方法返回 true,節點已經在同步阻塞佇列中),進而呼叫 AQS 的 acquireQueued() 方法以“死迴圈”的方式嘗試獲取資源。成功獲取資源之後,被喚醒的執行緒將從先前呼叫的 await() 方法返回。

Condition 的 signalAll() 方法,相當於對條件等待佇列中的每個節點均執行一次 signal() 方法,效果就是將條件等待佇列中所有節點全部移動到同步阻塞佇列中,並喚醒每個節點的執行緒。

雖然是把每個節點的執行緒都喚醒了,這些執行緒需要嘗試獲取資源, 但是隻有一個執行緒能夠成功獲取資源,然後從 await() 方法返回;其他獲取資源失敗的執行緒又都會被加入到同步阻塞佇列中,並在佇列中進行自旋;移出佇列(或停止自旋)的條件是前驅節點為頭節點且成功獲取了資源。

Java 中悲觀鎖的底層實現機制

參考資料

《Java併發程式設計藝術》第5章:Java 中的鎖

Top