dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

Need to replay the historic messages #990

Closed angshuman-agarwal closed 8 years ago

angshuman-agarwal commented 8 years ago

Hi, Documentation on PullingProtocol says - once the consumer has subscribed to the stream it will see all events that were generated after it has subscribed (in addition, using StreamSequenceToken allows to subscribe in the past).

Can someone point out how to use this in the persistent provider with my own queue adapter? I would like to replay the historic data (snapshot data) first for every new client subscribing to an existing stream, and then send the ticking data (latest data). Something along these lines - http://goo.gl/KPdZvy. I do not think I need to go down the path of writing a completely new StreamProvider (the last option in this link).

Thanks a lot for any suggestions.

regards Angshuman

jason-bragg commented 8 years ago

As described in the referenced documentation (Queue Cache), the agent delivers events from the queue cache (IQueueCache). This means that the ability of the existing persistent stream provider to replay historical data is limited to the cached data.

For instance, assume your adapter's cache holds all events less than an hour old. In that case, any consumer can replay data less than an hour old, while a consumer requesting data older than an hour should receive a DataNotAvailable exception via the observer's OnErrorAsync call.

This pattern was mainly developed for stream processing resiliency: it allows consumers to replay recent events while recovering from transient stream processing errors. Is this type of replay sufficient for your purposes? If so, you will need to develop an implementation of IQueueCache that can hold messages long enough to service your application's replay needs.
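For illustration, here is a minimal sketch of that idea in plain C# (a hypothetical `ReplayCache` type, not the actual Orleans `IQueueCache` interface): it retains a bounded window of events keyed by sequence number and rejects requests for data that has aged out, analogous to the DataNotAvailable behavior described above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a replay-capable queue cache; not the real
// Orleans IQueueCache interface.
public class ReplayCache
{
    private readonly int _maxItems;
    private readonly LinkedList<(long Seq, object Event)> _events = new();

    public ReplayCache(int maxItems) => _maxItems = maxItems;

    public void Add(long seq, object evt)
    {
        _events.AddLast((seq, evt));
        while (_events.Count > _maxItems)
            _events.RemoveFirst();   // oldest data falls out of the retained window
    }

    // Replay from a given sequence token; throws if the requested data has
    // aged out, mirroring the DataNotAvailable exception on OnErrorAsync.
    public IReadOnlyList<object> ReplayFrom(long seq)
    {
        if (_events.Count > 0 && seq < _events.First.Value.Seq)
            throw new InvalidOperationException(
                "Requested data is no longer cached (DataNotAvailable).");
        return _events.Where(e => e.Seq >= seq).Select(e => e.Event).ToList();
    }
}
```

A real implementation would additionally need cursors, purge coordination with the pulling agent, and thread safety; this only shows the retention/rejection contract.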

gabikliot commented 8 years ago

https://github.com/dotnet/orleans/issues/756

amamh commented 8 years ago

@jason-bragg

If my purpose is to have a stream that will purposely store the last N points of data and be able to replay that to a client requesting all data starting from EventSequenceToken(X), does it make sense to implement my own IQueueCache as you said?

Inspecting the SimpleQueueCache code, it seems that's not what the cache is made for: the cache's purpose appears to be to drain the underlying queue to relieve it, then immediately purge that data once it has been "received". I'm not sure why the PersistentStreamPullingAgent is calling TryPurgeFromCache all the time; this last part is not very clear to me.

Another question that's probably related is, what is "IsRewindable" supposed to do in IQueueAdapter?

Thanks, I can help document this stuff, if possible, once I understand them properly.

amamh commented 8 years ago

@gabikliot

I see. It seems like the answer to at least my last question is in there. So, "IsRewindable" is indeed intended to indicate/control whether the stream can replay past events or not.

As I understand, the current PersistentStreamProvider, which is the main template to extend, does not support rewinding.

Do you think that's going to be implemented in the near future?

jason-bragg commented 8 years ago

"Inspecting SimpleQueueCache code, it seems like that's not what the cache is made for"

That cache was built mainly with Azure Queue in mind, but it is general-purpose enough to work for almost any non-rewindable stream. That's why I didn't bring it up and instead suggested that you'd need to write your own cache.

"Do you think that's going to be implemented in the near future?"

Unfortunately, we do not yet have an implementation of a rewindable stream provider in Orleans. The rewind capabilities have been jointly developed with a (large) customer who is using the capabilities in production code, so the code path has had some hardening, but all customers using this capability currently must implement their own cache. We expect one or more of the rewindable caches currently being developed by customers to be contributed back to the community. We also have plans to have an implementation of a rewindable stream provider as part of Orleans. Unfortunately, I can't yet provide dates for when either of these will be available.

amamh commented 8 years ago

@jason-bragg

I've implemented my own IQueueCache which enables replays. I'll post it later today (London time).

jason-bragg commented 8 years ago

@amamh :+1: Shiny! Can't wait to see it!

amamh commented 8 years ago

@jason-bragg

https://github.com/amamh/orleans-PipeStreamProvider

I've written a summary of what I've changed in the readme file so that you wouldn't need to go through the whole code to know what I've done.

Would be nice if you could give some feedback !

jason-bragg commented 8 years ago

"I've written a summary of what I've changed in the readme file so that you wouldn't need to go through the whole code to know what I've done."

Awesome, that will help a lot :)

Want to spend more time on this, but initial feedback:

amamh commented 8 years ago

@jason-bragg

I think I shouldn't have called it "cache" because that makes it sound like it's supposed to be temporary.

Also, I'm creating another stream that will work like a dictionary. It will store id=>value and update values when the id already exists. When a client connects it will replay the dictionary then start streaming new messages like a normal stream.
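The dictionary-style stream described above could be sketched roughly like this in plain C# (a hypothetical `SnapshotStream` type, not Orleans code): each publish upserts the latest value per id, and a new subscriber first receives the snapshot, then live updates.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of a "dictionary stream": latest value per id is
// folded into a snapshot; a new subscriber gets the snapshot replayed
// first, then live updates. Not Orleans code.
public class SnapshotStream<TKey, TValue>
{
    private readonly Dictionary<TKey, TValue> _snapshot = new();
    private readonly List<Action<TKey, TValue>> _subscribers = new();

    public void Publish(TKey id, TValue value)
    {
        _snapshot[id] = value;                     // upsert: replaces any prior value for id
        foreach (var s in _subscribers) s(id, value);
    }

    public void Subscribe(Action<TKey, TValue> onNext)
    {
        foreach (var kv in _snapshot) onNext(kv.Key, kv.Value);  // replay snapshot
        _subscribers.Add(onNext);                                // then go live
    }
}
```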

angshuman-agarwal commented 8 years ago

@amamh You can later think of using ESE to persist dictionary on disk. Managed implementation is HERE

jason-bragg commented 8 years ago

@amamh

"Cold cache is not supposed to be purged."

Then I think there may be a bug in your cache. The cold cache has a max size, and when it becomes larger than the max size, the oldest messages are deleted. I consider this a 'purge'. Am I mistaken?

"You can later think of using ESE to persist dictionary on disk."

Storing the 'cold' cache to disk may be practical for your needs, but most users of the PersistentStreamProvider use it to read from some existing backend persistent queue, so persisting the 'cold' data (again), in most cases, won't really make sense.

The PersistentStreamProvider, as is, is sufficient for real-time (or near real-time) event processing with recovery, but it has no mechanism for rereading from the persistent queue if data is requested that is no longer in the cache. The cold cache you've introduced extends the size of the cache, improving recoverability while working within the fundamental limitations of the existing architecture. As a general solution it's a good one, assuming the purge behaviors are worked out.
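The two-tier behavior under discussion can be sketched roughly as follows (a hypothetical `TwoTierCache`, not the actual code in orleans-PipeStreamProvider): items purged from the hot tier move to the cold tier, and the cold tier in turn evicts its oldest entries once it exceeds its max size, which is itself a purge.

```csharp
using System.Collections.Generic;

// Rough sketch of a hot/cold cache pair: purging the hot tier moves items
// to the cold tier, and the cold tier silently drops its oldest items when
// it grows past maxColdSize -- effectively also a purge. Hypothetical type.
public class TwoTierCache<T>
{
    private readonly Queue<T> _hot = new();
    private readonly Queue<T> _cold = new();
    private readonly int _maxColdSize;

    public TwoTierCache(int maxColdSize) => _maxColdSize = maxColdSize;

    public void Add(T item) => _hot.Enqueue(item);

    // Move everything currently in the hot cache into the cold cache,
    // then trim the cold cache down to its size bound.
    public void PurgeHotToCold()
    {
        while (_hot.Count > 0)
            _cold.Enqueue(_hot.Dequeue());
        while (_cold.Count > _maxColdSize)
            _cold.Dequeue();   // oldest cold entries are lost here
    }

    public int ColdCount => _cold.Count;
}
```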

amamh commented 8 years ago

@jason-bragg

Then I think there may be a bug in your cache. The cold cache has a max size, and when it becomes larger than the max size, the oldest messages are deleted. I consider this a 'purge'. Am I mistaken?

You are right.

Storing the 'cold' cache to disk may be practical for your needs, but most users of the PersistentStreamProvider use it to read from some existing backend persistent queue, so persisting the 'cold' data (again), in most cases, won't really make sense.

In our use case we are dealing with a lot of time-series data, and normally we want to store everything since the start of the day. I think in our case it makes sense to persist the queue on disk and reread it if the stream is shut down for some reason and comes back up again. Any comments on this?

The PersistentStreamProvider, as is, is sufficient for real-time (or near real-time) event processing with recovery, but it has no mechanism for rereading from the persistent queue if data is requested that is no longer in the cache.

I can do that in the IQueueCache e.g. save old data to disk with an ID that is a combination of the stream GUID and cache number.

amamh commented 8 years ago

@jason-bragg

Btw, I feel like I'm doing the wrong thing by implementing my own IQueueCache. I think the cache should remain just that, a cache.

My use case is a typical trading app where 'tick' data since the morning should be replayable for new clients. Am I doing the right thing by implementing a new cache inside a PersistentStreamProvider?

Another idea is to avoid implementing this in the stream at all and instead save the data in some persistent backend on the producer side and create a new stream for every client on which we replay old data when it's first created and the client is connected.

The problem here is that I don't know if that's good or bad performance-wise. So, is having hundreds of streams sending the exact same data (after replay, they are all streaming the same real-time data) too much work or is this how streams are supposed to be used (same way as how we can have thousands of actors)?

angshuman-agarwal commented 8 years ago

@amamh - Then the replay stream and the real-time stream will be different, i.e. repeated for every client. What you are suggesting is like having separate Recover and Subscribe methods, where each client has its own unique stream. Your data is already stored in a tick database (backend); there is no need to persist it separately somewhere once again. Rather, have a physical queue cache to back it up (what you are doing currently).

This custom cache acts like an Accumulator, i.e. it will fold each update into the original snapshot, maintaining an internal state of the world. We just want to track the state of the world and notify the observer with a single notification when it subscribes, so that each client can then safely share the same underlying stream of updates.

jason-bragg commented 8 years ago

@amamh

"I feel like I'm doing the wrong thing by implementing my own IQueueCache"

As stated earlier, one of the current PersistentStreamProvider cache design goals was:

"This pattern was mainly developed for the purposes of stream processing resiliency. It allows consumers to replay recent events while recovering from transient stream processing errors"

If I understand your scenario correctly, this does not directly suit your needs. If your streaming requirements are such that you can keep an entire day's worth of stream data in memory, then you can use a cache much like the one you've developed along with a persistent backend queue (EventHub, Kafka, ..) to address your needs. If not, you may need to develop your own stream provider (based on PersistentStreamProvider) or take some other non-streaming approach.

Some questions: How do you want to organize your streams? By ticker? How many streams do you expect? How much data do you expect to flow through each stream (size/time)? How far back do you need customers to be able to replay data? 24 hours?

For instance (I'm using powers of 2 to keep the math simple): if you have 1024 (2^10) streams, each pushing 1MB (2^20) of data per hour, and you want to be able to roll back 16 hours (2^4), that is 16GB (2^34) of data that you need to cache. If you store the streams in an EventHub with 16 partitions, this would be ~1GB of replay cache per partition. Load balanced across 4 silos, each silo would need to cache ~4GB; load balanced across 8 silos, ~2GB per silo.

From the numbers above, you can see that this 'could' work depending on your streaming numbers and how much hardware you're willing to throw at it. Being possible is not the same as being the right approach though. You'll need to look at your numbers and see what makes sense.
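The back-of-envelope arithmetic above can be checked with a few constants (these are the hypothetical example figures, not measurements from a real deployment):

```csharp
// Sizing check for the worked example: 2^10 streams * 2^20 bytes/hour * 2^4 hours.
public static class CacheSizing
{
    public const long Streams             = 1L << 10;  // 1024 streams
    public const long BytesPerStreamHour  = 1L << 20;  // 1MB per stream per hour
    public const long RewindHours         = 1L << 4;   // 16 hours of rollback

    public const long TotalBytes   = Streams * BytesPerStreamHour * RewindHours; // 2^34 = 16GB
    public const long PerPartition = TotalBytes / 16;  // 16 EventHub partitions -> 1GB each
    public const long PerSiloOf4   = TotalBytes / 4;   // 4 silos -> ~4GB per silo
    public const long PerSiloOf8   = TotalBytes / 8;   // 8 silos -> ~2GB per silo
}
```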

amamh commented 8 years ago

@jason-bragg

along with a persistent backend queue (EventHub, Kafka, ..)

Sorry, I think I'm missing something here: what is the point of doing that if the stream's "Rewindable" feature doesn't work? I can use a persistent backend queue like Kafka, but then I won't be able to replay it anyway, which is why I'm working on a custom IQueueCache, which seems like a hack to me. I think it should be done properly using the underlying physical queue, but I don't see any way of doing that without changing the source of Orleans itself, and I would rather avoid that.

amamh commented 8 years ago

@angshuman-agarwal

What you are suggesting is like having a method called Recover and Subscribe separately

It would be the same stream, not two different ones, but it will be created per client and the source of data will be responsible for replaying all past events on this new client-specific stream.

Your data is already stored in a tick database (backend). No need to persist separately once again somewhere

That's what I was suggesting, keeping the data in the tick database (so, on the producer app/grain/machine side) instead of the stream and then replay it for every new client.

each client can then safely share the same underlying stream of updates.

Each client will get the same updates, but they will be on different streams (streaming the same data).

This custom cache acts like an Accumulator i.e. will fold each update into the original snapshot. Ditto on these lines - http://goo.gl/KPdZvy

This is talking about merging updates into the existing internal state using an input accumulator function like the example given:

    public static StockTick Merge(StockTick a, StockTick b)
    {
        return new StockTick
        {
            Bid = b.Bid ?? a.Bid,
            Ask = b.Ask ?? a.Ask,
            Last = b.Last ?? a.Last,
            Volume = b.Volume ?? a.Volume,
        };
    }
    // underlying source
    var ibm = new Subject<StockTick>();

    // multicast via MergeSubject
    var published = ibm.Publish(Merge);

I don't want to do that at the moment, that will help with the next custom stream that I'm working on which will be the same as this MergeSubject.


Overall, I think it's not a good idea to have a unique stream per client. Even though in my use case I'll have a maximum of 1000 streams or so (e.g. 10 traders/clients using 100 streams each), I don't think it's a good idea to have duplicate streams. I would rather stick with the original idea, but I wanted to get some feedback. Thanks for the input everyone :)

jason-bragg commented 8 years ago

What is the point of doing that (use persistent backend queue) if the stream's "Rewindable" feature doesn't work?

@amamh Using a backend queue has several advantages.

In general, the PersistentStreamProvider was designed to read from a persistent queue. When reviewing https://github.com/amamh/orleans-PipeStreamProvider, I focused mainly on the cache because I thought that was the component you were presenting. I thought the in-memory queue was a placeholder for a real persistent queue. The in-memory queue will fail in any multi-silo deployment.

amamh commented 8 years ago

@jason-bragg

Ah, I see now.

The in-memory queue will fail in any multi-silo deployment.

Thanks for pointing that out, I didn't actually think of that.

In that case, maybe I can contribute this to OrleansContrib as a new stream provider for whatever persistent queue I end up using.

Just to make sure I'm following: so, eventually, when you guys add the rewinding ability, I should rewind using the persistent physical queue instead of the workaround I've done using a custom IQueueCache. Correct? I'm talking specifically about my use case, that is, being able to replay the past messages for X hours to any new client when the client asks for it.

jason-bragg commented 8 years ago

"maybe I can contribute this to OrleansContrib"

That would be great. OrleansContrib is a great place to start, but, depending on the utility of this to the community, you may well want to contribute it back to Orleans proper.

"you guys add the rewinding ability"

To be clear, we will likely first finish the 'rewind for recovery' story, using a rewindable cache capable of caching large amounts of data. Rewinding back to the persistent queue is a longer term goal, and not on our immediate road map. Having said that, a significant portion of our progress is made by the community, so if you and/or others need this capability sooner and are willing to develop these capabilities, we'll invest time supporting your efforts.

amamh commented 8 years ago

@jason-bragg

we will likely first finish the 'rewind for recovery' story, using a rewindable cache capable of caching large amounts of data. Rewinding back to the persistent queue is a longer term goal

So, let's say I want to develop 'proper' rewinding using the underlying physical queue rather than my big 'cold' cache. Is that currently possible without changing the source of Orleans itself?

When implementing a PersistentStreamProvider, I don't get the sequence token that the client sent until I'm already inside the cache, i.e. past the point where I could use the underlying physical queue to rewind. Am I missing something?

amamh commented 8 years ago

@jason-bragg

If you don't mind, there is another issue I'm having. I didn't want to open a new 'issue' for it, it's probably something small that I'm missing.

In my implementation: https://github.com/amamh/orleans-PipeStreamProvider

I cannot get it to work with a non-grain observer. You can simply clone the repo and see it yourself. The logs don't show any issues.

It works fine when using the SimpleMessageStreamProvider instead.

jason-bragg commented 8 years ago

The fact that this works for SMS but not using your stream provider seems to hint towards an issue in the adapter implementation. I suggest adding logging to the inputs and outputs of the adapter and ensuring the data is flowing as expected. Specifically:

IQueueAdapter.QueueMessageBatchAsync
IQueueCacheCursor.GetCurrent

If you've demonstrated that data is flowing correctly, please reference the grain that is acting strange and I'll take a look. I'm not familiar with your application code, so details help.

amamh commented 8 years ago

please reference the grain that is acting strange and I'll take a look

Sorry, maybe I wasn't clear. It's not a grain, it's the simple test class in Client/Program.cs

It works fine when using a grain, but not when using a "normal" object.

jason-bragg commented 8 years ago

Ah, I see.

This case is serializing an IAsyncStream from a grain to a client, then calling subscribe on the grain's stream. The implementation of IAsyncStream is serializable, but we have no testing around that scenario.

Does it work if you subscribe to a stream acquired directly from the provider?

amamh commented 8 years ago

@jason-bragg

Thanks for the suggestion, but subscribing directly using the provider didn't work either. I also marked the class as Serializable and tried adding the project containing the class to the server project references (so it knows how to serialize it).

I've updated the code to do that, so if you wish to try it yourself you can simply just clone and run.

jason-bragg commented 8 years ago

Oh, I'm a dork. Client and grain are in different processes, so your in-memory queue won't work. This is the same issue as the multi-silo problem.

However, I do recall asking that you add tracing to adapter inputs and outputs to verify something like this wasn't the case. :)

amamh commented 8 years ago

@jason-bragg

I see, thanks :)

So, just to make sure I understand: is the stream adapter, including its receiver and cache, created at every end of the stream (every consumer/producer)?

jason-bragg commented 8 years ago

@amamh

"stream adapter including its receiver and cache created at every end of the stream (every consumer/producer)?"

Not exactly. The adapter is created for every instance of the stream provider. Receivers and caches for a queue are created on the silos responsible for reading from those queues.

Since your adapter writes to and reads from an in-memory queue, the client will write to a different in-memory queue than the one the receiver on the silo is reading from.

Same thing happens on multiple silos. All grains on a silo will write their events to the in-memory queue on their silo, but other silos have no way of getting those messages. When a queue is assigned to a silo, the receiver and cache will be created for that queue, but the only messages in that queue will be the ones created on that silo. Further, all messages sent to queues assigned to other silos will sit in the in-memory queue unread because the silo has no receiver or cache setup for queues it's not responsible for.

An in-memory queue is only viable for a single silo deployment with no client use of the streams. This can be useful for test purposes, but not much else.
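The failure mode can be sketched in plain C# (hypothetical `InMemoryQueue`/`FakeProcess` types, not Orleans code): because each process owns its own queue instance, a message enqueued by the client process never appears in the silo process's queue.

```csharp
using System.Collections.Generic;

// Sketch of why a process-local in-memory queue breaks down across
// process boundaries: each process (client or silo) holds its own queue
// instance, so messages written in one process are invisible to a
// receiver running in another. Hypothetical types.
public class InMemoryQueue
{
    private readonly Queue<string> _messages = new();
    public void Enqueue(string msg) => _messages.Enqueue(msg);
    public int Count => _messages.Count;
}

public class FakeProcess
{
    // "Process-local" state: every process gets its own queue.
    public InMemoryQueue Queue { get; } = new();
}
```

The same reasoning applies silo-to-silo: events enqueued on one silo's local queue are never visible to the receiver running on the silo responsible for that logical queue.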

amamh commented 8 years ago

@jason-bragg

Thanks for the clarification, I think I understand now.

I think it would be great if we could have some other way of communicating (chat). Because of the time difference and lots of small queries, it makes more sense if we could have a quick conversation some time, if that's possible. Are you OK with that? If yes, which chat service do you use?

amamh commented 8 years ago

@jason-bragg

I opened a new issue https://github.com/dotnet/orleans/issues/1022

jason-bragg commented 8 years ago

@angshuman-agarwal To your original question about replaying historical messages: depending on your scenarios, the introduction of the PooledQueueCache may solve some of your issues. Also, the test cases introduced in "Stream Recovery Tests #1289" should help one become familiar with the rewind/recovery capabilities currently supported.

angshuman-agarwal commented 8 years ago

Thanks. Will take a look at it.