adevinta / zoe

The Kafka CLI for humans
https://adevinta.github.io/zoe
MIT License

Consume silently returns nothing #27

Closed: Hubbitus closed this issue 3 years ago

Hubbitus commented 3 years ago

I've configured zoe initially, and the topics list shows my topic:

 zoe -v topics list | jq -r '.[]' | grep datahub.datafactory.cdm.v2.Location

2021-03-22 21:42:08 INFO zoe: loading config from url : file:/home/pasha/.zoe/config/default.yml
2021-03-22 21:42:09 INFO zoe: requesting topics...
2021-03-22 21:42:10 INFO AppInfoParser: Kafka version: 5.5.0-ccs
2021-03-22 21:42:10 INFO AppInfoParser: Kafka commitId: 785a156634af5f7e
2021-03-22 21:42:10 INFO AppInfoParser: Kafka startTimeMs: 1616438530085
2021-03-22 21:43:13 WARN zoe: unexpected error on config describe request
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConfigs, deadlineMs=1616438593962) timed out at 1616438593963 after 2 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at com.adevinta.oss.zoe.core.functions.AdminKt$$special$$inlined$zoeFunction$1.execute(base.kt:65)
        at com.adevinta.oss.zoe.core.functions.JsonZoeFunction$DefaultImpls.execute(base.kt:32)
        at com.adevinta.oss.zoe.core.functions.AdminKt$$special$$inlined$zoeFunction$1.execute(base.kt:40)
        at com.adevinta.oss.zoe.core.FunctionsRegistry.call(main.kt:95)
        at com.adevinta.oss.zoe.core.FunctionsRegistry.call(main.kt:99)
        at com.adevinta.oss.zoe.service.runners.LocalZoeRunner$launch$2.get(local.kt:22)
        at com.adevinta.oss.zoe.service.runners.LocalZoeRunner$launch$2.get(local.kt:17)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConfigs, deadlineMs=1616438593962) timed out at 1616438593963 after 2 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled describeConfigs request with correlation id 11 due to node 431 being disconnected
router.test.datahub.datafactory.cdm.v2.Location
datahub.datafactory.cdm.v2.Location.Test17
stg_datahub.datafactory.cdm.v2.Location
datahub.datafactory.cdm.v2.Location.Test2New
datahub.datafactory.cdm.v2.Location.Test123
datahub.datafactory.cdm.v2.Location.Test1
datahub.datafactory.cdm.v2.Location.Test
datahub.datafactory.cdm.v2.Location
datahub.datafactory.cdm.v2.Location.Test3122021
datahub.datafactory.cdm.v2.Location11
datahub.datafactory.cdm.v2.LocationHierarchy
datahub.datafactory.cdm.v2.Location_testAS_2
datahub.datafactory.cdm.v2.Location-test-topic

But consuming the topic returns nothing:

$ zoe -v topics consume locations -n 10   
2021-03-22 21:44:00 INFO zoe: loading config from url : file:/home/pasha/.zoe/config/default.yml
2021-03-22 21:44:01 INFO zoe: querying offsets...
2021-03-22 21:44:01 INFO ConsumerConfig: ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [PLAINTEXT://ecsc00a060af.epam.com:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id = 
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = null
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer

2021-03-22 21:44:01 INFO KafkaAvroDeserializerConfig: KafkaAvroDeserializerConfig values: 
        bearer.auth.token = [hidden]
        proxy.port = -1
        schema.reflection = false
        auto.register.schemas = true
        max.schemas.per.subject = 1000
        basic.auth.credentials.source = URL
        specific.avro.reader = false
        value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
        schema.registry.url = [http://schema-registry-sbox.epm-eco.projects.epam.com:8081]
        basic.auth.user.info = [hidden]
        proxy.host = 
        schema.registry.basic.auth.user.info = [hidden]
        bearer.auth.credentials.source = STATIC_TOKEN
        key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

2021-03-22 21:44:01 INFO AppInfoParser: Kafka version: 5.5.0-ccs
2021-03-22 21:44:01 INFO AppInfoParser: Kafka commitId: 785a156634af5f7e
2021-03-22 21:44:01 INFO AppInfoParser: Kafka startTimeMs: 1616438641907
2021-03-22 21:44:03 INFO Metadata: [Consumer clientId=consumer-1, groupId=null] Cluster ID: Wu4DZL8VTT6QRqeEaoQcxg

Meanwhile, I'm sure the topic is not empty:

kafka-run-class kafka.tools.GetOffsetShell --broker-list PLAINTEXT://ecsc00a060af.epam.com:9092,PLAINTEXT://ecsc00a060b0.epam.com:9092,PLAINTEXT://ecsc00a060b1.epam.com:9092,PLAINTEXT://ecsc00a060b2.epam.com:9092,PLAINTEXT://ecsc00a060b3.epam.com:9092 --topic datahub.datafactory.cdm.v2.Location

datahub.datafactory.cdm.v2.Location:0:410850

And the Confluent tools and kafkacat can read it.
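For illustration, a kafkacat invocation along these lines (a sketch using standard kafkacat options, with the broker and topic taken from the commands above) reads the first records from the beginning of the topic; since the values are Avro-encoded, expect raw bytes rather than decoded JSON:

$ kafkacat -b ecsc00a060af.epam.com:9092 -t datahub.datafactory.cdm.v2.Location -C -o beginning -c 10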

My config:

$ cat ~/.zoe/config/default.yml 
---
runners:
  default: local
  config:
    lambda:
      nameSuffix: null
      deploy: null
      credentials:
        type: "default"
      awsRegion: null
      enabled: false
    kubernetes:
      namespace: "default"
      context: null
      deletePodAfterCompletion: true
      cpu: "1"
      memory: "512M"
      timeoutMs: 300000
      image:
        registry: "docker.io"
        image: "adevinta/zoe-core"
        tag: null
    local:
      enabled: true
storage: null
secrets: null
expressions: {}
clusters:
  default:
    registry: 'http://schema-registry-sbox.epm-eco.projects.epam.com:8081'
    props:
      bootstrap.servers: "PLAINTEXT://ecsc00a060af.epam.com:9092"
      schema.registry.url: 'http://schema-registry-sbox.epm-eco.projects.epam.com:8081'
      key.deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
      # https://adevinta.github.io/zoe/advanced/avro/
      value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
#      key.serializer: "org.apache.kafka.common.serialization.StringSerializer"
#      value.serializer: "org.apache.kafka.common.serialization.ByteArraySerializer"
    topics:
      locations:
        name: datahub.datafactory.cdm.v2.Location
        subject: datahub.datafactory.cdm.v2.Location
        propsOverride: {}
    groups: {}
wlezzar commented 3 years ago

Hi @Hubbitus. Thanks for raising the issue!

I have a few questions to help me investigate the issue:

Thanks again for your help : )

wlezzar commented 3 years ago

I will wait for your confirmation and answers to my questions above before closing this issue.

wlezzar commented 3 years ago

Closing the issue, as my guess is that it's related to the counterintuitive behavior of zoe described above, which has been fixed in 0.27.0. Please try again with this version, and if the problem persists, let's reopen the issue : )
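For readers hitting the same symptom, a hedged sketch of the likely workaround: my understanding is that zoe, by default, consumes only from a recent point in the topic rather than from the earliest offset, so a topic with no fresh traffic yields nothing. Widening the range with the --from option (documented at https://adevinta.github.io/zoe; it accepts ISO-8601 durations, though the exact flag should be checked against the installed version) should surface older records:

zoe -v topics consume locations -n 10 --from 'P7D'

Here 'P7D' asks for records from the last seven days; any duration large enough to cover the topic's data works.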