nats-io / nats.net.v1

The official C# Client for NATS
Apache License 2.0

NotSupportedException in Connection.publish getting BufferedStream.Position #586

Closed DavidSimner closed 6 months ago

DavidSimner commented 2 years ago

We are seeing a System.NotSupportedException with the following stack trace:

   at System.IO.__Error.SeekNotSupported()
   at System.IO.BufferedStream.get_Position()
   at NATS.Client.Connection.publish(String subject, String reply, Byte[] headers, Byte[] data, Int32 offset, Int32 count, Boolean flushBuffer)
   at NATS.Client.Connection.<requestAsync>d__141.MoveNext()

It happens intermittently and isn't 100% reproducible, but here is my best guess as to what is happening:

In the C# class Connection there is a field private Stream bw which is a BufferedStream over another underlying Stream.

In the method publish, if the status is ConnState.RECONNECTING, the getter for the property bw.Position is called. This getter is only supported if the underlying Stream supports seeking. As the stack trace above shows, in our case the underlying stream does not support seeking, which causes a NotSupportedException to be thrown.
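
To make this concrete, here is a minimal standalone sketch (the names PositionDemo and NonSeekableStream are illustrative, not from the client library): BufferedStream.Position reads fine over a seekable MemoryStream but throws NotSupportedException as soon as the underlying stream cannot seek, which is the case for a socket's NetworkStream.

using System;
using System.IO;

class PositionDemo
{
    // Stand-in for an unseekable stream such as the socket's NetworkStream.
    class NonSeekableStream : MemoryStream
    {
        public override bool CanSeek => false;
    }

    static void Main()
    {
        // Over a seekable MemoryStream the Position getter is fine.
        var overPending = new BufferedStream(new MemoryStream());
        Console.WriteLine(overPending.Position);   // prints 0

        // Over an unseekable stream the same getter throws.
        var overSocket = new BufferedStream(new NonSeekableStream());
        try
        {
            Console.WriteLine(overSocket.Position);
        }
        catch (NotSupportedException e)
        {
            Console.WriteLine(e.Message);          // "Stream does not support seeking."
        }
    }
}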

Here is my best guess as to what happened:

When the connection first enters ConnState.RECONNECTING, bw is created over a MemoryStream called pending which does support seeking and so is not the cause.

However, later on, the method doReconnect calls the method createConn, which changes the field bw to be created over a stream associated with the TCP socket (which does not support seeking). After createConn has returned successfully, doReconnect then calls processConnectInit.

As a thought experiment, consider what happens if an exception is thrown by processConnectInit: it will be caught in the catch block in doReconnect, and the status will be set to ConnState.RECONNECTING, but bw will not be changed back to a MemoryStream, so bw will continue to have an underlying stream associated with the TCP socket (which does not support seeking). The catch block will then continue: it will select the next server and release the mu lock before calling the ReconnectDelayHandler.

At this point, imagine that another thread calls publish. publish will throw the stack trace above, because mu is not locked, the status is ConnState.RECONNECTING, and the underlying stream of bw does not support seeking.

I think the fix is that in every place where the status is set to ConnState.RECONNECTING, bw should be reset to be created over the MemoryStream called pending.
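
A rough, self-contained sketch of that shape of fix (this is not the library's actual code; the enum, fields, and method here are simplified stand-ins for the members named above):

using System;
using System.IO;

class ReconnectSketch
{
    enum ConnState { CONNECTED, RECONNECTING }

    ConnState status = ConnState.CONNECTED;
    MemoryStream pending;
    Stream bw = new BufferedStream(new MemoryStream());

    // Every transition to RECONNECTING goes through here, so bw can never be
    // left pointing at a socket-backed stream while the status says reconnecting.
    void EnterReconnecting()
    {
        status = ConnState.RECONNECTING;
        pending ??= new MemoryStream();
        bw = new BufferedStream(pending);
    }

    static void Main()
    {
        var c = new ReconnectSketch();
        c.EnterReconnecting();
        Console.WriteLine($"{c.status}: Position={c.bw.Position}");   // safe: pending is seekable
    }
}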

pcarcereri commented 2 years ago

I also had this issue, can you please keep me posted here?

rlabbe82 commented 1 year ago

I've come across this issue, and it is consistently repeatable in my environment with a single NATS server. My expectation was that this buffer would be present and used even if the connection is down for a period of time, not just when handling switching between servers.

@DavidSimner I think you are definitely on the right track, and it seems to be a race condition. The moment I put breakpoints in, the publishing thread is updated with the "correct" updated bw BufferedStream. Since there isn't a second NATS server to connect to in my test environment, the createConn method never changes bw, unless I'm missing somewhere else where bw would get reset to a stream on the TCP connection.

It's repeatable across multiple .NET and NATS.Client package versions on my machine. I haven't gotten familiar enough with the source to know what would be a good fix, but I'm willing to help test if anyone wants to look at this.

scottf commented 1 year ago

@rlabbe82 Any chance you have some pseudocode / code I could use to repeat this? What version of the clients and service are you using? You said it's repeatable across clients? Which clients, and do you have any code I could pass on to the client dev team?

rlabbe82 commented 1 year ago

@scottf Yeah I do. After writing this up, I think I figured out what is happening. @DavidSimner's thought experiment is what I am hitting. In the test I'm stopping the local container that is running the NATS server. This has the server close the connection and forces the client to go into the reconnection process.

When the reconnection thread runs and calls createConn, I expected that the conn.open method would fail; however, it did not. So I monitored the socket with Wireshark and found that the reconnection attempt at the TCP level was initially successful, then FINed immediately after, likely a sign that the Docker container hadn't completely closed the socket even though the NATS server had started exiting.

This then replaces the bw stream with the TCP NetworkStream and successfully completes createConn. However, the next step, processConnectInit, fails since the socket was immediately closed, which in turn means the catch block changes the status back to RECONNECTING without updating bw back to the pending memory stream.

It seems to be a corner case, but I could see it popping up sporadically with network layers like Docker that may not have completely released the external socket, allowing the reconnection to initially succeed. As a quick test I added a line to replace bw with the pending memory stream, and that seemed to allow the connection to properly manage the buffer and retry without hitting this again.

Here is the program that I just put together to test with.

using System;
using System.Text;
using System.Threading;
using NATS.Client;   // needed for ConnectionFactory and IConnection

namespace NatsTest
{
    internal class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory cf = new ConnectionFactory();

            // Creates a live connection to the default
            // NATS Server running locally
            IConnection c = cf.CreateConnection();

            // Publish requests to the given reply subject:
            for (int i = 0; i < 1000; i++)
            {
                c.Publish("foo", "bar", Encoding.UTF8.GetBytes("help!"));
                Thread.Sleep(1000);
            }

            // Draining and closing a connection
            c.Drain();

            // Closing a connection
            c.Close();
        }
    }
}

Found with NATS.Client 1.0.7, checked on 1.1.1, and rolled back to 0.11.0 as a quick test. NATS server 2.9.19 and 2.10.1. Running this with a local instance, not a container, of the NATS server does not produce the issue, likely because the socket control isn't through a virtualized network layer.

scottf commented 1 year ago

Thanks for the input, that's an interesting observation about when the server is in a container.

jormenjanssen commented 8 months ago

@scottf Also seeing this bug in our logs (the broker was restarted by our update utility on an embedded Yocto system)

Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]: Unhandled exception. System.NotSupportedException: NotSupported_UnseekableStream
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at System.IO.BufferedStream.get_Position()
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at NATS.Client.Connection.PublishImpl(String subject, String reply, MsgHeader inHeaders, Byte[] data, Int32 offset, Nullable`1 inCount, Boolean flushBuffer)
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at NATS.Client.Connection.Publish(String subject, Byte[] data)
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at xxx.Broker.Nats.EncodedConnection.PublishAsync[T](String subject, T obj) in /builds/Riwo/customers/xxx/mfp-software/xxx/xxx.Broker.Nats/EncodedConnection.cs:line 32
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at xxx.Broker.Nats.EncodedConnectionProxy.PublishAsync[T](String subject, T obj) in /builds/Riwo/customers/xxx/mfp-software/xxx/xxx.Broker.Nats/EncodedConnectionProxy.cs:line 47
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at xxx.Sync.Service.SyncService.HandleStatusMessagesAsync(CancellationToken stoppingToken) in /builds/Riwo/customers/xxx/mfp-software/xxx/xxx.Sync/Service/SyncService.cs:line 49
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at System.Threading.ThreadPoolWorkQueue.Dispatch()
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]: fail: Microsoft.Extensions.Hosting.Internal.Host[9]
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:       BackgroundService failed
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:       System.NotSupportedException: NotSupported_UnseekableStream
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:          at System.IO.BufferedStream.get_Position()
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:          at NATS.Client.Connection.PublishImpl(String subject, String reply, MsgHeader inHeaders, Byte[] data, Int32 offset, Nullable`1 inCount, Boolean flushBuffer)
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:          at NATS.Client.Connection.Publish(String subject, Byte[] data)

scottf commented 8 months ago

@scottf Also seeing this bug in our logs (the broker was restarted by our update utility on an embedded Yocto system)

Side note, can we avoid "broker" and use "server" or "NATS server"?

So the server running in the container was restarted? Or was the entire container restarted? It probably doesn't matter; it's the client that is having this error.

Since this is during a synchronous publish, it seems reasonable that you just handle the exception and wait for the reconnect to resume publishing. (I have a really good example of this in Java if you are interested.)
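
For reference, a hedged C# sketch of that suggestion (the helper name, retry count, and delay are illustrative; only IConnection.Publish is from the library):

using System;
using System.Threading;
using NATS.Client;

static class PublishRetry
{
    // Retry a synchronous publish, waiting between attempts so the client's
    // reconnect logic has a chance to restore the connection. On the final
    // attempt the exception propagates to the caller.
    public static void PublishWithRetry(IConnection c, string subject, byte[] data,
                                        int attempts = 5, int delayMs = 500)
    {
        for (int i = 0; i < attempts; i++)
        {
            try
            {
                c.Publish(subject, data);
                return;
            }
            catch (Exception) when (i < attempts - 1)
            {
                Thread.Sleep(delayMs);   // connection is likely reconnecting; back off
            }
        }
    }
}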

I think the most important question is does the connection go into reconnect logic and recover?

rlabbe82 commented 7 months ago

In my case it would never recover.

Kaarel commented 7 months ago

Sounds like a very similar issue to the one I ran into. I can reliably reproduce this when connecting to Synadia Cloud. The entire Program.cs is below. When the delay is 0 it works; a delay of a few seconds or more throws.

Notably, if the delay is greater than a second, the library (not the repro code) writes reconnect events to the console every second or so:

DisconnectedEvent, Connection: 48457
ReconnectedEvent, Connection: 94424
DisconnectedEvent, Connection: 48459

Exception:

System.NotSupportedException: Stream does not support seeking.
   at System.IO.BufferedStream.get_Position()
   at NATS.Client.Connection.PublishImpl(String subject, String reply, MsgHeader inHeaders, Byte[] data, Int32 offset, Nullable`1 inCount, Boolean flushBuffer)
   at NATS.Client.Connection.Publish(String subject, Byte[] data)
   at Program.<Main>$(String[] args) in Program.cs:line 8
   at Program.<Main>(String[] args)

Program.cs:

using System.Text;
using NATS.Client;

var path = @"path\to\cloud.creds";
var delay = 2000;
var con = new ConnectionFactory().CreateConnection("tls://connect.ngs.global", path);
await Task.Delay(delay);
con.Publish("test.subject", Encoding.UTF8.GetBytes("message"));
con.Close();

Using net7.0 and NATS.Client 1.1.4

kaarel-sm commented 6 months ago

Hi @scottf, any thoughts on this? Happy to create a new issue if it's not exactly the same problem. But as it stands, we are evaluating Synadia Cloud, and the client just keeps disconnecting. Effectively we can't publish messages except immediately after the initial connection.

scottf commented 6 months ago

Can you please share more information on your outgoing networking? Does this happen with a non-encoded connection? Does this happen when you are hitting a local dev server? Any VPN or proxy in between? You get the idea; I need more details here to be able to help. Do you have handlers in place that can give more information? ClosedEventHandler, DisconnectedEventHandler, ReconnectedEventHandler? I wonder if the TLS upgrade is failing, which might explain why the publish works with no delay but not after some delay, but that's just a guess.
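
For anyone hitting this, a minimal sketch of wiring up those handlers with the v1 client (the URL and creds path mirror the repro above; the log messages are just illustrative):

using System;
using NATS.Client;

class HandlerDemo
{
    static void Main()
    {
        Options opts = ConnectionFactory.GetDefaultOptions();
        opts.Url = "tls://connect.ngs.global";
        opts.SetUserCredentials(@"path\to\cloud.creds");

        // The handlers asked about above; each fires with ConnEventArgs.
        opts.DisconnectedEventHandler += (s, e) => Console.WriteLine("Disconnected");
        opts.ReconnectedEventHandler  += (s, e) => Console.WriteLine($"Reconnected to {e.Conn.ConnectedUrl}");
        opts.ClosedEventHandler       += (s, e) => Console.WriteLine("Connection closed");

        using IConnection c = new ConnectionFactory().CreateConnection(opts);
        Console.ReadLine();   // keep the process alive to observe events
    }
}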

Kaarel commented 6 months ago

Just a dev laptop, no fancy proxies. When using the nats.net.v2 client, the same scenario works as expected. We just have a lot invested in the v1 client, so the switch isn't trivial. We are currently evaluating whether it would be feasible to migrate from an on-prem NATS cluster to Synadia Cloud.

Literally the 3 lines below always throw for me on Publish when using the v1 client (File - New - Project - Program.cs):

using var con = new ConnectionFactory().CreateConnection("tls://connect.ngs.global", @"path\to\synadia-cloud.creds");
Task.Delay(2000).Wait();
con.Publish("subject", System.Text.Encoding.UTF8.GetBytes("message"));

Similar functionality using v2 client works as expected on the same machine:

var opts = new NatsOpts
{
    Url = "tls://connect.ngs.global",
    AuthOpts = new NatsAuthOpts
    {
        CredsFile = @"path\to\synadia-cloud.creds"
    }
};
await using var con = new NatsConnection(opts);
await con.ConnectAsync();
await Task.Delay(2000);
await con.PublishAsync("subject", System.Text.Encoding.UTF8.GetBytes("message"));

scottf commented 6 months ago

This is being addressed in https://github.com/nats-io/nats.net/pull/888

Kaarel commented 6 months ago

@scottf this looks promising. Any ETA on NuGet package release?

scottf commented 6 months ago

@Kaarel I'm doing the release right now! 1.1.5

Kaarel commented 6 months ago

Can confirm the issue is fixed in 1.1.5 :) Thank you!