akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka-kafka/current/

[Failure] At-Least-Once One To Conditional should support batching of offsets `withOffsetContext` #1070

Open ennru opened 4 years ago

ennru commented 4 years ago

First seen in #1069

AtLeastOnce:
At-Least-Once One To Conditional
- should support batching of offsets `withOffsetContext` *** FAILED *** (10 seconds, 431 milliseconds)
  Vector(ConsumerRecord(topic = topic-2-96, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1583141715456, serialized key size = 3, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = 1,2,3,4,5), ConsumerRecord(topic = topic-2-96, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1583141715456, serialized key size = 3, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = 6,7,8,9,10), ConsumerRecord(topic = topic-2-96, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1583141715456, serialized key size = 3, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = 1,2,3,4,5), ConsumerRecord(topic = topic-2-96, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1583141715456, serialized key size = 3, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = 6,7,8,9,10)) 
  had size 4 instead of expected size 2 (AtLeastOnce.scala:198)

  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:343)
  at org.scalatest.Matchers$ResultOfHaveWordForExtent.size(Matchers.scala:2707)
  at docs.scaladsl.AtLeastOnce.$anonfun$new$16(AtLeastOnce.scala:198)

https://travis-ci.org/akka/alpakka-kafka/jobs/657205159#L490

seglo commented 4 years ago

https://travis-ci.org/github/akka/alpakka-kafka/jobs/699377074#L495

seglo commented 4 years ago

https://travis-ci.org/github/akka/alpakka-kafka/jobs/705758567#L504

seglo commented 4 years ago

My first idea here would have been to enable idempotence on the producer that writes the grouped messages to topic-2. However, if that producer had generated the duplicates, I would expect them to have distinct offsets in topic-2.

Based on the results of the last failure, it looks like the second consumer is reading the same messages from topic-2 twice. The test expects 2 results, but 4 are returned, and the ConsumerRecords carry duplicate offsets. Formatted:

Vector(
  ConsumerRecord(topic = topic-2-96, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1594127538548, serialized key size = 3, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = 1,2,3,4,5), 
  ConsumerRecord(topic = topic-2-96, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1594127538548, serialized key size = 3, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = 6,7,8,9,10), 
  ConsumerRecord(topic = topic-2-96, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1594127538548, serialized key size = 3, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = 1,2,3,4,5), 
  ConsumerRecord(topic = topic-2-96, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1594127538548, serialized key size = 3, serialized value size = 10, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = 6,7,8,9,10)
)
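
To make the reasoning above concrete, here is a minimal sketch that separates the two hypotheses: a record delivered more than once at the same (topic, partition, offset) points at a consumer re-read, while the same value stored under different offsets would point at a duplicate produce. `Rec`, `DuplicateCheck`, and both helper functions are hypothetical names used only for illustration; they are not part of Alpakka Kafka or of the test in AtLeastOnce.scala, and `Rec` only mirrors the few `ConsumerRecord` fields needed here.

```scala
// Hypothetical stand-in for the ConsumerRecord fields that matter for this check.
final case class Rec(topic: String, partition: Int, offset: Long, value: String)

object DuplicateCheck {
  // Coordinates (topic, partition, offset) that were delivered more than once,
  // mapped to the number of deliveries. A non-empty result suggests a re-read.
  def rereadCoordinates(records: Seq[Rec]): Map[(String, Int, Long), Int] =
    records
      .groupBy(r => (r.topic, r.partition, r.offset))
      .collect { case (coord, rs) if rs.size > 1 => coord -> rs.size }

  // Values that appear under more than one distinct coordinate.
  // A non-empty result would suggest the producer wrote the value twice.
  def producedTwice(records: Seq[Rec]): Set[String] =
    records
      .groupBy(_.value)
      .collect {
        case (value, rs)
            if rs.map(r => (r.topic, r.partition, r.offset)).distinct.size > 1 =>
          value
      }
      .toSet

  // The four records from the failure output above, reduced to the relevant fields.
  val observed: Seq[Rec] = Seq(
    Rec("topic-2-96", 0, 0L, "1,2,3,4,5"),
    Rec("topic-2-96", 0, 1L, "6,7,8,9,10"),
    Rec("topic-2-96", 0, 0L, "1,2,3,4,5"),
    Rec("topic-2-96", 0, 1L, "6,7,8,9,10")
  )
}
```

For the observed records, `rereadCoordinates` reports offsets 0 and 1 twice each and `producedTwice` is empty, which matches the re-read interpretation. It does not explain why the consumer re-read them.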

ennru commented 4 years ago

Seen again in the scheduled build: https://travis-ci.org/github/akka/alpakka-kafka/jobs/725973526

seglo commented 3 years ago

https://github.com/akka/alpakka-kafka/pull/1366/checks?check_run_id=2514640841#step:6:260

patriknw commented 1 year ago

https://github.com/akka/alpakka-kafka/actions/runs/6625089113/job/17995508279?pr=1687#step:6:311