woshikid / blog

Apache License 2.0
8 stars 1 forks source link

Curator学习笔记 #182

Open woshikid opened 2 years ago

woshikid commented 2 years ago

POM

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.1</version>
</dependency>
<!-- 如出现不兼容情况,需指定zookeeper的版本 -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

创建客户端

// Shorthand alternative to the builder below:
//CuratorFramework curator = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
CuratorFramework curator = CuratorFrameworkFactory.builder()
    .connectString("127.0.0.1:2181,127.0.0.1:2182")
    .sessionTimeoutMs(60000) // once a disconnect outlasts the session timeout, ephemeral nodes and Watchers are removed
    .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // reconnect policy: exponential backoff with a maximum retry count
    //.namespace("test") // path prefix; every subsequent operation is performed under this prefix: /test
    .build();

启动长连接(自动重连)

curator.start(); // open the long-lived connection (reconnects automatically)

监听状态改变

curator.getConnectionStateListenable().addListener((cf, state) -> System.out.println(state)); // persistent listener (fires whenever the connection state changes)

创建节点

// Create the ephemeral node /node whose payload is the UTF-8 encoding of "data".
curator.create()
    .creatingParentsIfNeeded() // create missing parent nodes automatically
    .withMode(CreateMode.EPHEMERAL)
    //.inBackground((cf, event) -> System.out.println(event.getName())) // run asynchronously
    .forPath("/node", "data".getBytes(StandardCharsets.UTF_8)); // explicit charset: a bare getBytes() depends on the platform default

查询节点信息

Stat stat = curator.checkExists()
    .usingWatcher((CuratorWatcher) event -> System.out.println(event)) // 单次监听器(NodeCreated/NodeDeleted/NodeDataChanged)
    //.inBackground((cf, event) -> System.out.println(event.getStat())) // 使用异步
    .forPath("/node");

boolean exists = stat != null;
int version = stat.getVersion();
long ctime = stat.getCtime();
long mtime = stat.getMtime();
long num = stat.getNumChildren();

查询节点数据与信息

// Read the data stored at /node.
byte[] data = curator.getData()
    //.storingStatIn(stat) // also fetch the node's Stat in the same call
    .usingWatcher((CuratorWatcher) event -> System.out.println(event)) // one-shot watcher (NodeDeleted/NodeDataChanged)
    //.inBackground((cf, event) -> System.out.println(event.getData())) // run asynchronously
    .forPath("/node");

更新节点数据

// Replace the payload of /node with the UTF-8 encoding of "data".
curator.setData()
    //.withVersion(-1) // -1 matches any version
    //.inBackground((cf, event) -> System.out.println(event.getStat())) // run asynchronously
    .forPath("/node", "data".getBytes(StandardCharsets.UTF_8)); // explicit charset: a bare getBytes() depends on the platform default

删除节点

// Delete /node together with any children it has.
curator.delete()
    .deletingChildrenIfNeeded()
    //.inBackground((cf, event) -> System.out.println(event.getResultCode())) // run asynchronously
    .forPath("/node");

查询子节点

// List the names of the child nodes of the root path.
List<String> children = curator.getChildren()
    .usingWatcher((CuratorWatcher) event -> System.out.println(event)) // one-shot watcher (NodeDeleted/NodeChildrenChanged)
    //.inBackground((cf, event) -> System.out.println(event.getChildren())) // run asynchronously
    .forPath("/");

使用事务

// Build the individual operations first; nothing is sent until the transaction is submitted.
CuratorOp createOp = curator.transactionOp()
    .create()
    .forPath("/node");
CuratorOp setDataOp = curator.transactionOp()
    .setData()
    .forPath("/node");
CuratorOp deleteOp = curator.transactionOp()
    .delete()
    .forPath("/node");

// Submit create -> setData -> delete as a single transaction and collect one result per operation.
List<CuratorTransactionResult> result = curator.transaction()
    .forOperations(List.of(createOp, setDataOp, deleteOp));

创建监听器 注意:需要ZooKeeper 3.6+

