Redisson分布式鎖
之前的基于注解的鎖有一種鎖是基本redis的分布式鎖,鎖的實現我是基于redisson組件提供的RLock,這篇來看看redisson是如何實現鎖的。
不同版本實現鎖的機制并不相同
引用的redisson最近發布的版本3.2.3,不同的版本可能實現鎖的機制并不相同,早期版本好像是采用簡單的setnx,getset等常規命令來配置完成,而后期由于redis支持了腳本Lua變更了實現原理。
1
2
3
4
5
|
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version> 3.2 . 3 </version> </dependency> |
setnx需要配合getset以及事務來完成,這樣才能比較好的避免死鎖問題,而新版本由于支持lua腳本,可以避免使用事務以及操作多個redis命令,語義表達更加清晰一些。
RLock接口的特點
繼承標準接口Lock
擁有標準鎖接口的所有特性,比如lock,unlock,trylock等等。
擴展標準接口Lock
擴展了很多方法,常用的主要有:強制鎖釋放,帶有效期的鎖,還有一組異步的方法。其中前面兩個方法主要是解決標準lock可能造成的死鎖問題。比如某個線程獲取到鎖之后,線程所在機器死機,此時獲取了鎖的線程無法正常釋放鎖導致其余的等待鎖的線程一直等待下去。
可重入機制
各版本實現有差異,可重入主要考慮的是性能,同一線程在未釋放鎖時如果再次申請鎖資源不需要走申請流程,只需要將已經獲取的鎖繼續返回并且記錄上已經重入的次數即可,與jdk里面的ReentrantLock功能類似。重入次數靠hincrby命令來配合使用,詳細的參數下面的代碼。
怎么判斷是同一線程?
redisson的方案是,RedissonLock實例的一個guid再加當前線程的id,通過getLockName返回
1
2
3
4
5
6
7
8
9
10
11
|
public class RedissonLock extends RedissonExpirable implements RLock { final UUID id; protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) { super (commandExecutor, name); this .internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L); this .commandExecutor = commandExecutor; this .id = id; } String getLockName( long threadId) { return this .id + ":" + threadId; } |
RLock獲取鎖的兩種場景
這里拿tryLock的源碼來看:tryAcquire方法是申請鎖并返回鎖有效期還剩余的時間,如果為空說明鎖未被其它線程申請直接獲取并返回,如果獲取到時間,則進入等待競爭邏輯。
1
2
3
4
5
6
7
8
9
10
11
12
|
public boolean tryLock( long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = this .tryAcquire(leaseTime, unit); if (ttl == null ) { //直接獲取到鎖 return true ; } else { //有競爭的后續看 } } |
無競爭,直接獲取鎖
先看下首先獲取鎖并釋放鎖背后的redis都在做什么,可以利用redis的monitor來在后臺監控redis的執行情況。當我們在方法了增加@RequestLockable之后,其實就是調用lock以及unlock,下面是redis命令:
加鎖
由于高版本的redis支持lua腳本,所以redisson也對其進行了支持,采用了腳本模式,不熟悉lua腳本的可以去查找下。執行lua命令的邏輯如下:
1
2
3
4
|
<T> RFuture<T> tryLockInnerAsync( long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this .internalLockLeaseTime = unit.toMillis(leaseTime); return this .commandExecutor.evalWriteAsync( this .getName(), LongCodec.INSTANCE, command, "if (redis.call(\'exists\', KEYS[1]) == 0) then redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; return redis.call(\'pttl\', KEYS[1]);" , Collections.singletonList( this .getName()), new Object[]{Long.valueOf( this .internalLockLeaseTime), this .getLockName(threadId)}); } |
加鎖的流程:
- 判斷lock鍵是否存在,不存在直接調用hset存儲當前線程信息并且設置過期時間,返回nil,告訴客戶端直接獲取到鎖。
- 判斷lock鍵是否存在,存在則將重入次數加1,并重新設置過期時間,返回nil,告訴客戶端直接獲取到鎖。
- 被其它線程已經鎖定,返回鎖有效期的剩余時間,告訴客戶端需要等待。
1
2
3
4
5
6
7
8
9
10
11
12
|
"EVAL" " if (redis.call( 'exists' , KEYS[ 1 ]) == 0 ) then redis.call( 'hset' , KEYS[ 1 ], ARGV[ 2 ], 1 ); redis.call( 'pexpire' , KEYS[ 1 ], ARGV[ 1 ]); return nil; end; if (redis.call( 'hexists' , KEYS[ 1 ], ARGV[ 2 ]) == 1 ) then redis.call( 'hincrby' , KEYS[ 1 ], ARGV[ 2 ], 1 ); redis.call( 'pexpire' , KEYS[ 1 ], ARGV[ 1 ]); return nil; end; return redis.call( 'pttl' , KEYS[ 1 ]);" "1" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21" |
上面的lua腳本會轉換成真正的redis命令,下面的是經過lua腳本運算之后實際執行的redis命令。
1
2
3
4
|
1486642677.053488 [ 0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" 1486642677.053515 [ 0 lua] "hset" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "346e1eb8-5bfd-4d49-9870-042df402f248:21" "1" 1486642677.053540 [ 0 lua] "pexpire" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000" |
解鎖
解鎖的流程看起來復雜些:
- 如果lock鍵不存在,發消息說鎖已經可用
- 如果鎖不是被當前線程鎖定,則返回nil
- 由于支持可重入,在解鎖時將重入次數需要減1
- 如果計算后的重入次數>0,則重新設置過期時間
- 如果計算后的重入次數<=0,則發消息說鎖已經可用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
"EVAL" " if (redis.call( 'exists' , KEYS[ 1 ]) == 0 ) then redis.call( 'publish' , KEYS[ 2 ], ARGV[ 1 ]); return 1 ; end; if (redis.call( 'hexists' , KEYS[ 1 ], ARGV[ 3 ]) == 0 ) then return nil;end; local counter = redis.call( 'hincrby' , KEYS[ 1 ], ARGV[ 3 ], - 1 ); if (counter > 0 ) then redis.call( 'pexpire' , KEYS[ 1 ], ARGV[ 2 ]); return 0 ; else redis.call( 'del' , KEYS[ 1 ]); redis.call( 'publish' , KEYS[ 2 ], ARGV[ 1 ]); return 1 ; end; return nil;" "2" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}" "0" "1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21" |
無競爭情況下解鎖redis命令:
主要是發送一個解鎖的消息,以此喚醒等待隊列中的線程重新競爭鎖。
1
2
|
1486642678.493691 [ 0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" 1486642678.493712 [ 0 lua] "publish" "redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}" "0" |
有競爭,等待
有競爭的情況在redis端的lua腳本是相同的,只是不同的條件執行不同的redis命令,復雜的在redisson的源碼上。當通過tryAcquire發現鎖被其它線程申請時,需要進入等待競爭邏輯中。
- this.await返回false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱并返回獲取鎖失敗
- this.await返回true,進入循環嘗試獲取鎖。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
public boolean tryLock( long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = this .tryAcquire(leaseTime, unit); if (ttl == null ) { return true ; } else { //重點是這段 time -= System.currentTimeMillis() - current; if (time <= 0L) { return false ; } else { current = System.currentTimeMillis(); final RFuture subscribeFuture = this .subscribe(threadId); if (! this .await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel( false )) { subscribeFuture.addListener( new FutureListener() { public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { RedissonLock. this .unsubscribe(subscribeFuture, threadId); } } }); } return false ; } else { boolean var16; try { time -= System.currentTimeMillis() - current; if (time <= 0L) { boolean currentTime1 = false ; return currentTime1; } do { long currentTime = System.currentTimeMillis(); ttl = this .tryAcquire(leaseTime, unit); if (ttl == null ) { var16 = true ; return var16; } time -= System.currentTimeMillis() - currentTime; if (time <= 0L) { var16 = false ; return var16; } currentTime = System.currentTimeMillis(); if (ttl.longValue() >= 0L && ttl.longValue() < time) { this .getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS); } else { this .getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; } while (time > 0L); var16 = false ; } finally { this .unsubscribe(subscribeFuture, threadId); } return var16; } } } } |
循環嘗試一般有如下幾種方法:
- while循環,一次接著一次的嘗試,這個方法的缺點是會造成大量無效的鎖申請。
- Thread.sleep,在上面的while方案中增加睡眠時間以降低鎖申請次數,缺點是這個睡眠的時間設置比較難控制。
- 基于信息量,當鎖被其它資源占用時,當前線程訂閱鎖的釋放事件,一旦鎖釋放會發消息通知待等待的鎖進行競爭,有效的解決了無效的鎖申請情況。核心邏輯是this.getEntry(threadId).getLatch().tryAcquire,this.getEntry(threadId).getLatch()返回的是一個信號量,有興趣可以再研究研究。
redisson依賴
由于redisson不光是針對鎖,提供了很多客戶端操作redis的方法,所以會依賴一些其它的框架,比如netty,如果只是簡單的使用鎖也可以自己去實現。
以上就是本文的全部內容,希望本文的內容對大家的學習或者工作能帶來一定的幫助,同時也希望多多支持服務器之家!
原文鏈接:http://www.cnblogs.com/ASPNET2008/p/6385249.html