這篇文章主要介紹“ZooKeeper與Curator注冊和監控方法”,在日常操作中,相信很多人在ZooKeeper與Curator注冊和監控方法問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對大家解答”ZooKeeper與Curator注冊和監控方法”的疑惑有所幫助!接下來(lái),請跟著(zhù)小編一起來(lái)學(xué)習吧!
Curator提供了對zookeeper客戶(hù)端的封裝,并監控連接狀態(tài)和會(huì )話(huà)session,特別是會(huì )話(huà)session過(guò)期后,curator能夠重新連接zookeeper,并且創(chuàng )建一個(gè)新的session。
對于zk的使用者來(lái)說(shuō),session的概念至關(guān)重要,如果想了解更多session的說(shuō)明,請訪(fǎng)問(wèn):
http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html
zk客戶(hù)端和zk間主要可能存在下面幾種異常情況:
短暫失去連接:此時(shí)客戶(hù)端檢測到與服務(wù)端的連接已經(jīng)斷開(kāi),但是服務(wù)端維護的客戶(hù)端session尚未過(guò)期,之后客戶(hù)端和服務(wù)端重新建立了連接;當客戶(hù)端重新連接后,由于session沒(méi)有過(guò)期,zookeeper能夠保證連接恢復后保持正常服務(wù)。
失去連接時(shí)間很長(cháng):此時(shí)服務(wù)器相對于客戶(hù)端的session已經(jīng)過(guò)期了,與先前session相關(guān)的watcher和ephemeral的路徑和數據都會(huì )消失;當Curator重新創(chuàng )建了與zk的連接后,會(huì )獲取到session expired異常,Curator會(huì )銷(xiāo)毀先前的session,并且會(huì )創(chuàng )建一個(gè)新的session,需要注意的是,與之前session相關(guān)的watcher和ephemeral類(lèi)型的路徑和數據在新的session中也不會(huì )存在,需要開(kāi)發(fā)者在CuratorFramework.getConnectionStateListenable().addListener()中添加狀態(tài)監聽(tīng)事件,對ConnectionState.LOST事件進(jìn)行監聽(tīng),當session過(guò)期后,使得之前的session狀態(tài)得以恢復。對于ephemeral類(lèi)型,在客戶(hù)端應該保持數據的狀態(tài),以便及時(shí)恢復。
客戶(hù)端重新啟動(dòng):不論先前的zk session是否已經(jīng)過(guò)期,都需要重新創(chuàng )建臨時(shí)節點(diǎn)、添加數據和watch事件,先前的session也會(huì )在稍后的一段時(shí)間內過(guò)期。
Zk服務(wù)器重新啟動(dòng):由于zk將session信息存放到了硬盤(pán)上,因此重啟后,先前未過(guò)期的session仍然存在,在zk服務(wù)器啟動(dòng)后,客戶(hù)端與zk服務(wù)器創(chuàng )建新的連接,并使用先前的session,與1相同。
需要注意的是,當session過(guò)期了,在session過(guò)期期間另外的客戶(hù)端修改了zk的值,那么這個(gè)修改在客戶(hù)端重新連接到zk上時(shí),zk客戶(hù)端不會(huì )接收到這個(gè)修改的watch事件(盡管添加了watch),如果需要嚴格的watch邏輯,就需要在curator的狀態(tài)監控中添加邏輯。
特別提示:watcher僅僅是一次性的,zookeeper通知了watcher事件后,就會(huì )將這個(gè)watcher從session中刪除,因此,如果想繼續監控,就要添加新的watcher。
下面提供了對persistent和ephemeral兩種類(lèi)型節點(diǎn)的監控方法,其中g(shù)et方法說(shuō)明了persistent節點(diǎn)如何監控,而register方法說(shuō)明了ephemeral類(lèi)型的節點(diǎn)如何監控。
package demo; import java.net.InetAddress; import java.nio.charset.Charset; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; public class CuratorTest { private CuratorFramework zkTools; private ConcurrentSkipListSet<String> watchers = new ConcurrentSkipListSet<String>(); private static Charset charset = Charset.forName("utf-8"); public CuratorTest() { zkTools = CuratorFrameworkFactory.builder() .connectString("192.168.0.216:3306") .namespace("zk/test") .retryPolicy(new RetryNTimes(2000, 20000)) .build(); zkTools.start(); } public void addReconnectionWatcher(final String path, final ZookeeperWatcherType watcherType, final CuratorWatcher watcher) { synchronized (this) { if (!watchers.contains(watcher.toString()))// 不要添加重復的監聽(tīng)事件 { watchers.add(watcher.toString()); System.out.println("add new watcher " + watcher); zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { System.out.println(newState); if (newState == ConnectionState.LOST) {// 處理session過(guò)期 try { if (watcherType == ZookeeperWatcherType.EXITS) { zkTools.checkExists().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.GET_CHILDREN) { zkTools.getChildren().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.GET_DATA) { zkTools.getData().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS) { // ephemeral類(lèi)型的節點(diǎn)session過(guò)期了,需要重新創(chuàng )建節點(diǎn),并且注冊監聽(tīng)事件,之后監聽(tīng)事件中, // 會(huì )處理create事件,將路徑值恢復到先前狀態(tài) Stat stat = zkTools.checkExists().usingWatcher(watcher) .forPath(path); if (stat == null) { System.err.println("to create"); zkTools.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path); } } } catch (Exception e) { e.printStackTrace(); } } } }); } } } public void create() throws Exception { zkTools.create()// 創(chuàng )建一個(gè)路徑 .creatingParentsIfNeeded()// 如果指定的節點(diǎn)的父節點(diǎn)不存在,遞歸創(chuàng )建父節點(diǎn) .withMode(CreateMode.PERSISTENT)// 存儲類(lèi)型(臨時(shí)的還是持久的) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)// 訪(fǎng)問(wèn)權限 .forPath("zk/test");// 創(chuàng )建的路徑 } public void put() throws Exception { // 對路徑節點(diǎn)賦值 zkTools.setData().forPath("zk/test", "hello world".getBytes(Charset.forName("utf-8"))); } public void get() throws Exception { String path = "zk/test"; ZKWatch watch = new ZKWatch(path); byte[] buffer = zkTools.getData().usingWatcher(watch).forPath(path); System.out.println(new String(buffer, charset)); // 添加session過(guò)期的監控 addReconnectionWatcher(path, ZookeeperWatcherType.GET_DATA, watch); } public void register() throws Exception { String ip = InetAddress.getLocalHost().getHostAddress(); String registeNode = "zk/register/" + ip;// 節點(diǎn)路徑 byte[] data = "disable".getBytes(charset);// 節點(diǎn)值 CuratorWatcher watcher = new ZKWatchRegister(registeNode, data); // 創(chuàng )建一個(gè)register watcher Stat stat = zkTools.checkExists().forPath(registeNode); if (stat != null) { zkTools.delete().forPath(registeNode); } zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(registeNode, data);// 創(chuàng )建的路徑和值 // 添加到session過(guò)期監控事件中 addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS, watcher); data = zkTools.getData().usingWatcher(watcher).forPath(registeNode); System.out.println("get path form zk : " + registeNode + ":" + new String(data, charset)); } public static void main(String[] args) throws Exception { CuratorTest test = new CuratorTest(); test.get(); test.register(); Thread.sleep(10000000000L); } public class ZKWatch implements CuratorWatcher { private final String path; public String getPath() { return path; } public ZKWatch(String path) { this.path = path; } @Override public void process(WatchedEvent event) throws Exception { System.out.println(event.getType()); if (event.getType() == EventType.NodeDataChanged) { byte[] data = zkTools.getData().usingWatcher(this).forPath(path); System.out.println(path + ":" + new String(data, Charset.forName("utf-8"))); } } } public class ZKWatchRegister implements CuratorWatcher { private final String path; private byte[] value; public String getPath() { return path; } public ZKWatchRegister(String path, byte[] value) { this.path = path; this.value = value; } @Override public void process(WatchedEvent event) throws Exception { System.out.println(event.getType()); if (event.getType() == EventType.NodeDataChanged) { // 節點(diǎn)數據改變了,需要記錄下來(lái),以便session過(guò)期后,能夠恢復到先前的數據狀態(tài) byte[] data = zkTools.getData().usingWatcher(this).forPath(path); value = data; System.out.println(path + ":" + new String(data, charset)); } else if (event.getType() == EventType.NodeDeleted) { // 節點(diǎn)被刪除了,需要創(chuàng )建新的節點(diǎn) System.out.println(path + ":" + path + " has been deleted."); Stat stat = zkTools.checkExists().usingWatcher(this).forPath(path); if (stat == null) { zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path); } } else if (event.getType() == EventType.NodeCreated) { // 節點(diǎn)被創(chuàng )建時(shí),需要添加監聽(tīng)事件(創(chuàng )建可能是由于session過(guò)期后,curator的狀態(tài)監聽(tīng)部分觸發(fā)的) System.out.println(path + ":" + " has been created!" + "the current data is " + new String(value)); zkTools.setData().forPath(path, value); zkTools.getData().usingWatcher(this).forPath(path); } } } public enum ZookeeperWatcherType { GET_DATA, GET_CHILDREN, EXITS, CREATE_ON_NO_EXITS } }
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。
Copyright ? 2009-2022 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 特網(wǎng)科技 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 百度云 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站