jet / FsKafka

Minimal F# wrappers for Confluent.Kafka+librdkafka.redist 1.x
https://github.com/jet/dotnet-templates
Apache License 2.0

Make sure that offsets are not GC-ed if a consumer experiences no traffic for a long time #11

Closed vchekan closed 4 years ago

vchekan commented 6 years ago

Description

@chrnola:

a client can optionally provide an offset retention duration when it commits offsets. If it doesn't, the broker's default retention is applied (which on the shared production clusters at Jet is 24h).

Make sure CK (Confluent.Kafka) does not have this issue; if it does, file an issue with CK.

chrnola commented 6 years ago

I'm inclined to say that we're going to have this issue. I just ran a simple test with a consumer that has reached EOF for all partitions. I stopped publishing to this topic entirely, and then restarted the consumer. From watching the CK debug logs, I noticed this show up every minute (which is my auto commit interval):

2018-06-25 11:24:48.8633|leve=7 name=#consumer-1 facility=HEARTBEAT m=[thrd:main]: 10.135.1.12:9092/0: Heartbeat for group "test-group" generation id 11
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [0]: stored offset -1001, committed offset 16679
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [0]: setting offset INVALID for commit
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [1]: stored offset -1001, committed offset 16093
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [1]: setting offset INVALID for commit
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [2]: stored offset -1001, committed offset 19410
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [2]: setting offset INVALID for commit
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [3]: stored offset -1001, committed offset 15878
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [3]: setting offset INVALID for commit
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=COMMIT m=[thrd:main]: OffsetCommit internal error: Local: No offset stored
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=COMMIT m=[thrd:main]: OffsetCommit for 4 partition(s): cgrp auto commit timer: returned: Local: No offset stored
2018-06-25 11:24:49.8642|leve=7 name=#consumer-1 facility=UNASSIGN m=[thrd:main]: Unassign not done yet (0 wait_unassign, 4 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
2018-06-25 11:24:49.9032|leve=7 name=#consumer-1 facility=HEARTBEAT m=[thrd:main]: 10.135.1.12:9092/0: Heartbeat for group "test-group" generation id 11

Additionally, my handler for Consumer.OnOffsetsCommitted was never called. We can see from the Confluent.Kafka source that -1001 is a special offset (RD_KAFKA_OFFSET_INVALID).

My interpretation of all this is that the driver is refusing to periodically commit since we have disabled enable.auto.offset.store without ever calling Consumer.StoreOffsets.

chrnola commented 6 years ago

My naive attempt at solving this problem didn't work:

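// On assignment, read the group's committed offsets and store them in the driver,
// hoping the auto-commit timer will then re-commit them.
consumer.OnPartitionsAssigned
|> Event.add (fun tps ->
    // Fetch the committed offsets for the assigned partitions (20 s timeout).
    let committedOffsets = consumer.Committed(tps, TimeSpan.FromSeconds(20.))

    committedOffsets
    |> Seq.iter (fun tpoe ->
        if tpoe.Error.HasError then failwithf "Error fetching committed offsets: %O" tpoe.Error.Reason)

    // Seed the driver's in-memory offset store with those same offsets.
    let result =
        committedOffsets
        |> Seq.map (fun tpoe -> tpoe.TopicPartitionOffset)
        |> consumer.StoreOffsets

    result
    |> Seq.iter (fun tpoe ->
        if tpoe.Error.HasError then failwithf "Failed to store offsets: %O" tpoe.Error.Reason)

    Log.info "Committed offsets stored in driver")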

I figured that if I could at least initialize the offsets in the in-memory store they would be periodically flushed to the broker, but that proved not to be true:

...
2018-06-25 11:56:48.7552|Committed offsets stored in driver
...
2018-06-25 11:57:43.2203|leve=7 name=#consumer-1 facility=HEARTBEAT m=[thrd:main]: 10.135.1.20:9092/2: Heartbeat for group "test-group" generation id 13
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [0]: stored offset 16679, committed offset 16679
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [0]: setting offset INVALID for commit
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [1]: stored offset 16093, committed offset 16093
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [1]: setting offset INVALID for commit
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [2]: stored offset 19410, committed offset 19410
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [2]: setting offset INVALID for commit
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [3]: stored offset 15878, committed offset 15878
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=OFFSET m=[thrd:main]: Topic test-topic [3]: setting offset INVALID for commit
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=COMMIT m=[thrd:main]: OffsetCommit internal error: Local: No offset stored
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=COMMIT m=[thrd:main]: OffsetCommit for 4 partition(s): cgrp auto commit timer: returned: Local: No offset stored
2018-06-25 11:57:43.3463|leve=7 name=#consumer-1 facility=UNASSIGN m=[thrd:main]: Unassign not done yet (0 wait_unassign, 4 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
2018-06-25 11:57:43.7593|leve=7 name=#consumer-1 facility=HEARTBEAT m=[thrd:main]: 10.135.1.20:9092/2: Heartbeat for group "test-group" generation id 13

I believe this is caused by this bit of logic from librdkafka:

if (rktp->rktp_stored_offset >
    rktp->rktp_committed_offset) {
    verb = "setting stored";
    rktpar->offset = rktp->rktp_stored_offset;
} else {
    rktpar->offset = RD_KAFKA_OFFSET_INVALID;
}
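
The effect of that branch can be checked in isolation. The following is a minimal sketch, not librdkafka code: `offset_for_commit` and `partitions_to_commit` are hypothetical helper names, and `OFFSET_INVALID` stands in for RD_KAFKA_OFFSET_INVALID using the -1001 value seen in the logs above.

```c
#include <stdint.h>

/* Stand-in for RD_KAFKA_OFFSET_INVALID; -1001 is the value seen in the logs. */
#define OFFSET_INVALID ((int64_t)-1001)

/* Hypothetical helper mirroring the quoted branch: an offset is selected
 * for commit only when the stored offset is strictly ahead of the
 * committed one. */
int64_t offset_for_commit(int64_t stored, int64_t committed) {
    return stored > committed ? stored : OFFSET_INVALID;
}

/* Counts how many of n partitions would actually be included in a commit. */
int partitions_to_commit(const int64_t *stored, const int64_t *committed, int n) {
    int count = 0;
    for (int i = 0; i < n; i++) {
        if (offset_for_commit(stored[i], committed[i]) != OFFSET_INVALID)
            count++;
    }
    return count;
}
```

Under this model both runs above end the same way: a stored offset of -1001 and a stored offset equal to the committed one (for example 16679 and 16679) each yield OFFSET_INVALID, so no partition has anything to commit. Only a stored offset strictly greater than the committed one would be sent.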

vchekan commented 6 years ago

@chrnola I've submitted a support request with Confluent. We'll see what they have to say.

As I was writing it, I wondered whether this is really a bug in the Kafka group coordinator rather than in the client. Why would the coordinator detect group inactivity based on offset changes and not on the timestamp of the last fetch request?

chrnola commented 6 years ago

Forgive me, but I'm struggling to see how the group coordinator could be at fault here.

The client is actively choosing not to send what it deems to be "redundant" commits, but that means that the offset expiration timestamp doesn't advance. So unless the client receives a new message before that timestamp, the offset will expire. The next time the consumer is restarted it will crash (unless it defines an AutoOffsetReset strategy).
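
The expiry mechanics described above can be sketched as a toy model. This is a simplification under stated assumptions, not broker or client code: the type and function names are hypothetical, times are plain seconds, and the retention value used in the usage note is the 24h broker default mentioned at the top of this issue.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified view of one partition's committed offset
 * as the broker tracks it. Times are seconds since an arbitrary start. */
typedef struct {
    int64_t offset;      /* last committed offset */
    int64_t expires_at;  /* time from which the broker may drop it */
} committed_offset;

/* A commit that reaches the broker records the offset and pushes the
 * expiry forward by the retention period. */
void broker_commit(committed_offset *c, int64_t offset, int64_t now, int64_t retention_s) {
    c->offset = offset;
    c->expires_at = now + retention_s;
}

/* Client-side behaviour described above: a commit is sent only when the
 * stored offset has advanced. A skipped commit leaves the expiry untouched. */
bool commit_if_advanced(committed_offset *c, int64_t stored, int64_t now, int64_t retention_s) {
    if (stored <= c->offset)
        return false;
    broker_commit(c, stored, now, retention_s);
    return true;
}

bool is_expired(const committed_offset *c, int64_t now) {
    return now >= c->expires_at;
}
```

With a retention of 86400 seconds and a last commit at time 0, every later call to `commit_if_advanced` with an unchanged offset returns false, and `is_expired` becomes true at time 86400. A single new message before then would move the expiry forward again.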

chrnola commented 6 years ago

I think I understand what you're saying now. Looks like the feature you described will be released as part of Kafka 2.1.0, see: https://issues.apache.org/jira/browse/KAFKA-4682

In the meantime my team might be able to live with AutoOffsetReset as a workaround. We'll just have to amend our alerting to treat these low volume topics differently.

That being said, I'd really rather not have offsets expire regularly. Would you be open to disabling enable.auto.commit in the wrapper and handling the periodic commit loop ourselves?

vchekan commented 6 years ago

@chrnola

Would you be open to disabling enable.auto.commit in the wrapper and handling the periodic commit loop ourselves?

We already handle autocommit on our own, because Confluent allows message loss in enable.auto.commit mode. Perhaps we need yet another timer that would issue consumer.CommitAsync() every hour. We should think carefully about how to avoid a race condition between this timer and StoreOffsets being called from the message handler.
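
One way to reason about that race is to keep the stored offset and the commit bookkeeping behind a single lock. The sketch below is a language-neutral model written in C with pthreads, not the wrapper's implementation: all names are hypothetical, it does not talk to Kafka, and the value it returns is what a real timer would pass on to a commit call such as consumer.CommitAsync().

```c
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

#define NOTHING_TO_COMMIT ((int64_t)-1)

/* Hypothetical per-partition state shared by the message handler and the timer. */
typedef struct {
    pthread_mutex_t lock;
    int64_t stored;          /* offset recorded by the message handler */
    int64_t committed;       /* offset most recently handed to a commit */
    int64_t last_commit_at;  /* time of that commit, in seconds */
} partition_state;

void partition_init(partition_state *p, int64_t committed, int64_t now) {
    pthread_mutex_init(&p->lock, NULL);
    p->stored = committed;
    p->committed = committed;
    p->last_commit_at = now;
}

/* Called from the message handler; never moves the stored offset backwards. */
void store_offset(partition_state *p, int64_t next_offset) {
    pthread_mutex_lock(&p->lock);
    if (next_offset > p->stored)
        p->stored = next_offset;
    pthread_mutex_unlock(&p->lock);
}

/* Called from the timer. Returns the offset to commit, or NOTHING_TO_COMMIT.
 * Unlike the librdkafka branch quoted earlier, an unchanged offset is
 * re-committed once refresh_s seconds have passed since the last commit. */
int64_t offset_to_commit(partition_state *p, int64_t now, int64_t refresh_s) {
    int64_t result = NOTHING_TO_COMMIT;
    pthread_mutex_lock(&p->lock);
    if (p->stored > p->committed || now - p->last_commit_at >= refresh_s) {
        result = p->stored;
        p->committed = p->stored;
        p->last_commit_at = now;
    }
    pthread_mutex_unlock(&p->lock);
    return result;
}
```

Because the handler and the timer take the same lock, the timer always sees either the old or the new stored offset, never a partial update, and a late `store_offset` with a lower value cannot rewind it. With `refresh_s` set to 3600, an idle partition is re-committed hourly, well inside a 24h retention window.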

chrnola commented 6 years ago

Perhaps we need yet another timer which would issue consumer.CommitAsync() every hour.

I may have referred to the wrong config key earlier, but yes this is what I was trying to suggest. 😄

I recall Leo did something similar in his version of this wrapper.

bartelink commented 4 years ago

Closing as this is never going to happen. The FsKafka0 module introduced in 1.4.3 provides an on-ramp to the practical, viable and maintained API: FsKafka.