fede1024 / rust-rdkafka

A fully asynchronous, futures-based Kafka client library for Rust based on librdkafka
MIT License

Dropping `Consumer` when using `enable.auto.commit` and Kafka is disconnected hangs for `session.timeout.ms` #611

Open slinkydeveloper opened 1 year ago

slinkydeveloper commented 1 year ago

As the title says, when using enable.auto.commit and Kafka is disconnected (e.g. Kafka crashed), drop(Consumer) blocks for roughly the duration of session.timeout.ms. When enable.auto.commit is disabled and I commit offsets synchronously, the behavior does not reproduce.

This hangs my application for some time while shutting down.

Reproducer

I have attached a reproducer, taken from the simple_consumer example: simple_consumer.zip. To reproduce the behavior:

# Start Kafka container
podman run -p 8082:8082 -p 9092:9092 -p 9101:9101 --rm confluentinc/confluent-local

# Create topic
http localhost:8082/v3/clusters/$(http http://localhost:8082/v3/clusters/ | jq '.data[0].cluster_id' -r)/topics topic_name=my-topic

# Send some records
echo '{"records":[{"key":"jsmith","value":"alarm clock"},{"key":"htanaka","value":"batteries"},{"key":"awalther","value":"bookshelves"}]}' | http POST localhost:8082/topics/my-topic 'Content-Type: application/vnd.kafka.json.v2+json'

You'll observe that dropping the consumer takes roughly as long as session.timeout.ms.
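In the sample logs below, the two SystemTime values printed around the drop ("Going to drop consumer" and "Dropped consumer") give the blocking time directly. A small sketch of that arithmetic, with the values copied from those two log lines; the function names are made up for illustration:

```rust
use std::time::Duration;

/// Elapsed time between two `SystemTime { tv_sec, tv_nsec }` values,
/// each given as a `(tv_sec, tv_nsec)` pair. Panics if `end` is before `start`.
pub fn elapsed(start: (u64, u32), end: (u64, u32)) -> Duration {
    Duration::new(end.0, end.1) - Duration::new(start.0, start.1)
}

/// Time spent inside `drop(Consumer)` according to the sample logs.
pub fn drop_duration_from_logs() -> Duration {
    let going_to_drop = (1_695_646_891, 481_807_324); // "Going to drop consumer"
    let dropped = (1_695_646_897, 524_233_308); // "Dropped consumer"
    elapsed(going_to_drop, dropped)
}
```

For this run the drop blocked for about 6.04 seconds.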

Sample logs:

15:00:39.524 (t: main) INFO - simple_consumer - rd_kafka_version: 0x020200ff, 2.2.0
15:00:39.535 (t: main) INFO - simple_consumer - Pre rebalance Assign(TPL {my-topic/0: offset=Invalid metadata="", error=Ok(())})
15:00:39.536 (t: main) INFO - simple_consumer - Post rebalance Assign(TPL {my-topic/0: offset=Invalid metadata="", error=Ok(())})
15:01:06.446 (t: main) INFO - simple_consumer - key: 'Some([34, 106, 115, 109, 105, 116, 104, 34])', payload: '"alarm clock"', topic: my-topic, partition: 0, offset: 3, timestamp: CreateTime(1695646866443)
15:01:06.448 (t: main) INFO - simple_consumer - key: 'Some([34, 104, 116, 97, 110, 97, 107, 97, 34])', payload: '"batteries"', topic: my-topic, partition: 0, offset: 4, timestamp: CreateTime(1695646866444)
15:01:06.448 (t: main) INFO - simple_consumer - key: 'Some([34, 97, 119, 97, 108, 116, 104, 101, 114, 34])', payload: '"bookshelves"', topic: my-topic, partition: 0, offset: 5, timestamp: CreateTime(1695646866444)
15:01:09.526 (t: main) INFO - simple_consumer - Committing offsets: Ok(())
15:01:27.688 (t: unknown) INFO - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Disconnected (after 48159ms in state UP)
15:01:27.688 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Disconnected (after 48159ms in state UP)
15:01:27.688 (t: unknown) INFO - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected (after 48161ms in state UP)
15:01:27.689 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Disconnected (after 48161ms in state UP)
15:01:27.689 (t: main) ERROR - rdkafka::client - librdkafka: Global error: AllBrokersDown (Local: All broker connections are down): 2/2 brokers are down
15:01:27.706 (t: unknown) INFO - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Disconnected (after 16ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
15:01:27.706 (t: unknown) INFO - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 16ms in state APIVERSION_QUERY)
15:01:27.706 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Disconnected (after 16ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
15:01:27.706 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 16ms in state APIVERSION_QUERY)
15:01:27.782 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:27.782 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:27.828 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:27.828 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:28.346 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:28.346 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:28.362 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:28.362 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:29.190 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:29.190 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:29.445 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:29.445 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
^C15:01:31.481 (t: main) INFO - simple_consumer - Going to drop consumer, time SystemTime { tv_sec: 1695646891, tv_nsec: 481807324 }
15:01:31.481 (t: main) INFO - simple_consumer - Pre rebalance Revoke(TPL {my-topic/0: offset=Invalid metadata="", error=Ok(())})
15:01:31.481 (t: main) INFO - simple_consumer - Post rebalance Revoke(TPL {my-topic/0: offset=Invalid metadata="", error=Ok(())})
15:01:33.226 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:33.226 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:33.897 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:33.897 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:37.523 (t: main) INFO - simple_consumer - Committing offsets: Err(KafkaError (Consumer commit error: WaitingForCoordinator (Local: Waiting for coordinator)))
15:01:37.524 (t: main) INFO - simple_consumer - Dropped consumer, time SystemTime { tv_sec: 1695646897, tv_nsec: 524233308 }

slinkydeveloper commented 1 year ago

This seems somewhat related to https://github.com/fede1024/rust-rdkafka/issues/509 and https://github.com/fede1024/rust-rdkafka/issues/453 in particular.

slinkydeveloper commented 1 year ago

Apparently drop(Consumer) blocks even when enable.auto.commit is disabled and offsets are committed manually: https://github.com/fede1024/rust-rdkafka/issues/597#issuecomment-1733920186
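Until the blocking drop itself is addressed, one possible stopgap on the application side is to bound how long shutdown waits for the drop. The sketch below is not part of rust-rdkafka: `drop_with_timeout` and `SlowToDrop` are hypothetical names, it uses only the standard library, and it assumes the value being dropped is `Send`. If the deadline expires, the drop keeps running on a detached thread, so the consumer may not leave the group cleanly before the process exits.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Drops `value` on a helper thread and waits at most `timeout` for the drop
/// to finish. Returns `true` if the drop completed in time, `false` otherwise.
/// On timeout the helper thread stays detached and runs until the drop
/// returns or the process exits.
pub fn drop_with_timeout<T: Send + 'static>(value: T, timeout: Duration) -> bool {
    let (done_tx, done_rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        drop(value);
        // The receiver may already be gone after a timeout; ignore the error.
        let _ = done_tx.send(());
    });
    done_rx.recv_timeout(timeout).is_ok()
}

/// Hypothetical stand-in for a consumer whose `Drop` blocks for a while.
pub struct SlowToDrop(pub Duration);

impl Drop for SlowToDrop {
    fn drop(&mut self) {
        // Simulates a destructor that blocks, e.g. while waiting on a broker.
        thread::sleep(self.0);
    }
}
```

In an application, the consumer would take the place of `SlowToDrop`, for example `drop_with_timeout(consumer, Duration::from_secs(1))` just before exiting. This only hides the delay from the shutdown path; it does not make the underlying close any faster.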

xulai1001 commented 5 months ago

I am new to rdkafka, and this issue popped up while I was trying out a minimal example of BaseConsumer.

For testing purposes, I want to (1) read from the earliest offset and (2) disable auto commit and not commit any offsets at all.

After pressing Ctrl-C to terminate the program, I must wait for session.timeout.ms before re-running it, or the consumer won't work. The code is here (really simple):

https://pastebin.com/b9iN18Hw