80dB / AsyncNats

An async Nats.io client written using C# 8.0 language options
MIT License
6 stars 2 forks source link

Next work items #8

Open israellot opened 2 years ago

israellot commented 2 years ago

I would like to discuss the next up for grab items. What I currently have in mind :

80dB commented 2 years ago

Reconnect buffer: There's two places where loss occurs:

The buffer from WriteSocketAsync could be stored somewhere and send after "Connect" (before starting the WriteSocketAsync task). But the pipeline could get messy. It could be incomplete and never completely deserialize. A possible solution is to handle as much of it as possible before throwing it away [e.g. reading it all before disposing the pipe]).

The queue (_senderChannel) and the subscription queues (inside ProcessMessagesAsync) are not reset during a reconnect.

Disconnected client support: Doesn't it do this already? I tested this by publishing on a closed connection (and listening on an open connection), the message published was received after Connect was called on the publisher. Even calling Subscribe before connecting seems to works (moved reader/writer task before connect in the SimpleAsyncNatsSample, worked for me).

Connect to DNS hostname: In theory an easy fix. Just resolve the hostname before connecting should be enough.

Discover other NATS servers in cluster: I never looked into this. Does the official client handle this?

Connection multiplexer: What part are you trying to speed up? Contention on _senderChannel?

Grouped subscriptions: I've thought about this, there's a possible benefit to this with a possible performance downside. Upside: Bla.> Bla.Bla.> Would only be 1 subscription (and as such 1 data flow and 1 deserialize). But you'd have to add the logic to do the subject-matching. Which would possibly happen for all subscriptions. And would be a performance hit. Also additional logic is required when first subscribing to Bla.Bla.> and later to Bla.> .

If you'd only want to match the same subjects (e.g. the same "Bla.>"), the fix is easier with almost no performance hit.

israellot commented 2 years ago

Discover other NATS servers in cluster Yes, official nats client has a server pool filled by parsing info messages received, and an option to ignore or not those discovered servers. https://github.com/nats-io/nats.net/blob/master/src/NATS.Client/ServerPool.cs https://github.com/nats-io/nats.net/blob/e9310451b63b5c6772374b17d1642708a2200d7f/src/NATS.Client/Options.cs#L695

Grouped subscriptions I'm looking for an easy win, just handle exact duplicates as you've said. Things can get quite messy otherwise.

Reconnect buffer I agree. My biggest concern is on the publish side of things. Publish* methods return when data is inserted into the send queue, not when the message has actually been sent through the wire, so users today might be unaware that messages can be dropped even if from their perspective the publish method returned ok.

If you are ok with that, I'll create the issues and post some more PRs.

80dB commented 2 years ago

The grouped subscription easy win I think would require a subject-manager of some sort with a reference counter to the subject and a list of subscriptions that listen to it. I suspect it's not too much work to add this. Seeing as it only affects one function (InternalSubscribe).

80dB commented 2 years ago

For the reconnect buffer (or the actual 8Kb send buffer inside WriteSocketAsync), it's a bit tricky on how to solve this without having a performance hit. I would think something like:


await SendReconnect(...);
await Resubscribe(...);
await SendLastBuffer(...); // This would send _reconnectSendBuffer 

Memory<byte>? lastBuffer = null;
try
{
// Before send fill "lastBuffer" with a Memory ,  either "lastMemory = buffer.AsMemory(0,position); 
// or lastMemory = result.Memory
// After sending make sure to null lastBuffer
}
finally
{
  if (lastBuffer == null || lastBuffer.Length == 0)
  {
    _reconnectSendBuffer = Array.Empty<byte>();
  }
  else
  {
    // Copy "lastBuffer" to a class field [need to actually copy as a result from reader.ReadAsync would be lost once the Pipe is destroyed]
    _reconnectSendBuffer = new byte[lastBuffer.Length];
    lastBuffer.CopyTo(_reconnectSendBuffer);
  }
}

[the above code is untested and typed straight in here]

israellot commented 2 years ago

Agreed. NATS has at-most-once semantic, so we should also adhere to that and make a best-effort to retry, but not risk sending data twice.