JasperFx / wolverine

Supercharged .NET server side development!
https://wolverinefx.net
MIT License
1.24k stars 135 forks source link

V3.0.0-RC-1 System.ArgumentNullException : Value cannot be null. (Parameter 'descriptor.ImplementationFactory') #1036

Closed dmalovric closed 1 month ago

dmalovric commented 1 month ago

Firstly, thank you for this fantastic library and the continuous improvements to it.

While trying out the latest RC version (3.0.0-rc-1) I've encountered what seems to be a regression. In my integration tests, after calling:

InvokeMessageAndWaitAsync

there is an exception thrown:

  Message: 
System.ArgumentNullException : Value cannot be null. (Parameter 'descriptor.ImplementationFactory')

  Stack Trace: 
NoHandlerExecutor.InvokeAsync(Object message, MessageBus bus, CancellationToken cancellation, Nullable`1 timeout, String tenantId) line 65
MessageBus.InvokeAsync(Object message, CancellationToken cancellation, Nullable`1 timeout) line 90
<>c__DisplayClass4_0.<InvokeMessageAndWaitAsync>b__0(IMessageContext c) line 71
TrackedSession.ExecuteAndTrackAsync() line 270
TrackedSession.ExecuteAndTrackAsync() line 271
WolverineHostMessageTrackingExtensions.ExecuteAndWaitAsync(IHost host, Func`2 action, Int32 timeoutInMilliseconds) line 165
Messaging.ConfirmsUser_WhenUserRegisteredMessageIsReceived() line 73
--- End of stack trace from previous location ---

I've tried both the IServiceCollection and IHostBuilder registration options but both have the same issue.

Below are both my registration attempts. The top one with the IHostBuilder works in 2.17.2 without any changes.

/// <summary>
/// Configures Wolverine on the supplied host builder: Kafka transport (client and consumer
/// settings bound from configuration), EF Core transactions, durable inbox/outbox/local-queue
/// policies, optional SQL Server message persistence, Kafka listening/publishing for the
/// configured topic, and the exception policies. Afterwards registers the publishers and the
/// validated <c>KafkaConfigOptions</c>.
/// </summary>
/// <param name="hostBuilder">Host builder that Wolverine and the services are added to.</param>
/// <param name="hostEnvironment">Used to pick the durability mode outside production.</param>
/// <param name="configuration">Source of the Kafka section and the database connection string.</param>
public static void AddMessaging(this IHostBuilder hostBuilder,
    IWebHostEnvironment hostEnvironment,
    IConfiguration configuration)
{
    // Bind the Kafka section up front so the values can be captured by the callbacks below.
    var kafkaSettings = new KafkaConfigOptions();
    configuration.GetRequiredSection(KafkaConfigOptions.ConfigurationSection).Bind(kafkaSettings);

    hostBuilder.UseWolverine(options =>
    {
        options.UseKafka(kafkaSettings.HostsString)
            .ConfigureClient(clientConfig =>
            {
                clientConfig.AllowAutoCreateTopics = kafkaSettings.AutoCreateTopics;
                clientConfig.BootstrapServers = kafkaSettings.HostsString;

                // SASL/PLAIN credentials are only applied when authentication is switched on.
                if (kafkaSettings.UseAuthentication)
                {
                    clientConfig.SaslMechanism = Confluent.Kafka.SaslMechanism.Plain;
                    clientConfig.SecurityProtocol = Confluent.Kafka.SecurityProtocol.SaslPlaintext;
                    clientConfig.SaslUsername = kafkaSettings.Username;
                    clientConfig.SaslPassword = kafkaSettings.Password;
                }
            })
            .ConfigureConsumers(consumer =>
            {
                consumer.GroupId = kafkaSettings.ConsumerGroupId;
            });

        options.UseEntityFrameworkCoreTransactions();
        options.Policies.AutoApplyTransactions();

        // Every environment except production runs with the Solo durability mode.
        if (!hostEnvironment.IsProduction())
        {
            options.Durability.Mode = DurabilityMode.Solo;
        }

        options.Policies.UseDurableInboxOnAllListeners();
        options.Policies.UseDurableOutboxOnAllSendingEndpoints();
        options.Policies.UseDurableLocalQueues();

        // SQL Server persistence is opt-in: skipped entirely when no connection string exists.
        if (configuration.GetConnectionString(DB.ConnectionStringName) is { } connectionString)
        {
            options.PersistMessagesWithSqlServer(connectionString, DB.SchemaName);
            options.AutoBuildMessageStorageOnStartup = true;
        }

        options.ListenToKafkaTopic(kafkaSettings.Topic);
        options.Publish(rule =>
        {
            rule.ToKafkaTopic(kafkaSettings.Topic);
        });

        options.PublishMessage<UserRegisteredMessage>().ToKafkaTopic(kafkaSettings.Topic);

        // Timeouts are rescheduled after five seconds; the catch-all rule discards.
        options.Policies.OnException<TimeoutException>().ScheduleRetry(5.Seconds());
        options.Policies.OnAnyException().Discard();
    });

    hostBuilder.ConfigureServices(serviceCollection =>
    {
        serviceCollection
            .AddPublishers()
            .AddOptionsWithFluentValidation<KafkaConfigOptions>(KafkaConfigOptions.ConfigurationSection);
    });
}

/// <summary>
/// Registers the validated <c>KafkaConfigOptions</c>, then adds Wolverine to the service
/// collection with the Kafka transport, EF Core transactions, durable inbox/outbox/local-queue
/// policies, optional SQL Server message persistence, Kafka listening/publishing for the
/// configured topic, and the exception policies. Finally registers the publishers.
/// </summary>
/// <param name="services">Service collection that receives the registrations.</param>
/// <param name="hostEnvironment">Used to pick the durability mode outside production.</param>
/// <param name="configuration">Source of the Kafka section and the database connection string.</param>
/// <returns>The same <paramref name="services"/> instance, to allow chaining.</returns>
public static IServiceCollection AddMessaging(this IServiceCollection services,
    IWebHostEnvironment hostEnvironment,
    IConfiguration configuration)
{
    services.AddOptionsWithFluentValidation<KafkaConfigOptions>(KafkaConfigOptions.ConfigurationSection);

    // Bind the Kafka section up front so the values can be captured by the callbacks below.
    var kafkaSettings = new KafkaConfigOptions();
    configuration.GetRequiredSection(KafkaConfigOptions.ConfigurationSection).Bind(kafkaSettings);

    services.AddWolverine(options =>
    {
        options.UseKafka(kafkaSettings.HostsString)
            .ConfigureClient(clientConfig =>
            {
                clientConfig.AllowAutoCreateTopics = kafkaSettings.AutoCreateTopics;
                clientConfig.BootstrapServers = kafkaSettings.HostsString;

                // SASL/PLAIN credentials are only applied when authentication is switched on.
                if (kafkaSettings.UseAuthentication)
                {
                    clientConfig.SaslMechanism = Confluent.Kafka.SaslMechanism.Plain;
                    clientConfig.SecurityProtocol = Confluent.Kafka.SecurityProtocol.SaslPlaintext;
                    clientConfig.SaslUsername = kafkaSettings.Username;
                    clientConfig.SaslPassword = kafkaSettings.Password;
                }
            })
            .ConfigureConsumers(consumer =>
            {
                consumer.GroupId = kafkaSettings.ConsumerGroupId;
            });

        options.UseEntityFrameworkCoreTransactions();
        options.Policies.AutoApplyTransactions();

        // Every environment except production runs with the Solo durability mode.
        if (!hostEnvironment.IsProduction())
        {
            options.Durability.Mode = DurabilityMode.Solo;
        }

        options.Policies.UseDurableInboxOnAllListeners();
        options.Policies.UseDurableOutboxOnAllSendingEndpoints();
        options.Policies.UseDurableLocalQueues();

        // SQL Server persistence is opt-in: skipped entirely when no connection string exists.
        if (configuration.GetConnectionString(DB.ConnectionStringName) is { } connectionString)
        {
            options.PersistMessagesWithSqlServer(connectionString, DB.SchemaName);
            options.AutoBuildMessageStorageOnStartup = true;
        }

        options.ListenToKafkaTopic(kafkaSettings.Topic);
        options.Publish(rule =>
        {
            rule.ToKafkaTopic(kafkaSettings.Topic);
        });

        options.PublishMessage<UserRegisteredMessage>().ToKafkaTopic(kafkaSettings.Topic);

        // Timeouts are rescheduled after five seconds; the catch-all rule discards.
        options.Policies.OnException<TimeoutException>().ScheduleRetry(5.Seconds());
        options.Policies.OnAnyException().Discard();
    });

    // AddPublishers hands back the collection it was given, so it doubles as the return value.
    return services.AddPublishers();
}

/// <summary>
/// Registers <c>Register.UserRegisteredMessagePublisher</c> as the transient implementation of
/// <c>IUserRegisteredMessagePublisher</c>.
/// </summary>
/// <param name="services">Service collection that receives the registration.</param>
/// <returns>The same <paramref name="services"/> instance, to allow chaining.</returns>
private static IServiceCollection AddPublishers(this IServiceCollection services) =>
    services.AddTransient<IUserRegisteredMessagePublisher, Register.UserRegisteredMessagePublisher>();
jeremydmiller commented 1 month ago

@dmalovric First off, thank you for taking the time to write this up! But after saying that, the one single thing I'd need to understand is something about the handler and its dependency tree where the problem was seen. This is 100% about the dependency tree and absolutely nothing to do with any of your configuration code.

Also, is a full stack trace being written out anywhere? Could you please run `dotnet run -- codegen write` and see whether it tells us exactly which handler had the issue, and whether it gives you a more complete stack trace?

I can look into this a bit just from what you've posted. I already have another codebase/report to look at that may be similar.

jeremydmiller commented 1 month ago

@dmalovric That'll also be dealt with by this one: https://github.com/JasperFx/wolverine/pull/1042

I had a repro of this from a client, but just now got into it enough to see what the issue was. Thanks for reporting this w/ repro steps!