nats-io / nats.net

Full Async C# / .NET client for NATS
https://nats-io.github.io/nats.net
Apache License 2.0

Unexpected warning publishing to object store #467

Closed sgentry closed 6 months ago

sgentry commented 7 months ago

Observed behavior

When running a NATS configuration in which a hub connects two remote leaf nodes, we observe a warning when publishing a file to the object store on one leaf node while a consumer is listening on the other leaf.

NATS.Client.Core.Internal.InboxSubBuilder[1004] Unregistered message inbox received for _INBOX.WqCK2S...

Expected behavior

No warning message should be logged.

Server and client version

Server: 2.10.12 .NET Client: 2.2.0

Host environment

Running in Docker container using nats:2.10.12-alpine3.19 image.

Steps to reproduce

To reproduce, I've created a Docker Compose file with two config files that simulate the environment in which we observed this behavior. Once that is running, you should be able to run the code in the Web Api section below. From there, open the /swagger docs and choose a file to upload. This endpoint publishes the file to the object store. Watch the console output for the warning message. Note: The warning is not always displayed on the first attempt. You may have to execute the file upload multiple times.

docker-compose.yml

version: "3.1"
name: "nats-playground"
services:
  hub:
    image: nats:2.10.12-alpine3.19
    container_name: "hub"
    ports:
      - 8222:8222
      - 4222:4222
    command:
      - "--config=/etc/nats/nats.conf"
    volumes:
      - ./hub.conf:/etc/nats/nats.conf
    networks:
      - nats

  domain-1:
    image: nats:2.10.12-alpine3.19
    container_name: "domain-1"
    ports:
      - 8223:8222
      - 4223:4222
    command:
      - "--config=/etc/nats/nats.conf"
    volumes:
      - ./leaf.conf:/etc/nats/nats.conf
    depends_on:
      - hub
    networks:
      - nats

  domain-2:
    image: nats:2.10.12-alpine3.19
    container_name: "domain-2"
    ports:
      - 8224:8222
      - 4224:4222
    command:
      - "--config=/etc/nats/nats.conf"
    volumes:
      - ./leaf.conf:/etc/nats/nats.conf
    depends_on:
      - hub
    networks:
      - nats

networks:
  nats:
    name: nats
    driver: bridge

Config files for Docker container

hub.conf

debug = true
trace = false

leafnodes {
    port: 7422
}

jetstream {
   store_dir=/data
   domain=hub-domain
}

leaf.conf

debug = true
trace = false

leafnodes {
    remotes = [
        {
            urls: ["nats://hub"]
        }
    ]
}

jetstream {
   store_dir=/data
   domain=leaf-domain
}

Web Api

using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.ObjectStore;
using NATS.Extensions.Microsoft.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

var logger = LoggerFactory.Create(config =>
{
    config.AddConsole();
}).CreateLogger("Program");

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

builder.Services.AddHostedService<JsObjectMonitor>();
builder.Services.AddNatsClient(builder =>
{
    builder.ConfigureOptions(optsFactory =>
    {
        optsFactory = optsFactory with
        {
            Url = "nats://localhost:4223" // Domain 1 leaf
        };

        return optsFactory;
    });
});

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.MapPost("/file-upload", async (IFormFile file, NatsConnection natsConnection, CancellationToken ct) =>
{
    try
    {
        var js = new NatsJSContext(natsConnection, new NatsJSOpts(new NatsOpts
        {
        }));

        using (var fileStream = new MemoryStream())
        {
            await file.CopyToAsync(fileStream);
            fileStream.Seek(0, SeekOrigin.Begin);

            var obj = new NatsObjContext(js);
            var fileId = Guid.NewGuid();
            var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("app-payloads")
            {
                MaxBytes = int.MaxValue
            }, ct);

            await store.PutAsync($"{fileId}.txt", fileStream, cancellationToken: ct);
        }
    }
    catch (Exception ex)
    {
        logger.LogError("Error publishing. Reason: {msg}", ex.Message);
    }
})
.WithName("upload-file")
.DisableAntiforgery();

app.Run();

/*
 * Background service subscribing to published objects.
 */
public class JsObjectMonitor(ILogger<JsObjectMonitor> logger, NatsConnection cn) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var opts = new NatsOpts
        {
            Url = "nats://localhost:4224" // Domain 2 leaf
        };
        await using var nats = new NatsConnection(opts);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var js = new NatsJSContext(nats, new NatsJSOpts(new NatsOpts { }));
                var obj = new NatsObjContext(js);

                var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("app-payloads")
                {
                    MaxBytes = int.MaxValue
                }, stoppingToken);

                await foreach (var m in store.WatchAsync(new NatsObjWatchOpts { IncludeHistory = false, IgnoreDeletes = true }, stoppingToken))
                {
                    var ms = new MemoryStream();
                    var f = await store.GetAsync(m.Name, ms, true, cancellationToken: stoppingToken);
                    logger.LogInformation("Received file. Size: {s}", m.Size);
                }
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Error subscribing to object store. Reason: {msg}", ex.Message);
            }
        }
    }
}

mtmk commented 7 months ago

It seems like we are receiving two ACKs for payload publishing, one from each domain, e.g.:

--> PUB $O.app-payloads.C.X0wZEKrtI1yF0MUVaOevDD _INBOX.jNL85FQFkvrEfdz3DzIwyG.X0wZEKrtI1yFINUVaOevDD 9

<-- MSG _INBOX.jNL85FQFkvrEfdz3DzIwyG.X0wZEKrtI1yFINUVaOevDD 1 63
    {"stream":"OBJ_app-payloads", "domain":"leaf-domain", "seq":19}

<-- MSG _INBOX.jNL85FQFkvrEfdz3DzIwyG.X0wZEKrtI1yFINUVaOevDD 1 62
    {"stream":"OBJ_app-payloads", "domain":"hub-domain", "seq":11}
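
One plausible reading of the trace above is that the first ACK completes the request and its reply inbox is released, so the second ACK arrives on an inbox that nobody is waiting on any more. The sketch below is a toy model of that idea only. `ToyInboxRouter`, its members, and the inbox name are hypothetical and are not part of NATS.Net; the real client's internals may differ.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: a hypothetical one-shot inbox router, NOT the NATS.Net implementation.
public sealed class ToyInboxRouter
{
    private readonly Dictionary<string, Action<string>> _handlers = new();

    // Warnings recorded for replies that arrive on an inbox with no handler.
    public List<string> Warnings { get; } = new();

    // Register a one-shot handler for a reply subject, as a request/reply call might.
    public void Register(string inbox, Action<string> onReply) => _handlers[inbox] = onReply;

    // Deliver a reply: the first one consumes the handler, later ones find none.
    public void Deliver(string inbox, string payload)
    {
        if (_handlers.Remove(inbox, out var handler))
        {
            handler(payload);
        }
        else
        {
            Warnings.Add($"Unregistered message inbox received for {inbox}");
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var router = new ToyInboxRouter();
        var replies = new List<string>();
        const string inbox = "_INBOX.example.1"; // hypothetical inbox name

        router.Register(inbox, replies.Add);

        // Two ACKs for a single publish, one per JetStream domain.
        router.Deliver(inbox, "{\"stream\":\"OBJ_app-payloads\", \"domain\":\"leaf-domain\", \"seq\":19}");
        router.Deliver(inbox, "{\"stream\":\"OBJ_app-payloads\", \"domain\":\"hub-domain\", \"seq\":11}");

        Console.WriteLine(replies.Count);         // prints 1
        Console.WriteLine(router.Warnings.Count); // prints 1
    }
}
```

In this model the publish itself succeeds, which would be consistent with the upload working while the warning is logged.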

mtmk commented 7 months ago

I made some progress on this one. You're receiving two responses because the object store is created twice, as two different stores. When you publish anything to the streams backing the store (e.g. Put), both streams respond separately.

If you specify a domain for your JetStream context for both connections, you shouldn't see the issue:

var js = new NatsJSContext(nats, new NatsJSOpts(nats.Opts, domain: "hub-domain"));
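
Applied to the repro above, that could look roughly like the following sketch, where both the publishing and the watching connection pin their JetStream context to the same domain. It assumes the hub's domain ("hub-domain" from hub.conf) is the one you want to hold the store, reuses the ports from the compose file, and needs the containers running; the object name and payload are placeholders.

```csharp
using System.IO;
using System.Text;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.ObjectStore;

// Assumption: the store should live in the hub's JetStream domain (see hub.conf).
const string Domain = "hub-domain";

// One connection per leaf node, as in the repro.
await using var publisherConn = new NatsConnection(new NatsOpts { Url = "nats://localhost:4223" });
await using var watcherConn = new NatsConnection(new NatsOpts { Url = "nats://localhost:4224" });

// Both contexts name the same domain, so both address the same backing stream.
var publisherJs = new NatsJSContext(publisherConn, new NatsJSOpts(publisherConn.Opts, domain: Domain));
var watcherJs = new NatsJSContext(watcherConn, new NatsJSOpts(watcherConn.Opts, domain: Domain));

var config = new NatsObjConfig("app-payloads") { MaxBytes = int.MaxValue };
var publisherStore = await new NatsObjContext(publisherJs).CreateObjectStoreAsync(config);
var watcherStore = await new NatsObjContext(watcherJs).CreateObjectStoreAsync(config);

// Placeholder payload; with a single store, a single ACK is expected for each publish.
using var payload = new MemoryStream(Encoding.UTF8.GetBytes("hello"));
await publisherStore.PutAsync("example.txt", payload);

// watcherStore can then be used with WatchAsync exactly as in JsObjectMonitor above.
```

The same two `NatsJSContext` lines can replace the context creation in the `/file-upload` endpoint and in `JsObjectMonitor`.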

caleblloyd commented 7 months ago

Are the leaf nodes clustered? If not they need to be on separate JS domains. I think otherwise it's an invalid setup.

mtmk commented 6 months ago

> Are the leaf nodes clustered? If not they need to be on separate JS domains. I think otherwise it's an invalid setup.

Yes, I think so. That would explain why two stores were being created, since behaviour is undefined in an invalid setup.

sgentry commented 6 months ago

Yes, the leaf nodes are clustered and configured with separate JS domains. The warning went away once I specified the domain as mentioned above.

I think we're all good on this issue. Thanks for the assistance. Still learning how to configure clusters with JS...