openedx / openedx-events

Open edX events from the Hooks Extensions Framework
Apache License 2.0

Discovery: Signal metadata, event bus consumers, and timestamps #162

Closed robrap closed 1 year ago

robrap commented 1 year ago

We recently ran into an issue where we want our event bus signal consumer logic to know the broker's timestamp of the message. However, that is not available in the signal handler, and brings up a number of issues:

  1. If an event bus signal consumer looked at the "metadata" dict in the kwargs, what data would it find? Some valid and some invalid data? All invalid? Other?
    • Note: The signal's metadata currently implements time as now(), which on the consumer side would yield the time of consumption rather than the producer's time.
  2. Is it safe to assume that the metadata on the consumer should match what would be seen on the producer? How should this data be sent, and where should the metadata be updated? Note that most of this data is sent in the message headers as a start.
  3. If we want to send event bus specific data to the signal handler, like the event bus broker's timestamp for the message, how would we do this?
  4. Is there a way to not be blocked on https://github.com/openedx/openedx-events/issues/159 to find a single definition of "time"? Can we avoid that field, and use more specifically named and defined fields?

A/C:

Additional tasks:

robrap commented 1 year ago

Note: discovery needs a temporary hack for delaying event processing, and we need to know the event timestamp to know whether or not to delay.

robrap commented 1 year ago
  1. Here are docs for Kafka timestamp, which may be from producer or broker: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
  2. We wanted to add an artificial delay to the Discovery consumer (when necessary) to avoid a potential concurrency issue: the event bus updates were coming back from Studio before the Discovery db transaction had completed in the request that updated Studio and initiated the event in the first place. To add the artificial delay, we'd need to base it on the produced timestamp, so we don't delay when it isn't needed.
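
To illustrate the delay idea, here is a rough sketch of a helper that delays only as long as needed based on the produced timestamp (all names are hypothetical, not actual openedx-events or confluent-kafka API):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def remaining_delay(produced_at: datetime, min_age: timedelta,
                    now: Optional[datetime] = None) -> timedelta:
    """Hypothetical helper: how much longer to wait before processing a
    consumed event, based on the *produced* timestamp rather than a fixed
    sleep, so events that are already old are not delayed at all."""
    now = now or datetime.now(timezone.utc)
    age = now - produced_at
    if age >= min_age:
        return timedelta(0)   # event is already old enough; process now
    return min_age - age      # wait only the remaining time
```

A consumer could sleep for `remaining_delay(...)` before handling the event, which makes the delay a no-op for replays and backfills of old messages.
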
ormsbee commented 1 year ago

We wanted to add an artificial delay to the Discovery consumer (when necessary) to avoid a potential concurrency issue: the event bus updates were coming back from Studio before the Discovery db transaction had completed in the request that updated Studio and initiated the event in the first place. To add the artificial delay, we'd need to base it on the produced timestamp, so we don't delay when it isn't needed.

Could you go into more detail about this concurrency issue? I'm assuming this was a "we'll persist some model in the web service and then the worker will reference it" issue, but artificial delays seem like a fragile way to address it.

robrap commented 1 year ago

@ormsbee: Are you ok if I move the conversation about the possible delay elsewhere? Although that might affect priority of this ticket, I think this ticket stands on its own and there is enough to be discussed that I’d like to keep this ticket focused on how we resolve the questions noted in the ticket.

ormsbee commented 1 year ago

@ormsbee: Are you ok if I move the conversation about the possible delay elsewhere? Although that might affect priority of this ticket, I think this ticket stands on its own and there is enough to be discussed that I’d like to keep this ticket focused on how we resolve the questions noted in the ticket.

@robrap: Sure. The main reason I asked it here is because this ticket starts out with:

We recently ran into an issue where we want our event bus signal consumer logic to know the broker's timestamp of the message. However, that is not available in the signal handler, and brings up a number of issues:

I'm curious to know why the code cares specifically about the broker's timestamp, as opposed to the one claimed by the producer. In my probably-naive view of things, the event should always carry the producer-claimed time, even if that time was a week in the past, because we have use cases like backfills. Having an async system means that some processing is going to get delayed at some point for any number of reasons, making the consumer-received time highly variable.

My first pass at this would be:

  1. Require producers to specify a time on the event, that semantically means "this thing happened at this time" regardless of what time it is now.
  2. Always use this as the metadata for the time field in the signal–so consumers see exactly the time specified by producers for this field, not the time received.
  3. If there's a requirement for time received, create a separate metadata field that the consumer will add when it takes the message off the bus and converts it into a signal. But I want to be very careful here, because I think this will usually be the wrong field to use, and relying on it will cause weird operational edge cases down the road.
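
A rough sketch of the metadata shape this proposal implies (field and function names are hypothetical, not the actual openedx-events API):

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

@dataclass
class SketchMetadata:
    # Producer-claimed "this thing happened at this time"; always carried
    # with the event, even across the bus or during a backfill.
    time: datetime
    # Optional consumer-side receipt time; deliberately a separate field so
    # nothing silently conflates it with the producer-claimed time.
    received_at: Optional[datetime] = None

def consume(meta: SketchMetadata) -> SketchMetadata:
    # The consumer only fills in received_at; it never rewrites `time`.
    meta.received_at = datetime.now(timezone.utc)
    return meta
```
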
ormsbee commented 1 year ago

Tagging @bmtcril on this, since this also has implications that the Data Working Group is likely interested in.

ormsbee commented 1 year ago

@robrap: I do want to discuss the transaction/delay issue sooner rather than later though, because XQueue had issues like that and it was a source of great operational pain for a long time. I hope we can find patterns to better avoid those kinds of issues altogether.

bmtcril commented 1 year ago

I agree with what Dave said, especially having a "time this thing actually happened" timestamp. In addition to backfilling old data to new consumers it's important for things like repairing buggy messages and replaying lost messages if they're no longer in the Kafka logs (or at all in the case of other message buses that don't do durable storage).

In the long term I'd love to have an implementation of tracking log events that go over the message bus(es), and they rely heavily on very specific "time of event creation" for lots of things (ex. a registration close to a month boundary for financial reporting).

I'd also like to be in on the discussion about deferred processing. On the surface it seems like we shouldn't be passing uncommitted data around, since it can be rolled back and cause state issues.

robrap commented 1 year ago

Thanks again for all your thoughts. Let me try to clear some things up and answer some questions.

Could you go into more detail about this concurrency issue? I'm assuming this was a "we'll persist some model in the web service and then the worker will reference it" issue, but artificial delays seem like a fragile way to address it.

You can now read all about it here: https://github.com/edx/edx-arch-experiments/issues/159

My first pass at this would be:

1. Require producers to specify a `time` on the event, that semantically means "this thing happened at this time" regardless of what time it is now.

I mostly agree. However, since there are implementation implications for db created_at time vs event signal time, I am moving that discussion to https://github.com/openedx/openedx-events/issues/159#issuecomment-1372888998.

2. Always use this as the metadata for the `time` field in the signal–so consumers see exactly the time specified by producers for this field, _not_ the time received.

I also agree. I was not trying to say that time received is useful. I was trying to say that time received is what we currently have implemented. See the current metadata implementation in the signal code in openedx-events. Unfortunately, it seems this was written assuming an in-process signal, and not accounting for a signal that represents the event sent across the event bus. Note, this doesn't just affect time, it also affects sourcehost, etc.

The event bus event has the correct sourcehost in the event header, but if someone reads the metadata of the signal on the consuming side, they are going to get bad data. That is one of the main points of this ticket.

3. If there's a requirement for time received, create a separate metadata field that the consumer will add when it takes the message off the bus and converts it into a signal. But I want to be very careful here, because I think this will usually be the wrong field to use, and relying on it will cause weird operational edge cases down the road.

I don't think any of us want this, and don't think we need it, so I'll mostly stop here. I'm guessing this stems from a misinterpretation of my stating that we currently have this. We just don't want it.

robrap commented 1 year ago

UPDATED: Ignore this comment.

robrap commented 1 year ago

Outcomes of discussion with @ormsbee:

I think this covers it.

robrap commented 1 year ago

The following is an example metadata from a unit test:

```python
EventsMetadata(
    id=UUID('e9d6233c-90f6-11ed-b17c-acde48001122'),
    event_type='org.openedx.learning.session.login.completed.v1',
    minorversion=0,
    source='openedx/lms/web',
    sourcehost='edx.devstack.lms',
    time=datetime.datetime(2023, 1, 10, 14, 56, 7, 783370, tzinfo=datetime.timezone.utc),
    sourcelib=(0, 1, 0)
)
```

Wouldn't we want to override all of these on the event bus consumer side? The only detail we wouldn't need to override might be event_type, but I'm wondering if it would be more clear to not make an exception for this, and just use what is passed in the header like we will do for all the other fields?

Also, I had originally punted on minorversion and created https://github.com/openedx/openedx-events/issues/161, because I wasn't aware that there was a naive implementation of setting this to 0. The event bus can just pass this along, and we can punt on how and when this would be incremented for a particular event.

robrap commented 1 year ago

Another question: When a time is sent to `send_event` (not used for the event bus), should we confirm that the time has a timezone, and should we convert it to UTC if it is supplied with a different timezone?

ormsbee commented 1 year ago

Wouldn't we want to override all of these on the event bus consumer side? The only detail we wouldn't need to override might be event_type, but I'm wondering if it would be more clear to not make an exception for this, and just use what is passed in the header like we will do for all the other fields?

I'm not clear on what you mean by "override" here. I don't think the client should change any of these values, if that's what you're getting at.

ormsbee commented 1 year ago

As much as possible, I think that the signal that is received after the event has gone through the message bus should be the same as the signal that would have been received by a signal handler in the originating process.

ormsbee commented 1 year ago

Maybe we're saying the same thing and I'm fumbling on terminology?

There are going to be two different types of sends invoked for this signal type:

  1. Send on the originating process. This is where all the metadata gets created initially (e.g. new UUID). This send invokes both the Django Signals in-process send as well as sending to the message bus.
  2. Send on receiving service. This copies metadata from the event as it was received and invokes Django Signals in-process send. It does not send anything to the message bus.

These are two different methods, and (2) is only invoked internally by the openedx-events receiving code (i.e. it's framework-level code and developers working with events don't use it). Does that match your vision of this?
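
The two send paths might be sketched like this (function and field names are hypothetical; the real openedx-events API may differ):

```python
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class Metadata:
    id: uuid.UUID
    event_type: str
    time: datetime

def _send_django_signal(data, meta):
    pass  # stand-in for the in-process OpenEdxPublicSignal send

def send_event(event_type, data, publish):
    """(1) Originating process: generate fresh metadata, send the Django
    signal in-process, and publish to the message bus."""
    meta = Metadata(id=uuid.uuid4(), event_type=event_type,
                    time=datetime.now(timezone.utc))
    _send_django_signal(data, meta)   # in-process receivers
    publish(data, meta)               # message bus
    return meta

def send_event_from_bus(event_type, data, meta):
    """(2) Receiving service (framework-level only): copy the metadata as
    received from the bus and send the Django signal in-process. Never
    re-publishes to the bus."""
    _send_django_signal(data, meta)
    return meta
```
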

ormsbee commented 1 year ago

Another question: When a time is sent to `send_event` (not used for the event bus), should we confirm that the time has a timezone, and should we convert it to UTC if it is supplied with a different timezone?

Can we be strict and throw a ValueError if we don't have an explicit TZ of UTC, instead of trying to convert it?
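
A strict check along those lines could look like this (a sketch, not the merged implementation):

```python
from datetime import datetime, timedelta

def validate_event_time(time: datetime) -> datetime:
    """Reject anything that is not explicitly timezone-aware UTC, rather
    than guessing a timezone or silently converting."""
    if time.tzinfo is None or time.utcoffset() is None:
        raise ValueError("Event time must be timezone-aware (UTC).")
    if time.utcoffset() != timedelta(0):
        raise ValueError("Event time must be in UTC, not converted from another timezone.")
    return time
```
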

robrap commented 1 year ago

Can we be strict and throw a ValueError if we don't have an explicit TZ of UTC, instead of trying to convert it?

Yes. I can do that.

I'm not clear on what you mean by "override" here. I don't think the client should change any of these values, if that's what you're getting at.

I’ll start by saying I agree with all your follow-up comments. What I meant is that the new call, something like send_event_in_bus_consumer, will be taking arguments to represent each of these metadata fields. They will come from the message header (from the producer), and will be used to “override” what would have been the default value for each metadata field.

In the case of the event type, the default value should be the same value. For this case, we could have an argument or not. If we have the argument, we'd be doing what I was calling "override": using the argument instead of the default. Separately, we could compare the two and fail, or log an error explaining that something unexpected happened. I was proposing that we have an argument for ALL fields for consistency, pulling from the header and setting the metadata fields.
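
The compare-and-log idea for event type could be as simple as this (names hypothetical):

```python
import logging

logger = logging.getLogger(__name__)

def check_event_type(signal_event_type: str, header_event_type: str) -> bool:
    """Compare the event type from the message header against the one on
    the signal definition; log an error (rather than fail) on mismatch."""
    if signal_event_type != header_event_type:
        logger.error(
            "Unexpected event type mismatch: signal says %s but header says %s",
            signal_event_type, header_event_type,
        )
        return False
    return True
```
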

ormsbee commented 1 year ago

I’ll start by saying I agree with all your follow-up comments. What I meant is that the new call, something like send_event_in_bus_consumer, will be taking arguments to represent each of these metadata fields. They will come from the message header (from the producer), and will be used to “override” what would have been the default value for each metadata field.

Okay, great–it sounds like we're in agreement. Maybe instead of using "override" terminology, we could phrase it like "generating metadata" vs. "copying metadata from the event"?

In the case of the event type, the default value should be the same value. For this case, we could have an argument or not. If we have the argument, we'd be doing what I was calling "override": using the argument instead of the default. Separately, we could compare the two and fail, or log an error explaining that something unexpected happened. I was proposing that we have an argument for ALL fields for consistency, pulling from the header and setting the metadata fields.

But don't we need to use the event type to even find the right Signal to send? At which point there would be no point in setting that explicitly in the send (since it's in the Signal definition init)?

robrap commented 1 year ago

Okay, great–it sounds like we're in agreement. Maybe instead of using "override" terminology, we could phrase it like "generating metadata" vs. "copying metadata from the event"?

Yes. That language is more clear.

But don't we need to use the event type to even find the right Signal to send? At which point there would be no point in setting that explicitly in the send (since it's in the Signal definition init)?

Correct. The only point was some kind of consistency, but that is making less sense to me now. So, I'm all set. All fields will need arguments except event type.

robrap commented 1 year ago

I added a question in https://github.com/openedx/openedx-events/pull/7/files#r1080661874 about sourcelib as tuple rather than a string.

robrap commented 1 year ago

[question] When we produce Kafka messages, we can send Kafka the same timestamp in our produce call. This would use a producer timestamp in Kafka, rather than the broker's current time, for messages. This would affect Kafka's understanding of the time of each message, for example, if we ask Kafka to replay based on timestamp. Note that generally these times are similar enough that it won't matter. However, if some messages fail to produce at all, and we are getting them onto the topic for the first time, which timestamp would we want to replay from? I think the earlier one, but I'm not sure. I don't love losing that data, though. If we leave the default, we would have both times on the consumer.

Note, no matter what we do here, the current plan for the signal is to pull the CloudEvent time header which will always be the producer time, and would never be the broker time.
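
For reference, confluent-kafka's produce call accepts a message timestamp as integer milliseconds since the Unix epoch, so passing the producer's time would involve a conversion roughly like this (the produce call itself is shown commented out as an assumption, not tested here):

```python
from datetime import datetime, timezone

def to_epoch_millis(time: datetime) -> int:
    """Convert a timezone-aware datetime to the integer milliseconds since
    the Unix epoch that Kafka message timestamps use."""
    return int(time.timestamp() * 1000)

# Hypothetical usage with confluent-kafka's Producer:
# producer.produce(topic, key=key, value=value, headers=headers,
#                  timestamp=to_epoch_millis(metadata.time))
```
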

robrap commented 1 year ago

[question] When we produce Kafka messages, we can send Kafka the same timestamp in our produce call. This would use a producer timestamp in Kafka, rather than the broker's current time, for messages.

We decided to punt on making any changes at this time until and if we learn that this is needed for some reason.

There is a lot I don't understand about Kafka and timestamps, like: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics. Is this just about Kafka Streams, another layer on top of Kafka? Or something else?

Also, although Kafka guarantees order within a partition, I don't think Redis Streams has such a thing. So, we might just need to code consumers as if order were not guaranteed. There's still lots to learn.

FYI: @bmtcril

rgraber commented 1 year ago

Updated tasks to include sending the original sourcelib over as a header, although on further reflection I'm not convinced that one in particular should be overridden. It feels important to know the library version of the app that's actually calling `send_event` on the consumer side. @robrap what are your thoughts?

robrap commented 1 year ago

Hello @rgraber. In an earlier discussion, Dave asked us to use "generating metadata" vs. "copying metadata from the event", with the latter used in place of "overriding", so I'll try that.

I think we should copy all metadata, including sourcelib, because that may be equally important. Additionally, I think "sourcelib" is meant to be the lib at the source. If we also think we want the consumer-side lib version in the logs, we can add a way to request that information, which is locally available. We might want to know the hash of the service, which could then tell us the version of all libraries. Or, we might just want a quick way to see if sourcelib and consumerlib are out of sync, because it might explain errors. How would you feel about us ticketing this exploration as a Future Milestone ticket?

rgraber commented 1 year ago

On further reflection I don't think ticketing it is necessary. It's true we can pretty easily get the source lib version of the consuming service if we are worried about a disconnect. I don't think it's worth any extra tooling.