本實例我們用的是java3.4.6版本,實例方便大家學習完后有不明白的可以在留言區討論。
開發應用程序的zookeeper java綁定主要由兩個java包組成:
org.apache.zookeeper
org.apache.zookeeper.data
org.apache.zookeeper包由zookeeper監視的接口定義和zookeeper的各種回調處理程序組成。 它定義了zookeeper客戶端類庫的主要類以及許多zookeeper事件類型和狀態的靜態定義。 org.apache.zookeeper.data包定義了與數據寄存器(也稱為znode)相關的特性,例如訪問控制列表(acl),ids,stats等。
zookeeper java api中的org.apache.zookeeper.server,org.apache.zookeeper.server.quorum和org.apache.zookeeper.server.upgrade包是服務器實現的一部分。 org.apache.zookeeper.client包用于查詢zookeeper服務器的狀態。
一 準備開發環境
apache zookeeper是一個復雜的軟件,因此它需要運行許多其他類庫。 依賴庫作為jar文件在zookeeper發行版中附帶在lib目錄中。 核心zookeeper jar文件名字為zookeeper-3.4.6.jar,位于主目錄下。
要開發java的zookeeper應用程序,我們必須設置指向zookeeper jar的類路徑,以及zookeeper所依賴的所有第三方庫。在 bin 目錄下有一個 zkenv.sh文件,可以用來設置classpath。
我們需要將腳本如下設置,在命令行中執行以下語句:
1
2
|
$ zoobindir=${zk_home}/bin $ source ${zoobindir}/zkenv.sh |
shell變量zk_home被設置為安裝zookeeper的路徑,在我的設置中,它是/usr/share/zookeeper。 之后,classpath變量被正確設置,在我的系統中,如下所示:
1
2
|
$ echo $classpath /usr/share/zookeeper- 3.4 . 6 /bin/../build/classes :/usr/share/zookeeper- 3.4 . 6 /bin/../build/lib/*.jar :/usr/share/zookeeper- 3.4 . 6 /bin/../lib/slf4j-log4j12- 1.6 . 1 .jar :/usr/share/zookeeper- 3.4 . 6 /bin/../lib/slf4j-api- 1.6 . 1 .jar :/usr/share/zookeeper- 3.4 . 6 /bin/../lib/netty- 3.7 . 0 . final .jar :/usr/share/zookeeper- 3.4 . 6 /bin/../lib/log4j- 1.2 . 16 .jar :/usr/share/zookeeper- 3.4 . 6 /bin/../lib/jline- 0.9 . 94 .jar :/usr/share/zookeeper- 3.4 . 6 /bin/../zookeeper- 3.4 . 6 .jar :/usr/share/zookeeper- 3.4 . 6 /bin/../src/java/lib/*.jar :/usr/share/zookeeper- 3.4 . 6 /bin/../conf: |
在windows操作系統中,需要運行zkenv.cmd腳本。 現在可以使用classpath變量來編譯和運行使用zookeeper api編寫的java程序。 可以在uni/linux中的主目錄的.bashrc文件中找到zkenv.sh腳本,避免每次啟動shell會話時都采用它。
二 第一個zookeeper程序
為了引入zookeeper java api,讓我們從一個非常簡單的程序開始,它可以連接到localhost中的zookeeper實例,如果連接成功,它將在zookeeper名稱空間的根路徑下打印znode的列表。
這個程序的代碼如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
/*our first zookeeper program*/ import java.io.ioexception; import java.util.arraylist; import java.util.list; import org.apache.zookeeper.keeperexception; import org.apache.zookeeper.zookeeper; public class hellozookeeper { public static void main(string[] args) throws ioexception { string hostport = "localhost:2181" ; string zpath = "/" ; list <string> zoochildren = new arraylist<string>(); zookeeper zk = new zookeeper(hostport, 2000 , null ); if (zk != null ) { try { zoochildren = zk.getchildren(zpath, false ); system.out.println( "znodes of '/': " ); for (string child: zoochildren) { //print the children system.out.println(child); } } catch (keeperexception e) { e.printstacktrace(); } catch (interruptedexception e) { e.printstacktrace(); } } } } |
在構建和執行前面的代碼片段之前,讓我們來看看它具體做了什么。代碼從導入語句開始。使用這些語句,我們導入了程序各個組件所需的包。如前所述,org.apache.zookeeper包包含客戶端與zookeeper服務器進行交互所需的所有類和接口。在導入包之后,定義了一個名為hellozookeeper的類。由于我們要連接到在同一系統中運行的zookeeper實例,在main方法中將主機和端口字符串定義為localhost:2181。代碼行zk = new zookeeper(hostport, 2000, null)調用zookeeper構造方法,該構造方法嘗試連接到zookeeper服務器并返回一個引用。對于連接到zookeeper服務器實例并維護該連接的客戶端程序,需要維護一個實時會話。在此例中,構造方法實例化的zk對象返回的引用表示這個會話。 zookeeper api是圍繞這個引用構建的,每個方法調用都需要一個引用來執行。
zookeeper類的構造方法使用以下代碼創建zookeeper實例的引用:
1
|
zookeeper(string connectstring, int sessiontimeout, watcher watcher) |
使用的參數含義如下:
connectstring:以逗號分隔的主機:端口號列表,每個對應一個zookeeper服務器。 例如,10.0.0.1:2001,10.0.0.2:2002和10.0.0.3:2003表示三個節點的zookeeper ensemble的有效的主機:端口匹配對。 sessiontimeout:這是以毫秒為單位的會話超時時間。這是zookeeper在宣布session結束之前,沒有從客戶端獲得心跳的時間。 watcher:一個watcher對象,如果創建,當狀態改變和發生節點事件時會收到通知。這個watcher對象需要通過一個用戶定義的類單獨創建,通過實現watcher接口并將實例化的對象傳遞給zookeeper構造方法。客戶端應用程序可以收到各種類型的事件的通知,例如連接丟失、會話過期等。
zookeeper java api定義了另外帶有三個參數的構造方法,以指定更高級的操作。代碼如下:
1
|
zookeeper(string connectstring, int sessiontimeout, watcher watcher, boolean canbereadonly) |
在zookeeper類的上面的構造方法中,如果設置為true,boolean canbereadonly參數允許創建的客戶端在網絡分區的情況下進入只讀模式。只讀模式是客戶端無法找到任何多數服務器的場景,但有一個可以到達的分區服務器,以只讀模式連接到它,這樣就允許對服務器的讀取請求,而寫入請求則不允許。客戶端繼續嘗試在后臺連接到大多數服務器,同時仍然保持只讀模式。分區服務器僅僅是zookeeper組的一個子集,它是由于集群中的網絡分配而形成的。大多數服務器構成了ensemble中的大多數quorum。
以下構造方法顯示了兩個附加參數的定義:
1
|
zookeeper(string connectstring, int sessiontimeout, watcher watcher, long sessionid, byte [] sessionpasswd) |
這個構造方法允許zookeeper客戶端對象創建兩個額外的參數:
sessionid:在客戶端重新連接到zookeeper服務器的情況下,可以使用特定的會話id來引用先前連接的會話 sessionpasswd:如果指定的會話需要密碼,可以在這里指定
以下構造方法是前兩個調用的組合:
1
|
zookeeper(string connectstring, int sessiontimeout,watcher watcher, long sessionid, byte [] sessionpasswd, boolean canbereadonly) |
此構造方法是前兩個調用的組合,允許在啟用只讀模式的情況下重新連接到指定的會話。
note
zookeeper類的詳細java api文檔可以在http://zookeeper.apache.org/doc/r3.4.6/api/index.html上查詢。
現在,回到我們的zookeeper程序。 在調用構造方法后,如果連接成功,我們將得到zookeeper服務器的引用。 我們通過下面的代碼將引用傳遞給getchildren方法:
1
|
zoochildren = zk.getchildren(zpath, false ) |
zookeeper類的getchildren(string path,boolean watch)方法返回給定路徑上znode的子級列表。 我們只是迭代這個方法返回的列表,并將字符串打印到控制臺。
將程序命名為hellozookeeper.java,并編譯我們的程序如下:
1
|
$ javac -cp $classpath hellozookeeper.java |
在我們運行的程序之前,需要使用以下命令來啟動zookeeper服務器實例:
1
|
$ ${zk_home}/bin/zkserver.sh start |
運行程序如下:
1
|
$ java -cp $classpath hellozookeeper |
執行程序會在控制臺上打印日志消息,顯示所使用的zookeeper版本,java版本,java類路徑,服務器體系結構等等。 這里顯示了這些日志消息的一部分:
zookeeper java api生成的日志消息對調試非常有用。 它為我們提供了關于客戶端連接到zookeeper服務器,建立會話等后臺得信息。 上面顯示的最后三條日志消息告訴我們客戶端如何使用程序中指定的參數來啟動連接,以及在成功連接后,服務器如何為客戶端分配會話id。
最后,程序執行最后在控制臺中輸出以下內容:
我們可以使用zookeeper shell來驗證程序的正確性:
1
|
$ $zk_home/bin/zkcli.sh -server localhost |
恭喜! 我們剛剛成功編寫了我們的第一個zookeeper客戶端程序。
二 實現watcher接口
zookeeper watcher監視使客戶端能夠接收來自zookeeper服務器的通知,并在發生時處理這些事件。 zookeeper java api提供了一個名為watcher的公共接口,客戶端事件處理程序類必須實現該接口才能接收有關來自zookeeper服務器的事件通知。 以編程方式,使用這種客戶端的應用程序通過向客戶端注冊回調(callback)對象來處理這些事件。
我們將實現watcher接口,處理與znode關聯的數據更改時由zookeeper生成的事件。
watcher接口在org.apache.zookeeper包中聲明如下:
1
2
3
|
public interface watcher { void process(watchedevent event); } |
為了演示znode數據監視器(watcher),有兩個java類:datawatcher和dataupdater。 datawatcher將一直運行,并在/myconfig指定znode路徑中偵聽來自zookeeper服務器的nodedatachange事件。dataupdater類將定期更新此znode路徑中的數據字段,這將生成事件,并且在接收到這些事件后,datawatcher類將把更改后的數據打印到控制臺上。
以下是datawatcher.java類的代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
import java.io.ioexception; import org.apache.zookeeper.createmode; import org.apache.zookeeper.keeperexception; import org.apache.zookeeper.watchedevent; import org.apache.zookeeper.watcher; import org.apache.zookeeper.zoodefs; import org.apache.zookeeper.zookeeper; public class datawatcher implements watcher, runnable { private static string hostport = "localhost:2181" ; private static string zoodatapath = "/myconfig" ; byte zoo_data[] = null ; zookeeper zk; public datawatcher() { try { zk = new zookeeper(hostport, 2000 , this ); if (zk != null ) { try { //create the znode if it doesn't exist, with the following code: if (zk.exists(zoodatapath, this ) == null ) { zk.create(zoodatapath, "" .getbytes(), zoodefs.ids.open_acl_unsafe, createmode.persistent); } } catch (keeperexception | interruptedexception e) { e.printstacktrace(); } } } catch (ioexception e) { e.printstacktrace(); } } public void printdata() throws interruptedexception, keeperexception { zoo_data = zk.getdata(zoodatapath, this , null ); string zstring = new string(zoo_data); // the following code prints the current content of the znode to the console: system.out.printf( "\ncurrent data @ zk path %s: %s" , zoodatapath, zstring); } @override public void process(watchedevent event) { system.out.printf( "\nevent received: %s" , event.tostring()); //we will process only events of type nodedatachanged if (event.gettype() == event.eventtype.nodedatachanged) { try { printdata(); } catch (interruptedexception e) { e.printstacktrace(); } catch (keeperexception e) { e.printstacktrace(); } } } public static void main(string[] args) throws interruptedexception, keeperexception { datawatcher datawatcher = new datawatcher(); datawatcher.printdata(); datawatcher.run(); } public void run() { try { synchronized ( this ) { while ( true ) { wait(); } } } catch (interruptedexception e) { e.printstacktrace(); thread.currentthread().interrupt(); } } } |
我們來看一下datawatcher.java類的代碼來理解一個zookeeper監視器的實現。 datawatcher公共類實現watcher接口以及runnable接口,打算將監視器作為線程運行。 main方法創建datawatcher類的一個實例。 在前面的代碼中,datawatcher構造方法嘗試連接到在本地主機上運行的zookeeper實例。 如果連接成功,我們用下面的代碼檢查znode路徑/myconfig是否存在:
1
|
if (zk.exists(zoodatapath, this ) == null ) { |
如果znode不存在zookeeper命名空間中,那么exists方法調用將返回null,并且嘗試使用代碼將其創建為持久化znode,如下所示:
1
|
zk.create(zoodatapath, "" .getbytes(), zoodefs.ids.open_acl_unsafe, createmode.persistent); |
接下來是process方法,它在org.apache.zookeeper的watcher接口中聲明,并由datawatcher類使用以下代碼實現:
1
|
public void process(watchedevent event) { |
為了簡單起見,在process方法中,打印從zookeeper實例接收的事件,并僅對nodedatachanged類型的事件進行進一步處理,如下所示:
1
|
if (event.gettype() == event.eventtype.nodedatachanged) |
當znode路徑/myconfig的數據字段發生任何更新或更改而收到nodedatachanged類型的事件時,調用printdata方法來打印znode的當前內容。 在znode上執行一個getdata調用時,我們再次設置一個監視,這是該方法的第二個參數,如下面的代碼所示:
1
|
zoo_data = zk.getdata(zoodatapath, this , null ); |
監視事件是發送給設置監視的客戶端的一次性觸發器,為了不斷接收進一步的事件通知,客戶端應該重置監視器。
dataupdater.java是一個簡單的類,它連接到運行本地主機的zookeeper實例,并用隨機字符串更新znode路徑/myconfig的數據字段。 在這里,我們選擇使用通用唯一標識符(uuid)字符串更新znode,因為后續的uuid生成器調用將保證生成唯一的字符串。
dataupdater.java類代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
import java.io.ioexception; import java.util.uuid; import org.apache.zookeeper.keeperexception; import org.apache.zookeeper.watchedevent; import org.apache.zookeeper.watcher; import org.apache.zookeeper.zookeeper; public class dataupdater implements watcher { private static string hostport = "localhost:2181" ; private static string zoodatapath = "/myconfig" ; zookeeper zk; public dataupdater() throws ioexception { try { zk = new zookeeper(hostport, 2000 , this ); } catch (ioexception e) { e.printstacktrace(); } } // updates the znode path /myconfig every 5 seconds with a new uuid string. public void run() throws interruptedexception, keeperexception { while ( true ) { string uuid = uuid.randomuuid().tostring(); byte zoo_data[] = uuid.getbytes(); zk.setdata(zoodatapath, zoo_data, - 1 ); try { thread.sleep( 5000 ); // sleep for 5 secs } catch (interruptedexception e) { thread.currentthread().interrupt(); } } } public static void main(string[] args) throws ioexception, interruptedexception, keeperexception { dataupdater dataupdater = new dataupdater(); dataupdater.run(); } @override public void process(watchedevent event) { system.out.printf( "\nevent received: %s" , event.tostring()); } } |
上面的代碼使zookeeper服務器觸發一個nodedatachanged事件。 由于datawatcher為此znode路徑設置了監視,因此它會接收數據更改事件的通知。 然后它檢索更新的數據,重置監視,并在控制臺上打印數據。
使用以下命令編譯datawatcher和dataupdater類:
1
2
|
$ javac –cp $classpath datawatcher.java $ javac –cp $classpath dataupdater.java |
要執行監視器和更新程序,需要打開兩個終端窗口。 我要先運行監視器,因為它創建了/myconfig的znode(如果還未在zookeeper的命名空間中創建的話)。 運行監視器之前,請確保zookeeper服務器在本地主機上已經運行。
在其中一個終端窗口中,通過運行以下命令來執行watcher類:
1
|
$ java –cp $classpath datawatcher |
輸出類似于以下屏幕截圖所示的消息:
如前面的截圖所示,znode路徑/myconfig是由datawatcher類創建的。 它也打印znode的內容,但沒有打印在控制臺中,因為我們在創建znode時沒有設置任何數據。 當znode被創建時,類中的監視者收到了nodecreated類型的事件通知,這個通知被打印在控制臺中。 datawatcher類繼續運行,并從zookeeper服務器偵聽/myconfig節點上的事件。
讓我們在另一個終端窗口中運行dataupdater類:
1
|
$ java -cp $classpath dataupdater |
將最初的zookeeper特定日志消息記錄到控制臺后,dataupdater類運行時沒有提示。 它將一個新的uuid字符串設置到zookeeper路徑/myconfig的數據字段中。 因此,看到每隔5秒鐘,在下面的屏幕截圖中顯示的輸出內容打印在運行datawatche的終端窗口中:
datawatcher也可以使用zookeeper shell進行測試。 繼續像以前一樣在終端中運行datawatcher類,并在另一個終端中調用zookeeper shell并運行以下屏幕截圖中所示的命令:
在datawatcher正在運行的終端中,將打印以下消息:
三 示例——群集監視器
通過互聯網提供的流行服務,如電子郵件,文件服務平臺,在線游戲等,都是通過跨越多個數據中心的高度可用的成百上千臺服務器來服務的,而這些服務器通常在地理位置上分開。 在這種集群中,設置了一些專用的服務器節點來監視生產網絡中承載服務或應用程序的服務器的活躍性。 在云計算環境中,也用于管理云環境的這種監控節點被稱為云控制器。 這些控制器節點的一個重要工作是實時檢測生產服務器的故障,并相應地通知管理員,并采取必要的措施,例如將故障服務器上的應用程序故障轉移到另一個服務器,從而確保容錯性和高可用性。
在本節中,我們將使用zookeeper java客戶端api開發一個簡約的分布式集群監視器模型。 使用zookeeper的ephemeral znode概念來構建這個監視模型相當簡單和優雅,如以下步驟所述:
每個生產服務器運行一個zookeeper客戶端作為守護進程。 這個過程連接到zookeeper服務器,并在/zookeeper命名空間的預定義路徑(比如/members)下創建一個帶有名稱(最好是其網絡名稱或主機名)的ephemeral znode。云控制器節點運行zookeeper監視器進程,該進程監視路徑/members并監聽nodechildrenchanged類型的事件。 這個監視器進程作為服務或守護進程運行,并設置或重置路徑上的監視,并且實現其邏輯以調用適當的模塊來為監視事件采取必要的行動。現在,如果生產服務器由于硬件故障或軟件崩潰而關閉,zookeeper客戶端進程就會被終止,導致服務器和zookeeper服務之間的會話被終止。 由于ephemeral znode的屬性唯一,每當客戶端連接關閉時,zookeeper服務會自動刪除路徑/members中的znode。路徑中znode的刪除引發了nodechildrenchanged事件,因此云控制器中的觀察器進程會收到通知。 通過調用路徑/members中的getchildren方法,可以確定哪個服務器節點已經關閉。然后,控制器節點可以采取適當的措施,比如執行恢復邏輯以重啟另一臺服務器中的故障服務。這個邏輯可以構建為實時工作,保證接近于零停機的時間和高度可用的服務。
為實現這個集群監控模型,我們將開發兩個java類。 clustermonitor類將持續運行監視器,以監視zookeeper樹中的路徑/members。 處理完引發事件后,我們將在控制臺中打印znode列表并重置監視。 另一個類clusterclient將啟動到zookeeper服務器的連接,在/members下創建一個ephemeral znode。
要模擬具有多個節點的集群,我們在同一臺計算機上啟動多個客戶端,并使用客戶端進程的進程id創建ephemeral znode。 通過查看進程標識,clustermonitor類可以確定哪個客戶進程已經關閉,哪些進程還在。 在實際情況中,客戶端進程通常會使用當前正在運行的服務器的主機名創建ephemeral znode。 下面顯示了這兩個類的源代碼。
clustermonitor.java類定義如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
import java.io.ioexception; import java.util.list; import org.apache.zookeeper.createmode; import org.apache.zookeeper.keeperexception; import org.apache.zookeeper.watchedevent; import org.apache.zookeeper.watcher; import org.apache.zookeeper.zoodefs.ids; import org.apache.zookeeper.zookeeper; public class clustermonitor implements runnable { private static string membershiproot = "/members" ; private final watcher connectionwatcher; private final watcher childrenwatcher; private zookeeper zk; boolean alive= true ; public clustermonitor(string hostport) throws ioexception, interruptedexception, keeperexception { connectionwatcher = new watcher() { @override public void process(watchedevent event) { if (event.gettype()==watcher.event.eventtype.none && event.getstate() == watcher.event.keeperstate.syncconnected) { system.out.printf( "\nevent received: %s" , event.tostring()); } } }; childrenwatcher = new watcher() { @override public void process(watchedevent event) { system.out.printf( "\nevent received: %s" , event.tostring()); if (event.gettype() == event.eventtype.nodechildrenchanged) { try { //get current list of child znode, //reset the watch list<string> children = zk.getchildren( membershiproot, this ); wall( "!!!cluster membership change!!!" ); wall( "members: " + children); } catch (keeperexception e) { throw new runtimeexception(e); } catch (interruptedexception e) { thread.currentthread().interrupt(); alive = false ; throw new runtimeexception(e); } } } }; zk = new zookeeper(hostport, 2000 , connectionwatcher); // ensure the parent znode exists if (zk.exists(membershiproot, false ) == null ) { zk.create(membershiproot, "clustermonitorroot" .getbytes(), ids.open_acl_unsafe, createmode.persistent); } // set a watch on the parent znode list<string> children = zk.getchildren(membershiproot, childrenwatcher); system.err.println( "members: " + children); } public synchronized void close() { try { zk.close(); } catch (interruptedexception e) { e.printstacktrace(); } } public void wall (string message) { system.out.printf( "\nmessage: %s" , message); } public void run() { try { synchronized ( this ) { while (alive) { wait(); } } } catch (interruptedexception e) { e.printstacktrace(); thread.currentthread().interrupt(); } finally { this .close(); } } public static void main(string[] args) throws ioexception, interruptedexception, keeperexception { if (args.length != 1 ) { system.err.println( "usage: clustermonitor <host:port>" ); system.exit( 0 ); } string hostport = args[ 0 ]; new clustermonitor(hostport).run(); } } |
clusterclient.java類定義如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
import java.io.ioexception; import java.lang.management.managementfactory; import org.apache.zookeeper.createmode; import org.apache.zookeeper.keeperexception; import org.apache.zookeeper.watchedevent; import org.apache.zookeeper.watcher; import org.apache.zookeeper.zoodefs.ids; import org.apache.zookeeper.zookeeper; public class clusterclient implements watcher, runnable { private static string membershiproot = "/members" ; zookeeper zk; public clusterclient(string hostport, long pid) { string processid = pid.tostring(); try { zk = new zookeeper(hostport, 2000 , this ); } catch (ioexception e) { e.printstacktrace(); } if (zk != null ) { try { zk.create(membershiproot + '/' + processid, processid.getbytes(),ids.open_acl_unsafe, createmode.ephemeral); } catch ( keeperexception | interruptedexception e) { e.printstacktrace(); } } } public synchronized void close() { try { zk.close(); } catch (interruptedexception e) { e.printstacktrace(); } } @override public void process(watchedevent event) { system.out.printf( "\nevent received: %s" , event.tostring()); } public void run() { try { synchronized ( this ) { while ( true ) { wait(); } } } catch (interruptedexception e) { e.printstacktrace(); thread.currentthread().interrupt(); } finally { this .close(); } } public static void main(string[] args) { if (args.length != 1 ) { system.err.println( "usage: clusterclient <host:port>" ); system.exit( 0 ); } string hostport = args[ 0 ]; //get the process id string name = managementfactory.getruntimemxbean().getname(); int index = name.indexof( '@' ); long processid = long .parselong(name.substring( 0 , index)); new clusterclient(hostport, processid).run(); } } |
使用下面命令編譯這兩個類:
1
2
|
$ javac -cp $classpath clustermonitor.java $ javac -cp $classpath clusterclient.java |
要執行群集監控模型,打開兩個終端。 在其中一個終端中,運行clustermonitor類。 在另一個終端中,通過在后臺運行clusterclient類來執行多個實例。
在第一個終端中,執行clustermonitor類:
1
|
$ java -cp $classpath clustermonitorlocalhost: 2181 |
如前面的示例所示,看到來自客戶端api的調試日志消息,最后,clustermonitor類開始監視事件,輸入如下內容:
現在,執行clusterclient類的五個實例來模擬一個集群的五個節點。clusterclient在zookeeper樹的/members路徑中使用自己的進程id創建ephemeral znode:
1
2
3
4
5
6
7
8
9
10
|
$ java -cp $classpath clusterclient localhost: 2181 2 >& 1 >/dev/ null & [ 1 ] 4028 $ java -cp $classpath clusterclient localhost: 2181 2 >& 1 >/dev/ null & [ 2 ] 4045 $ java -cp $classpath clusterclient localhost: 2181 2 >& 1 >/dev/ null & [ 3 ] 4057 $ java -cp $classpath clusterclient localhost: 2181 2 >& 1 >/dev/ null & [ 4 ] 4072 $ java -cp $classpath clusterclient localhost: 2181 2 >& 1 >/dev/ null & [ 5 ] 4084 |
與此相對應,將觀察到clustermonitor類檢測到這些新的clusterclient類實例,因為它正在監視zookeeper樹的/members路徑上的事件。 這模擬了一個真正的集群中的節點加入事件。 可以在clustermonitor類的終端中看到輸出,這與下面的截圖中顯示的類似:
現在,如果殺死一個clusterclient.java進程,那么它與zookeeper服務器一起維護的會話將被終止。因此,客戶端創建的ephemeral znode將被刪除。刪除將觸發nodechildrenchanged事件,該事件將被clustermonitor類捕獲。該模擬在集群中一個節點離開的場景。
讓我們用id 4084終止clusterclient進程:
1
|
$ kill - 9 4084 |
以下屏幕截圖顯示了clustermonitor類的終端中的輸出。 它列出了當前可用的進程及其進程id,這些進程id模擬了實時服務器:
上面的簡單而優雅的集群監控模型的示例實現展示了zookeeper的真正威力。 在沒有zookeeper的情況下,開發這樣一個能夠實時監控節點活躍度的模型將是一項真正的艱巨任務。
原文鏈接:https://www.cnblogs.com/IcanFixIt/p/7882107.html