Open rampratapa opened 4 years ago
Thank you for using NATS!
The error is originating at the server, and is most likely caused by a client with the same client ID not closing its connection (e.g. starting a new connection with the same client ID before the previous connection was closed).
It's possible (but unlikely) that you're trying to establish a new connection before the NATS Streaming server has entirely processed the previous close. We can verify this in the NATS Streaming server logs by enabling debug logging, either with the -SD flag or with sd: true in the config file.
tbh, I don't think this issue is related to #142.
The other exception you are seeing, NATS.Client.NATSNoServersException: Unable to connect to a server, indicates that the STAN client cannot create an underlying core NATS connection - so the server is down or unreachable. It's possible the underlying Kubernetes service isn't responding; we've seen the k8s service drop out on us under load. However, it's more likely the server isn't available/running or can't respond in time.
One option to explore is creating the underlying NATS connection yourself, increasing the lower-level connection timeout, and passing that connection to the STAN connection:
var natsOptions = ConnectionFactory.GetDefaultOptions();
natsOptions.Url = natUrl;
natsOptions.Timeout = 10000; // 10s, very generous

// create the core NATS connection with the longer timeout
IConnection nc = new ConnectionFactory().CreateConnection(natsOptions);

StanOptions cOpts = StanOptions.GetDefaultOptions();
cOpts.NatsConn = nc; // assign the NATS connection
// then create the NATS streaming connection with cOpts, etc.
Good morning. Initially I was using the approach you suggested.
The following gets the NATS options and creates the NATS connection:
private Options GetOpts(string natUrl)
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.Url = natUrl; // "nats://localhost:4222";
opts.AllowReconnect = true;
opts.PingInterval = 5000;
opts.MaxPingsOut = 4;
opts.MaxReconnect = Options.ReconnectForever;
opts.ServerDiscoveredEventHandler += (sender, args) => Console.WriteLine("NATS server discovered");
opts.ReconnectedEventHandler +=
(sender, args) => Console.WriteLine("NATS server in proxy reconnected.");
opts.ClosedEventHandler +=
(sender, args) => Console.WriteLine("NATS connection closed in proxy");
opts.DisconnectedEventHandler += (sender, args) =>
Console.WriteLine("NATS connection disconnected in proxy");
opts.AsyncErrorEventHandler +=
(sender, args) => Console.WriteLine("NATS async error: {0}, Message={1}, Subject={2}", args.Conn.ConnectedUrl,
args.Error, args.Subscription.Subject);
return opts;
}
public IConnection getConnection(string NatsUrl)
{
var cf = new ConnectionFactory();
var natsConnection = cf.CreateConnection(GetOpts(NatsUrl));
return natsConnection;
}
The following values are set in the STAN options:
var natConnection = getConnection(natUrl);
cOpts.NatsConn = natConnection;
cOpts.PingInterval = 5000;
cOpts.PingMaxOutstanding = 4;
The issue with this approach is that it fires ConnectionLostEventHandler events, sometimes causing transactions to fail and other times taking too long to return the error, which impacts throughput.
One question: when we receive the ConnectionLostEventHandler event, it prints the following based on Console.WriteLine:
Console.WriteLine("NATS server in proxy reconnected.");
Do we need to write code in that block to re-establish the StanConnection and its subscriptions? As far as I can see, when the connection is lost the subscribers stop receiving messages, and I need to restart the pods to process those messages. Also, I noticed the consumers are slow.
Based on the following from the main documentation page:
"When no NATS connection is provided, the library creates its own NATS connection and will now set the reconnect attempts to "infinite", which was not the case before. It should therefore be possible for the library to always reconnect, but this does not mean that the streaming connection will not be closed, even if you set a very high threshold for the PINGs max out value. Keep in mind that while the client is disconnected, the server is sending heartbeats to the clients too, and when not getting any response, it will remove that client from its state. When the communication is restored, the PINGs sent to the server will allow to detect this condition and report to the client that the connection is now closed."
Based on the above, I felt the best approach was to use StanOptions and set the NatsURL so that the STAN client manages the connection itself. However, I will run the code with the NATS connection options and publish the results soon.
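For reference, the NatsURL-based form mentioned above might look roughly like this - a minimal sketch only, assuming the natUrl, clusterID, and clientID values used elsewhere in this thread:
StanOptions cOpts = StanOptions.GetDefaultOptions();
cOpts.NatsURL = natUrl;           // let the STAN client create and manage its own core NATS connection
cOpts.PingInterval = 5000;        // same ping settings as above
cOpts.PingMaxOutstanding = 4;
cOpts.ConnectionLostEventHandler = (obj, args) =>
    Console.WriteLine("STAN connection lost");
var sc = new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
With this form the library reconnects the core NATS connection indefinitely, as described in the quoted documentation.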
My main goal is to find a fast and reliable messaging engine that fits into a Kubernetes cluster.
Thank you for your quick response and your help.
Thanks Ram
Good afternoon, I have executed a performance test; the following are the details.
The STAN options set are:
StanOptions cOpts = StanOptions.GetDefaultOptions();
// cOpts.NatsURL = natUrl;
var natConnection = getConnection(natUrl);
cOpts.NatsConn = natConnection;
cOpts.PingInterval = 10000;
cOpts.PingMaxOutstanding = 5;
cOpts.ConnectionLostEventHandler = HandleStanConnection;
The NATS options are:
var opts = ConnectionFactory.GetDefaultOptions();
opts.Url = natUrl; // "nats://localhost:4222";
opts.AllowReconnect = true;
opts.PingInterval = 10000;
opts.MaxPingsOut = 5;
opts.MaxReconnect = Options.ReconnectForever;
The following is from the subscriber log file:
Tracker Lost Connection HandleStanConnection
NATS connection disconnected in Traker
NATS server in Tracker reconnected.
NATS Starting Subscribers.
Unable to restart tracker subscriptions stan: clientID already registered
The following errors were noticed in the stan-0 logs:
[1] 2020/09/28 20:25:44.280290 [ERR] STREAM: [Client:trkingservicenet-47] Timed out on heartbeats
[1] 2020/09/28 20:43:13.333242 [ERR] STREAM: [Client:trkingservicenet-47] Timed out on heartbeats
[1] 2020/09/28 21:11:40.625488 [ERR] STREAM: [Client:trkingservicenet-47] Timed out on heartbeats
Please let me know if you need more information.
Thanks ram
Thanks Ram. It looks like there was a long period of time when the network was unavailable - the STAN client timed out on missing heartbeats and the NATS client disconnected (assuming the NATS connection disconnected in Traker message is from a disconnected callback).
I have a few questions: is NATS Starting Subscribers. printed?
Thanks, Colin
Colin, good evening, thank you for your response. I have the following setup.
I have deployed NATS and STAN in AKS using the following steps (though I wanted to set up FT mode, I could not find steps for Azure, whereas I was able to find AWS steps): https://docs.nats.io/nats-on-kubernetes/minimal-setup#ha-setup-using-statefulset (HA Setup Using StatefulSets).
I created a client with .NET to publish messages after injecting the client using DI:
private string publishMessage(string subject,string hubMessge, string hubId)
{
Stopwatch sw = Stopwatch.StartNew();
_logger.LogInformation("Published Message for Tracking");
var response = "<ack></ack>";
long acksProcessed = 0;
var payload = Encoding.UTF8.GetBytes((string)hubMessge);
AutoResetEvent ev = new AutoResetEvent(false);
string guid = _stanConnection.Publish(subject, payload, (obj, pubArgs) =>
{
response = "<ack>" + pubArgs.GUID + "</ack>";
_logger.LogInformation("Recieved ack for message {0}", pubArgs.GUID);
if (!string.IsNullOrEmpty(pubArgs.Error))
{
_logger.LogInformation("Error processing message {0}", pubArgs.GUID);
response = "<nack>" + pubArgs.GUID + "</nack>";
}
if (Interlocked.Increment(ref acksProcessed) == 1)
ev.Set();
});
sw.Stop();
Console.WriteLine("Metrics:publishMessage Request {0}:{1}ms", hubId, sw.Elapsed.TotalMilliseconds);
ev.WaitOne();
return response;
}
I have created a console application for the subscribers.
The following is the startup class:
public static class Startup
{
private static readonly IConfiguration configuration;
public static IServiceProvider provider;
private static readonly ServiceCollection services;
private static readonly string clusterID;
private static readonly string clientID;
static Startup()
{
var random = new Random().Next(1, 100);
clusterID = "stan";
clientID = "trkingservicenet-" + random.ToString();
var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT");
configuration = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
.AddJsonFile($"appsettings.{environment}.json", optional: true)
.AddEnvironmentVariables()
.Build();
services = new ServiceCollection();
var natUrl = configuration.GetValue<string>("NATsService");
var TrackingboxSqlConnection = configuration.GetValue<string>("TrackingSqlConnectionString");
StanOptions cOpts = StanOptions.GetDefaultOptions();
// cOpts.NatsURL = natUrl;
var natConnection = getConnection(natUrl);
cOpts.NatsConn = natConnection;
cOpts.PingInterval = 10000;
cOpts.PingMaxOutstanding = 5;
cOpts.ConnectionLostEventHandler = HandleStanConnection;
// add necessary services
services.AddSingleton(configuration);
services.AddSingleton<ITracker, TrackingWorker>();
services.AddSingleton<IStanConnection>((s) =>
{
return new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
});
// services.AddLogging();
// build the pipeline
services.AddDbContext<TrackingContext>(
options => options.UseSqlServer(TrackingboxSqlConnection, providerOptions => providerOptions.EnableRetryOnFailure(maxRetryCount: 10,
maxRetryDelay: TimeSpan.FromSeconds(5),
errorNumbersToAdd: null)));
services.AddDbContext<TrackingContext>(
c => c.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking));
services.AddScoped<ITrackingRepository, TrackingRepository>();
// var observer = new ApplicationInsightsKubernetesDiagnosticObserver(DiagnosticLogLevel.Trace);
// ApplicationInsightsKubernetesDiagnosticSource.Instance.Observable.SubscribeWithAdapter(observer);
// services.AddApplicationInsightsTelemetry();
services.AddApplicationInsightsKubernetesEnricher();
services.AddLogging();
/* services.AddLogging(builder =>
{
// Optional: Apply filters to configure LogLevel Trace or above is sent to
// Application Insights for all categories.
builder.AddFilter<Microsoft.Extensions.Logging.ApplicationInsights.ApplicationInsightsLoggerProvider>("", LogLevel.Trace);
builder.AddApplicationInsights("");
// Optional: Show the logs in console at the same time
builder.AddConsole();
});*/
var telemetryConfiguration = TelemetryConfiguration.CreateDefault();
telemetryConfiguration.InstrumentationKey = "";
var telemetryClient = new TelemetryClient(telemetryConfiguration);
services.AddSingleton(telemetryClient);
provider = services.BuildServiceProvider();
}
public static void StartMe()
{
Console.WriteLine("Initializing");
}
public static IConnection getConnection(string NatsUrl)
{
var cf = new ConnectionFactory();
var natsConnection = cf.CreateConnection(GetOpts(NatsUrl));
return natsConnection;
}
private static Options GetOpts(string natUrl)
{
var opts = ConnectionFactory.GetDefaultOptions();
opts.Url = natUrl; // "nats://localhost:4222";
opts.AllowReconnect = true;
opts.PingInterval = 10000;
opts.MaxPingsOut = 5;
opts.MaxReconnect = Options.ReconnectForever;
// opts.ReconnectWait = 1000;
// opts.Timeout = 4000;
opts.ServerDiscoveredEventHandler += (sender, args) => Console.WriteLine("NATS server discovered");
opts.ReconnectedEventHandler = ReconnectedEventHandler;
opts.ClosedEventHandler +=
(sender, args) => Console.WriteLine("NATS connection closed in Tracker");
opts.DisconnectedEventHandler += (sender, args) =>
Console.WriteLine("NATS connection disconnected in proxy");
opts.AsyncErrorEventHandler +=
(sender, args) => Console.WriteLine("NATS async error: {0}, Message={1}, Subject={2}", args.Conn.ConnectedUrl,
args.Error, args.Subscription.Subject);
return opts;
}
static void ReconnectedEventHandler(object obj, ConnEventArgs args)
{
Console.WriteLine("NATS server in Tracker reconnected.");
Console.WriteLine("NATS Starting Subscribers.");
try
{
var natConnection = args.Conn;
StanOptions cOpts = StanOptions.GetDefaultOptions();
cOpts.NatsConn = natConnection;
cOpts.PingInterval = 5000;
cOpts.PingMaxOutstanding = 4;
var sConnection = provider.GetRequiredService<IStanConnection>();
services.AddSingleton<IStanConnection>((s) =>
{
return new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
});
provider = services.BuildServiceProvider();
var service = provider.GetRequiredService<ITracker>();
service.TrackMessage();
}
catch(Exception ex)
{
Console.WriteLine("Unable to restart traker subscriptions " + ex.Message);
}
}
private static void HandleStanConnection(object obj, StanConnLostHandlerArgs args)
{
Console.WriteLine("Tracker Lost Connection HandleStanConnection");
}
}
The following is invoked from the program
public class TrackingWorker : ITracker
{
private readonly ILogger<TrackingWorker> _logger;
private readonly IStanConnection _stanConnection;
private readonly ITrackingRepository _trackingRepo;
private readonly string clusterID = "stan";
private readonly string subscriberChannel = "trackmessagenet";
private readonly string durableID = "tracking-service-durable";
//event = "track-message"
private readonly string queueGroup = "eligtrackinggroup";
private readonly TelemetryClient _telemetricClient;
public TrackingWorker(ILogger<TrackingWorker> logger, IStanConnection stanConnection, ITrackingRepository trackingRepo, TelemetryClient telemetricClient)
{
_logger = logger;
_stanConnection = stanConnection;
_trackingRepo = trackingRepo;
_telemetricClient = telemetricClient;
}
public Task<object> TrackMessage()
{
AppDomain.CurrentDomain.ProcessExit += (s, e) => FinalizeApplication();
var opts = StanSubscriptionOptions.GetDefaultOptions();
// opts.StartWithLastReceived();
opts.DurableName = durableID;
opts.ManualAcks = true;
EventHandler<StanMsgHandlerArgs> ackHandler = ProcessMessage;
var s = _stanConnection.Subscribe(subscriberChannel, queueGroup,opts, ackHandler);
return null;
}
private void ProcessMessage(object obj, StanMsgHandlerArgs args)
{
var channel = new InMemoryChannel();
Console.WriteLine("started ProcessMessage{0}", args.Message.Sequence);
Console.WriteLine("Message Redelivered{0}", args.Message.Redelivered);
_logger.LogInformation("started ProcessMessage{0}", args.Message.Sequence);
_logger.LogInformation(args.Message.Subject);
_telemetricClient.TrackEvent("Message Received");
var requetFromQueue = System.Text.Encoding.UTF8.GetString(args.Message.Data);
try
{
var trackMessage = JsonSerializer.Deserialize<TrackingMessage>(requetFromQueue);
var eventType = trackMessage.Event;
Console.WriteLine("Message Event{0}", eventType);
switch (eventType)
{
case MessageEventType.Tracking:
var task1 = Task.Run(async () => await _trackingRepo.InsertTracking(trackMessage, eventType, (args.Message.Sequence).ToString(), (args.Message.Redelivered).ToString()));
var result1 = task1.Result;
break;
case MessageEventType.TrackingDetails:
// Console.WriteLine("Message received{0}", requetFromQueue);
var task = Task.Run(async () => await _trackingRepo.InsertTrackingDetails(trackMessage, eventType, (args.Message.Sequence).ToString(), (args.Message.Redelivered).ToString()));
var result = task.Result;
break;
case MessageEventType.HubLog:
break;
default:
break;
}
args.Message.Ack();
}
catch (SqlException sqlEx)
{
// Hope this will return the message back to EventGrid topic for reprocessing.
_logger.LogInformation("SQL Exception Occurred in Tracking" + sqlEx.StackTrace);
throw sqlEx;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error Occurred");
}
}
private void FinalizeApplication()
{
// Give TelemetryClient 5 seconds to flush it's content to Application Insights
_telemetricClient.Flush();
Thread.Sleep(5000);
}
}
The publisher is in one Docker container and the subscriber is in a different Docker container, both deployed into the AKS cluster. I am thinking that I may need to add the following code in ReconnectedEventHandler, so I am going to add it and try to make the client ID unique.
var random = new Random().Next(1, 100);
var clusterID = "stan";
var clientID = "trkingservicenet-" + random.ToString();
However, I was wondering why the connection is dropping so frequently, since NATS/STAN was deployed in high-availability mode in AKS with three pods each: NATS-0, NATS-1, and NATS-2, and similarly STAN-0 through STAN-2.
Please let me know if you need more information.
Thank you for your help. Thanks Ram
Thank you for sharing this code. The reconnected event handler is for the lower level underlying NATS connection, and upon successful core NATS reconnection, you might still have a valid NATS streaming connection. I'd move the STAN reconnect code to the ConnectionLostEventHandler.
e.g.
// on your StanOptions instance (shown here as stanOpts)
stanOpts.ConnectionLostEventHandler = (obj, args) =>
{
Console.WriteLine("Lost connection to NATS Streaming.");
Console.WriteLine("NATS Starting Subscribers.");
try
{
var natConnection = args.Connection.NATSConnection; // underlying core NATS connection
StanOptions cOpts = StanOptions.GetDefaultOptions();
cOpts.NatsConn = natConnection;
cOpts.PingInterval = 5000;
cOpts.PingMaxOutstanding = 4;
var sConnection = provider.GetRequiredService<IStanConnection>();
services.AddSingleton<IStanConnection>((s) =>
{
return new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
});
provider = services.BuildServiceProvider();
var service = provider.GetRequiredService<ITracker>();
service.TrackMessage();
}
catch(Exception ex)
{
Console.WriteLine("Unable to restart traker subscriptions " + ex.Message);
}
};
Can we see the entire log of the streaming server when this happens?
@kozlovic, any other thoughts here?
@wallyqs, would you know of a FT setup of NATS streaming for Azure? Would the helm chart be better?
Colin, good evening, I made the changes and ran the load test. The subscribers are going down and not processing messages because the handler is not able to restart the subscribers; if I re-deploy the pods, they start picking up the messages again. It seems the ConnectionLostEventHandler runs only once. I am hoping args.Connection.NATSConnection will return the original NATS connection; if not, then I need to find a suitable one or reset this one. I will put a debug statement in tomorrow and validate, or please let me know if this is wrong.
I noticed the following logging statements:
STAN Connection Lost Handler -- server in Tracker reconnected.
STAN Starting Subscribers.
Unable to restart tracker subscriptions Invalid connection.
Also, I noticed about 20% of messages have redelivered status and hence are duplicates. I will try to run one more test tomorrow morning and will publish the final results.
private static void HandleStanConnection(object obj, StanConnLostHandlerArgs args)
{
Console.WriteLine("STAN Connection Lost Handler -- server in Tracker trying to reconnect.");
Console.WriteLine("STAN Starting Subscribers.");
try
{
var natConnection = args.Connection.NATSConnection;
StanOptions cOpts = StanOptions.GetDefaultOptions();
cOpts.NatsConn = natConnection;
cOpts.PingInterval = 10000;
cOpts.PingMaxOutstanding = 5;
// var sConnection = provider.GetRequiredService<IStanConnection>();
var random = new Random().Next(1, 100);
var clusterID = "stan";
var clientID = "trkingservicenet-" + random.ToString();
services.AddSingleton<IStanConnection>((s) =>
{
return new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
});
provider = services.BuildServiceProvider();
var service = provider.GetRequiredService<ITracker>();
service.TrackMessage();
}
catch (Exception ex)
{
Console.WriteLine("Unable to restart tracker subscriptions " + ex.Message);
}
}
How do we retry the connection? Please let me know if you need any more information.
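For illustration, one way to retry is to loop around CreateConnection with a delay. This is a minimal sketch only; ConnectWithRetry is a hypothetical helper, and clusterID, clientID, and cOpts are assumed to be built the same way as in the Startup class above:
private static IStanConnection ConnectWithRetry(string clusterID, string clientID, StanOptions cOpts, int maxAttempts = 10)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            // attempt to (re)create the streaming connection
            return new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
        }
        catch (Exception ex) when (attempt < maxAttempts)
        {
            Console.WriteLine("STAN connect attempt {0} failed: {1}; retrying", attempt, ex.Message);
            System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2)); // simple fixed backoff
        }
    }
}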
Thank you for your help.
Thanks Ram
Colin, good evening, I am enclosing the stan log files from the three pods. The client connection is not stable and it disconnects after processing around 30K messages; here are the log files from the streaming server after enabling debugging.
Can we see the entire log of the streaming server when this happens
Please let me know if you need more information.
Thanks Ram
Colin, good evening, here is the log from stan and my observations.
The following is a publisher:
2020/10/04 03:00:34.108025 [TRC] STREAM: [Client:proxy-96] Received message from publisher subj=trackmessagenet guid=7YMN1B659T8O712ZCXT02H
2020/10/04 03:00:34.111173 [TRC] STREAM: [Client:proxy-96] Acking Publisher subj=trackmessagenet guid=7YMN1B659T8O712ZCXT02H
This is another publisher:
2020/10/04 03:00:34.422305 [DBG] STREAM: [Client:Orchestrator-52] Connected (Inbox=_INBOX.UL4WMWCPGUBHDCUJ1OGZM9)
2020/10/04 03:00:36.495931 [TRC] STREAM: [Client:Orchestrator-52] Received message from publisher subj=trackmessagenet guid=UL4WMWCPGUBHDCUJ1OGZTD
2020/10/04 03:00:36.502145 [TRC] STREAM: [Client:Orchestrator-52] Acking Publisher subj=trackmessagenet guid=UL4WMWCPGUBHDCUJ1OGZTD
This is a durable subscriber:
2020/10/04 03:01:18.157004 [ERR] STREAM: [Client:trkingservicenet-5] Timed out on heartbeats
2020/10/04 03:01:18.160362 [DBG] STREAM: [Client:trkingservicenet-5] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.A56VW3ELPEPRJE1BWZNTFE, queue=tracking-service-durable:eligtrackinggroup, subid=26
2020/10/04 03:01:18.160384 [DBG] STREAM: [Client:trkingservicenet-5] Closed (Inbox=_INBOX.A56VW3ELPEPRJE1BWZNT2K)
2020/10/04 03:01:18.588976 [ERR] STREAM: [Client:trkingservicenet-77] Timed out on heartbeats
2020/10/04 03:01:18.591760 [DBG] STREAM: [Client:trkingservicenet-77] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.IZRB3ZHNV0KY8HCVZCPB32, queue=tracking-service-durable:eligtrackinggroup, subid=27
2020/10/04 03:01:18.591776 [DBG] STREAM: [Client:trkingservicenet-77] Closed (Inbox=_INBOX.IZRB3ZHNV0KY8HCVZCPAZ6)
2020/10/04 03:01:18.670491 [ERR] STREAM: [Client:trkingservicenet-64] Timed out on heartbeats
2020/10/04 03:01:18.673567 [DBG] STREAM: [Client:trkingservicenet-64] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.HKUK8I2VM4TV3O8C65CELZ, queue=tracking-service-durable:eligtrackinggroup, subid=28
2020/10/04 03:01:18.673603 [DBG] STREAM: [Client:trkingservicenet-64] Closed (Inbox=_INBOX.HKUK8I2VM4TV3O8C65CEBZ)
2020/10/04 03:01:19.681294 [ERR] STREAM: [Client:trkingservicenet-27] Timed out on heartbeats
2020/10/04 03:01:19.683879 [DBG] STREAM: [Client:trkingservicenet-27] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.ZWS6WG8PFKYXUVMZAMSJIA, queue=tracking-service-durable:eligtrackinggroup, subid=30
2020/10/04 03:01:19.683901 [DBG] STREAM: [Client:trkingservicenet-27] Closed (Inbox=_INBOX.ZWS6WG8PFKYXUVMZAMSJ9G)
2020/10/04 03:01:20.378300 [ERR] STREAM: [Client:trkingservicenet-82] Timed out on heartbeats
2020/10/04 03:01:20.381044 [DBG] STREAM: [Client:trkingservicenet-82] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.HLKMZBB65TK2BVJGH6RADM, queue=tracking-service-durable:eligtrackinggroup, subid=31
2020/10/04 03:01:20.381078 [DBG] STREAM: [Client:trkingservicenet-82] Closed (Inbox=_INBOX.HLKMZBB65TK2BVJGH6RAB2)
2020/10/04 03:01:25.107908 [ERR] STREAM: [Client:trkingservicenet-6] Timed out on heartbeats
2020/10/04 03:01:25.110640 [DBG] STREAM: [Client:trkingservicenet-6] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.963XZ8N2XNVCOVI1Q5EMMZ, queue=tracking-service-durable:eligtrackinggroup, subid=33
2020/10/04 03:01:25.110667 [DBG] STREAM: [Client:trkingservicenet-6] Closed (Inbox=_INBOX.963XZ8N2XNVCOVI1Q5EMIV)
Once that trkingservicenet client is closed, the following is seen on the subscriber side; I am seeing this exception:
at System.IO.BufferedStream.EnsureCanSeek()
at System.IO.BufferedStream.get_Position()
at NATS.Client.Connection.publish(String subject, String reply, Byte[] data, Int32 offset, Int32 count, Boolean flushBuffer)
at NATS.Client.Connection.Publish(String subject, Byte[] data)
at STAN.Client.AsyncSubscription.manualAck(StanMsg m)
at TrackerService.TrackingWorker.ProcessMessage(Object obj, StanMsgHandlerArgs args) in /src/Services/Mdrx.Hub.Eligibility.TrackerService/TrackingWorker.cs:line 103
Also, I am noticing a lot of redelivery even though I issue args.Message.Ack() as soon as I receive the message. Properties set:
opts.ManualAcks = true;
opts.AckWait = 60000;
[1] 2020/10/04 03:19:31.596582 [TRC] STREAM: [Client:trkingservicenet-96] Processing ack for subid=54, subject=trackmessagenet, seq=98444
[1] 2020/10/04 03:19:31.596674 [TRC] STREAM: [Client:trkingservicenet-96] Delivering msg to subid=54, subject=trackmessagenet, seq=104644
[1] 2020/10/04 03:19:31.613452 [TRC] STREAM: [Client:trkingservicenet-96] Redelivering msg to subid=54, subject=trackmessagenet, seq=99133
[1] 2020/10/04 03:19:31.613471 [TRC] STREAM: [Client:trkingservicenet-96] Redelivery for subid=54, skipping seq=103064
[1] 2020/10/04 03:19:31.620471 [TRC] STREAM: [Client:trkingservicenet-80] Redelivering msg to subid=56, subject=trackmessagenet, seq=99134
[1] 2020/10/04 03:19:31.620496 [TRC] STREAM: [Client:trkingservicenet-80] Redelivery for subid=56, skipping seq=103065
[1] 2020/10/04 03:19:31.624664 [TRC] STREAM: [Client:trkingservicenet-53] Processing ack for subid=58, subject=trackmessagenet, seq=98338
[1] 2020/10/04 03:19:31.624863 [TRC] STREAM: [Client:trkingservicenet-53] Delivering msg to subid=58, subject=trackmessagenet, seq=104645
[1] 2020/10/04 03:19:31.636387 [TRC] STREAM: [Client:trkingservicenet-96] Redelivering msg to subid=54, subject=trackmessagenet, seq=103064
[1] 2020/10/04 03:19:31.636823 [TRC] STREAM: [Client:trkingservicenet-96] Redelivery for subid=54, skipping seq=99142
[1] 2020/10/04 03:19:31.644505 [TRC] STREAM: [Client:trkingservicenet-92] Redelivering msg to subid=59, subject=trackmessagenet, seq=99136
[1] 2020/10/04 03:19:31.644728 [TRC] STREAM: [Client:trkingservicenet-92] Redelivery for subid=59, skipping seq=99143
[1] 2020/10/04 03:19:31.648904 [TRC] STREAM: [Client:trkingservicenet-35] Redelivering msg to subid=55, subject=trackmessagenet, seq=101685
[1] 2020/10/04 03:19:31.649121 [TRC] STREAM: [Client:trkingservicenet-35] Redelivery for subid=55, skipping seq=103066
[1] 2020/10/04 03:19:31.667335 [TRC] STREAM: [Client:trkingservicenet-80] Redelivering msg to subid=56, subject=trackmessagenet, seq=103065
[1] 2020/10/04 03:19:31.667566 [TRC] STREAM: [Client:trkingservicenet-80] Redelivery for subid=56, skipping seq=99139
[1] 2020/10/04 03:19:31.677009 [TRC] STREAM: [Client:trkingservicenet-57] Redelivering msg to subid=57, subject=trackmessagenet, seq=99137
[1] 2020/10/04 03:19:31.677237 [TRC] STREAM: [Client:trkingservicenet-57] Redelivery for subid=57, skipping seq=99140
[1] 2020/10/04 03:19:31.683092 [TRC] STREAM: [Client:trkingservicenet-80] Redelivering msg to subid=56, subject=trackmessagenet, seq=99139
[1] 2020/10/04 03:19:31.683295 [TRC] STREAM: [Client:trkingservicenet-80] Redelivery for subid=56, skipping seq=99144
After this point, even if I redeploy the pods it does not help; I have to remove the NATS Streaming server and re-deploy it.
I will try to set up FT mode and see if that works with the .NET client. I did not see this problem with the Go client (as part of my investigation I wrote the same functionality in Go) on the current setup and hardware, but Go may not work for our use case as we need to write the code in C#.
Thanks Ram
Ram, thank you for the information. I'm concerned about:
at System.IO.BufferedStream.EnsureCanSeek()
at System.IO.BufferedStream.get_Position()
at NATS.Client.Connection.publish(String subject, String reply, Byte[] data, Int32 offset, Int32 count, Boolean flushBuffer)
at NATS.Client.Connection.Publish(String subject, Byte[] data)
at STAN.Client.AsyncSubscription.manualAck(StanMsg m)
at TrackerService.TrackingWorker.ProcessMessage(Object obj, StanMsgHandlerArgs args) in /src/Services/Mdrx.Hub.Eligibility.TrackerService/TrackingWorker.cs:line 103
That stems from a bug (https://github.com/nats-io/nats.net/pull/349) that we fixed in the NATS.Client version v0.10.1. This would cause resends if you aren't re-acknowledging the message after encountering this exception. Can you check the client versions of both STAN.Client and NATS.Client assemblies?
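As a stopgap before upgrading, one hedged workaround is to retry the manual ack when it throws; the helper below is only a sketch (AckWithRetry is hypothetical, not part of the STAN.Client API):
// Hypothetical helper: retry the manual ack a few times if the underlying
// ack publish throws (the transient failure fixed in NATS.Client 0.10.1).
static void AckWithRetry(StanMsg msg, int maxAttempts = 3)
{
    for (int attempt = 1; attempt <= maxAttempts; attempt++)
    {
        try
        {
            msg.Ack();
            return; // acked successfully
        }
        catch (Exception ex) when (attempt < maxAttempts)
        {
            Console.WriteLine("Ack attempt {0} failed: {1}; retrying", attempt, ex.Message);
            System.Threading.Thread.Sleep(100);
        }
    }
}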
Also, looking through the logs, all of your clients time out on heartbeats after a new leader has been elected and the existing client replaced. There is definitely a lot going on here. I'll look further, and coordinate with the rest of the team tomorrow.
Also, I'm following https://github.com/nats-io/nats-streaming-server/issues/1092. We definitely suggest FT mode over HA (clustering) when possible.
Best regards, Colin
Colin, good evening, thank you for the quick response. I am using STAN.Client 0.2.1, which references NATS.Client 0.10.0.
I will spend some more time tomorrow to correlate the events and let you know what I find.
I need to find good steps to set up NATS/STAN on AKS using either Azure blob storage or disk files, as I want to keep it simple to maintain instead of using SQL Server.
Thanks ram
@rampratapa I was asked to give some feedback.
I would personally have started with a single NATS Streaming server instance and ensured that your app (pub and sub) is stable (no timeouts, able to handle the traffic that you want, etc.). Only then move to a cluster setup. There are too many things going on right now.
Finally, you may be aware that JetStream is our new streaming product, and if you are starting a new project, you should have a look at JetStream instead (although not officially released yet, it is available from the NATS Server main branch as a tech preview). Something I feel is important to always point out to users who are new to NATS Streaming is that NATS Streaming is not a typical message queue system. That is, messages are not removed from the channel when they have been acknowledged by applications; instead they are removed due to channel limits (https://docs.nats.io/nats-streaming-concepts/channels/message-log). JetStream will have other modes, such as interest-based and work-queue, that cause messages to be removed from the stream once acknowledged.
@kozlovic, good morning, yesterday I spent my time researching a gRPC-related issue and could not get time to look into NATS; I will be working on this today. Our requirement is to keep messages in the queue for about 24 hours and purge them after that period. In case we need to reprocess a message, we should be able to get to it if we store the sequence number, or if we need to investigate something. I read that the data is stored until the message limit is reached or it expires (if we set the TTL). I will update you tomorrow with my findings after fine-tuning the properties. The following are the properties I currently set.
Stan options
cOpts.PingInterval = 10000;
cOpts.PingMaxOutstanding = 5;
NATs options
opts.AllowReconnect = true;
opts.PingInterval = 10000;
opts.MaxPingsOut = 5;
opts.MaxReconnect = Options.ReconnectForever;
I have upgraded STAN to the 0.18 alpine image. I think NATS is 2.1.8.
Thank you for your help.
Thanks Ram
I read that the data is stored until the message limit is reached or it expires (if we set the TTL)
Correct. I just wanted to make sure that you understood how messages were removed in NATS Streaming.
@ColinSullivan1, good evening, I am using the following NuGet package for STAN; this pulls in NATS.Client 0.10.0. Where can I find the latest STAN client that references NATS.Client 0.10.1, so that I can use it for my testing?
Thanks ram
I'll be creating a new release this week, but in the meantime you should be able to update NATS.Client to 0.10.1 in your project. Using Visual Studio, I added NATS.Client 0.10.1 as a dependency and STAN.Client then uses that version via binding redirects.
e.g.
On another note, per our conversation yesterday, you might get better performance by reducing the number of tasks allocated to subscribers. Set NATS.Client.Options.SubscriberDeliveryTaskCount to a reasonable number, like 3. Otherwise you'll be creating a long-running task for every subscriber, which performs well with a few subscribers but puts undue stress on the system as you ramp up. See the sketch below.
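For example, a minimal sketch of setting it, assuming the natUrl value used in the snippets above (the variable names here are illustrative):
var natsOpts = ConnectionFactory.GetDefaultOptions();
natsOpts.Url = natUrl;
natsOpts.SubscriberDeliveryTaskCount = 3; // share a small pool of delivery tasks across subscribers
var nc = new ConnectionFactory().CreateConnection(natsOpts);

StanOptions stanOpts = StanOptions.GetDefaultOptions();
stanOpts.NatsConn = nc; // the STAN connection then reuses this core NATS connection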
Hi All, good evening, I am trying to test the STAN client in .NET Core deployed into Kubernetes as a pod. The following are the STAN options set.
Injected the STAN connection using DI.
I am seeing an error during a load test.
How do we handle this case gracefully, or do we have an issue here? Do we need to handle this in ConnectionLostEventHandler? I am not seeing this error there even though I put in a debug statement. Also, is there any way we can enable debug logging to understand the connection issue?
Is this issue related to https://github.com/nats-io/stan.net/issues/142?
I made a change to the following line, and now I am seeing the following error intermittently, and am also seeing:
stan: clientID already registered
Please let me know if you need more information.
thank you