nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
5.47k stars 688 forks source link

PullSubscription must match Consumer Filter Subject #736

Open syacko-sote opened 3 years ago

syacko-sote commented 3 years ago

Defect

When calling PullSubscribe, with a subject that is a subset of the Consumer filter subject, the call fails with this error message: nats: subject does not match consumer

The code on line 1057 of js.go says that the pullsubscribe subject can be a subset of the consumer filter subject. On line 1058, the test is that the pullsunscribe matches the consumer filter subject. See image of code below.

Make sure that these boxes are checked before submitting your issue -- thank you!

Versions of nats.go and the nats-server if one was involved:

nats.go 1.11.0 which is running against NSG leaf server

OS/Container environment:

Linux?

Steps or code to reproduce the issue:

Consumer:

? Select a Consumer bsl-organizations-wildcard
Information for Consumer business-service-layer > bsl-organizations-wildcard created 2021-05-04T15:03:58-07:00

Configuration:

        Durable Name: bsl-organizations-wildcard
           Pull Mode: true
      Filter Subject: bsl.organization.>
        Deliver Next: true
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
  Maximum Deliveries: 1
     Max Ack Pending: 20,000

State:

   Last Delivered Message: Consumer sequence: 292 Stream sequence: 2750
     Acknowledgment floor: Consumer sequence: 292 Stream sequence: 2750
         Outstanding Acks: 0 out of maximum 20000
     Redelivered Messages: 4
     Unprocessed Messages: 0

Pull Sub: Subject: bsl.organization.supplier.> Durable Name: bsl-organization-wildcard

js.go code:

Screen Shot 2021-05-26 at 3 49 19 PM

Expected result:

That the consumer with a filter subject of bsl.organization.> would accept a pull subscriber of bsl.organization.supllier.> because it is a sub set of the consumer filter subject.

Actual result:

Error is returned: nats: subject does not match consumer

wallyqs commented 3 years ago

Thanks for the report @syacko-sote, this looks related to this: https://github.com/nats-io/nats.go/issues/731 Looking at how to improve the support for filter subjects, in the meantime for PullSubscribe calling nats.BindStream("foo") may help being able to make the stream bind to the consumer: https://github.com/nats-io/nats.go/issues/731#issue-895907741

alikarimi999 commented 2 years ago

Thanks for the report @syacko-sote, this looks related to this: #731 Looking at how to improve the support for filter subjects, in the meantime for PullSubscribe calling nats.BindStream("foo") may help being able to make the stream bind to the consumer: #731 (comment)

I really need that functionality too and nats.BindStream() does not help and I would like to know did you implement this?

derekcollison commented 2 years ago

Why does nats.BindStream() not help?

alikarimi999 commented 2 years ago

I have a consumer with these configurations

Information for Consumer stream_0 > consumer_0 created 2022-06-15T20:47:17+04:30

Configuration:

        Durable Name: consumer_0
           Pull Mode: true
      Filter Subject: sub.>
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 1,000
   Max Waiting Pulls: 512

State:

   Last Delivered Message: Consumer sequence: 176 Stream sequence: 519 Last delivery: 4.18s ago
     Acknowledgment floor: Consumer sequence: 160 Stream sequence: 510 Last Ack: 12m44s ago
         Outstanding Acks: 8 out of maximum 1,000
     Redelivered Messages: 8
     Unprocessed Messages: 0
            Waiting Pulls: 0 of maximum 512

and when I'm trying to pull messages from a subset of the filtered subject, for example, "sub.0", nats return this ErrSubjectMismatch


func main() {
    nc, _ := nats.Connect("localhost:4222")

    js, _ := nc.JetStream()

    sub, err := js.PullSubscribe("sub.0", "consumer_0", nats.BindStream("stream_0"))
    if err != nil {
        panic(err)
    }

    msgs, err := sub.Fetch(10)
    if err != nil {
        panic(err)
    }

    if len(msgs) > 0 {
        for _, msg := range msgs {
            msg.Ack()
            println(string(msg.Data))
        }
    }
}

Expected result: only messages from "sub.0"

Actual result: error panic: nats: subject does not match consumer

and here despite the comment, being a subset of the FilterSubject doesn't check, and only checks if the FilterSubject is not empty and that the selected subject is precisely equal to the FilterSubject

ripienaar commented 2 years ago

Once a consumer is made it gets all that’s in the consumer filter subject always.

You can though add another consumer with a different filter subject.

You can’t pull from an existing consumer using a different subject than it’s filter.

alikarimi999 commented 2 years ago

Once a consumer is made it gets all that’s in the consumer filter subject always.

You can though add another consumer with a different filter subject.

You can’t pull from an existing consumer using a different subject than it’s filter.

I want to pull from a subset of the consumer's FilterSubject, not a different subject

ripienaar commented 2 years ago

I understand that. But a subset is not the same as the whole so a new consumer is needed. We don’t support what you want

derekcollison commented 2 years ago

Would creating a new, ephemeral R1 consumer with the filter subject you want be an option?

andreib1 commented 2 years ago

I'm not sure from the conversation as to whether NATS server supports this. If not then the comment on the function should change. If it does, then the following function would fix the behavior in the go client:

func isSubjectSubset(subject, test string) bool {
    s := strings.Split(subject, ".")
    t := strings.Split(test, ".")
    if strings.HasSuffix(subject, ">") {
        if len(t) < len(s) {
            // Subject is too short to be a subset
            return false
        }
        // Truncate to >
        t = append(t[:len(s)-1], ">")
    }
    // Check they are the same length
    if len(t) != len(s) {
        return false
    }
    // Compare except wildcards
    for i := 0; i < len(s); i++ {
        if s[i] == "*" {
            continue
        }
        if s[i] != t[i] {
            return false
        }
    }
    return true
}

Should I make a PR?