jet / FsKafka

Minimal F# wrappers for Confluent.Kafka+librdkafka.redist 1.x
https://github.com/jet/dotnet-templates
Apache License 2.0
88 stars 17 forks source link

Decide which tests are needed and implement #9

Closed vchekan closed 4 years ago

vchekan commented 6 years ago

Description

Given that librdkafka and confluent-kafka-dotnet have test coverage, decide which test coverage is needed for wrapper.

  1. Sanity check for trivial code: can produce and consume.
  2. Message ordering is preserved within partition.
  3. F# specific: async cancellation works.
  4. Autocommit:
    • it works when it is on
    • commit interval works
    • it does not commit when it is off
  5. Configuration: convert Seq<string,obj> to string and compare to what is expected.
  6. Our implementation of offsetRange works
  7. Legacy produce/consume works.
chrnola commented 6 years ago

Speaking of tests, I'm in the process of trying to write a test for the consumer since it looks like we're calling StoreOffsets before the handle completes which could cause message loss: https://github.com/jet/confluent-kafka-fsharp/blob/9de117c5b83f3ef4a5c3c4a97b7bfaef9f309480/src/Confluent.Kafka.FSharp/ConfluentKafka.fs#L310-L321

~I'm having a bit of trouble initializing the consumer offsets to make the test more resilient, but If I get it working I'll send a PR.~ Edit: done, see #10

chrnola commented 6 years ago

We also need to ensure that auto-commit works for low volume topics. In other words, even if the consumer does not receive any new messages, we still need it to redundantly commit its offsets periodically so that they don't expire. It's admittedly a weird use case, but it will come up.

vchekan commented 6 years ago

@chrnola " we still need it to redundantly commit its offsets periodically so that they don't expire"

I've never heard about offsets expiring. I thought they are stored to kafka compaction topics, where latest one is preserved forever?

chrnola commented 6 years ago

@vchekan From my understanding a client can optionally provide an offset retention duration when it commits offsets. If it doesn't, the broker's default retention is applied (which on the shared production clusters at Jet is 24h).

My team encountered expiring offsets on low volume topics shortly after we first migrated to Kafunk about a year ago. As a result this commit was made in Kafunk to ensure that offsets would be committed even if they didn't change.

Whether or not this behavior should be replicated in the wrapper is up for debate, just figured I would put it on your radar. 😉

vchekan commented 6 years ago

@chrnola understood. Quick glance at librdkafka shows that it use v1 of OffsetCommitRequest, which has timestamp but no RetentionTime (introduced in v2 of OffsetCommitRequest) https://github.com/edenhill/librdkafka/blob/47e47323290664954c6b93eb8d3c2fd3cc4f2181/src/rdkafka_cgrp.c#L2060 3rd parameter is "1", indicating version of OffsetCommitRequest API. But I'll need to test how it behaves in case of no traffic.

bartelink commented 4 years ago

Closing as this is never going to happen. The FsKafka0 module introduced in 1.4.3 provides an onramp to the single practical/viable/maintained/tested API: FsKafka.