dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

Make persistent streams rewindable #756

Open yoniabrahamy opened 8 years ago

yoniabrahamy commented 8 years ago

Currently the PersistentStreamProvider has preparations for rewinding streams, but rewinding is not implemented in any current StreamProvider. We want to achieve a PersistentStreamProvider with full rewinding capabilities that would enable any subscriber to rewind to any event that was saved on the persistent stream (even if it is not currently in the cache). We propose making some changes to the current PersistentStreamProvider model (or extending it into a new PersistentStreamProvider, maybe RewindableStreamProvider) to support rewinding. Here is the general description of our plan:

Our plan is to make the PersistentPullingAgent hold two types of caches:

Currently we think that both caches can just be based on SimpleQueueCache.

The PersistentPullingAgent will hold all these caches. The Rewinding Caches will be sorted from new to old (caches with newer messages come before caches with older messages). The Agent will then have to fill all the caches accordingly and distribute the data between the subscribers.

In order to prevent having too many Rewinding Caches, another thing the PersistentPullingAgent will do is to try to move the cursors in older Rewinding Caches to newer ones. Before pulling batches to the caches the Agent will check if there are any overlapping caches in the Rewinding Caches and will transfer the subscribers' cursors to the newer caches if possible (if the cursors are currently in the overlapping part). Then Rewinding Caches without any cursors will be removed. This process will continue until all subscribers have "Caught up" with the Main Cache.
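A minimal sketch of the consolidation step described above, written in Python as a language-neutral model rather than against the Orleans API. The `Cache` class, the `consolidate` function, and the integer tokens are hypothetical stand-ins, and each cache is assumed to hold one contiguous range of tokens:

```python
from dataclasses import dataclass, field


@dataclass
class Cache:
    """Hypothetical stand-in for one cache: a contiguous range of sequence tokens."""
    first: int                                    # oldest token currently held
    last: int                                     # newest token currently held
    cursors: dict = field(default_factory=dict)   # subscriber id -> token position

    def contains(self, token: int) -> bool:
        return self.first <= token <= self.last


def consolidate(main: Cache, rewinding: list) -> list:
    """Move each cursor to the newest cache that holds its position, then
    return only the rewinding caches that still have at least one cursor."""
    # Newest first: the main cache, then rewinding caches ordered new -> old.
    ordered = [main] + sorted(rewinding, key=lambda c: c.last, reverse=True)
    for idx, old in enumerate(ordered):
        for subscriber, position in list(old.cursors.items()):
            # Only caches newer than the one currently holding the cursor qualify.
            for newer in ordered[:idx]:
                if newer.contains(position):
                    newer.cursors[subscriber] = position
                    del old.cursors[subscriber]
                    break
    return [c for c in ordered[1:] if c.cursors]
```

In this model a subscriber has "caught up" once its cursor lands in `main`; a rewinding cache is dropped as soon as its last cursor moves out.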

Here are the changes we think need to be made in order to implement our idea:

So this is basically our idea. We would love to get some feedback on it to know if we are heading in the right direction, and also to get a discussion going :smiley:

gabikliot commented 8 years ago

I think in general it is a good idea. We entertained the idea of having hot and cold sorts of caches in the past, so I think it's worth at least trying. About the concrete suggestions for code changes:

1) IQueueAdapterReceiver - sure, we can add the extra arg to the method. Not a problem.

2) Agent changes - wouldn't it be better, instead of making the agent aware of the cache hierarchy, to write a completely new cache that holds all the multiple caches inside and hides the complexity from the agent? That way the agent does not have to change at all.

3) Multi-cache may be based on the simple cache inside it, but does not have to be. For example, in our internal Event Hub provider we have a different cache. So we can start with having a different cache, which does not use the SimpleQueueCache inside at all. Or you can start by copying the SimpleQueueCache initially and change it as you wish and later on we can look at similarities and reconcile back.

4) Changes to SimpleQueueCache - we went back and forth a lot with @jason-bragg about whether there should be "one cache to rule them all" or different caches for different scenarios. The jury is still out, but at this point we think the cache should be optimized for a specific scenario (the Event Hub scenario is quite different from AQ and probably very different from the hierarchical cache scenario for Kafka). For each scenario the cache is optimized for a specific access pattern and data layout. So at least for now our preference is to keep the caches separate until we have a better understanding of the different scenarios, at which point we can reconcile. As I said above, nothing prevents you from forking the SimpleQueueCache into SimpleMultiQueueCache. There is also a middle ground - for example, we can easily expose publicly any information we already have in the SimpleQueueCache (First and Last tokens). But any additional tracking ("keeping reference to the cursors that are referencing to the cache") would be maintained outside, in the wrapping cache, so it does not add any overhead or complexity to the SimpleQueueCache.
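To illustrate points 2 and 4, here is a rough Python sketch of a wrapping cache. It is a conceptual model only: `SimpleCache` and `MultiCache` are hypothetical names, not the Orleans SimpleQueueCache, and tokens are assumed to be plain integers. The inner cache exposes only its first and last tokens, while all cursor bookkeeping stays in the wrapper:

```python
class SimpleCache:
    """Hypothetical minimal cache: keeps (token, event) pairs in arrival order."""

    def __init__(self):
        self._items = []

    def add(self, token, event):
        self._items.append((token, event))

    @property
    def first_token(self):
        return self._items[0][0] if self._items else None

    @property
    def last_token(self):
        return self._items[-1][0] if self._items else None

    def read_from(self, token):
        return [event for t, event in self._items if t >= token]


class MultiCache:
    """Wrapper that presents several inner caches to its caller as one cache."""

    def __init__(self):
        self._hot = SimpleCache()
        self._cold = []      # additional SimpleCache instances
        self._cursors = {}   # subscriber -> (cache, token); tracked here, not in SimpleCache

    def add(self, token, event):
        self._hot.add(token, event)

    def add_cold(self, cache):
        self._cold.append(cache)

    def open_cursor(self, subscriber, token):
        """Attach the subscriber to whichever inner cache covers the token."""
        for cache in [self._hot] + self._cold:
            first, last = cache.first_token, cache.last_token
            if first is not None and first <= token <= last:
                self._cursors[subscriber] = (cache, token)
                return True
        return False   # not loaded anywhere: the caller decides what to do

    def read(self, subscriber):
        cache, token = self._cursors[subscriber]
        return cache.read_from(token)

    def cursor_count(self, cache):
        return sum(1 for c, _ in self._cursors.values() if c is cache)
```

The caller (the agent, in the thread's terms) only ever talks to `MultiCache`, so the inner cache needs no changes beyond exposing its token range.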

jason-bragg commented 8 years ago

Very good questions. It’s clear you’ve put some thought into this, and I’m happy to see others exploring these problems. What you’ve proposed is very close to ideas we’ve explored, but not settled on.

As Gabriel has touched on, the caching behavior is provided by the adapter due to the suspicion that different cache patterns would be needed for different queues and different requirements.

In the application-specific recoverable adapters under development, we’ve elected not to take the approach you’ve described. Please don’t take that as discouragement; it really depends on your requirements.

The goal of our internal recoverable streams was to support a resilient real time stream processing system. That means we needed to recover, in real time, from usual intermittent failures, but not significant outages. With this in mind, the recoverable pattern we’re developing maintains a much larger cache, wherein most common failures can be recovered from the cache. In cases of prolonged outages or unexpected errors, an ‘offline’ processor can pick up processing any streams that were not successfully processed in real time. This approach focuses the recoverable behaviors on streams that can be processed in a timely manner, while offloading persistent failures to a processing system that prioritizes reliability and eventual consistency over speed.

If this pattern is of interest, you may want to look at IStreamFailureHandler and how it’s used, as it can be used to record failures that the offline processing can use to trigger recovery.

Some questions - Rewinding Caches

“Here are the changes we think needs to be made in order to implement our idea: PersistentPullingAgent” - At a high level, the pulling agent ‘should’ be able to stay mostly the same, because it simply keeps a cursor into the cache for each consumer and delivers events from that cursor. The hot and cold caches can (I submit ‘should’) be internal implementation details of your cache. Why make it the responsibility of the agent to deal with multiple caches, when that’s not likely the common case? This seemed to be Gabriel’s intuition as well. I suggest some sort of ‘fragmented’ cache that keeps multiple caches that represent ‘loaded’ data of a bigger data set. For this to really work in a practical manner, the receivers (reading from the same queue, but multiple locations) would need to be able to update the cache more intelligently. For this to fit well in the adapter pattern, we’d need to replace the adapter receiver’s GetQueueMessagesAsync call with some sort of PollAsync() that returned the set of streams events were received from during the receiver's poll request, but not the actual events. With this information the agent could publish the streams or activate inactive subscribers, but would not be responsible for adding events to the cache. The receiver would need to do that itself.
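The division of labor suggested here can be sketched as follows. This is a synchronous Python model for brevity, not the Orleans interfaces: `Receiver`, `Agent`, `poll`, and `pump` are hypothetical names, the physical queue is a plain list, and the cache is a dictionary keyed by stream id:

```python
class Receiver:
    """Hypothetical receiver that owns the write path into the cache."""

    def __init__(self, queue, cache):
        self._queue = queue   # list of (stream_id, token, event): stand-in for the physical queue
        self._cache = cache   # dict: stream_id -> list of (token, event)
        self._offset = 0

    def poll(self, max_count):
        """Read up to max_count messages, store them in the cache, and
        return only the set of stream ids that received events."""
        batch = self._queue[self._offset:self._offset + max_count]
        self._offset += len(batch)
        touched = set()
        for stream_id, token, event in batch:
            self._cache.setdefault(stream_id, []).append((token, event))
            touched.add(stream_id)
        return touched


class Agent:
    """Hypothetical agent: never touches events, only learns which streams are active."""

    def __init__(self, receiver):
        self._receiver = receiver
        self.woken = []

    def pump(self, max_count=10):
        for stream_id in sorted(self._receiver.poll(max_count)):
            self.woken.append(stream_id)   # stand-in for waking that stream's subscribers
```

Because the agent sees stream ids only, several receivers reading different locations of the same queue could fill different fragments of the cache without the agent knowing.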

I'll think on this more. I wasn't expecting a need for a resolution of these issues so soon. Glad to be mistaken :)

yoniabrahamy commented 8 years ago

Thank you @gabikliot and @jason-bragg for your thoughtful and interesting responses. I want to describe our use case a bit so you can see why we need such rewinding capabilities. We want to use Orleans Streams with Kafka in order to store and deliver notifications (they are our events in the stream) to our different clients (they register to our service with a certain callback url). We cannot trust our clients to be that resilient (who knows what's happening on the server that the url directs to), so we need to save up to 24 hours of event history, and we need to make sure the events are consumed in order.

I think that managing the caches in a dedicated class, and keeping the PersistentPullingAgent as untouched as possible, is a good idea. What we can do, as Jason suggested, is have the IQueueAdapterReceiver let the Agent know which streams have events coming in for them and wake the subscribers' cursors, while the Receiver itself fills up the cache with the wanted data.

Also, in order to make sure we do not create too many caches, we thought of managing a limited number of Rewinding Caches. If a subscriber wants to subscribe with a token that would force creating a new cache beyond the limit, it will wait until one of the caches is deleted, and then a cache will be created for it. But we also need to check whether one of the existing Rewinding Caches has meanwhile loaded the token that this waiting subscriber wanted to start consuming from. If that's the case, it will get a cursor in that cache.
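One way to picture this bounded pool, as a hedged Python sketch: `RewindPool` and its methods are hypothetical names, caches are plain dictionaries covering a token range, and tokens are assumed to be integers. A waiting subscriber is admitted either when an existing cache grows to cover its token or when a slot is freed:

```python
from collections import deque


class RewindPool:
    """Hypothetical bounded pool of rewinding caches with a waiting list."""

    def __init__(self, limit):
        self.limit = limit
        self.caches = []         # each cache: {"first", "last", "subscribers"}
        self.waiting = deque()   # (subscriber, token) pairs that could not be served yet

    def _find(self, token):
        for cache in self.caches:
            if cache["first"] <= token <= cache["last"]:
                return cache
        return None

    def subscribe(self, subscriber, token):
        """Return True if the subscriber got a cursor, False if it has to wait."""
        cache = self._find(token)
        if cache is None and len(self.caches) < self.limit:
            cache = {"first": token, "last": token, "subscribers": set()}
            self.caches.append(cache)
        if cache is None:
            self.waiting.append((subscriber, token))
            return False
        cache["subscribers"].add(subscriber)
        return True

    def advance(self, cache, new_last):
        """Record that a cache has loaded tokens up to new_last, then retry waiters."""
        cache["last"] = new_last
        self._admit_waiters()

    def release(self, cache):
        """Delete a cache (for example once it has no cursors), then retry waiters."""
        self.caches.remove(cache)
        self._admit_waiters()

    def _admit_waiters(self):
        pending = list(self.waiting)
        self.waiting.clear()
        for subscriber, token in pending:
            self.subscribe(subscriber, token)   # re-queues itself if still not servable
```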

Anyway, there is some more thinking that needs to be done on our side. We will post an update when we get closer to implementation to get more feedback. Of course, if you have any more ideas or thoughts we would love to hear from you :smiley:

amamh commented 8 years ago

@yoniabrahamy

Hi, I've done something similar to what you said. I only implemented IQueueCache. You can have a look here:

https://github.com/amamh/orleans-PipeStreamProvider

yoniabrahamy commented 8 years ago

@amamh

Thanks for the input, but your implementation only enables rewinding (or replaying) events that have already been loaded into the cache: you store them in your "cold cache" instead of purging them.

What I want to achieve is full rewinding abilities. Let's say I have a large number of events being produced in my system, and that I want to replay an event from 24 hours ago. Because of memory limitations on the machine I cannot hold the entire event history from the last 24 hours in my cache, so I want to be able to rewind to that point in time and get the events from the physical queue again into a temporary cache. That cache will load the old messages (and purge them after they are consumed) until it "catches up" with the regular cache.

amamh commented 8 years ago

@yoniabrahamy

I'm using a normal .NET linked list for the cold cache, why don't you simply replace that with something that persists on the disk?

randa1 commented 8 years ago

@amamh Saving it to the disk doesn't really make sense, that's what the actual physical queue is for :)

amamh commented 8 years ago

@randa1

Well, but I can't rewind the physical queue, because I get access to the requested token when I'm already inside the cache and have no access to the physical queue any more.

I'm talking about the case when using a PersistentStreamProvider with a custom adapter.

See my question here: https://github.com/dotnet/orleans/issues/990#issuecomment-155716011

So, yeah it does seem like a hack to me to do this inside the "cache", but that's the only option I see at the moment, unless I'm missing something.

jason-bragg commented 8 years ago

"I can't rewind the physical queue, because I get access to the requested token when I'm already inside the cache and have no access to the physical queue any more"

Both cache and the physical queue's receiver are created by the adapter factory. Their implementations can be tightly coupled. For instance, if a cache query generates a cache miss, it can create another queue receiver that reads from the requested location for that consumer. Once that consumer is caught up, cursor control can be returned to the hot cache. This is not a recommendation, just an elaboration on what is possible.
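A rough Python sketch of that coupling, as a conceptual model only. `PhysicalQueue`, `Receiver`, `Cache`, and `AdapterFactory` are hypothetical names rather than the Orleans types, and the queue is assumed to be addressable by integer offset. The factory builds the cache and hands it a way to create receivers, so a miss can be served from the queue until the hot cache takes over:

```python
class PhysicalQueue:
    """Hypothetical durable log addressed by integer offsets."""

    def __init__(self, events):
        self._events = list(events)

    def __len__(self):
        return len(self._events)

    def read(self, offset, count):
        return list(enumerate(self._events[offset:offset + count], start=offset))


class Receiver:
    """Reads the queue sequentially, starting from an arbitrary offset."""

    def __init__(self, queue, offset):
        self._queue = queue
        self.offset = offset

    def next_batch(self, count):
        batch = self._queue.read(self.offset, count)
        self.offset += len(batch)
        return batch


class Cache:
    """Hot cache holding offsets >= first_offset; uses a receiver on a miss."""

    def __init__(self, first_offset, items, receiver_factory):
        self.first_offset = first_offset
        self._items = dict(items)               # offset -> event
        self._receiver_factory = receiver_factory

    def read_from(self, offset, batch_size=2):
        """Yield (offset, event) pairs in order, starting at offset."""
        if offset < self.first_offset:
            # Cache miss: replay from the queue until the hot cache can take over.
            receiver = self._receiver_factory(offset)
            while receiver.offset < self.first_offset:
                limit = min(batch_size, self.first_offset - receiver.offset)
                batch = receiver.next_batch(limit)
                if not batch:
                    break                       # nothing left to replay
                yield from batch
            offset = self.first_offset
        for off in sorted(k for k in self._items if k >= offset):
            yield off, self._items[off]


class AdapterFactory:
    """Creates the cache and gives it the means to create receivers."""

    def __init__(self, queue, hot_size):
        self._queue = queue
        self._hot_size = hot_size

    def create_cache(self):
        first = max(0, len(self._queue) - self._hot_size)
        items = self._queue.read(first, self._hot_size)
        return Cache(first, items, lambda offset: Receiver(self._queue, offset))
```

No exception has to cross from the cache to the adapter in this model: the cache handles its own miss through the receiver factory it was given at construction time.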

@yoniabrahamy and @amamh It looks like you both need the same (or similar) replay capability, which is beyond the design goals of the existing PersistentStreamProvider implementation. The queue adapter does provide enough of an abstraction to enable this capability, but queue readers and cache implementations will need to be more tightly coupled, with receivers interacting directly with the cache. Is this a hack? Hard to say. It's the adapter implementer's decision, and the only option I'm aware of if one wants to use the PersistentStreamProvider as is.

Some options:

amamh commented 8 years ago

@jason-bragg

"For instance, if a cache query generates a cache miss, it can create another queue receiver that reads from the requested location for that consumer"

How do you go about doing that? I don't get it. If I raise a cache miss exception in the cache, how do I catch that in the adapter?


EDIT: I think I get what you mean. I create the cache in the Factory and the receiver in the queue Adapter (which comes from the Factory). I guess there might be a way to link them together... it doesn't seem so straightforward though.