ZkClient
永久监听的封装
添加maven依赖
1 2 3 4 5
| <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
|
建立连接
1 2 3
| ZkClient zkClient=new ZkClient(CONNECTSTRING,4000); System.out.println(zkClient+"---success");
|
创建子节点
1 2 3 4 5
|
client.createPersistent("/zkClientParent/zkClientSon",true);
|
删除节点
1 2 3
|
client.deleteRecursive("/zkRootNode");
|
获取子节点
1 2 3
| List<String> children= client.getChildren("/zkParent"); System.out.println(children);
|
修改节点
1
| client.writeData("/zkParent", "123");
|
Watch
1 2 3 4 5 6 7 8 9 10
| client.subscribeDataChanges("/zkParent", new IZkDataListener() { public void handleDataChange(String s, Object o) throws Exception { System.out.println("change:" + s + "---" + o); }
public void handleDataDeleted(String s) throws Exception { System.out.println("delete:" + s); } });
|
完整版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package com.bai.zkclient;
import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient;
import java.util.List; import java.util.concurrent.TimeUnit;
public class ZkClientApiOperatorDemo {
private final static String CONNECTSTRING = "";
private static ZkClient getInstance() { return new ZkClient(CONNECTSTRING, 5000); }
public static void main(String[] args) throws InterruptedException { ZkClient client = getInstance(); client.createPersistent("/zkParent/zkSon/zksuperson", true);
List<String> children = client.getChildren("/zkParent"); System.out.println(children);
client.subscribeDataChanges("/zkParent", new IZkDataListener() { public void handleDataChange(String s, Object o) throws Exception { System.out.println("change:" + s + "---" + o); }
public void handleDataDeleted(String s) throws Exception { System.out.println("delete:" + s); } }); client.writeData("/zkParent", "123"); TimeUnit.SECONDS.sleep(2);
boolean result = client.deleteRecursive("/zkParent"); TimeUnit.SECONDS.sleep(2);
} }
|
Curator
介绍
Curator本身是Netflix公司开源的zookeeper客户端;
curator提供了各种应用场景的实现封装
curator-framework 提供了fluent风格api
curator-replice 提供了实现封装
比较
- 和Zookeeper原生客户端相比
- 接口和API的层次更加抽象化,简化Zookeeper原生客户端的开发量
- 和ZkClient相比
- 处理更加优美
- 异常处理比较好
- 比如节点不存在可以通过exists判断,但是ZkClient直接在catch中处理了。
创建会话
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECTSTRING, 5000, 5000, new ExponentialBackoffRetry(1000, 3));
curatorFramework.start();
CuratorFramework curatorFrameworkFlunt=CuratorFrameworkFactory.builder().connectString(CONNECTSTRING).sessionTimeoutMs(5000). retryPolicy(new ExponentialBackoffRetry(1000, 3)).namespace("/curator").build();
curatorFrameworkFlunt.start(); System.out.println("success");
|
curator连接的重试策略
ExponentialBackoffRetry()
衰减重试
RetryNTimes
指定最大重试次数
RetryOneTime
仅重试一次
RetryUnitilElapsed
一直重试直到规定的时间
创建节点
1 2 3 4 5 6
| try { String result=curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/curator/curator1/curator11","123".getBytes()); System.out.println(result); } catch (Exception e) { e.printStackTrace(); }
|
获取节点
1 2 3 4 5 6 7 8 9 10
| try { Stat stat=new Stat(); byte[] data=curatorFramework.getData().storingStatIn(stat).forPath("/curator"); System.out.println(new String(data)+"---stat---"+stat); } catch (Exception e) { e.printStackTrace(); }
|
修改节点
1 2 3 4 5 6 7
| try { Stat stat=curatorFramework.setData().forPath("/curator","123".getBytes()); System.out.println(stat); } catch (Exception e) { e.printStackTrace(); }
|
删除节点
1 2 3 4 5 6 7
| try { curatorFramework.delete().deletingChildrenIfNeeded().forPath("/curator"); } catch (Exception e) { e.printStackTrace(); }
|
异步操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
ExecutorService service= Executors.newFixedThreadPool(1); final CountDownLatch countDownLatch=new CountDownLatch(1);
try { final String result=curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println(Thread.currentThread().getName()+"----"+curatorEvent.getResultCode()+"---"+curatorEvent.getType()); countDownLatch.countDown(); } }).forPath("/curatorTest/curator1/curator11","777".getBytes()); System.out.println("result:"+result); } catch (Exception e) { e.printStackTrace(); }
countDownLatch.await(); service.shutdown();
|
事务操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
try { Collection<CuratorTransactionResult> resultCollection=curatorFramework.inTransaction().create() .forPath("/trans","111".getBytes()).and().setData().forPath("/curatorTest","111".getBytes()).and().commit();
for (CuratorTransactionResult result:resultCollection){ System.out.println(result.getForPath()+"---"+result.getType()); } } catch (Exception e) { e.printStackTrace(); }
|
监听
- 封装了三种watch来做节点的监听
PathChildrenCache
:监视一个路径下子节点的创建、删除、以及节点数据更新
NodeCache
:监视一个节点的创建、更新、删除并且会将节点的数据缓存到本地
treecache
:pathcache和nodecache的合体(监视路径下的创建、更新、删除事件并且还会缓存路径下的所有子节点的数据)
NodeCache
1 2 3 4 5 6 7 8 9 10 11
| CuratorFramework curatorFramework=CuratorClientUtil.getInstance();
curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/curator","111".getBytes());
final NodeCache nodeCache=new NodeCache(curatorFramework,"/curator",false); nodeCache.start(true);
nodeCache.getListenable().addListener(()-> System.out.println("节点数据发生变化,结果---"+new String(nodeCache.getCurrentData().getData())));
curatorFramework.setData().forPath("/curator","123".getBytes()); TimeUnit.SECONDS.sleep(2);
|
PathChildrenCache
PathChildrenCache.StartMode
NORMAL
:初始化时为空
BUILD_INITIAL_CACHE
:在这个方法返回之前调用rebuild操作
POST_INITIALIZED_EVENT
:当cache初始化数据后发送一个PathChildrenCache事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| pathChildrenCache.getListenable().addListener((curatorFramework1, pathChildrenCacheEvent) -> { switch (pathChildrenCacheEvent.getType()) { case INITIALIZED: System.out.println("INITIALIZED"); break; case CHILD_ADDED: System.out.println("CHILD_ADDED"); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED"); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED"); break; case CONNECTION_LOST: System.out.println("CONNECTION_LOST"); break; case CONNECTION_SUSPENDED: System.out.println("CONNECTION_SUSPENDED"); break; case CONNECTION_RECONNECTED: System.out.println("CONNECTION_RECONNECTED"); break; } });
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event", "event".getBytes()); TimeUnit.SECONDS.sleep(2); System.out.println("create event");
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1", "event".getBytes()); TimeUnit.SECONDS.sleep(2);
curatorFramework.setData().forPath("/event/event1", "222".getBytes()); TimeUnit.SECONDS.sleep(2);
curatorFramework.delete().forPath("/event/event1"); curatorFramework.delete().forPath("/event");
|