nats-io / nats.ts

TypeScript Node.js client for NATS, the cloud native messaging system
https://www.nats.io
Apache License 2.0

Subscription gets Auto-unsubscribe by default even when max is not set #29

Closed acostaf closed 5 years ago

acostaf commented 5 years ago

Hi All

Just found a really annoying issue in version 1.0.4: a simple subscription fires just once. BTW, I have checked this with node-nats 1.3 and it works fine.

nc.subscribe('updateUIbyInquiry', (err, msg) => {
    if(err) {
        // do something
    } else {
        // do something with msg.data
    }
});

Looking at my logs, I found that NATS is eager to auto-unsubscribe:

[11976] 2019/01/09 16:49:28.774146 [DBG] 127.0.0.1:57250 - cid:39 - Deferring actual **UNSUB**(**updateUIbyInquiry**): 1 max, 0 received
[11976] 2019/01/09 16:49:29.552146 [DBG] 127.0.0.1:55520 - cid:18 - Auto-unsubscribe limit of 5 reached for sid '1'
[11976] 2019/01/09 16:49:33.011146 [DBG] 127.0.0.1:57250 - cid:39 - Deferring actual **UNSUB**(**updateUIbyInquiry**): 1 max, 0 received
[11976] 2019/01/09 16:49:43.329146 [DBG] 127.0.0.1:57250 - cid:39 - Deferring actual **UNSUB**(**updateUIbyInquiry**): 1 max, 0 received
[11976] 2019/01/09 16:49:43.333146 [DBG] 127.0.0.1:56638 - cid:28 - **Auto-unsubscribe limit of 1 reached for sid '1'**

Even if I change the max to 999, it gets changed to 1.

aricart commented 5 years ago

Are you certain that is the code that you are executing? Looks like something is setting the subscription options max to 1.

Try this:

import {Client, connect, SubscriptionOptions} from '../src/nats'

connect("nats://localhost:4222")
.then((nc: Client) => {
    nc.subscribe("foo", (err, msg) => {
        console.log(msg.data);
    })
    nc.publish("foo", "a");
    nc.publish("foo", "b");
    nc.publish("foo", "c");

    nc.flush(() => {
        nc.close();
    });
})
.catch((ex) => {
    console.log('error', ex)
})
aricart commented 5 years ago

By any chance, did you copy the tsnode-sub.ts program? I see a bug there.

acostaf commented 5 years ago

@aricart

This is just an example, but the callback is hit only once; the same code works fine with node-nats 1.2.

I have two different microservices talking through NATS rather than a single instance of the same code.

aricart commented 5 years ago

Does the code I posted have the issue? The option for max messages cannot be set unless a value is set.

acostaf commented 5 years ago

@aricart

I managed to test your code. With the publish calls placed after the subscription, it works for as many publishes as you have, and it also prevents NATS from logging "Deferring actual UNSUB(updateUIbyInquiry)". However, when my second microservice publishes, the subscription is already closed.

acostaf commented 5 years ago

@aricart

I think I found the issue in the code. Everything happens during subscription, when the sid is assigned to the new subscription: updateUIbyInquiry happens to be the first subscription, and some milliseconds later a second one is created that overrides sid number 1, which is already used by updateUIbyInquiry.

 ProtocolHandler.prototype.subscribe = function (s) {
        if (this.isClosed()) {
            throw (error_1.NatsError.errorForCode(error_1.ErrorCode.CONN_CLOSED));
        }
        if (this.draining) {
            throw (error_1.NatsError.errorForCode(error_1.ErrorCode.CONN_DRAINING));
        }
        var sub = this.subscriptions.add(s);
        if (sub.queue) {
            this.sendCommand(this.buildProtocolMessage("SUB " + sub.subject + " " + sub.queue + " " + sub.sid));
        }
        else {
            this.sendCommand(this.buildProtocolMessage("SUB " + sub.subject + " " + sub.sid));
        }
        if (s.max) {
            this.unsubscribe(this.ssid, s.max);
        }
        return new nats_1.Subscription(sub, this);
    };

Returned by var sub = this.subscriptions.add(s); during the updateUIbyInquiry subscription: received:0 sid:1 subject:"updateUIbyInquiry"

Returned by var sub = this.subscriptions.add(s); after the second subscription: max:1 received:0 sid:1 subject:"_INBOX.NY1S598LU1WRRXDWHYZFOL"

Update 1: Looking further into the code, I found that var sub = this.subscriptions.add(s), which calls Subscriptions.prototype.add, does not continue increasing this.sidCounter on the second call but starts from 0 again.

 Subscriptions.prototype.add = function (s) {
        this.sidCounter++;
        this.length++;
        s.sid = this.sidCounter;
        this.subs[s.sid] = s;
        var se = { sid: s.sid, subject: s.subject, queue: s.queue };
        this.emit('subscribe', se);
        return s;
    };

Update 2: I also think this.sidCounter++; is failing because I have multiple connections in my main microservice, so sid 1 is assigned multiple times, once per connection.

aricart commented 5 years ago

@acostaf - each connection has independent subscription counters. So if connA makes 3 subscriptions, the last will have a sid of 3. If connB, in the same process or on its own, makes 1, its sid will be 1.
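
For illustration, the per-connection counters described above can be modeled with a small self-contained sketch. The class names `SubscriptionTable` and `ModelConnection` are hypothetical and are not part of ts-nats; the sketch only assumes that every connection owns its own counter, as stated in the comment above.

```typescript
// Hypothetical model, NOT the ts-nats source: each connection owns a
// private subscription table, so sids are only unique per connection.
class SubscriptionTable {
    private sidCounter = 0;
    private subs = new Map<number, string>();

    // Assign the next sid on this table and remember the subject.
    add(subject: string): number {
        this.sidCounter++;
        this.subs.set(this.sidCounter, subject);
        return this.sidCounter;
    }

    // Look up the subject registered under a sid, if any.
    subjectFor(sid: number): string | undefined {
        return this.subs.get(sid);
    }
}

class ModelConnection {
    readonly subscriptions = new SubscriptionTable();

    subscribe(subject: string): number {
        return this.subscriptions.add(subject);
    }
}

const connA = new ModelConnection();
const connB = new ModelConnection();

// connA makes three subscriptions, connB makes one.
const sidsA = [connA.subscribe("foo"), connA.subscribe("bar"), connA.subscribe("baz")];
const sidsB = [connB.subscribe("updateUIbyInquiry")];

console.log("connA sids:", sidsA, "connB sids:", sidsB);
```

In this model both connections hold a sid of 1 at the same time, but each sid maps to a subject only inside its own table, so the two do not collide.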

Also, in your output the subscription for _INBOX.NY1S598LU1WRRXDWHYZFOL looks to me like usage of the request API, which generates an inbox that should only live until the first reply is received. If the prints you display are sequential, they are on different connections.

Could you write a sample test that demonstrates the issue?

acostaf commented 5 years ago

@aricart

That is not the case. _INBOX.NY1S598LU1WRRXDWHYZFOL is added after updateUIbyInquiry, which has sid 1 and no max. _INBOX.NY1S598LU1WRRXDWHYZFOL then overrides max with 1 and calls:

if (s.max) {
    this.unsubscribe(this.ssid, s.max);
}

My recommendation is to prefix the generated sid with a connection token or something similar.

acostaf commented 5 years ago

@aricart sorry to bother you, have you had an opportunity to look at the issue?

aricart commented 5 years ago

If you look at the code, max is only set if you set it; otherwise, the property is undefined.
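
As a rough sketch of that guard, the following self-contained model builds the protocol lines a subscribe call would produce. The `SubRequest` interface and `protocolFor` function are hypothetical names for illustration, not ts-nats APIs; the `SUB <subject> <sid>` and `UNSUB <sid> <max>` shapes follow the NATS client protocol, and queue groups are left out for brevity.

```typescript
// Hypothetical model of the `if (s.max)` guard quoted earlier in this thread.
interface SubRequest {
    subject: string;
    max?: number; // undefined unless the caller sets it
}

// Return the protocol lines that would be sent for one subscription.
function protocolFor(sid: number, s: SubRequest): string[] {
    const lines = [`SUB ${s.subject} ${sid}`];
    if (s.max) {
        // Only reached when the caller provided a max.
        lines.push(`UNSUB ${sid} ${s.max}`);
    }
    return lines;
}

// A plain subscription: no max, so no auto-unsubscribe is requested.
const plain = protocolFor(1, { subject: "updateUIbyInquiry" });

// A request-style inbox that should live for a single reply.
const inbox = protocolFor(2, { subject: "_INBOX.example", max: 1 });

console.log(plain);
console.log(inbox);
```

Under this model, an UNSUB with a limit of 1 can only come from a subscription that was created with max set to 1, such as the inbox used by a request.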

aricart commented 5 years ago

Here's a sample program that illustrates that each client gets its own subscription ids, and the subscriptions continue processing messages.

import {Client, connect, Msg} from '../src/nats'
import {NatsError} from "../src/error";

let max = 10;

Promise.all([connect("nats://localhost:4222"), connect("nats://localhost:4222")])
    .then((conns) => {
        conns.forEach(function(nc: Client, index: number) {
            let counter = 0;

            let logger = function(err: NatsError | null, msg: Msg): void {
                counter++;
                let m = `client: ${index} sub sid: ${msg.sid} sub: ${msg.subject}: ${msg.data}`;
                console.log(m);

                if(counter === max*2) {
                    nc.close();
                }
            };

            nc.subscribe("foo", logger);
            nc.subscribe("bar", logger);

            nc.flush();
        });

        conns.forEach((nc: Client, index: number) => {
            for (let i = 0; i < max; i++) {
                nc.publish("foo", `from ${index} - msg: ${i+1}`);
                nc.publish("bar", `from ${index} - msg: ${i+1}`);
            }
        });

    }, (err)=> {
        console.log('error', err);
    });
> ts-node test.ts
client: 1 sub sid: 1 sub: foo: from 1 - msg: 1
client: 1 sub sid: 2 sub: bar: from 1 - msg: 1
client: 1 sub sid: 1 sub: foo: from 1 - msg: 2
client: 1 sub sid: 2 sub: bar: from 1 - msg: 2
client: 1 sub sid: 1 sub: foo: from 1 - msg: 3
client: 1 sub sid: 2 sub: bar: from 1 - msg: 3
client: 1 sub sid: 1 sub: foo: from 1 - msg: 4
client: 1 sub sid: 2 sub: bar: from 1 - msg: 4
client: 1 sub sid: 1 sub: foo: from 1 - msg: 5
client: 1 sub sid: 2 sub: bar: from 1 - msg: 5
client: 1 sub sid: 1 sub: foo: from 1 - msg: 6
client: 1 sub sid: 2 sub: bar: from 1 - msg: 6
client: 1 sub sid: 1 sub: foo: from 1 - msg: 7
client: 1 sub sid: 2 sub: bar: from 1 - msg: 7
client: 1 sub sid: 1 sub: foo: from 1 - msg: 8
client: 1 sub sid: 2 sub: bar: from 1 - msg: 8
client: 1 sub sid: 1 sub: foo: from 1 - msg: 9
client: 1 sub sid: 2 sub: bar: from 1 - msg: 9
client: 1 sub sid: 1 sub: foo: from 1 - msg: 10
client: 1 sub sid: 2 sub: bar: from 1 - msg: 10
client: 0 sub sid: 1 sub: foo: from 0 - msg: 1
client: 0 sub sid: 2 sub: bar: from 0 - msg: 1
client: 0 sub sid: 1 sub: foo: from 0 - msg: 2
client: 0 sub sid: 2 sub: bar: from 0 - msg: 2
client: 0 sub sid: 1 sub: foo: from 0 - msg: 3
client: 0 sub sid: 2 sub: bar: from 0 - msg: 3
client: 0 sub sid: 1 sub: foo: from 0 - msg: 4
client: 0 sub sid: 2 sub: bar: from 0 - msg: 4
client: 0 sub sid: 1 sub: foo: from 0 - msg: 5
client: 0 sub sid: 2 sub: bar: from 0 - msg: 5
client: 0 sub sid: 1 sub: foo: from 0 - msg: 6
client: 0 sub sid: 2 sub: bar: from 0 - msg: 6
client: 0 sub sid: 1 sub: foo: from 0 - msg: 7
client: 0 sub sid: 2 sub: bar: from 0 - msg: 7
client: 0 sub sid: 1 sub: foo: from 0 - msg: 8
client: 0 sub sid: 2 sub: bar: from 0 - msg: 8
client: 0 sub sid: 1 sub: foo: from 0 - msg: 9
client: 0 sub sid: 2 sub: bar: from 0 - msg: 9
client: 0 sub sid: 1 sub: foo: from 0 - msg: 10
client: 0 sub sid: 2 sub: bar: from 0 - msg: 10