// Register a watcher on /node through the watchers() API.
curator.watchers()
    .add()
    .usingWatcher((CuratorWatcher) event -> System.out.println(event)) // persistent watcher (NodeCreated/NodeDeleted/NodeDataChanged), sub-paths included
    .forPath("/node");

关闭连接

curator.close(); // close the connection

高级API

POM

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.1</version>
</dependency>

缓存/持久监听器

注意:需要ZooKeeper 3.6+

// Cache the subtree rooted at /node and print the type of every change event.
CuratorCache cache = CuratorCache.builder(curator, "/node").build();
CountDownLatch initialized = new CountDownLatch(1);
CuratorCacheListener listener = CuratorCacheListener.builder()
    .forAll((type, old, data) -> System.out.println(type))
    .forInitialized(initialized::countDown) // signalled once the initial load of the cache has finished
    .build();
cache.listenable().addListener(listener); // register before start() so the initialized callback is not missed
cache.start(); // close()

// Wait for the initial load instead of sleeping for a fixed time, which races with slow servers.
if (!initialized.await(10, TimeUnit.SECONDS)) {
    throw new IllegalStateException("CuratorCache for /node was not initialized within 10 seconds");
}

// Read the cached data of a node; the Optional is empty when the path is not in the cache,
// so map it rather than calling get() unchecked. data is null when /node/data is absent.
byte[] data = cache.get("/node/data")
    .map(child -> child.getData())
    .orElse(null);

分布式锁

// The declarations below are alternatives: each one declares `lock`, so keep only one of them.
// Reentrant lock
InterProcessMutex lock = new InterProcessMutex(curator, "/lock");
// Non-reentrant lock
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(curator, "/lock");
// Reentrant read/write lock
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curator, "/lock");
var lock = readWriteLock.writeLock(); // readWriteLock.readLock();
// Multi-lock combining several locks (reentrant and/or non-reentrant);
// lock1 and lock2 are not declared in these notes - presumably locks created as above
InterProcessMultiLock lock = new InterProcessMultiLock(List.of(lock1, lock2));

// Acquire the lock, blocking until it is available; every successful acquire
// must be paired with exactly one release, so release in finally.
lock.acquire();
try {
    // Lock state
    lock.isAcquiredInThisProcess();
    //lock.isOwnedByCurrentThread();
} finally {
    // Release the lock
    lock.release();
}

// Timed variant: acquire() returns false when the lock was not obtained within
// the timeout, in which case the lock is not held and must not be released.
if (lock.acquire(10, TimeUnit.SECONDS)) {
    try {
        lock.isAcquiredInThisProcess();
    } finally {
        lock.release();
    }
}

分布式计数器

// Distributed counter stored at /count; contended updates are retried per the given policy.
var retryPolicy = new RetryNTimes(10, 1000);
DistributedAtomicLong atomic = new DistributedAtomicLong(curator, "/count", retryPolicy);

// Increment the counter and record whether the update went through.
var incrementResult = atomic.increment();
boolean succeeded = incrementResult.succeeded();

// Read the counter back with a separate get().
var currentValue = atomic.get();
long value = currentValue.postValue();

Leader选举

// Take part in the leader election under /leader.
LeaderSelector leaderSelector = new LeaderSelector(curator, "/leader", new LeaderSelectorListener() {
    /**
     * Prints every connection state change and gives up leadership when the
     * connection enters an error state, so this instance does not keep acting
     * as leader after its ZooKeeper connection is suspended or lost.
     *
     * @throws CancelLeadershipException when {@code newState} is an error state
     */
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        System.out.println(newState);
        if (client.getConnectionStateErrorPolicy().isErrorState(newState)) {
            // Makes the selector interrupt takeLeadership() and relinquish leadership
            throw new CancelLeadershipException();
        }
    }

    /**
     * Runs while this instance is the leader.
     *
     * @throws Exception including the InterruptedException raised when leadership is cancelled
     */
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        // Entering this method means this instance has become the leader
        Thread.sleep(60000);
        // Returning from this method means giving up leadership
    }
});

// By default an instance does not rejoin the election after giving up leadership
leaderSelector.autoRequeue(); // rejoin the election automatically
leaderSelector.start(); // close()