senecajs / seneca-amqp-transport

Official AMQP transport plugin for Seneca
MIT License
68 stars 25 forks source link

Idea: What about adding model:observe similar to mesh? #83

Open sberryman opened 7 years ago

sberryman commented 7 years ago

I would like to log some searches going on in the system but I don't need to reply back. In seneca-mesh there is the concept of model:consume and model:observe.

I added model:observe to the listen options and then made this minor change to lib/listener.js handleMessage function.

handleMessage(message, data) {
  return this.utils.handle_request(this.seneca, data, this.options, (out) => {
    if (!out) {
      return;
    }
    if (typeof this.options.model !== 'string' || this.options.model !== 'observe') {
        var outstr = this.utils.stringifyJSON(this.seneca, `listen-${this.options.type}`, out);
        this.transport.channel.sendToQueue(message.properties.replyTo, new Buffer(outstr), {
          correlationId: message.properties.correlationId
        });
    }
    this.transport.channel.ack(message);
  });
}

Based on your develop branch it would be pretty trivial to add it to the createReplyWith function

nfantone commented 7 years ago

@sberryman I don't dislike the idea. But if you don't handle the client side of things, any client acting on a consumer with model:observe will eventually TIMEOUT and crash. Or am I missing something?

sberryman commented 7 years ago

Hmmm, model:observe should ONLY be set on seneca.listen() and not on seneca.client(). That is how it works on seneca-mesh as you only define what you are listening for.

The point of this is to peek into messages/acts or have other microservices react based on messages already going through the service bus. I don't like the idea of having a microservice broadcast something that is already hitting the bus, especially when it isn't manipulating the message or replying.

Example 1: role:business,cmd:search will contain some search parameters and will hit a microservice that will perform the search and return the results. Lets say one those search parameters was a geolocation (lat/lng). Now I can create another micro-service that will also listen for role:business,cmd:search and will simply log the customers location into elasticsearch with a timestamp. At this point I now have a dataset getting updated with searches which I can map based on time range but it wouldn't matter if it went down. The main microserivce created to respond to role:business,cmd:search would respond as expected.

Example 2: role:product,cmd:adjust-inventory would most likely update the quantity of product available and would then broadcast/publish role:product,cmd:updated or something along those lines. Then I would have two microservices setup in observe mode which could index that data into elastic search while another emails to customers who have subscribed to availability changes.

The point is to make it so the role:product service doesn't know anything about any other service that may be listening for a specific topic. I know this is a very sore spot for seneca at the moment as it assumes everything is req/res.

sberryman commented 7 years ago

@nfantone this may be a symptom of how I have setup my project. I have all api routes (hapi) running on a single service (load balanced) which has a single seneca.client({type:amqp, pin: 'role:*,cmd:*'}) defined. Then I have roughly 20 microservices (seneca.listen()) handling the business logic.

My other option for model:observe is creating a little client which connects to amqp and binds to the exchange and completely bypasses seneca and amqp-transport.

nfantone commented 7 years ago

@sberryman I think I understand your scenario. But, as you rightly pointed out, "seneca at the moment, [...] assumes everything is req/res". At least, this plugin does. So when you set up a listener to not return a value, any client communicating with it (i.e.: calling an .act with a matching pattern) will create a reply queue and wait for a response back. If that response doesn't arrive in under the timeout threshold, it will throw an exception.

That being said, you could use something like seneca-fire-and-forget + your modifications to this transport to achieve something similar.

sberryman commented 7 years ago

I tend to use seneca.fixedargs.fatal$ = false; on all the microservices until they figure out a good way of handling errors without changing the node developers are accustomed to errors in callbacks. Having to train everyone to use { ok:false, err } is quite annoying and adds more work.

With that said you don't really need fire-and-forget. I'll keep playing around with this idea.

nfantone commented 7 years ago

Even so, setting fatal$ to false just works around the actual problem. So, in this sense you do need something like seneca-fire-and-forget.

The problem here is that seneca-transport -what this transport is based on- is pretty restrictive in what it allows you to do in terms of avoiding the RPC pattern. One enhancement could be replacing it with custom, more flexible code that would allow for more room to implement features like the one you are describing here.