nats-io / stan.net

The official NATS .NET C# Streaming Client
Apache License 2.0

when using connection.subscribe with Subscription opts, can't get qgroup to work #187

Closed. paul42 closed this 3 years ago

paul42 commented 4 years ago

I have a bit of code here. Essentially, when I subscribe using different overloads of the Subscribe() method, I get different results for queue groups.

When I try to build a proof of concept to demonstrate queue groups, like this gist: https://gist.github.com/paul42/3b027c5daf989643097a582055ef589e

I get messages delivered to all 3 of my workers. When I comment out my 'subscriptionOptions' parameter, I get the desired behavior, where each message is handled by only one worker. Am I doing something wrong in my setup?

Thanks in advance, Paul

paul42 commented 4 years ago

Is it related to the durable name, or to the options somehow? I think I got a bit closer and added some tests here: https://github.com/nats-io/stan.net/pull/188

ColinSullivan1 commented 4 years ago

I've reproduced your test results locally and will take a look at this over the weekend. Thanks for the PR (#188) with the tests!

ColinSullivan1 commented 3 years ago

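Subscribers in a queue group act as one subscriber, so they must all use the same durable name. With different durable names, the server treats them as separate durable queue groups, and each group receives every message. Here's some test code, based on your PR, that demonstrates queue subscribers sharing the work without overlapping sequence numbers.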

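        // Despite its name, this test is expected to pass: both queue subscribers
        // share one durable name, so each message is delivered to only one of them.
        [Fact]
        public void TestBasicQueuePubSubWithSubOptionsExpectFailDueToDurableName()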
        {
            byte[] payload = System.Text.Encoding.UTF8.GetBytes("hello");
            Exception ex = null;
            ConcurrentDictionary<ulong, bool> seqDict = new ConcurrentDictionary<ulong, bool>();
            int count = 10;
            var ev = new AutoResetEvent(false);

            using (Context.StartStreamingServerWithEmbedded(Context.DefaultServer))
            {
                using (var c1 = Context.GetStanConnection(Context.DefaultServer, null, clientId: "clientId1"))
                using (var c2 = Context.GetStanConnection(Context.DefaultServer, null, clientId: "clientId2"))
                {
                    IStanSubscription sub1 = null, sub2 = null;
                    int sub1Count = 0;
                    int sub2Count = 0;

                    var subOptions = StanSubscriptionOptions.GetDefaultOptions();
                    subOptions.DurableName = "name1Durable";

                    var otherSubOptions = StanSubscriptionOptions.GetDefaultOptions();
                    otherSubOptions.DurableName = "name1Durable";  // Use the same durable name.

                    EventHandler<StanMsgHandlerArgs> eh = (obj, args) =>
                    {
                        try
                        {
                            if (args.Message.Subscription == sub1)
                            {
                                Interlocked.Increment(ref sub1Count);
                            }
                            else
                            {
                                Interlocked.Increment(ref sub2Count);
                            }

                            Assert.True(args.Message.Sequence > 0);
                            Assert.True(args.Message.Time > 0);
                            Assert.True(args.Message.Data != null);
                            var str = System.Text.Encoding.UTF8.GetString(args.Message.Data);
                            Assert.Equal("hello", str);

                            if (seqDict.ContainsKey(args.Message.Sequence))
                                throw new Exception("Duplicate Sequence found");

                            seqDict[args.Message.Sequence] = true;
                            if (sub1Count + sub2Count == count)
                                ev.Set();
                        }
                        catch (Exception e)
                        {
                            ex = e;
                        }
                    };

                    sub1 = c1.Subscribe("foo", "bar", subOptions, eh);
                    sub2 = c2.Subscribe("foo", "bar", otherSubOptions, eh);

                    for (int i = 0; i < count; i++)
                    {
                        c1.Publish("foo", payload);
                    }

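                    // Wait until every published message has been handled.
                    Assert.True(ev.WaitOne(Context.DefaultWait));
                    // The number of distinct sequences equals the number of published messages.
                    Assert.Equal(count, seqDict.Count);
                    // Both queue group members handled part of the work.
                    Assert.True(sub1Count > 0);
                    Assert.True(sub2Count > 0);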
                }
            }
            if (ex != null)
                throw ex;
        }
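
The test above relies on the repository's own test helpers (`Context.…`). Outside that harness, the same setup might look roughly like the sketch below. It assumes a NATS Streaming server is already running locally with cluster ID `test-cluster`; the client IDs (`worker-1`, `worker-2`), the class name, and the 5-second timeout are illustrative choices, not part of the original test.

```csharp
using System;
using System.Text;
using System.Threading;
using STAN.Client;

class DurableQueueGroupSketch
{
    static void Main()
    {
        // Assumptions: a local NATS Streaming server with cluster ID "test-cluster".
        // The client IDs and timeout below are hypothetical, chosen for illustration.
        const string clusterId = "test-cluster";
        const string subject = "foo";
        const string queueGroup = "bar";
        const string durableName = "name1Durable";
        const int count = 10;

        var cf = new StanConnectionFactory();
        int received = 0;

        using (var done = new ManualResetEventSlim(false))
        using (var c1 = cf.CreateConnection(clusterId, "worker-1"))
        using (var c2 = cf.CreateConnection(clusterId, "worker-2"))
        {
            // Every member of the queue group uses the SAME durable name.
            var opts1 = StanSubscriptionOptions.GetDefaultOptions();
            opts1.DurableName = durableName;
            var opts2 = StanSubscriptionOptions.GetDefaultOptions();
            opts2.DurableName = durableName;

            // Builds a handler that logs which worker got which sequence number
            // and signals once all published messages have been handled.
            EventHandler<StanMsgHandlerArgs> MakeHandler(string worker) => (sender, args) =>
            {
                Console.WriteLine($"{worker} handled sequence {args.Message.Sequence}");
                if (Interlocked.Increment(ref received) == count)
                    done.Set();
            };

            // Same subject, same queue group, same durable name on both connections.
            var sub1 = c1.Subscribe(subject, queueGroup, opts1, MakeHandler("worker-1"));
            var sub2 = c2.Subscribe(subject, queueGroup, opts2, MakeHandler("worker-2"));

            byte[] payload = Encoding.UTF8.GetBytes("hello");
            for (int i = 0; i < count; i++)
                c1.Publish(subject, payload);

            if (!done.Wait(TimeSpan.FromSeconds(5)))
                Console.WriteLine($"Timed out after handling {received} of {count} messages.");
        }
    }
}
```

If the two `DurableName` values differed, each worker would be expected to log all ten sequence numbers instead of a share of them, which is the behavior described in the original report.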

paul42 commented 3 years ago

Ahhh, thanks so much for helping me out there. I did not realize that they act as one and so must use the same durable subscriber name, but it makes sense now. Thanks for going into detail for me, I really appreciate the help!

ColinSullivan1 commented 3 years ago

Anytime!