Closed stefanmeschke closed 9 months ago
Hello @stefanmeschke ! Thanks for your kind words :blush:
Well, I plan to support as many brokers as possible and, with a basic NATS adapter already supported, JetStream mode is a must-have. I don't have a lot of time to do it right now, but I'll implement it in the near future (especially if you need it).
However, if you are in a hurry and/or want to give it a try yourself, I invite you to take a look at the existing broker implementations to implement the JetStream version.
The broker adapters implement this interface:
// BrokerController represents the functions that should be implemented to connect
// the broker to the generated code.
type BrokerController interface {
// Publish a message to the broker
Publish(ctx context.Context, channel string, mw BrokerMessage) error
// Subscribe to messages from the broker
Subscribe(ctx context.Context, channel string) (BrokerChannelSubscription, error)
}
If you manage to implement these two functions with JetStream to Publish/Subscribe messages, then you have a NATS JetStream implementation! Also, you can use the examples to test your implementation if you want.
Here are the files:
If you want to try it, do not hesitate to ping me anytime for help. Otherwise, no worries, I'll implement it this month :smiley:
Hey @lerenn
Thank you!
I gave it a try yesterday and ran into two issues/challenges:
func Subscribe(ctx context.Context, channel string) is called, but I have to keep this consumer as a local var, because different channels/subjects (in JetStream) can only be filtered once the message hits the service. In theory we could also add multiple consumers, but this would hurt our use-case.
Thanks in advance!
Here is the full source code which I have so far:
package events
import (
"context"
"github.com/lerenn/asyncapi-codegen/pkg/extensions"
"github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// Check that it still fills the interface.
var _ extensions.BrokerController = (*JetStreamController)(nil)
// JetStreamController is the Controller implementation for asyncapi-codegen.
type JetStreamController struct {
natsConnection *nats.Conn
jetStream jetstream.JetStream
jetStreamConsumer jetstream.Consumer
logger extensions.Logger
queueGroup string
stream string
consumer string
}
// JetStreamControllerOption is a function that can be used to configure a NATS JetStream controller.
// Examples: WithLoggerExtension().
type JetStreamControllerOption func(controller *JetStreamController)
// WithLoggerExtension attaches a logger to the controller
func WithLoggerExtension(logger extensions.Logger) JetStreamControllerOption {
return func(controller *JetStreamController) {
controller.logger = logger
}
}
// NewJetStreamController creates a new NATS JetStream controller.
func NewJetStreamController(url string, stream, consumer string, options ...JetStreamControllerOption) *JetStreamController {
// Connect to NATS
nc, err := nats.Connect(url)
if err != nil {
panic(err)
}
// Create a JetStream management interface
js, err := jetstream.New(nc)
if err != nil {
panic(err)
}
// Check if stream exists
if _, err = js.Stream(context.Background(), stream); err != nil {
panic(err)
}
// Check if consumer exists
c, err := js.Consumer(context.Background(), stream, consumer)
if err != nil {
panic(err)
}
// Creates default controller
controller := &JetStreamController{
natsConnection: nc,
jetStream: js,
jetStreamConsumer: c,
queueGroup: brokers.DefaultQueueGroupID,
logger: extensions.DummyLogger{},
}
// Execute options
for _, option := range options {
option(controller)
}
return controller
}
// Publish a message to the broker.
func (c *JetStreamController) Publish(ctx context.Context, channel string, bm extensions.BrokerMessage) error {
msg := nats.NewMsg(channel)
// Set message headers and content
for k, v := range bm.Headers {
msg.Header.Set(k, string(v))
}
msg.Data = bm.Payload
// Publish message
if _, err := c.jetStream.PublishMsg(ctx, msg); err != nil {
return err
}
return nil
}
// subscriptions maps each channel (subject) to its subscription.
var subscriptions = make(map[string]extensions.BrokerChannelSubscription)
// consumeContext is the single consume loop shared by all subscriptions.
var consumeContext jetstream.ConsumeContext
// Subscribe to messages from the broker.
func (c *JetStreamController) Subscribe(_ context.Context, channel string) (extensions.BrokerChannelSubscription, error) {
// Reuse the subscription if the channel is already registered
if sub, ok := subscriptions[channel]; ok {
return sub, nil
}
// Create a new subscription
sub := extensions.NewBrokerChannelSubscription(
make(chan extensions.BrokerMessage, brokers.BrokerMessagesQueueSize),
make(chan any, 1),
)
// Start consuming on the first subscription only
if consumeContext == nil {
cc, err := c.jetStreamConsumer.Consume(c.MessagesHandler())
if err != nil {
return extensions.BrokerChannelSubscription{}, err
}
consumeContext = cc
}
// Wait for cancellation, then stop the shared consume context.
// Note: this stops delivery for every channel, not only this one.
sub.WaitForCancellationAsync(func() {
consumeContext.Stop()
})
// Add to subscriptions
subscriptions[channel] = sub
return sub, nil
}
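// MessagesHandler returns the handler that forwards each consumed message
// to the subscription registered for its subject.
func (c *JetStreamController) MessagesHandler() jetstream.MessageHandler {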
return func(msg jetstream.Msg) {
// Get headers
headers := make(map[string][]byte, len(msg.Headers()))
for k, v := range msg.Headers() {
if len(v) > 0 {
headers[k] = []byte(v[0])
}
}
sub, ok := subscriptions[msg.Subject()]
if !ok {
c.logger.Error(context.Background(), "Received message for unknown subscription", extensions.LogInfo{
Key: "subject",
Value: msg.Subject(),
})
_ = msg.Ack()
return
}
// Create and transmit message to user
sub.TransmitReceivedMessage(extensions.BrokerMessage{
Headers: headers,
Payload: msg.Data(),
})
// Acknowledge message
_ = msg.Ack()
}
}
// Close closes everything related to the broker.
func (c *JetStreamController) Close() {
c.natsConnection.Close()
}
Hey @stefanmeschke !
Thanks for the amazing work!
For the first point, I don't see the drawback of having only one consumer, so we can try to design it this way! But I don't understand what you need exactly: one consumer for all subscriptions of the AsyncAPI? If that's it, we can add a subject to listen to on the only consumer each time Subscribe is called and do the message reception in two steps:
1. One reception goroutine that receives every message from the consumer and dispatches it by subject
2. One goroutine per subscription (started in Subscribe) that awaits a message from the reception goroutine and redirects it to the user (through sub.TransmitReceivedMessage)
graph TD;
JetStreamConsumer-->DispatchGoroutine;
DispatchGoroutine-->Subject1Goroutine;
DispatchGoroutine-->Subject2Goroutine;
Subject1Goroutine-->UserSubscriptionForSubject1;
Subject2Goroutine-->UserSubscriptionForSubject2;
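The two-step reception in the diagram above could be sketched roughly as follows. This is a minimal, standard-library-only illustration, not the actual adapter code: the `message` and `dispatcher` types are hypothetical stand-ins for the JetStream consumer and for `extensions.BrokerChannelSubscription`, and the per-subject goroutines are simplified to buffered channels.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a broker message.
type message struct {
	Subject string
	Payload string
}

// dispatcher routes messages from one source to per-subject channels.
type dispatcher struct {
	mu   sync.RWMutex
	subs map[string]chan message
}

func newDispatcher() *dispatcher {
	return &dispatcher{subs: make(map[string]chan message)}
}

// subscribe registers a subject and returns the channel its messages arrive on.
// Calling it twice for the same subject returns the same channel.
func (d *dispatcher) subscribe(subject string) <-chan message {
	d.mu.Lock()
	defer d.mu.Unlock()
	if ch, ok := d.subs[subject]; ok {
		return ch
	}
	ch := make(chan message, 16)
	d.subs[subject] = ch
	return ch
}

// run is the dispatch loop: it reads the single source and forwards each
// message to the channel registered for its subject. Messages for unknown
// subjects are dropped. When the source closes, all subject channels close.
func (d *dispatcher) run(source <-chan message) {
	for msg := range source {
		d.mu.RLock()
		ch, ok := d.subs[msg.Subject]
		d.mu.RUnlock()
		if ok {
			ch <- msg
		}
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	for _, ch := range d.subs {
		close(ch)
	}
}

func main() {
	d := newDispatcher()
	orders := d.subscribe("orders.created")
	users := d.subscribe("users.created")

	source := make(chan message)
	go d.run(source)

	source <- message{Subject: "orders.created", Payload: "order-1"}
	source <- message{Subject: "users.created", Payload: "user-1"}
	source <- message{Subject: "unknown", Payload: "dropped"}
	close(source)

	fmt.Println((<-orders).Payload)
	fmt.Println((<-users).Payload)
}
```

The mutex matters here because subscriptions can be added while the dispatch loop is already reading the map from another goroutine.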
For the second point, I see that you ACK the message at the end of MessagesHandler(). What would you ideally want? A retry mechanism? A fail mechanism? For the last one, we could ACK the message before sending it to the user (so above sub.TransmitReceivedMessage), so that if the ACK fails, we don't send the message to the user. But again, I'm not really experienced with JetStream, and that may not be what you want.
Also, I tried to implement it quickly yesterday and put the branch on the repository if you want to take a look/reuse some of my code (spoiler alert: it doesn't work): https://github.com/lerenn/asyncapi-codegen/tree/add-nats-jetstream-broker. But in order to match your use-case, and because community co-working is nice, I would prefer to use your version.
Hey @lerenn
Hope you have/had nice Holidays!
I've created a draft pull request with your recommendations from above.
For the second point, I see that you ACK the message at the end of MessagesHandler(), what would you ideally want?
Ideally I could handle this in the business logic. Just thought of returning custom errors: RetryableError and NonRetryableError. :thinking: At the moment I don't see any possibility to do this due to the architecture, but I'm pretty sure that this would be awesome.
In the draft pull request I've just ack'd optimistically, which is fine in 99.999999% of cases, but in the remaining ones (a short outage of another dependency, etc.) this could lead to unwanted behaviour in the app.
Really looking forward to your thoughts on the PR.
Cheers, Stefan
PS: Sorry for the delayed response. There was a little year-end rush on my side for other things…
Hello @stefanmeschke !
Thanks for this awesome work ! :pray: And no worries, this is really nice of you to propose this PR in the first place (also Christmas time is no easy time :smile:).
Returning custom errors could be a great improvement! I think it would be better to create another issue on the subject, to add it to this broker and to the others as well.
I'll take a look at your PR asap (lots going on on my side as well). Thanks again :relieved:
Also, I don't see exactly what you are suggesting with the Retry/NoRetry errors in this sentence:
Ideally I could handle this in the business logic. Just thought of returning custom errors: RetryableError and NonRetryableError
Do you mean that the subscriber could retry (or not) retrieving the message, or that the publisher could retry (or not) sending the message?
Please do not hesitate to elaborate on this for me to better understand what you would see for this problem :pray:
Oki. Let me try to explain what I meant:
In JetStream, consumers can acknowledge messages or not acknowledge them (with or without a delay). Depending on the policies, the message will then be re-delivered (push consumer) or re-pulled (pull consumer). This is very handy when, e.g., something goes wrong in the business logic (think of a short outage of a dependent service, or something else). At the moment there is no link between the delivered message and the JetStream message once it has been parsed and delivered to the subscriber. Therefore the message gets acknowledged in the broker, which is not ideal from my point of view, because something could happen in the subsequent business logic that could/should lead to, e.g., not acknowledging this message. So it's all about the consumption, not the publishing.
I personally thought of something in the context, which gets passed to the subscription. :shrug:
Hope this makes sense?
Okay, thanks for the explanation, I understand it better ! :pray:
So now we have:
flowchart LR
Emitter-->Broker;
Broker-->Reception;
Reception--ACK-->Broker;
Reception-->BusinessLogic;
BusinessLogic-->Failure[Possible failure]
So it would be great to have:
flowchart LR
Emitter-->Broker;
Broker-->Reception;
Reception-->BusinessLogic;
BusinessLogic-->Failure[Possible failure]
Failure-->ReceptionRetry;
Broker-->ReceptionRetry;
ReceptionRetry-->BusinessLogicRetry;
BusinessLogicRetry-->Success;
Success--ACK-->Broker;
I'll create a ticket based on that, if that's okay for you ! :)
Yes and no. In the case of NATS JetStream it is something like this:
flowchart TB
Emitter-->Broker;
Broker-->Reception;
Reception-->BusinessLogic;
BusinessLogic-->Success;
Success--ACK-->Broker;
BusinessLogic-->Failure
Failure--NACK with or without delay-->Broker;
I would also be up for contributing to this feature. Maybe we could elaborate in the new issue on how we want to achieve this (code-wise)?
First thought: this is super specific to each broker. Therefore I thought of making it more general and then handling the errors in the brokers separately.
Oh right! It makes more sense with your schema. I'll create a new issue :)
First things first: Thanks for this awesome package! Are there any plans to support NATS JetStream?