Closed lerenn closed 3 months ago
Hi @stefanmeschke,
I have a proposition for the implementation of this, but we can act differently if you have a better idea.
Add a failure callback to the universal message struct that the generated code can call if the Subscribe
logic fails.
What I like about that is that once we have developped the functionnality for a broker, the asyncapi-codegen
users don't have to bother about it.
It will automatically send a NACK if the business logic returns a Go error. And that's what this library is aiming for: relieving developer from a maximum of things.
MarkAsFailed func()
MarkAsFailed: func() {
_ = msg.Nak()
}
// from
fn func (ctx context.Context, msg {{channelToMessageTypeName $value}})
// to
fn func (ctx context.Context, msg {{channelToMessageTypeName $value}}) error
MarkAsFailed
method to let the broker knows that it has failed (and also check that the MarkAsFailed
callback is well set in case some broker doesn't have this mechanism):
if err := fn(ctx, msg); err != nil && brokerMsg.MarkAsFailed != nil {
brokerMsg.MarkAsFailed()
}
If you agree with that, you or I can implement that. Otherwise, feel free to challenge it :smiley:
Hi @lerenn,
great work you have done here.
I am a heavy user of the openapi-codegen and am very happy to have found a counterpart for working with async apis.
I mainly work with Kafka where we work with retry topics and dead letter topics.
I'm just trying out the generator to see if that's something for us. And I'm running into a similar problem here. I would adapt your suggestion a little:
type BrokerMessage struct {
Headers map[string][]byte
Payload []byte
AcknowledgementHandler BrokerAcknowledgementHandler
}
type BrokerAcknowledgementHandler interface {
Ack()
Nak()
}
type AcknowledgementHandler struct {
provider func() *nats.Msg
}
func (k AcknowledgementHandler) Ack() {
msg := k.provider()
_ = msg.Ack()
}
func (k AcknowledgementHandler) Nak() {
msg := k.provider()
_ = msg.Nak()
}
type AcknowledgementHandler struct {
provider func() (context.Context, *kafka.Reader, *kafka.Message)
}
func (k AcknowledgementHandler) Ack() {
ctx, r, message := k.provider()
_ = r.CommitMessages(ctx, *message)
}
func (k AcknowledgementHandler) Nak() {
ctx, r, message := k.provider()
_ = r.CommitMessages(ctx, *message)
}
sub.TransmitReceivedMessage(extensions.BrokerMessage{
Headers: headers,
Payload: msg.Data,
AcknowledgementHandler: AcknowledgementHandler{provider: func() *nats.Msg {
return msg
}},
})
if err := fn(ctx, msg); err != nil {
brokerMsg.AcknowledgementHandler.Nak()
return nil
}
brokerMsg.AcknowledgementHandler.Ack()
I think this way you could provide a more open implementation where users with different use cases can plug in and cover their requirements without implementing their own broker.
To avoid data loss, we cannot rely on autocommit in our implementation for kafka and can only commit the message if it has been handled accordingly. This could also be solved with this pattern and the errorhandler.
I have already done some work on my evaluation. Let me know if this would be a pattern that suits you.
@magraef Thanks for you interest in this project 😊
Yes, I do also feel that it is clearly a must have and I may not be aware of all usecases. I was thinking at the time, that either the operation succeed (then ACK) or fail (then NACK) but if there is some cases where we should do more or different, then I definitely agree with and strongly support your proposition. Also, I'm not really experienced in this part, so I really appreciate your insight.
I would have an additional curiosity question: what would you put in the provider
function ? In your example, you just pass the message, but I suppose that there is more that you would put here, depending on the usecase. Also, do you think that there is some case where we should not send an ACK or NACK ?
Otherwise, from what I understand from your proposal, it would be really great ! If you have something you are working with, I would be glad to add it to the project ! Especially if it solve your use case. So feel free to PR, I would review with pleasue !
Also, if you've done the work for asyncapi v2, I can do the corresponding work for the v3 if you don't have lots of time for this :)
I see the provider function mainly for providing all the things needed for the Ack/Nak without the controller layer having to know anything about it. Everything should remain in the corresponding broker implementation.
At first glance, the Ack/Nak is possible with Nats directly via the message, so the message is sufficient here.
type AcknowledgementHandler struct {
provider func() *nats.Msg
}
func (k AcknowledgementHandler) Ack() {
msg := k.provider()
_ = msg.Ack()
}
func (k AcknowledgementHandler) Nak() {
msg := k.provider()
_ = msg.Nak()
}
sub.TransmitReceivedMessage(extensions.BrokerMessage{
Headers: headers,
Payload: msg.Data,
AcknowledgementHandler: AcknowledgementHandler{provider: func() *nats.Msg {
return msg
}},
})
For Kafka you need the context, the message and the reader to perform the commit.
type AcknowledgementHandler struct {
provider func() (context.Context, *kafka.Reader, *kafka.Message)
}
func (k AcknowledgementHandler) Ack() {
ctx, r, message := k.provider()
_ = r.CommitMessages(ctx, *message)
}
func (k AcknowledgementHandler) Nak() {
ctx, r, message := k.provider()
_ = r.CommitMessages(ctx, *message)
}
sub.TransmitReceivedMessage(extensions.BrokerMessage{
Headers: headers,
Payload: msg.Value,
AcknowledgementHandler: AcknowledgementHandler{provider: func() (context.Context, *kafka.Reader, *kafka.Message) {
return ctx, r, &msg
}},
})
In my use case, in the Kafka implementation, I commit the message in both the Nak and Ack scenarios. I am not aware of any Nak mechanism on the Kafka side. Then, I route the messages accordingly through an error handler for the subscriber to either the retry topics or the dead letter topics.
I think the actual handling of retry and dead letter patterns should ideally occur closely with the consumers and take place within a corresponding handler after processing the message in the subscription e.g.:
if err := fn(ctx, msg); err != nil {
c.ErrorHandler.Handle(ctx, msg, err)
brokerMsg.AcknowledgementHandler.Nak()
return nil
}
brokerMsg.AcknowledgementHandler.Ack()
I also think that the same applies to NATS, with the exception that a Nak can be performed against the nats broker.
Regarding your last question, personally, I'm not aware of any use case where I wouldn't want to send either an Ack or a Nack to the broker. But perhaps someone else has something in mind.
I will be working on it in the coming days and would be happy to provide my implementation. Let's then discuss the implementation details.
Seems good to me !
For my last question, lets say that until someone arrives with this particular need, then we shouldn't bother about it.
I have some other issue to complete on my side, but let me know if there is anything I can do to help :)
@lerenn I have set a PR, please let me know what you think of the changes.
Nice ! Thanks @magraef !
I'm currently in conference so I'm a bit busy, but I'll review as soon as I can :)
Ah. 🤦 I'm so sorry @lerenn. I've completely missed that. 😔 In case I still can support here please lemme know. 🙏
@magraef This is awesome! 🥳 Thank you!
@stefanmeschke, there is absolutely no problem! There has never been and there will never any obligation, life/job is already difficult enough 😬
Your help will always be appreciated ❤️ Feel free to take a look at the MR.
I will also work on a better way to on-board people that want to help (with issues and contributing.md).
And, still, if there is anything that you think could be better, feel free to mention it !
@lerenn i think this one can be closed with #163
@magraef, you're right ! I'll close this one. Thanks for pointing it to me :)
For now, when the business logic that receives a message fails, then there is no way to tell the broker.
We should be able to do this following this schema: