Farfetch / kafkaflow

Apache Kafka .NET Framework to create applications simple to use and extend.
https://farfetch.github.io/kafkaflow/
MIT License
653 stars 118 forks source link

[Bug Report]: Scope Middleware is Disposed of in OnConsumeCompleted or OnConsumeError Global Events #588

Open c5racing opened 3 months ago

c5racing commented 3 months ago

Prerequisites

Description

Using the setup below, I am able to resolve services in MessageConsumeStarted such as

using (var scope = context.DependencyResolver.CreateScope()) { //Code Here }

If I attempt to use the same code in OnConsumeCompleted or OnConsumeError, I get an error the following error:

System.AggregateException: One or more errors occurred. (Instances cannot be resolved and nested lifetimes cannot be created from this LifetimeScope as it (or one of its parent scopes) has already been disposed.)

In my message handlers, I'm injecting my DBContext for EF, so it's important that I'm able register these as scoped. I can't use Transient as I set different parameters that need to be available vi scoped DI down the pipeline.

services.AddKafkaFlowHostedService(kafka => kafka
                    .UseMicrosoftLog()
                    .SubscribeGlobalEvents(observers =>
                    {
                        observers.MessageConsumeStarted.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext));
                        observers.MessageConsumeCompleted.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeCompleted(eventContext.MessageContext));
                        observers.MessageConsumeError.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception));
                    })
                    .AddCluster(cluster => cluster
                    .WithBrokers(brokers)
                    .CreateTopicIfNotExists("sample-topic", 1, 1)
                    .AddConsumer(consumer => consumer
                    .Topic("sample-topic")
                    .WithGroupId("test-group")
                    .WithWorkersCount(1)
                    .WithAutoOffsetReset(AutoOffsetReset.Earliest)
                    .WithBufferSize(200)
                    .WithAutoCommitIntervalMs(2000)
                    .WithMaxPollIntervalMs(null)
                    .WithPartitionsRevokedHandler((resolver, partitions) =>
                    {
                        var logger = resolver.Resolve<ILogHandler>();

                        partitions.ForEach(partition =>
                        {
                            logger?.Warning($"Partition is being revoked: [{partition}]", null);
                        });
                    })
                    .AddMiddlewares(middlewares => middlewares
                    .AddSingleTypeDeserializer<NewtonsoftJsonDeserializer>(typeof(BaseEntity))
                    .AddTypedHandlers(h => h.AddHandler<UpdateDbForLanguage>().WithHandlerLifetime(InstanceLifetime.Scoped))
                    ))
                    .EnableAdminMessages("kafkaflow.admin")
            ));
        });

Steps to reproduce

  1. Create Scope Consumer
  2. Create 3 Global Events, MessageConsumeStarted, MessageConsumeCompleted, and MessageConsumeError
  3. Observe dependencies can be resolved in MessageConsumeStarted but not in MessageConsumeCompleted or MessageConsumeError

Expected behavior

All Global Events can resolve dependencies

Actual behavior

In the Global Event Handlers, OnConsumeCompleted or OnConsumeError, Dependancies cannot be resolved.

Using var scope = context.DependencyResolver.CreateScope()or context.DependencyResolver.Resolve<BaseEntity>(); results in the folowwing error:

System.AggregateException: One or more errors occurred. (Instances cannot be resolved and nested lifetimes cannot be created from this LifetimeScope as it (or one of its parent scopes) has already been disposed.)

KafkaFlow version

3.0.10

tomaszprasolek commented 3 weeks ago

I have similar problem. I have the error:

System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'IServiceProvider'.

on MessageConsumeError