ZkClient

永久监听的封装

添加maven依赖
1
2
3
4
5
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
建立连接
1
2
3
//建立连接
ZkClient zkClient=new ZkClient(CONNECTSTRING,4000);
System.out.println(zkClient+"---success");
创建子节点
1
2
3
4
5
//递归创建节点
//内部实现:当父节点不存在时,会报异常,在catch里判断第二个参数(是否创建父节点)
//如果为false:则直接将异常抛出
//如果为true:截取节点路径,获取父节点,先创建父节点,再创建子节点
client.createPersistent("/zkClientParent/zkClientSon",true);
删除节点
1
2
3
//删除节点
//如果存在子节点,则递归删除子节点
client.deleteRecursive("/zkRootNode");
获取子节点
1
2
3
//获取子节点
List<String> children= client.getChildren("/zkParent");
System.out.println(children);
修改节点
1
client.writeData("/zkParent", "123");
Watch
1
2
3
4
5
6
7
8
9
10
//watch
client.subscribeDataChanges("/zkParent", new IZkDataListener() {
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("change:" + s + "---" + o);
}

public void handleDataDeleted(String s) throws Exception {
System.out.println("delete:" + s);
}
});
完整版
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.bai.zkclient;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Created by 2bai on 25/10/2017.
*/
public class ZkClientApiOperatorDemo {

private final static String CONNECTSTRING = "";

private static ZkClient getInstance() {
return new ZkClient(CONNECTSTRING, 5000);
}

public static void main(String[] args) throws InterruptedException {
ZkClient client = getInstance();

//递归创建节点
//内部实现:当父节点不存在时,会报异常,在catch里判断第二个参数(是否创建父节点)
//如果为false:则直接将异常抛出
//如果为true:截取节点路径,获取父节点,先创建父节点,再创建子节点
client.createPersistent("/zkParent/zkSon/zksuperson", true);

//获取子节点
List<String> children = client.getChildren("/zkParent");
System.out.println(children);

//watch
client.subscribeDataChanges("/zkParent", new IZkDataListener() {
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("change:" + s + "---" + o);
}

public void handleDataDeleted(String s) throws Exception {
System.out.println("delete:" + s);
}
});
//修改节点
client.writeData("/zkParent", "123");
TimeUnit.SECONDS.sleep(2);

//删除节点
boolean result = client.deleteRecursive("/zkParent");
TimeUnit.SECONDS.sleep(2);

}
}

Curator

介绍

Curator本身是Netflix公司开源的zookeeper客户端;

curator提供了各种应用场景的实现封装

curator-framework 提供了fluent风格api

curator-replice 提供了实现封装

比较
  • 和Zookeeper原生客户端相比
    • 接口和API的层次更加抽象化,简化Zookeeper原生客户端的开发量
  • 和ZkClient相比
    • 处理更加优美
    • 异常处理比较好
      • 比如节点不存在可以通过exists判断,但是ZkClient直接在catch中处理了。
创建会话
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//创建会话的两种方式

//1.正常的方式
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECTSTRING, 5000,
5000, new ExponentialBackoffRetry(1000, 3));
//启动连接
curatorFramework.start();

//2.flunt风格
//namespace指定所有会话均在此节点下创建,可省略
CuratorFramework curatorFrameworkFlunt=CuratorFrameworkFactory.builder().connectString(CONNECTSTRING).sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry(1000, 3)).namespace("/curator").build();
//启动连接
curatorFrameworkFlunt.start();
System.out.println("success");
curator连接的重试策略
  • ExponentialBackoffRetry() 衰减重试
  • RetryNTimes 指定最大重试次数
  • RetryOneTime 仅重试一次
  • RetryUnitilElapsed 一直重试直到规定的时间
