astronomy-commons / adc-streaming

Core Python(ic) client libraries and utilities for alert streaming
BSD 3-Clause "New" or "Revised" License
6 stars 6 forks source link

Allow specifying logical offset or datetime start position on Consumer.stream() #65

Closed alchzh closed 1 year ago

alchzh commented 1 year ago

Previously, the only way to specify the position to stream from was via the auto.offset.reset key, which only affected the first read with a given group id (otherwise it always used committed offsets).

This PR introduces a new parameter start_at to Consumer.stream() that takes either a kafka defined logical offset END = -2, BEGINNING = -1, INVALID = -1001, STORED = -1000 or a datetime object. The correct offsets for the datetime are looked up with confluent_kafka.Consumer.offsets_for_times. The best way I could find to do this was by reassigning the topics on a call to Consumer.stream since seek will fail before the server is updated with the local state and we can't poll without consuming data.

hop-client passes keyword args down to Consumer.stream from hop.io.Consumer.read, so no changes are needed on that front.

To reduce confusion with the start_at option in ConsumerConfig, I've renamed the Enum to ConsumerResetPosition (ConsumerDefaultPosition might also work), with an alias for backward compatibility for now. I suggest renaming the config property and using the Kafka reserved offsets (-2, -1) in LogicalOffset for the next breaking change/major release.

New test cases were developed. I also made some fixes to previously broken test cases, and made test_consume_stored_offsets more comprehensive. (note: for some reason the timestamps reported from kafka in the scimma/server image are 48h offset from the host's time?)

alchzh commented 1 year ago

I've changed the behavior to avoid reassigning the topicpartitions by default (this would cause a regression if clients manually manage offsets outside of adc)

Do you think we should allow offsets specified in terms of integer timestamps (as kafka uses internally) rather than datetime objects (which can get a little confusing with timezones involved)?

We could use a separate parameters for logical offsets and timestamps to distinguish between the two types of integers-- it should be clear that we don't take actual offsets since it's partition-dependent.

alchzh commented 1 year ago

Note on the timezone issue: the standard convention is for "timezone naive" datetimes (and the returns from datetime.now()) to represent times in the local timezone. These will be converted to the correct timestamps by our call to .timestamp(), which allows us to pass datetime.now() - timedelta(hours=1) and correctly retrieve the past hour of messages.

However, this can cause pitfalls if applications supply datetimes expecting them to be processed as UTC without attaching a tzinfo=timezone.utc. Annoyingly, astropy.time.Time uses the "default UTC" rather than "default local" convention.

cnweaver commented 1 year ago

If we want to be complete, I suppose accepting Kafka-style millisecond timestamps would make sense. This would allow someone who has inspected the timestamp metadata for a message to select a place in the stream in time relative to it. It would also be possible to take offsets, but in that case the input would need to be a collection of them (either a list, using the indices implicitly to correspond to partitions, or a dictionary mapping partition numbers to offset values? The latter might be better for handling sparsity when there are multiple consumers in a consumer group.), which would be distinguishable (I think) from any of the scalar values we've thus far considered which apply across all partitions. If the specified set of offsets did not correspond to the right set of partitions, it would presumably be treated as an error.

In terms of dealing with timezones, I think that the current handling of aware/naïve datetimes seems fairly sensible with naïve objects being treated as local times and converted internally to durations since the unix epoch, which is defined in UTC. I'm not familiar with astropy.time beyond a short glance at its docs just now. Honestly, the fact that it does not appear to use its own scale value to construct timezone-aware results in to_datetime seems like something of a bug or significant design flaw (Time('2010-01-01T00:00:00',scale='utc').to_datetime().timestamp() constructs an explicitly UTC time, mistakenly reinterprets it as a local time, and so produces a different timestamp than datetime(2010, 1, 1, 0, 0, 0, tzinfo=timezone.utc).timestamp(), as best I can tell). In any case, I figure that I will write documentation for this for hop-client, which users will hopefully read, and there isn't much else we can do to protect them from astropy's dubious choices.

In general, I think it's up to you whether you want to further extend this feature to include more input forms (Kafka timestamps, etc.). It looks good to me as is, so I'm going to mark it as approved, and can re-review if you want to add more. Otherwise, I'll be happy to merge.

alchzh commented 1 year ago

If we want to be complete, I suppose accepting Kafka-style millisecond timestamps would make sense. This would allow someone who has inspected the timestamp metadata for a message to select a place in the stream in time relative to it.

I think the better solution here is to have Metadata.timestamp in hop-client automatically deserialized to datetime in the future. My motivation for using datetime is that integer milliseconds don't work well with anything else in the python ecosystem without / 1000 and * 1000 everywhere, and it's not a part of kafka internals that needs to be exposed.

It would also be possible to take offsets, but in that case the input would need to be a collection of them (either a list, using the indices implicitly to correspond to partitions, or a dictionary mapping partition numbers to offset values?

I think for this advanced use case, they just call assign with the list of TopicPartitions themselves, and it's a little pointless to abstract on top of it.


I ran into some edge case in testing yesterday where it started reading from the beginning of one of the partitions when I specified a very recent datetime, but I wasn't able to figure out what the cause was or if I had made a mistake somewhere. I'm going to add some handling for incorrect results before it's ready for merge (also offsets_for_times will return errors on very old Kafka servers, which shouldn't matter for hop, but I guess the handling should be there just in case).

cnweaver commented 1 year ago

I think the better solution here is to have Metadata.timestamp in hop-client automatically deserialized to datetime in the future. My motivation for using datetime is that integer milliseconds don't work well with anything else in the python ecosystem without / 1000 and * 1000 everywhere, and it's not a part of kafka internals that needs to be exposed.

This would be doable, but would involve needing to replace the entire metadata object, I think, in order to override the type/logic compiled in the confluent-kafka-python extension. Given that confluent-kafka-python doesn't make this transformation, I'm undecided on whether hop-client should undertake doing it.

I think for this advanced use case, they just call assign with the list of TopicPartitions themselves, and it's a little pointless to abstract on top of it.

That's a fair point.

I ran into some edge case in testing yesterday where it started reading from the beginning of one of the partitions when I specified a very recent datetime, but I wasn't able to figure out what the cause was or if I had made a mistake somewhere.

Interesting; I'll have to see if I can reproduce this. Perhaps this happens if one or more TopicPartitions returned by offsets_for_times represents an error, and so has its offset left at a default value of zero?

Even if error results aren't the cause of the preceding issue, including handling for them is a very sound idea.

alchzh commented 1 year ago

Getting back to this- I'm not actually sure if there's any error checking that we need to do. The source for confluent-kafka/librdkafka is too difficult for me to understand. The javadoc suggests that null can be returned if there's no newer message than the timestamp, but I seem to be getting -1 (i.e. LogicalOffset.LATEST) in this case, which is the behavior we want I think.

alchzh commented 1 year ago

Bump, would like to see this merged!

cnweaver commented 1 year ago

I've reviewed this a bit more, and I don't think I have any further technical concerns. This also seems like a feature that is unlikely to be used in long-running infrastructure in the immediate future, so we can gather more experience on whether any failure modes remain; it certainly seems robust enough for common usage as it is.