nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0

consumer filter does not work as expected #4401

Closed: tpihl closed this issue 1 year ago

tpihl commented 1 year ago

Defect


Versions of nats-server and affected client libraries used:

nats-server v2.9.21, nats.go v1.28.0

OS/Container environment:

linux

Steps or code to reproduce the issue:


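package natsissue_test // hypothetical package name; the issue does not show one

import (
    "context"
    "testing"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func Test_ConsumerWildcardFilter1(t *testing.T) {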
    /* Why allow multiple subject filters on a stream?

    The most important reason is probably that the order between those messages is important!

    The steps below would have been perfectly legal if I had ordered my subject tokens differently, but
    a subject is only a hierarchy if you choose to use the '>' wildcard. It should not matter whether
    I use <event>.<species>.<name>, <species>.<name>.<event> or <name>.<event>.<species>;
    I should be able to use the '*' wildcard in place of any literal token.

    */
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        t.Fatal(err)
    }
    defer nc.Close()
    js, err := jetstream.New(nc)
    if err != nil {
        t.Fatal(err)
    }
    streams := js.StreamNames(context.TODO())
    for name := range streams.Name() {
        js.DeleteStream(context.TODO(), name)
    }
    if streams.Err() != nil {
        t.Fatal(streams.Err())
    }

    // we're in start state, no conflicting streams

    streamConfigAnimals := jetstream.StreamConfig{
        Name:     "pet-events",
        Subjects: []string{"born.*.*", "bought.*.*", "died.*.*", "sold.*.*"},
        Storage:  jetstream.MemoryStorage,
    }
    if _, err = js.CreateStream(context.TODO(), streamConfigAnimals); err != nil {
        t.Fatal(err)
    }

    if _, err := js.Publish(context.TODO(), "born.chimp.hugo", []byte(`1970`)); err != nil {
        t.Fatal(err)
    }

    if _, err := js.Publish(context.TODO(), "born.babian.rupert", []byte(`1982`)); err != nil {
        t.Fatal(err)
    }

    if _, err := js.Publish(context.TODO(), "bought.orangutang.bert", []byte(`1984`)); err != nil {
        t.Fatal(err)
    }

    if _, err := js.Publish(context.TODO(), "born.orangutang.bert", []byte(`1980`)); err != nil {
        t.Fatal(err)
    }

    if _, err := js.Publish(context.TODO(), "sold.terrier.rupert", []byte(`2001`)); err != nil {
        t.Fatal(err)
    }

    if _, err := js.Publish(context.TODO(), "sold.orangutang.bert", []byte(`2002`)); err != nil {
        t.Fatal(err)
    }

    if _, err := js.Publish(context.TODO(), "washed.tesla.james", []byte(`2023`)); err == nil {
        t.Fatal(`this should have failed since nothing captures washed.

This test is only to verify that there isn't anything conflicting with the next consumer filter even if the consumer
was created without any stream reference. However, note that the consumer is created ON a stream and cannot see
any subject except those captured by the stream!!

`)
    }

    if _, err := js.CreateOrUpdateConsumer(context.TODO(), "pet-events", jetstream.ConsumerConfig{
        Name:          "history-of-organgutang-bert",
        FilterSubject: "*.orangutang.bert",
    }); err != nil {
        t.Error("How can I read about bert without reading all messages? Trying with *.orangutang.bert gave", err)
    }
}

func Test_ConsumerWildcardFilter2(t *testing.T) {
    /*
        Why do we allow changes to a stream if they remove our chance to read older msgs that were captured but no longer are?

        We should at least be able to filter the stream on the subjects that stream.Info returns.

        I cannot find a way to read the previously captured (no longer captured) messages except by reading every single message
        and matching the subject manually in the client.

    */
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        t.Fatal(err)
    }
    defer nc.Close()
    js, err := jetstream.New(nc)
    if err != nil {
        t.Fatal(err)
    }
    streams := js.StreamNames(context.TODO())
    for name := range streams.Name() {
        js.DeleteStream(context.TODO(), name)
    }
    if streams.Err() != nil {
        t.Fatal(streams.Err())
    }
    // we're in start state, no conflicting streams

    if _, err = js.CreateStream(context.TODO(), jetstream.StreamConfig{
        Name:     "my-stuff",
        Subjects: []string{"*.*.*"},
        Storage:  jetstream.MemoryStorage,
    }); err != nil {
        t.Fatal(err)
    }

    if _, err := js.Publish(context.TODO(), "ape.chimp.hugo", []byte(`1970-1988`)); err != nil {
        t.Fatal(err)
    }

    if _, err := js.Publish(context.TODO(), "ape.orangutang.bert", []byte(`1984-2003`)); err != nil {
        t.Fatal(err)
    }

    if _, err := js.Publish(context.TODO(), "dog.terrier.rupert", []byte(`1985-2001`)); err != nil {
        t.Fatal(err)
    }
    if _, err := js.Publish(context.TODO(), "car.tesla.james", []byte(`2023-`)); err != nil {
        t.Fatal(err)
    }

    // Split my stuff into pets and stuff; narrow my-stuff first so the two streams' subjects don't overlap.
    if _, err := js.UpdateStream(context.TODO(), jetstream.StreamConfig{
        Name:     "my-stuff",
        Subjects: []string{"car.*.*"},
        Storage:  jetstream.MemoryStorage,
    }); err != nil {
        t.Fatal(err)
    }
    if _, err := js.CreateStream(context.TODO(), jetstream.StreamConfig{
        Name: "my-pets",
        Subjects: []string{
            "ape.*.*",
            "cat.*.*",
            "dog.*.*",
        },
        Storage: jetstream.MemoryStorage,
    }); err != nil {
        t.Fatal(err)
    }

    stream, err := js.Stream(context.TODO(), "my-stuff")
    if err != nil {
        t.Fatal(err)
    }
    si, err := stream.Info(context.TODO(), jetstream.WithSubjectFilter(">"))
    if err != nil {
        t.Fatal(err)
    }

    subject := "ape.chimp.hugo"
    if si.State.Subjects[subject] != 1 {
        t.Fatalf("I expected there to be a hugo msg in there")
    }

    if _, err = js.CreateOrUpdateConsumer(context.TODO(), "my-stuff", jetstream.ConsumerConfig{
        Name:          "hugo",
        FilterSubject: subject,
    }); err != nil {
        t.Error("I cannot read about hugo even though I use the exact subject reported by stream.Info", err)
    }

    if _, err = js.CreateOrUpdateConsumer(context.TODO(), "my-stuff", jetstream.ConsumerConfig{
        Name:          "hugo",
        FilterSubject: "*.chimp.hugo",
    }); err != nil {
        t.Error("I cannot read about hugo even though I use a subject that is legal in the current stream definition", err)
    }
}

Expected result:

The consumer filter should be evaluated only against the messages that are in the stream. The consumer filter has no relation to any other stream (or to this stream's currently captured subjects).
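To make the expected behavior concrete, here is a minimal, self-contained Go sketch (no server involved) that evaluates a filter only against the subjects already stored in the stream, using the usual NATS token rules: `*` matches exactly one token and `>` matches one or more trailing tokens. The helper names `subjectMatches` and `selectFiltered` are hypothetical; this models the semantics being asked for, not nats-server's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// storedMsg is a message already in the stream: its subject and payload, in stream order.
type storedMsg struct {
	subject string
	data    string
}

// subjectMatches reports whether a concrete subject matches a filter.
// '*' matches exactly one token; '>' (last token only) matches one or more tokens.
func subjectMatches(filter, subject string) bool {
	ft := strings.Split(filter, ".")
	st := strings.Split(subject, ".")
	for i, f := range ft {
		if f == ">" {
			return i == len(ft)-1 && len(st) > i
		}
		if i >= len(st) {
			return false
		}
		if f != "*" && f != st[i] {
			return false
		}
	}
	return len(ft) == len(st)
}

// selectFiltered returns, in stream order, the stored messages whose subject matches filter.
// The stream's configured subjects are deliberately not consulted.
func selectFiltered(stored []storedMsg, filter string) []storedMsg {
	var out []storedMsg
	for _, m := range stored {
		if subjectMatches(filter, m.subject) {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	// The messages the first test stores in "pet-events", in publish order.
	stored := []storedMsg{
		{"born.chimp.hugo", "1970"},
		{"born.babian.rupert", "1982"},
		{"bought.orangutang.bert", "1984"},
		{"born.orangutang.bert", "1980"},
		{"sold.terrier.rupert", "2001"},
		{"sold.orangutang.bert", "2002"},
	}
	for _, m := range selectFiltered(stored, "*.orangutang.bert") {
		fmt.Println(m.subject, m.data)
	}
}
```

Bert's three events come back in stream order (bought, born, sold), which is the ordering the reporter wants to keep by using a single stream.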

Actual result:

Cannot create a consumer whose wildcard filter spans more than one of the stream's subjects.
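Both failures are consistent with a rule that accepts a filter only when it is a subset of at least one of the stream's currently configured subjects. The sketch below models that rule; `isSubset` and `strictFilterOK` are hypothetical names, and this is a simplified model rather than nats-server's source.

```go
package main

import (
	"fmt"
	"strings"
)

// isSubset reports whether every subject matched by filter is also matched by pattern.
func isSubset(filter, pattern string) bool {
	ft := strings.Split(filter, ".")
	pt := strings.Split(pattern, ".")
	for i, p := range pt {
		if p == ">" {
			// A trailing '>' in the pattern covers any filter with at least one more token.
			return len(ft) > i
		}
		if i >= len(ft) {
			return false
		}
		f := ft[i]
		if f == ">" {
			// The filter can match any number of extra tokens; a non-'>' pattern cannot.
			return false
		}
		if p != "*" && f != p {
			// A literal pattern token covers only the identical literal, never a filter '*'.
			return false
		}
	}
	return len(ft) == len(pt)
}

// strictFilterOK accepts filter only if a single configured stream subject covers it entirely.
func strictFilterOK(filter string, streamSubjects []string) bool {
	for _, s := range streamSubjects {
		if isSubset(filter, s) {
			return true
		}
	}
	return false
}

func main() {
	petEvents := []string{"born.*.*", "bought.*.*", "died.*.*", "sold.*.*"}
	fmt.Println(strictFilterOK("*.orangutang.bert", petEvents))    // false: no single subject covers every first token
	fmt.Println(strictFilterOK("born.orangutang.bert", petEvents)) // true: covered by born.*.*

	// "my-stuff" after the second test narrows it; ape.chimp.hugo is still stored but no longer configured.
	myStuff := []string{"car.*.*"}
	fmt.Println(strictFilterOK("ape.chimp.hugo", myStuff)) // false
	fmt.Println(strictFilterOK("*.chimp.hugo", myStuff))   // false
}
```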

tpihl commented 1 year ago

@derekcollison or @bruth

Any chance of a quick review of this, and some indication of whether it will be addressed in the near future?

bruth commented 1 year ago

Interesting. I ran your first test and I understand why this is confusing. Given a filter of *.orangutang.bert, a stream subject of foo.*.* could also satisfy it, so the filter is not exclusive to the subjects born.*.*, bought.*.*, died.*.*, sold.*.*. But in the context of the stream the consumer is bound to, the filter should be valid for determining which messages to deliver.

A subset-exclusivity check is required when creating streams, to ensure their subjects do not overlap, but I agree that validating a subject filter could be more liberal since it is scoped to a stream.
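One way a more liberal rule could look, sketched under the assumption that a filter is accepted when it overlaps at least one stream subject (some concrete subject could match both) rather than being a subset of one. `collide` and `liberalFilterOK` are hypothetical names; this is not a description of what 2.10 actually implements.

```go
package main

import (
	"fmt"
	"strings"
)

// collide reports whether at least one concrete subject is matched by both a and b.
func collide(a, b string) bool {
	at := strings.Split(a, ".")
	bt := strings.Split(b, ".")
	for i := 0; i < len(at) && i < len(bt); i++ {
		x, y := at[i], bt[i]
		if x == ">" || y == ">" {
			// Both sides have a token here and '>' can absorb everything after it.
			return true
		}
		if x != "*" && y != "*" && x != y {
			return false // two different literals can never match the same token
		}
	}
	return len(at) == len(bt)
}

// liberalFilterOK accepts filter if it overlaps any configured stream subject.
func liberalFilterOK(filter string, streamSubjects []string) bool {
	for _, s := range streamSubjects {
		if collide(filter, s) {
			return true
		}
	}
	return false
}

func main() {
	petEvents := []string{"born.*.*", "bought.*.*", "died.*.*", "sold.*.*"}
	fmt.Println(liberalFilterOK("*.orangutang.bert", petEvents)) // true

	myStuff := []string{"car.*.*"}
	fmt.Println(liberalFilterOK("ape.chimp.hugo", myStuff)) // false: "ape" can never equal "car"
	fmt.Println(liberalFilterOK("*.chimp.hugo", myStuff))   // true: car.chimp.hugo would match both
}
```

Under this model the first test's filter is accepted, while the second test's exact subject `ape.chimp.hugo` is still rejected because it cannot overlap `car.*.*`. That is at least consistent with only the first test being confirmed as passing on the dev branch.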

@derekcollison Does this seem reasonable or am I overlooking something subtle?

bruth commented 1 year ago

For a bit of historical context: when the original JS API was designed around the js.Subscribe* methods, for example, it depended on an exclusive "lookup stream by subject" behavior, which is still how it works. However, with the API's shift to explicitly creating consumers bound to streams, this constraint could be relaxed. For the existing API, the lookup would need to be decoupled from the filter check.
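A rough model of why an exclusive "lookup stream by subject" is hard to reconcile with a filter that spans several subjects: if the client must find the stream from the filter alone, a filter like `*.orangutang.bert` can overlap more than one stream. The second stream `foo-events` and the helpers `overlaps` and `lookupStream` are hypothetical; this is not how nats.go implements the lookup.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// overlaps reports whether some concrete subject matches both a and b
// ('*' matches one token, '>' matches one or more trailing tokens).
func overlaps(a, b string) bool {
	at := strings.Split(a, ".")
	bt := strings.Split(b, ".")
	for i := 0; i < len(at) && i < len(bt); i++ {
		x, y := at[i], bt[i]
		if x == ">" || y == ">" {
			return true
		}
		if x != "*" && y != "*" && x != y {
			return false
		}
	}
	return len(at) == len(bt)
}

// lookupStream models an exclusive lookup: it returns the one stream whose subjects
// overlap filter, or an error when zero or several streams qualify.
func lookupStream(streams map[string][]string, filter string) (string, error) {
	var found []string
	for name, subjects := range streams {
		for _, s := range subjects {
			if overlaps(filter, s) {
				found = append(found, name)
				break
			}
		}
	}
	sort.Strings(found) // map iteration order is random; sort for stable output
	switch len(found) {
	case 0:
		return "", errors.New("no stream matches " + filter)
	case 1:
		return found[0], nil
	default:
		return "", fmt.Errorf("filter %q is ambiguous: %v", filter, found)
	}
}

func main() {
	streams := map[string][]string{
		"pet-events": {"born.*.*", "bought.*.*", "died.*.*", "sold.*.*"},
		"foo-events": {"foo.*.*"}, // hypothetical second stream
	}
	fmt.Println(lookupStream(streams, "born.orangutang.bert"))
	fmt.Println(lookupStream(streams, "*.orangutang.bert"))
}
```

Binding the consumer to a named stream, as the tests do with `js.CreateOrUpdateConsumer(context.TODO(), "pet-events", ...)`, sidesteps this lookup entirely.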

derekcollison commented 1 year ago

In 2.10 it is more liberal.

bruth commented 1 year ago

@tpihl Confirmed, your first test passes with the dev branch (pending 2.10 release).

tpihl commented 1 year ago

@bruth So many things are changing on the dev branch that I'm waiting until it's out to do the detailed tests.

When will it be out?