nats-io / nats.node

Node.js client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
1.54k stars, 162 forks

Missing created consumers and created subscriptions (New and Improved?) #559

Closed Amansg4 closed 1 year ago

Amansg4 commented 1 year ago

Preface:

I'm currently working through Stephen Grider's microservices Udemy course (which I am sure you guys get harassed about all the time), but I'm trying to use the new NATS JetStream (which I really like and am excited to use with the Object Store that was added to nats.js) instead of the soon-to-be-deprecated NATS Streaming, and I have come across a (personal) problem.

This specific infrastructure revolves around the use of k8s and NATS.js.

Infra

Contains environment variables such as the server URL (nats://127.0.0.1:4222) and the "name" of the server, which is set to "tickets".

Services

Each service starts by creating a Nats Connection.

continuing...

Each service has unique specific durable push consumers & corresponding binding subscriptions created after establishing a NATS connection.

Each subscription has, what I hope acts as, a "fail-safe" deliver-subject so that it can create an ephemeral consumer that can be created for the services' needs, just in case a durable consumer fails to be formed.

This was mainly done because I ran into MAIN PROBLEM 1 below. Not sure if it is good practice to have an intended bind subscription also have a delivery-subject.

There are also publishers that send relevant messages/data to the shared stream "TICKETS" for other services' subscribers to consume.

Problem 1

Very frequently, after running the application and everything builds, when I use the NATS CLI to see what has been created, consumers that should have been created (and that are EXPLICITLY reported as created via extremely verbose console log checks) are missing.

Problem 2

When a service must restart (crash due to some mongo error or like Problem 4), any unprocessed messages within consumers tend to remain unprocessed despite what should be a new subscription being created (since the service deployment is restarted). Messages sent are received by consumers if they happen to be present, but are NOT processed by any subscriptions. Unfortunately I can't check to see what specific subscriptions are/are not available. Furthermore, it will sometimes also report that the consumer has switched to being NOT ACTIVE instead of being within its designated active queue-group.

Problem 3

According to some NATS stats (checking 8222 monitor), I have an absurd amount of subscriptions that are stated despite it being impossible (If I'm not mistaken)! I'm only supposed to have maybe 13 or something other but not 78 or whatevs. Same thing with messages, sometimes I have an overwhelming amount of messages that are listed in the 8222 monitor (does some background information transferred between consumers, stream, subscriptions count? Do pings count?!) I've sent a handful of messages only to be informed that hundreds or thousands have accumulated.

Problem 4

I keep getting a Bad JSON error when attempting to submit data that I believe has already been encoded into a Uint8Array, despite being informed otherwise. I've been scouring online and came across a similar issue where the answer implied that in Node, I might have to wrap the information I wish to encode in a try/catch block to handle the data possibly being null/undefined and causing the service to crash? Not sure if it applies to my case yet.


I have an extreme amount of verbose information: I have gone through and created console logs to isolate every step of the connections formed, so sorry if it is overwhelming. There is an attached markdown that contains this question AND example outputs as well. It might appear better in a different markdown reader. natsIssues.md I can zero in on an aspect if requested and create whatever log of a scenario. There are also TLDRs spread between and around both cases below to give you a quick understanding of what is transpiring. Thanks!


Amansg4 commented 1 year ago

I don't know if this is just happening to me or even why it is, but I apologize if for some reason you have a seizure inducing red pattern of highlights within the code blocks above.

aricart commented 1 year ago

Can you show the code you use to create your consumers?

aricart commented 1 year ago

If the consumers are not durable they will go away if they are not connected.

aricart commented 1 year ago

@ripienaar all the consumers above are created with nats cli, but it seems that there's some discrepancy in the reporting.

aricart commented 1 year ago

@Amansg4 while you can create your consumers and streams with NATS cli, the example would be much simpler to follow and attest, if you built them in code:

https://github.com/nats-io/nats.deno/blob/main/jetstream.md#jetstreammanager

aricart commented 1 year ago

  • Question 1:

    • I am still a little unsure of how naming this server will influence the overall interaction between the services. I assumed it would put all services under the same name but as stated a little further below, it seems each service creates its own "server connection"?

Not sure what you are referring to here - but effectively if you have a nats cluster, it doesn't matter what server your services connect to, they are all wired together.

Typically you will make each service have its own connection.

aricart commented 1 year ago

Each service starts by creating a Nats Connection.

  • Question 2:

    • Instead of all being unified under one "server" it seems each service creates its own "server" despite sharing one stream source. Is that normal? I might be wrong and misunderstanding that the NATS CLI is specifically showing me. Is it:

Not sure what "each service creates its own server" means - services don't create servers - services have connections to servers.

aricart commented 1 year ago

Each subscription has, what I hope acts as, a "fail-safe" deliver-subject so that it can create an ephemeral consumer that can be created for the services' needs, just in case a durable consumer fails to be formed. (This was mainly done because I ran into MAIN PROBLEM #1 below. Not sure if it is good practice to have an intended bind subscription also have a delivery-subject.)

There are also publishers that send relevant messages/data to the shared stream "TICKETS" for other services' subscribers to consume.

You don't need this complication - the question is why are your consumers not showing - you should just create them and your subscriber should be binding to it.

aricart commented 1 year ago

I have no idea of your stream, and whether your consumers are getting clobbered because you are specifying some options on the cli, and something else in the code - hint - you should be using bind so the subscribes fail if the consumer doesn't exist.

aricart commented 1 year ago

Problem 2

When a service must restart (crash due to some mongo error or like Problem 4), any messages within consumers that are unprocessed tend to remain unprocessed despite what should be a new subscription being created (since the service

The job of the consumer is to keep track of messages you have seen. So the question is what is your ack window, and whether you enabled manual acks on the client - if you didn't your messages are possibly getting ack'ed even if you failed to process them. But since no code, difficult to know.

deployment is restarted). Messages sent are received by consumers if they happen to be present, but are NOT processed by any subscriptions. Unfortunately I can't check to see what specific subscriptions are/are not available. Furthermore, it will sometimes also report that the consumer has switched to being NOT ACTIVE instead of being within its designated active queue-group.

If the jetstream subscription is not active, messages queue for that consumer - when it resubscribes, the messages will be presented.

Problem 3

According to some NATS stats (checking 8222 monitor), I have an absurd amount of subscriptions that are stated despite it being impossible (If I'm not mistaken)! I'm only supposed to have maybe 13 or something other but not 78 or whatevs. Same thing with messages, sometimes I have an overwhelming amount of messages that are listed in the 8222 monitor

Seems like you are mixing CLI with code, so I don't know what to expect, if your Kubernetes setup is doing something strange it is possible. Again cannot tell what is jetstream vs. corenats.

(does some background information transferred between consumers, stream, subscriptions count? Do pings count?!) I've sent a handful of messages only to be informed that hundreds or thousands have accumulated.

Problem 4

I keep getting a Bad JSON error when attempting to submit data that I believe has already been encoded into a Uint8Array, despite being informed otherwise. I've been scouring online and came across a similar issue where the answer implied that in Node, I might have to wrap the information I wish to encode in a try/catch block to handle the data possibly being null/undefined and causing the service to crash? Not sure if it applies to my case yet.

If you try to decode a message without a payload, JSON decoding will fail. Validate your input.
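A minimal sketch of that validation, assuming the payload arrives as a `Uint8Array` (as `msg.data` does in nats.js). `safeJsonDecode` and `DecodeResult` are hypothetical names for illustration, not part of the client; the helper reports an empty or malformed payload instead of throwing:

```typescript
// Hypothetical helper: decode a JSON payload without letting bad input crash the handler.
type DecodeResult<T> =
  | { ok: true; value: T }
  | { ok: false; reason: string };

function safeJsonDecode<T>(data: Uint8Array | undefined | null): DecodeResult<T> {
  // A message can carry no payload at all; JSON.parse("") would throw.
  if (!data || data.length === 0) {
    return { ok: false, reason: "empty payload" };
  }
  try {
    const text = new TextDecoder().decode(data);
    return { ok: true, value: JSON.parse(text) as T };
  } catch (err) {
    return { ok: false, reason: `invalid JSON: ${(err as Error).message}` };
  }
}

const good = safeJsonDecode<{ id: string }>(new TextEncoder().encode('{"id":"abc"}'));
const empty = safeJsonDecode(new Uint8Array(0));
const broken = safeJsonDecode(new TextEncoder().encode("{not json"));
console.log(good, empty, broken);
```

In a JetStream callback, a failed decode is probably a reason to call `msg.term()` rather than let the same unparseable message be redelivered, but that choice depends on the application.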

Amansg4 commented 1 year ago

@aricart

Problem 2

When a service must restart (crash due to some mongo error or like Problem 4), any messages within consumers that are unprocessed tend to remain unprocessed despite what should be a new subscription being created (since the service

The job of the consumer is to keep track of messages you have seen. So the question is what is your ack window, and whether you enabled manual acks on the client - if you didn't your messages are possibly getting ack'ed even if you failed to process them. But since no code, difficult to know.

Messages are all manually acked

deployment is restarted). Messages sent are received by consumers if they happen to be present, but are NOT processed by any subscriptions. Unfortunately I can't check to see what specific subscriptions are/are not available. Furthermore, it will sometimes also report that the consumer has switched to being NOT ACTIVE instead of being within its designated active queue-group.

If the jetstream subscription is not active, messages queue for that consumer - when it resubscribes, the messages will be presented.

Understood. Messages do queue once the consumer becomes present; unfortunately, when a subscriber appears (or presumably appears) the messages do not get processed. The only means (that I know of) of checking whether there are subscriptions is the monitoring endpoints (8222) like above. Above is an example, but it states an overwhelming number of subscriptions are present, despite no messages being processed.
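Besides the monitoring endpoint, the `jsm.consumers.info()` call already used in the code in this thread reports per-consumer counters that say more than a raw subscription count. The sketch below is only an illustration: `ConsumerSnapshot` is a hypothetical, trimmed-down subset of that response (the field names follow the JetStream consumer info, but the interface and `describeConsumer` are made up here), and the thresholds are assumptions.

```typescript
// Hypothetical, trimmed-down view of what `jsm.consumers.info()` reports.
interface ConsumerSnapshot {
  name: string;
  push_bound?: boolean;    // true when a subscription is attached to the push consumer
  num_pending: number;     // matching messages not yet delivered to this consumer
  num_ack_pending: number; // delivered but not yet acknowledged
  num_redelivered: number; // messages delivered more than once
}

function describeConsumer(c: ConsumerSnapshot): string[] {
  const notes: string[] = [];
  if (!c.push_bound) {
    notes.push("no subscription is bound; messages will wait on the consumer");
  }
  if (c.num_pending > 0) {
    notes.push(`${c.num_pending} message(s) not yet delivered`);
  }
  if (c.num_ack_pending > 0) {
    notes.push(`${c.num_ack_pending} message(s) delivered but not acked`);
  }
  if (c.num_redelivered > 0) {
    notes.push(`${c.num_redelivered} message(s) redelivered; check the handler and ack wait`);
  }
  return notes.length > 0 ? notes : ["consumer is caught up"];
}

const idle = describeConsumer({
  name: "demo", push_bound: false, num_pending: 3, num_ack_pending: 0, num_redelivered: 0,
});
const healthy = describeConsumer({
  name: "demo", push_bound: true, num_pending: 0, num_ack_pending: 0, num_redelivered: 0,
});
console.log(idle, healthy);
```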

Problem 3

According to some NATS stats (checking 8222 monitor), I have an absurd amount of subscriptions that are stated despite it being impossible (If I'm not mistaken)! I'm only supposed to have maybe 13 or something other but not 78 or whatevs. Same thing with messages, sometimes I have an overwhelming amount of messages that are listed in the 8222 monitor

Seems like you are mixing CLI with code, so I don't know what to expect, if your Kubernetes setup is doing something strange it is possible. Again cannot tell what is jetstream vs. corenats.

Absolutely no CLI is being used to create anything. It is only used to observe what is present after the application builds. All the console logs are actual checks using information returned once a consumer, subscriber, stream, or NATS connection is built/made in code.

(does some background information transferred between consumers, stream, subscriptions count? Do pings count?!) I've sent a handful of messages only to be informed that hundreds or thousands have accumulated.

Problem 4

I keep getting a Bad JSON error when attempting to submit data that I believe has already been encoded into a Uint8Array, despite being informed otherwise. I've been scouring online and came across a similar issue where the answer implied that in Node, I might have to wrap the information I wish to encode in a try/catch block to handle the data possibly being null/undefined and causing the service to crash? Not sure if it applies to my case yet.

If you try to decode a message without a payload, JSON decoding will fail. Validate your input.

I have added some additional code of the consumers, subscribers, publishers, and services to clarify some confusion and reference. I have included a case to demonstrate Problem 4.

Once again, I apologize if I am bombarding you with a bunch of code. Feel free to disregard for higher priority issues. I might just be confusing myself or looking at code for too long.


Code

I'm utilizing TypeScript to create base structures for Consumers, Subscribers, and Publishers that can be further modified for specifics if I decide to later.

Consumer Base

const jc = JSONCodec();
const sc = StringCodec();

interface Event {
    stream:Streams;
    subject: Subjects;
    queueGroupName:QueueGroupName;
    data: any;
}

export abstract class Listener<T extends Event> {
  abstract stream: T[ 'stream' ];
  abstract subject: T[ 'subject' ];
  abstract queueGroupName: T['queueGroupName'];
  abstract onMessage ( data: T[ 'data' ], msg: JsMsg ): void;

  protected natsConnection: NatsConnection;
  protected ackWait = 5 * 1000;

//If no durable then attach a consumer name for ephemeral consumer (EPH)
abstract durableName?:string;
//!! OR IS THIS FOR PULL or PUSH CONSUMERS???
abstract deliverGroup?: QueueGroupName;
abstract filterSubject?:string;
abstract description?:string;  
abstract storage?:StorageType;
abstract retention?: RetentionPolicy;
abstract ackPolicy?:AckPolicy;
//abstract ackWait?:Nanos;
abstract maxAckPending?:number;
//T:  DeliverPolicy  
abstract deliverPolicy?:DeliverPolicy;
abstract maxDeliver?:number;
abstract replayPolicy?:ReplayPolicy;
//*:  PUSH CONSUMER SPECIFIC ~~~~ 
abstract deliverSubject?:string;
//abstract deliverGroup: QueueGroupName;
abstract flowControl?:boolean;
abstract idleHeartbeat?:Nanos;
abstract rateLimitBps?:number;
abstract headersOnly?:boolean;
abstract backoff?:Nanos[];

  constructor ( natsConnection: NatsConnection ) {
    this.natsConnection = natsConnection;
  }

//A: Stream Configuration to Create Streams through Listener (Fail-safe or otherwise)

streamConfig() {
  const opts:Partial<StreamConfig> = {
    name:this.stream, 
    subjects: [ `${ this.stream }.>` ],
}
return opts;
}

//A: JSM Consumer Configurations for establishing Stream/Consumer/Subscriber Binding
consumerConfig () {

    const durableName = `q-${ this.queueGroupName }-sub-${ this.subject.replaceAll( '.', '-' ) }-durable`;
    const jc = JSONCodec<T[ 'data' ]>();
    let deliveryInbox = `delivery.${this.subject}`; 
    //let deliveryInbox = createInbox(`delivery.${this.subject}`); 

const opts:Partial<ConsumerConfig> = {
//name:this.ephemeralName,
durable_name:this.durableName || durableName,
//filter_subject:this.filterSubject,
filter_subject:`${this.stream}.${this.subject}`,
description:this.description || `Standard Base Consumer Policies`,
// T:  AckPolicy  
ack_policy:this.ackPolicy || AckPolicy.Explicit,
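// ack_wait is expressed in nanoseconds; this.ackWait is in milliseconds,
// so convert it first (nanos is exported by the nats package).
ack_wait:nanos(this.ackWait),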
max_ack_pending:this.maxAckPending || -1,
//T:  DeliverPolicy  
deliver_policy:this.deliverPolicy || DeliverPolicy.All,
//*:  PUSH CONSUMER SPECIFIC ~~~~ 
deliver_subject:this.deliverSubject || deliveryInbox,
deliver_group: this.deliverGroup || this.queueGroupName,
flow_control:this.flowControl,
idle_heartbeat:this.idleHeartbeat,
rate_limit_bps:this.rateLimitBps,
headers_only:this.headersOnly,
backoff:this.backoff,
  }   
      return opts;

}; 

async listen() {
          const jsm = await this.natsConnection.jetstreamManager({})  

//check if stream exists
  try {

  //found existing stream and displaying info

  const ifStreamExists = await jsm.streams.find(`${this.stream}.>`)
  const existingStream = await jsm.streams.info(ifStreamExists,{
  deleted_details:true,
})

  new StreamMsg([{
    message:`Stream ${ifStreamExists} Exists!`,
    param:`

Config: 
  Name: ${existingStream.config.name}
  Subjects: ${existingStream.config.subjects}
  Storage: ${existingStream.config.storage}
  Sources: ${existingStream.config.sources}
  More....
Created: ${existingStream.created}
State: 
  Consumer Count: ${existingStream.state.consumer_count}
  # of Subjects: ${existingStream.state.num_subjects}
    Subjects: ${existingStream.state.subjects}
  Messages: ${existingStream.state.messages}
  Lost: ${existingStream.state.lost}
  Lost: ${existingStream.state.subjects}
  More...
  Cluster: 
    Leader: ${existingStream.cluster?.leader}
    Name: ${existingStream.cluster?.name}
    Replicas: ${existingStream.cluster?.replicas}
    `
  }]).chalkStreams()

} catch (err) {
  //Did not find stream so report absence... 
  new StreamMsg([{
    message:`Stream does not exist... `,
    param:`Adding Stream [ ${this.stream} ] according to Base-Listener profile`
  }]).chalkStreams()  
  //...And create stream, with internal stream Config
  const createdStream = await addStream( jsm, this.stream, this.subject, this.streamConfig())

}

//A: Establishing/Verifying Consumer 
        try{

//First, check to see if consumer exists
          const ifConsumerExists = await jsm.consumers.info(this.stream, this.consumerConfig().durable_name!);

//IF so, report current basic info of Consumer   
          new ConsumerMsg([{
            message:`Current Consumer Exists Already!`,
            param:`
Stream Name: ${ifConsumerExists.stream_name}
Name: ${ifConsumerExists.name} <-- (Either Durable or Machine Generated)
Config: 
  Deliver Group:  ${ifConsumerExists.config.deliver_group}
  Deliver Subject:  ${ifConsumerExists.config.deliver_subject}
  Durable Name:  ${ifConsumerExists.config.durable_name}
  Filter Subject:  ${ifConsumerExists.config.filter_subject}
  Deliver Policy:  ${ifConsumerExists.config.deliver_policy}
  Ack Policy:  ${ifConsumerExists.config.ack_policy}
              `}]).chalkStreams()

          } catch ( err ) {

            const consumerList = jsm.consumers.list(this.stream)
            const consumersInfo = await consumerList.next()
            new ConsumerMsg([{
              message:`Consumer does not exist. List of current consumers`,
              param:`
${(consumersInfo).map((consumer)=>{
return `
ConsumerName:${consumer.name}
Push-bound?:${consumer.push_bound}
Stream:${consumer.stream_name}
`
})}
                `}]).chalkStreams()

//IF not, create consumer with the internal consumer configuration settings
            const createdConsumer = await jsm.consumers.add(this.stream,this.consumerConfig())

        new ConsumerMsg([{
message:`Consumer Created`,
param:`
Stream Name: ${createdConsumer.stream_name}
Name: ${createdConsumer.name} <-- (Either Durable or Machine Generated)
Config: 
  Deliver Group:  ${createdConsumer.config.deliver_group}
  Deliver Subject:  ${createdConsumer.config.deliver_subject}
  Durable Name:  ${createdConsumer.config.durable_name}
  Filter Subject:  ${createdConsumer.config.filter_subject}
  Deliver Policy:  ${createdConsumer.config.deliver_policy}
  Ack Policy:  ${createdConsumer.config.ack_policy}
Cluster: 
  Leader: ${createdConsumer.cluster?.leader}
  Name: ${createdConsumer.cluster?.name}
  Replicas: ${createdConsumer.cluster?.replicas}
          `}]).chalkStreams()}

    }

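    parseMessage(msg: JsMsg): T["data"] {
        // msg.data is a Uint8Array. Unless it happens to be a Node Buffer,
        // data.toString() yields comma-separated byte values rather than the
        // JSON text, so decode it with the module-level JSONCodec instead.
        return jc.decode(msg.data) as T["data"];
          } 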
    }
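A note on units for the ack wait, since the base classes keep `ackWait` as `5 * 1000`: as far as I know, `ack_wait` in a `ConsumerConfig` is expressed in nanoseconds, while the `consumerOpts().ackWait()` builder takes milliseconds and converts internally. The sketch below only illustrates the arithmetic; `millisToNanos` is a hypothetical stand-in for the `nanos()` helper that nats.js exports.

```typescript
// Hypothetical stand-in for the conversion; nats.js exports a `nanos()` helper for this.
const NANOS_PER_MILLI = 1e6;

function millisToNanos(millis: number): number {
  return millis * NANOS_PER_MILLI;
}

const ackWaitMillis = 5 * 1000; // the 5 seconds intended by the base class

// If 5000 were handed to `ack_wait` unconverted, it would be read as 5000 ns,
// which is 0.005 ms.
const rawAsMillis = ackWaitMillis / NANOS_PER_MILLI;

// Converted first, it is the intended 5 seconds in nanoseconds.
const ackWaitNanos = millisToNanos(ackWaitMillis);

console.log(rawAsMillis, ackWaitNanos);
```

An ack wait that short would presumably make the server treat messages as unacknowledged almost immediately, which could show up as unexpected redeliveries.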

Subscriber Base

const jc = JSONCodec();
const sc = StringCodec();

interface Event {
    stream:Streams;
    subject: Subjects;
    queueGroupName:QueueGroupName;
    data: any;
}

export abstract class Subscriber<T extends Event> {
  abstract stream: T[ 'stream' ];
  abstract subject: T[ 'subject' ];
  abstract queueGroupName: T['queueGroupName'];
  abstract onMessage ( data: T[ 'data' ], msg: JsMsg ): void;
  abstract onMessageKV (data: T[ 'data' ], msg: JsMsg ): void;
  //private 
  protected natsConnection: NatsConnection;
  protected ackWait = 5 * 1000;

  //Need to find how to make these optional and also the difference between abstract, private, protected

  constructor ( natsConnection: NatsConnection ) {
    this.natsConnection = natsConnection;
  }

//A: Establishing JSC Client Subscriber/Subscription Options
protected jsc(){
  const jsc = this.natsConnection.jetstream({})
return jsc
}

subscriberConfig? () {
  const durableName = `q-${ this.queueGroupName }-sub-${ this.subject.replaceAll( '.', '-' ) }-durable`;
  const jc = JSONCodec<T[ 'data' ]>();
  let maxDeliver = 5; // number of attempts per message

  const opts = consumerOpts();

 //opts.durable( durableName );
 opts.bind(this.stream, durableName);
 opts.bindStream(this.stream)
  //opts.filterSubject();
  opts.description(`Standard Base Subscriber Policies`);
  opts.replayInstantly();

  //T: Ack Policy

  opts.ackExplicit();
  opts.manualAck();
  opts.ackWait( this.ackWait );

  //T: Delivery Policy
  opts.maxDeliver(maxDeliver);  
  opts.deliverAll();
  opts.maxAckPending(-1); 

  //*:  PUSH SUBSCRIPTION SPECIFIC ~~~~    
  opts.deliverTo(`deliver.${this.subject}`);
  opts.deliverGroup( this.queueGroupName );

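  opts.callback( ( err, msg ) => {
    if ( err ) return console.error( `Subscription error:`, err );
    if ( !msg ) return;
    let data: T[ 'data' ];
    try {
        data = jc.decode( msg.data ); // throws on an empty or malformed payload
    } catch ( decodeErr ) {
        console.error( `Bad payload on ${ msg.subject }:`, decodeErr );
        return msg.term(); // never redeliver a message that cannot be parsed
    }
    this.onMessage( data, msg );
    });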
    return opts;
};

async subscribe(){

  try {

    //Initiation of Subscription

    const jscSubscription = await this.jsc().subscribe(
      `${this.stream}.${this.subject}`,
      this.subscriberConfig!())

      new SubscriberMsg([{
        message:`Subscribing to Consumer:`,
        param:`
Name : ${((await jscSubscription.consumerInfo()).name)},
Durable_name: ${((await jscSubscription.consumerInfo()).config.durable_name)},
Cluster: ${((await jscSubscription.consumerInfo()).cluster)},
Stream_name: ${((await jscSubscription.consumerInfo()).stream_name)},
Subject: ${this.subject}, 
Created: ${((await jscSubscription.consumerInfo()).created)},
Deliver_subject: ${((await jscSubscription.consumerInfo()).config.deliver_subject)}
More...`
          }]).chalkStreams()

        return jscSubscription;  

  } catch (err) {
    throw new SubscriberError([{message:`Problem Subscribing`,param:`
Error: ${err}
Stream:${this.stream}
Attempted Subject of Subscription:${this.subject}
    `}]).chalkStreams()
  }

}

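    parseMessage(msg: JsMsg): T["data"] {
        // msg.data is a Uint8Array. Unless it happens to be a Node Buffer,
        // data.toString() yields comma-separated byte values rather than the
        // JSON text, so decode it with the module-level JSONCodec instead.
        return jc.decode(msg.data) as T["data"];
          } 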
    }

Publisher Base

interface Event {
    stream:Streams;
    subject: Subjects;
    queueGroupName:QueueGroupName;
    data: any;
}

export abstract class Publisher<T extends Event> {
    abstract stream: T["stream"];
    abstract subject: T["subject"];
    private natsConnection: NatsConnection;
    abstract queueGroupName: T["queueGroupName"];

    constructor ( natsConnection: NatsConnection ) {
    this.natsConnection = natsConnection;
    }

//A: Establishing JSC Client Subscriber/Subscription Options
jsc(){
  const jsc = this.natsConnection.jetstream({})
return jsc
}

jetPubOptions (){

  const opts:Partial<JetStreamPublishOptions> = {
    expect:{
      streamName:this.stream,
      },
  }
   return opts;
}

    async publish ( data: T[ 'data' ] ): Promise<PubAck> {
        // create a codec
        const jc = JSONCodec();
        const jsm = await this.natsConnection.jetstreamManager();

        try {
            const currentData = jc.encode(data)
            const publishedJsMsg = await this.jsc().publish(`${this.stream}.${this.subject}`, currentData, this.jetPubOptions());

            // Look the consumer up once and reuse the result for every logged field.
            // Note: consumers.info() rejects if this durable does not exist, which sends
            // control to the catch block below even though the publish itself succeeded.
            const durableName = `q-${ this.queueGroupName }-sub-${ this.subject.replaceAll( '.', '-' ) }-durable`;
            const consumerInfo = await jsm.consumers.info(this.stream, durableName);

            new PublisherMsg([{
                    message: `Published a message to:`,
                    param: `
Stream:${this.stream}
Subject: ${this.subject}
Consumer Info of this stream/subject:
    Name: ${consumerInfo.name}
    Filter_Subject: ${consumerInfo.config.filter_subject}
    Durable_Name: ${consumerInfo.config.durable_name}
    Delivered: 
        Consumer_Seq: ${consumerInfo.delivered.consumer_seq} 
        Last_active: ${consumerInfo.delivered.last_active} 
        Stream_Seq: ${consumerInfo.delivered.stream_seq} 
                `
            }]).chalkStreams();
        return publishedJsMsg

        }
        catch (err) {

            const consumerList = jsm.consumers.list(this.stream)
            const consumersInfo = await consumerList.next()

            throw new PublisherError([{
                    message: `Error in Publishing Message`,
                    param: `
                    Data attached: ${JSON.stringify(data)}
                    Sent to Subject : ${this.stream}.${this.subject}
                    Error: ${err}

                    ${(consumersInfo).map((consumer)=>{
                        return `
                        ConsumerName:${consumer.name}
                        Push-bound?:${consumer.push_bound}
                        Stream:${consumer.stream_name}
                        `
                        })}
                        `
                }]).chalkStreams();
            }
        }
    }

TicketCreated Event

export interface TicketCreatedEvent {
    stream: Streams.Ticket;
    subject: Subjects.TicketCreated;
    queueGroupName:QueueGroupName;
    data: {
        id: string;
        title: string;
        price: number;
        version: number;
        userId: string;
        orderId?: string;
    };
}

TicketCreated Publisher

export class TicketCreatedPublisher extends Publisher<TicketCreatedEvent> {
    stream: Streams.Ticket = Streams.Ticket;
    subject: Subjects.TicketCreated = Subjects.TicketCreated;
    queueGroupName: QueueGroupName.TicketService= QueueGroupName.TicketService;

}
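One thing that may be worth checking here: the publisher's log step looks up a consumer by a name derived from its own `queueGroupName`. If the Orders service creates its consumer with a different queue group, the two names will not match and the lookup will fail. The sketch below mirrors the naming template from the base classes; the queue group strings are hypothetical, since the real enum values are not shown in this thread.

```typescript
// Mirrors the durable-name template used in the base classes.
function durableNameFor(queueGroupName: string, subject: string): string {
  return `q-${queueGroupName}-sub-${subject.replace(/\./g, "-")}-durable`;
}

// Hypothetical queue group values; the real enum values are not shown in this thread.
const createdByOrders = durableNameFor("orders-service", "ticket.created");
const lookedUpByTickets = durableNameFor("tickets-service", "ticket.created");

console.log(createdByOrders);
console.log(lookedUpByTickets);
console.log(createdByOrders === lookedUpByTickets);
```

If that is what is happening, the publish itself may be succeeding and only the follow-up consumer lookup is throwing.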

Ticket Service Index

Establishing a connection and setting up Streams, Consumers, Subscriptions. Once this service creates a ticket, it should publish a message that contains ticket data that will be received by a listener(consumer) within the Orders Services.

async function start() {
console.clear();

    if (!process.env.NATS_SERVER_URL) {
        throw new Error("NATS_SERVER_URL must be defined");
    }
    if (!process.env.NATS_CLUSTER_STREAM_NAME) {
        throw new Error("NATS_CLUSTER_STREAM_NAME must be defined");
    } 
    if (!process.env.MONGO_URI) {
        throw new Error("MONGO_URI must be defined");
    }

    await natsInitializer.connect({
         name: process.env.NATS_CLUSTER_STREAM_NAME, // 'ticket' 
         servers: process.env.NATS_SERVER_URL, //'nats://127.0.0.1:4222' 
         maxReconnectAttempts:50,
         reconnect:true, 
         reconnectTimeWait:1000,
         debug:true,
         verbose:true, 
    });

        // Wrap close() so it keeps its `this` binding when invoked by the signal handler
        process.on("SIGTERM", () => natsInitializer.natsConnection.close());
        process.on("SIGINT", () => natsInitializer.natsConnection.close());

        await mongoose
    .connect(process.env.MONGO_URI,{});
    console.log(`connected to PORT: ${PORT} and listening...although it may not be that port`);

    app.listen(PORT,()=>{
        console.log(
            `APP is listening`)
        });
    }

    start()
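A note on signal handlers: handing a bare method reference such as `connection.close` to an event emitter detaches it from its object, so any use of `this` inside the method breaks when the signal fires. The sketch below shows the effect with stand-ins only; `FakeConnection` and `fireSignal` are hypothetical and no real NATS connection is involved.

```typescript
// Stand-in for a connection object; the real NatsConnection is not used here.
class FakeConnection {
  private state = { closed: false };

  close(): void {
    // Relies on `this`, like most methods on a connection object.
    this.state.closed = true;
  }

  isClosed(): boolean {
    return this.state.closed;
  }
}

// Hypothetical stand-in for `process.on(signal, handler)`: it invokes the
// handler without the connection as its receiver.
function fireSignal(handler: () => void): string {
  try {
    handler();
    return "ok";
  } catch (err) {
    return `failed: ${(err as Error).name}`;
  }
}

const conn = new FakeConnection();
const unbound = fireSignal(conn.close);         // `this` is lost, close() throws
const wrapped = fireSignal(() => conn.close()); // `this` is preserved
console.log(unbound, wrapped, conn.isClosed());
```

Wrapping the call in an arrow function (or using `.bind`) keeps the receiver intact.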

Order Services

The order service creates a consumer and subscriber for subjects published to TICKET.ticket.created

let PORT = 3000;  
async function start() {

    if (!process.env.NATS_SERVER_URL) {
        throw new Error("NATS_SERVER_URL must be defined");
    }
    if (!process.env.NATS_CLUSTER_STREAM_NAME) {
        throw new Error("NATS_CLUSTER_STREAM_NAME must be defined");
    } 
    if (!process.env.MONGO_URI) {
        throw new Error("MONGO_URI must be defined");
    }

   await natsInitializer.connect({
        name: process.env.NATS_CLUSTER_STREAM_NAME, // 'ticket' 
        servers: process.env.NATS_SERVER_URL, //'nats://127.0.0.1:4222' 
        maxReconnectAttempts:50,
        reconnect:true, 
        reconnectTimeWait:1000,
   });

        // Wrap close() so it keeps its `this` binding when invoked by the signal handler
        process.on("SIGTERM", () => natsInitializer.natsConnection.close());
        process.on("SIGINT", () => natsInitializer.natsConnection.close());

       try {

           await new TicketCreatedListener(natsInitializer.natsConnection).listen();
           const ticketCreatedSubscription = await new TicketCreatedSubscriber(natsInitializer.natsConnection).subscribe();

} catch (err) {
    console.log(` Orders:TicketCreated ASG Index-Error, ${err}`)
}

        await mongoose
        .connect(process.env.MONGO_URI)
        console.log(`connected to DB!`);

    app.listen(PORT,()=>{
        console.log(
            `connected to PORT: ${PORT} and listening...although it may not be that port`)
        });
    }

    start()

Create Ticket Route Handler

export const tickets_create = async (req: Request, res: Response) => {

 try {
    const newTicket = await Ticket.createTicket({
        title: req.body.title,
        price: req.body.price,
        userId: req.body.userId,
    });

    console.log(newTicket);
    console.log(newTicket.id);
    console.log(newTicket.title);
    console.log(newTicket.userId);
    console.log(newTicket.version);

    const published = await new TicketCreatedPublisher(natsInitializer.natsConnection).publish({
        id: newTicket.id,
        title: newTicket.title,
        price: Number(newTicket.price),
        userId: newTicket.userId,
        version: newTicket.version,
    });
    res.status(201).send(newTicket);
 } catch (err) {
    console.log(`Caught Error in Ticket: TicketCreatedPublisher :
    ${err}
    `)
    res.end()
} 
    }

Case Example 2 for Problem 4

In the case of the Bad JSON error, I have successfully run a build with all consumers present and active. This is just to give an idea of the starting state before sending any message.

SpinUp

Successful with all consumers present according to the CLI. I still cannot make sense of some property values (num_subscriptions, messages, etc...).

nats consumer list

http://localhost:8222/varz

{
  "server_id": "ND5VKGSH7WYCCHWRCJOTMC7IJJDRNVUVBF2X23Q4UQFNAUYCWHILT76Y",
  "server_name": "asg4-nats",
  "version": "2.9.14",
  "proto": 1,
  "git_commit": "74ae59a",
  "go": "go1.19.5",
  "host": "0.0.0.0",
  "port": 4222,
  "max_connections": 65536,
  "ping_interval": 120000000000,
  "ping_max": 2,
  "http_host": "0.0.0.0",
  "http_port": 8222,
  "http_base_path": "",
  "https_port": 0,
  "auth_timeout": 2,
  "max_control_line": 4096,
  "max_payload": 1048576,
  "max_pending": 67108864,
  "cluster": {
    "name": "asg4-nats-cluster"
  },
  "gateway": {},
  "leaf": {},
  "mqtt": {},
  "websocket": {},
  "jetstream": {
    "config": {
      "max_memory": 6107409408,
      "max_storage": 759418245120,
      "store_dir": "/tmp/nats/jetstream"
    },
    "stats": {
      "memory": 0,
      "storage": 0,
      "reserved_memory": 0,
      "reserved_storage": 0,
      "accounts": 1,
      "ha_assets": 0,
      "api": {
        "total": 141,
        "errors": 9
      }
    }
  },
  "tls_timeout": 2,
  "write_deadline": 10000000000,
  "start": "2023-02-22T23:08:15.521820205Z",
  "now": "2023-02-22T23:14:07.886440841Z",
  "uptime": "5m52s",
  "mem": 15216640,
  "cores": 8,
  "gomaxprocs": 8,
  "cpu": 0,
  "connections": 4,
  "total_connections": 12,
  "routes": 0,
  "remotes": 0,
  "leafnodes": 0,
  "in_msgs": 141,
  "out_msgs": 141,
  "in_bytes": 3898,
  "out_bytes": 107203,
  "slow_consumers": 0,
  "subscriptions": 89,
  "http_req_stats": {
    "/": 1,
    "/varz": 2
  },
  "config_load_time": "2023-02-22T23:08:15.521820205Z",
  "system_account": "$SYS"
}

http://localhost:8222/jsz

{
  "server_id": "ND5VKGSH7WYCCHWRCJOTMC7IJJDRNVUVBF2X23Q4UQFNAUYCWHILT76Y",
  "now": "2023-02-22T23:15:28.075512965Z",
  "config": {
    "max_memory": 6107409408,
    "max_storage": 759418245120,
    "store_dir": "/tmp/nats/jetstream"
  },
  "memory": 0,
  "storage": 0,
  "reserved_memory": 0,
  "reserved_storage": 0,
  "accounts": 1,
  "ha_assets": 0,
  "api": {
    "total": 141,
    "errors": 9
  },
  "streams": 1,
  "consumers": 9,
  "messages": 0,
  "bytes": 0
}

http://localhost:8222/subsz

{
  "num_subscriptions": 89,
  "num_cache": 172,
  "num_inserts": 97,
  "num_removes": 8,
  "num_matches": 625,
  "cache_hit_rate": 0.3824,
  "max_fanout": 1,
  "avg_fanout": 0.9883720930232558
}

Submission of data to create ticket

This is the result:

[tickets]
[tickets] < PING␍␊
[tickets] > PONG␍␊
[tickets] {
[tickets]   title: 'Title2',
[tickets]   price: '24',
[tickets]   userId: 'asg1',
[tickets]   orderId: null,
[tickets]   _id: new ObjectId("63f6a31179a1538f6a75e510"),
[tickets]   version: 0
[tickets] }
[tickets] 63f6a31179a1538f6a75e510
[tickets] Title2
[tickets] asg1
[tickets] 0
[tickets] < PUB $JS.API.INFO _INBOX.73L000R2MM216UEG6R1R1E.73L000R2MM216UEG6R1UXI 0␍␊␍␊
[tickets] > +OK␍␊
[tickets] > MSG _INBOX.73L000R2MM216UEG6R1R1E.73L000R2MM216UEG6R1UXI 1 324␍␊{"type":"io.nats.jetstream.api.v1.account_info_response","memory":0,"storage":0,"streams":1,"consumers":9,"limits":{"max_memory":-1,"max_storage":-1,"max_streams":-1,"max_consumers":-1,"max_ack_pending":-1,"memory_max_stream_bytes":-1,"storage_max_stream_bytes":-1,"max_bytes_required":false},"api":{"total":141,"errors":9}}␍␊
[tickets] < HPUB TICKET.ticket.created _INBOX.73L000R2MM216UEG6R1R1E.73L000R2MM216UEG6R1V2W 42 131␍␊NATS/1.0␍␊Nats-Expected-Stream: TICKET␍␊␍␊{"id":"63f6a31179a1538f6a75e510","title":"Title2","price":24,"userId":"asg1","version":0}␍␊
[tickets] > +OK␍␊
[tickets] > MSG _INBOX.73L000R2MM216UEG6R1R1E.73L000R2MM216UEG6R1V2W 1  28␍␊{"stream":"TICKET", "seq":1}␍␊      
[tickets] < PUB $JS.API.CONSUMER.INFO.TICKET.q-ticket-services-sub-ticket-created-durable _INBOX.73L000R2MM216UEG6R1R1E.73L000R2MM216UEG6R1V8A 0␍␊␍␊
[tickets] > +OK␍␊
[tickets] > MSG _INBOX.73L000R2MM216UEG6R1R1E.73L000R2MM216UEG6R1V8A 1 131␍␊{"type":"io.nats.jetstream.api.v1.consumer_info_response","error":{"code":404,"err_code":10014,"description":"consumer not found"}}␍␊
[tickets] < PUB $JS.API.CONSUMER.LIST.TICKET _INBOX.73L000R2MM216UEG6R1R1E.73L000R2MM216UEG6R1VDO 12␍␊{"offset":0}␍␊
[tickets] > +OK␍␊
[tickets] > MSG _INBOX.73L000R2MM216UEG6R1R1E.73L000R2MM216UEG6R1VDO 1 6861␍␊{"type":"io.nats.jetstream.api.v1.consumer_list_response","total":9,"offset":0,"limit":256,"consumers":[{"stream_name":"TICKET","name":"q-expiration-services-sub-order-created-durable","created":"2023-02-22T23:08:28.090537356Z","config":{"durable_name":"q-expiration-services-sub-order-created-durable","name":"q-expiration-services-sub-order-created-durable","description":"Standard Base Consumer Policies","deliver_policy":"all","ack_policy":"explicit","ack_wait":5000,"max_deliver":-1,"filter_subject":"TICKET.order.created","replay_policy":"instant","max_ack_pending":-1,"deliver_subject":"delivery.order.created","deliver_group":"expiration-services","num_replicas":0},"delivered":{"consumer_seq":0,"stream_seq":0},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":0,"num_redelivered":0,"num_waiting":0,"num_pending":0,"push_bound":true},{"stream_name":"TICKET","name":"q-order-services-sub-expiration-complete-durable","created":"2023-02-22T23:08:34.900899771Z","config":{"durable_name":"q-order-services-sub-expiration-complete-durable","name":"q-order-services-sub-expiration-complete-durable","description":"Standard Base Consumer Policies","deliver_policy":"all","ack_policy":"explicit","ack_wait":5000,"max_deliver":-1,"filter_subject":"TICKET.expiration.complete","replay_policy":"instant","max_ack_pending":-1,"deliver_subject":"delivery.expiration.complete","deliver_group":"order-services","num_replicas":0},"delivered":{"consumer_seq":0,"stream_seq":0},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":0,"num_redelivered":0,"num_waiting":0,"num_pending":0,"push_bound":true},{"stream_name":"TICKET","name":"q-order-services-sub-payment-created-durable","created":"2023-02-22T23:08:35.051733188Z","config":{"durable_name":"q-order-services-sub-payment-created-durable","name":"q-order-services-sub-payment-created-durable","description":"Standard Base Consumer 
Policies","deliver_policy":"all","ack_policy":"explicit","ack_wait":5000,"max_deliver":-1,"filter_subject":"TICKET.payment.created","replay_policy":"instant","max_ack_pending":-1,"deliver_subject":"delivery.payment.created","deliver_group":"order-services","num_replicas":0},"delivered":{"consumer_seq":0,"stream_seq":0},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":0,"num_redelivered":0,"num_waiting":0,"num_pending":0,"push_bound":true},{"stream_name":"TICKET","name":"q-order-services-sub-ticket-created-durable","created":"2023-02-22T23:08:34.701078281Z","config":{"durable_name":"q-order-services-sub-ticket-created-durable","name":"q-order-services-sub-ticket-created-durable","description":"Standard Base Consumer Policies","deliver_policy":"all","ack_policy":"explicit","ack_wait":5000,"max_deliver":-1,"filter_subject":"TICKET.ticket.created","replay_policy":"instant","max_ack_pending":-1,"deliver_subject":"delivery.ticket.created","deliver_group":"order-services","num_replicas":0},"delivered":{"consumer_seq":2,"stream_seq":1,"last_active":"2023-02-22T23:19:45.68665026Z"},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":1,"num_redelivered":1,"num_waiting":0,"num_pending":0,"push_bound":true},{"stream_name":"TICKET","name":"q-order-services-sub-ticket-updated-durable","created":"2023-02-22T23:08:34.799205125Z","config":{"durable_name":"q-order-services-sub-ticket-updated-durable","name":"q-order-services-sub-ticket-updated-durable","description":"Standard Base Consumer 
Policies","deliver_policy":"all","ack_policy":"explicit","ack_wait":5000,"max_deliver":-1,"filter_subject":"TICKET.ticket.updated","replay_policy":"instant","max_ack_pending":-1,"deliver_subject":"delivery.ticket.updated","deliver_group":"order-services","num_replicas":0},"delivered":{"consumer_seq":0,"stream_seq":0},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":0,"num_redelivered":0,"num_waiting":0,"num_pending":0,"push_bound":true},{"stream_name":"TICKET","name":"q-payment-services-sub-order-cancelled-durable","created":"2023-02-22T23:08:35.08621783Z","config":{"durable_name":"q-payment-services-sub-order-cancelled-durable","name":"q-payment-services-sub-order-cancelled-durable","description":"Standard Base Consumer Policies","deliver_policy":"all","ack_policy":"explicit","ack_wait":5000,"max_deliver":-1,"filter_subject":"TICKET.order.cancelled","replay_policy":"instant","max_ack_pending":-1,"deliver_subject":"delivery.order.cancelled","deliver_group":"payment-services","num_replicas":0},"delivered":{"consumer_seq":0,"stream_seq":0},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":0,"num_redelivered":0,"num_waiting":0,"num_pending":0,"push_bound":true},{"stream_name":"TICKET","name":"q-payment-services-sub-order-created-durable","created":"2023-02-22T23:08:34.844505142Z","config":{"durable_name":"q-payment-services-sub-order-created-durable","name":"q-payment-services-sub-order-created-durable","description":"Standard Base Consumer 
Policies","deliver_policy":"all","ack_policy":"explicit","ack_wait":5000,"max_deliver":-1,"filter_subject":"TICKET.order.created","replay_policy":"instant","max_ack_pending":-1,"deliver_subject":"delivery.order.created","deliver_group":"payment-services","num_replicas":0},"delivered":{"consumer_seq":0,"stream_seq":0},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":0,"num_redelivered":0,"num_waiting":0,"num_pending":0,"push_bound":true},{"stream_name":"TICKET","name":"q-ticket-services-sub-order-cancelled-durable","created":"2023-02-22T23:09:09.869095289Z","config":{"durable_name":"q-ticket-services-sub-order-cancelled-durable","name":"q-ticket-services-sub-order-cancelled-durable","description":"Standard Base Consumer Policies","deliver_policy":"all","ack_policy":"explicit","ack_wait":5000,"max_deliver":-1,"filter_subject":"TICKET.order.cancelled","replay_policy":"instant","max_ack_pending":-1,"deliver_subject":"delivery.order.cancelled","deliver_group":"ticket-services","num_replicas":0},"delivered":{"consumer_seq":0,"stream_seq":0},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":0,"num_redelivered":0,"num_waiting":0,"num_pending":0,"push_bound":true},{"stream_name":"TICKET","name":"q-ticket-services-sub-order-created-durable","created":"2023-02-22T23:09:09.797439114Z","config":{"durable_name":"q-ticket-services-sub-order-created-durable","name":"q-ticket-services-sub-order-created-durable","description":"Standard Base Consumer Policies","deliver_policy":"all","ack_policy":"explicit","ack_wait":5000,"max_deliver":-1,"filter_subject":"TICKET.order.created","replay_policy":"instant","max_ack_pending":-1,"deliver_subject":"delivery.order.created","deliver_group":"ticket-services","num_replicas":0},"delivered":{"consumer_seq":0,"stream_seq":0},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":0,"num_redelivered":0,"num_waiting":0,"num_pending":0,"push_bound":true}]}␍␊    
[tickets] Caught Error in Ticket: TicketCreatedPublisher :
[tickets]     NatsError: Bad JSON
[tickets]
[tickets] ::ffff:127.0.0.1 POST /api/tickets/new 200 - Wed, 22 Feb 2023 23:19:45 GMT web 52.540 ms undefined    
[orders] 
[orders] 
[orders]         From Orders:TicketCreatedSubscriber onMessage
[orders]
[orders]         _id: undefined
[orders]          id: undefined
[orders]          title: undefined
[orders]          price: undefined
[orders]          errors: undefined
[orders]
[orders]
[orders]
[orders]         From Orders:TicketCreatedSubscriber onMessage
[orders]
[orders]         _id: undefined
[orders]          id: undefined
[orders]          title: undefined
[orders]          price: undefined
[orders]          errors: undefined
[orders]
[orders] /app/node_modules/mongoose/node_modules/mongodb/src/sdam/server.ts:503
[orders]       return callback(undefined, result);      
[orders]              ^
[orders] MongoServerError: E11000 duplicate key error collection: orders.tickets index: _id_ dup key: { _id: ObjectId('63f6a31179a1538f6a75e510') }
[orders]     at /app/node_modules/mongoose/node_modules/mongodb/src/operations/insert.ts:85:25
[orders]     at /app/node_modules/mongoose/node_modules/mongodb/src/cmap/connection_pool.ts:569:13
[orders]     at /app/node_modules/mongoose/node_modules/mongodb/src/sdam/server.ts:370:13
[orders]     at handleOperationResult (/app/node_modules/mongoose/node_modules/mongodb/src/sdam/server.ts:503:14)
[orders]     at Connection.onMessage (/app/node_modules/mongoose/node_modules/mongodb/src/cmap/connection.ts:452:5)
[orders]     at MessageStream.<anonymous> (/app/node_modules/mongoose/node_modules/mongodb/src/cmap/connection.ts:240:56)
[orders]     at MessageStream.emit (node:events:512:28) 
[orders]     at MessageStream.emit (node:domain:489:12) 
[orders]     at processIncomingData (/app/node_modules/mongoose/node_modules/mongodb/src/cmap/message_stream.ts:189:12)
[orders]     at MessageStream._write (/app/node_modules/mongoose/node_modules/mongodb/src/cmap/message_stream.ts:70:5) {
[orders]   index: 0,
[orders]   code: 11000,
[orders]   keyPattern: { _id: 1 },
[orders]   keyValue: { _id: ObjectId { [Symbol(id)]: [Buffer [Uint8Array]] } },
[orders]   [Symbol(errorLabels)]: Set(0) {}
[orders] }
[orders] 
[orders] > orders@1.0.0 start
[orders] > ts-node src/index.ts
[orders]
Amansg4 commented 1 year ago

@aricart

Order TicketCreated Listeners and Subscriber

I forgot to add these as a further reference for the consumer and subscriber created for this subject.

Order Service TicketCreated Consumer


export class TicketCreatedListener extends Listener<TicketCreatedEvent> {
    stream: Streams.Ticket = Streams.Ticket;
    subject: Subjects.TicketCreated = Subjects.TicketCreated;
    queueGroupName: QueueGroupName.OrderService = QueueGroupName.OrderService;
}

Order Service TicketCreated Subscriber

export class TicketCreatedSubscriber extends Subscriber<TicketCreatedEvent> {
    stream: Streams.Ticket = Streams.Ticket;
    subject: Subjects.TicketCreated = Subjects.TicketCreated;
    queueGroupName: QueueGroupName.OrderService = QueueGroupName.OrderService;

    async onMessage(data: TicketCreatedEvent["data"], msg: JsMsg) {

      try {
        const ticket = Ticket.createTicket({
            id: data.id,
            price: data.price,
            title: data.title,
        });
        //ticket.save({});   

        console.log(`
        From Orders:TicketCreatedSubscriber onMessage 

        _id: ${ticket._id}
         id: ${ticket.id}
         title: ${ticket.title}
         price: ${ticket.price}
         errors: ${ticket.errors}
        `,)

      } catch (err) {
        console.log(`Orders: TicketCreatedSubscriber Error (This needs to be developed)
        ${err}`);
      } finally {
        msg.ack();
      }
    }
}
Amansg4 commented 1 year ago

@aricart

k8s

NATS deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nats-depl
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nats
  template:
    metadata:
      labels:
        app: nats
    spec:
      containers:
      - name: nats
#             volumeMounts: 
#               - mountPath: 
#                 name: 
#             volumeDevices:
#               - devicePath: 
#                 name:
        image: nats
        ports:
        - containerPort: 4222
          name: nats-depl
        args:
            [
                "-p",
                "4222",
                "-m",
                "8222",
                "-D",
                "--cluster_name",
                "asg4-nats-cluster",
                "-n",
                "asg4-nats",
                "-DV",
                "-js",
            ]
#      - name: nginx
#        image: k8s.gcr.io/nginx
#        ports:
#        - containerPort: 80
#          name: web
#        volumeMounts:
#        - name: www
#          mountPath: /usr/share/nginx/html
---
apiVersion: v1
kind: Service
metadata:
    name: nats-srv
spec:
    selector:
        app: nats
    clusterIP: None
    ports:
        - name: client
          protocol: TCP
          port: 4222
          targetPort: 4222

        - name: clustering
          protocol: TCP
          port: 6222
          targetPort: 6222

        - name: monitoring
          protocol: TCP
          port: 8222
          targetPort: 8222

        - name: metrics
          protocol: TCP
          port: 7777
          targetPort: 7777

        - name: leafnodes
          protocol: TCP
          port: 7422
          targetPort: 7422

        - name: gateways
          protocol: TCP
          port: 7522
          targetPort: 7522

ticket deployment

apiVersion: apps/v1
kind: Deployment
metadata:
 name: tickets-depl
spec:
 replicas: 1
 selector:
  matchLabels:
   app: tickets
 template:
  metadata:
   labels:
    app: tickets
  spec:
   containers:
    - name: tickets
      image: asg4/tickets:latest
      env:
      - name: NATS_SERVER_URL
        value: 'nats://nats-srv:4222'
#        value: 'nats://127.0.0.1:4222'
      - name: NATS_CLUSTER_STREAM_NAME
        value: 'ticket'
      - name: NATS_CLIENT_ID
        valueFrom:
         fieldRef:
          fieldPath: metadata.name
      - name: MONGO_URI
        value: 'mongodb://tickets-mongo-srv:27017/tickets'
      - name: JWT_KEY
        valueFrom:
         secretKeyRef:
          name: jwt-secret
          key: JWT_KEY
---
apiVersion: v1
kind: Service
metadata:
 name: tickets-srv
spec:
 selector:
  app: tickets
 ports:
  - name: tickets
    protocol: TCP
    port: 3000
    targetPort: 3000
aricart commented 1 year ago

Sorry you are asking me to debug your app. Love to help create a test case that I can wrap my head around in a reasonable amount of time.

Amansg4 commented 1 year ago

@aricart Not my intention.

Sorry you are asking me to debug your app. Love to help create a test case that I can wrap my head around in a reasonable amount of time.

I have no idea of your stream, and whether your consumer are getting clobbered because you are specifying some options on the cli, and something else on the code - hint - you should be using bind so the subscribes fail if the consumer doesn't exist.

Seems like you are mixing CLI with code, so I don't know what to expect, if your Kubernetes setup is doing something strange it is possible. Again cannot tell what is jetstream vs. corenats.

You implied earlier that I might have been mixing code etc... and that you wouldn't know what was CLI (none) or internals unless I provided code. Also you couldn't tell my streams, consumers, subs, k8s structure.

I can point precisely to the regions that are of concern, I just thought you might have an easier insight if I provided the internal verbose property in the NatsConnection in addition to detailed responses as to what is occurring step by step.

Hence the exhaustive console logs I added to the code, so that the output can simply be read instead of interpreted. Most of the code is just console.log() calls and specific property choices (all reflected in the terminal output). If I removed those, I am sure it would be relatively straightforward, especially for you.

Having you debug my app wasn't my intention. I only wanted to show that I am using code and that the problem repeats.

Basically I'm just trying to send simple JSON formatted data from a publisher in Ticket Service to a subject, have a consumer in Order Service receive it, and finally a subscriber within Order Service to process it. That and trying to understand how my monitoring outputs may/may not present valid info.

Is there something more specific, more condensed, or something else I can offer? Have I committed a GitHub issues faux pas of sorts? Perhaps I can help create a test for you? I have never created a sample test for someone else, so it would be my first time, but I wouldn't mind learning.

Amansg4 commented 1 year ago

@ripienaar

or anyone else...

Just to clarify the misunderstanding, nothing is created by the NATS CLI (or at least not directly by my hands). All the code is in TypeScript and run in Node. The NATS CLI is used only to observe and check the presence of consumers, streams, connections, etc.

Subscribers are meant to interact with consumers by binding to them and are coded as such.

Amansg4 commented 1 year ago

@aricart I think I picked up what you're putting down. This is a small testing ground for you to run the code and take a look when you get a chance. It's reduced and hopefully straightforward (https://github.com/Amansg4/asg-microservices-nats.git). Thanks.

aricart commented 1 year ago

@Amansg4 not sure what to tell you. The levels of indirection in the meta framework make it very difficult to understand your code without spending a considerable amount of time, and your assertions possibly make it more complicated.

In the case of the consumers, some of the options seem to be directly assigned rather than using the API (might be reading it wrong). For example, ack_wait in JetStreamBaseListener seems to be a 5000 nanosecond wait (again, might be reading it wrong). Note that the builder takes millis, but in the configurations the values are nanos, so you may want to check that.
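To make the units concrete: the consumer list output earlier in this thread shows "ack_wait":5000, and the server reads that configuration field as nanoseconds. The sketch below only illustrates the conversion. msToNanos and ConsumerConfigSketch are hypothetical names invented for this example, and the assumption is that a 5 second wait was intended. The nats client should also export a nanos() helper for the same purpose, which is worth checking against the version in use.

```typescript
// Sketch only: msToNanos is a hypothetical helper, not a library function.
const NANOS_PER_MILLI = 1_000_000;

function msToNanos(ms: number): number {
  return ms * NANOS_PER_MILLI;
}

// Hypothetical shape mirroring a few fields of the "config" objects
// printed in the consumer list response.
interface ConsumerConfigSketch {
  durable_name: string;
  ack_policy: "explicit";
  ack_wait: number; // interpreted by the server as nanoseconds
}

// What the log shows: 5000 is read as 5000 ns, i.e. 0.005 ms.
const asLogged: ConsumerConfigSketch = {
  durable_name: "q-order-services-sub-ticket-created-durable",
  ack_policy: "explicit",
  ack_wait: 5000,
};

// Assumed intent: wait 5 seconds for an ack before redelivering.
const intended: ConsumerConfigSketch = {
  ...asLogged,
  ack_wait: msToNanos(5000),
};

console.log(`as logged: ${asLogged.ack_wait / NANOS_PER_MILLI} ms`);
console.log(`intended:  ${intended.ack_wait / NANOS_PER_MILLI} ms`);
```

With an ack wait this short, a message can be redelivered before the handler has a chance to ack it, which would be consistent with the "num_redelivered":1 entry and the repeated onMessage output in the orders log.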

Make sure you are not calling destroy() on jetstream subscriptions - that api call destroys the consumer.

You may consider creating a single stream - your subjects for the different events already prefix the stream name, so you can just put them in a single stream and filter accordingly.
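As an illustration of filtering within one stream, here is a minimal sketch of NATS subject matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. subjectMatches is a hypothetical helper written only for this example. The server performs this matching itself, so an application would not normally need such a function. The subjects are the ones that appear in the consumer list output in this thread.

```typescript
// Hypothetical helper: returns true when `subject` is selected by `filter`.
function subjectMatches(filter: string, subject: string): boolean {
  const f = filter.split(".");
  const s = subject.split(".");
  for (let i = 0; i < f.length; i++) {
    // ">" must be matched by at least one remaining token.
    if (f[i] === ">") return s.length > i;
    if (i >= s.length) return false;
    // "*" matches any single token; anything else must match exactly.
    if (f[i] !== "*" && f[i] !== s[i]) return false;
  }
  return f.length === s.length;
}

const subjects = [
  "TICKET.ticket.created",
  "TICKET.ticket.updated",
  "TICKET.order.created",
  "TICKET.order.cancelled",
  "TICKET.payment.created",
  "TICKET.expiration.complete",
];

// A single stream bound to "TICKET.>" would capture every event subject.
const captured = subjects.filter((s) => subjectMatches("TICKET.>", s));
console.log(`stream captures ${captured.length} of ${subjects.length} subjects`);

// Each consumer then narrows the stream with its own filter_subject.
const orderCreated = subjects.filter((s) => subjectMatches("TICKET.order.created", s));
console.log("order-created consumer sees:", orderCreated);
```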

If you want to monitor the consumers, then rather than invoking APIs that require multiple round trips to the server (jetstreamManager(), consumer.info(), etc.) in the middle of your request handling, create a task that runs periodically and shows the state of your consumers.
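A periodic monitoring task along these lines could be sketched as follows. Everything here is an assumption made for illustration: ConsumerState copies only a few field names from the consumer_info entries in the log output, and describeConsumer, missingConsumers, and startConsumerMonitor are hypothetical helpers. The fetchStates callback stands in for whatever call returns the current consumer list (for example, something built on the JetStream manager), so the sketch does not depend on a particular client version.

```typescript
// Hypothetical subset of the fields seen in the consumer list response.
interface ConsumerState {
  name: string;
  num_ack_pending: number;
  num_redelivered: number;
  num_pending: number;
  push_bound?: boolean;
}

// One line per consumer, flagging states that deserve a closer look.
function describeConsumer(c: ConsumerState): string {
  const flags: string[] = [];
  if (c.push_bound === false) flags.push("NOT BOUND");
  if (c.num_redelivered > 0) flags.push("REDELIVERING");
  const suffix = flags.length > 0 ? ` [${flags.join(", ")}]` : "";
  return `${c.name}: pending=${c.num_pending} ack_pending=${c.num_ack_pending}` +
    ` redelivered=${c.num_redelivered}${suffix}`;
}

// Names of consumers that were expected but are absent from the list.
function missingConsumers(expected: string[], states: ConsumerState[]): string[] {
  const present = new Set(states.map((c) => c.name));
  return expected.filter((name) => !present.has(name));
}

// Polls on a timer and reports; returns a function that stops the task.
function startConsumerMonitor(
  fetchStates: () => Promise<ConsumerState[]>,
  expected: string[],
  everyMs: number,
  report: (line: string) => void = console.log,
): () => void {
  const timer = setInterval(async () => {
    try {
      const states = await fetchStates();
      states.forEach((c) => report(describeConsumer(c)));
      missingConsumers(expected, states).forEach((n) => report(`MISSING: ${n}`));
    } catch (err) {
      report(`consumer monitor failed: ${err}`);
    }
  }, everyMs);
  return () => clearInterval(timer);
}
```

Run from one place at a fixed interval, this keeps the status checks out of the publish path, where the log above shows a CONSUMER.INFO and a CONSUMER.LIST request being issued around a single publish.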

Regarding the various configurations you are setting and the many options you are using: in most cases you just need the default options. Unless you have a really good reason to change something, it is best to let the server apply the defaults. An update to the JetStream API simplifies all this, and mostly focuses you on the number of messages that should be buffered to you. That should be out this quarter or next.

You should favor pull consumers over push consumers, as they will do a couple of things for you. If the consumer is indeed getting destroyed, you'll know about it rather than stalling, because the pulls will fail. If the network topology of the cluster changes, etc., it will reorganize. Lastly, they scale horizontally without having to use the queue functionality.

I also noticed that the subject you use for delivery is a plain subject that could easily be used by some other client. The server will likely send JetStream messages there if you do a core NATS subscription, which is not what you want.

If you want to create a simple example where you create a stream and however many consumers and read from the stream and show how the consumer is going away, I can take a look at that - I would suspect such a sample would be 100 lines of code or less.

aricart commented 1 year ago

You may also want to try nats events --all --no-context --json. If there was any call to delete, etc., or an advisory, it would possibly show there. (With --no-context it will try to access NATS on localhost:4222, so you may have to specify different options.)

Amansg4 commented 1 year ago

@aricart I really appreciate you taking the time to take a deeper look into the code and offering suggestions, safeguards, FYIs, etc. I'll take some more time to evaluate it per your suggestions. I don't mind knowing the details of NATS (I don't want to rely too much on a simplified API, although it does have benefits in maintaining structure and preventing mistakes), but I do sometimes have a tendency to confuse myself with what is possible. Also, thanks for the guidelines on how best to create not-so-overwhelming tests for issues and whatnot. If I come across any fixes I'll post them here ASAP and close the issue. Thanks again, and I'm looking forward to further NATS updates!