dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

How to create a reader-writer grain with parallel reads #3841

Closed Arshia001 closed 6 years ago

Arshia001 commented 6 years ago

I'm facing an issue similar to what is discussed (but not resolved) in #1468. Basically, I need a reader-writer grain(s?) to hold some values so other parts of the system can reference them frequently and in parallel.

What I'm after is storing some system-wide config values which are accessed frequently and are subject to change, but only extremely rarely (once a month or so at most). The system should be reconfigurable without downtime. What I'm currently considering is storing the data in some database. It would be read at silo startup, and there would be a special callback to read the data again after it changes externally. I don't want to read the data from the database every time I need it because:

  1. It'll create unnecessary processing overhead inside the silo, as some of the data must be processed and filtered out.
  2. It'll increase load on the database, which I cannot guarantee will be as good at handling high load as the silo environment.
  3. The data is verified by the silo environment before it is updated. Reading directly from the database means there will be no middle layer to hold the last known valid data while the operator updates and fixes the new data.

I can easily create a reader-writer locked in-memory data store, but Orleans' single-threaded execution policy doesn't allow parallel access to the grain that holds the data. I can think of the following ways to bypass this:

  1. Have multiple copies of the data inside multiple grains. This is obviously not optimal.
  2. Use static fields to store the data and make the grain a stateless worker. This means every silo has its own copy of the data (which also helps with reducing network load), but there is no means of asking every silo to update its copy of the data (that I know of at least).

Suggestions?

enewnham commented 6 years ago

Why don't you have an actor per entity in the database? I generally have one actor per entity. OnActivateAsync reads from the database (one and only one time)

I've successfully used Orleans streams to broadcast a Refresh event. I have a helper function that I add to all OnActivateAsync() functions to have all grains subscribe to it. Go to the docs

Client.cs

//Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
//Get the reference to a stream. Let's use Guid.Empty as the "broadcast" channel...
var stream = streamProvider.GetStream<object>(Guid.Empty, "refresh");
await stream.OnNextAsync(null);

Grain.cs

// NOTE implicit streams would not be necessary for broadcast events
//Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
//Get the reference to a stream
var stream = streamProvider.GetStream<object>(Guid.Empty, "refresh");
//Subscribe a local private function to the stream
await stream.SubscribeAsync<object>(this.RefreshAsync);

NOTE: one gotcha for broadcasting on Guid.Empty, is you shouldn't create an actor with an ID of Guid.Empty
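
For completeness, here's roughly what that shared OnActivateAsync wiring could look like (a sketch only; RefreshAsync and its body are placeholders, and it assumes the same "SMSProvider" provider and pre-2.0 stream API used above):

// Sketch only: every grain that needs the broadcast runs this on activation.
public override async Task OnActivateAsync()
{
    var streamProvider = GetStreamProvider("SMSProvider");
    var stream = streamProvider.GetStream<object>(Guid.Empty, "refresh");
    // Subscribe the local handler to the broadcast channel.
    await stream.SubscribeAsync<object>(RefreshAsync);
    await base.OnActivateAsync();
}

// Invoked whenever a "refresh" event is published on the stream.
private Task RefreshAsync(object item, StreamSequenceToken token)
{
    // Re-read whatever this grain caches from storage here.
    return Task.CompletedTask;
}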

Arshia001 commented 6 years ago

Actor-per-entity is useless for select-all queries, which make up a sizable portion of all queries in the system.

enewnham commented 6 years ago

True. Then I would use a local stateless worker on each silo, which subscribes to the broadcast refresh event.

rikbosch commented 6 years ago

Basically, I would create one grain, authoritative for the data, which is stateful.

Then use a StatelessWorker grain, which will fetch the latest data from the stateful grain on activation and subscribe to changes.

edit: ... @enewnham beat me to it :)

jason-bragg commented 6 years ago

If I'm understanding the stateless worker over stream suggestion correctly, I don't see that working as expected. Stateless workers can subscribe to streams, but calls to stateless workers are always local to the caller. So if one had a cluster of 10 silos and a stateless worker per silo subscribes to a stream, then a producer generated an event on that stream from silo A, the event would be sent to 10 stateless workers on silo A rather than being distributed across the cluster.

@Arshia001 some quick questions: How often is this data accessed normally, in requests per second? How important is it that this data be updated uniformly and in a timely manner across the cluster? For instance, if the data changed in storage, how long could it take to update the cluster before out-of-date info harms the service? Similarly, if one silo gets the updated data, how long can it take for other silos to be updated before the out-of-date information harms the service?

sebastianburckhardt commented 6 years ago

This is a good use-case example for the reactive caching feature.

Arshia001 commented 6 years ago

@enewnham @rikbosch Someone else also suggested this solution over on Stack Overflow, which I ended up adopting, but instead of a broadcast, the workers are on a timer and query the stateful grain for changes to the data (see the sketch below). This works, but is wasteful IMO; not performance-impairing, but wasteful in the sense that you get an average of ~100 calls per second that do exactly nothing. I have a soft spot for elegant solutions.
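
Roughly, the polling version looks like this (a sketch only; the grain and interface names are made up, and CachedConfig stands in for whatever the config type is):

public interface IConfigCacheGrain : IGrainWithIntegerKey
{
    Task<CachedConfig> GetConfig();
}

[StatelessWorker]
public class ConfigCacheGrain : Grain, IConfigCacheGrain
{
    private CachedConfig cached;

    public override async Task OnActivateAsync()
    {
        cached = await FetchFromMaster();
        // Poll the authoritative grain periodically; most ticks find nothing new.
        RegisterTimer(_ => RefreshAsync(), null, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10));
        await base.OnActivateAsync();
    }

    private async Task RefreshAsync() => cached = await FetchFromMaster();

    // Hypothetical master grain that owns the authoritative config.
    private Task<CachedConfig> FetchFromMaster() =>
        GrainFactory.GetGrain<IConfigMasterGrain>(0).GetConfig();

    public Task<CachedConfig> GetConfig() => Task.FromResult(cached);
}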

@jason-bragg I'll look into it tomorrow, but since stateless workers aren't uniquely identifiable in any way, I also think stream subscription won't work in this case. To answer your questions: First, it really depends on the number of clients currently connected to the system, but at worst each client could cause an access about every 10 seconds (this is system-wide config we're talking about), so I'd say about 10k requests per second for 100k clients (which I expect will grow over time). It is also extremely important that access to the data generate no overhead, so keeping a copy on each silo is a must. Second, propagation time across the cluster isn't really a huge concern for me. A few minutes or so is acceptable time for the update to reach the entire cluster. It really doesn't matter that much. But, it's extremely important that no caller gets partly updated data, since any references among the entries may become corrupted. In this sense, it is very important that either:

  1. Each silo gets the update at once, or
  2. Data is accessed atomically.

I would prefer solution 1, but I can configure access to the data in a way that interrelated items are fetched simultaneously. However, I don't really think solution 1 is feasible, because even if the update were truly atomic on each silo, it would still be possible for a grain to request part of the data before the update and part of it afterwards. The best way I can think of is to return huge parts of the data from the workers (which are guaranteed to be local) as immutable read-only collections, so there's no serialization/deep-copy overhead, and then just take what's needed on the caller's side. The workers would simply replace the collection objects when they update the data rather than modify the existing collections, so there will be no concurrency issues.
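
To illustrate the swap-don't-mutate idea (names here are hypothetical), the whole config sits behind a single reference that readers capture once per request:

using System.Collections.Generic;
using System.Threading;

// All interrelated entries live in one immutable snapshot object,
// so a caller can never observe half of an update.
public class ConfigSnapshot
{
    public IReadOnlyDictionary<string, string> Entries { get; }
    public ConfigSnapshot(IReadOnlyDictionary<string, string> entries) => Entries = entries;
}

public class ConfigHolder
{
    private ConfigSnapshot current = new ConfigSnapshot(new Dictionary<string, string>());

    // Readers take the reference once and work off that consistent snapshot.
    public ConfigSnapshot Current => Volatile.Read(ref current);

    // An update builds a complete new snapshot and swaps the reference in atomically.
    public void Publish(ConfigSnapshot snapshot) => Volatile.Write(ref current, snapshot);
}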

@sebastianburckhardt I haven't heard of that...

Arshia001 commented 6 years ago

The most elegant solution I can think of is one grain per silo subscribed to a broadcast event from the master grain, but I don't know of any tricks to have exactly one activation per silo in a uniquely identifiable way. Maybe one could use the local silo address as a unique key for a PreferLocalPlacement grain? Would this work?
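
Something like the following is what I have in mind (an untested sketch; it assumes Grain.RuntimeIdentity gives a stable string identifying the local silo, and all names are made up):

public interface IPerSiloCacheGrain : IGrainWithStringKey
{
    Task Refresh();
}

// PreferLocalPlacement should keep the activation on the silo that first calls it,
// so keying by the local silo's identity would give one activation per silo.
[PreferLocalPlacement]
public class PerSiloCacheGrain : Grain, IPerSiloCacheGrain
{
    public Task Refresh()
    {
        // Re-read the data from the master grain here.
        return Task.CompletedTask;
    }
}

// From any grain, resolving "this silo's" cache grain would look like:
// var cache = GrainFactory.GetGrain<IPerSiloCacheGrain>(this.RuntimeIdentity);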

shlomiw commented 6 years ago

@Arshia001 This is a use case I'm very interested in as well; I wonder what good solutions will appear here. I'll tell you how I ended up with something similar. I have a cache which I need per silo; this cache is rarely modified, and I want my grains to use it very frequently. I've decided that this cache wouldn't be a grain, to reduce the overhead of requesting via the grain mechanism (though if grains are talking to each other locally and you set the cached data as 'immutable' it won't be deep-copied and will be more performant - but still, nothing beats simple in-memory objects). So inside my silo I have a kind of DI singleton cached object, which is initialized by a bootstrap provider before any grain call is made; that way I can make sure the object is initialized and ready for use by the grains. So this is a simple object you can read any time you want.

How do you modify it? You need to hook up a kind of cache event that updates the object in an atomic manner (you can load the data and then switch references), or you can have it lazy-loaded when needed - up to you. Now for the tricky part: you need this event fired once per silo! This is not currently easy. My solutions:

Some notes about streams you need to consider (maybe you're already aware of them :)):

Another relevant use case I faced: I have grains which are responsible for player sessions. Sometimes I need to broadcast an update event to all active sessions. The simplest solution would be to subscribe all these grains to a stream. But what if I have 10Ks or 100Ks of these grains? As I understand it, it's not good practice to have so many subscribers to one stream. In addition, I want to limit the parallelism of the event to avoid a huge spike; it is OK for the sessions to be gradually updated. So I've ended up using my above-mentioned solution, with my one ClusterClient per silo subscribed to this stream. The grains themselves, when activated, are hooked to a custom event handler triggered by the ClusterClient. This event handler is responsible for invoking these grains (you should be aware of the thread context you are in - ThreadPool or Orleans - and invoke the grains accordingly). In addition, it limits the parallelism to avoid the spike; a sketch of that throttling follows below.
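
For the parallelism-limiting part, here's a minimal sketch of the kind of throttling I mean (SemaphoreSlim-based; the session grain interface is a placeholder):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Orleans;

public static class SessionBroadcaster
{
    // Invoke many session grains while capping how many calls are in flight at once.
    public static async Task NotifySessionsAsync(IGrainFactory factory, IEnumerable<Guid> sessionIds, int maxParallel = 16)
    {
        using (var gate = new SemaphoreSlim(maxParallel))
        {
            var tasks = sessionIds.Select(async id =>
            {
                await gate.WaitAsync();
                try
                {
                    // Placeholder session grain interface.
                    await factory.GetGrain<IPlayerSessionGrain>(id).OnBroadcastUpdate();
                }
                finally
                {
                    gate.Release();
                }
            });
            await Task.WhenAll(tasks);
        }
    }
}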

Sorry for the long comment..

Arshia001 commented 6 years ago

@shlomiw I don't get it, are you using a ClusterClient inside a silo? Isn't that a textbook case of an anti-pattern?

I generally prefer to play "by the rules" if and when possible (I'm still using the official build of Orleans after so many enticing opportunities to modify the source!). That includes not using static objects, not using anything outside the silo environment from within the silo and not making assumptions about any part of the system unless strictly specified in the docs. That said, one place where static objects are worth considering (very very very carefully) is inside stateless workers, of which there will be many and they'll all be local anyway, so they can probably share a reader-writer lock and work on the same set of data if the performance gain is worth it. This might actually be worth looking into for the stateless worker grains that cache the master's data, since it reduces the memory footprint and lowers the number of requests to the master (given some mechanism to elect one worker per silo to ask the master for data, which should be simple, if not trivial).
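
To make that concrete, sharing static state behind a reader-writer lock across stateless worker activations could look roughly like this (hypothetical names; only worth doing if the memory and call savings justify it):

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;

public interface ISharedConfigWorkerGrain : IGrainWithIntegerKey
{
    Task<string> Get(int key);
    Task Update(Dictionary<int, string> newData);
}

[StatelessWorker]
public class SharedConfigWorkerGrain : Grain, ISharedConfigWorkerGrain
{
    // Shared by every activation of this stateless worker on the silo.
    private static readonly ReaderWriterLockSlim Gate = new ReaderWriterLockSlim();
    private static Dictionary<int, string> data = new Dictionary<int, string>();

    public Task<string> Get(int key)
    {
        Gate.EnterReadLock();
        try
        {
            return Task.FromResult(data.TryGetValue(key, out var value) ? value : null);
        }
        finally
        {
            Gate.ExitReadLock();
        }
    }

    // Called by whichever activation is elected to refresh from the master grain.
    public Task Update(Dictionary<int, string> newData)
    {
        Gate.EnterWriteLock();
        try
        {
            data = newData;
        }
        finally
        {
            Gate.ExitWriteLock();
        }
        return Task.CompletedTask;
    }
}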

If I understood your solution correctly, one immediate problem I can see is that the server running the ClusterClient may go down, in which case you'll have no way (native to Orleans) of tracking what you were doing when the crash happened. You'd have to implement custom logic to work around that, but at this point you're just implementing your own cloud solution. If I were to update a few hundred thousand records, I'd create a grain to do the job, storing progress at predefined intervals along the way so I can just pick up where I left off. I'd use Task.WhenAll to control the concurrency level, and perhaps have it report progress to a second grain which a watchdog on a different server could track and reactivate the worker in case it goes down? Just off the top of my head.
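
A rough sketch of that batch-and-checkpoint idea (a method inside the hypothetical updater grain; IRecordGrain, ReadCheckpointAsync and WriteCheckpointAsync are placeholders):

// Process records in fixed-size batches, persisting progress between batches
// so a re-activated grain can resume from the last checkpoint.
public async Task RunBulkUpdate(IReadOnlyList<long> recordIds)
{
    const int batchSize = 100;
    int start = await ReadCheckpointAsync(); // resume point persisted in grain state

    for (int i = start; i < recordIds.Count; i += batchSize)
    {
        // One batch of grain calls in flight at a time; Task.WhenAll caps the concurrency.
        var batch = recordIds.Skip(i).Take(batchSize)
            .Select(id => GrainFactory.GetGrain<IRecordGrain>(id).ApplyUpdate());
        await Task.WhenAll(batch);

        await WriteCheckpointAsync(i + batchSize); // also report progress to a watchdog grain if desired
    }
}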

Also, I can't leave this link out XD XKCD on parenthetical statements with emoticons

shlomiw commented 6 years ago

@Arshia001 - yep, I'm using a ClusterClient inside a silo. The main reason is shared common-lib usage for both clients and silos. It has some Orleans stuff in it, such as streams and triggering some grains in specific cases. @ReubenBond also addresses local clients in #3362. You are absolutely right that it is risky, and I'm doing some stuff not by the rules.. I know, but after many iterations that's what I've ended up with, and I'm sharing it with you. Unfortunately Orleans lacks easy ways to have one grain per silo (which is needed in some cases) and to broadcast messages to grains, which is currently done via streams and has its own issues. So sometimes - rarely, I have to say - I've had to find my own, maybe a bit hacky, solutions.

Besides that I'm a real advocate of Orleans and I pitch it to every developer I meet :)

I'm not sure I got your point about when the server goes down; in my own cases I (hopefully) have it covered. I use the mentioned method just to update player sessions. When the grains are re-activated, they handle the case of unhandled sessions.

(tnx about the link :wink:)

Arshia001 commented 6 years ago

@shlomiw I agree that sometimes you just have to use hacky ways. I'm also loving Orleans, just as much as I love WCF and Entity Framework and numerous other .Net libraries. Thanks for sharing! XD

What I mean is that if the server that's running your ClusterClient crashes (loses power, blows up, catches fire, gets eaten by a server-eating monster, etc.), the update process will get interrupted with no way to know how much of it completed successfully.

jason-bragg commented 6 years ago

@Arshia001, thanks for the clarifications.

> The best way I can think of is to return huge parts of the data from the workers (which are guaranteed to be local) as immutable read-only collections, so there's no serialization/deep-copy overhead, and then just take what's needed on the caller's side. The workers would simply replace the collection objects when they update the data rather than modify the existing collections, so there will be no concurrency issues.

Agreed. Main problem (I think) is notification of change across the cluster, so each silo's cache knows to refresh. Two possible solutions come to mind, but I've not vetted either, so plz take this with a large grain of salt.

Grain Services

For the per-silo behaviors, a grain service may be viable. See Tester.GrainServiceTests. A grain service can contain a silo-specific cache of the data and subscribe to a stream for changes. A reminder can trigger a storage check, which triggers the cache updates via a stream event upon state change. I think this would work, but I’ve not tried streaming with system targets (grain services are system targets). A further complication would likely be subscription management, because system targets that failed to unsubscribe from the stream due to ungraceful shutdown would need to have their subscriptions removed via the subscription manager.

Controllable Providers

An alternative would be to use a provider (bootstrap?) which is controllable, and use a control command to trigger the cache updates instead of streams. Control messages can be sent to all of the active silos and dispatched to a specific provider configured on each silo. This is essentially the 'broadcast' behavior you need (I think). See UnitTests.StorageTests.MockStorageProvider and UnitTests.StorageTests.PersistenceGrainTests_Local for an example of a controllable provider. It should be noted that control signals of this sort, to my knowledge, have never been used in production systems, only in our test scenarios. This is not to say there is anything wrong with them, only that under production load there may be shortcomings we've not yet encountered.

shlomiw commented 6 years ago

@Arshia001 @jason-bragg FYI - I also wanted to try Grain Services for the same purpose and asked on Gitter (last September) - got an answer from @sergeybykov: [image]

Let me know if it does work, Tnx!

Arshia001 commented 6 years ago

@jason-bragg I like your second idea better. I'll get to work investigating if and how it can be done right away. As for the first, subscription management is generally painful enough that I'd rather avoid it where possible. What do you think about using the local silo address as a key to find the cache grain on each silo? It seems possible with a custom placement strategy which places grains on the silo that matches their key no matter where they're activated from. The master would then grab a list of all active silos and push the update to each. There would also be a bootstrap provider to initialize the data in each silo from the master. I think this is also worth investigating if the controllable provider option fails.

Arshia001 commented 6 years ago

@jason-bragg I just read the source for control commands. Aside from the primitive design of IControllable and the fact that there is no caching when looking up providers (shouldn't be a problem, since there aren't likely to be that many providers inside a silo anyway), I don't see why it shouldn't be fit for production use.

Arshia001 commented 6 years ago

So, I finally have a working solution. Here's how:

  1. There is a master grain, responsible for reading data from the database. This grain also receives external UpdateConfig calls when the config changes. The database is updated 'manually', so I don't care if it doesn't pick updates up automatically.

  2. There are stateless worker cache grains on every silo. These grains store the data in static objects (to help with memory footprint) and use a reader-writer lock to manage access to the data. When the first one wakes up on a silo, it updates the data and the rest just use that data. They also support an external UpdateConfig call which will in turn ask the master grain for new data.

  3. There is a bootstrap provider which does two things. First, it wakes the master grain up at init time so data is available before it is needed and thus we can avoid lazy init hiccups. Second, the provider also supports a control command. Upon receiving this command, it will ask the first available cache grain (guaranteed to be local) to update the static objects. This way, data is updated within each silo. The master grain simply sends this control command whenever it updates its data.

I think this is a complete solution to the problem. It is also generic enough to be used in any similar situations. I'd love to hear what everybody thinks about this solution though; and thanks for all the help XD

shlomiw commented 6 years ago

@Arshia001 - looks like a nice solution! I didn't know that a bootstrap provider can receive a control command! Useful, especially to avoid the streams. One small note - you might even consider removing the reader-writer lock on your static (immutable?) data by just creating a new data object and replacing the reference to it (reference assignment is atomic). Thanks for sharing!

Arshia001 commented 6 years ago

@shlomiw Reference assignment is atomic, assignment of multiple references is not - which means the lock can be removed by wrapping all data objects inside one. Nice catch.

Also, I've added a read-only wrapper for the data storage object which can safely be returned from the cache grains. This is what they look like now:

public class ConfigData
{
    public Dictionary<int, ConfigItemType1> ConfigItems1;
    public Dictionary<int, ConfigItemType2> ConfigItems2;
    ...
}

public class ReadOnlyConfigData
{
    public IReadOnlyDictionary<int, ConfigItemType1> ConfigItems1 => Data.ConfigItems1;
    public IReadOnlyDictionary<int, ConfigItemType2> ConfigItems2 => Data.ConfigItems2;
    ...

    ConfigData Data;

    public ReadOnlyConfigData(ConfigData Data) => this.Data = Data;
}

jason-bragg commented 6 years ago

Sounds good. A couple comments.

If this is just a cache of read-only data and the stateless workers do no actual work, I'm unclear about their purpose. They seem to merely introduce overhead.

I'm assuming you're using Orleans 1.5.x or higher. If so, you have access to dependency injection, and the reference to the cached data can be kept in the DI container. The reference can then be injected into any grain that needs it, with no overhead of a grain call to the cache grain. The bootstrap provider, upon getting a change notification, can request the new data and update the reference (kept in the container). As long as (like @shlomiw pointed out) the cached data is a single immutable set of data, this should be simple and safe, with much less overhead than having grains read from stateless workers.

I concede that some of this is not really an actor model approach, but, imho, the goal is to solve the problem and if the actor model helps (as it often does) great, but if it doesn't there is no value in shoehorning it in.

As an additional suggestion, if the version of the data has some sort of unique id (etag?) it may be valuable to send that in the change notification command to the providers, so they can ignore change requests if they already have the latest data. It’s a minor optimization but may be relevant if there are frequent updates or if the cached data set becomes exceptionally large over time.

poor code example below:

Cache can be defined as:

public interface ICache<T>
{
    T Value {get;}
}

Grains can declare in their constructor

public SomeGrain(ICache<CachedStuff> cachedStuff)
{
}

which allows them read access to the immutable cached data.

A cached publisher can be defined as:

public interface ICachePublisher<T>
{
    T Value {set;}
}

And the bootstrap can get such a publisher from the container and set the value on change notifications.

The cache object:

public class Cache<T> : ICache<T>, ICachePublisher<T>
{
    public T Value { get; set; }
}

can be registered as a singleton in the DI container.

        services.AddSingleton<Cache<CachedStuff>>();
        services.AddSingleton<ICache<CachedStuff>>(sp => sp.GetService<Cache<CachedStuff>>());
        services.AddSingleton<ICachePublisher<CachedStuff>>(sp => sp.GetService<Cache<CachedStuff>>());

This eliminates the grain calls to stateless workers and the related data copies, while also decoupling the cache source from readers. Main problem with this is that if the data is not technically immutable there would be nothing preventing grains from modifying the shared cache. Since the calls to the stateless workers (assuming data is not marked immutable) will perform a copy, there is slightly more protection when using them.

Arshia001 commented 6 years ago

@jason-bragg In the words of Genie, the mostly forgotten about big blue guy from the Aladdin series, 'I feel sheepish' XD I was thinking about grains and the actor model so much that I never even considered putting the data anywhere except inside a grain. I also didn't know grains could utilize DI in this manner. I'll get to work on replacing the cache grains with a DI container when I get back after the weekend, and I'll post the results back here. The ETag is probably a generally good idea as well, though I suspect it won't do much good if the data is updated once a month. Also, in my case, the data is immutable in every sense of the word. It is exposed as IReadOnlyDictionary instances containing bitwise immutable objects. Access would be a disaster otherwise, since there'd be a lot of deep copying.

veikkoeeva commented 6 years ago

System.Runtime.Caching is also one choice, depending on whether one is on .NET Full (or .NET Standard). It's integrated with system performance counters too.
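
For example, a minimal sketch (full-framework MemoryCache; the key, expiry, ConfigData and LoadConfigFromDatabase are placeholders):

using System;
using System.Runtime.Caching;

static ConfigData GetCachedConfig()
{
    var cache = MemoryCache.Default;
    if (cache.Get("system-config") is ConfigData cached)
        return cached;

    var fresh = LoadConfigFromDatabase(); // hypothetical loader
    cache.Set("system-config", fresh, new CacheItemPolicy
    {
        // Entries expire after five minutes and are simply reloaded on the next read.
        AbsoluteExpiration = DateTimeOffset.UtcNow.AddMinutes(5)
    });
    return fresh;
}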

Arshia001 commented 6 years ago

@veikkoeeva In a more complex situation, yes. I need all the data on all the servers at all times anyway, so it won't help much in my case.

jason-bragg commented 6 years ago

https://www.youtube.com/watch?v=YIj0qhYzqCU

shlomiw commented 6 years ago

@Arshia001 - you wrote that you use the StatelessWorker to update static objects; somehow I missed the part where these StatelessWorker grains are responsible for the cache - I thought they were used only to update it.

In my solution I'm doing something similar to what @jason-bragg suggested. My cache is injected via the DI container and all grains can use it without the overhead of invoking other grains - that's what I tried to describe in my first comment in this issue thread :)

The only difference is how I update the cache. As I said, since this cache is used in a shared lib for both silos and clients, I'm using a ClusterClient to subscribe to a stream for updating this cache (it's actually not an Orleans stream). In the silo, the bootstrap provider initializes this ClusterClient.

But I like the approach of broadcasting control commands to silos; it can be useful in many scenarios. In addition, the DI approach @jason-bragg suggested looks much better than the old container methods I use.

btw - in some scenarios I also use System.Runtime.Caching as @veikkoeeva mentioned.

Insightful thread :smile:

Arshia001 commented 6 years ago

So, here's the final (greatly simplified, yet very elegant) version, which works just as expected. I don't see any further room for improvement.

    interface IConfigReader
    {
        ReadOnlyConfigData Config { get; }
    }

    interface IConfigWriter
    {
        ConfigData Config { set; }
    }

    class ConfigProvider : IConfigReader, IConfigWriter
    {
        ReadOnlyConfigData ConfigData;

        ReadOnlyConfigData IConfigReader.Config => ConfigData;

        ConfigData IConfigWriter.Config
        {
            set
            {
                if (value != null)
                    ConfigData = new ReadOnlyConfigData(value);
            }
        }
    }

    class Startup
    {
        public IServiceProvider ConfigureServices(IServiceCollection services)
        {
            var ConfigProvider = new ConfigProvider(); // Is this a bad thing to do? I don't see how it's any different from how Jason did it with sp => sp.GetService<Cache<CachedStuff>>()
            services.AddSingleton<IConfigWriter>(ConfigProvider);
            services.AddSingleton<IConfigReader>(ConfigProvider);

            return services.BuildServiceProvider();
        }
    }

    class ConfigBootstrapProvider : IBootstrapProvider, IControllable
    {
        public string Name { get; private set; }
        IGrainFactory GrainFactory;
        IConfigWriter ConfigWriter;

        public async Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config)
        {
            this.Name = name;
            GrainFactory = providerRuntime.GrainFactory;

            ConfigWriter = providerRuntime.ServiceProvider.GetRequiredService<IConfigWriter>();
            ConfigWriter.Config = (await GrainFactory.GetGrain<ISystemConfig>(0).GetConfig()).Value;
        }

        public Task Close()
        {
            return Task.CompletedTask;
        }

        public async Task<object> ExecuteCommand(int command, object arg)
        {
            ConfigWriter.Config = (await GrainFactory.GetGrain<ISystemConfig>(0).GetConfig()).Value;
            return null;
        }
    }

sergeybykov commented 6 years ago

Seems like this was concluded with a needed solution. I'll close the issue then. Feel free to reopen, if you think there's more to it.

Arshia001 commented 6 years ago

After reading through #4062, one thing comes to mind: if some silos are unavailable for the control command, this setup will fail to update those indefinitely. A possible solution is to have a ConfigUpdater grain (to avoid reentrancy issues) specifically manage the control command call, and retry on failure (on a timer perhaps). To reduce system load, we'd need to also version the data so silos that already received the data know not to fetch it again. I'll get to work and post the results.

Arshia001 commented 6 years ago

Here's the updated code. A new Version field is added to the ConfigData object to track the current version; it's incremented by one whenever an update happens.

    [PreferLocalPlacement]
    class ConfigUpdaterGrain : Grain, IConfigUpdaterGrain
    {
        IDisposable Timer;

        public Task PushUpdateToAllSilos(int Version)
        {
            if (Timer != null)
                Timer.Dispose();
            // We use a timer because we need the config grain to be free for subsequent calls from other silos
            Timer = RegisterTimer(OnPushConfigTimer, Version, TimeSpan.FromMilliseconds(1), TimeSpan.MaxValue);
            return Task.CompletedTask;
        }

        async Task OnPushConfigTimer(object State)
        {
            try
            {
                Timer.Dispose();
                Timer = null;

                await GrainFactory.GetGrain<IManagementGrain>(0).SendControlCommandToProvider("CTGrains.ConfigBootstrapProvider", "ConfigProvider", (int)State);
            }
            catch
            {
                Timer = RegisterTimer(OnPushConfigTimer, State, TimeSpan.FromSeconds(10), TimeSpan.MaxValue);
            }
        }
    }

And inside the config grain:

        public async Task UpdateConfig()
        {
            await InternalUpdateConfig(); // Read config from DB or wherever, increment version

            await GrainFactory.GetGrain<IConfigUpdaterGrain>(0).PushUpdateToAllSilos(Data.Version);
        }

and finally, on the provider:

        public async Task<object> ExecuteCommand(int command, object arg)
        {
            if (command > ConfigWriter.Version) // IConfigWriter now also exposes the current cached Version
                ConfigWriter.Config = (await GrainFactory.GetGrain<ISystemConfig>(0).GetConfig()).Value;
            return null;
        }

Using the command argument to pass the version is a hack, but it's not too bad as it avoids an extra call to the config grain.