dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

ObserverManager needs a ConcurrentDictionary #9242

Open turowicz opened 1 week ago

turowicz commented 1 week ago

We keep getting errors like this in a high client density deployment:

 System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
   at System.Collections.Generic.Dictionary`2.Enumerator.MoveNext()
   at Orleans.Utilities.ObserverManager`2.Notify(Func`2 notification, Func`2 predicate) in /_/src/Orleans.Core/Utils/ObserverManager.cs:line 156

It seems like an easy fix that we would love to see in Orleans 8.x. Please change the _observers field to a ConcurrentDictionary<,>.
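A minimal sketch of what the requested change buys. It assumes that Notify awaits each notification inside its foreach, which is consistent with the stack trace but not verified here against the Orleans source. With a ConcurrentDictionary, a subscription added while the loop is suspended does not invalidate the enumerator. The dictionary, NotifyAsync, and the connection ids are hypothetical stand-ins, not ObserverManager's actual code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the _observers field after the requested change.
var observers = new ConcurrentDictionary<string, string>();
observers["conn-1"] = "observer-1";
observers["conn-2"] = "observer-2";

var gate = new TaskCompletionSource(); // simulates a slow remote observer call
var visited = 0;

async Task NotifyAsync()
{
    foreach (var entry in observers)
    {
        await gate.Task; // the loop is suspended here, mid-enumeration
        visited++;
    }
}

var notify = NotifyAsync();          // runs until the first await
observers["conn-3"] = "observer-3";  // a subscription arrives while Notify is suspended
gate.SetResult();                    // let the notification loop resume
await notify;                        // completes without InvalidOperationException

// visited is 2 or 3: the enumerator tolerates writes but is not a snapshot.
Console.WriteLine($"visited {visited} of {observers.Count}");
```

One design consequence: the ConcurrentDictionary enumerator is safe to use alongside writes but does not represent a point-in-time snapshot, so an observer that subscribes mid-notification may or may not receive that particular notification.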

cc @ReubenBond

ReubenBond commented 1 week ago

Are you able to share some code? This doesn't seem right: there should be no concurrent modifications. Is it 8.2.0+ or earlier? Changing the implementation to use a ConcurrentDictionary is OK, but I'd like to make sure we understand the cause first.
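For comparison with the ConcurrentDictionary change, another common way to make such a loop tolerate mid-iteration subscriptions is to enumerate a copy taken before the loop starts. A minimal sketch with a plain Dictionary; the names are hypothetical and this is not a description of how ObserverManager is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical observer map that stays a plain Dictionary.
var observers = new Dictionary<string, string>
{
    ["conn-1"] = "observer-1",
    ["conn-2"] = "observer-2",
};
var gate = new TaskCompletionSource(); // simulates a slow remote observer call
var notified = new List<string>();

async Task NotifyAsync()
{
    // ToArray() copies the entries up front, so later writes to `observers`
    // cannot invalidate this enumeration.
    foreach (var entry in observers.ToArray())
    {
        await gate.Task;
        notified.Add(entry.Key);
    }
}

var notify = NotifyAsync();
observers["conn-3"] = "observer-3"; // subscription arriving mid-notification
gate.SetResult();
await notify;

// Only the observers present when the copy was taken are notified.
Console.WriteLine(string.Join(", ", notified));
```

The trade-off is an extra allocation per notification in exchange for a well-defined set of recipients.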

turowicz commented 1 week ago

Orleans 8.2.0

So I have a Speed Trap system containing a UnitGrain that accepts an image frame (from a camera stream) when a violation happens and publishes it to all observers. The system provides real-time alerts when violations occur, so the observer list changes constantly as users join and leave. I imagine that if a user joins or leaves while ObserverManager is iterating over _observers with a foreach, that error will occur.
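The scenario described above can be reproduced without Orleans. This minimal single-threaded sketch assumes the notification loop awaits inside a foreach over a plain Dictionary: the loop is suspended at the await, a hypothetical interleaved subscription adds a new key, and the next MoveNext throws the same InvalidOperationException. All names are illustrative, not ObserverManager's internals.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a Dictionary-backed observer map.
var observers = new Dictionary<string, string>
{
    ["conn-1"] = "observer-1",
    ["conn-2"] = "observer-2",
};
var gate = new TaskCompletionSource(); // simulates a slow remote observer call
string? caughtMessage = null;

async Task NotifyAsync()
{
    foreach (var entry in observers)
    {
        await gate.Task; // the enumerator stays alive across this await
    }
}

var notify = NotifyAsync();          // runs until the first await
observers["conn-3"] = "observer-3";  // an interleaved subscribe adds a new key
gate.SetResult();                    // resume; the next MoveNext detects the change

try
{
    await notify;
}
catch (InvalidOperationException ex)
{
    caughtMessage = ex.Message;
    Console.WriteLine(ex.Message);
}
```

No second thread is involved: the exception only requires some other code to add a key while the enumeration is suspended at an await, which is why reentrancy or interleaving is the relevant question.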

Code is as follows:

People subscribe in a SignalR hub:

var unitKind = (UnitKind)unit.Kind;
var grain = GetUnitGrain(_siloClient, unitKind, key);
var observer = new UnitGrainObserver(Context.ConnectionId, NotifySubscriberAsync);
var reference = _siloClient.CreateObjectReference<IUnitGrainObserver>(observer);
if (feeds.TryAdd(key, unitKind))
{
    await grain.SubscribeAsync(Context.ConnectionId, reference);
}

That call then reaches the grain:

public virtual Task SubscribeAsync(string key, IUnitGrainObserver observer)
{
    using (Logger.Start($"{GrainType}", $"{nameof(SubscribeAsync)}(key,observer)"))
    {
        previewSubscriptions.Subscribe(key, observer);
        MetricsLogger.GaugeInc(GraphMetrics.Viewers, Unit.Id.ToString(), Unit.Name);
        return Task.CompletedTask;
    }
}

Unsubscribing works the same way, but observers are also unsubscribed automatically when a WebSocket connection dies:

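public override async Task OnDisconnectedAsync(Exception exception)
{
    try
    {
        if (_connections.TryGetValue(Context.ConnectionId, out var feeds))
        {
            foreach (var feed in feeds.Distinct())
            {
                var split = feed.Key.Split('/');
                await Unsubscribe(split[1], split[2]);
            }
        }
    }
    finally
    {
        // The rest of this handler was not included in the post. A try block needs a
        // catch or finally; forwarding to the base hub is the usual SignalR pattern.
        await base.OnDisconnectedAsync(exception);
    }
}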

The UnitGrainObserver itself looks like this:

public class UnitGrainObserver : IUnitGrainObserver
{
    private readonly string _connectionId;
    private readonly Func<string, Immutable<ImmutableUnitContext>, Immutable<IEnumerable<IData>>, Task> _onReceive;
    private bool _isDisposed;
    public UnitGrainObserver(string connectionId, Func<string, Immutable<ImmutableUnitContext>, Immutable<IEnumerable<IData>>, Task> onReceive)
    {
        _onReceive = onReceive;
        _connectionId = connectionId;
    }
    public async Task ReceiveAsync(Immutable<ImmutableUnitContext> context, Immutable<IEnumerable<IData>> data)
    {
        if (!_isDisposed)
        {
            await _onReceive(_connectionId, context, data);
        }
    }
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    protected virtual void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            _isDisposed = true;
        }
    }
}

And we notify all observers with a single Notify call:

previewSubscriptions = CreatePreviewSubscriptions(31);
...
using (Logger.Start($"{GrainType}", $"{nameof(StreamPreviewAsync)}(context).Notify"))
{
    await previewSubscriptions.Notify(async s =>
    {
        using (Logger.Start($"{GrainType}", $"{nameof(StreamPreviewAsync)}(context).Notify.Payload"))
        {
            var payload = message.Context.Clone();
            await s.ReceiveAsync(new Immutable<ImmutableUnitContext>(payload), new Immutable<IEnumerable<IData>>(message.Data));
        }
        MetricsLogger.CounterInc(GraphMetrics.FrameCounter, GrainType, "out", "preview", Unit.Id.ToString());
    });
}

ReubenBond commented 1 week ago

Is reentrancy involved anywhere?
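Why reentrancy is the key question can be shown with a simplified model. This sketch assumes a non-reentrant activation behaves like a single lock held for an entire request, including its awaits. That is an approximation, not Orleans' actual scheduler. A subscription that arrives mid-notification is queued until the notification finishes, so the enumeration is never invalidated. SemaphoreSlim stands in for turn-based scheduling, and the method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Model: one request "turn" at a time, held across awaits (non-reentrant).
var turnLock = new SemaphoreSlim(1, 1);
var observers = new Dictionary<string, string>
{
    ["conn-1"] = "observer-1",
    ["conn-2"] = "observer-2",
};
var gate = new TaskCompletionSource(); // simulates a slow remote observer call
var notified = 0;

async Task NotifyAsync()
{
    await turnLock.WaitAsync();
    try
    {
        foreach (var entry in observers)
        {
            await gate.Task;
            notified++;
        }
    }
    finally
    {
        turnLock.Release();
    }
}

async Task SubscribeAsync(string id, string observer)
{
    await turnLock.WaitAsync(); // waits until the Notify turn has finished
    try
    {
        observers[id] = observer;
    }
    finally
    {
        turnLock.Release();
    }
}

var notify = NotifyAsync();
var subscribe = SubscribeAsync("conn-3", "observer-3"); // arrives mid-notification
gate.SetResult();
await Task.WhenAll(notify, subscribe);

Console.WriteLine($"notified {notified}, observers now {observers.Count}"); // prints "notified 2, observers now 3"
```

Under this model, an exception like the one reported would require something that runs outside the turn discipline, that is, some form of interleaving.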

turowicz commented 6 days ago

No reentrancy in the entire codebase

scalalang2 commented 11 hours ago

👀