本文基于jdk1.8進行分析。
reentrantlock是一個可重入鎖,在concurrenthashmap中使用了reentrantlock。
首先看一下源碼中對reentrantlock的介紹。如下圖。reentrantlock是一個可重入的排他鎖,它和synchronized的方法和代碼有著相同的行為和語義,但有更多的功能。reentrantlock是被最后一個成功lock鎖并且還沒有unlock的線程擁有著。如果鎖沒有被別的線程擁有,那么一個線程調用lock方法,就會成功獲取鎖并返回。如果當前線程已經擁有該鎖,那么lock方法會立刻返回。這個可以通過isheldbycurrentthread方法和getholdcount方法進行驗證。除了這部分介紹外,類前面的javadoc文檔很長,就不在這里全部展開。隨著后面介紹源碼,會一一涉及到。
1
2
3
4
5
6
7
8
9
10
11
12
|
/** * a reentrant mutual exclusion { @link lock} with the same basic * behavior and semantics as the implicit monitor lock accessed using * { @code synchronized } methods and statements, but with extended * capabilities. * <p>a { @code reentrantlock} is <em>owned</em> by the thread last * successfully locking, but not yet unlocking it. a thread invoking * { @code lock} will return , successfully acquiring the lock, when * the lock is not owned by another thread. the method will return * immediately if the current thread already owns the lock. this can * be checked using methods { @link #isheldbycurrentthread}, and { @link * #getholdcount}. |
首先看一下成員變量,如下圖。reentrantlock只有一個成員變量sync,即同步器,這個同步器提供所有的機制。sync是abstractqueuedsynchronizer的子類,同時,sync有2個子類,nonfairsync和fairsync,分別是非公平鎖和公平鎖。sync,nonfairesync和fairsync的具體實現后面再講。
1
2
|
/** synchronizer providing all implementation mechanics **/ private final sync sync; |
下面看一下構造函數。如下圖。可以看到,reentrantlock默認是非公平鎖,它可以通過參數,指定初始化為公平鎖或非公平鎖。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/** * creates an instance of {@code reentrantlock}. * this is equivalent to using {@code reentrantlock(false)}. **/ public reentrantlock() { sync = new nonfairsync(); } /** * creates an instance of {@code reentrantlock} with the * given fairness policy. * @param fair {@code true} if this lock should use a fair ordering policy **/ public reentrantlock( boolean fair) { sync = fair ? new fairsync() : new nonfairsync(); } |
下面看一下reentrantlock的主要方法。首先是lock方法。如下圖。lock方法的實現很簡單,就是調用sync的lock方法。而sync的lock方法是個抽象的,具體實現在nonfairsync和fairsync中。這里我們先不展開講,而是先讀一下lock方法的注釋,看看它的作用。lock方法的作用是獲取該鎖。分為3種情況。
1,如果鎖沒有被別的線程占有,那么當前線程就可以獲取到鎖并立刻返回,并把鎖計數設置為1。
2,如果當前線程已經占有該鎖了,那么就會把鎖計數加1,立刻返回。
3,如果鎖被另一個線程占有了,那么當前線程就無法再被線程調度,并且開始睡眠,直到獲取到鎖,在獲取到到鎖時,會把鎖計數設置為1。
lockinterruptibly方法與lock功能類似,但lockinterruptibly方法在等待的過程中,可以響應中斷。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
/** * acquires the lock. * <p>acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * <p>if the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. * <p>if the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one. **/ public void lock() { sync.lock(); } public void lockinterruptibly() throws interruptedexception { sync.acquireinterruptibly( 1 ); } |
下面,詳細看一下非公平鎖和公平鎖中對lock函數的實現。如下圖。下圖同時列出了公平鎖和非公平鎖中lock的實現邏輯。從注釋和代碼邏輯中,都可以看出,非公平鎖進行lock時,先嘗試立刻闖入(搶占),如果成功,則獲取到鎖,如果失敗,再執行通常的獲取鎖的行為,即acquire(1)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/** * 非公平鎖中的lock * performs lock. try immediate barge, backing up to normal * acquire on failure. **/ final void lock() { if (compareandsetstate( 0 , 1 )) setexclusiveownerthread(thread.currentthread()); else acquire( 1 ); } //公平鎖中的lock final void lock() { acquire( 1 ); } |
那么,我們首先了解下,非公平鎖“嘗試立刻闖入”,究竟做了什么。稍后再繼續講解通常的獲取鎖的行為。下圖是立即闖入行為compareandsetstate(0, 1)的實現。從compareandsetstate函數的注釋中,可以知道,如果同步狀態值與期望值相等,那么就把它的值設置為updated值。否則同步狀態值與期望值不相等,則返回false。這個操作和volatile有著相同的內存語義,也就是說,這個操作對其他線程是可見的。compareandsetstate函數注釋里描述的功能,是通過unsafe.compareandswapint方法實現的,而unsafe.compareandswapint是一個native方法,是用c++實現的。那么繼續追問,c++底層是怎么實現的?c++底層是通過cas指令來實現的。什么是cas指令呢?來自維基百科的解釋是,cas,比較和交換,compare and swap,是用用于實現多線程原子同步的指令。它將內存位置的內容和給定值比較,只有在相同的情況下,將該內存的值設置為新的給定值。這個操作是原子操作。那么繼續追問,cas指令的原子性,是如何實現的呢?我們都知道指令時cpu來執行的,在多cpu系統中,內存是共享的,內存和多個cpu都掛在總線上,當一個cpu執行cas指令時,它會先將總線lock位點設置為高電平。如果別的cpu也要執行cas執行,它會發現總線lock位點已經是高電平了,則無法執行cas執行。cpu通過lock保證了指令的原子執行。
現在來看一下非公平鎖的lock行為,compareandsetstate(0, 1),它期望鎖狀態為0,即沒有別的線程占用,并把新狀態設置為1,即標記為占用狀態。如果成功,則非公平鎖成功搶到鎖,之后setexclusiveownerthread,把自己設置為排他線程。非公平鎖這小子太壞了。如果搶占失敗,則執行與公平鎖相同的操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/** * atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * this operation has memory semantics of a {@code volatile} read * and write. * @param expect the expected value * @param update the new value * @return {@code true} if successful. false return indicates that the actual * value was not equal to the expected value. **/ protected final boolean compareandsetstate( int expect, int update) { // see below for intrinsics setup to support this return unsafe.compareandswapint( this , stateoffset, expect, update); } public final native boolean compareandswapint(object var1, long var2, int var4, int var5); |
下面看一下公平鎖獲取鎖時的行為。如下圖。這部分的邏輯有些多,請閱讀代碼中的注釋進行理解。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
/** * 公平鎖的lock **/ final void lock() { acquire( 1 ); } /** * acquires in exclusive mode, ignoring interrupts. implemented * by invoking at least once {@link #tryacquire}, * returning on success. otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryacquire} until success. this method can be used * to implement method {@link lock#lock}. * @param arg the acquire argument. this value is conveyed to * {@link #tryacquire} but is otherwise uninterpreted and * can represent anything you like. **/ public final void acquire( int arg) { /** * acquire首先進行tryacquire()操作。如果tryacquire()成功時則獲取到鎖,即刻返回。 * 如果tryacquire()false時,會執行acquirequeued(addwaiter(node.exclusive), arg) * 操作。如果acquirequeued(addwaiter(node.exclusive), arg)true時,則當前線程中斷自己。 * 如果acquirequeued(addwaiter(node.exclusive), arg)false,則返回。 * 其中tryacquire()操作在nonfairsync中和fairsync中實現又有所區別。 **/ if (!tryacquire(arg) && acquirequeued(addwaiter(node.exclusive), arg)) selfinterrupt(); } /** * nonfairsync中的tryacquire。 * @param acquires * @return **/ protected final boolean tryacquire( int acquires) { return nonfairtryacquire(acquires); } /** * performs non-fair trylock. tryacquire is implemented in * subclasses, but both need nonfair try for trylock method. **/ final boolean nonfairtryacquire( int acquires) { final thread current = thread.currentthread(); //首先獲取當前同步狀態值 int c = getstate(); if (c == 0 ) { //c為0,表示目前沒有線程占用鎖。沒有線程占用鎖時,當前線程嘗試搶鎖,如果搶鎖成功,則返回true。 if (compareandsetstate( 0 , acquires)) { setexclusiveownerthread(current); return true ; } } else if (current == getexclusiveownerthread()) { //c不等于0時表示鎖被線程占用。如果是當前線程占用了,則將鎖計數加上acquires,并返回true。 int nextc = c + acquires; if (nextc < 0 ) // overflow throw new error( "maximum lock count exceeded" ); setstate(nextc); return true ; } //以上情況都不是時,返回false,表示非公平搶鎖失敗。 return false ; } /** * fair version of tryacquire. don't grant access unless * recursive call or no waiters or is first. * 這個是公平版本的tryacquire **/ protected final boolean tryacquire( int acquires) { final thread current = thread.currentthread(); int c = getstate(); if (c == 0 ) { //c=0時表示鎖未被占用。這里是先判斷隊列中前面是否有別的線程。沒有別的線程時,才進行cas操作。 //公平鎖之所以公平,正是因為這里。它發現鎖未被占用時,首先判斷等待隊列中是否有別的線程已經在等待了。 //而非公平鎖,發現鎖未被占用時,根本不管隊列中的排隊情況,上來就搶。 if (!hasqueuedpredecessors() && compareandsetstate( 0 , acquires)) { setexclusiveownerthread(current); return true ; } } else if (current == getexclusiveownerthread()) { int nextc = c + acquires; if (nextc < 0 ) throw new error( "maximum lock count exceeded" ); setstate(nextc); return true ; } return false ; } /** * acquires in exclusive uninterruptible mode for thread already in * queue. used by condition wait methods as well as acquire. * 當搶鎖失敗時,先執行addwaiter(node.exclusive),將當前線程加入等待隊列,再執行該方法。 * 該方法的作用是中斷當前線程,并進行檢查,知道當前線程是隊列中的第一個線程,并且搶鎖成功時, * 該方法返回。 * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting **/ final boolean acquirequeued( final node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final node p = node.predecessor(); if (p == head && tryacquire(arg)) { sethead(node); p.next = null ; // help gc failed = false ; return interrupted; } if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) interrupted = true ; } } finally { if (failed) cancelacquire(node); } } |
接下來是trylock方法。代碼如下。從注釋中我們可以理解到,只有當調用trylock時鎖沒有被別的線程占用,trylock才會獲取鎖。如果鎖沒有被另一個線程占用,那么就獲取鎖,并立刻返回true,并把鎖計數設置為1. 甚至在鎖被設置為公平排序的情況下,若果鎖可用,調用trylock會立刻獲取鎖,而不管有沒有別的線程在等待鎖了。從這里我們總結出,不管可重入鎖是公平鎖還是非公平鎖,trylock方法只會是非公平的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
/** * acquires the lock only if it is not held by another thread at the time * of invocation. * <p>acquires the lock if it is not held by another thread and * returns immediately with the value {@code true}, setting the * lock hold count to one. even when this lock has been set to use a * fair ordering policy, a call to {@code trylock()} <em>will</em> * immediately acquire the lock if it is available, whether or not * other threads are currently waiting for the lock. * this "barging" behavior can be useful in certain * circumstances, even though it breaks fairness. if you want to honor * the fairness setting for this lock, then use * {@link #trylock(long, timeunit) trylock(0, timeunit.seconds) } * which is almost equivalent (it also detects interruption). * <p>if the current thread already holds this lock then the hold * count is incremented by one and the method returns {@code true}. * <p>if the lock is held by another thread then this method will return * immediately with the value {@code false}. * @return {@code true} if the lock was free and was acquired by the * current thread, or the lock was already held by the current * thread; and {@code false} otherwise **/ public boolean trylock() { return sync.nonfairtryacquire( 1 ); } public boolean trylock( long timeout, timeunit unit) throws interruptedexception { return sync.tryacquirenanos( 1 , unit.tonanos(timeout)); } |
接下來是釋放鎖的方法unlock。代碼如下。unlock方式的實現,是以參數1來調用sync.release方法。而release方法是如何實現的呢?release方法首先會調用tryrelease方法,如果tryrelease成功,則喚醒后繼者線程。而tryrelease的實現過程十分清晰,首先獲取鎖狀態,鎖狀態減去參數(放鎖次數),得到新狀態。然后判斷持有鎖的線程是否為當前線程,如果不是當前線程,則拋出illegalmonitorstateexception。然后判斷,如果新狀態為0,說明放鎖成功,則把持有鎖的線程設置為null,并返回true。如果新狀態不為0,則返回false。從tryrelease的返回值來看,它返回的true或false,指的是否成功的釋放了該鎖。成功的釋放該鎖的意思是徹底釋放鎖,別的線程就可以獲取鎖了。這里要認識到,即便tryrelease返回false,它也只是說明了鎖沒有完全釋放,本次調用的這個釋放次數值,依然是釋放成功的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
/** * attempts to release this lock. * <p>if the current thread is the holder of this lock then the hold * count is decremented. if the hold count is now zero then the lock * is released. if the current thread is not the holder of this * lock then {@link illegalmonitorstateexception} is thrown. * @throws illegalmonitorstateexception if the current thread does not * hold this lock **/ public void unlock() { sync.release( 1 ); } /** * releases in exclusive mode. implemented by unblocking one or * more threads if {@link #tryrelease} returns true. * this method can be used to implement method {@link lock#unlock}. * @param arg the release argument. this value is conveyed to * {@link #tryrelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryrelease} **/ public final boolean release( int arg) { if (tryrelease(arg)) { node h = head; if (h != null && h.waitstatus != 0 ) unparksuccessor(h); return true ; } return false ; } protected final boolean tryrelease( int releases) { int c = getstate() - releases; if (thread.currentthread() != getexclusiveownerthread()) throw new illegalmonitorstateexception(); boolean free = false ; if (c == 0 ) { free = true ; setexclusiveownerthread( null ); } setstate(c); return free; } /** * wakes up node's successor, if one exists. * @param node the node **/ private void unparksuccessor(node node) { /** * if status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. it is ok if this * fails or if status is changed by waiting thread. **/ int ws = node.waitstatus; if (ws < 0 ) compareandsetwaitstatus(node, ws, 0 ); /** * thread to unpark is held in successor, which is normally * just the next node. but if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. **/ node s = node.next; if (s == null || s.waitstatus > 0 ) { s = null ; for (node t = tail; t != null && t != node; t = t.prev) if (t.waitstatus <= 0 ) s = t; } if (s != null ) locksupport.unpark(s.thread); } |
接下來是newcondition方法。關于condition這里不展開介紹,只是了解下該方法的作用。如下圖。該方法返回一個和這個鎖實例一起使用的condition實例。返回的condition實例支持和object的監控方法例如wait-notify和notifyall相同的用法。
- 1,如果沒有獲取鎖,調用condition的await,signal,signalall方法的任何一個時,會拋出illegalmonitorstateexception異常。
- 2,調用condition的await方法時,鎖也會釋放,在await返回之前,鎖會被重新獲取,并且鎖計數會恢復到調用await方法時的值。
- 3,如果一個線程在等待的過程中被中斷了,那么等待就會結束,并拋出interruptedexception異常,線程的中斷標志位會被清理。
- 4,等待的線程以fifo的順序被喚醒。
- 5,從await方法返回的線程們的獲取到鎖的順序,和線程最開始獲取鎖的順序相同,這是未指定情況下的默認實現。但是,公平鎖更鐘愛那些已經等待了最長時間的線程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
/** * returns a {@link condition} instance for use with this * {@link lock} instance. * <p>the returned {@link condition} instance supports the same * usages as do the {@link object} monitor methods ({@link * object#wait() wait}, {@link object#notify notify}, and {@link * object#notifyall notifyall}) when used with the built-in * monitor lock. * <ul> * <li>if this lock is not held when any of the {@link condition} * {@linkplain condition#await() waiting} or {@linkplain * condition#signal signalling} methods are called, then an {@link * illegalmonitorstateexception} is thrown. * <li>when the condition {@linkplain condition#await() waiting} * methods are called the lock is released and, before they * return, the lock is reacquired and the lock hold count restored * to what it was when the method was called. * <li>if a thread is {@linkplain thread#interrupt interrupted} * while waiting then the wait will terminate, an {@link * interruptedexception} will be thrown, and the thread's * interrupted status will be cleared. * <li> waiting threads are signalled in fifo order. * <li>the ordering of lock reacquisition for threads returning * from waiting methods is the same as for threads initially * acquiring the lock, which is in the default case not specified, * but for <em>fair</em> locks favors those threads that have been * waiting the longest. * </ul> * @return the condition object **/ public condition newcondition() { return sync.newcondition(); } |
可重入鎖還有一些其他的方法,這里就不一一介紹了。this is the end.
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對服務器之家的支持。如果你想了解更多相關內容請查看下面相關鏈接
原文鏈接:https://blog.csdn.net/li_canhui/article/details/85006114