onebeyond / rascal

A config driven wrapper for amqp.node supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption and channel pooling.
MIT License

How do I get the processed message (by worker) in express #167

Closed antick closed 3 years ago

antick commented 3 years ago

Hi,

This is a much better library than the other amqplib wrappers out there.

However, there is one thing I can't figure out.

I have the following code in my node express route:

module.exports.testing = controller(async (req, res) => {
  const broker = await Broker.create(config);
  broker.on('error', err => {
    console.error(err);
  });

  const publication = await broker.publish('demo_pub', params.text);

  publication.on('error', err => {
    console.error(err);
  });

  publication.on('success', taskId => {
    res.send({
      taskId
    });
  });
});

I'm running a worker (started with yarn start) that uses this code:

const Broker = require('rascal').BrokerAsPromised;
const config = require('../api-1/config/queue');

(async () => {
  const broker = await Broker.create(config);
  broker.on('error', err => {
    console.error(err);
  });

  const subscription = await broker.subscribe('demo_sub');
  subscription.on('message', async (message, content, ackOrNack) => {
    const processed = await doingSomethingWithContent(content);
    console.log(processed);
    ackOrNack();
  }).on('error', console.error);
})();

Everything works fine so far. I'm able to get the processed message in this worker and it's printed in the terminal.

Now I want to create an endpoint in node express that fetches the processed output above, based on the taskId/messageId I received when I published the message. I'm not sure how I can do this. I tried the following, but it doesn't work:

module.exports.worker1 = controller(async (req, res) => {
  console.log(req.query.taskId); // not sure where to use this messageId/taskId

  const broker = await Broker.create(config);
  broker.on('error', err => {
    console.error(err);
  });

  const subscription = await broker.subscribe('demo_sub');
  console.log('code wont run after this');

  subscription.on('message', async (message, content, ackOrNack) => {
    console.log('this never executes while hitting this endpoint')
    // ackOrNack();

    res.send({
      message
    });
  }).on('error', console.error);
});

Please can someone help me with this?

cressie176 commented 3 years ago

Hi @antick, sorry, I missed this. I will respond shortly.

cressie176 commented 3 years ago

Hi @antick,

After going through your issue, I understand you want to do the following...

Stage 1

  1. Receive an HTTP request
  2. Publish a message
  3. Respond to the HTTP request with a task id associated with the message

Stage 2 (separate worker)

  1. Receive the message
  2. Process the message

Stage 3

  1. Receive an HTTP request, specifying the task id
  2. Fetch the output of the processed message from stage 2
  3. Respond to the HTTP request with this output

If my understanding is correct, you need to use a database to store the output of the message in stage 2, and retrieve this from the database in stage 3. You cannot retrieve messages from a queue by id.
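As a rough illustration of stages 2 and 3, here is a minimal sketch. It assumes the task id returned to the client is the message id, which the worker reads from message.properties.messageId. The names createMessageHandler, createResultHandler and results are hypothetical, and the Map is only a stand-in for a real database: because the worker and the Express app run as separate processes, they need a shared store (Redis, Postgres, etc.) rather than in-process memory.

```javascript
// Stand-in for a real database. ASSUMPTION: replace with a shared store,
// since an in-memory Map is not visible across separate processes.
const results = new Map();

// Stage 2 (worker): build a subscription 'message' handler that processes the
// content and saves the output under the message id before acknowledging.
function createMessageHandler(processContent, store = results) {
  return async (message, content, ackOrNack) => {
    try {
      const output = await processContent(content);
      store.set(message.properties.messageId, output);
      ackOrNack(); // acknowledge only after the output has been stored
    } catch (err) {
      ackOrNack(err); // nack so the failure is not silently lost
    }
  };
}

// Stage 3 (API): build an Express-style route handler that looks the output
// up by the taskId query parameter instead of subscribing to the queue.
function createResultHandler(store = results) {
  return (req, res) => {
    const taskId = req.query.taskId;
    if (!store.has(taskId)) {
      return res.status(404).send({ error: 'Unknown taskId or result not ready yet' });
    }
    return res.send({ taskId, output: store.get(taskId) });
  };
}
```

In the worker this would be wired up as subscription.on('message', createMessageHandler(doingSomethingWithContent)), and in the API as something like app.get('/tasks', createResultHandler()). A 404 here covers both an unknown id and a task that has not finished yet; a real implementation may want to distinguish the two.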

Also, you should only need to call Broker.create once per application, rather than on every request from Express middleware.
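One possible way to arrange this is sketched below. It memoises the promise returned by a factory function so the broker is created at most once per process, even if several requests arrive before the first creation finishes. createBrokerProvider and getBroker are hypothetical names, not part of Rascal; the factory is passed in so the sketch does not depend on any particular config.

```javascript
// Returns a getBroker() function that calls createBroker() at most once and
// hands the same promise to every caller.
function createBrokerProvider(createBroker) {
  let brokerPromise = null;
  return function getBroker() {
    if (!brokerPromise) {
      brokerPromise = createBroker().then((broker) => {
        // Register the error listener once, not once per request.
        broker.on('error', console.error);
        return broker;
      });
    }
    return brokerPromise;
  };
}

// Usage sketch (ASSUMPTION: rascal is installed, `config` is your Rascal
// config, `app` is an Express app with a JSON body parser):
//
// const Broker = require('rascal').BrokerAsPromised;
// const getBroker = createBrokerProvider(() => Broker.create(config));
//
// app.post('/tasks', async (req, res) => {
//   const broker = await getBroker();
//   const publication = await broker.publish('demo_pub', req.body.text);
//   publication.on('success', (messageId) => res.send({ taskId: messageId }));
// });
```

Creating the broker during application startup and passing it to the routes would work just as well; the point is only that route handlers reuse an existing broker instead of opening new connections on each request.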

antick commented 3 years ago

This makes sense. Thanks a lot.