dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

Data Replication, a.k.a single writer / multi reader pattern #446

Open rore opened 9 years ago

rore commented 9 years ago

Moving the discussion about this subject from issue #433.

To summarize so far:

There's a rather basic need for a multiple readers / single writer pattern.

This is a common pattern when you have a resource with very high read traffic. You want the updates to go to a single stateful grain, but to scale reads you want those handled by StatelessWorkers (which also function as a silo-local cache).

These readers need to subscribe to revoke events from the writer. Doing that via observers is a problem (since you need to persist them, etc.).

A possible solution would be to use streams: each StatelessWorker will subscribe on activation and will unsubscribe on deactivation. That way stream messages will only go to the existing activations and will not activate any StatelessWorker that was deactivated.

Or, as @veikkoeeva rephrased it better:

You have a MasterCache that is an origin, a source of events to some well known stream. Let's say it's a cache stream with Id 0. Then you would like to instantiate local, per-silo caches, PerSiloCache grains, that have the exact same content as the master cache grain.

To query these per-silo caches grain you call them with a well-known ID, say 0 (PerSiloCacheFactory.GetGrain(0)). Upon OnActivateAsync this PerSiloCache retrieves data from MasterCache and subscribes to the aforementioned well known cache stream with stream ID 0. Upon DeactivateAsync you would unsubscribe from the well known cache stream.

This way MasterCache could feed changes to PerSiloCache grains and you wouldn't need to explicitly manage the caching and/or guess how many you would need.
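To make the shape concrete, here is a minimal sketch of the quoted idea. The grain names, the "SMSProvider" stream provider, and the CacheUpdate event type are all illustrative, and the API follows the Orleans 1.x-era surface this thread is discussing; getting one such cache activation per silo is a separate problem discussed further down.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

public interface IMasterCache : IGrainWithIntegerKey
{
    Task<Dictionary<string, string>> GetAll();
}

public interface IPerSiloCache : IGrainWithIntegerKey
{
    Task<string> Get(string key);
}

public class CacheUpdate { public string Key; public string Value; }

public class PerSiloCache : Grain, IPerSiloCache
{
    private Dictionary<string, string> _items;
    private StreamSubscriptionHandle<CacheUpdate> _subscription;

    public override async Task OnActivateAsync()
    {
        // Seed the local copy from the master, then subscribe to the well-known cache stream.
        var master = GrainFactory.GetGrain<IMasterCache>(0);
        _items = await master.GetAll();

        var stream = GetStreamProvider("SMSProvider")
            .GetStream<CacheUpdate>(Guid.Empty, "cache");          // well-known stream id
        _subscription = await stream.SubscribeAsync(
            (update, token) => { _items[update.Key] = update.Value; return Task.CompletedTask; });

        await base.OnActivateAsync();
    }

    public override async Task OnDeactivateAsync()
    {
        // Unsubscribe so a deactivated cache never gets woken up by stream traffic.
        if (_subscription != null) await _subscription.UnsubscribeAsync();
        await base.OnDeactivateAsync();
    }

    public Task<string> Get(string key)
    {
        string value;
        _items.TryGetValue(key, out value);
        return Task.FromResult(value);
    }
}
```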

And @gabikliot wrote:

I see 2 ways to solve your general problem, neither of which involves StatelessWorkers:

1) Do it the way you are doing now, with a single write grain and a pool of read grains, and manage the pool explicitly via ids. For example, decide on 10 ids for the 10 grains in that pool. There is no need for them to register anywhere, just use the known ids 1...10. A reader just picks a random id out of the pool; the writer sends invalidate to all of them (see the sketch below). To get dynamic scaling, you basically rely on Orleans deactivation: configure the readers with a short (say 1 minute) deactivation timeout and the writer with a longer one. In the high-read-rate case all 10 grains will stay in memory, but in quiet periods they will quickly be deactivated. In general, since you are managing the consistency of your data yourself, with an explicit invalidate command, this whole problem is relatively easy in your case.

2) We are looking internally at a somewhat similar single-writer multiple-reader case. The difference from your case is that we want to manage the consistency automatically, not based on the application telling us when to invalidate. This is still at quite an early stage.
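A minimal sketch of option 1 above, under assumed names (IReadGrain, the pool size of 10, and the helper class are illustrative):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans;

public interface IReadGrain : IGrainWithIntegerKey
{
    Task<string> Get(string key);
    Task Invalidate();
}

public static class ReadPool
{
    private const int PoolSize = 10;                 // the well-known ids 0..9
    private static readonly Random Rng = new Random();

    // Caller side: pick a random reader out of the pool for each read.
    public static IReadGrain PickReader(IGrainFactory factory)
    {
        return factory.GetGrain<IReadGrain>(Rng.Next(PoolSize));
    }

    // Writer side: broadcast an invalidate to every reader in the pool.
    public static Task InvalidateAll(IGrainFactory factory)
    {
        var calls = new List<Task>();
        for (var i = 0; i < PoolSize; i++)
            calls.Add(factory.GetGrain<IReadGrain>(i).Invalidate());
        return Task.WhenAll(calls);
    }
}
```

The short deactivation age for the readers would be configuration (the per-grain-type collection age limit), not code, so it is not shown here.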

sebastianburckhardt commented 9 years ago

I understand that it is possible to manually control replication by using multiple IDs as suggested above. However, it is not particularly elegant, and I am not sure that it will do the right thing if I am using it to scale load across silos - for example, can I really be sure that the Orleans load balancing will efficiently and quickly distribute the activations across silos?

On the other hand, I am not sure the [StatelessWorker] attribute is a good name for this pattern, since its name seems to imply certain characteristics of the grain (stateless, computation-bound) instead of what the user actually wants to ask for in this case (one activation per silo).

In general, I would expect that the programmer may want to independently choose characteristics:

gabikliot commented 9 years ago

This is a great topic, so let's see where we are at. In general, providing read replication is a very good capability and we are all in for it. The question is how to do it: how general the pattern is, where to build it (runtime vs. application), etc. We have actually put a great deal of thought into that subject in the past, even had a PhD intern prototyping a number of solutions, and have more people interested in it now (#447). So I will share our thoughts:

In general, with any advanced feature, our default preference is to keep it outside the runtime, at the application level, if possible. By application level I mean implementing the pattern/feature with grains, using public APIs. The reason is that, following the analogy of operating systems, we want to keep a small "micro kernel". The slimmer the Orleans runtime is, the easier it is to evolve and support, the more efficient and less clunky it is, with fewer opportunities for bugs and more robustness. If we keep pushing every imaginable feature into the runtime, at some point it will become too messy, too cumbersome. We don't want to be the next CORBA or WCF. Like in an OS, it's better to keep certain functionality in system processes outside the kernel, right?

But of course sometimes it is not possible to implement a certain pattern at the app level and one needs runtime support, like the examples with event sourcing (#343) or DI (#443). In those cases we need to consider runtime support, and even then, ideally we provide the smallest hook and the majority of the implementation stays in application code (like the Journal provider in event sourcing, for example). So we need to look at it on a case-by-case basis and see what is implementable at the app level and why (if at all) it requires runtime support.

Now back to replication. I can see how replication can be done completely at the application level, and there is more than one way to do it. Examples:

1) One way is pointed out here: https://github.com/dotnet/orleans/issues/447. I can see how it can be very useful for a lot of scenarios, but someone might want different controls. For example, we can adjust the pattern, offer options to configure it, or even offer multiple patterns. All doable, and I hope @nehmebilal will be open to that.

2) Another way is what @rore suggested, with explicit application-driven invalidation and a pool of read grains with managed ids, like I suggested above. There are some limitations to this approach:

a. Invalidate will reactivate a potentially deactivated grain in vain, as @rore pointed out here: https://github.com/dotnet/orleans/issues/433#issuecomment-105621546. However, this may not be much of a problem: we poke the grain, activate it, and then don't use it. Depending on the rate of invalidate calls it may be cheap.

b. This simple pattern does not address locality. Let's say you want one read grain per silo. The virtual actor abstraction does not currently provide that out of the box, but it can be built/imitated at the application level, in the following way.

Building a per-silo grain at the application level:

1) Write a PerSiloGrain with PreferLocalPlacement and send it one request from a BootstrapProvider, using either a random guid as its id or some silo-specific id (use the extended primary key, with the silo id as the string part of the grain id). PerSiloGrains will register with some higher-level, single-per-deployment Repository grain (it can also be the write grain), or subscribe to one global stream.

2) The write grain will use those grain ids to broadcast invalidate requests.

3) If a silo dies, the write grain will still be using the grain id of the PerSiloGrain from that silo, and Orleans will reactivate it on another silo, which is not what you want in this case. So in OnActivateAsync this grain can check whether it is on the right silo, and if not, call the Repository grain to unregister (or unsubscribe from the stream). So basically this way you are managing the locality yourself.
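A sketch of that recipe (the bootstrap provider that sends each PerSiloGrain its first request is omitted; the sketch assumes the silo identity string was used as the grain key extension, and all names are illustrative):

```csharp
using System.Threading.Tasks;
using Orleans;
using Orleans.Placement;

public interface IPerSiloGrain : IGrainWithIntegerCompoundKey   // long key + string extension
{
    Task Invalidate();
}

public interface IRepositoryGrain : IGrainWithIntegerKey        // single-per-deployment registry
{
    Task Register(IPerSiloGrain reader);
    Task Unregister(IPerSiloGrain reader);
}

[PreferLocalPlacement]
public class PerSiloGrain : Grain, IPerSiloGrain
{
    public override async Task OnActivateAsync()
    {
        string siloFromKey;
        this.GetPrimaryKeyLong(out siloFromKey);                 // silo this grain was created for

        var repository = GrainFactory.GetGrain<IRepositoryGrain>(0);
        if (siloFromKey == RuntimeIdentity)
        {
            // Still on the silo we were created for: register so the write grain can reach us.
            await repository.Register(this.AsReference<IPerSiloGrain>());
        }
        else
        {
            // The original silo died and Orleans re-activated us elsewhere: drop out of the
            // broadcast set; the bootstrap provider on the new silo creates its own grain.
            await repository.Unregister(this.AsReference<IPerSiloGrain>());
            DeactivateOnIdle();
        }
        await base.OnActivateAsync();
    }

    public Task Invalidate()
    {
        // Drop any locally cached state here.
        return Task.CompletedTask;
    }
}
```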

Now this basic block can be wrapped in a nice application-level abstraction: ReplicatableWrapper<W,R>, where W is the write grain id and R is a read grain id. Use hosting grains to wrap around your application logic grains and do all the plumbing in this hosting/base grain and in the wrapper. You could even code-gen this ReplicatableWrapper and the base grain, like suggested here: #447. Or have an additional pluggable "topology" layer.
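One possible (non-code-gen'd) shape for such a wrapper, with hypothetical read/write grain interfaces, just to show where the plumbing would live:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans;

public interface IWriteGrain : IGrainWithIntegerKey
{
    Task Set(string key, string value);
}

public interface IReplicaReadGrain : IGrainWithIntegerKey
{
    Task<string> Get(string key);
    Task Invalidate();
}

// Application code only talks to the wrapper; it routes reads across the read pool and
// routes writes (plus the invalidation fan-out) through the single write grain.
public class ReplicatableWrapper
{
    private readonly IGrainFactory _factory;
    private readonly long _writerId;
    private readonly long[] _readerIds;
    private readonly Random _rng = new Random();

    public ReplicatableWrapper(IGrainFactory factory, long writerId, long[] readerIds)
    {
        _factory = factory;
        _writerId = writerId;
        _readerIds = readerIds;
    }

    public Task<string> Read(string key)
    {
        var readerId = _readerIds[_rng.Next(_readerIds.Length)];
        return _factory.GetGrain<IReplicaReadGrain>(readerId).Get(key);
    }

    public async Task Write(string key, string value)
    {
        await _factory.GetGrain<IWriteGrain>(_writerId).Set(key, value);
        var invalidations = new List<Task>();
        foreach (var id in _readerIds)
            invalidations.Add(_factory.GetGrain<IReplicaReadGrain>(id).Invalidate());
        await Task.WhenAll(invalidations);
    }
}
```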

Alternatively, one can do this whole thing inside the runtime. There are still a lot of open questions related to that: what consistency exactly, what replication policy, and so on. It is definitely possible, like suggested here: https://github.com/dotnet/orleans/issues/446#issuecomment-105669523.

Ideally, we would like to see people try to do it at the application level first, learn from their experience, extend the pattern, see what consistency makes the most sense to them, what hooks are needed, and what was limiting them in application-level replication, and only then decide what makes sense to move inside the runtime. If it turns out we can build a very re-usable, efficient, configurable abstraction for replication outside the runtime, like suggested for example in #447, that would be the preference, I think. In a lot of middleware/runtime systems, people start with a smaller system, keep using it, build abstractions on top of it, and gradually the best of those application patterns/abstractions find their way into the runtime. That's what we would ideally like to happen here.

P.S. Sorry for the somewhat lengthy reply.

rore commented 9 years ago

@gabikliot A single PerSiloGrain will not be enough when you have high read load. That's the motivation for using StatelessWorker - so you have an automatically scaling, silo-local cache. I still think the only thing missing in order to implement this pattern is the ability to subscribe to a stream from a StatelessWorker. If you could do that, implementing the pattern would be simple and elegant.

sergeybykov commented 9 years ago

@rore You are saying a single grain activation per silo will not be enough. I'm curious what kind of scenario you have in mind here where ~1/Nth of a silo's CPU capacity of N cores wouldn't be enough for the read load. Can this load be partitioned into K slices, with one per-silo activation per slice?

rore commented 9 years ago

Let's say for example I have a web service that handles around 20K qps, and each request needs a specific data item. So I have 20K qps on a specific reader. Now let's say I'm running on 24-core machines. A scalable PerSiloGrain will handle that load with no problem, but if I'm limited to a single activation per silo I need either to add machines or handle the activations myself. It just doesn't make sense.

gabikliot commented 9 years ago

You plan to serve 20K qps with a single (replicated) grain? You have 20K qps against a single application item? In Orleans you are supposed to use a large number of small actors; that is the whole point of the Actor Model.

veikkoeeva commented 9 years ago

@sergeybykov A good point. I think @rore is after simplicity here so that he wouldn't need to think about slicing or mapping of IDs more than, say, having conceptually something like a single ConcurrentDictionary per silo that he can update from a stream.

At high load it starts to matter how the data is laid out in the processor cache. Perhaps one part of the problem here is how to ensure the data itself isn't replicated needlessly, that it still gets updated, and that you can allow maximum, unhindered read throughput. Even then, one question is whether doing that deliberately is acceptable, as it would starve other grains, and Orleans wouldn't allow it (I assume).

It's a bit difficult to come up with concrete examples... Maybe one is payment transaction processing, where one needs to quickly check whether a jointly used card has a positive balance, or whether a card is blacklisted, by keeping all cards in a single data structure (e.g. a ConcurrentDictionary). Even in this case I think modelling individual cards/accounts would be better, as an individual one shouldn't be hit often (or something criminal is going on). Blacklisting, maybe. A proper implementation would probably be something like a Bloom filter (perhaps bitmapped to a Redis cache), and maybe it'd be nice to have the data structure in a single grain with high read throughput while the silo also processes other things besides being a cache (rather than serving purely as a cache). Well, I don't know. As you can see, this spirals quickly...

So, maybe one thing accompanying https://github.com/dotnet/orleans/issues/447 would be a discussion item on different aspects of caching, and if one needs slow-changing, read-only data, perhaps some other option would suit better (like Redis). Something that would explain that Orleans works, in some sense, like a CPU scheduler that guarantees a slice for grains, and that in cases like this the logical consequence would be a lot of StatelessWorker grains that would starve other grains; Orleans wouldn't allow that, so the read throughput would effectively be capped at a certain number of activations. Strategies like slicing would then work a bit like in the aforementioned card/account analogy, in that they load hot data into memory, assuming the slicing function places "hotly co-located" data together in memory.

I think I've written too much already, and perhaps I can already imagine what @rore is after. :)

rore commented 9 years ago

@gabikliot Well, not everything in production systems abides by the rules of the actor model. If you have a configuration item that is needed on every request, it means you either need a global in-memory cache (which is very much not recommended with actors) or a "cache" actor to provide this item.

veikkoeeva commented 9 years ago

@rore Ah, I think I captured some of that thinking in my write-up. Does it make sense to you? I think this would make a good use-case scenario to cover in the documentation. There are trade-offs for sure in how to "tune" the actor runtime to provide CPU cycles for the cache (and in how one could split the data so that hot data ends up in the physical CPU cache). I think that's the gist here.

rore commented 9 years ago

@veikkoeeva I'm not sure I understand how you think this use case should be handled with Orleans.

gabikliot commented 9 years ago

OK, got it @rore. So you indeed want a read-mostly cache, with a high volume of reads. And the "cache" is just one item, or a small number of them, so you can't shard them across different grains.

So I still want to understand how you plan to use StatelessWorker. Please remember: 1) A StatelessWorker grain is always created local to the caller, on the same silo as the caller. So if you call it from one stateful grain, all StatelessWorker activations will be created on the same silo as that stateful grain. If you call it from the client, client requests will be round-robined across gateways, so you will get StatelessWorkers on every silo. 2) There is no reliable way to broadcast a message to all StatelessWorkers.

Can you please provide more details on how you plan to use StatelessWorker?

sergeybykov commented 9 years ago

@rore Is it a single hot data item or a number of such items? For one or a small number of really hot items I would violate the isolation rules of the actor model and use a per-silo ConcurrentDictionary - the global in-memory cache option you mentioned. That should be much cheaper than turning all the hot calls into messages and delivering and executing them.

The rule of thumb we use is that a single activation can process up to 1-1.5K simple requests per second (of course, there are other factors in play here as well). If you expect more than that, then you either partition the grain state, replicate it, or choose to violate isolation.
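A minimal sketch of the isolation-violating option, assuming it is acceptable for grain code to touch shared process-wide state (names are illustrative):

```csharp
using System.Collections.Concurrent;

// One instance per silo process, shared by every grain activation in that silo.
// Reads on the hot path never become grain messages.
public static class SiloLocalCache
{
    public static readonly ConcurrentDictionary<string, string> Items =
        new ConcurrentDictionary<string, string>();
}

// On the hot path inside any grain method:
//     string value;
//     SiloLocalCache.Items.TryGetValue("config:featureX", out value);
//
// On update/invalidation (e.g. from a per-silo stream consumer, or a timer that
// re-reads the writer grain):
//     SiloLocalCache.Items["config:featureX"] = newValue;
```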

rore commented 9 years ago

@gabikliot I'm not planning to use that from a client, it's an intra-cluster cache.

The cache grains are supposed to be local.

Let's step away from Orleans for a moment to explain the scenario. Let's talk about a non-Orleans web application.

You have a large web application that handles a lot of requests, so the application runs on multiple (stateless) web servers. There's a small number of configuration items that you need on every request. You keep those items cached in a global cache (memcached or Redis), AND for performance you also keep an in-memory local cache on every web server (as you want to avoid the extra network hop to the global cache on each request). When a configuration item is updated (which doesn't happen often), it is invalidated in the global cache, and an invalidation message is broadcast to all web servers (via some message queue) so the local cache is invalidated on each server.

So now we want to build the same web application with Orleans and model this accordingly. We don't need a global cache anymore - that's the writer grain. We still need the local cache on each silo. You could try to do it with a global cache and async locks, but there should be a better way within the actor model. What you want are automatically scaling cache grains that function as a local cache and are activated as needed (if a single one can handle all the requests, good; if not, more will be instantiated). At most there will be N cache grains, where N is the number of parallel requests the silo can handle (as you're accessing this item on every request).
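The read side of this could look roughly like the sketch below (interface and grain names are illustrative); the open question, picked up in the next comments, is how invalidations reach every one of these activations:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;

public interface IConfigWriterGrain : IGrainWithIntegerKey     // the single writer / "global cache"
{
    Task<string> GetItem(string key);
}

public interface IConfigCacheGrain : IGrainWithIntegerKey      // the silo-local, auto-scaling cache
{
    Task<string> GetItem(string key);
}

[StatelessWorker]
public class ConfigCacheGrain : Grain, IConfigCacheGrain
{
    private readonly Dictionary<string, string> _local = new Dictionary<string, string>();

    public async Task<string> GetItem(string key)
    {
        string value;
        if (_local.TryGetValue(key, out value))
            return value;                                       // served from this activation

        // Miss: fetch from the single writer grain and keep a local copy.
        value = await GrainFactory.GetGrain<IConfigWriterGrain>(0).GetItem(key);
        _local[key] = value;
        return value;
    }
}
```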

The only thing missing is a way to broadcast the invalidation messages to those cache grains, which can be solved if you could subscribe to a stream in a StatelessWorker...

gabikliot commented 9 years ago

The only thing missing is a way to broadcast the invalidation messages to those cache grains, which can be solved if you could subscribe to a stream in a StatelessWorker

This will NOT solve it, as I explained in the past. Even if a StatelessWorker could subscribe to a stream, it is one grain subscribing to the stream, so only one of the activations will get each item, not all of them. Each activation will get a different event if multiple events are published to the stream. We do not currently have a pattern to broadcast the same request to all activations.

My point from the beginning therefore was: you cannot use StatelessWorker for replication. There are other solutions, but I can't see how StatelessWorker can help.

rore commented 9 years ago

@gabikliot there's something I don't understand here. If on every activation of a StatelessWorker I subscribe to the stream, wouldn't I get the events on all of them? If you can have multiple subscriptions on the same activation, why won't it work with different activations of a StatelessWorker?

gabikliot commented 9 years ago

@rore, do you think it will be more efficient to discuss it over a phone? We would be happy to have a quick chat call about it.

rore commented 9 years ago

@gabikliot Sure.

sergeybykov commented 8 years ago

Where are we at here in general and relative to https://github.com/dotnet/orleans/issues/447 in particular?

cvalusek commented 7 years ago

Not sure whether to post here, in #447 or #433... I know these are really old tickets, but I wanted to share some things I experienced while production-hardening our application. First, I want to confirm that while this might seem small, it can make or break your application. When we started ramping up for a production deployment of our first feature implemented in Orleans, I had to go through and introduce a reader/writer split. The app would basically fall down (flurry of timeouts) under spike load, from deadlocks and excessive silo-to-silo communication. To get things out the door, I opted for some time-based caching in a separate StatelessWorker. This alleviated both problems immediately and had the single largest impact on performance for our application. It also had an interesting side benefit of exposing our high-performance caches in our API.

A more traditional stateless web application does not usually run into this type of problem. Those usually fall down from db locks caused by all the queries they issue to pull this type of data. Then someone heads down the path @rore has mentioned (an in-memory cache) to reduce load, at some loss of consistency. Someone new to Orleans will likely not expect this to be a problem, nor consider the implications of having unintentionally created an artificial bottleneck in the system - which is, after all, what we usually want when we take advantage of Orleans' single-threaded concurrency model.

Between the scenario @gabikliot mentions and the one @rore mentions in #446, it's really a choice of whether you need consistency more than availability, or vice versa. There are a lot of questions for the developer to consider to find the right model. I don't think a single solution is necessarily the right way to go. The options @yevhen mentioned in #447, with interleaving reads and an event-sourced grain, really do implement the consistency level @gabikliot wants, but still involve a lot of silo-to-silo communication.

Our requirements include some configuration data that changes very slowly, does not need to be propagated immediately, and is referenced in a large number of calculations. Depending on the context (admin portal vs. user frontend), the consistency requirements change. We also have sporadic spikes of background processing traffic as jobs run throughout the day.

Some opinions... I don't feel it is necessarily the job of the framework to solve every one of the scenarios presented in one solution. I do feel, though, that we need the tools to elegantly implement whatever model is best for a given use case. These tickets push in the opposite direction from the current documentation (which probably evolved after them) at https://dotnet.github.io/orleans/Documentation/Advanced-Concepts/StatelessWorker.html#state with respect to how StatelessWorker should be used. Because of the client-side stream observer support, it didn't register with me that grains subscribe, not activations. It makes sense when you think about it, and should be the default behaviour. I still think having something like the IAsyncStream.SubscribeTransientAsync() mentioned in #433 should be done. It really sounds like this all stems from not being able to address Stateless grains, which sounds like something that may eventually need to be resolved for other features.

I personally have to push forward to the next evolution of what was done to get the application out the door. The requirements have now grown to include some kind of near-real-time invalidation. We are shooting for around the usual event-sourcing eventual-consistency goal of <10 seconds. Right now, I am going to try a 3-grain approach. I have the normal grain with all the read and write methods. I have a reader grain that is a StatelessWorker; it defers all the read methods to my final internal cache grain. The reader generates the id for the cache grain on activation and calls a method to link it to the normal grain. The cache grain uses PreferLocalPlacement. It subscribes to the relevant message on activation and holds the state it fetches from the normal grain. On messages, I just unset the state. My reader grain can be addressed consistently without me having to know the dynamic guid generated for the cache grain.
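A rough sketch of that 3-grain shape (all names, the stream provider, and the message type are placeholders; in this sketch the subscription happens in the Link call, since that is when the cache learns which grain it fronts):

```csharp
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
using Orleans.Placement;
using Orleans.Streams;

public class ThingState { /* application data */ }
public class InvalidationMessage { }

public interface IThingGrain : IGrainWithIntegerKey            // the normal read/write grain
{
    Task<ThingState> Read();
}

public interface IThingReaderGrain : IGrainWithIntegerKey      // what callers address
{
    Task<ThingState> Read();
}

public interface IThingCacheGrain : IGrainWithGuidKey          // internal, dynamically keyed cache
{
    Task Link(long thingId);
    Task<ThingState> Read();
}

[StatelessWorker]
public class ThingReaderGrain : Grain, IThingReaderGrain
{
    private IThingCacheGrain _cache;

    public override async Task OnActivateAsync()
    {
        // Each reader activation gets its own cache grain (fresh guid) and links it to
        // the normal grain this reader fronts; PreferLocalPlacement keeps it on this silo.
        _cache = GrainFactory.GetGrain<IThingCacheGrain>(Guid.NewGuid());
        await _cache.Link(this.GetPrimaryKeyLong());
        await base.OnActivateAsync();
    }

    public Task<ThingState> Read()
    {
        return _cache.Read();                                  // defer all reads to the cache grain
    }
}

[PreferLocalPlacement]
public class ThingCacheGrain : Grain, IThingCacheGrain
{
    private long _thingId;
    private ThingState _state;

    public async Task Link(long thingId)
    {
        _thingId = thingId;
        // Subscribe to the invalidation stream; on any message, just unset the cached state.
        var stream = GetStreamProvider("SMSProvider")
            .GetStream<InvalidationMessage>(Guid.Empty, "invalidations");
        await stream.SubscribeAsync((msg, token) => { _state = null; return Task.CompletedTask; });
    }

    public async Task<ThingState> Read()
    {
        if (_state == null)
            _state = await GrainFactory.GetGrain<IThingGrain>(_thingId).Read();
        return _state;
    }
}
```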