astronomy-commons / adc-streaming

Core Python(ic) client libraries and utilities for alert streaming
BSD 3-Clause "New" or "Revised" License
6 stars 6 forks source link

Rewrite `AlertBroker._committed` #23

Closed spenczar closed 4 years ago

spenczar commented 4 years ago

The current AlertBroker keeps an internal cache of messages that have been mapped and filtered, but which it has not told Kafka that it has read.

At most every 5s, it will commit its read position by iterating over all of these messages and committing each one. I think this is unnecessary - all we need to commit is the highest offset for each topic-partition pair. I could be wrong, but this was the old behavior of Kafka.

That means we need to store a different data structure, which is a mapping of topic-partition to current offset. We should give it a less confusing name, perhaps _uncommitted.

myNameIsPatrick commented 4 years ago

I think we can simplify a lot of this functionality by letting the confluent-kafka client handle committing offsets, rather than us doing it ourselves. We would still manually store the offsets with Consumer.store_offsets(), but we can configure the client appropriately with the following options and let it actually commit those offsets in the best manner.

Relevant configuration:

'enable.auto.commit': True
'enable.auto.offset.store': False
'auto.commit.interval.ms': 5000
spenczar commented 4 years ago

I'm realizing as I read more that I thought it was _committed, but it's actually _consumed which behaves in the way I described. _committed already does keep topic-partition:offset mappings.

I think _consumed should be removed, and _committed should be renamed, probably.


@myNameIsPatrick, I would like that. What is the Python API, then? I think it's something like:

myNameIsPatrick commented 4 years ago

@spenczar, do you mean the API we'll expose or the confluent client's Consumer? If you're talking about confluent's consumer: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Consumer.store_offsets

It can take either a Message object or a list of TopicPartition. We likely would need to make the TopicPartition rather than the message because those Message objects aren't pickleable which causes issues in multiprocessing. We may be able to get away with calling self._c.store_offsets([topic_partition]) in self._filtered_stream(), but could also opt for storing the topic/partition/offsets and calling it instead where self._commit_to_kafka() is done in self._raw_stream() instead. Either way, it should be doable. Behind the scenes, the client should auto-commit based on these stored offset roughly every interval.

mjuric commented 4 years ago

To provide some history/perspective on why I avoided Kafka's autocommit: The rationale for the (likely too convoluted) code is that you can't guarantee that the client has processed all messages if you allow kafka to autocommit for you.

Here's a scenario I worry about:

  1. The user consumes from the stream
  2. They process the messages, perhaps save them to a file that's kept open
  3. They consume again from the stream. Kafka happily autocommits the offsets.
  4. The user's code or computer crashes. The file where the alerts were "saved" actually hasn't yet been flushed to the disk and is now gone.
  5. When the user restarts their code and reconnects to the stream, the lost alerts will not be re-sent; they're effectively lost.

The current API's failure mode (by design) is to fail in such a way to get an alert twice (or more), but never to lose it. It requires you to explicitly call stream.commit() when you've processed the messages. It also alerts you you should probably call flush or some other mechanism to ensure your alerts truly have been processed. And if you forget to run stream.commit(), you'll become aware of it immediately when the same alerts show up next time you run -- it's an easy bug to find and fix. Contrast that with an issue like the above, that will show up exceedingly rarely.

One could argue that this was the user's fault that they forgot to run .flush() before letting kafka autocommit the offset, but we're all human -- the API should guide us to write code whose failure mode is the least unacceptable one. I once spent a week trying to track down why we were losing about ~1 in 20 million messages in ZTF; I worry this would be that sort of thing.

mjuric commented 4 years ago

