MassTransit / MassTransit

Distributed Application Framework for .NET
https://masstransit.io
Apache License 2.0
6.98k stars 1.62k forks source link

System.InvalidOperationException When Using IAsyncDisposable #1662

Closed Cooksauce closed 3 years ago

Cooksauce commented 4 years ago

Is this a bug report?

Yes

Can you also reproduce the problem with the latest version?

Yes

Occurs When

When using MassTransit DI with a scoped consumer which has a dependency which only implements IAsyncDisposable.

It appears MassTransit should be calling DisposeAsync()

Stacktrace

MassTransit.ReceiveTransport: Error: R-FAULT rabbitmq://mq/report_queue 000a0000-ac16-0242-5f5f-08d794932d75 <redacted>.IMyCommand <redacted>.MyConsumer(00:00:06.8635823)

System.InvalidOperationException: '<redacted type name>' type only implements IAsyncDisposable. Use DisposeAsync to dispose the container.
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.Dispose()
   at MassTransit.Scoping.ConsumerContexts.CreatedConsumerScopeContext`3.Dispose()
   at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)

Environment

Dotnet version: .NET Core 3.0.0 Package: MassTransit (6.0.1) Package: MassTransit.Extensions.DependencyInjection (6.0.1)

Cooksauce commented 4 years ago

Service Configuration

services.AddScoped<MyConsumer>();

// Add MassTransit
services.AddMassTransit(x =>
{
    // Add Consumers
    x.AddConsumer<MyConsumer>();

    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri(RabbitMqConstants.BASE_ADDRESS + Environment.GetEnvironmentVariable("RABBITMQ_SERVER")), h =>
        {
            h.Username(Environment.GetEnvironmentVariable("RABBITMQ_DEFAULT_USER"));
            h.Password(Environment.GetEnvironmentVariable("RABBITMQ_DEFAULT_PASS"));
        });

        cfg.ReceiveEndpoint(RabbitMqConstants.MY_QUEUE, ep =>
        {
            ep.PrefetchCount = 0;

            // Map Messages To Queue
            EndpointConvention.Map<IMyCommand>(ep.InputAddress);

            // Configure Consumers
            ep.Consumer<MyConsumer>(provider);
        });
    }));
});

services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
services.AddSingleton<IHostedService, BusService>();

MyConsumer

public class MyConsumer : IConsumer<IMyCommand>
{
     private readonly ISomeAsyncDisposable _disposable;

     public MyConsumer(ISomeAsyncDisposable disposable)
     {
          _disposable = disposable;
     }

     public async Task Consume(ConsumeContext<IMyCommand> context)
     {
          // Removed for brevity
     }
}
Cooksauce commented 4 years ago

Based on this 426, it seems like IAsyncDisposable was never added to the interface, but instead added to the implementation. So they're expecting a cast to IAsyncDisposable then call DisposeAsync()

Edit: Corrected IAsyncEnumerable to IAsyncDisposable

phatboyg commented 4 years ago

MassTransit will not manage the lifecycle of your dependencies if you're using a container.

In this case, a consumer with an IAsyncDisposable dependency, and I'm guessing you're using the .NET version of the interface, which MassTransit doesn't use or support. Unfortunately, they have the same name.

Seems like you figured out the issue, though.

Cooksauce commented 4 years ago

MassTransit will not manage the lifecycle of your dependencies if you're using a container.

@phatboyg I don't quite follow this. Isn't MassTransit creating a scope and disposing it?

The stacktrace seems to indicate that:

at MassTransit.Scoping.ConsumerContexts.CreatedConsumerScopeContext`3.Dispose()

In this case, a consumer with an IAsyncDisposable dependency, and I'm guessing you're using the .NET version of the interface, which MassTransit doesn't use or support. Unfortunately, they have the same name.

Would you mind telling what other version you're referring to here? The Reactive one?

phatboyg commented 4 years ago

This one: https://github.com/MassTransit/GreenPipes/blob/develop/src/GreenPipes/IAsyncDisposable.cs

phatboyg commented 4 years ago

MassTransit is calling dispose on the container scope, the container is responsible for calling any disposable methods on any dependent objects.

Cooksauce commented 4 years ago

I see. That's correct, I'm using the .NET version of IAsyncDisposable. Are there any plans to support it?

As of .NET Core 3.0, it is now a standard interface. The default Microsoft.Extensions.DependencyInjection container supports it. So Masstransit doesn't currently seem to support the full capabilities of the default container. Basically the container is expecting that if it resolves any dependencies implementing IAsyncDisposable (but not IDisposable), the scope is disposed via DisposeAsync(). I would imagine, moving forward, it will become more and more relevant.

My current workaround is to inject an IServiceProvider into my consumer, and then create a child scope to resolve my dependencies from. I later call DisposeAsync() on that scope before finishing the IConsumer<T>.Consume(..) method. Any suggestions on a better workaround?

phatboyg commented 4 years ago

Your workaround seems to be enough, given what you've stated.

It will take a while before I support the netstandard2.1 features, since they force developers to move to the latest and not everyone is there yet.

Cooksauce commented 4 years ago

Fair enough. Props on a great library!

phatboyg commented 4 years ago

Thanks!

Cooksauce commented 3 years ago

@phatboyg I was thinking about this again. Would this be doable using multi-targeting to avoid breaking people?

Something like:

#if NETSTANDARD2_1
await using var asyncScope = scope as IAsyncDisposable;
# endif
phatboyg commented 3 years ago

Why? v7 and beyond of MassTransit uses https://www.nuget.org/packages/Microsoft.Bcl.AsyncInterfaces/ so this shouldn't be an issue.

Cooksauce commented 3 years ago

That doesn't solve this issue. The issue is calling IServiceScope.Dispose(). Instead of casting to IAsyncDisposable, which is what AspNetCore does. The original error here reproduces in MassTransist 7.2.1.

Not wanting to force everyone into Standard 2.1 is perfectly logical. But if multi-targeting solves that issue, does it make sense to add this? This essentially prevents anyone from using consumers properly in DI if they have even a single async disposable dependency (forces you to use service location & manage the scope in every consumer). Admittedly, I don't know if it would add a bunch of maintenance burden to MT though.

phatboyg commented 3 years ago

I don't plan to multi-target, it's too difficult to deal with honestly.

phatboyg commented 3 years ago

Converting all the scope providers to use IAsyncDisposable is fairly extensive, but might be doable.

phatboyg commented 3 years ago

The latest develop NuGet packages should have this properly implemented now.

Cooksauce commented 3 years ago

Thanks! All works for me in my test project now w/ those new changes.

phatboyg commented 3 years ago

Great, will be in the next release.