etcd-io / jetcd

etcd java client
Apache License 2.0

How can I catch put and delete events with Watch? #853

Status: Closed (cleverpig closed 3 years ago)

cleverpig commented 3 years ago

I set up a watch with jetcd 0.5.0 and process events with a LoggingWatchListener, but it is only called when an event completes. How can I do the same as etcdctl?

Here is my code:

public void keepWatch(String key) {
    if (!watchMap.containsKey(key)) {
        Watch watch = etcdClient.getWatchClient();
        ByteSequence keyBS = ByteSequence.from(key, StandardCharsets.UTF_8);
        Watch.Watcher watcher = watch.watch(
                keyBS,
                WatchOption.DEFAULT,
                new LoggingWatchListener());
        log.debug("watch on key[{}]", key);
    } else {
        log.debug("we had watched on key[{}]", key);
    }
}

class LoggingWatchListener implements Watch.Listener {
    @Override
    public void onNext(WatchResponse response) {
        log.info("watch[listener] - nextEvent");
        for (WatchEvent event : response.getEvents()) {
            log.debug("watch[listener] - nextEvent - type:{}, KV:{}, PreKV:{}",
                    event.getEventType(), event.getKeyValue(), event.getPrevKV());
        }
    }

    @Override
    public void onError(Throwable throwable) {
        log.error("watch[listener] - errorEvent - something wrong: {}", throwable.getMessage(), throwable);
    }

    @Override
    public void onCompleted() {
        log.debug("watch[listener] - completedEvent");
    }
}
lburgazzoli commented 3 years ago

What do you mean exactly ?

cleverpig commented 3 years ago

I mean: how do I monitor and process events with an etcd watcher?
I tried using the Listener to listen for events, but I can't get the event type or other event details. So I tried the watch.watch method with a Consumer parameter, but that doesn't work either.
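
For illustration, here is a minimal sketch of telling put and delete events apart inside a listener. It deliberately uses stand-in types so it runs without jetcd or an etcd server: the `EventType` enum mirrors the PUT, DELETE and UNRECOGNIZED values of jetcd's `WatchEvent.EventType`, which the listener code above reads via `event.getEventType()`. The `Event` class and the `describe` and `describeAll` helpers are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WatchEventDispatch {
    // Stand-in for jetcd's WatchEvent.EventType (assumption: same three values).
    public enum EventType { PUT, DELETE, UNRECOGNIZED }

    // Hypothetical stand-in for a watch event: type, key and value as plain strings.
    public static final class Event {
        final EventType type;
        final String key;
        final String value;

        public Event(EventType type, String key, String value) {
            this.type = type;
            this.key = key;
            this.value = value;
        }
    }

    // Branches on the event type, the way a listener's onNext could.
    public static String describe(Event event) {
        switch (event.type) {
            case PUT:
                return "PUT " + event.key + " -> " + event.value;
            case DELETE:
                return "DELETE " + event.key;
            default:
                return "UNRECOGNIZED " + event.key;
        }
    }

    // Describes every event of one response, in order.
    public static List<String> describeAll(List<Event> events) {
        List<String> lines = new ArrayList<>();
        for (Event event : events) {
            lines.add(describe(event));
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(EventType.PUT, "foo", "bar"),
                new Event(EventType.DELETE, "foo", ""));
        describeAll(events).forEach(System.out::println);
    }
}
```

With jetcd itself, the same branch would sit inside `onNext`, looping over `response.getEvents()` and switching on `event.getEventType()`.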

lburgazzoli commented 3 years ago

Can you please try updating to the latest release? We have some tests here that you may want to check: https://github.com/etcd-io/jetcd/blob/master/jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java#L50

cleverpig commented 3 years ago

Yes, I checked out master, then compiled and installed it. But the Listener still doesn't fire the "onNext" event. I debugged my code and found that the "onNext" method is never called in the WatcherImpl class.

lburgazzoli commented 3 years ago

How do you set up the watcher?

lburgazzoli commented 3 years ago

Can you provide a reproducer? A failing test would be very welcome.

cleverpig commented 3 years ago

Sure! 1. Maven dependencies:

<dependencies>
    <!-- etcd client -->
    <dependency>
      <groupId>io.etcd</groupId>
      <artifactId>jetcd-core</artifactId>
      <version>0.5.0</version>
    </dependency>

    <!-- test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <!-- logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>1.7.5</version>
    </dependency>

    <!-- misc -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.16</version>
      <scope>provided</scope>
    </dependency>
</dependencies>

2. etcdUtil and testCase source code

lburgazzoli commented 3 years ago

I've created the following example:

@Grab(group = 'org.slf4j', module = 'slf4j-simple', version = '1.7.30')
@Grab(group = 'io.etcd', module = 'jetcd-core', version = '0.5.4')

import io.etcd.jetcd.ByteSequence
import io.etcd.jetcd.Client
import io.etcd.jetcd.Watch
import io.etcd.jetcd.options.WatchOption
import io.etcd.jetcd.watch.WatchResponse
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets

def log = LoggerFactory.getLogger('watcher')
def client = Client.builder().endpoints(args[0]).build()
def watch = client.getWatchClient()
def kv = client.getKVClient()
def key = ByteSequence.from(UUID.randomUUID().toString(), StandardCharsets.UTF_8)

watch.watch(
    key,
    WatchOption.newBuilder().withProgressNotify(true).withPrevKV(true).build(),
    new Watch.Listener() {
        @Override
        void onNext(WatchResponse response) {
            log.info("onNext")
            response.events.each {
                log.info("""
                    nextEvent: 
                        type: ${it.getEventType()}
                        KV: ${it.keyValue.key.toString(StandardCharsets.UTF_8)} -> ${it.keyValue.value.toString(StandardCharsets.UTF_8)}
                        PreKV: ${it.prevKV.key.toString(StandardCharsets.UTF_8)} -> ${it.prevKV.value.toString(StandardCharsets.UTF_8)}
                """.stripIndent())
            }
        }

        @Override
        void onError(Throwable throwable) {
            log.error("onError - something wrong: {}", throwable.message, throwable);
        }

        @Override
        void onCompleted() {
            log.debug("onCompleted");
        }
    }
)

while (true) {
    kv.put(key, ByteSequence.from(UUID.randomUUID().toString(), StandardCharsets.UTF_8))
    Thread.sleep(1000)
}

It requires Groovy. If I run it with:

groovy watcher.groovy http://localhost:2379

I get something like:

[grpc-default-executor-1] INFO watcher - onNext
[grpc-default-executor-1] INFO watcher - 
nextEvent: 
    type: PUT
    KV: 3d3cfdca-432f-47c8-af5a-f95d6111e47a -> 3e77bcdd-0f43-400c-a8c5-cc90d12641be
    PreKV:  -> 

[grpc-default-executor-1] INFO watcher - onNext
[grpc-default-executor-1] INFO watcher - 
nextEvent: 
    type: PUT
    KV: 3d3cfdca-432f-47c8-af5a-f95d6111e47a -> ad0b437b-41be-4cca-aa0c-3ff942c82174
    PreKV: 3d3cfdca-432f-47c8-af5a-f95d6111e47a -> 3e77bcdd-0f43-400c-a8c5-cc90d12641be

So it appears to work for me. Am I missing something?

cleverpig commented 3 years ago

I translated the Groovy script into a Java class, but it doesn't work for me.

public void testKeepWatch2() throws InterruptedException {
    Client client = Client.builder().endpoints("http://localhost:2379").build();
    Watch watch = client.getWatchClient();
    KV kv = client.getKVClient();
    ByteSequence key = ByteSequence.from(UUID.randomUUID().toString(), StandardCharsets.UTF_8);

    watch.watch(
            key,
            WatchOption.newBuilder().withProgressNotify(true).withPrevKV(true).build(),
            new Watch.Listener() {
                @Override
                public void onNext(WatchResponse response) {
                    log.info("onNext");
                }

                @Override
                public void onError(Throwable throwable) {
                    log.error("onError - something wrong: {}", throwable.getMessage(), throwable);
                }

                @Override
                public void onCompleted() {
                    log.debug("onCompleted");
                }
            }
    );

    for (int i = 0; i < 99; i++) {
        kv.put(key, ByteSequence.from(UUID.randomUUID().toString(), StandardCharsets.UTF_8));
        log.debug("put [{}]times", i + 1);
        Thread.sleep(1000);
    }
}

Logging output, with no listener events at all:

2020-11-24 11:22:50,669 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [1]times>
2020-11-24 11:22:51,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [2]times>
2020-11-24 11:22:52,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [3]times>
2020-11-24 11:22:53,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [4]times>
2020-11-24 11:22:54,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [5]times>
2020-11-24 11:22:55,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [6]times>
2020-11-24 11:22:56,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [7]times>
2020-11-24 11:22:57,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [8]times>
2020-11-24 11:22:58,674 DEBUG [com.bjinfotech.practice.etcd.EtcdUtilTest]:116 - <put [9]times>
....

etcd server output:

2020-11-24 11:22:51.602118 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:51.6021187 +0800 CST m=+1093.828563401, time spent = 0s, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:51.722125 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:51.677123 +0800 CST m=+1093.903567701, time spent = 45.0026ms, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:52.678180 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:52.6781803 +0800 CST m=+1094.904625001, time spent = 0s, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:53.677237 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:53.6762373 +0800 CST m=+1095.902682001, time spent = 1.0001ms, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:54.678294 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:54.6782947 +0800 CST m=+1096.904739401, time spent = 0s, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:55.677351 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:55.6773518 +0800 CST m=+1097.903796501, time spent = 0s, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36
2020-11-24 11:22:56.678409 D | etcdserver/api/v3rpc: start time = 2020-11-24 11:22:56.6784091 +0800 CST m=+1098.904853801, time spent = 0s, remote = 127.0.0.1:54043, response type = /etcdserverpb.KV/Put, request count = 1, request size = 76, response count = 0, response size = 28, request content = key:"c291e080-825b-4990-a2f1-f1666d1d7cd8" value_size:36

By the way, which etcd server version are you using? I tried 3.3.25 and 3.4.13.
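
As a side note on the test itself: a watch callback is delivered on another thread, so a test that only sleeps and logs cannot fail when no event arrives. One common way to make that failure explicit is to wait on a latch with a timeout. The sketch below shows the pattern with the standard library only. `Callback` and `emitAsync` are hypothetical stand-ins for a watch listener and an asynchronous event source; they are not jetcd APIs, and this is not a diagnosis of the problem reported above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AwaitCallbackSketch {
    // Hypothetical stand-in for a listener's onNext method.
    public interface Callback {
        void onNext(String event);
    }

    // Hypothetical stand-in for an asynchronous source: delivers one event on another thread.
    public static void emitAsync(ExecutorService executor, String event, Callback callback) {
        executor.execute(() -> callback.onNext(event));
    }

    // Blocks until the first event arrives, or fails once the timeout expires.
    public static String awaitFirstEvent(long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        try {
            emitAsync(executor, "PUT foo", event -> {
                received.set(event);   // remember what the callback saw
                latch.countDown();     // release the waiting thread
            });
            if (!latch.await(timeout, unit)) {
                throw new IllegalStateException("no event received within the timeout");
            }
            return received.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for an event", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitFirstEvent(5, TimeUnit.SECONDS));
    }
}
```

In a real test the latch would be counted down from the listener's `onNext`, and the test would fail if `await` returned false.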

lburgazzoli commented 3 years ago

I've tested with both 3.3 and 3.4. The command line to run it is:

 docker run --rm -ti --name etcd \
    --publish 4001:4001 \
    --publish 2380:2380 \
    --publish 2379:2379 \
    gcr.io/etcd-development/etcd:v3.4.13 \
        etcd \
            -debug \
            -name etcdv3 \
            -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:4001 \
            -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
            -initial-advertise-peer-urls http://127.0.0.1:2380

Then you can try my example with jbang using a gist:

jbang https://gist.github.com/lburgazzoli/4398aef11cc2880daf4ad05a4dbe8f58

And it does work for me. I suspect you have some issue in how you start etcd.

cleverpig commented 3 years ago

jbang works. But when I switch to the Maven build with your code and etcd config, it still doesn't work...

lburgazzoli commented 3 years ago

So something is wrong in your setup.

Did you try using the exact same dependencies I used for jbang/groovy? I've noticed that you are using old versions of jetcd and slf4j.
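
As a rough sketch of what that alignment could look like in the Maven build, the fragment below uses the two versions from the Groovy example earlier in the thread (jetcd-core 0.5.4 and slf4j-simple 1.7.30). Swapping the existing slf4j-log4j12 binding for slf4j-simple is an assumption made here to match that script, not something the thread confirms as the fix.

```xml
<dependencies>
    <!-- etcd client: version taken from the Groovy example in this thread -->
    <dependency>
      <groupId>io.etcd</groupId>
      <artifactId>jetcd-core</artifactId>
      <version>0.5.4</version>
    </dependency>

    <!-- logging: same binding and version as the Groovy example (assumed replacement for slf4j-log4j12) -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.30</version>
    </dependency>
</dependencies>
```

Running `mvn dependency:tree` afterwards shows which versions Maven actually resolves, which can help spot transitive dependencies that differ from the jbang run.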

github-actions[bot] commented 3 years ago

This issue is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 7 days.

SadykovR commented 2 years ago

I have the same issue. Putting and getting values works well, but Watch doesn't work: Watch.Listener doesn't fire when I put a value. I use version 0.5.0. P.S. I also tried adding a watch with the etcdctl tool on the same server, and that works fine.