Azure / azure-sdk-for-js

This repository is for active development of the Azure SDK for JavaScript (NodeJS & Browser). For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/javascript/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-js.

`processError` incorrectly being called after closing and re-opening receiver #28798

Open JPStrydom opened 8 months ago

JPStrydom commented 8 months ago

Describe the bug Whenever I close a Service Bus receiver and restart it, the next message sent to the queue is processed correctly (by processMessage), but the processError function is then called many times with seemingly outdated errors.

To Reproduce Steps to reproduce the behavior:

  1. Start the receiver with class code similar to the following:

    async processMessage(message) {
      logger.info(`Message received with body: ${message.body}`);
      await this.receiver.completeMessage(message);
    }
    
    async processError(error) {
      logger.error(
        `Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
         args.error
       );
    }
    
    async startSubscription() {
      // Logic to prevent restarting already active subscription
      // Logic to read connection string and queue name from config
    
      this.serviceBusClient = new ServiceBusClient(connectionString);
      this.receiver = this.serviceBusClient.createReceiver(notificationQueueName);
      this.subscription = this.receiver.subscribe({
        processMessage: this.processMessage.bind(this), // I've also tried message => this.processMessage(message) but the issue persisted
        processError: this.processError.bind(this) // I've also tried error=> this.processError(error) but the issue persisted
      });
    }
  2. Close the receiver with code similar to the following:

    async stopSubscription() {
       // Logic to prevent re-stopping already inactive subscription
    
      // I've tried stopping these in every order/combination possible, but the issue persists
      await this.receiver.close();
      // await this.subscription.close(); 
      // await this.serviceBusClient.close();  
    }
  3. Restart the receiver with the startSubscription method.
  4. Send a message to the queue.
  5. See how the processMessage gets called correctly once:
    Message received with body: XXX
  6. See how the processError gets called incorrectly with the following error 250 times:
     Error occurred with XXX within XXX:  The receiver for "XXX" has been closed and can no longer be used. Please create a new receiver using the "createReceiver" method on the ServiceBusClient.

Expected behavior No matter how message handlers are passed in, they should be stopped correctly when closing the receiver.

Additional context When the message handlers are passed in as inline functions, the issue seems to disappear. The following startSubscription code doesn't seem to introduce the error.

async startSubscription() {
  // Logic to prevent restarting already active subscription
  // Logic to read connection string and queue name from config

  this.serviceBusClient = new ServiceBusClient(connectionString);
  this.receiver = this.serviceBusClient.createReceiver(notificationQueueName);
  this.subscription = this.receiver.subscribe({
    processMessage: async message => {
      logger.info(`Message received with body: ${message.body}`);
      await this.receiver.completeMessage(message);
    },
    processError: async args => {
      logger.error(
        `Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
        args.error
      );
    }
  });
}
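
Until the root cause is known, one defensive option is to gate the error handler so that callbacks arriving after a stop are dropped. The sketch below is only that: `gateErrorHandler` is a hypothetical helper, not part of `@azure/service-bus`, and it hides the symptom rather than fixing it. Only `processError` is gated on purpose, because as far as I know `subscribe` auto-completes a message by default when `processMessage` returns without throwing, so a silently skipped `processMessage` could lose messages.

```javascript
// Hypothetical helper (not an SDK API): wraps a processError handler so it
// turns into a no-op once stop() has been called.
function gateErrorHandler(processError) {
  let active = true;
  return {
    processError: async (args) => {
      if (active) {
        await processError(args);
      }
    },
    stop() {
      active = false;
    },
  };
}

// Small demonstration with plain objects; no Service Bus connection involved.
async function demo() {
  const seen = [];
  const gate = gateErrorHandler(async (args) => {
    seen.push(`error:${args.error.message}`);
  });

  await gate.processError({ error: new Error("before stop") });
  gate.stop();
  // Simulates a stale callback fired after the receiver was closed.
  await gate.processError({ error: new Error("receiver closed") });
  return seen;
}
```

In the class above, this would mean passing `gate.processError` to `subscribe` in startSubscription and calling `gate.stop()` in stopSubscription before `this.receiver.close()`.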

deyaaeldeen commented 8 months ago

@JPStrydom Thanks for reaching out! That behavior doesn't sound right indeed. Let me take a look and will get back to you.

deyaaeldeen commented 8 months ago

Hi @JPStrydom, I am not able to repro the behavior you're reporting. Here is my code which is largely based on yours but is self-contained:

const { ServiceBusClient } = require("@azure/service-bus");

// Load the .env file if it exists
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString = process.env.SERVICEBUS_CONNECTION_STRING || "<connection string>";
const notificationQueueName = process.env.QUEUE_NAME || "<queue name>";

const firstSetOfMessages = [
  { body: "Albert Einstein" },
  { body: "Werner Heisenberg" },
  { body: "Marie Curie" },
  { body: "Steven Hawking" },
  { body: "Isaac Newton" },
];

const secondSetOfMessages = [
  { body: "Niels Bohr" },
  { body: "Michael Faraday" },
  { body: "Galileo Galilei" },
  { body: "Johannes Kepler" },
  { body: "Nikolaus Kopernikus" },
];

class App {
  async processMessage(message) {
    console.log(`Message received with body: ${message.body}`);
    await this.receiver.completeMessage(message);
  }

  async processError(args) {
    console.error(
      `Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
      args.error
    );
  }

  async startSubscription() {
    // Logic to prevent restarting already active subscription
    // Logic to read connection string and queue name from config

    this.serviceBusClient = new ServiceBusClient(connectionString);
    this.receiver = this.serviceBusClient.createReceiver(notificationQueueName);
    this.subscription = this.receiver.subscribe({
      processMessage: this.processMessage.bind(this), // I've also tried message => this.processMessage(message) but the issue persisted
      processError: this.processError.bind(this) // I've also tried error=> this.processError(error) but the issue persisted
    });
  }

  async stopSubscription() {
    // Logic to prevent re-stopping already inactive subscription

    // I've tried stopping these in every order/combination possible, but the issue persists
    await this.receiver.close();
    // await this.subscription.close();
    // await this.serviceBusClient.close();
  }
}

async function main() {
  const app = new App();
  await app.startSubscription();
  const sbClient = app.serviceBusClient;

  // createSender() can also be used to create a sender for a topic.
  let sender = sbClient.createSender(notificationQueueName);

  try {
    // Tries to send all messages in a single batch.
    // Will fail if the messages cannot fit in a batch.
    console.log(`Sending the first 5 scientists (as an array)`);
    await sender.sendMessages(firstSetOfMessages);

    // Sends all messages using one or more ServiceBusMessageBatch objects as required
    let batch = await sender.createMessageBatch();

    for (const message of secondSetOfMessages) {
      if (!batch.tryAddMessage(message)) {
        // Send the current batch as it is full and create a new one
        await sender.sendMessages(batch);
        batch = await sender.createMessageBatch();

        if (!batch.tryAddMessage(message)) {
          throw new Error("Message too big to fit in a batch");
        }
      }
    }
    // Send the batch
    console.log(`Sending the last 5 scientists (as a ServiceBusMessageBatch)`);
    await sender.sendMessages(batch);

    // Close the sender
    console.log(`Done sending, closing...`);
    await sender.close();

    await app.stopSubscription();
    console.log(`Done subscribing.`);

    await app.startSubscription();
    console.log(`Re-subscribed.`);

    sender = sbClient.createSender(notificationQueueName);
    console.log(`Sending another 5 scientists`);
    await sender.sendMessages(batch);

    console.log(`Done sending`);
  } finally {
    await sbClient.close(); 
  }
}

main().catch((err) => {
  console.log("sendMessages Sample: Error occurred: ", err);
  process.exit(1);
});

And here is a sample output:

Sending the first 5 scientists (as an array)
Sending the last 5 scientists (as a ServiceBusMessageBatch)
Message received with body: Albert Einstein
Done sending, closing...
Message received with body: Werner Heisenberg
Done subscribing.
Re-subscribed.
Sending another 5 scientists
Done sending
Message received with body: Marie Curie
Message received with body: Steven Hawking
Message received with body: Isaac Newton
Message received with body: Niels Bohr
Message received with body: Michael Faraday
Message received with body: Galileo Galilei
Message received with body: Johannes Kepler
Message received with body: Nikolaus Kopernikus
Message received with body: Niels Bohr
Message received with body: Michael Faraday
Message received with body: Galileo Galilei
Message received with body: Johannes Kepler
Message received with body: Nikolaus Kopernikus

Also, notice that the processError in your code is not referencing the input argument.
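
To make that last point concrete: in the original snippet the handler's parameter is named `error`, but the body reads `args`. Assuming there is no outer `args` binding (an assumption, since the surrounding module is not shown), the handler would reject with a `ReferenceError` instead of logging anything. A minimal sketch with plain functions and a made-up argument object, no Service Bus involved:

```javascript
// Parameter is named `error` but the body reads `args`, mirroring the reported snippet.
async function brokenProcessError(error) {
  console.error(`Error occurred with ${args.entityPath}: `, args.error);
}

// Parameter name matches what the body reads.
async function fixedProcessError(args) {
  console.error(`Error occurred with ${args.entityPath}: `, args.error);
}

// Hypothetical stand-in for the object passed to processError.
const sampleArgs = { entityPath: "my-queue", error: new Error("boom") };

// Calls both handlers and records whether each one resolved or rejected.
async function demo() {
  const outcomes = [];
  for (const handler of [brokenProcessError, fixedProcessError]) {
    try {
      await handler(sampleArgs);
      outcomes.push("ok");
    } catch (e) {
      outcomes.push(e.name);
    }
  }
  return outcomes;
}
```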

github-actions[bot] commented 8 months ago

Hi @JPStrydom. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

JPStrydom commented 7 months ago

@deyaaeldeen thanks so much for looking into this!

Here is the code I'm using (Module type NodeJS):

For receiving

notification-queue.class.js

import { BadRequest } from '@feathersjs/errors';
import { ServiceBusClient } from '@azure/service-bus';

export class NotificationQueueService {
  async processMessage(message) {
    console.info(`Notification Queue Subscription - Message received with body: ${message.body}`);
    await this.queueReceiver.completeMessage(message);
  }

  async processError(args) {
    console.error(
      `Notification Queue Subscription - Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
      args.error
    );
  }

  constructor(app) {
    this.app = app;

    this.processMessage = this.processMessage.bind(this);
    this.processError = this.processError.bind(this);

    this.create();
  }

  async find() {
    const subscriptionActive = Boolean(this.serviceBusClient && this.queueReceiver);
    return { subscriptionActive };
  }

  async create() {
    const { subscriptionActive } = await this.find();
    if (subscriptionActive) {
      throw new BadRequest('Subscription Already Active');
    }

    const { connectionString, notificationQueueName } = this.app.get('azureServiceBus');

    this.serviceBusClient = new ServiceBusClient(connectionString);
    this.queueReceiver = this.serviceBusClient.createReceiver(notificationQueueName);

    this.queueReceiver.subscribe({
      processMessage: createMessageProcessor(this),
      processError: createErrorProcessor(this)
    });

    return { subscriptionActive: true };
  }

  async remove() {
    const { subscriptionActive } = await this.find();
    if (!subscriptionActive) {
      throw new BadRequest('Subscription Already Inactive');
    }

    await this.queueReceiver.close();
    await this.serviceBusClient.close();

    this.queueReceiver = null;
    this.serviceBusClient = null;

    return { subscriptionActive: false };
  }
}

export const getOptions = app => app;

notification-queue.js

import { getOptions, NotificationQueueService } from './notification-queue.class.js';

export const notificationQueue = app => {
  app.use('notification-queue', new NotificationQueueService(getOptions(app)), {
    methods: ['find', 'create', 'remove'],
    events: []
  });
  app.service('notification-queue').hooks({
    around: {
      all: []
    },
    before: {
      all: [],
      find: [],
      get: [],
      create: [],
      patch: [],
      remove: []
    },
    after: {
      all: []
    },
    error: {
      all: []
    }
  });
};

For Sending

send-notification.class.js

import { ServiceBusClient } from '@azure/service-bus';

export class SendNotificationService {
  constructor(app) {
    this.app = app;
  }

  async create(data) {
    const { connectionString, notificationQueueName } = this.app.get('azureServiceBus');

    const serviceBusClient = new ServiceBusClient(connectionString);
    const queueSender = serviceBusClient.createSender(notificationQueueName);

    await queueSender.sendMessages({ body: JSON.stringify(data) });

    await queueSender.close();
    await serviceBusClient.close();

    return data;
  }
}

export const getOptions = app => app;

send-notification.js

import { getOptions, SendNotificationService } from './send-notification.class.js';

export const sendNotification = app => {
  app.use('send-notification', new SendNotificationService(getOptions(app)), {
    methods: ['create'],
    events: []
  });
  app.service('send-notification').hooks({
    around: {
      all: []
    },
    before: {
      all: [],
      find: [],
      get: [],
      create: [],
      patch: [],
      remove: []
    },
    after: {
      all: []
    },
    error: {
      all: []
    }
  });
};

Usage

I then configure the services on my app in my app.js file with:

app.configure(notificationQueue);
app.configure(sendNotification);

After which I can call it with:

My test case is usually:

  1. Send notification (Sending.1),
  2. Verify that it's received.
  3. Stop the subscription (Receiving.3).
  4. Start the subscription (Receiving.2)
  5. Send notification (Sending.1)

Update 19/03/2024

For some reason the error is not happening on my end anymore. I have an e2e test to catch it and it now passes with both implementations.

I see that some of the libraries I've been using have been updated since (mainly the ones from Feathers), which might have changed the implementation. Or there might have been some changes on our Service Bus instance that fixed the issue.

Quite strange. I'll keep an eye out for it and see if anything else changes. If I have some time I'll try to revert some of the libraries to see if I can reproduce the error.

Update 20/03/2024

This behavior started happening again today. Having my message processors inside the Feathers service causes this issue, i.e. having the following causes the issues:

export class NotificationQueueService {
  async processMessage(message) {
    console.info(`Notification Queue Subscription - Message received with body: ${message.body}`);
    await this.queueReceiver.completeMessage(message);
  }

  async processError(args) {
    console.error(
      `Notification Queue Subscription - Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
      args.error
    );
  }

  constructor(app) {
    this.app = app;

    this.processMessage = this.processMessage.bind(this);
    this.processError = this.processError.bind(this);

    this.create();
  }

  // ...

    this.queueReceiver.subscribe({
      processMessage: createMessageProcessor(this),
      processError: createErrorProcessor(this)
    });

Changing to the following fixes the issue on my end:

const createMessageProcessor =
  ({ queueReceiver }) =>
  async message => {
    // TODO: Add dedupe logic by checking notification history
    // TODO: Add notification history logic and capture all auditing information around notification (notification type, enqueued time, processes time, processes count)
    // TODO: Add logic to send notification via notification service providers (Firebase, PushKit)
    // TODO: Add resend and dead letter logic
    // TODO: Once the correct logic has been implemented, the relevant tests will need to be updated accordingly (test/commands/send-notification/send-notification.test.js)
    logger.info(`Notification Queue Subscription - Message received with body: ${message.body}`);
    await queueReceiver.completeMessage(message);
  };

const createErrorProcessor = () => async args => {
  // TODO: Figure out exactly how this process error logic works. It appears to only get called if an error within processMessage occurs.
  // TODO: Add notification history logic and capture all auditing information around notification (notification type, enqueued time, processes time, processes count)
  // TODO: Add error, resend, and/or dead letter logic
  logger.error(
    `Notification Queue Subscription - Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
    args.error
  );
};

export class NotificationQueueService {

  // ..

    this.queueReceiver.subscribe({
      processMessage: createMessageProcessor(this),
      processError: createErrorProcessor()
    });
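
One thing worth noting about the two variants: they are not equivalent in which receiver they touch. A class method reads `this.queueReceiver` each time it runs, while the factory destructures `queueReceiver` once, when the handler is created. The sketch below shows only that JavaScript behavior, using a hypothetical `makeFakeReceiver` stand-in rather than the real SDK. It says nothing about the root cause of the errors reported here.

```javascript
// Hypothetical stand-in for a receiver; this is not the @azure/service-bus API.
function makeFakeReceiver(name) {
  return {
    name,
    closed: false,
    async completeMessage(message) {
      if (this.closed) {
        throw new Error(`receiver ${name} is closed`);
      }
      return `${message.body} completed on ${name}`;
    },
    async close() {
      this.closed = true;
    },
  };
}

class Service {
  // Method style: `this.queueReceiver` is looked up every time the handler runs.
  async processMessage(message) {
    return this.queueReceiver.completeMessage(message);
  }
}

// Factory style: the receiver is captured once, when the handler is created.
const createMessageProcessor = ({ queueReceiver }) => async (message) =>
  queueReceiver.completeMessage(message);

async function demo() {
  const service = new Service();
  service.queueReceiver = makeFakeReceiver("first");

  const methodHandler = service.processMessage.bind(service);
  const factoryHandler = createMessageProcessor(service);

  // Simulate stop + restart: the field is repointed to a new receiver.
  await service.queueReceiver.close();
  service.queueReceiver = makeFakeReceiver("second");

  const viaMethod = await methodHandler({ body: "m1" });
  const viaFactory = await factoryHandler({ body: "m1" }).catch((e) => e.message);
  return { viaMethod, viaFactory };
}
```

Here an old method-style handler ends up on the new receiver, while an old factory-style handler keeps talking to the closed one.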

deyaaeldeen commented 7 months ago

I am glad that the issue is resolved! I'll close this issue for now but please feel free to re-open it if it re-occurs. Thank you for your patience!

JPStrydom commented 7 months ago

@deyaaeldeen I unfortunately noticed this incorrect behavior again today. Any chance we can reopen this?

The issue on my end is reproduced when:

  1. Send notification (Sending.1),
  2. Verify that it's received.
  3. Stop the subscription (Receiving.3).
  4. Send notification (Sending.1).
  5. Start the subscription (Receiving.2)
  6. See the incorrect errors being thrown by the subscription.

deyaaeldeen commented 7 months ago

Thanks for sharing the additional context! Re-opening and will try your repro.

jeremymeng commented 5 months ago

@JPStrydom did you mean that after changing to createMessageProcessor and createErrorProcessor you still see the issue happening? I would not expect the subscription to still receive errors after the receiver is closed, but there may be a bug in our code. If it is easy for you to reproduce, could you please enable more verbose logging for our SDK and share the logs? The instructions are here: https://github.com/Azure/azure-sdk-for-js/blob/a295f2bb4d3c1e4292f1684500186a641380c58d/sdk/servicebus/service-bus/README.md#L484

JPStrydom commented 5 months ago

@JPStrydom did you mean that after changing to createMessageProcessor and createErrorProcessor you still see the issue happening? I would not expect the subscription to still receive errors after the receiver is closed, but there may be a bug in our code. If it is easy for you to reproduce, could you please enable more verbose logging for our SDK and share the logs? The instructions are here: https://github.com/Azure/azure-sdk-for-js/blob/a295f2bb4d3c1e4292f1684500186a641380c58d/sdk/servicebus/service-bus/README.md#L484

We've paused work on the AppInsights integration due to all the instability, but as soon as I have some time again I'll take a look.