confluentinc / librdkafka

The Apache Kafka C/C++ library

Unable to receive acks after enable.idempotence is set to true #2797

Closed · zhaowq32 closed this issue 4 years ago

zhaowq32 commented 4 years ago

Description

No acks (delivery reports) are received after enable.idempotence is set to true. With idempotence disabled, acks are received normally.
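For context, a minimal sketch of an idempotent librdkafka producer waiting on its delivery report. The topic name, broker address, and message payload below are illustrative only, not taken from the report:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Delivery report callback: with enable.idempotence=true (which implies
 * request.required.acks=-1), this fires once the write has been
 * acknowledged by all in-sync replicas, or when delivery fails. */
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg,
                      void *opaque) {
    if (msg->err)
        fprintf(stderr, "Delivery failed: %s\n",
                rd_kafka_err2str(msg->err));
    else
        fprintf(stderr, "Delivered to partition %d @ offset %ld\n",
                msg->partition, (long)msg->offset);
}

int main(void) {
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    /* Illustrative broker address; SASL settings omitted for brevity. */
    rd_kafka_conf_set(conf, "bootstrap.servers", "10.124.142.168:9092",
                      errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "enable.idempotence", "true",
                      errstr, sizeof(errstr));
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                  errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "Producer creation failed: %s\n", errstr);
        return 1;
    }

    rd_kafka_producev(rk,
                      RD_KAFKA_V_TOPIC("test"),
                      RD_KAFKA_V_VALUE("hello", 5),
                      RD_KAFKA_V_END);

    rd_kafka_flush(rk, 10000);  /* block until the delivery report fires */
    rd_kafka_destroy(rk);
    return 0;
}
```

If the broker rejects the idempotent producer's initialization (as in this issue), the delivery report never arrives with success; the error surfaces via the callback or the error log instead.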

How to reproduce

<your steps how to reproduce goes here, or remove section if not relevant>

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

Producer global config

builtin.features        gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer
client.id       rdkafka
metadata.broker.list    10.124.142.168:9094,10.124.142.168:9093,10.124.142.168:9092
message.max.bytes       10000000
message.copy.max.bytes  65535
receive.message.max.bytes       100000000
max.in.flight.requests.per.connection   5
metadata.request.timeout.ms     60000
topic.metadata.refresh.interval.ms      300000
metadata.max.age.ms     900000
topic.metadata.refresh.fast.interval.ms 250
topic.metadata.refresh.fast.cnt 10
topic.metadata.refresh.sparse   true
debug   generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,all
socket.timeout.ms       60000
socket.blocking.max.ms  1000
socket.send.buffer.bytes        0
socket.receive.buffer.bytes     0
socket.keepalive.enable false
socket.nagle.disable    false
socket.max.fails        1
broker.address.ttl      1000
broker.address.family   any
enable.sparse.connections       true
reconnect.backoff.jitter.ms     0
reconnect.backoff.ms    100
reconnect.backoff.max.ms        10000
statistics.interval.ms  0
enabled_events  0
log_cb  0x101da770
log_level       6
log.queue       false
log.thread.name true
log.connection.close    true
socket_cb       0x101e7448
open_cb 0x101daf80
default_topic_conf      0x100011c8b90
internal.termination.signal     0
api.version.request     true
api.version.request.timeout.ms  10000
api.version.fallback.ms 0
broker.version.fallback 0.11.0
security.protocol       sasl_plaintext
enable.ssl.certificate.verification     true
sasl.mechanisms SCRAM-SHA-256
sasl.kerberos.service.name      kafka
sasl.kerberos.principal kafkaclient
sasl.kerberos.kinit.cmd kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}
sasl.kerberos.min.time.before.relogin   60000
sasl.username   lida
sasl.password   lida
enable.sasl.oauthbearer.unsecure.jwt    false
test.mock.num.brokers   0
partition.assignment.strategy   range,roundrobin
session.timeout.ms      10000
heartbeat.interval.ms   3000
group.protocol.type     consumer
coordinator.query.interval.ms   600000
max.poll.interval.ms    300000
enable.auto.commit      true
auto.commit.interval.ms 5000
enable.auto.offset.store        true
queued.min.messages     100000
queued.max.messages.kbytes      1048576
fetch.wait.max.ms       100
fetch.message.max.bytes 1048576
fetch.max.bytes 52428800
fetch.min.bytes 1
fetch.error.backoff.ms  500
offset.store.method     broker
isolation.level read_committed
enable.partition.eof    false
check.crcs      false
client.rack
enable.idempotence      true
enable.gapless.guarantee        false
queue.buffering.max.messages    100000
queue.buffering.max.kbytes      1048576
queue.buffering.max.ms  0.5
message.send.max.retries        3
retry.backoff.ms        100
queue.buffering.backpressure.threshold  1
compression.codec       none
batch.num.messages      10000
delivery.report.only.error      false
dr_msg_cb       0x101d6510

Producer topic config

request.required.acks   -1
request.timeout.ms      5000
message.timeout.ms      300000
queuing.strategy        fifo
produce.offset.report   false
partitioner     consistent_random
compression.codec       inherit
compression.level       -1
auto.commit.enable      true
auto.commit.interval.ms 60000
auto.offset.reset       largest
offset.store.path       .
offset.store.sync.interval.ms   -1
offset.store.method     broker
consume.callback.max.messages   0

broker config

advertised.listeners=SASL_PLAINTEXT://10.124.142.168:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
zhaowq32 commented 4 years ago

What caused the error 'Broker: Cluster authorization failed'?

zhaowq32 commented 4 years ago

Using the admin user solved the problem.
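For reference, switching to a super user works because an idempotent producer must first send an InitProducerId request, which requires the IdempotentWrite operation on the Cluster resource. With allow.everyone.if.no.acl.found=false, a principal lacking that ACL gets "Broker: Cluster authorization failed". Rather than producing as admin, the ACL can be granted to the original user. A sketch, assuming the ZooKeeper-backed SimpleAclAuthorizer from the broker config above; the ZooKeeper address is a placeholder, and User:lida is the sasl.username from the reported config:

```shell
# Grant the producer's principal IdempotentWrite on the cluster resource
# (replace localhost:2181 with the actual ZooKeeper connect string).
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:lida \
  --operation IdempotentWrite --cluster
```

The usual Write ACL on the target topic is still needed in addition to this cluster-level grant.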