nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

JS: nats.NoEcho() option not working for JetStream #752

Closed: sloveridge closed this issue 3 years ago

sloveridge commented 3 years ago

Defect

As JetStream uses the underlying NATS connection, I would expect that setting the nats.NoEcho() option on the connection would prevent messages from being echoed to subscribers on that same connection.

A basic use case: a subject such as ORDERS.updates to which multiple services publish updates, where each service also needs to listen for updates from the other services to sync state or trigger actions.


Versions of nats.go and the nats-server if one was involved:

nats.go v1.11.0

OS/Container environment:

docker pull nats:2.2.6

Steps or code to reproduce the issue:

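package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func check(err error) {
    if err != nil {
        log.Fatal(err)
    }
}

func main() {
    // NoEcho is set on the core NATS connection that JetStream runs on.
    nc, err := nats.Connect(nats.DefaultURL, nats.NoEcho())
    check(err)
    defer nc.Close()
    js, err := nc.JetStream()
    check(err)
    _, err = js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"TEST.*"}})
    check(err)
    defer js.DeleteStream("TEST")
    _, err = js.Subscribe("TEST.echo", func(m *nats.Msg) {
        fmt.Println(string(m.Data)) // still prints "Echo" despite NoEcho
    })
    check(err)
    _, err = js.Publish("TEST.echo", []byte("Echo"))
    check(err)
    time.Sleep(time.Second) // give the asynchronous handler time to run
}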

Expected result:

Messages published on a NATS connection are not delivered to subscribers on that same connection.

Actual result:

The subscriber on the publishing connection still receives the message.

derekcollison commented 3 years ago

JetStream does not have the concept of NoEcho at the moment. The lower level NATS NoEcho does not help here.

sloev commented 2 years ago

@derekcollison @sloveridge does this mean that a shared subject is a bad pattern when multiple clients both publish and subscribe on the same subjects?

How would one do an acknowledged broadcast on a shared medium, then?

kind regards

derekcollison commented 2 years ago

Can you describe what you mean a bit more? Are you asking about an app that is both the producer of messages that will be stored in JetStream and also a generic consumer?

If using JetStream, I would not process a message outside of a consumer. Meaning, even if an app generates a message, it will not process it until it's received back by the consumer.

If that is really not possible, you can place a header identifying the message and either ignore or NAK as needed.
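The header approach can be sketched roughly as follows. This is a minimal illustration, not something taken from nats.go: the header name `Origin-Id` and the helper `isOwnMessage` are hypothetical, and a plain `map[string][]string` stands in for `nats.Header`, which has the same underlying type. On the publishing side the header would be set on a `nats.Msg` before `js.PublishMsg`; on the consuming side the check would run on `msg.Header` before deciding whether to process.

```go
package main

import "fmt"

// originHeader is a hypothetical header name chosen for this sketch;
// NATS does not reserve or interpret it.
const originHeader = "Origin-Id"

// isOwnMessage reports whether the headers carry this service's own ID.
// nats.Header has the underlying type map[string][]string, so msg.Header
// from a JetStream subscription could be passed in here.
func isOwnMessage(h map[string][]string, selfID string) bool {
	if selfID == "" {
		return false // without an ID nothing can be attributed to us
	}
	values := h[originHeader]
	return len(values) > 0 && values[0] == selfID
}

func main() {
	own := map[string][]string{originHeader: {"service-a"}}
	other := map[string][]string{originHeader: {"service-b"}}
	untagged := map[string][]string{}

	for _, h := range []map[string][]string{own, other, untagged} {
		if isOwnMessage(h, "service-a") {
			// With nats.go this is where msg.Ack() would be called
			// without doing any work.
			fmt.Println("skip")
			continue
		}
		fmt.Println("process")
	}
}
```

Whether to ack or NAK a skipped message depends on the consumer layout. If each service has its own consumer, acking and ignoring is probably what you want, since a NAK would only cause the same message to be redelivered to the same service.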

sloveridge commented 2 years ago

@sloev I raised this initially because I expected NoEcho to work with JetStream as well, but I have since changed my mind on the use case.

I agree with Derek's statements on implementation. You shouldn't process a message before publishing it; process it when the consumer receives it. If the publish doesn't succeed, treat whatever request or event triggered the message as having failed.

sloev commented 2 years ago

I don't have much time to answer, but I'll do so anyway so this doesn't go unanswered, so bear with me 🐻

My use case has two actors:

a: vessel: only one can exist per ID; reads commands, produces data
b: operators: multiple can exist per vessel ID; read data, produce commands

Two kinds of information exist:

a: data: needs to be persisted for a short while; no need for end-to-end acknowledgments, but acks are needed for delivery to and from the server
b: commands: need to be acked to and from the NATS server, plus end to end

I ended up doing the following:

For each vessel ID there are three JetStream streams: one each for data, commands, and command responses.

For operators the flow is like this: they subscribe to data and send commands to vessels. When they send commands, they also listen for responses (acks) to those commands.

For vessels: they publish data and subscribe to commands. When they get a command, they process it and transmit an OK signal on the response stream with the ID given in the command payload.

The command and command_response streams don't need retention, but they do need acks.

It works great and gives me end-to-end acks where needed, plus retention for data, so when a new operator joins it gets a complete dataset for the last hour.