practo / tipoca-stream

Near real time cloud native data pipeline in AWS (CDC+Sink). Hosts code for RedshiftSink. RDS to RedshiftSink Pipeline with masking and reloading support.
https://towardsdatascience.com/open-sourcing-tipoca-stream-f261cdcc3a13
Apache License 2.0

Kafka offset deletion causes reprocessing #180

Open alok87 opened 3 years ago

alok87 commented 3 years ago

Batcher data in S3 is getting rotated/archived/deleted sooner than the expected 14 days. Need to find the root cause and fix it.

A recreate is required every time this happens:

pq: Mandatory url is not present in manifest file.

alok87 commented 3 years ago

Archival has been paused to reproduce this, to be sure it is not due to a code issue.

alok87 commented 3 years ago

Found the root cause: the issue happens for topics that have not received updates for many weeks. When a topic receives no updates for many weeks, Kafka deletes its consumer group offsets. The next time the consumer group starts, it begins processing from the oldest offset instead of the last committed offset.

lastOffset was 1 (Kafka last offset):

I0423 09:37:33.888793 manager.go:213] topic:loader-ts.inventory.customers, partition:0, lastOffset:1 (kafka lastoffset)

initialOffset was expected to be 1 but is -2:

I0423 09:37:37.379926 loader_handler.go:115] loader-ts.inventory.customers: consumeClaim started, initalOffset:-2

Doing https://github.com/practo/tipoca-stream/issues/20 will help, as we won't depend on Kafka to store the consumer groups' last offsets.
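The idea in #20 can be sketched as keeping a small external offset store that survives Kafka's consumer-group expiry. This is a minimal illustrative sketch, not the repo's actual implementation; the `offsetStore` type and its methods are hypothetical names, and `offsetOldest` mirrors sarama's `OffsetOldest` sentinel (-2) seen in the logs above.

```go
package main

import "fmt"

// offsetOldest mirrors sarama.OffsetOldest, the sentinel the consumer
// falls back to when Kafka has expired the consumer group's offsets.
const offsetOldest = -2

// offsetStore is a hypothetical external store (in production this
// could be a database table), keyed by "topic/partition".
type offsetStore struct {
	m map[string]int64
}

// Commit records the last processed offset outside Kafka.
func (s *offsetStore) Commit(topic string, partition int32, offset int64) {
	s.m[fmt.Sprintf("%s/%d", topic, partition)] = offset
}

// Resume returns the externally stored offset, so an expired Kafka
// consumer group no longer forces reprocessing from the beginning.
func (s *offsetStore) Resume(topic string, partition int32) int64 {
	if off, ok := s.m[fmt.Sprintf("%s/%d", topic, partition)]; ok {
		return off
	}
	return offsetOldest // genuinely new topic: start from the oldest offset
}

func main() {
	store := &offsetStore{m: map[string]int64{}}
	store.Commit("loader-ts.inventory.customers", 0, 1)

	// Even after Kafka expires the consumer group, the external
	// store still returns the last committed offset.
	fmt.Println(store.Resume("loader-ts.inventory.customers", 0)) // 1
	fmt.Println(store.Resume("loader-ts.inventory.orders", 0))    // -2
}
```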

alok87 commented 3 years ago

Workaround:

  1. Find the topics that have not received any updates for many hours (magic number): sort_desc(rate(kafka_consumergroup_current_offset{topic=~"ts.inventory.*"}[6h])) == 0
  2. Stop the loader so the consumer group becomes inactive.
  3. Reset the offset for those topics to the latest offset (found via kafka_topic_partition_current_offset{topic=~"loader-ts.inventory.customers"}):

./bin/kafka-consumer-groups.sh --command-config ./bin/client-ssl-auth.properties --bootstrap-server=XXX --group=ts-redshiftsink-latest-invetory-loader --topic=loader-ts.inventory.customers --reset-offsets --to-latest --execute
alok87 commented 3 years ago

Solution could be very simple:

The operator watches for topics that have had no new input for some time, and stops the batchers and loaders for those topics. This would bring in a Prometheus dependency, but we could also do it without one.

alok87 commented 3 years ago

Related https://issues.apache.org/jira/browse/KAFKA-4682

Kafka by default deletes the committed offsets for an inactive consumer group after 7 days (offsets.retention.minutes).

https://www.reddit.com/r/apachekafka/comments/nakg6o/how_to_set_infinite_offset_retention_minutes/

alok87 commented 3 years ago

Increased offsets.retention.minutes to a large value as a temporary fix.
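For reference, this is a broker-side setting (note the plural `offsets.`); the value below is illustrative, not the one used here:

```properties
# server.properties (Kafka broker configuration)
# Retain committed consumer-group offsets for ~1 year so that
# groups idle for weeks are not expired and forced to reprocess.
offsets.retention.minutes=525600
```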