exasol / kafka-connector-extension

Exasol Kafka Extension for accessing Apache Kafka
MIT License

#39: Fixed bug related to consuming all offsets given empty records #40

Closed morazow closed 3 years ago

morazow commented 3 years ago

Fixes #39

In addition started refactoring integration tests for #9.

morazow commented 3 years ago

Hello @jwarlander,

Could you please have a look?

I could not test the behavior using integration tests, but instead using mocking in unit tests. It is not very easy to simulate the polling in integration tests.

One other issue is, once the consumer receives empty records it stops and does not continue, even if the last offset is not reached. I think this should be fine.

jwarlander commented 3 years ago

> One other issue is, once the consumer receives empty records it stops and does not continue, even if the last offset is not reached. I think this should be fine.

Empty records should not.. stop, but rather emit a row full of null values, right? And then proceed. Otherwise it might be a bit troublesome, as we are definitely seeing empty records a lot throughout the day in some of our topics :sunglasses:

Anyhow, I'll take a look at the changes! :smile:

morazow commented 3 years ago

Yes, I think it is wrong to call them empty records. Basically, the poll returns nothing, e.g., empty consumer records. Do you have such cases?

Please have a look at these tests.

We could also track the record offsets outside of the function; that should allow us to continue until the last offset.
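The idea above can be sketched roughly as follows. This is a hypothetical illustration, not the connector's actual (Scala) code: the offset is tracked outside the polling call, so an empty poll retries instead of ending the import. `fake_poll` and `consume_until_end_offset` are invented names for this example.

```python
def consume_until_end_offset(poll, end_offset, start_offset=0):
    """Collect records until the last seen offset reaches end_offset.

    Because the next offset is tracked outside the poll call, an empty
    batch does not terminate the loop prematurely.
    """
    records = []
    next_offset = start_offset
    while next_offset < end_offset:
        batch = poll(next_offset)
        if not batch:
            continue  # empty poll: try again instead of stopping
        records.extend(batch)
        next_offset = batch[-1][0] + 1  # offset of the last record, plus one
    return records

# Simulated poll returning (offset, value) pairs, with an empty batch in between.
batches = iter([[(0, "a"), (1, "b")], [], [(2, "c")]])

def fake_poll(offset):
    return next(batches, [])

print(consume_until_end_offset(fake_poll, end_offset=3))
# [(0, 'a'), (1, 'b'), (2, 'c')]
```

Note this sketch assumes the end offset is always reachable; with nothing left in the topic it would spin forever, which is what the counter/timeout discussion below is about.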

jwarlander commented 3 years ago

Yeah, ok, I see what you mean now :)

But yes, one of the reasons I felt an option for a full load up to the latest offset was needed is that it was very tricky to find a good set of parameters for the Kafka consumer polling. With the default parameters we apparently often got empty record sets back from polling, so the consumer would stop reading even though there were millions of records left to catch up on.

Interestingly, often I saw that on the first attempt, we'd get say 6-7 million records (out of about 15 million).. then on the second run, just 2-3 million, then maybe 1 million, and so on.. so I don't know if some kind of throttling was happening on our end.

Anyhow, yes, to be clear -- we really need to keep polling even if a single poll returns an empty set :smile:

morazow commented 3 years ago

Hey @jwarlander,

Thanks for the feedback!

Yes, we could definitely continue when getting empty records in between. This assumes that we will eventually reach the end offset.

However, maybe we can add a counter: if we get some number of consecutive empty polls, stop the import.
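A minimal sketch of that counter idea, with made-up names (`max_empty_polls` is not a real connector setting): any non-empty batch resets the counter, and only a run of consecutive empty polls aborts the import.

```python
def consume_with_empty_poll_limit(poll, end_offset, max_empty_polls=3):
    """Poll until end_offset, but give up after max_empty_polls
    consecutive empty results."""
    records = []
    next_offset = 0
    empty_polls = 0
    while next_offset < end_offset:
        batch = poll()
        if not batch:
            empty_polls += 1
            if empty_polls >= max_empty_polls:
                break  # too many empty polls in a row: stop the import
            continue
        empty_polls = 0  # reset on any successful batch
        records.extend(batch)
        next_offset = batch[-1][0] + 1
    return records

# One real batch, then only empty polls: stops after hitting the limit.
batches = iter([[(0, "a")], [], [], []])
print(consume_with_empty_poll_limit(lambda: next(batches, []),
                                    end_offset=5, max_empty_polls=2))
# [(0, 'a')]
```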

jwarlander commented 3 years ago

Right, or a timeout? E.g. if we haven't gotten any records for X seconds (probably 30 by default), stop the import. I feel it's easier and more straightforward to control it this way than having to think about the polling timeout multiplied by a max number of empty-poll retries. You can then tune the two parameters separately, instead of having to adjust one every time you change the other.
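The timeout variant could look something like this, again as a hypothetical sketch: the import stops once no records have arrived for `idle_timeout` seconds, regardless of how many polls that spans. A fake clock stands in for `time.monotonic()` so the example runs without actually sleeping; all the names here are invented for illustration.

```python
def consume_with_idle_timeout(poll, end_offset, idle_timeout, clock):
    """Poll until end_offset, but stop if no records have arrived for
    idle_timeout seconds (as measured by clock())."""
    records = []
    next_offset = 0
    last_progress = clock()
    while next_offset < end_offset:
        batch = poll()
        now = clock()
        if not batch:
            if now - last_progress >= idle_timeout:
                break  # idle for too long: stop the import
            continue
        last_progress = now  # records arrived, reset the idle timer
        records.extend(batch)
        next_offset = batch[-1][0] + 1
    return records

# Fake clock advancing 10 "seconds" per call; real code would use time.monotonic().
ticks = iter(range(0, 1000, 10))
batches = iter([[(0, "a")], [], []])
print(consume_with_idle_timeout(lambda: next(batches, []), end_offset=5,
                                idle_timeout=30, clock=lambda: next(ticks)))
# [(0, 'a')]
```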

So the cases that are interesting here, I'd say, where we should stop the import:

Another case I thought of, that's more of a true "edge case" I guess, is -- what if we import from a topic that's been abandoned, and while the offset says X, all the records have actually been purged from the topic already due to age? Here, we'd potentially start out at offset 0 (if it's the first import), or perhaps X-10.. but never get any records, so we'd never "catch up" to the max offset.

This would actually be the only reason I know of that we could keep getting 0 records back no matter how many times we poll, so it would, I guess, be a good reason to have a total "import timeout" as above.

morazow commented 3 years ago

> We've just received empty sets from polling for X seconds (maybe due to factors outside of our control)

We already have this: the consumer client waits 30s (by default) until it gets any records from a partition.

In general, I think for now we should assume that we reach the last offset of a partition, since we query that end offset beforehand.

I am going to add changes so that it continues over empty records in between.

jwarlander commented 3 years ago

I pushed a few changes + tests that I think should cover what we need :smile: