nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0
Allow stream "republish" to republish multiple subjects #5269

Open anthonyjacques20 opened 6 months ago

anthonyjacques20 commented 6 months ago

Proposed change

Make stream republish a list instead of an object

Use case

I am trying to forward specific subjects from one stream to another, and the current subject architecture doesn't allow for wildcard manipulation. For example, the stream has these subjects:

  1. hello.a
  2. hello.b
  3. hello.c

I want to republish hello.a -> world.a and hello.b -> world.b, but not republish hello.c. There's currently no way to do this, since republishing hello.* would include hello.c, and I specifically do not want to republish that subject.

I also want to be able to publish messages directly to this new stream so using a mirror doesn't work for this use case.
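To make the requested behavior concrete, here is a minimal Python sketch of how a list of republish rules could select subjects. It is only a model of the proposal, not nats-server code: `subject_matches`, `republish_target`, and the rule format (a list of source and destination pairs with literal destinations) are hypothetical names and assumptions made for illustration.

```python
from typing import List, Optional, Tuple


def subject_matches(pattern: str, subject: str) -> bool:
    """Simplified NATS-style matching: '*' is one token, '>' is one or more trailing tokens."""
    p, s = pattern.split("."), subject.split(".")
    for i, tok in enumerate(p):
        if tok == ">":
            # '>' needs at least one remaining token in the subject
            return i < len(s)
        if i >= len(s) or (tok != "*" and tok != s[i]):
            return False
    return len(p) == len(s)


def republish_target(rules: List[Tuple[str, str]], subject: str) -> Optional[str]:
    """Return the destination of the first rule whose source matches, else None."""
    for source, destination in rules:
        if subject_matches(source, subject):
            return destination
    return None


# Hypothetical rule list mirroring the use case above.
RULES = [("hello.a", "world.a"), ("hello.b", "world.b")]

print(republish_target(RULES, "hello.a"))  # world.a
print(republish_target(RULES, "hello.c"))  # None
```

With a single `hello.*` rule instead of the list, `hello.c` would also match, which is the behavior this issue wants to avoid.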

Contribution

I'm interested in contributing if accepted.

derekcollison commented 6 months ago

Why not use a filtered mirror?

anthonyjacques20 commented 6 months ago

I'll add more detail to the use case because I see now that I didn't add enough info. I also want to be able to publish messages directly to this other stream (due to legacy systems), which eliminates mirroring.

derekcollison commented 6 months ago

Sourced streams can have messages from other streams and their own subjects.

anthonyjacques20 commented 6 months ago

Definitely! The reason I was trying to use republish is because it seems like core nats doesn't get notified on a "sourced" message. I assumed this was a design decision? Here's my example:

➜  ~ nats stream info hello
Information for Stream hello created 2024-04-03 15:12:39

              Subjects: hello.>
              Replicas: 1
               Storage: File

Options:

             Retention: Limits
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
            Direct Get: true
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

Cluster Information:

                  Name: stargate
                Leader: stargate-0

State:

              Messages: 0
                 Bytes: 0 B
        First Sequence: 0
         Last Sequence: 0
      Active Consumers: 0
➜  ~ nats stream info hello_sourced
Information for Stream hello_sourced created 2024-04-03 15:13:45

                      Subjects: hello_sourced.>
                      Replicas: 1
                       Storage: File

Options:

                     Retention: Limits
               Acknowledgments: true
                Discard Policy: Old
              Duplicate Window: 2m0s
                    Direct Get: true
             Allows Msg Delete: true
                  Allows Purge: true
                Allows Rollups: false

Limits:

              Maximum Messages: unlimited
           Maximum Per Subject: unlimited
                 Maximum Bytes: unlimited
                   Maximum Age: unlimited
          Maximum Message Size: unlimited
             Maximum Consumers: unlimited

Replication:

                       Sources: hello

Cluster Information:

                          Name: stargate
                        Leader: stargate-0

Source Information:

                   Stream Name: hello
  Subject Filter and Transform: hello.> to hello_sourced.>
                           Lag: 0
                     Last Seen: 291ms

State:

                      Messages: 0
                         Bytes: 0 B
                First Sequence: 0
                 Last Sequence: 0
              Active Consumers: 0

Once the streams are created, I follow these steps:

  1. Start a subscription on the sourced subject: nats sub "hello_sourced.>"
  2. Publish message: nats pub "hello.hi" world
  3. We do not see any message come through on the nats sub command but we do see the message in the stream view

    
    ➜  ~ nats stream view hello_sourced
    [1] Subject: hello_sourced.hi Received: 2024-04-03T15:15:09-07:00

    Nats-Stream-Source: hello 1 hello.> hello_sourced.>

    world

    15:17:09 Reached apparent end of data

derekcollison commented 6 months ago

For any stream, the default behavior is to send a pub ack once the message is committed, but only if you supply a reply subject for the server to send it to. In your example above (step 2), if you did nats req "hello.hi" world, you should see it.

You can turn that behavior off, for instance if the reply subject has prior semantic meaning from an existing system but you still want to capture those messages in a stream.

Create the stream, then do nats str edit STREAM --no-ack -f

anthonyjacques20 commented 6 months ago

Hmmm... I tried setting --no-ack for both the hello and hello_sourced streams and then doing nats req "hello.hi" world, but still didn't see anything show up under nats sub "hello_sourced.>". I do still see it under nats stream view hello_sourced, though.

But even if that worked, we don't want to use no-ack, because in our actual workflow hello_sourced is of type workqueue and we want the ack: it's really important that these messages make it into the workqueue.

derekcollison commented 6 months ago

Let's switch to a video call to discuss.

Feel free to shoot me an email: derek@synadia.com

jnmoyne commented 5 months ago

> We do not see any message come through on the nats sub command but we do see the message in the stream view

  1. For that you need to set republish on the stream hello_sourced
  2. And if you do set republish on that stream, you would have to republish on a subject other than hello_sourced.> (that's what the stream is listening on, so you can't republish to that subject, as it would create a loop)
  3. You actually don't need hello_sourced to listen to anything (since it's sourcing), in which case you could then republish to the subject hello_sourced
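
The loop described in point 2 can be illustrated with a small Python sketch that checks whether a republish destination overlaps any subject the stream listens on. This is a simplified model of NATS wildcard semantics (`*` for one token, `>` for one or more trailing tokens), not the server's actual validation; `subjects_overlap` and `republish_would_loop` are hypothetical helper names.

```python
from typing import List


def subjects_overlap(a: str, b: str) -> bool:
    """Return True if some concrete subject could match both patterns."""
    ta, tb = a.split("."), b.split(".")
    for i in range(max(len(ta), len(tb))):
        if i >= len(ta) or i >= len(tb):
            # One pattern ended before a '>' was seen, so lengths cannot agree.
            return False
        x, y = ta[i], tb[i]
        if x == ">" or y == ">":
            # '>' absorbs the rest, and the other side has at least one token left.
            return True
        if x != y and "*" not in (x, y):
            return False
    return True


def republish_would_loop(stream_subjects: List[str], destination: str) -> bool:
    """A republished message re-enters the stream if the destination overlaps its subjects."""
    return any(subjects_overlap(s, destination) for s in stream_subjects)


# Point 2: the stream listens on hello_sourced.>, so republishing there loops.
print(republish_would_loop(["hello_sourced.>"], "hello_sourced.>"))  # True
# Point 3: a stream that only sources and listens on nothing cannot loop.
print(republish_would_loop([], "hello_sourced.>"))  # False
```

Under this model, a republish destination such as `world.>` would also be safe for a stream listening on `hello_sourced.>`, since the two patterns share no subject.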