dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License
9.94k stars 2.02k forks

PubSub Streams SMS scalability issues #6919

Open stephenlautier opened 3 years ago

stephenlautier commented 3 years ago

We need some tips on making PubSub (SMS) more robust/performant. After some time (3-7 days) the cluster runs into issues because of it, and we have to restart and clear state in order to recover.

Generally, the scaling issue appears when a single stream targets many grains ("all"), for example an "invalidate-all" stream that every user grain subscribes to.

The reason the invalidate-all stream causes issues is that the PubSub state keeps growing with each user, so reads/writes become heavy and slow, and eventually time out or hit the size limit (depending on which store you use).
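
For readers landing here, the problematic pattern looks roughly like this (a sketch against the Orleans 3.x streams API; the grain, event, and provider names are hypothetical, not the actual code from this issue):

```csharp
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

public class InvalidateAllEvent { }

public interface IUserGrain : IGrainWithGuidKey { }

// Every user grain subscribes to the same "invalidate-all" SMS stream on
// activation, so the single PubSub state record behind that stream grows
// with the user count.
public class UserGrain : Grain, IUserGrain
{
    public override async Task OnActivateAsync()
    {
        var provider = GetStreamProvider( "SMSProvider" ); // hypothetical provider name
        var stream = provider.GetStream<InvalidateAllEvent>( Guid.Empty, "invalidate-all" );

        // One subscription per user grain, all recorded against the same
        // PubSubRendezvousGrain state.
        await stream.SubscribeAsync( ( evt, token ) => HandleInvalidateAsync( evt ) );

        await base.OnActivateAsync();
    }

    private Task HandleInvalidateAsync( InvalidateAllEvent evt ) => Task.CompletedTask;
}
```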

Also, is it safe to unsubscribe from a stream during grain deactivation? If I'm not wrong, I read in some GitHub issue that it is discouraged because it might deadlock (not 100% sure though). Unsubscribing would definitely help keep the state smaller, but we recently disabled it because we were experiencing several timeouts on deactivation during stream cleanup.

Using a different serializer that is more compact/faster would probably help as well. We tried that in the past, although I believe PubSub then had issues deserializing; I can give it another go (please mention a serializer that works well here, if possible).

Any suggestions on how to handle such scenarios with streams? The scenario above is just a sample; we have others that follow the same pattern.

We are running on:

enewnham commented 3 years ago

What I'm doing is mitigating the problem, which is the PubSubRendezvousGrain: it is my bottleneck when activating thousands of grains concurrently. There are two main scaling problems with the current system.

  1. An actor must query the PubSubRendezvousGrain when it needs to re-subscribe/resume an existing handle. When you scale out to thousands of actors, that poor PubSubRendezvousGrain must service both queries and subscriptions at the same time. It's simply not fast enough.
  2. The PubSubRendezvousGrain issues WriteStateAsync through the Orleans storage engine. That is a full read/modify/write of the entire state; if the state is megabytes of data, that's a big SQL query just to add what is essentially one row.

I solved problem 1 by adding a separate repository/SQL table to "remember" whether a grain needs to re-subscribe/resume an existing handle. This cuts the grain calls to PubSubRendezvousGrain by 50%.

I solved problem 2 by putting a load balancer in front of the PubSubRendezvousGrain. If a grain needs to subscribe to the stream "invalidate-all", it actually subscribes to "invalidate-all-x", where x is 0-15 and is chosen randomly at activation (e.g. "invalidate-all-3"). With 16 partitions, each WriteStateAsync now carries roughly 1/16th of the data over the wire.

It's garbage: there are tons of deliveries to grains that have unsubscribed or are in the process of unsubscribing. But it does work... The mess is below:

```csharp
// m_OrleansRepository is my way to "remember" whether this grain needs to
// resume/resubscribe. It's a super simple Entity Framework SQL table/repository pattern.
var subscriptionHandle = await m_OrleansRepository.GetSubscriptionHandleAsync<T>( IdentityString, streamId, streamNamespace, providerName );

try
{
    if( subscriptionHandle == null )
    {
        // I have a bag of "broadcast grain ids", AKA my load-balancer IDs.
        // Oh yeah, don't forget: for testing you need to NOT have a round-robin
        // load balancer; that makes it hard to test.
        var streamIds = OrleansStreams.TestMode
            ? new List<Guid> { Guid.Empty }
            : Events.BroadcastGrainIds;
        var streamProvider = GetStreamProvider( providerName );
        var stream = streamProvider.GetStream<T>( streamIds[ ms_Random.Next( streamIds.Count ) ], streamNamespace );

        subscriptionHandle = await stream.SubscribeAsync( observer );
    }
    else
    {
        var streamProvider = GetStreamProvider( providerName );
        var stream = streamProvider.GetStream<T>( subscriptionHandle.StreamIdentity.Guid, streamNamespace );
        var subscriptionHandles = await stream.GetAllSubscriptionHandles( );

        if( subscriptionHandles == null || subscriptionHandles.Count == 0 )
        {
            // Nothing to resume; create a fresh subscription.
            subscriptionHandle = await stream.SubscribeAsync( observer );
        }
        else if( subscriptionHandles.Count == 1 )
        {
            subscriptionHandle = subscriptionHandles[ 0 ];

            m_Logger.Debug( "Resume Existing Handle {@SubscriptionHandle}", subscriptionHandle );

            subscriptionHandle = await subscriptionHandle.ResumeAsync( observer );
        }
        else // subscriptionHandles.Count > 1
        {
            m_Logger.Debug( "Too many Existing Handles {@SubscriptionHandles}. Unsubscribe all.", subscriptionHandles );

            foreach( var h in subscriptionHandles )
            {
                await h.UnsubscribeAsync( );
            }

            subscriptionHandle = await stream.SubscribeAsync( observer );
        }
    }

    // "Remember" that we have an existing handle in case this grain deactivates.
    await m_OrleansRepository.UpsertStreamSubscriptionHandleAsync( IdentityString, streamId, streamNamespace, providerName, subscriptionHandle );
}
catch( SqlException ex )
{
    m_Logger.Error( ex, "Failed to store {@SubscriptionHandle}", subscriptionHandle );

    await subscriptionHandle.UnsubscribeAsync( );
}
```

```csharp
// This is how I publish to the load-balanced streams. Of course it's a helper
// func in the 3rd layer of some broadcaster debouncer.
var tasks = Events.BroadcastGrainIds.Select( id =>
{
    var provider = GetStreamProvider( m_ProviderName );
    var stream = provider.GetStream<List<T>>( id, m_StreamNamespace );
    return stream.OnNextAsync( objects );
} );
```

enewnham commented 3 years ago

I see you're using Redis already; that would make the "rememberer" even faster. However, be ready during disaster recovery to fully clean all of the Orleans streams data out of Redis.

stephenlautier commented 3 years ago

@enewnham thank you for your suggestion. We've done exactly what your option 2 suggests: partition the topic with random ids, and we wrote a reusable way to group streams (similar to what you've done). We already had something similar, but the other way round: we were publishing too many messages from the same grain, so we published to a random id and had the listener listen to all of them.
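
The partitioning idea both of us landed on can be sketched like this (all names hypothetical; this is a minimal illustration, not the reusable grouping code mentioned above). Each consumer picks one of N fixed stream ids at activation, and the publisher fans out to all N, so the PubSub state for the logical topic is spread across N rendezvous grains:

```csharp
using System;
using System.Linq;

public static class StreamPartitions
{
    public const int Count = 16;

    // Deterministic, distinct ids derived from the partition index; consumers
    // subscribe to exactly one, publishers write to all of them.
    public static readonly Guid[] Ids = Enumerable.Range( 0, Count )
        .Select( i => new Guid( i, 0, 0, new byte[ 8 ] ) )
        .ToArray();

    // A consumer calls this once at activation to choose its partition.
    public static Guid Pick( Random random ) => Ids[ random.Next( Ids.Length ) ];
}
```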

It would be nice if something like this were handled by Orleans itself, as it's quite easy to end up with such bottlenecks.

TimoSchmechel commented 2 months ago

I know it's an old thread, but I'm also running into this issue.

@enewnham thanks for the workaround. In your solution, I'm guessing you're setting PubSubRendezvousGrain to use in-memory grain state, i.e. .AddMemoryGrainStorage(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME), as it will still hold all the pub/sub data in memory but no longer needs to persist anything, since that has been delegated to the custom repository.
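
For reference, wiring the PubSub store to in-memory storage would look roughly like this on an Orleans 3.x silo builder (a sketch: "SMSProvider" and the elided clustering setup are placeholders; "PubSubStore" is the default PubSub storage provider name):

```csharp
var silo = new SiloHostBuilder()
    // ... clustering, endpoints, application parts, etc. elided ...
    .AddSimpleMessageStreamProvider( "SMSProvider" )
    // In-memory PubSub state: fast and never hits SQL, but lost on silo
    // restart; acceptable here because resubscription is tracked in the
    // external repository instead.
    .AddMemoryGrainStorage( "PubSubStore" )
    .Build();
```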