All that said, if there's a way to safely use autocommit=True, I'm all for it -- I just failed to find one yet :( .

myNameIsPatrick commented 4 years ago

@mjuric, I think I may be misunderstanding a few things about how the offset handling is set up. Currently, I would've thought the scenario you gave would still be a problem in the current setup.

My understanding is we store the current offset when we returned filtered, deserialized messages back to the user. When each batch of messages (from consumer.consume()) is finished processing, we call a commit() but with a defer (to commit at most every 5 seconds). We also commit regardless if the context manager is closed or if commit() is called explicitly with defer=False.

Going back to the scenario, after a message is returned from AlertBroker, it's essentially out of our control and the offset associated with that message can be committed at any time in the future after a batch of messages is processed (consumer.consume()). Depending on how you're defining messages to be consumed from the stream in step 3, messages could be lost regardless.

To be clear, I don't mean to use the default confluent consumer's offset behavior, since that will definitely cause some lost messages. The configuration to do that and auto-committing roughly every 5 seconds is the following:

'enable.auto.commit': True
'auto.commit.interval.ms': 5000

That will cause lost messages since the consumer will store offsets when a call to consumer.consume() is called, and could happily commit before the message is filtered, parsed, or returned to the user.

The extra configuration I'm hoping would simplify the committing logic would be to add: 'enable.auto.offset.store': False

which would give us full control of what offsets get committed when either the consumer auto-commits or when commit() is called explicitly. AlertBroker would need to handle offset management explicitly but we can now control when we want to store offsets to the consumer, say after a message is deserialized, filtered, and returned to the user.

In this case, whenever the consumer decides to auto-commit based on that interval, if the offsets haven't been updated yet based on calling consumer.store_offsets(), nothing would happen. So the expected behavior in this case should be to never lose a message, but possibly getting the same message multiple times.

spenczar commented 4 years ago

Yeah, today offset tracking is done internally to genesis-client, which actually makes it impossible for the caller to be sure that messages are handled at least once. The only way to get the guarantee you want @mjuric is to provide direct access to offset tracking and to do nothing within genesis-client. All the control has to be passed out to the caller, since only they really know when a message is truly fully handled and the offset can safely be advanced.

I should add that, if we do hand out all that control, this library gets awful thin. I think the API surface area would be tiny, and it would pretty much just be adding contextmanager and iterable support to confluent_kafka.Consumer and confluent_kafka.Producer.

spenczar commented 4 years ago

@spenczar, do you mean the API we'll expose or the confluent client's Consumer? If you're talking about confluent's consumer: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Consumer.store_offsets

I meant the API that genesis-client provides, sorry that was unclear.

mjuric commented 4 years ago

@myNameIsPatrick (& @spenczar): Thanks for that clarification of enable.auto.offset.store=False. Reading about it, the code we have in there now essentially tries to implement exactly what:

'enable.auto.commit': True
'auto.commit.interval.ms': 5000
'enable.auto.offset.store': False

does (as you pointed out). So I'm on board with getting rid of all of that custom code -- fantastic! I can't remember why I didn't just use that...

@spenczar:

All the control has to be passed out to the caller, since only they really know when a message is truly fully handled and the offset can safely be advanced.

Right, that's the intent -- to require the user to explicitly call .commit() to affirm they've dealt with all alerts read in so far. There's an example in https://github.com/astronomy-commons/tutorials/blob/master/streaming/02-streaming-demo.ipynb (search for 'Remembering "offsets"' heading). It's slightly annoying, as the user needs to remember to call stream.commit(), but I think it's the lesser of two evils (the other being the occasional very rare alert loss).

@myNameIsPatrick -- re what context manager __exit__() does: it shouldn't automatically store the offset and commit it; if it does, it's a bug. In general, the we should never store the offsets automatically; they should only be stored when the user explicitly calls .commit().

To explain the intended .commit() behavior a bit further: it's meant to give the user an easy way to signal the library that the message was processed, w/o necessarily committing the offset all the way to the broker (which would ruin the performance), unless they call it with .commit(defer=False). That's why commit(defer=True) is the default -- so one can blindly write .commit() after every single message and leave it up to the library when to actually commit the offset (i.e., it's equivalent to librdkafka's store). But if I really really want to commit the offset now, I'd call commit=(defer=False).

Bottom line: +1 on ripping this custom code out.

mjuric commented 4 years ago

Looking at it a bit more, I think there will still have to be a bit of custom code to track offsets (the _consumed and _committed dicts):

Still, the _committed dict can go away (note it was misnamed; it really should've been called _stored, to be in agreement with librdkafka terminology), and the "autocommit emulation" code (all those .time() calls) can go away as well.

spenczar commented 4 years ago

Thanks for helping work this out. I think I have a much clearer picture of this library's intentions, so I'm good to implement a bunch of this.

We should document the library's design goals loudly on the README. The bias towards repeating messages over conveniently committing them for the user is one that should be called out.

mjuric commented 4 years ago

+1e6 on updating the README. And thanks for opening these issues/PRs -- gives me a chance to core dump the experience/goals from the last ~3 yrs of experiments that never got written down (mea culpa!).

(n.b. -- I just noticed I've systematically flipped all defer=True and defer=False in the last paragraph of https://github.com/astronomy-commons/genesis-client/issues/23#issuecomment-605365087 -- just fixed it).

spenczar commented 4 years ago

This was done in the v1.0.0 release.