dlt-hub / verified-sources

Contribute to dlt verified sources 🔥
https://dlthub.com/docs/walkthroughs/add-a-verified-source
Apache License 2.0

Kafka source should either subscribe or assign, not both? #450

Closed aksestok closed 1 month ago

aksestok commented 1 month ago

dlt version

0.4.7

Source name

kafka

Describe the problem

[!WARNING] Kafka is still a new concept for me, so bear with me if I'm using incorrect terminology or have misunderstood some concepts.

When using the Kafka source, if the extraction is interrupted for any reason, messages consumed in the initial run are not reprocessed and are lost forever.

My understanding is that this happens because the assignment made in the offset tracker is undone by the Consumer.subscribe call immediately after:

sources/kafka/helpers.py https://github.com/dlt-hub/verified-sources/blob/9d7c77e927ddec051467ebd68eaf261c3a3c6465/sources/kafka/helpers.py#L159

sources/kafka/__init__.py https://github.com/dlt-hub/verified-sources/blob/9d7c77e927ddec051467ebd68eaf261c3a3c6465/sources/kafka/__init__.py#L81-L82

Commenting out the line in question (Line 82 consumer.subscribe(topics)) seems to resolve the issue.
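
To illustrate what I mean by "assign only", here is a minimal sketch of resuming from stored state. It is not the code from helpers.py: the function name `resume_positions`, the shape of the saved state (partition number mapped to the last processed offset), and the `FROM_BEGINNING` placeholder are all assumptions made up for this example.

```python
from typing import Dict, List, Tuple

# Placeholder meaning "read this partition from its start" (assumption for
# this sketch; with confluent_kafka one would use its OFFSET_BEGINNING constant).
FROM_BEGINNING = -2


def resume_positions(
    topic: str, partitions: List[int], saved: Dict[int, int]
) -> List[Tuple[str, int, int]]:
    """Return (topic, partition, next_offset) for every partition.

    `saved` is assumed to map a partition to the last offset that was
    fully processed, so reading resumes at saved offset + 1.
    """
    positions = []
    for partition in partitions:
        if partition in saved:
            positions.append((topic, partition, saved[partition] + 1))
        else:
            # No state for this partition yet: start from the beginning.
            positions.append((topic, partition, FROM_BEGINNING))
    return positions


positions = resume_positions("events", [0, 1, 2], {0: 41, 2: 7})

# With confluent_kafka, the result would then be applied roughly like this,
# with no consumer.subscribe(...) call afterwards:
#   from confluent_kafka import TopicPartition
#   consumer.assign([TopicPartition(t, p, o) for t, p, o in positions])
```

The point is that the start offsets come only from the pipeline state, so an aborted run that never saved its state would start again from the same offsets.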

Expected behavior

The source tracks its own state and should not rely on offsets provided by the broker?

Steps to reproduce

Run a Kafka pipeline and abort during extraction. Restart the pipeline and watch all the precious messages be gone forever.

How you are using the source?

I run this source in production.

Operating system

Linux

Runtime environment

Kubernetes

Python version

3.12

dlt destination

bigquery

Additional information

No response

rudolfix commented 1 month ago

@aksestok interesting! 90% of the code is to avoid the situation above... but you are probably right. it seems subscribe will attach to the current offsets of the topics. also it looks like our approach will prevent the automatic load balancing with subscriber groups. but I think it is OK

@IlyaFaer please take a look! and I think we are missing a test where we re-read the messages in the queue by discarding the state and trying again. could you add it? also please test locally since our kafka cloud is defunct

IlyaFaer commented 1 month ago

@aksestok, I wonder what destination you are using?

aksestok commented 1 month ago

@IlyaFaer

We use BigQuery (also in the issue description 😊).

IlyaFaer commented 1 month ago

@aksestok, oh, yeah, it is 👌 Let's see...