Closed theferrit32 closed 1 year ago
For clinvar-raw, each message received requires reading gigabytes of data out of Google Cloud Storage and processing it, so each poll() loop is relatively long, and it sometimes exceeds the timeout on the consumer's membership in the consumer group (default is 5 minutes, configurable with max.poll.interval.ms).
If we set max.poll.records to 1 on the consumer, this might be fine. Normally we wouldn't, for consumer-throughput reasons, but here the bottleneck isn't consuming; it's the work done for each message consumed.
https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records
https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms
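A minimal consumer-config sketch (property names are from the Kafka docs linked above; the values are illustrative, not necessarily what clinvar-raw ships):

```properties
# Return at most one record per poll(), so each loop iteration does one unit of work
max.poll.records=1
# Allow up to an hour between poll() calls before the coordinator evicts the consumer
max.poll.interval.ms=3600000
```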
I'm seeing this timeline:
- poll() call
- poll() again, which tries to commit the prior offset of 125

The time to produce the 2019-07-01 release was roughly 11 minutes. Other releases vary depending on how much activity happened in ClinVar between each one and the prior release (a quick scan shows them varying between tens of seconds and 10 minutes).
22-08-18 16:15:09 clinvar-raw-6546979d9f-rzz7s INFO [clinvar-raw.stream:208] - Received drop message: {:serialized-key-size -1, :key nil, :offset 0, :value "{\"release_date\": \"2019-07-01\", …
22-08-18 16:25:56 clinvar-raw-6546979d9f-rzz7s INFO [clinvar-raw.stream:208] - Received drop message: {:serialized-key-size -1, :key nil, :offset 1, :value "{\"release_date\": \"2019-07-31\", …
Previously we weren't hitting this because the consumed messages were put onto an async channel and the main poll loop continued polling, keeping the consumer in the group. Now processing is done synchronously in the main poll loop.
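The old pattern can be sketched roughly like this (a Python stand-in for the Clojure core.async version; process and the message batch are stubs, not the real consumer):

```python
import queue
import threading

def process(message):
    # Stand-in for the expensive per-message work (GCS reads, processing, etc.)
    return message.upper()

results = []
channel = queue.Queue()

def worker():
    # Slow processing happens off the poll thread, so poll() keeps being
    # called often enough to stay under max.poll.interval.ms
    while True:
        msg = channel.get()
        if msg is None:
            break
        results.append(process(msg))

t = threading.Thread(target=worker, daemon=True)
t.start()

# Main loop: in the real service this is consumer.poll(); here a stub batch
for msg in ["release-2019-07-01", "release-2019-07-31"]:
    channel.put(msg)  # hand off and immediately go back to polling

channel.put(None)  # signal shutdown
t.join()
```

The trade-off with this shape is that offsets can end up committed for messages whose processing hasn't actually finished, which is presumably part of why processing was moved into the main poll loop.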
There doesn't seem to be any downside to setting max.poll.interval.ms much higher, like an hour. There is a separate timeout for the heartbeat, an internal thread in the KafkaConsumer that lets the broker know the consumer object is still alive and connected. So if the consumer stops sending heartbeats, the broker will still remove it from the consumer group relatively quickly (session.timeout.ms, default: 10 seconds).
See note in the changelog:
The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer retries default value was changed from 0 to 10. The internal Kafka Streams consumer max.poll.interval.ms default value was changed from 300000 to Integer.MAX_VALUE.
https://kafka.apache.org/documentation/#upgrade_10201_notable
I thought this was resolved, but after adding the deduplication logic it is taking significantly longer to process releases, particularly the initial one: over an hour. This is explainable by the additional ~3-4 ms to do both a read and a write to RocksDB, multiplied over millions of records. Reducing the amount of data stored in RocksDB might help some, but not by a large factor.
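The back-of-envelope math supports that explanation (the record count and per-record cost below are rough assumptions, not measured values):

```python
# Rough assumptions: ~2 million records in the initial release,
# ~3.5 ms extra per record for the RocksDB read + write
records = 2_000_000
per_record_overhead_s = 0.0035

extra_minutes = records * per_record_overhead_s / 60
print(f"~{extra_minutes:.0f} extra minutes")  # on the order of two hours
```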
Filtered logs: logs-20220829-130103.txt
Last run through the 2019-07-01 release in GKE with 75GB pd-ssd and no CPU caps took 2h 42m. (Edit: note the application is single-threaded, but there are some threads started by library objects; both KafkaConsumer and KafkaProducer use background threads.)
The next release, 2019-07-31, took ~5m.
2019-07-01 start: 2022-08-29 20:26:51.781 EDT
2019-07-31 start: 2022-08-29 22:48:52.491 EDT
2019-09-02 start: 2022-08-29 22:53:24.089 EDT
This has been resolved by tinkering with the poll batch size and timeout. This is a little hacky, and perhaps we should move to the clinvar-raw service managing its own offsets instead of relying on the broker-side consumer group management. Will need to think about how to persist this info and what the lifecycle of that storage volume needs to be.
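If we go that route, the storage half might look something like this file-backed store (entirely hypothetical; the OffsetStore name and shape are mine, and the real version would need to live on a persistent volume with the lifecycle questions noted above):

```python
import json
import os

class OffsetStore:
    """Persists the last processed offset per topic-partition to a JSON file,
    so the consumer can seek() back to it on startup instead of relying on
    broker-side consumer-group commits."""

    def __init__(self, path):
        self.path = path
        self.offsets = {}
        if os.path.exists(path):
            with open(path) as f:
                self.offsets = json.load(f)

    def get(self, topic, partition):
        # Return the next offset to consume, or 0 if nothing is stored yet
        return self.offsets.get(f"{topic}-{partition}", 0)

    def save(self, topic, partition, offset):
        # Record offset+1 (the next message to read) and flush to disk
        self.offsets[f"{topic}-{partition}"] = offset + 1
        with open(self.path, "w") as f:
            json.dump(self.offsets, f)
```

On startup the consumer would assign() its partition and seek() to store.get(...), and call store.save(...) only after a message is fully processed, so a crash mid-release replays rather than skips.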
Relevant GKE log entries:
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=clinvar-raw-dev] Failing OffsetCommit request since the consumer is not part of an active group
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=clinvar-raw-dev] Synchronous auto-commit of offsets {broad-dsp-clinvar-0=OffsetAndMetadata{offset=127, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=clinvar-raw-dev] Revoking previously assigned partitions [broad-dsp-clinvar-0]
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=clinvar-raw-dev] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=clinvar-raw-dev] Group coordinator b19-pkc-4yyd6.us-east1.gcp.confluent.cloud:9092 (id: 2147483628 rack: null) is unavailable or invalid, will attempt rediscovery
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=clinvar-raw-dev] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=clinvar-raw-dev] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=clinvar-raw-dev] Successfully joined group with generation 11
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=clinvar-raw-dev] Setting newly assigned partitions: broad-dsp-clinvar-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=clinvar-raw-dev] Setting offset for partition broad-dsp-clinvar-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b15-pkc-4yyd6.us-east1.gcp.confluent.cloud:9092 (id: 15 rack: 0), epoch=0}}