moleculerjs / moleculer-channels

Reliable messages for Moleculer services via external queue/channel/topic.
MIT License

The NATS Channel adapter breaks when using wildcard topics #56

Closed dremekie closed 1 year ago

dremekie commented 1 year ago

Current Behavior

The adapter currently ignores any specified subjects in options.nats.streamConfig.subjects. Instead, it will always use [nameOfStream] as the subject. This occurs here: https://github.com/moleculerjs/moleculer-channels/blob/master/src/adapters/nats.js#L182 . The adapter is trying to create a stream for each subscription it finds.

{
  name: 'sub',
  'streamOne': {
    'streamOneTopic.abc': {
      group: 'other',
      async handler (payload) {
        console.log(`Processing streamOneTopic: ${JSON.stringify(payload)}`);
      }
    },
    'streamOneTopic.xyz': {
      group: 'other',
      async handler (payload) {
        console.log(`Processing streamOneTopic: ${JSON.stringify(payload)}`);
      }
    }
  },
}

The above will work. We end up with two streams: streamOneTopic_abc (using subject=streamOneTopic.abc) and streamOneTopic_xyz (using subject=streamOneTopic.xyz).

But the code snippet below, which listens for streamOneTopic.*, fails with Error: no stream matches subject, even though I specified options.nats.streamConfig.subjects = ["streamOneTopic.*"].

At a high level, the following is how the NATS adapter should operate:

Read options.nats.streamConfig and create one stream using the options specified there (e.g. name="streamOne" and subjects=["streamOneTopic.*"]).

For each subscription, create one consumer against the stream that is filtering by the specified topic (e.g. streamOneTopic.* should create a consumer where filter_subject="streamOneTopic.*").

The current logic is "almost" there. My team uses NATS (and we sponsor Moleculer) and we're happy to battle test this adapter further.

Steps to Reproduce

Reproduce code snippet

const { ServiceBroker } = require('moleculer');
const ChannelsMiddleware = require('@moleculer/channels').Middleware;

(async () => {
  const broker = new ServiceBroker({
    nodeID: 'channelTest',
    logger: false,
    logLevel: 'debug',
    middlewares: [
      ChannelsMiddleware({
        schemaProperty: 'streamOne',
        adapterPropertyName: 'streamOneAdapter',
        sendMethodName: 'sendToStreamOneChannel',
        channelHandlerTrigger: 'emitStreamOneLocalChannelHandler',
        adapter: {
          type: 'NATS',
          options: {
            nats: {
              url: process.env.NATS_SERVER,
              connectionOptions: {
                debug: true,
                user: process.env.NATS_USER,
                pass: process.env.NATS_PASSWORD,
              },
              streamConfig: {
                name: 'streamOne',
                subjects: ['streamOneTopic.*'],
              },
              consumerOptions: {
                config: {
                  deliver_policy: 'new',
                  ack_policy: 'explicit',
                  max_ack_pending: 1,
                }
              }
            },
            maxInFlight: 10,
            maxRetries: 3,
            deadLettering: {
              enabled: false,
              queueName: 'DEAD_LETTER_REG',
            }
          }
        }
      })
    ]
  });

  await broker.createService({
    name: 'sub',
    'streamOne': {
      'streamOneTopic.*': {
        group: 'other',
        async handler (payload) {
          console.log(`Processing streamOneTopic: ${JSON.stringify(payload)}`);
        }
      }
    },
  });

  await broker.start().delay(2000);

  const msg = {
    id: 1,
    name: 'John',
    age: 25
  };

  await broker.sendToStreamOneChannel('streamOneTopic.abc', msg);
  await broker.Promise.delay(200);
})();

Failure Logs

> INFO {"server_id":"NDPRE7K73FVFP6EQFU5YBDZCPRRB3ZREJKQJ734ARDWUJ5PUTIZUIYQP","server_name":"hub-server","version":"2.9.10","proto":1,"git_commit":"4caf6aa","go":"go1.19.4","host":"0.0.0.0","port":4411,"headers":true,"auth_required":true,"max_payload":1048576,"jetstream":true,"client_id":4709,"client_ip":"172.19.0.1","domain":"hub"} ␍␊
< CONNECT {"protocol":1,"version":"2.9.0","lang":"nats.js","verbose":false,"pedantic":false,"user":"acc","pass":"acc","headers":true,"no_responders":true}␍␊
< PING␍␊
> PONG␍␊
< SUB _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.* 1␍␊PUB $JS.API.INFO _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0RXR 0␍␊␍␊
> MSG _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0RXR 1 349␍␊{"type":"io.nats.jetstream.api.v1.account_info_response","memory":0,"storage":371561,"streams":33,"consumers":10,"limits":{"max_memory":-1,"max_storage":-1,"max_streams":-1,"max_consumers":-1,"max_ack_pending":-1,"memory_max_stream_bytes":-1,"storage_max_stream_bytes":-1,"max_bytes_required":false},"domain":"hub","api":{"total":8982,"errors":156}}␍␊
< PUB $JS.API.STREAM.NAMES _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0S0X 30␍␊{"subject":"streamOneTopic.*"}␍␊
> MSG _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0S0X 1 145␍␊{"type":"io.nats.jetstream.api.v1.stream_names_response","total":2,"offset":0,"limit":1024,"streams":["streamOneTopic_abc","streamOneTopic_xyz"]}␍␊
/Users/dwayne/Documents/conscia-dev/mesh-server/node_modules/nats/lib/nats-base-client/jsbaseclient_api.js:80
                throw new Error("no stream matches subject");
                      ^

Error: no stream matches subject
    at JetStreamClientImpl.<anonymous> (/Users/dwayne/Documents/conscia-dev/mesh-server/node_modules/nats/lib/nats-base-client/jsbaseclient_api.js:80:23)
    at Generator.next (<anonymous>)
    at fulfilled (/Users/dwayne/Documents/conscia-dev/mesh-server/node_modules/nats/lib/nats-base-client/jsbaseclient_api.js:19:58)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)
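
The log above shows the STREAM.NAMES lookup for streamOneTopic.* returning two streams (streamOneTopic_abc and streamOneTopic_xyz) immediately before the error is thrown. One plausible reading, offered as an assumption rather than a confirmed account of the client internals, is that the lookup expects exactly one matching stream. The helper pickSingleStream below is hypothetical and only illustrates that reading:

```javascript
// Hypothetical stand-in for the client-side lookup; not copied from nats.js.
function pickSingleStream(namesResponse) {
  const streams = namesResponse.streams || [];
  // Assumption: anything other than exactly one match is treated as a failure.
  if (streams.length !== 1) {
    throw new Error("no stream matches subject");
  }
  return streams[0];
}

// Response shape taken from the STREAM.NAMES reply in the log above.
const reply = {
  total: 2,
  streams: ["streamOneTopic_abc", "streamOneTopic_xyz"],
};

try {
  pickSingleStream(reply);
} catch (err) {
  console.log(err.message);
}
```

Under this reading, a single stream whose subjects include streamOneTopic.* would give the lookup one unambiguous match.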

AndreMaz commented 1 year ago

Hi @dremekie thank you for reporting the issue and for supporting the project :smile:

Read options.nats.streamConfig and create one stream using the options specified there (e.g. name="streamOne" and subjects=["streamOneTopic.*"]).

I checked the code and you're right. At the moment it's not possible to override the stream name and subjects, so the adapter creates one stream per handler. To be precise, it creates a stream whose name is the normalized handler name and whose subjects array contains only the handler name. So in your example, name=streamOneTopic_abc and subjects=['streamOneTopic.abc'].

https://github.com/moleculerjs/moleculer-channels/blob/2ddbb9d3ce69e769c0f81d96a2b60193215f0397/src/adapters/nats.js#L322-L330

We can fix this by overriding the handler's default name and subjects with the ones defined in options.nats.streamConfig.
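
A minimal sketch of that override, not the adapter's actual code: resolveStreamConfig is a hypothetical helper, and the default name rule (replacing "." with "_") is an assumption inferred from streamOneTopic.abc becoming streamOneTopic_abc in this thread.

```javascript
// Hypothetical helper, not part of moleculer-channels.
// Assumption: the default stream name is the handler name with "." replaced by "_".
function resolveStreamConfig(handlerName, userStreamConfig = {}) {
  const defaults = {
    name: handlerName.split(".").join("_"),
    subjects: [handlerName],
  };
  // Values supplied by the user win over the per-handler defaults.
  return { ...defaults, ...userStreamConfig };
}

// Behaviour described in this issue: one stream per handler.
const perHandler = resolveStreamConfig("streamOneTopic.abc");

// Requested behaviour: a single stream covering the wildcard subject.
const shared = resolveStreamConfig("streamOneTopic.abc", {
  name: "streamOne",
  subjects: ["streamOneTopic.*"],
});

console.log(perHandler);
console.log(shared);
```

With no user config the helper reproduces the per-handler stream. With streamConfig supplied, every handler resolves to the same streamOne stream.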

For each subscription, create one consumer against the stream that is filtering by the specified topic (e.g. streamOneTopic.* should create a consumer where filter_subject="streamOneTopic.*").

The filter_subject can be set in handler configs:

broker.createService({
    name: "sub",
    streamOne: {
        "streamOneTopic.abc": {
            group: "other",
            nats: {
                consumerOptions: {
                    filter_subject: "streamOneTopic.*"
                },
                streamConfig: {
                    name: "streamOne", // Has NO effect at the moment
                    subjects: ["streamOneTopic.*"] // Has NO effect at the moment
                }
            },
            async handler(payload) {
                broker.logger.info(
                    `[---] Processing streamOneTopic: ${JSON.stringify(payload)}`
                );
            }
        }
    }
});

However, a handler name containing * is prohibited (streamOneTopic.* in your example). Here's the snippet where the NATS JetStream client creates the consumer:

https://github.com/moleculerjs/moleculer-channels/blob/2ddbb9d3ce69e769c0f81d96a2b60193215f0397/src/adapters/nats.js#L212-L215

Your example produces the following error:

[2023-01-02T14:29:11.055Z] ERROR channelTest/CHANNELS: An error ocurred while create NATS Stream Error: invalid stream name - stream name cannot contain '*'

Can you please clarify the desired behavior? You want a single stream (e.g., orders.*) to handle several topics (e.g., orders.created, orders.received, etc.)?

dremekie commented 1 year ago

Hi @AndreMaz

Can you please clarify the desired behavior? You want a single stream (e.g., orders.*) to handle several topics (e.g., orders.created, orders.received, etc.)?

Yes, correct. I'd like a single stream streamOne to handle multiple topics (streamOneTopic.abc and streamOneTopic.xyz). I need one handler to handle any message in streamOne that matches the topic streamOneTopic.*. This would require that we are able to set the subject(s) of a stream (like in your example above):

                streamConfig: {
                    name: "streamOne", // Has NO effect at the moment
                    subjects: ["streamOneTopic.*"] // Has NO effect at the moment
                }
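
For reference, a small sketch of how NATS subject wildcards decide which messages such a handler would receive: "*" matches exactly one token and ">" matches one or more trailing tokens. subjectMatches is a hypothetical illustration, not a function from nats.js or moleculer-channels, and it does not validate malformed subjects.

```javascript
// Returns true when a NATS subject pattern matches a concrete subject.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
function subjectMatches(pattern, subject) {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // ">" must be the last token and needs at least one token left to match.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false;
    if (p[i] !== "*" && p[i] !== s[i]) return false;
  }
  // Without ">", both sides must have the same number of tokens.
  return p.length === s.length;
}

console.log(subjectMatches("streamOneTopic.*", "streamOneTopic.abc"));
console.log(subjectMatches("streamOneTopic.*", "streamOneTopic.abc.def"));
```

So a consumer filtered on streamOneTopic.* would see both streamOneTopic.abc and streamOneTopic.xyz, but not deeper subjects such as streamOneTopic.abc.def.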

AndreMaz commented 1 year ago

@dremekie this https://github.com/moleculerjs/moleculer-channels/pull/57 should solve the issue

dremekie commented 1 year ago

@icebob @AndreMaz Thanks for this fix. I tested this and it works. Would you be able to push this to NPM?

icebob commented 1 year ago

I will do it at the weekend.