nats-io / stan.net

The official NATS .NET C# Streaming Client
Apache License 2.0

Stream server re-send messages #168

Closed minhhungit closed 4 years ago

minhhungit commented 4 years ago

Issue with nats-stream: I have about 10k messages in a subject. When I run the consumer with the -all parameter, I expect it to fetch messages 1 through 10k, right? But sometimes it processes only some of the messages, waits a bit, and then re-processes them (Redelivered = true).

image

See the image above: I have 10k messages, but the consumer processed up to message #1362 and then received some old messages again (starting from #340, with Redelivered = True).


Steps to reproduce:


nats-streaming-server.zip test - code.zip video-nats-stream-steps-procedure.zip

minhhungit commented 4 years ago

I attached video above

ColinSullivan1 commented 4 years ago

@minhhungit , thank you for using NATS! We appreciate the detail and video you've provided describing this problem. Many thanks to Thomas Pihl on NATS slack for identifying this.

I've verified that reducing the SubscriptionOptions MaxInflight resolves this.

            sOpts.ManualAcks = true;
            sOpts.AckWait = 5 * 1000; // 5 seconds (value is in milliseconds)
            sOpts.MaxInflight = 64;  // <=== Changed this to 64
            sOpts.LeaveOpen = true;
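
For context, here is a minimal sketch of how options like these might be passed to a subscription that acknowledges messages manually. It is an illustration under assumptions, not the code from the attached test project: the `SubscribeSketch` class, the `SubscribeAll` helper, the `subject` parameter, and the handler body are all hypothetical, and the use of `DeliverAllAvailable()` assumes that is what the `-all` parameter maps to.

```csharp
using System;
using STAN.Client;

static class SubscribeSketch
{
    // Hypothetical helper: subscribes with a reduced MaxInflight and manual acks.
    // The subject name and the handler body are placeholders.
    public static IStanSubscription SubscribeAll(IStanConnection c, string subject)
    {
        var sOpts = StanSubscriptionOptions.GetDefaultOptions();
        sOpts.DeliverAllAvailable();   // assumed equivalent of -all: start from the first stored message
        sOpts.ManualAcks = true;       // the handler must call Ack() itself
        sOpts.AckWait = 5 * 1000;      // milliseconds the server waits for an ack before redelivering
        sOpts.MaxInflight = 64;        // cap on unacknowledged messages outstanding at once

        return c.Subscribe(subject, sOpts, (sender, e) =>
        {
            Console.WriteLine("seq {0}, redelivered: {1}",
                e.Message.Sequence, e.Message.Redelivered);
            e.Message.Ack();           // acknowledge only after processing completes
        });
    }
}
```

With a smaller MaxInflight, the server holds back further messages until earlier ones are acknowledged, so a handler that is slow relative to the message size has less data queued up on the client at any one time.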

Also, I've verified that your client was a slow consumer. By adding an error handler to the underlying NATS connection, and passing it into the STAN connection, we can see you are receiving a Slow Consumer error.

Here's the modified connection creation code. First, a core NATS connection (NATS.Client.IConnection) is created, and an error handler is added to print out any errors. The NATS connection is then used by STAN by being assigned to the StanOptions.NatsConn property.

        public void Run(string[] args)
        {
            parseArgs(args);
            banner();

            // Create core NATS options and setup an error handler.
            var nOpts = NATS.Client.ConnectionFactory.GetDefaultOptions();
            // The lambda parameter is named "e" so it does not shadow the method's "args".
            nOpts.AsyncErrorEventHandler = (obj, e) => {
                Console.WriteLine("NATS Error:  {0}", e.Error);
            };

            // Create a core NATS connection with the options.
            var nc = new NATS.Client.ConnectionFactory().CreateConnection(nOpts);

            var opts = StanOptions.GetDefaultOptions();

            // Tell the NATS streaming client to use the core NATS connection
            opts.NatsConn = nc; 
            using (var c = new StanConnectionFactory().CreateConnection(clusterID, clientID, opts))
            {
                TimeSpan elapsed = receiveAsyncSubscriber(c);

                Console.Write("Received {0} msgs in {1} seconds ", received, elapsed.TotalSeconds);
                Console.WriteLine("({0} msgs/second).",
                    (int)(received / elapsed.TotalSeconds));

            }
        }

Here is a snippet of the output from a run without modifying the MaxInflight subscriber option. It shows that, with the default options, this client is a slow consumer because of the large message sizes.

Received seq with ack # 953: False / 1111111111
Received seq with ack # 954: False / 1111111111
NATS Error:  Slow Consumer
NATS Error:  Slow Consumer
Received seq with ack # 955: False / 1111111111
Received seq with ack # 956: False / 1111111111
NATS Error:  Slow Consumer
Received seq with ack # 957: False / 1111111111
NATS Error:  Slow Consumer
Received seq with ack # 958: False / 1111111111
Received seq with ack # 959: False / 1111111111

After setting MaxInflight to 64, the error does not occur.

minhhungit commented 4 years ago

it works, thank you