

Apache ZooKeeper - 使用原生的API操作ZK
ZooKeeper原生Java API的不足之处:
Apache curator:
Curator是netflix公司开源的一套zookeeper客户端,Apache的顶级项目
与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量
Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等
Apache Curator : https://curator.apache.org/

看看模块

<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-recipesartifactId>
<version>5.0.0version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeepergroupId>
<artifactId>zookeeperartifactId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-x-discoveryartifactId>
<version>5.0.0version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeepergroupId>
<artifactId>zookeeperartifactId>
exclusion>
exclusions>
dependency>和客户端/ 服务器交互,第一步就要创建会话
Curator 提供了多种方式创建会话
RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);
CuratorFramework client = CuratorFrameworkFactory.newClient(getConnectStr(), retryPolicy);
client .start(); RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);
curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr())
.retryPolicy(retryPolicy)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.canBeReadOnly(true)
.build();
curatorFramework.start();上述代码采用了流式方式,最核心的类是 CuratorFramework 类,该类的作用是定义一个 ZooKeeper 客户端对象,并在之后的上下文中使用。
在定义 CuratorFramework 对象实例的时候, 使用了 CuratorFrameworkFactory 工厂方法,并指定了 connectionString服务器地址列表、retryPolicy 重试策略 、sessionTimeoutMs 会话超时时间、connectionTimeoutMs 会话创建超时时间。
host1:port1,host2:port2,host3:port3策略名称 | 描述 |
|---|---|
ExponentialBackoffRetry | 重试一组次数,重试之间的睡眠时间增加 |
RetryNTimes | 重试最大次数 |
RetryOneTime | 只重试一次 |
RetryUntilElapsed | 在给定的时间结束之前重试 |
/**
* 递归创建子节点
*/
@SneakyThrows
@Test
public void testCreateWithParent() {
CuratorFramework curatorFramework = getCuratorFramework();
String pathWithParent = "/artisan-node/artisan-node-sub1/artisan-node-sub1-1";
String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
log.info("curator create node :{} successfully.", path);
}
使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息
/**
* protection 模式,防止由于异常原因,导致僵尸节点
* @throws Exception
*/
@SneakyThrows
@Test
public void testCreate() {
CuratorFramework curatorFramework = getCuratorFramework();
String forPath = curatorFramework
.create()
.withProtection() // 防止僵尸节点
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).
forPath("/curator-node", "data".getBytes());
log.info("curator create node :{} successfully.", forPath);
}
看下zk中的数据

实现原理后面单独开篇解读,总体思想就是 随机生成一个UUID, 再创建之前客户端根据这个缓存的UUID去看ZK Server是否存在,存在则认为是成功的,否则就继续创建。
@Test
public void testGetData() throws Exception {
CuratorFramework curatorFramework = getCuratorFramework();
byte[] bytes = curatorFramework.getData().forPath("/curator-node");
log.info("get data from node :{} successfully.", new String(bytes));
}通过客户端实例的getData() 方法更新 ZooKeeper 服务上的数据节点,在getData 方法的后边,通过 forPath 函数来指定查询的节点名称
@Test
public void testSetData() throws Exception {
CuratorFramework curatorFramework = getCuratorFramework();
curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());
byte[] bytes = curatorFramework.getData().forPath("/curator-node");
log.info("get data from node /curator-node :{} successfully.", new String(bytes));
}通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。
@Test
public void testDelete() throws Exception {
CuratorFramework curatorFramework = getCuratorFramework();
String pathWithParent = "/node-parent";
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}guaranteed:主要起到一个保障删除成功的作用, 只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在ZooKeeper 服务端被删除。
deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。
@Test
public void testListChildren() throws Exception {
CuratorFramework curatorFramework = getCuratorFramework();
String pathWithParent = "/artisan-node";
List<String> list = curatorFramework.getChildren().forPath(pathWithParent);
list.forEach(System.out::println);
}通过客户端实例的 getChildren() 方法更新 ZooKeeper 服务上的数据节点,在getChildren方法的后边,通过 forPath 函数来指定节点下的一级子节点的名称
Curator 使用BackgroundCallback 接口处理服务器端返回来的信息。
如果在异步线程中调用,默认在 EventThread 线程中调用,支持自定义线程池
/**
* 使用默认的 EventThread异步线程处理
* @throws Exception
*/
@Test
public void testThreadPoolByDefaultEventThread() throws Exception {
CuratorFramework curatorFramework = getCuratorFramework();
String ZK_NODE="/artisan-node";
curatorFramework.getData().inBackground((client, event) -> {
log.info(" background: {}", new String(event.getData()));
}).forPath(ZK_NODE);;
}
/**
* 使用自定义线程池
* @throws Exception
*/
@Test
public void testThreadPoolByCustomThreadPool() throws Exception {
CuratorFramework curatorFramework = getCuratorFramework();
ExecutorService executorService = Executors.newSingleThreadExecutor();
String ZK_NODE="/artisan-node";
curatorFramework.getData().inBackground((client, event) -> {
log.info(" background: {}", new String(event.getData()));
},executorService).forPath(ZK_NODE);
}