通過zookeeper實現分布式鎖
1、創建zookeeper的client
首先通過curatorframeworkfactory創建一個連接zookeeper的連接curatorframework client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
public class curatorfactorybean implements factorybean<curatorframework>, initializingbean, disposablebean { private static final logger logger = loggerfactory.getlogger(contractfileinfocontroller. class ); private string connectionstring; private int sessiontimeoutms; private int connectiontimeoutms; private retrypolicy retrypolicy; private curatorframework client; public curatorfactorybean(string connectionstring) { this (connectionstring, 500 , 500 ); } public curatorfactorybean(string connectionstring, int sessiontimeoutms, int connectiontimeoutms) { this .connectionstring = connectionstring; this .sessiontimeoutms = sessiontimeoutms; this .connectiontimeoutms = connectiontimeoutms; } @override public void destroy() throws exception { logger.info( "closing curator framework..." ); this .client.close(); logger.info( "closed curator framework." ); } @override public curatorframework getobject() throws exception { return this .client; } @override public class <?> getobjecttype() { return this .client != null ? this .client.getclass() : curatorframework. class ; } @override public boolean issingleton() { return true ; } @override public void afterpropertiesset() throws exception { if (stringutils.isempty( this .connectionstring)) { throw new illegalstateexception( "connectionstring can not be empty." ); } else { if ( this .retrypolicy == null ) { this .retrypolicy = new exponentialbackoffretry( 1000 , 2147483647 , 180000 ); } this .client = curatorframeworkfactory.newclient( this .connectionstring, this .sessiontimeoutms, this .connectiontimeoutms, this .retrypolicy); this .client.start(); this .client.blockuntilconnected( 30 , timeunit.milliseconds); } } public void setconnectionstring(string connectionstring) { this .connectionstring = connectionstring; } public void setsessiontimeoutms( int sessiontimeoutms) { this .sessiontimeoutms = sessiontimeoutms; } public void setconnectiontimeoutms( int connectiontimeoutms) { this .connectiontimeoutms = connectiontimeoutms; } public void setretrypolicy(retrypolicy retrypolicy) { this .retrypolicy = retrypolicy; } public void setclient(curatorframework client) { this .client = client; } } |
2、封裝分布式鎖
根據curatorframework創建interprocessmutex(分布式可重入排它鎖)對一行數據進行上鎖
1
2
3
|
public interprocessmutex(curatorframework client, string path) { this (client, path, new standardlockinternalsdriver()); } |
使用 acquire方法
1、acquire() :入參為空,調用該方法后,會一直堵塞,直到搶奪到鎖資源,或者zookeeper連接中斷后,上拋異常。
2、acquire(long time, timeunit unit):入參傳入超時時間、單位,搶奪時,如果出現堵塞,會在超過該時間后,返回false。
1
2
3
4
5
6
7
8
9
|
public void acquire() throws exception { if (! this .internallock(-1l, (timeunit) null )) { throw new ioexception( "lost connection while trying to acquire lock: " + this .basepath); } } public boolean acquire( long time, timeunit unit) throws exception { return this .internallock(time, unit); } |
釋放鎖 mutex.release();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public void release() throws exception { thread currentthread = thread.currentthread(); interprocessmutex.lockdata lockdata = (interprocessmutex.lockdata) this .threaddata.get(currentthread); if (lockdata == null ) { throw new illegalmonitorstateexception( "you do not own the lock: " + this .basepath); } else { int newlockcount = lockdata.lockcount.decrementandget(); if (newlockcount <= 0 ) { if (newlockcount < 0 ) { throw new illegalmonitorstateexception( "lock count has gone negative for lock: " + this .basepath); } else { try { this .internals.releaselock(lockdata.lockpath); } finally { this .threaddata.remove(currentthread); } } } } } |
封裝后的dlock代碼
1、調用interprocessmutex processmutex = dlock.mutex(path);
2、手動釋放鎖processmutex.release();
3、需要手動刪除路徑dlock.del(path);
推薦 使用:
都是 函數式編程
在業務代碼執行完畢后 會釋放鎖和刪除path
1、這個有返回結果
public t mutex(string path, zklockcallback zklockcallback, long time, timeunit timeunit)
2、這個無返回結果
public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
public class dlock { private final logger logger; private static final long timeout_d = 100l; private static final string root_path_d = "/dlock" ; private string lockrootpath; private curatorframework client; public dlock(curatorframework client) { this ( "/dlock" , client); } public dlock(string lockrootpath, curatorframework client) { this .logger = loggerfactory.getlogger(dlock. class ); this .lockrootpath = lockrootpath; this .client = client; } public interprocessmutex mutex(string path) { if (!stringutils.startswith(path, "/" )) { path = constant.keybuilder( new object[]{ "/" , path}); } return new interprocessmutex( this .client, constant.keybuilder( new object[]{ this .lockrootpath, "" , path})); } public <t> t mutex(string path, zklockcallback<t> zklockcallback) throws zklockexception { return this .mutex(path, zklockcallback, 100l, timeunit.milliseconds); } public <t> t mutex(string path, zklockcallback<t> zklockcallback, long time, timeunit timeunit) throws zklockexception { string finalpath = this .getlockpath(path); interprocessmutex mutex = new interprocessmutex( this .client, finalpath); try { if (!mutex.acquire(time, timeunit)) { throw new zklockexception( "acquire zk lock return false" ); } } catch (exception var13) { throw new zklockexception( "acquire zk lock failed." , var13); } t var8; try { var8 = zklockcallback.doinlock(); } finally { this .releaselock(finalpath, mutex); } return var8; } private void releaselock(string finalpath, interprocessmutex mutex) { try { mutex.release(); this .logger.info( "delete zk node path:{}" , finalpath); this .deleteinternal(finalpath); } catch (exception var4) { this .logger.error( "dlock" , "release lock failed, path:{}" , finalpath, var4); // logutil.error(this.logger, "dlock", "release lock failed, path:{}", new object[]{finalpath, var4}); } } public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit) throws zklockexception { string finalpath = this .getlockpath(path); interprocessmutex mutex = new interprocessmutex( this .client, finalpath); try { if (!mutex.acquire(time, timeunit)) { throw new zklockexception( "acquire zk lock return false" ); } } catch (exception var13) { throw new zklockexception( "acquire zk lock failed." , var13); } try { zklockcallback.response(); } finally { this .releaselock(finalpath, mutex); } } public string getlockpath(string custompath) { if (!stringutils.startswith(custompath, "/" )) { custompath = constant.keybuilder( new object[]{ "/" , custompath}); } string finalpath = constant.keybuilder( new object[]{ this .lockrootpath, "" , custompath}); return finalpath; } private void deleteinternal(string finalpath) { try { ((errorlistenerpathable) this .client.delete().inbackground()).forpath(finalpath); } catch (exception var3) { this .logger.info( "delete zk node path:{} failed" , finalpath); } } public void del(string custompath) { string lockpath = "" ; try { lockpath = this .getlockpath(custompath); ((errorlistenerpathable) this .client.delete().inbackground()).forpath(lockpath); } catch (exception var4) { this .logger.info( "delete zk node path:{} failed" , lockpath); } } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@functionalinterface public interface zklockcallback<t> { t doinlock(); } @functionalinterface public interface zkvoidcallback { void response(); } public class zklockexception extends exception { public zklockexception() { } public zklockexception(string message) { super (message); } public zklockexception(string message, throwable cause) { super (message, cause); } } |
配置curatorconfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@configuration public class curatorconfig { @value ( "${zk.connectionstring}" ) private string connectionstring; @value ( "${zk.sessiontimeoutms:500}" ) private int sessiontimeoutms; @value ( "${zk.connectiontimeoutms:500}" ) private int connectiontimeoutms; @value ( "${zk.dlockroot:/dlock}" ) private string dlockroot; @bean public curatorfactorybean curatorfactorybean() { return new curatorfactorybean(connectionstring, sessiontimeoutms, connectiontimeoutms); } @bean @autowired public dlock dlock(curatorframework client) { return new dlock(dlockroot, client); } } |
測試代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
@restcontroller @requestmapping ( "/dlock" ) public class lockcontroller { @autowired private dlock dlock; @requestmapping ( "/lock" ) public map testdlock(string no){ final string path = constant.keybuilder( "/test/no/" , no); long mutex=0l; try { system.out.println( "在拿鎖:" +path+system.currenttimemillis()); mutex = dlock.mutex(path, () -> { try { system.out.println( "拿到鎖了" + system.currenttimemillis()); thread.sleep( 10000 ); system.out.println( "操作完成了" + system.currenttimemillis()); } finally { return system.currenttimemillis(); } }, 1000 , timeunit.milliseconds); } catch (zklockexception e) { system.out.println( "拿不到鎖呀" +system.currenttimemillis()); } return collections.singletonmap( "ret" ,mutex); } @requestmapping ( "/dlock" ) public map testdlock1(string no){ final string path = constant.keybuilder( "/test/no/" , no); long mutex=0l; try { system.out.println( "在拿鎖:" +path+system.currenttimemillis()); interprocessmutex processmutex = dlock.mutex(path); processmutex.acquire(); system.out.println( "拿到鎖了" + system.currenttimemillis()); thread.sleep( 10000 ); processmutex.release(); system.out.println( "操作完成了" + system.currenttimemillis()); } catch (zklockexception e) { system.out.println( "拿不到鎖呀" +system.currenttimemillis()); e.printstacktrace(); } catch (exception e){ e.printstacktrace(); } return collections.singletonmap( "ret" ,mutex); } @requestmapping ( "/del" ) public map deldlock(string no){ final string path = constant.keybuilder( "/test/no/" , no); dlock.del(path); return collections.singletonmap( "ret" , 1 ); } } |
以上所述是小編給大家介紹的java(springboot)基于zookeeper的分布式鎖實現詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!
原文鏈接:https://blog.csdn.net/LJY_SUPER/article/details/87807091