akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

Consumer might skip offsets #336

Closed kodemaniak closed 5 years ago

kodemaniak commented 6 years ago

We are facing a strange behaviour using reactive-kafka.

We are consuming a single topic with 256 partitions and about 2.5 million messages. From that log we build a view of our data, and we strictly require at-least-once message semantics in the consumer in order to build the view correctly.

We experienced a lot of exceptions resulting from events that have been skipped, e.g., an update message coming in for a given entity without the corresponding create message creating that entity in the first place. We wrote some tooling to check whether the events are in the correct partition and are stored in the right order and were able to confirm that the log (i.e. the topic/partition) is in the expected state.

After a lot of debugging we think we have narrowed the problem down to reactive-kafka. We use a partitioned committable source and consume each partition separately. Due to some internal issues, message processing was rather slow on our side. This triggered a couple of timeouts on our side but also on the Kafka consumer side, in turn leading to a lot of rebalances. According to the Kafka documentation this is expected and can be fine-tuned using the timeout settings and ConsumerConfig.MAX_POLL_RECORDS_CONFIG. However, messages should never be skipped, which is what we observed.

It seems that a topic partition that is rebalanced onto the same node reuses the KafkaConsumer and skips exactly ConsumerConfig.MAX_POLL_RECORDS_CONFIG records. Our assumption is that as long as the consumer is the same, it never resets its internal state but simply requests new messages from the broker (ConsumerConfig.MAX_POLL_RECORDS_CONFIG records in one request to the broker). We assume that the KafkaConsumer relies on being restarted or recreated when something fails during processing, so that consumption restarts from the last committed offset. When we change `MAX_POLL_RECORDS_CONFIG` to different values, the number of skipped messages is always consistent with that setting.

In our setting we also have messages that can be applied to the view independently of the previous messages, so that we actually successfully commit offsets after messages have been skipped. Therefore, we are basically ending up in a totally incomplete and irrecoverable state of the view.

Correct behaviour would be, IMO, that after a failure during processing the consumer is reset to the last committed offset. This probably has to be done by the KafkaConsumerActor, by recreating the consumer.
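
To make the hypothesis above concrete, here is a minimal toy model in plain Scala. It is not Alpakka or Kafka code, and every name in it (`ToyConsumer`, `SkipDemo`, `seekToCommitted`) is hypothetical. It only assumes what the report describes: each poll advances the consumer's position by max.poll.records whether or not the batch is ever processed, and a reused consumer keeps that position unless something resets it to the last committed offset.

```scala
// Toy model of the reported behaviour. Not Alpakka or Kafka code; all names are hypothetical.
final class ToyConsumer(maxPollRecords: Int) {
  private var position: Long = 0L  // next offset that poll() will hand out
  private var committed: Long = 0L // next offset to read according to the last commit

  // Hands out the next batch and advances the position,
  // whether or not the caller ever finishes processing the batch.
  def poll(): Vector[Long] = {
    val batch = Vector.range(position, position + maxPollRecords)
    position += maxPollRecords
    batch
  }

  def commit(nextOffset: Long): Unit = { committed = nextOffset }

  // The proposed fix: go back to the last committed offset.
  def seekToCommitted(): Unit = { position = committed }
}

object SkipDemo {
  val maxPollRecords = 500

  // First offset delivered after one batch was polled but dropped unprocessed.
  def firstOffsetAfterDroppedBatch(resetOnReassign: Boolean): Long = {
    val consumer = new ToyConsumer(maxPollRecords)
    val processed = consumer.poll()     // offsets 0..499, fully processed
    consumer.commit(processed.last + 1) // committed position is now 500
    consumer.poll()                     // offsets 500..999 polled, then dropped by a rebalance
    if (resetOnReassign) consumer.seekToCommitted()
    consumer.poll().head
  }

  val withoutReset: Long = firstOffsetAfterDroppedBatch(resetOnReassign = false)
  val withReset: Long = firstOffsetAfterDroppedBatch(resetOnReassign = true)
  val skipped: Long = withoutReset - withReset
}
```

In this model the gap without a reset is always exactly one batch, which matches the observation that the number of skipped messages follows the max.poll.records setting. Whether the real consumer actor behaves this way is the open question of this issue.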

patriknw commented 6 years ago

Interesting, would you be able to create a pull request?

kodemaniak commented 6 years ago

Yes, I will work on that and try to produce a pull request.

seglo commented 6 years ago

@kodemaniak Based on the issue description I assume you're storing offsets in Kafka. There was a PR #367 introduced last fall that has been in reactive-kafka since 0.18 that provides the user with an event handler to reset the consumer's partition offsets on consumer group rebalance. It was intended for end users who store offsets off Kafka. It could be used in its present form to reset the consumer in the way you want, but it would be a bit of a clunky solution since it's not meant for offsets managed by Kafka. However, it already introduces some of the plumbing required to reset the consumer offsets (via Consumer.seek), so adding a feature to do what you want wouldn't require many changes.

vlovgr commented 5 years ago

I'm experiencing something very similar to this issue. I'm consuming a topic with 4 partitions, running 4 separate instances of the same stream in parallel, and messages are being lost. Everything seems to be processing as normal until the following happens.

2018-08-02T03:16:02.437Z [Consumer clientId=consumer-1, groupId=group] Revoking previously assigned partitions [topic-0]
2018-08-02T03:16:02.437Z [Consumer clientId=consumer-1, groupId=group] (Re-)joining group
2018-08-02T03:16:02.875Z [Consumer clientId=consumer-1, groupId=group] Successfully joined group with generation 1333
2018-08-02T03:16:02.876Z [Consumer clientId=consumer-1, groupId=group] Setting newly assigned partitions [topic-1]
2018-08-02T03:16:06.136Z [Consumer clientId=consumer-1, groupId=group] Revoking previously assigned partitions [topic-1]
2018-08-02T03:16:06.137Z [Consumer clientId=consumer-1, groupId=group] (Re-)joining group
2018-08-02T03:16:06.170Z [Consumer clientId=consumer-1, groupId=group] Successfully joined group with generation 1334
2018-08-02T03:16:06.171Z [Consumer clientId=consumer-1, groupId=group] Setting newly assigned partitions [topic-1, topic-0]

The next log is that offset 29129158 on partition 0 is being processed at 2018-08-02T03:16:06.437Z.

Then there are no more offsets being processed on partition 0, only on partition 1. No other relevant logs are being output. About 18 seconds later, the following logs are written:

2018-08-02T03:16:24.276Z [Consumer clientId=consumer-1, groupId=group] (Re-)joining group
2018-08-02T03:16:24.276Z [Consumer clientId=consumer-1, groupId=group] Revoking previously assigned partitions [topic-1, topic-0]
2018-08-02T03:16:24.477Z [Consumer clientId=consumer-1, groupId=group] Setting newly assigned partitions [topic-0]
2018-08-02T03:16:24.477Z [Consumer clientId=consumer-1, groupId=group] Successfully joined group with generation 1335

and then offset 29129659 on partition 0 is being processed at 2018-08-02T03:16:27.503Z. The offsets from 29129159 through 29129658 (exactly 500 offsets = max.poll.records) were never processed by the stream. I've been able to verify this externally, because the stream makes an API call for every message, and the API access logs do not contain entries for these 500 messages.
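
As a quick check of the arithmetic, the size of the gap can be computed from the two offsets reported in the logs. This is only an illustrative snippet; the object and value names are made up for this sketch.

```scala
object OffsetGap {
  val lastProcessedBeforeGap = 29129158L // processed at 2018-08-02T03:16:06.437Z
  val firstProcessedAfterGap = 29129659L // processed at 2018-08-02T03:16:27.503Z

  // Every offset strictly between the two processed offsets was never seen by the stream.
  val firstSkipped: Long = lastProcessedBeforeGap + 1
  val lastSkipped: Long = firstProcessedAfterGap - 1
  val skippedCount: Long = lastSkipped - firstSkipped + 1
}
```

`OffsetGap.skippedCount` evaluates to 500, the same as max.poll.records in the configuration listed below.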

The stream is roughly defined as follows.

Consumer
  .committableSource(consumerSettings, subscription)
  .mapAsync(25)(processRecord)
  .collect { case Some(message) => message }
  .via(Producer.flexiFlow(producerSettings))
  .mapAsync(25)(afterPublishRecord)
  .map(_.passThrough)
  .groupedWithin(30, 10.seconds)
  .map(_.foldLeft(CommittableOffsetBatch.empty)(_ updated _))
  .mapAsync(1)(_.commitScaladsl())
  .toMat(Sink.ignore)(Keep.left)
  .run()

ConsumerConfig values as follows.

auto.commit.interval.ms = 5000
auto.offset.reset = earliest
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.type = PKCS12
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS

This issue is pretty devastating as I also require strong at-least-once delivery guarantees.

2m commented 5 years ago

Great analysis @vlovgr! This will be very helpful to track down the issue.

aisven commented 5 years ago

I read that #377 was closed as invalid and the project is waiting for feedback from the power users. @kodemaniak As we use this library in many services, I find this Issue interesting, so I want to ask you directly: Did an alpakka-kafka (akka-streams-kafka) upgrade somehow fix this problem for you, or are you using your own fork of the library in production, or perhaps something else is the case?

aisven commented 5 years ago

@kodemaniak, in case you are following along: this comment in another issue might also be relevant here to some degree: https://github.com/akka/alpakka-kafka/issues/353#issuecomment-446519939 "The release from yesterday 1.0-RC1 does use the new Kafka API without WakeupException."

GrigorievNick commented 5 years ago

This looks very close to my issue: https://github.com/akka/alpakka-kafka/issues/382#issuecomment-412092233 I believe the problem appears when a per-partition sub-source stops without having processed all the data it received from the consumer actor. When a new sub-source for that partition appears, the consumer actor does not redeliver the uncommitted messages. It just continues sending to the new sub-source from the latest position. In other words, there is no offset management between the sub-source and the consumer actor.

kodemaniak commented 5 years ago

@sourcekick I have not tried the latest release. I am actually using a custom source implementation that uses a dedicated Kafka consumer per stream, similar to what I proposed in the pull request. Since the original problem was hard to reproduce, I will stick to my implementation, which has been running in production without any problems for months now.

GrigorievNick commented 5 years ago

Today I rechecked "Lost messages in plainPartitionedSource on partition reassignment" (#382) with version 1.0.4. It's fixed. Most likely this issue is resolved as well.

ennru commented 5 years ago

Thank you for reporting. You're right, this seems to be solved by #589 as well. Closing this.