lerenn / asyncapi-codegen

An AsyncAPI Golang Code generator that generates all Go code from the broker to the application/user. Just plug your application to your favorite message broker!
Apache License 2.0

Jetstream broker limitations #186

Open · nuttert opened this issue 6 months ago

nuttert commented 6 months ago

Hey! I tried using JetStream. It's great that it supports replay and acks, but I found some limitations in the natsjetstream wrapper.

1. As far as I can see, you use the channel as a map key: [screenshot]

But it is often very convenient to use wildcards such as "ORDERS.*" (native NATS JetStream supports them), and I would expect to subscribe to all matching subjects, not just specific ones. This works fine in the plain NATS broker (without JetStream) because there you don't store the channel in a map:

[screenshot]
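To illustrate why a map keyed on the exact channel string cannot serve a wildcard subscription, here is a minimal stdlib-only sketch of NATS-style subject matching. It assumes the standard NATS token rules (`.` separates tokens, `*` matches exactly one token, `>` matches one or more trailing tokens). `subjectMatches` is a hypothetical helper, not part of asyncapi-codegen or nats.go.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete subject (e.g. "ORDERS.new")
// matches a NATS-style pattern (e.g. "ORDERS.*" or "ORDERS.>").
// Hypothetical helper, for illustration only.
func subjectMatches(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, p := range pt {
		if p == ">" {
			// ">" must be the last pattern token and needs at least one subject token left.
			return i == len(pt)-1 && len(st) > i
		}
		if i >= len(st) {
			return false // subject has fewer tokens than the pattern
		}
		if p != "*" && p != st[i] {
			return false // literal token mismatch
		}
	}
	return len(pt) == len(st) // "*" never absorbs extra tokens
}

func main() {
	// An exact-key map lookup only finds the literal pattern string.
	subs := map[string]bool{"ORDERS.*": true}
	fmt.Println(subs["ORDERS.new"])                       // false
	fmt.Println(subjectMatches("ORDERS.*", "ORDERS.new")) // true
	fmt.Println(subjectMatches("ORDERS.*", "ORDERS.a.b")) // false
	fmt.Println(subjectMatches("ORDERS.>", "ORDERS.a.b")) // true
}
```

A controller that stores wildcard subscriptions would therefore need to match each incoming subject against the stored patterns instead of doing a direct map lookup.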
2. It is quite inconvenient to specify all the subjects when creating a JetStream broker:

```go
natsjetstream.WithStreamConfig(jetstream.StreamConfig{
	Name: "pingv3",
	Subjects: []string{
		"ping.v3", "pong.v3",
	},
}), // Create the stream "pingv3"
```

This defeats the purpose of code generation, since you have to specify the channel names manually in the code and not just in the YAML schema.
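A minimal sketch of deriving the stream's subject list from the channel addresses instead of typing them by hand. It assumes the generator could expose the AsyncAPI channel addresses as a string slice; `streamSubjects` and its input are hypothetical, and the result is what you would pass as `Subjects` in the stream config.

```go
package main

import (
	"fmt"
	"sort"
)

// streamSubjects deduplicates and sorts channel addresses so they can be
// used as the Subjects of a single JetStream stream.
// The input is assumed to come from the AsyncAPI document (hypothetical).
func streamSubjects(channelAddresses []string) []string {
	seen := make(map[string]struct{}, len(channelAddresses))
	out := make([]string, 0, len(channelAddresses))
	for _, addr := range channelAddresses {
		if addr == "" {
			continue // skip channels without an address
		}
		if _, dup := seen[addr]; dup {
			continue // the same address may be used by several operations
		}
		seen[addr] = struct{}{}
		out = append(out, addr)
	}
	sort.Strings(out)
	return out
}

func main() {
	// Addresses as they might appear in the ping/pong example's AsyncAPI document.
	addrs := []string{"ping.v3", "pong.v3", "ping.v3"}
	fmt.Println(streamSubjects(addrs)) // [ping.v3 pong.v3]
}
```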

I would expect the JetStream client to behave like the classic client, with at least two advantages:

Also, what if there is a lot of traffic and different services use different channels? As far as I understand the current logic, every client will receive data from all channels, which loads the network with traffic from channels it doesn't use (and produces a warning log).

[screenshot]

In the official example everything is quite clear:

[screenshot]

But in the current wrapper the meaning of the names (streams, channel, subject, topic, etc.) is unclear, and I assume the logic there differs from the example above. Correct me if I'm wrong, though.

lerenn commented 6 months ago

Hello @nuttert !

I'll try to answer as best I understand everything, but feel free to correct me if I've misunderstood something:

  1. For this, I would assume that it works even with the *: specifying a channel with * at the end of its address field in an AsyncAPI document should work. But since you're pointing it out, I guess it doesn't. I'll try it out, but if you have more info on what's happening in the JetStream case, I'd be glad to have it :)

  2. Okay, so there are 3 topics here; I'll take them one by one:

    2.1. Subject creation: I agree that it's not ideal to specify each subject manually, as the AsyncAPI document contains enough information to derive them. To be honest, I was planning on this but didn't take the time to implement it. If that works for you, I'll open an issue with some solutions I can think of.

    2.2. One consumer per controller: the logic behind it is to have one connection per controller and avoid multiplying connections, which I suppose suited the use case of @stefanmeschke, who implemented it. However, you're right that messages shouldn't be acked by default. So I would add 2 new changes: let users choose between a consumer per subscription (not listening to everything) and one consumer for all subscriptions (listening to everything), and let them specify what to do with messages from unsubscribed subjects.

    2.3. Naming: Sorry about that, it's difficult to make a clear implementation of something as specific as a broker with something as agnostic as AsyncAPI.

    • Streams and subjects are specific to NATS JetStream; together they correspond to the channel mentioned in AsyncAPI.
    • Topic is specific to Kafka and corresponds to the channel mentioned in AsyncAPI. Regarding the NATS JetStream implementation, that's why the ConsumeMessage function corresponds to a consumer from the NATS JetStream schema you linked, and new messages are sent to channels, which are the AsyncAPI concept used by the codegen. Hope that's clearer, but if a specific code extract is obscure, feel free to point it out: I'll try to explain the names or change them if needed.
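The "one consumer for all subscriptions" option from 2.2 could look roughly like this stdlib-only sketch: a single shared consumer hands each message to the subscription registered for its subject and applies a configurable policy to messages nobody subscribed to. All names (`UnsubscribedPolicy`, `Router`, `Msg` and its `Ack`/`Nak` callbacks) are hypothetical; in a real controller the callbacks would wrap the JetStream message's acknowledgement methods, and the lookup would need wildcard matching rather than exact keys.

```go
package main

import "fmt"

// UnsubscribedPolicy says what to do with a message whose subject has no subscription.
type UnsubscribedPolicy int

const (
	AckUnsubscribed    UnsubscribedPolicy = iota // ack it so it is not redelivered
	NakUnsubscribed                              // nak it to request redelivery
	IgnoreUnsubscribed                           // do nothing (redelivered after the ack wait, assuming an explicit-ack consumer)
)

// Msg is a simplified stand-in for a JetStream message (hypothetical).
type Msg struct {
	Subject string
	Ack     func()
	Nak     func()
}

// Router dispatches messages coming from one shared consumer.
type Router struct {
	subs   map[string]chan Msg // exact subject -> subscription channel (no wildcards here)
	policy UnsubscribedPolicy
}

// Dispatch hands m to its subscription, or applies the policy if there is none.
func (r *Router) Dispatch(m Msg) {
	if ch, ok := r.subs[m.Subject]; ok {
		ch <- m // the subscriber decides when to ack
		return
	}
	switch r.policy {
	case AckUnsubscribed:
		m.Ack()
	case NakUnsubscribed:
		m.Nak()
	case IgnoreUnsubscribed:
		// leave the message alone
	}
}

func main() {
	orders := make(chan Msg, 1)
	r := &Router{subs: map[string]chan Msg{"ORDERS.new": orders}, policy: AckUnsubscribed}
	acked := 0
	noop := func() {}
	r.Dispatch(Msg{Subject: "ORDERS.new", Ack: func() { acked++ }, Nak: noop})
	r.Dispatch(Msg{Subject: "PAYMENTS.new", Ack: func() { acked++ }, Nak: noop})
	fmt.Println(len(orders), acked) // 1 1
}
```

In the "consumer per subscription" mode this router would be unnecessary, since each consumer would be created with a filter subject for its own channel and would never see the other channels' traffic.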

So if I'm correct, I would create 2 issues:

nuttert commented 5 months ago

Thanks for the answer @lerenn! I think you're right, but maybe we misunderstand each other on the point about streams. Let me explain with the following code.

Here is Subscribe:

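```go
// Subscribe to messages from the broker.
func (c *Controller) Subscribe(ctx context.Context, channel string) (extensions.BrokerChannelSubscription, error) {
	// Create a new subscription
	sub := extensions.NewBrokerChannelSubscription(
		make(chan extensions.AcknowledgeableBrokerMessage, brokers.BrokerMessagesQueueSize),
		make(chan any, 1),
	)

	// Create a dedicated stream (with a random name) capturing only this channel's subject
	config := &nats.StreamConfig{
		Name:     randString(),
		Subjects: []string{channel},
		NoAck:    true,
	}
	if _, err := c.jetStream.AddStream(config); err != nil {
		return sub, fmt.Errorf("adding stream for %q: %w", channel, err)
	}

	// Push-subscribe to the channel, delivering all messages stored in the stream
	if _, err := c.jetStream.Subscribe(channel, func(msg *nats.Msg) {
		c.logger.Info(ctx, fmt.Sprintf("Received message for %s", channel), extensions.LogInfo{
			Key:   "message",
			Value: *msg,
		})
		c.HandleMessage(ctx, msg, sub)
	}, nats.DeliverAll()); err != nil {
		return sub, fmt.Errorf("subscribing to %q: %w", channel, err)
	}

	return sub, nil
}
```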

And here is Publish:

```go
// Publish a message to the broker.
func (c *Controller) Publish(ctx context.Context, channel string, bm extensions.BrokerMessage) error {
	msg := nats.NewMsg(channel)

	// Set message headers and content
	for k, v := range bm.Headers {
		msg.Header.Set(k, string(v))
	}
	msg.Data = bm.Payload

	// Publish asynchronously; the returned PubAckFuture is not awaited here,
	// so server-side publish errors are not reported to the caller
	if _, err := c.jetStream.PublishMsgAsync(msg); err != nil {
		return err
	}

	return nil
}
```

I used the "legacy" JetStream API to implement what I meant above. As I see it, the advantages of this implementation are:

But it uses the legacy JetStream API :)
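One side effect of `randString()` in the Subscribe code above is that the stream created for a channel gets a different name on every call or restart. A minimal sketch of a deterministic alternative, assuming the NATS rule that stream names must not contain whitespace, `.`, `*` or `>`; `streamNameFor` and the `ASYNCAPI_` prefix are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// streamNameFor maps a subject such as "ORDERS.*" to a stream name that
// avoids the characters NATS does not allow in stream names.
// Hypothetical helper; the prefix is an arbitrary illustrative choice.
func streamNameFor(subject string) string {
	r := strings.NewReplacer(
		".", "_",
		"*", "STAR",
		">", "ALL",
		" ", "_",
		"\t", "_",
	)
	return "ASYNCAPI_" + r.Replace(subject)
}

func main() {
	fmt.Println(streamNameFor("ORDERS.*")) // ASYNCAPI_ORDERS_STAR
	fmt.Println(streamNameFor("ping.v3"))  // ASYNCAPI_ping_v3
}
```

This mapping is not collision-free ("a.b" and "a_b" yield the same name), so a real implementation would need a stricter encoding or a name taken from the AsyncAPI document.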

stefanmeschke commented 5 months ago

Hey 👋

Interesting issue. Lemme add some thoughts on this:

> All streams are independent (c.jetStream.AddStream(config) on each subscribe)

From my point of view, each stream comes with its own AsyncAPI definition and therefore also with its own code generation.

> I used "legacy jetstream" and implemented what I meant above.

The legacy API is much more powerful than the new API (e.g. subscriptions to a single channel). But I don't think it's a good idea to implement this with the old API. There might be real-world cases where acking messages the service is not interested in leads to issues, but with the new way of acking messages, couldn't the warning log easily be circumvented? From what I've seen so far, acking uninteresting messages is something of a common pattern.

> I don't need to specify stream names when creating the broker client

This information could perhaps also be derived from the AsyncAPI document. Besides that, I'd like to mention that it's also a good pattern not to provision NATS/JetStream (consumers, streams, …) on every service/pod start.
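Combining both points, a separate provisioning step could derive the desired stream from the AsyncAPI document and compare it with what already exists, so that services don't (re)create streams on every start. A stdlib-only sketch; `StreamSpec` and `plan` are hypothetical, and fetching the existing stream from the server is deliberately left out.

```go
package main

import (
	"fmt"
	"sort"
)

// StreamSpec is a simplified, hypothetical description of a stream.
type StreamSpec struct {
	Name     string
	Subjects []string
}

// plan returns "create", "update" or "none" for the desired stream,
// given the existing one (nil if the stream does not exist yet).
func plan(desired StreamSpec, existing *StreamSpec) string {
	if existing == nil {
		return "create"
	}
	if !sameSet(desired.Subjects, existing.Subjects) {
		return "update"
	}
	return "none"
}

// sameSet compares two subject lists ignoring order (duplicates assumed absent).
func sameSet(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	x := append([]string(nil), a...) // copy so callers' slices are not reordered
	y := append([]string(nil), b...)
	sort.Strings(x)
	sort.Strings(y)
	for i := range x {
		if x[i] != y[i] {
			return false
		}
	}
	return true
}

func main() {
	want := StreamSpec{Name: "pingv3", Subjects: []string{"ping.v3", "pong.v3"}}
	fmt.Println(plan(want, nil)) // create
	fmt.Println(plan(want, &StreamSpec{Name: "pingv3", Subjects: []string{"pong.v3", "ping.v3"}})) // none
	fmt.Println(plan(want, &StreamSpec{Name: "pingv3", Subjects: []string{"ping.v3"}}))            // update
}
```

Running such a step once (e.g. in a deployment job) keeps the services themselves free of stream management code.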