hashicorp / nomad

Nomad is an easy-to-use, flexible, and performant workload orchestrator that can deploy a mix of microservice, batch, containerized, and non-containerized applications. Nomad is easy to operate and scale and has native Consul and Vault integrations.
https://www.nomadproject.io/

Need a way of starting event stream at the end, or one past last received event #11296

Open tomqwpl opened 2 years ago

tomqwpl commented 2 years ago

When making an event subscription, you can optionally pass an index of the start point in the event stream. The way this works, though, doesn't seem as useful as it could be.

The behaviour seems to be that if there isn't an event with the requested index, you get the last event. Well, kind of. You'll get that event if it satisfies your subscription filter, otherwise you get nothing until new events are generated that do satisfy your filter.

The trouble with this behaviour seems twofold. First, there's no apparent way to start an event subscription and receive only new events. The best you can do is pass MaxInt as your starting index. That gets rounded down to the index of the last event that exists, and then you either receive that event immediately if it matches your filter, or nothing until new events arrive that do. Second, if your connection breaks and you remake the subscription, and you try to track the index of the last event you received so as to pick up where you left off, you'll almost always get that event twice: setting the subscription index to "lastEventIndex + 1" effectively says "no event with that index, closest is lastEventIndex, send that one".
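The behavior described above can be sketched as a small model. This is purely illustrative (the event shapes and function names are mine, not Nomad's): events are `(index, topic)` pairs, and a requested start index with no matching event is rounded down to the closest existing one.

```python
# Model of the described start-point behavior. Illustrative only;
# the real logic lives in Nomad's event broker.

def resolve_start(events, requested_index):
    """Round the requested index down to the closest existing event,
    mirroring the "no event of that index, closest is N" behavior."""
    candidates = [e for e in events if e[0] <= requested_index]
    return candidates[-1] if candidates else None

def first_delivery(events, requested_index, topic_filter):
    """First event delivered to a new subscriber, or None."""
    start = resolve_start(events, requested_index)
    if start is not None and start[1] in topic_filter:
        return start
    return None

events = [(3, "Node"), (5, "Job")]

# Asking for index 6 (one past the end) still hands back event 5...
assert first_delivery(events, 6, {"Job"}) == (5, "Job")
# ...unless the closest event fails the topic filter.
assert first_delivery(events, 6, {"Deployment"}) is None
```

This is what makes the MaxInt workaround awkward: the rounding rule means there is no start index that reliably means "only new events".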

Perhaps we're doing this wrong, perhaps I'm reading the code wrong, but this is my experience of what happens, and my reading of what the subscription code does.

cizezsy commented 2 years ago

Same problem here! It seems there is no way to avoid receiving duplicate events. Can anybody look at this issue?

lgfa29 commented 2 years ago

Hi @tomqwpl and @cizezsy 👋

A few questions to clarify some things.

> You'll get that event if it satisfies your subscription filter, otherwise you get nothing until new events are generated that do satisfy your filter.

I'm not sure I followed this one. If you have a filter, you will only receive events that satisfy the filter criteria, regardless of the index. Could you explain a bit more what your expected outcome is?

> There's no apparent way of just starting an event subscription and only receiving new events.

I guess this ties back to your earlier comment about receiving the last event in case the index is larger than the last one. Would you expect to not receive any events until the requested index is reached? This may result in missed messages if you, for some reason, set the index too far out (e.g. index + 2).

> Same problem here! It seems there is no way to avoid receiving duplicate events. Can anybody look at this issue?

In general, when working with event streams, duplicate events are impossible to avoid, and Nomad is no exception. For example, here is a quote from the AWS Kinesis docs:

> Your application must anticipate and appropriately handle processing individual records multiple times.

From Google's Dataflow blog post:

> One very common challenge that developers often face when designing such pipelines is how to handle duplicate data.

So duplicate data is, unfortunately, a reality of event-based architectures, and your application will need to be able to cope with it. Here is a quick reference on one way this can be done: https://chrisrichardson.net/post/microservices/patterns/2020/10/16/idempotent-consumer.html
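A minimal sketch of that idempotent-consumer idea, using the event index as the deduplication key (the class and event names here are illustrative, not part of any Nomad API):

```python
# Idempotent consumer sketch: remember the highest index already
# processed and skip anything at or below it. Illustrative only.

class IdempotentConsumer:
    def __init__(self):
        self.last_index = 0
        self.handled = []

    def handle(self, event):
        index, payload = event
        if index <= self.last_index:
            return  # duplicate delivery; safe to ignore
        self.handled.append(payload)
        self.last_index = index

consumer = IdempotentConsumer()
# The same event arriving twice (e.g. after a reconnect) is applied once.
for event in [(5, "job-registered"), (5, "job-registered"), (6, "job-updated")]:
    consumer.handle(event)

assert consumer.handled == ["job-registered", "job-updated"]
assert consumer.last_index == 6
```

Persisting `last_index` alongside the consumer's own state makes the dedup survive restarts as well as reconnects.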

Ingesting the Nomad event stream into a proper event broker, like Apache Kafka, is also recommended. You can then have exactly-once delivery with some careful configuration: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIgetexactly-oncemessagingfromKafka?

I hope this helps clear some things up about working with event streams 🙂

tomqwpl commented 2 years ago

@lgfa29 What I'm trying to do is open an event stream and listen to the events, which seems a simple enough thing to want. Specifically, the behavior I'd like is that:

- opening a stream with no meaningful starting index delivers only new events, and
- reopening a stream at "lastEventIndex + 1" picks up exactly where I left off, without re-delivering the last event.

My point about whether I get a duplicate or not is based on the implementation. The implementation takes your start-point index and finds the first event in the stream with that index, or the closest it can get to that index. That's your starting point in the stream. Whether you actually receive that event, however, depends on whether it satisfies your subscription filter. So let's say I open a stream, specify a starting index of 5, and only want events on the "job" topic. If the event with index 5 is a "job" event, I get it; if it's not, I don't.

Further, let's say I track the index of the last event I receive so that I can carry on where I left off if the connection breaks. Say the last event received was number 5, which is also the last event published, and no new events are generated between the connection closing and me reopening it. On reopening I ask to start at index 6, since the last one I got was 5 and I don't want it again. Because no new events exist, I get number 5 again. Now suppose a new event was generated, but on a topic I haven't subscribed to. This time when I reopen the stream at index 6, I don't get 5 again.
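That reconnect asymmetry can be shown concretely. This is a sketch of the behavior as described above, not Nomad's actual code; events are hypothetical `(index, topic)` pairs, and the rounding rule is "an unknown start index is rounded down to the closest existing event":

```python
# Walkthrough of the reconnect asymmetry described above.
# Illustrative model only, not Nomad's implementation.

def replay_from(events, requested_index, topic_filter):
    """Events delivered on (re)subscribing at requested_index."""
    candidates = [e for e in events if e[0] <= requested_index]
    start = candidates[-1] if candidates else None
    delivered = [start] if start and start[1] in topic_filter else []
    # Events past the resolved start point are delivered as usual.
    delivered += [e for e in events
                  if start and e[0] > start[0] and e[1] in topic_filter]
    return delivered

events = [(5, "Job")]
# Last event seen was 5; resubscribe at 6 with no new events: 5 is replayed.
assert replay_from(events, 6, {"Job"}) == [(5, "Job")]

# A new event on an unsubscribed topic changes the outcome: index 6 now
# exists, it fails the filter, and nothing is replayed.
events.append((6, "Node"))
assert replay_from(events, 6, {"Job"}) == []
```

So whether the client sees a duplicate after reconnecting depends on unrelated activity in the cluster, which is exactly why index tracking alone can't avoid duplicates today.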

It feels like with only a few tweaks this behavior could be improved to be more intuitive. Broadly I'm expecting:

- a way to subscribe and receive only new events, without the last existing event being replayed, and
- that asking to start at an index just past the last event means "nothing yet, wait for new events", rather than "replay the closest existing event".

Both of these behaviors ought to be easily possible, and they actually amount to much the same thing: the ability to set the start point in the event stream to one past the end, rather than the starting point having to be on an actual event.
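A sketch of the proposed semantics, under the assumption (mine, not anything Nomad implements) that a requested index greater than the newest event would simply mean "deliver only events with index >= requested":

```python
# Sketch of the proposed semantics: a start index past the newest event
# means "only new events from here on", never a replayed one.
# Purely illustrative; not Nomad's actual implementation.

def deliverable(events, requested_index, topic_filter):
    """Events a subscriber starting at requested_index would receive."""
    return [e for e in events
            if e[0] >= requested_index and e[1] in topic_filter]

events = [(3, "Node"), (5, "Job")]

# Resubscribing at lastEventIndex + 1 yields nothing until new events
# land, instead of replaying event 5.
assert deliverable(events, 6, {"Job"}) == []

events.append((7, "Job"))
assert deliverable(events, 6, {"Job"}) == [(7, "Job")]
```

Under these semantics both use cases fall out naturally: a fresh subscriber passes any index past the end to get only new events, and a reconnecting one passes "lastEventIndex + 1" to resume without duplicates.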