Closed dario-alo closed 5 months ago
thanks for the report @dario-alo. this isn't .NET specific, right?
afaik, because you set up the stream to pick up all messages using the wildcard *.*, I think it's potentially picking up all _INBOX.xxxx messages etc. Looks to me like this is expected. I'm not familiar with using this type of wildcard in streams, so I might be missing something. cc @aricart
Yes, it would pick up _INBOX.xxxx, and that is very bad. Aside from whatever happens on the JetStream side, there would be a side effect for all requests that use the inbox: the stream would capture the publish and then send its puback as the reply to that request, meaning that, depending on the timing, the response you get may be coming from JetStream ingesting the message.
The fix here is for the stream to be more explicit about the subjects that it wants to capture.
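To make the overlap concrete, here is a minimal sketch of NATS-style subject matching, where * matches exactly one token and > matches one or more trailing tokens. SubjectMatches is a hypothetical helper written for illustration; it is not part of NATS.Net or the server, and it skips edge cases such as empty tokens. The inbox subject is the one that appears in the server trace in this thread.

```csharp
using System;

// Simplified NATS subject matching (illustrative helper, not a library API):
// '*' matches exactly one token, '>' matches one or more trailing tokens.
static bool SubjectMatches(string pattern, string subject)
{
    var p = pattern.Split('.');
    var s = subject.Split('.');
    for (var i = 0; i < p.Length; i++)
    {
        // '>' is only valid as the last token and needs at least one token left to match.
        if (p[i] == ">") return i == p.Length - 1 && s.Length > i;
        if (i >= s.Length) return false;
        if (p[i] != "*" && p[i] != s[i]) return false;
    }
    // Without a trailing '>', the token counts must be equal.
    return p.Length == s.Length;
}

// Default-style inbox subject: two tokens, "_INBOX" plus an id.
var inbox = "_INBOX.7BM1QH59JGXZQALW977OE5";
foreach (var pattern in new[] { "*", "tests.*", "*.*", "*.*.*", ">" })
{
    var captured = SubjectMatches(pattern, inbox);
    Console.WriteLine($"{pattern,-8} captures {inbox}: {captured}");
}
```

Under these rules only *.* and > match the two-token inbox subject, which is consistent with the advice to list explicit subjects (for example a fixed first token) instead of an all-wildcard pattern.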
Thanks for your quick reply @mtmk @aricart
Not sure if it's .NET specific.
In my system, we want the stream to take every message with that segment format. We ran some tests, and we suspect there is a problem with the two-segment format specifically. This is the result:
*: OK - captures everything
tests.*: OK - captures every message starting with "tests" and having two segments
*.*: KO - we get an infinite loop where the message is consumed infinitely many times
*.*.*: OK - captures every message with 3 segments
.... OK with more segments
Are you using the next() call to consume?
If you run the server in verbose mode with -V, for example nats-server -js -V, you should see the _INBOX.* messages which are sent back from the server confirming the pull requests that are used to consume the messages in a stream.
I'm using the Fetch function. Here you have how I'm consuming the messages:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingToken.ThrowIfCancellationRequested();
    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            var js = new NatsJSContext((NatsConnection)_connection);
            var consumer = await js.GetConsumerAsync(_streamName, _durableName, stoppingToken);
            SetState(true);
            Logger.Active(Name);
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var counter = 0;
                    await consumer.RefreshAsync(stoppingToken);
                    var opts = new NatsJSFetchOpts
                    {
                        MaxMsgs = _subscriptorOptions.BatchSize,
                        MaxBytes = _subscriptorOptions.MaxBytes,
                        Expires = TimeSpan.FromMilliseconds(_subscriptorOptions.MaxWaitMilis)
                    };
                    await foreach (var message in consumer?.FetchNoWaitAsync<NatsMemoryOwner<byte>>(opts: opts, cancellationToken: stoppingToken))
                    {
                        if (stoppingToken.IsCancellationRequested)
                        {
                            break;
                        }
                        await ConsumeMessage(message, stoppingToken);
                        counter++;
                    }
                    if (counter == 0)
                    {
                        await Task.Delay(1000, stoppingToken);
                    }
                }
                catch (NatsJSProtocolException ex)
                {
                    Logger.NatsException(ex.ToString());
                }
                catch (NatsJSException ex)
                {
                    Logger.NatsException(ex.ToString());
                    await Task.Delay(1000, stoppingToken);
                }
            }
        }
        catch (Exception ex)
        {
            Logger.Exception(ex.ToString());
            SetState(false, ex);
            Logger.Inactive(Name);
            await Task.Delay(1000, stoppingToken);
            continue;
        }
        SetState(false);
        Logger.Inactive(Name);
    }
}

private async Task ConsumeMessage(NatsJSMsg<NatsMemoryOwner<byte>> message, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();
    IEvent @event = null;
    try
    {
        @event = await _mapper.MapAsync(_durableName, message, cancellationToken);
        await HandleEventAsync(@event, cancellationToken);
        await message.AckAsync(cancellationToken: cancellationToken);
    }
    catch (Exception ex)
    {
        var errorContext = new ErrorContext(ex, message, @event);
        await HandleErrorAsync(errorContext, cancellationToken);
        if (errorContext.SendAck)
        {
            await message.AckAsync(cancellationToken: cancellationToken);
        }
        else
        {
            await message.NakAsync(delay: errorContext.NACKDelay, cancellationToken: cancellationToken);
        }
    }
}
ah! you're using fetch-no-wait. OK, that makes more sense: it would generate messages at a fast pace. Essentially the stream is picking up JetStream control messages, in this case request-timeout messages sent to the inbox, such as _INBOX.7BM1QH59JGXZQALW977OE5:
HMSG _INBOX.7BM1QH59JGXZQALW977OE5 13 $JS.ACK.s1.c1.1.45.45.1718195533928968700.0 81 81
NATS/1.0 408 Request Timeout
Nats-Pending-Messages: 1
Nats-Pending-Bytes: 0
Not sure if it's .NET specific.
This exact behaviour (messages being delivered to the application code) is .NET specific, but the same issue happens silently in the other clients I tried: the JavaScript and Go clients do the same thing, except they don't pass the messages to the application. That is arguably worse, because you won't immediately see that the stream is being filled with 'Request Timeout' messages. anyway!
So basically it's not a good idea to try to wiretap all messages into a stream (using all-wildcard subjects, e.g. >, *.*, *.*.* etc.), since there is a good chance you won't be able to read the messages back out of it.
Are you trying to build some kind of tooling around monitoring streams? Good people on slack.nats.io #jetstream channel might be able to help you perhaps.
Thanks @mtmk
I finally did what you explained, adding a more specific subject x.*.* in the stream.
You may close the issue if this is happening in other clients.
For me, it's still a bug in the specific wildcard *.*. Even if it's not recommended to use the generic ones, other generic wildcards with different formats (segments), as I explained above, work perfectly. For example: *, *.*.* and *.*.*.*
Thanks @mtmk I finally did what you explained, adding a more specific subject x.*.* in the stream. You may close the issue if this is happening in other clients.
sure, no worries. I never thought of this. I learn something every day :)
For me, it's still a bug in the specific wildcard *.*. Even if it's not recommended to use the generic ones, other generic wildcards with different formats (segments), as I explained above, work perfectly. For example: *, *.*.* and *.*.*.*
until implementation changes and starts using an inbox with three segments ;) it's just how request reply works under the hood.
edit e.g.
await using var nats = new NatsConnection(new NatsOpts { InboxPrefix = "_IN.BOX" });
I tried to add the segment in the prefix and it works now 👍
We will add the InboxPrefix option to our library config, with this behaviour explained, in case anyone needs to use this two-segment format.
Thanks again for your help :)
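As a rough illustration of why the InboxPrefix workaround above helps, the sketch below counts dot-separated tokens for inbox subjects built from two prefixes. It assumes inbox subjects have the shape prefix + "." + id, as in the trace earlier in the thread; TokenCount and AllStarPatternCaptures are hypothetical helpers, not NATS.Net APIs.

```csharp
using System;
using System.Linq;

// Number of dot-separated tokens in a subject.
static int TokenCount(string subject) => subject.Split('.').Length;

// A pattern made only of '*' tokens matches any subject with the same token count.
static bool AllStarPatternCaptures(string pattern, string subject) =>
    pattern.Split('.').All(t => t == "*") && TokenCount(pattern) == TokenCount(subject);

foreach (var prefix in new[] { "_INBOX", "_IN.BOX" })
{
    // Assumed inbox shape: <prefix>.<id>; the id is taken from the trace in this thread.
    var inbox = prefix + ".7BM1QH59JGXZQALW977OE5";
    var tokens = TokenCount(inbox);
    var capturedByTwo = AllStarPatternCaptures("*.*", inbox);
    var capturedByThree = AllStarPatternCaptures("*.*.*", inbox);
    Console.WriteLine($"{inbox}: tokens={tokens}, *.*={capturedByTwo}, *.*.*={capturedByThree}");
}
```

Note that the longer prefix only moves the collision: a stream on *.*.* would then capture the three-token inbox, and > captures inboxes regardless of the prefix.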
Observed behavior
Publishing events on a stream with subjects ["*.*"] triggers a loop where the messages are consumed infinitely. I tested with 1 event: the single publish is executed correctly, but the number of consumed messages is infinite. The "NumPending" of the message increases on every consume iteration.
Curiously, this only occurs with the subjects ["*.*"]. If you use any other two-segment value, like ["tests.*"], it works as expected.
Here is my NATS configuration for the test:
"Nats": {
  "ConnectionString": "nats://localhost",
  "StreamName": "test",
  "Stream": {
    "Name": "test",
    "Subjects": ["*.*"]
  },
  "Durables": {
    "durable1": {
      "Subjects": "tests.*"
    }
  }
}
Expected behavior
Streams with the subjects value ["*.*"] work in the same way as streams with other subjects values.
Server and client version
nats-server nats:2.10.14-alpine natsio/nats-server-config-reloader:0.14.2 natsio/prometheus-nats-exporter:0.14.0 nats v0.1.4
Host environment
No response
Steps to reproduce
No response