Blizzard / node-rdkafka

Node.js bindings for librdkafka
MIT License
2.08k stars 391 forks source link

Consumer is picking messages from single partition after adding 3 hours of delayed in consuming. #1045

Open sanketdhoble opened 9 months ago

sanketdhoble commented 9 months ago

Environment Information

this.consumer = <any>new Kafka.KafkaConsumer(
    {
        'group.id': consumerConfig.defaultGroupId + "_" + topics.join("_"),
        'client.id': consumerConfig.defaultClientId + "_" + topics.join("_"),
        'metadata.broker.list': consumerConfig.brokerList,
        'compression.codec': consumerConfig.compressionCodec,
        'retry.backoff.ms': 100,
        'socket.keepalive.enable': true,
        'enable.auto.commit': this.autoCommit,
        'statistics.interval.ms': 5000,
        // 'queued.min.messages': 10,
        // 'fetch.message.max.bytes': 30,
        // 'queued.max.messages.kbytes': 10 // didn't work
        debug: 'all'
    },
    {
        'auto.offset.reset': consumerConfig.autoOffsetReset,
        'request.required.acks': consumerConfig.requestRequiredAcks
    }
);

Topic: profile_updates Partitions: 10

We are trying to add a mechanism where we produce msgs at 5-6 qps but we want to consume those msgs after 3 hours of delay since produce. We consume in a batch of 90 messages. Each msg of ~150 bytes. In the first batch, we checked the timestamp and add delay of 3 hrs - (Date.now()- msg_produce_timestamp) i.e slept for ~3 hours. We are using manual commit (autoCommit: false).

Now the issue with this workflow is, after waking up from sleep, the consumer is picking all 90 msgs from partition:0 only. As a result, lag keeps increasing in the other 9 partitions. Is there any way/configuration to optimize this flow so that we could consume from each partition equally and hence the lag will be the same in all partitions?

sanketdhoble commented 9 months ago

As a work around we played with

'queued.min.messages': 9,
'fetch.message.max.bytes': 1800,

But still 3-4 partitions are running slow compared to the rest.

OPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
profile_updates                0          42169083        42199150        30067      kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a/IP                kafka-client_profile_updates
profile_updates                1          42245275        42249372        4097       kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a/IP                kafka-client_profile_updates
profile_updates                2          41997395        42005654        8259       kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a/IP                kafka-client_profile_updates
profile_updates                3          41627552        41657073        29521      kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a/IP                kafka-client_profile_updates
profile_updates                4          42614473        42618613        4140       kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a/IP                kafka-client_profile_updates
profile_updates                5          41785716        41793027        7311       kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a/IP                kafka-client_profile_updates
profile_updates                6          42351924        42381599        29675      kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a/IP                kafka-client_profile_updates
profile_updates                7          41933727        41937595        3868       kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a/IP                kafka-client_profile_updates
profile_updates                8          41898170        41906114        7944       kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a/IP                kafka-client_profile_updates
profile_updates                9          41898250        41927516        29266      kafka-client_profile_updates-147f7ff1-a1bd-4a43-b1a7-3ffb38f9cb1a/IP                kafka-client_profile_updates