Closed msallin closed 4 years ago
Ciao Marc
Thank you for the questions.
What is logged by default in each of those three cases?
It depends. There are multiple places where something is logged:
Some details about the message are automatically appended to the log message (for example, if the deserialized message contains a property named "Id", its value will be added to the logged message). See here for further details: src/Silverback.Integration/Messaging/Messages/MessageLogger.cs
Does every exception in those cases stop the consumer?
Yes, every unhandled exception will cause the consumer to stop. To avoid that you have to configure the error policies.
Is it possible to enable the stopped consumer in code?
No, I currently don't provide any convenient API to restart the consumer. In my opinion, if the stuff is properly configured, it should only stop if an error in the code causes an unexpected exception to be thrown. And in most cases a policy would catch every kind of error anyway.
Of course you can invoke Disconnect and Connect again on the Broker.
Does a policy get applied for each of those three cases (especially before serialization)?
And here is the part where I really thank you for asking. I'm very happy to start this discussion and hear your opinion on this matter.
No, the policies are only applied for errors happening after the deserialization. So 2. and 3. should really be covered. Note that the IBehavior implementations are applied by the IPublisher and are therefore able to catch all kinds of messages passing through Silverback, not only the inbound messages. A behavior could be a really good place to implement your custom logging.
I decided not to handle the deserialization errors through policies at first because if the deserializer fails there is most likely some error in it. The deserializer simply shouldn't fail. It is also difficult to apply some policies without knowing a single thing about the message; even republishing it to another topic would currently be impossible (with the current design).
The IMessageSerializer could return null though, causing the message to be ignored. (I should add some logging for this case.)
I'm not sure if this is the best and the expected behavior. What do you think?
A compromise that wouldn't require changing too many things would be to publish an UndeserializedMessage exposing a property with the original payload in such a case, giving you the possibility to handle it via policy and decide whether to skip it or move it to another topic (bad messages).
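To make the idea concrete, here is a minimal, self-contained sketch of such a fallback. It does not use Silverback's actual API: the shape of UndeserializedMessage, its RawContent and Error properties, the OrderCreated type and the SafeDeserializer helper are all assumptions made for illustration, and System.Text.Json merely stands in for whatever serializer is configured.

```csharp
using System;
using System.Text.Json;

// Hypothetical wrapper for a payload that could not be deserialized.
public sealed class UndeserializedMessage
{
    public UndeserializedMessage(byte[] rawContent, Exception error)
    {
        RawContent = rawContent;
        Error = error;
    }

    // The original bytes, kept so they can be skipped or moved elsewhere.
    public byte[] RawContent { get; }

    // The exception thrown by the deserializer.
    public Exception Error { get; }
}

// Hypothetical message type used only for this example.
public sealed class OrderCreated
{
    public string Id { get; set; }
}

public static class SafeDeserializer
{
    // Returns the deserialized message, or an UndeserializedMessage
    // carrying the original bytes when the payload is malformed JSON.
    public static object Deserialize<TMessage>(byte[] payload)
    {
        try
        {
            return JsonSerializer.Deserialize<TMessage>(payload);
        }
        catch (JsonException ex)
        {
            return new UndeserializedMessage(payload, ex);
        }
    }
}
```

A subscriber or policy could then check for UndeserializedMessage and decide whether to skip it or forward RawContent to a "bad messages" topic.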
What does the skip policy log?
The skip policy logs an entry at Trace level with a message like "The message will be skipped." and the message id (if available). I currently fail to log other important information like offset and topic name. I'll be fixing it in the next release, out sometime next week. I'll also raise the level to at least Information.
Some details about the message are automatically appended to the log message (for example, if the deserialized message contains a property named "Id", its value will be added to the logged message). See here for further details: src/Silverback.Integration/Messaging/Messages/MessageLogger.cs
Sounds reasonable. I just had to remind myself that a Kafka topic isn't a queue.
Does a policy get applied for each of those three cases (especially before serialization)?
And here is the part where I really thank you for asking. I'm very happy to start this discussion and hear your opinion on this matter. No, the policies are only applied for errors happening after the deserialization. So 2. and 3. should really be covered. Note that the IBehavior implementations are applied by the IPublisher and are therefore able to catch all kinds of messages passing through Silverback, not only the inbound messages. A behavior could be a really good place to implement your custom logging.
I am going to have a closer look at IBehavior anyway. So, I will come back to this later.
I decided not to handle the deserialization errors through policies at first because if the deserializer fails there is most likely some error in it. The deserializer simply shouldn't fail. It is also difficult to apply some policies without knowing a single thing about the message; even republishing it to another topic would currently be impossible (with the current design).
The IMessageSerializer could return null though, causing the message to be ignored. (I should add some logging for this case.) I'm not sure if this is the best and the expected behavior. What do you think?
A compromise that wouldn't require changing too many things would be to publish an UndeserializedMessage exposing a property with the original payload in such a case, giving you the possibility to handle it via policy and decide whether to skip it or move it to another topic (bad messages).
I'm currently too far away to say whether policies are the right instrument to handle exceptions from serialization, but I think there should be a way to do so. For me, Silverback is kind of an entry point (like the WebHost is), and it's nice to have one concept to ensure global error handling.
There are different scenarios (especially taking into account that we don't only have JSON serializers).
a) The deserialization throws an exception (you can do crazy stuff with deserializers... I did it... oops).
b) The deserialization cannot be performed but does not throw an exception.
c) What about schema validation errors?
a) It is possible that this happens only because of a single malformed message. As there is no way to handle it, it will stop the consumer. You could say that a deserializer should take care of the exception handling itself, but I would rather define it globally, once, in my blueprint, so that not everyone has to remember to do it for every new serializer.
b) I'm a fan of letting things fail. If it's not possible to deserialize, I prefer to get an error instead of a half-baked object, e.g. one where a "required" property is null. Currently the behavior with the default serializer is like this: if I have a valid empty JSON { } and have configured a serializer to deserialize the JSON to a certain type (because the JSON contains no type information), this will lead to an empty object, which is passed on.
c) This just came to my mind while finishing my writing and I'm too tired to think about it. Good night!
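Point b) can be illustrated with a small sketch that fails fast instead of passing on an empty object. It is independent of Silverback: the CustomerRegistered type and the StrictJson helper are hypothetical names, and the approach (System.Text.Json followed by data annotations validation) is only one possible way to enforce "required" properties, not something the library is known to do.

```csharp
using System.ComponentModel.DataAnnotations;
using System.Text.Json;

// Hypothetical message type with one mandatory property.
public sealed class CustomerRegistered
{
    [Required]
    public string CustomerId { get; set; }
}

public static class StrictJson
{
    // Deserializes and then validates the result, so that "{ }" raises
    // a ValidationException instead of yielding a half-baked object.
    public static TMessage Deserialize<TMessage>(string json)
    {
        TMessage message = JsonSerializer.Deserialize<TMessage>(json);

        if (message == null)
            throw new ValidationException("The payload deserialized to null.");

        Validator.ValidateObject(
            message,
            new ValidationContext(message),
            validateAllProperties: true);

        return message;
    }
}
```

With this in place, StrictJson.Deserialize<CustomerRegistered>("{ }") throws, so the failure surfaces at the deserialization step rather than later in a subscriber.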
What does the skip policy log?
The skip policy logs an entry at Trace level with a message like "The message will be skipped." and the message id (if available). I currently fail to log other important information like offset and topic name. I'll be fixing it in the next release, out sometime next week. I'll also raise the level to at least Information.
Yes, I would definitely log at least at Information level. But is the underlying error that triggered the error policy logged? If the underlying error isn't logged as an error, I would prefer to be able to set the log level used by the skip policy, because I want to get a notification as soon as messages are skipped because of errors. I could imagine this being a pragmatic default setting we can use: you don't want to have another topic and implement retries because you don't expect anything to fail. If messages fail, you can simply reconsume them (because the resulting operations are idempotent or so), BUT you have to notice it.
The original exception is logged as Warning and the Skip policy now (>= 0.9.0) logs with level Information.
If you really don't expect anything to fail just avoid setting up the policies and you will get a Critical when and if the consumer is stopped (that should trigger enough alarms). On the other hand if you can live with skipping a message a Warning is enough. As a rule of thumb I log with level Error or above only the stuff you should normally react to. If the error is handled via policy, you normally don't have to care about it.
That being said, if you really want/need to do something different when a policy is applied you can always publish an event (to the internal bus) and do whatever you need: https://beagle1984.github.io/silverback/docs/configuration/inbound#publishing-messages-events
In my opinion it is in any case a good idea to move the message into another topic (you can have one global "bad messages" topic for your whole solution) or store it somewhere else, if you want to be able to replay it without much trouble.
Now it's a bit late, I'll carefully reply to the other points tomorrow. In the next release I should be able to catch the exceptions thrown by the deserializer and handle them via the normal policies, but I had to refactor a couple of things and I will need some good testing. 🙅‍♂️
I currently have a problem because there are corrupt messages in a topic, so the serializer cannot deserialize them. I just return null, but this leads to an exception:
at System.Object.GetType()
at Silverback.Messaging.Messages.InboundMessageHelper.CreateNewInboundMessage[TMessage](TMessage message, IInboundMessage sourceInboundMessage)
at Silverback.Messaging.ErrorHandling.InboundMessageProcessor.DeserializeIfNeeded(IInboundMessage message)
at Silverback.Messaging.ErrorHandling.InboundMessageProcessor.HandleMessage[TInboundMessage](TInboundMessage message, Action`1 messageHandler, IErrorPolicy errorPolicy, Int32 attempt)
at Silverback.Messaging.ErrorHandling.InboundMessageProcessor.TryDeserializeAndProcess[TInboundMessage](TInboundMessage message, IErrorPolicy errorPolicy, Action`1 messageHandler)
at Silverback.Messaging.Connectors.InboundConsumer.ProcessSingleMessage(IInboundMessage message)
at Silverback.Messaging.Broker.KafkaConsumer.TryHandleMessage(Message`2 message, TopicPartitionOffset tpo)
Hi Marc. Did you upgrade to 0.10.0? I didn't carefully check what happens if you return null after the latest refactoring. The good news is that you don't need to do so anymore, since the error policies can catch the deserializer errors too. You can just define via policies to skip the messages that cannot be deserialized.
Unfortunately, it doesn't work. I will open a new issue for that. Let's use this one just to continue the conversation about error handling. #32
This looks outdated. Feel free to re-open an issue if necessary.
@BEagle1984 we are implementing methods to make the application more resilient. Part of this approach is the use of a circuit breaker for message consumption. But we're struggling to reconnect to the broker.
Do you have any examples of how to disconnect and reconnect?
I would also be interested in knowing how to manage disconnecting and reconnecting with this library. Any examples would be really appreciated.
Please provide some details, it's not clear what you are trying to do and whether there is an issue or not.
How is the circuit breaker implemented? Do you need to disconnect on a topic basis? Consumer basis? Everything?
What have you tried so far? Which problems did you encounter? Is ConnectAsync not working for some reason, or didn't you find a way to call it?
What I'd do is something like this:
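    using System.Linq;
    using System.Threading.Tasks;
    using Silverback.Messaging.Broker;

    public class CircuitBreaker
    {
        private readonly KafkaBroker _broker;

        public CircuitBreaker(KafkaBroker broker)
        {
            _broker = broker;
        }

        // Disconnects the consumer whose endpoint has the given friendly name.
        private Task DisconnectAsync(string endpointName) =>
            _broker.Consumers.Single(consumer => consumer.Endpoint.FriendlyName == endpointName).DisconnectAsync();

        // Reconnects the consumer whose endpoint has the given friendly name.
        private Task ConnectAsync(string endpointName) =>
            _broker.Consumers.Single(consumer => consumer.Endpoint.FriendlyName == endpointName).ConnectAsync();
    }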
Hi Sergio
I've been thinking about how to handle errors and I have some questions.
Let me start with how I think about exception handling, to give some context. Basically, I want to have one global error handling in place, which ensures that all exceptions are logged. With ASP.NET Core this is a middleware (shipped with the framework). Once I use the middleware, it is not necessary to catch exceptions which I cannot handle.
I would like to have such a behavior for my messages too. Basically there are three different "places" in the pipeline where errors can happen:
1) Within Silverback (e.g. Silverback cannot parse a message with the default serializer).
2) Within Silverback extension points (e.g. my serializer or behavior throws an exception).
3) In my code, which is invoked as the result of an inbound message (e.g. a subscriber).
Different questions: