BEagle1984 / silverback

Silverback is a simple but feature-rich message bus for .NET core (it currently supports Kafka, RabbitMQ and MQTT).
https://silverback-messaging.net
MIT License
257 stars 37 forks source link

Using ApplyTo on SkipMessageErrorPolicy #177

Open alefcarlos opened 1 year ago

alefcarlos commented 1 year ago

Is it possible to apply rules on Skip ?

        public void Configure(IEndpointsConfigurationBuilder builder)
        {
            builder
                .AddKafkaEndpoints(
                    endpoints => endpoints

                        // Configure the properties needed by all consumers/producers
                        .Configure(
                            config =>
                            {
                                // The bootstrap server address is needed to connect
                                config.BootstrapServers = _configuration.GetValue<string>("Kafka:BootstrapServers");
                            })

                        .AddInbound<SampleMessage>(
                            endpoint => endpoint
                                .ConsumeFrom("samples-basic")
                                .Configure(config =>
                                {
                                    config.GroupId = "WebApi1";
                                })
                                .OnError(policy => policy.Skip(opts => opts.ApplyTo<ArgumentException>()))
                                )
                        .AddOutbound<SampleMessage>(
                            endpoint => endpoint
                                .ProduceTo("samples-basic")));
        }

When de exception occurs the consumer stops:

[19:02:04 INF] Processing inbound message. | endpointName: samples-basic, messageType: WebApi.SampleMessage, WebApi, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null, messageId: 916776ce-85db-4729-b882-0ab48ac8479e, offset: [0]@1, kafkaKey: 916776ce-85db-4729-b882-0ab48ac8479e
[19:02:04 INF] Received 69
[19:02:05 ERR] Error occurred processing the inbound message. | endpointName: samples-basic, messageType: WebApi.SampleMessage, WebApi, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null, messageId: 916776ce-85db-4729-b882-0ab48ac8479e, offset: [0]@1, kafkaKey: 916776ce-85db-4729-b882-0ab48ac8479e
System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation.
 ---> System.ArgumentException: Value does not fall within the expected range.
   at WebApi.SampleMessageSubscriber.OnMessageReceived(SampleMessage message) in C:\repos\dev.azure.com\superdigital\arq-opentelemetry\samples\WebApi\SampleMessageSubscriber.cs:line 18
   --- End of inner exception stack trace ---
   at System.RuntimeMethodHandle.InvokeMethod(Object target, Span`1& arguments, Signature sig, Boolean constructor, Boolean wrapExceptions)
   at System.Reflection.RuntimeMethodInfo.Invoke(Object obj, BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
...
[19:02:05 FTL] Fatal error occurred processing the consumed message. The consumer will be stopped. | endpointName: samples-basic, messageType: WebApi.SampleMessage, WebApi, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null, messageId: 916776ce-85db-4729-b882-0ab48ac8479e, offset: [0]@1, kafkaKey: 916776ce-85db-4729-b882-0ab48ac8479e
BEagle1984 commented 1 year ago

As per the log, the ArgumentException is wrapped in a TargetInvokationException so you'd need to check the inner exception instead (using ApplyWhen).

I know it's not ideal but I noticed just recently that the exceptions thrown by sync subscribers (not returning Task) aren't unwrapped correctly. It's possible that I will fix this in the next major release.

alefcarlos commented 1 year ago

Thanks! @BEagle1984. The subscriber was just a sample and it was void, I turned that into async Task and it solved ;)

Should I keep this issue open ?

BEagle1984 commented 1 year ago

Yes, let's keep this open. I'll fix it for the sync methods too eventually. 👍