文章

ZooKeeper Curator 使用示例

ZooKeeper Curator 使用示例

1. Curator 简介

Apache Curator 是一个用于简化 Apache ZooKeeper 使用的 Java 客户端库。ZooKeeper 提供了分布式协调服务(如选举、锁、队列等),但其原生 API 复杂且易出错。Curator 通过封装底层细节,提供了更高层次的抽象和丰富的 Recipe 实现,使开发者能够更高效地构建分布式系统。

1.1. 核心价值

  • 简化连接管理:自动处理重连、会话恢复。
  • 增强容错性:内置多种重试策略。
  • 提供高级功能:如分布式锁、领导选举、队列等。
  • 命名空间隔离:避免路径冲突。
  • 异步支持:Curator 4.x 开始支持异步 API。

1.2. 与原生 ZooKeeper API 的对比

功能 原生 API Curator
连接管理 需手动处理重连、会话失效 自动重连、内置重试策略
节点操作 基础 CRUD 操作 支持版本控制、事务操作
分布式协调 需手动实现(如 Watcher 逻辑) 提供封装好的 Recipe(如锁、选举)
异常处理 需手动捕获并处理异常 自动重试、异常封装
可维护性 代码复杂、易出错 代码简洁、可读性强

2. 连接管理

2.1. 客户端初始化

Curator 提供了灵活的客户端构建方式,支持自定义连接参数和命名空间。

CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString("localhost:2181")          // ZooKeeper 地址
    .sessionTimeoutMs(5000)                   // 会话超时时间
    .connectionTimeoutMs(5000)                // 连接超时时间
    .retryPolicy(new ExponentialBackoffRetry(100, 3))  // 指数退避重试策略
    .namespace("myNamespace")                 // 命名空间隔离
    .authorization("digest", "user:password".getBytes())  // 认证信息
    .build();
client.start();  // 启动客户端

2.2. 重试策略详解

Curator 提供多种重试策略,适用于不同场景:

策略类 描述
RetryOneTime 每次重试固定间隔,适合短时故障(如网络抖动)
RetryNTimes 最多重试 N 次
ExponentialBackoffRetry 指数退避重试(推荐),初始间隔 × 2^ 重试次数,避免雪崩效应
RetryUntilElapsed 在指定时间内持续重试

示例:指数退避策略

new ExponentialBackoffRetry(100, 3, 1000);  // 初始间隔 100ms,最多重试 3 次,最大间隔 1000ms

2.3. 命名空间(Namespace)

命名空间是 Curator 提供的逻辑隔离机制,所有操作路径会自动加上前缀。

.namespace("myNamespace")  // 实际路径为 /myNamespace/...

使用场景

  • 多租户系统中隔离不同业务的数据。
  • 避免路径冲突(如多个应用使用相同路径)。

3. 节点及数据管理

3.1. 节点类型(CreateMode)

Curator 支持多种节点类型,与 ZooKeeper 原生节点类型一致:

类型 描述
PERSISTENT 持久节点,ZooKeeper 重启后仍存在
PERSISTENT_SEQUENTIAL 持久有序节点,路径末尾自动追加递增序号
EPHEMERAL 临时节点,客户端断开连接后自动删除
EPHEMERAL_SEQUENTIAL 临时有序节点

示例:创建临时节点

client.create().withMode(CreateMode.EPHEMERAL)  // 创建临时节点
    .forPath("/example/ephemeral");

3.2. 版本控制(Optimistic Locking)

ZooKeeper 使用版本号实现乐观锁,Curator 提供了便捷的版本控制 API。

Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath("/example/path");

// 更新数据时指定版本号
client.setData().withVersion(stat.getVersion())  // 仅当版本匹配时更新
    .forPath("/example/path", "new_data".getBytes());

版本不匹配异常:若版本号不匹配,抛出 KeeperException.BadVersionException

3.3. 事务操作

Curator 支持原子事务操作,保证多个操作的原子性。

client.inTransaction().create().forPath("/example/path1", "data1".getBytes()).and()
    .setData().forPath("/example/path2", "data2".getBytes()).and()
    .commit();

4. Recipe 实现

4.1. 领导选举(Leader Election)

Curator 提供了 LeaderSelector 实现领导选举,支持自动重选。

LeaderSelector leaderSelector = new LeaderSelector(client, "/example/leader", new LeaderSelectorListenerAdapter() {
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        System.out.println("I am the leader now!");
        Thread.sleep(3000);  // 模拟任务执行
        System.out.println("Releasing leadership");}
});
leaderSelector.autoRequeue();  // 释放领导权后重新参与选举
leaderSelector.start();

实现原理

  1. 每个客户端在 /example/leader 路径下创建临时顺序子节点。
  2. 客户端获取所有子节点并排序,判断自己是否是最小节点。
  3. 如果是最小节点,则成为 Leader;否则监听前一个节点的变化。

4.2. 分布式锁(Distributed Lock)

Curator 提供了 InterProcessMutex 实现可重入锁。

