nats-io / nats.net

Full Async C# / .NET client for NATS
https://nats-io.github.io/nats.net
Apache License 2.0

Expose a public API for getting NatsHeaders in the byte array (serialization) #638

Open Farrukhjon opened 1 month ago

Farrukhjon commented 1 month ago

Proposed change

Such kind of methods :

Use case

I need to calculate the NatsHeaders size (length in bytes) up front, before calling Publish, so that a large NATS message can be checked against the server's maximum payload size.

Contribution

No response

mtmk commented 1 month ago

We can open up HeaderWriter; that's what we're using internally: https://github.com/nats-io/nats.net/blob/d7b6baa7600dcaaeaad2a4de247b3740e5d44aef/src/NATS.Client.Core/Internal/HeaderWriter.cs#L8

Example usage: https://github.com/nats-io/nats.net/blob/c84f216316b025e2c8ce3398d9a9b7d3fb60fd16/tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs#L40-L43

Farrukhjon commented 1 month ago

I want to publish a message with data (payload) size close to the ServerInfo.MaxPayload. I thought if I take the message headers size into account for the data size as overhead, then I'll be able to publish the message to NATS Server.

But the publish fails with NATS.Client.Core.NatsException: Payload size 1398091 exceeds server's maximum payload size 1048576 error.

The headers size is calculated with:

https://github.com/nats-io/nats.net/blob/a16344561dde584e2621b6972b7be5b9c4e84a26/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs#L40-L44

I based my assumption on these lines:

https://github.com/nats-io/nats.net/blob/a16344561dde584e2621b6972b7be5b9c4e84a26/src/NATS.Client.Core/Commands/CommandWriter.cs#L288-L292

This is the test:

    [Fact(DisplayName = "Test no NatsException is thrown when headers size is taken into account as payload overhead for publishing message with MaxPayload")]
    public async Task TestNoMaxPayloadExceedsNatsExceptionIsThrown()
    {
        // Given
        var headers = new NatsHeaders() { ["foo"] = new StringValues(["bar1", "bar2", "bar3"]) };
        var natsHeadersUtil = new NatsHeadersUtil(Encoding.UTF8);
        var headersSizeInBytes = (int)natsHeadersUtil.NatsHeadersSizeInBytes(headers);
        var connection = await _natsServerContainer.GetOrCreateNatsConnectionAsync();
        var serverInfo = connection.ServerInfo ?? throw new ArgumentException("ServerInfo is null!");
        var maxPayloadSize = serverInfo.MaxPayload;
        var maxAllowedPayload = this._stringRandomizer.GenerateRandomStringInBytes(maxPayloadSize - headersSizeInBytes);

        Assert.True(maxAllowedPayload.Length < maxPayloadSize);
        Assert.Equal(maxPayloadSize - headersSizeInBytes, maxAllowedPayload.Length);
        Assert.Equal(45, headersSizeInBytes);

        // When a message with the max allowed payload is published.
        var exception = await Record.ExceptionAsync(
            async () => await connection.PublishAsync("test.subject", maxAllowedPayload, headers)
        );
        // Then no exception is thrown.
        Assert.Null(exception);
    }

And this is the implementation of the NatsHeadersSizeInBytes method:

public class NatsHeadersUtil(Encoding encoding)
{
    private readonly Encoding _encoding = encoding;

    public long NatsHeadersSizeInBytes(NatsHeaders headers)
    {
        var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 0));
        var writer = new HeaderWriter(_encoding);
        var headerSizeInBytes = writer.Write(pipe.Writer, headers);
        return headerSizeInBytes;
    }

}
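The 45-byte figure asserted in the test can be cross-checked without HeaderWriter. The sketch below is a hypothetical helper, not part of NATS.Client.Core. It assumes the header block is laid out as a `NATS/1.0` version line, one `key: value` line per value, and a terminating blank line, each ending in CRLF. That assumption is consistent with both header sizes reported in this thread (45 and 40 bytes).

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper (not a NATS.Client.Core API): estimates the encoded size
// of a header block from the assumed wire layout alone.
static int EstimateHeaderSize(IEnumerable<KeyValuePair<string, string[]>> headers, Encoding encoding)
{
    var size = encoding.GetByteCount("NATS/1.0\r\n"); // version line
    foreach (var (key, values) in headers)
    {
        foreach (var value in values)
        {
            // every value gets its own "key: value\r\n" line
            size += encoding.GetByteCount(key) + 2 + encoding.GetByteCount(value) + 2;
        }
    }
    return size + 2; // blank line that terminates the block
}

// Headers from the failing test and from the console example in this thread.
var fromTest = new Dictionary<string, string[]> { ["foo"] = new[] { "bar1", "bar2", "bar3" } };
var fromExample = new Dictionary<string, string[]> { ["key1"] = new[] { "value1" }, ["key2"] = new[] { "value2" } };

Console.WriteLine(EstimateHeaderSize(fromTest, Encoding.ASCII));    // 45
Console.WriteLine(EstimateHeaderSize(fromExample, Encoding.ASCII)); // 40
```

Since the estimate agrees with the measured header size, the header calculation itself is unlikely to be the source of the excess bytes.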

Stack trace of the failure

Assert.Null() Failure: Value is not null
Expected: null
Actual:   NATS.Client.Core.NatsException: Payload size 1398091 exceeds server's maximum payload size 1048576
   at NATS.Client.Core.Commands.CommandWriter.PublishAsync[T](String subject, T value, NatsHeaders headers, String replyTo, INatsSerialize`1 serializer, CancellationToken cancellationToken)
   at NATS.Client.Core.NatsConnection.PublishAsync[T](String subject, T data, NatsHeaders headers, String replyTo, INatsSerialize`1 serializer, NatsPubOpts opts, CancellationToken cancellationToken)
--- End of stack trace from previous location ---

I'm just curious: how is such a large excess (1398091 - 1048576 = 349515 bytes) computed?

Note: I made the HeaderWriter class accessible for this test.

This is my dev environment:

dotnet --info
.NET SDK:
 Version:           8.0.401
 Commit:            811edcc344
 Workload version:  8.0.400-manifests.f51a3a6b
 MSBuild version:   17.11.4+37eb419ad

Runtime Environment:
 OS Name:     Mac OS X
 OS Version:  14.6
 OS Platform: Darwin
 RID:         osx-x64

mtmk commented 1 month ago

Thanks for the code. I wonder whether maxAllowedPayload in your example ends up much larger because of string encoding?

This is what I was able to try; it worked as expected for me:

using System.Text;
using NATS.Client.Core;
using NATS.Client.Core.Internal;

var nc = new NatsConnection();
await nc.ConnectAsync();
var max = nc.ServerInfo!.MaxPayload;

var headers = new NatsHeaders { { "key1", "value1" }, { "key2", "value2" } };
var buffer = new NatsBufferWriter<byte>();
var headersLength = new HeaderWriter(Encoding.ASCII).Write(buffer, headers);
var payloadLength = max - headersLength;

Console.WriteLine($"headers length: {headersLength}");
Console.WriteLine($"payload length: {payloadLength}");
Console.WriteLine($"    server max: {max}");

await nc.PublishAsync("foo", new byte[payloadLength], headers: headers);
Console.WriteLine("OK");

try
{
    await nc.PublishAsync("foo", new byte[payloadLength + 1], headers: headers);
    Console.WriteLine("OK");
}
catch (NatsException ex)
{
    Console.WriteLine(ex.Message);
}

// Output:
// headers length: 40
// payload length: 1048536
//     server max: 1048576
// OK
// Payload size 1048577 exceeds server's maximum payload size 1048576

Running against a server with no config:

$ nats-server

Farrukhjon commented 1 month ago

maxAllowedPayload

The generated maxAllowedPayload is actually smaller than maxPayloadSize by exactly the headersSizeInBytes amount.

This is asserted here:

Assert.True(maxAllowedPayload.Length < maxPayloadSize);
Assert.Equal(maxPayloadSize - headersSizeInBytes, maxAllowedPayload.Length);

Using the same encoding (ASCII) for the payload and the headers resulted in the same error:

var encoding = Encoding.ASCII;
var natsHeadersUtil = new NatsHeadersUtil(encoding);
var headersSizeInBytes = (int)natsHeadersUtil.NatsHeadersSizeInBytes(headers);
var connection = await _natsServerContainer.GetOrCreateNatsConnectionAsync();
var serverInfo = connection.ServerInfo ?? throw new ArgumentException("ServerInfo is null!");
var maxPayloadSize = serverInfo.MaxPayload;
var maxAllowedPayload = this._stringRandomizer.GenerateRandomStringInBytes(maxPayloadSize - headersSizeInBytes, encoding);

mtmk commented 1 month ago

What is the type of maxAllowedPayload? I can't reproduce the issue with the code above without knowing the dependencies. Are you able to create a minimal console app and post Program.cs?

Farrukhjon commented 1 month ago

The large excess in payload bytes turned out to come from the serializer registry set in the connection's NatsOpts:

var natsOpts = NatsOpts.Default with
{
    SerializerRegistry = NatsJsonSerializerRegistry.Default
};
var nc = new NatsConnection(natsOpts);

I was misled by this behavior. My other tests, which mix publishing objects and small texts, were fine. But when I wanted to examine maxPayload minus the headers overhead, I ran into this issue.

So the code below reproduces the issue.

var natsOpts = NatsOpts.Default with
{
    SerializerRegistry = NatsJsonSerializerRegistry.Default
};
var nc = new NatsConnection(natsOpts);

// headers and payloadLength are computed as in the earlier console example
await nc.PublishAsync("foo", Encoding.ASCII.GetBytes("Hello World"), headers: headers);
Console.WriteLine("Small text in ASCII bytes is ok");

try
{
    var maxPayload = new byte[payloadLength];
    Array.Fill(maxPayload, (byte)'A');
    await nc.PublishAsync("foo", maxPayload, headers: headers);
    Console.WriteLine("OK");
}
catch (NatsException ex)
{
    Console.WriteLine("Max size text in ASCII bytes is not ok");
    Console.WriteLine(ex.Message);
}

Output:

Small text in ASCII bytes is ok
Max size text in ASCII bytes is not ok
Payload size 1398090 exceeds server's maximum payload size 1048576
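Both reported sizes (1398091 in the test, 1398090 in the console repro) can be reproduced with simple arithmetic. The sketch below rests on two assumptions: with the JSON registry, a byte[] is written by System.Text.Json as a quoted Base64 string rather than as raw bytes, and the size in the error message includes the header bytes. The second assumption matches the earlier example, where payloadLength + 1 with 40 header bytes was reported as 1048577. JsonEncodedLength is a hypothetical helper name.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: length of a byte[] after JSON serialization,
// i.e. Base64 (4 characters per 3 raw bytes, padded) plus two quote characters.
static int JsonEncodedLength(int rawLength) => 4 * ((rawLength + 2) / 3) + 2;

const int max = 1048576; // server's maximum payload size from the thread

// Check the formula against the real serializer on a small input.
var sample = JsonSerializer.SerializeToUtf8Bytes(new byte[10]);
Console.WriteLine(sample.Length == JsonEncodedLength(10)); // True

// Test case: 45 header bytes, payload of max - 45 raw bytes.
Console.WriteLine(JsonEncodedLength(max - 45) + 45); // 1398091

// Console repro: 40 header bytes, payload of max - 40 raw bytes.
Console.WriteLine(JsonEncodedLength(max - 40) + 40); // 1398090
```

Under these assumptions the roughly one-third growth is Base64 overhead, which would also explain why the earlier example with a default NatsConnection published the same number of raw bytes without error.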

P.S.: we still need HeaderWriter to be publicly available.