创建节点
1
2
3
4
5
6
try {
String result=curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/curator/curator1/curator11","123".getBytes());
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
获取节点
1
2
3
4
5
6
7
8
9
10
try {
Stat stat=new Stat();
//节点的状态信息
//storingStatIn把节点的状态信息存在一个变量里最终返回
//状态信息对应的名称可以去Stat源码里看
byte[] data=curatorFramework.getData().storingStatIn(stat).forPath("/curator");
System.out.println(new String(data)+"---stat---"+stat);
} catch (Exception e) {
e.printStackTrace();
}
修改节点
1
2
3
4
5
6
7
//修改数据
try {
Stat stat=curatorFramework.setData().forPath("/curator","123".getBytes());
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
}
删除节点
1
2
3
4
5
6
7
try {   
//只写根节点,递归删除子节点
//默认情况下,version为-1
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/curator");
} catch (Exception e) {
e.printStackTrace();
}
异步操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//异步操作
//有时候操作可能消耗时间较长,会处于阻塞,无法执行后续流程,可以使用异步操作

//Java5以后
ExecutorService service= Executors.newFixedThreadPool(1);
final CountDownLatch countDownLatch=new CountDownLatch(1);

//PS:创建节点的事件是由线程池去处理的,而不是当前线程
try {
final String result=curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
//操作完成后的回调
//当前线程名---响应后的结果码---当前操作类型
//main-EventThread----0---CREATE
System.out.println(Thread.currentThread().getName()+"----"+curatorEvent.getResultCode()+"---"+curatorEvent.getType());
countDownLatch.countDown();
}
}).forPath("/curatorTest/curator1/curator11","777".getBytes());
System.out.println("result:"+result);
} catch (Exception e) {
e.printStackTrace();
}

countDownLatch.await();
service.shutdown();
事务操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
//事务操作
//curator独有
//提供一个封装的操作可以允许事务性操作
try {
//创建和修改是绑定在一个事务里,执行完成后再提交事务
//修改的节点并不存在,会报错org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode,并且trans节点不会被创建
//修改的节点存在,则会成功输出结果
//
Collection<CuratorTransactionResult> resultCollection=curatorFramework.inTransaction().create()
.forPath("/trans","111".getBytes()).and().setData().forPath("/curatorTest","111".getBytes()).and().commit();

for (CuratorTransactionResult result:resultCollection){
System.out.println(result.getForPath()+"---"+result.getType());
}
} catch (Exception e) {
e.printStackTrace();
}
监听
  • 封装了三种watch来做节点的监听
    • PathChildrenCache:监视一个路径下子节点的创建、删除、以及节点数据更新
    • NodeCache:监视一个节点的创建、更新、删除并且会将节点的数据缓存到本地
    • treecache:pathcache和nodecache的合体(监视路径下的创建、更新、删除事件并且还会缓存路径下的所有子节点的数据)

NodeCache

1
2
3
4
5
6
7
8
9
10
11
CuratorFramework curatorFramework=CuratorClientUtil.getInstance();

curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/curator","111".getBytes());

final NodeCache nodeCache=new NodeCache(curatorFramework,"/curator",false);
nodeCache.start(true);//启动时把初始操作做了构造

nodeCache.getListenable().addListener(()-> System.out.println("节点数据发生变化,结果---"+new String(nodeCache.getCurrentData().getData())));

curatorFramework.setData().forPath("/curator","123".getBytes());
TimeUnit.SECONDS.sleep(2);

PathChildrenCache

  • PathChildrenCache.StartMode
    • NORMAL:初始化时为空
    • BUILD_INITIAL_CACHE:在这个方法返回之前调用rebuild操作
    • POST_INITIALIZED_EVENT:当cache初始化数据后发送一个PathChildrenCache事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
pathChildrenCache.getListenable().addListener((curatorFramework1, pathChildrenCacheEvent) -> {
switch (pathChildrenCacheEvent.getType()) {
case INITIALIZED:
System.out.println("INITIALIZED");
break;
case CHILD_ADDED:
System.out.println("CHILD_ADDED");
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED");
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED");
break;
case CONNECTION_LOST:
System.out.println("CONNECTION_LOST");
break;
case CONNECTION_SUSPENDED:
System.out.println("CONNECTION_SUSPENDED");
break;
case CONNECTION_RECONNECTED:
System.out.println("CONNECTION_RECONNECTED");
break;
}
});

curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event", "event".getBytes());
TimeUnit.SECONDS.sleep(2);
System.out.println("create event");

curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1", "event".getBytes());
TimeUnit.SECONDS.sleep(2);

curatorFramework.setData().forPath("/event/event1", "222".getBytes());
TimeUnit.SECONDS.sleep(2);

curatorFramework.delete().forPath("/event/event1");
curatorFramework.delete().forPath("/event");