redisson鎖繼承implements reentrant lock,所以具備 reentrant lock 鎖中的一些特性:超時,重試,可中斷等。加上redisson中redis具備分布式的特性,所以非常適合用來做java中的分布式鎖。 下面我們對其加鎖、解鎖過程中的源碼細(xì)節(jié)進(jìn)行一一分析。
鎖的接口定義了一下方法:
分布式鎖當(dāng)中加鎖,我們常用的加鎖接口:
boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception;
下面我們來看一下方法的具體實(shí)現(xiàn):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
public boolean trylock( long waittime, long leasetime, timeunit unit) throws interruptedexception { long time = unit.tomillis(waittime); long current = system.currenttimemillis(); final long threadid = thread.currentthread().getid(); long ttl = tryacquire(leasetime, unit, threadid); // lock acquired if (ttl == null ) { return true ; } time -= (system.currenttimemillis() - current); if (time <= 0 ) { acquirefailed(threadid); return false ; } current = system.currenttimemillis(); final rfuture subscribefuture = subscribe(threadid); if (!await(subscribefuture, time, timeunit.milliseconds)) { if (!subscribefuture.cancel( false )) { subscribefuture.addlistener( new futurelistener() { @override public void operationcomplete(future future) throws exception { if (subscribefuture.issuccess()) { unsubscribe(subscribefuture, threadid); } } }); } acquirefailed(threadid); return false ; } try { time -= (system.currenttimemillis() - current); if (time <= 0 ) { acquirefailed(threadid); return false ; } while ( true ) { long currenttime = system.currenttimemillis(); ttl = tryacquire(leasetime, unit, threadid); // lock acquired if (ttl == null ) { return true ; } time -= (system.currenttimemillis() - currenttime); if (time = 0 && ttl < time) { getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds); } else { getentry(threadid).getlatch().tryacquire(time, timeunit.milliseconds); } time -= (system.currenttimemillis() - currenttime); if (time <= 0 ) { acquirefailed(threadid); return false ; } } } finally { unsubscribe(subscribefuture, threadid); } // return get(trylockasync(waittime, leasetime, unit)); } |
首先我們看到調(diào)用tryacquire嘗試獲取鎖,在這里是否能獲取到鎖,是根據(jù)鎖名稱的過期時間ttl來判定的(ttl
下面我們接著看一下tryacquire的實(shí)現(xiàn):
1
2
3
|
private long tryacquire( long leasetime, timeunit unit, long threadid) { return get(tryacquireasync(leasetime, unit, threadid)); } |
可以看到真正獲取鎖的操作經(jīng)過一層get操作里面執(zhí)行的,這里為何要這么操作,本人也不是太理解,如有理解錯誤,歡迎指正。
1
|
get 是由commandasyncexecutor(一個線程executor)封裝的一個executor |
設(shè)置一個單線程的同步控制器countdownlatch,用于控制單個線程的中斷信息。個人理解經(jīng)過中間的這么一步:主要是為了支持線程可中斷操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public v get(rfuture future) { if (!future.isdone()) { final countdownlatch l = new countdownlatch( 1 ); future.addlistener( new futurelistener() { @override public void operationcomplete(future future) throws exception { l.countdown(); } }); boolean interrupted = false ; while (!future.isdone()) { try { l.await(); } catch (interruptedexception e) { interrupted = true ; } } if (interrupted) { thread.currentthread().interrupt(); } } // commented out due to blocking issues up to 200 ms per minute for each thread:由于每個線程的阻塞問題,每分鐘高達(dá)200毫秒 // future.awaituninterruptibly(); if (future.issuccess()) { return future.getnow(); } throw convertexception(future); } |
我們進(jìn)一步往下看:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
private rfuture tryacquireasync( long leasetime, timeunit unit, final long threadid) { if (leasetime != - 1 ) { return trylockinnerasync(leasetime, unit, threadid, rediscommands.eval_long); } rfuture ttlremainingfuture = trylockinnerasync(commandexecutor.getconnectionmanager().getcfg().getlockwatchdogtimeout(), timeunit.milliseconds, threadid, rediscommands.eval_long); ttlremainingfuture.addlistener( new futurelistener() { @override public void operationcomplete(future future) throws exception { if (!future.issuccess()) { return ; } long ttlremaining = future.getnow(); // lock acquired if (ttlremaining == null ) { scheduleexpirationrenewal(threadid); } } }); return ttlremainingfuture; } |
首先判斷鎖是否有超時時間,有過期時間的話,會在后面獲取鎖的時候設(shè)置進(jìn)去。沒有過期時間的話,則會用默認(rèn)的
1
|
private long lockwatchdogtimeout = 30 * 1000 ; |
下面我們在進(jìn)一步往下分析真正獲取鎖的操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
rfuture trylockinnerasync( long leasetime, timeunit unit, long threadid, redisstrictcommand command) { internallockleasetime = unit.tomillis(leasetime); return commandexecutor.evalwriteasync(getname(), longcodec.instance, command, "if (redis.call('exists', keys[1]) == 0) then " + "redis.call('hset', keys[1], argv[2], 1); " + "redis.call('pexpire', keys[1], argv[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', keys[1], argv[2]) == 1) then " + "redis.call('hincrby', keys[1], argv[2], 1); " + "redis.call('pexpire', keys[1], argv[1]); " + "return nil; " + "end; " + "return redis.call('pttl', keys[1]);" , collections.singletonlist(getname()), internallockleasetime, getlockname(threadid)); } |
我把里面的重點(diǎn)信息做了以下三點(diǎn)總結(jié):
1:真正執(zhí)行的是一段具有原子性的lua腳本,并且最終也是由commandasynexecutor去執(zhí)行。
2:鎖真正持久化到redis時,用的hash類型key field value
3:獲取鎖的三個參數(shù):getname()是邏輯鎖名稱,例如:分布式鎖要鎖住的methodname+params;internallockleasetime是毫秒單位的鎖過期時間;getlockname則是鎖對應(yīng)的線程級別的名稱,因?yàn)橹С窒嗤€程可重入,不同線程不可重入,所以這里的鎖的生成方式是:uuid+":"threadid。有的同學(xué)可能會問,這樣不是很縝密:不同的jvm可能會生成相同的threadid,所以redission這里加了一個區(qū)分度很高的uuid;
lua腳本中的執(zhí)行分為以下三步:
1:exists檢查redis中是否存在鎖名稱;如果不存在,則獲取成功;同時把邏輯鎖名稱keys[1],線程級別的鎖名稱[argv[2],value=1,設(shè)置到redis。并設(shè)置邏輯鎖名稱的過期時間argv[2],返回;
2:如果檢查到存在keys[1],[argv[2],則說明獲取成功,此時會自增對應(yīng)的value值,記錄重入次數(shù);并更新鎖的過期時間
3:key不存,直接返回key的剩余過期時間(-2)
原文鏈接:https://www.roncoo.com/article/detail/133572