InterProcessMutex lock = new InterProcessMutex(client, "/example/lock");
try {
    if (lock.acquire(10, TimeUnit.SECONDS)) {  // 尝试获取锁
        try {
            System.out.println("Lock acquired, performing work...");
            Thread.sleep(5000);} finally {lock.release();  // 释放锁
        }
    } else {
        System.out.println("Failed to acquire lock");}
} catch (Exception e) {e.printStackTrace();
}

实现原理

  1. 客户端在锁路径下创建临时顺序子节点。
  2. 获取所有子节点并排序,判断自己是否是最小节点。
  3. 如果是最小节点,则持有锁;否则监听前一个节点的变化。

4.3. 分布式队列(Distributed Queue)

Curator 提供了 DistributedQueue 实现 FIFO 队列。

DistributedQueue<String> queue = QueueBuilder.builder(client, message -> {
    System.out.println("Consumed:" + message);}, "/example/queue", new StringSerializer()).buildQueue();
queue.start();

// 生产者
queue.put("Message 1");
queue.put("Message 2");

实现原理

  • 生产者创建有序节点(如 /example/queue/0000000001)。
  • 消费者监听队列路径,按序消费节点。

5. 节点监听

5.1. NodeCache(监听单节点)

NodeCache 用于监听单个节点的数据变化。

NodeCache nodeCache = new NodeCache(client, "/example/node");
nodeCache.getListenable().addListener(() -> {
    byte[] newData = nodeCache.getCurrentData().getData();
    System.out.println("Node data updated:" + new String(newData));
});
nodeCache.start(true);  // true 表示启动时获取初始数据

5.2. PathChildrenCache(监听子节点)

PathChildrenCache 用于监听指定路径下的子节点变化。

PathChildrenCache childrenCache = new PathChildrenCache(client, "/example/path", true);
childrenCache.getListenable().addListener((client, event) -> {
    switch (event.getType()) {
        case CHILD_ADDED:
            System.out.println("Child added:" + event.getData().getPath());
            break;
        case CHILD_UPDATED:
            System.out.println("Child updated:" + event.getData().getPath());
            break;
        case CHILD_REMOVED:
            System.out.println("Child removed:" + event.getData().getPath());
            break;
    }
});
childrenCache.start(PathChildrenCache.StartMode.NORMAL);

5.3. TreeCache(监听整个树)

TreeCache 可以监听指定节点及其所有子节点的变化。

TreeCache treeCache = new TreeCache(client, "/example/path");
treeCache.getListenable().addListener((client, event) -> {
    String path = event.getData().getPath();
    switch (event.getType()) {
        case NODE_ADDED:
            System.out.println("Node added:" + path);
            break;
        case NODE_UPDATED:
            System.out.println("Node updated:" + path);
            break;
        case NODE_REMOVED:
            System.out.println("Node removed:" + path);
            break;
    }
});
treeCache.start();

6. 高级特性

6.1. 异步 API(Curator 4.x 新增)

Curator 4.x 开始支持异步 API,提升性能:

CompletableFuture<Void> future = client.create().forPathAsync("/example/async", "data".getBytes());
future.thenRun(() -> System.out.println("Async create completed"));

6.2. 服务发现(Service Discovery)

Curator 提供了 ServiceDiscovery 模块,实现服务注册与发现。

ServiceDiscovery<InstanceDetails> serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class)
    .client(client)
    .basePath("/services").build();
serviceDiscovery.start();

// 注册服务实例
ServiceInstance<InstanceDetails> instance = ServiceInstance.<InstanceDetails>builder()
    .name("my-service")
    .address("127.0.0.1")
    .port(8080)
    .payload(new InstanceDetails("metadata")).build();
serviceDiscovery.registerService(instance);

// 发现服务
Collection<ServiceInstance<InstanceDetails>> instances = serviceDiscovery.queryForInstances("my-service");

7. 最佳实践

7.1. 会话超时设置

  • Session Timeout:建议设置为 5~15 秒,过短会导致频繁重连,过长会延迟故障检测。
  • Connection Timeout:通常设置为 5~10 秒。

7.2. 异常处理

  • 使用 RetryPolicy 自动重试。
  • 捕获并记录 KeeperExceptionInterruptedException

7.3. 性能优化

  • 避免频繁创建 / 删除节点。
  • 使用 PathChildrenCache 替代多次 getChildren()
  • 合理使用异步 API。

8. 总结

Apache Curator 是构建分布式系统的强大工具,通过封装 ZooKeeper 的复杂性,提供了简洁、可靠的 API。其核心优势在于:

  • 简化连接管理:自动重连、会话恢复。
  • 高级功能:领导选举、分布式锁、队列等。
  • 灵活监听机制:NodeCache、PathChildrenCache、TreeCache。
  • 支持异步操作:提升性能。

通过合理使用 Curator,开发者可以专注于业务逻辑,而无需深入处理 ZooKeeper 的底层细节。对于需要高可用和强一致性的分布式系统,Curator 是首选客户端库。


参考文档

License:  CC BY 4.0