? ? Apache curator-recipes组件提供了大量已经"生产化"(produced)的特性,极大的简化了使用zk的复杂度.
? ? 1. Cache: 提供了对一个Node持续监听,如果节点数据变更,即可立即得到响应. 开发者无需过度的关注watcher和Event操作.
? ? 2. Queues: 提供了重量级的分布式队列解决方案,比如:权重队列,可延迟队列等.其实Zookeeper并不适合作为数据存储系统,你可以适度的使用它来达成分布式队列的设计要求.
? ? 3. Counters: 全局计数器是分布式设计中很常用的,包括"全局计算器"和"原子自增计数器".
? ? 4. Locks: 分布式锁的设计有很多手段,此组件提供了分布式"读写分写锁"/"共享锁"等.在我们需要控制资源访问的情况下,非常有用.
? ? 5. Barries: 栅栏,需要对分布式环境中,多个操作(进程)进行同步或者协同时,可以考虑使用barries;
? ? 6. Elections: 选举;可以在多个"注册者"之间选举出leader,作为操作调度/任务监控/队列消费的执行者,我们在设计"leader角色选举"/"单点任务执行""分布式队列消费者"等场景时,非常有效.
?
? ? 代码实例
1. 创建Client
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
//fluent style
String namespace = "cluster-worker";
CuratorFramework client = builder.connectString("127.0.0.1:2181")
.sessionTimeoutMs(30000)
.connectionTimeoutMs(30000)
.canBeReadOnly(false)
.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.namespace(namespace)
.defaultData(null)
.build();
client.start();
EnsurePath ensure = client.newNamespaceAwareEnsurePath(namespace);
//code for test
Thread.sleep(5000);
//client.close()//
?
2. Caches
? ? 持续watcher节点,并将节点的数据变更即时的在本地反应出来.recpise提供了PathChildrenCache和NodeCache两个API.
public static PathChildrenCache pathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {
final PathChildrenCache cached = new PathChildrenCache(client, path, cacheData);
cached.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
PathChildrenCacheEvent.Type eventType = event.getType();
switch (eventType) {
case CONNECTION_RECONNECTED:
cached.rebuild();
break;
case CONNECTION_SUSPENDED:
case CONNECTION_LOST:
System.out.println("Connection error,waiting...");
break;
default:
System.out.println("Data:" + event.getData().toString());
}
}
});
return cached;
}
?
PathChildrenCache cached = pathChildrenCache(client,path,true);
//= start() + rebuild()
//事件操作,将会在额外的线程中执行.
cached.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
List<ChildData> childData = cached.getCurrentData();
if (childData != null) {
for (ChildData data : childData) {
System.out.println("Path:" + data.getPath() + ",data" + new String(data.getData(), "utf-8"));
}
}
//当不在需要关注此节点数据时,需要及时的关闭它.
//因为每个cached,都会额外的消耗一个线程.
//cached.close();////close the watcher,clear the cached Data
? ? 对于PathChildrenCache.getCurrentData()将从获取本地的数据列表,而不是触发一次zookeeper.getChildren(),因此为"Cache".
3. Queues:分布式队列
? ? 分布式队列的基本特性,就是"生产者"或"消费者"跨越多个进程,且在此种环境中需要确保队列的push/poll的有序性.
? ? zookeeper本身并没有提供分布式队列的实现,只是recipse根据zookeeper的watcher和具有version标记的node,来间接的实现分布式queue..内部机制如下:
? ? --> 如果是消费者(QueueConsumer),会创建一个类似于PathChildrenCache的实例用于监听queuePath下的子节点变更事件(单独的线程中).同时consumer处于阻塞状态,当有子节点变更事件时会被唤醒(包括创建子节点/删除子节点等);
? ? --> 此时consumer获取子节点列表,并将每个节点信息封装成Runnable任务单元,提交到线程池中.?
