Daniel-t / node-red-contrib-aws

A collection of Node-RED nodes for AWS.
Apache License 2.0
57 stars 59 forks source link

Issue when querying a dynamodb in a split loop sequence of messages #54

Closed TibGva closed 3 years ago

TibGva commented 6 years ago

Hi

I have an issue when using the dynamodb node within a split sequence of an array of values. I use an id part of the msg obtain sequentially after the split, but when I hit the dynamodb node, the output msg is transformed an only keep the values of the last msg of the sequence (last from the array). Is it a normal behaviour?

shrickus commented 5 years ago

No, this is not normal... I'm seeing the same thing when using the aws lambda node inside a split/join flow. In order for the join to work, the msg.parts object must remain intact. Instead, the last one that was received is being returned by all the callbacks.

In my tests, I have an array of 3 msg objects, representing 3 sets of args to pass to Lambda. After the split node, the first msg.parts looks like this:

{"id":"52f290c1.afbc","type":"array","count":3,"len":1,"index":0}

but when returned from aws, the output msg.parts index is set to the last input msg index value:

{"id":"52f290c1.afbc","type":"array","count":3,"len":1,"index":2}`

So after the join, all of the results that were calculated in Lambda are placed into a 3-element array -- but since they all have index = 2, the output array is full of nulls, except for the last element:

[null, null, {... results of the last aws call ...}]
Daniel-t commented 5 years ago

Sorry I'm not following.
Can one of you provide a test scenario? With an example lambda script or dynamo table, and flow.

shrickus commented 5 years ago

Here is a thread on slack that highlights what I found: https://node-red.slack.com/archives/C03M2TAQ8/p1544543818421100

Basically, the problem is within this code: (## comments are mine)

this.on('input', function(msg) {    ## called with every new msg object
    node.cb = function(err, data) {    ## these args do not have a reference to the incoming object...
        if (err) {
            node.status({fill:"red",shape:"ring",text:"error"});
            node.error("failed: " + err.toString(),msg);
            return;
        } else {
            msg.payload = data;    ## the scope of this msg object is outside of this callback function
            node.status({});
        }
        node.send(msg);    ## when the cb finally gets called, it uses the last msg that arrived
    };
    // pass 'msg' to lambda, and invoke callback...
    ## when multiple msgs arrive, they all get passed to AWS before any node.cb function is invoked
    ec2.invoke(params, node.cb);    
});

This is not a problem only with AWS, but with any async call that invokes a callback function when it returns its data. I'll see if I can find an example of how this is handled in other async service nodes.

shrickus commented 5 years ago

Ok, it looks like one technique is to pass the incoming msg object to a function that returns another function, which has the signature expected by the AWS callback logic -- something like this (untested):

this.on('input', function(msg) {
    function aws_cb(msg_in) {
        return function(err, data) {
            if (err) {
                node.status({fill:"red",shape:"ring",text:"error"});
                node.error("failed: " + err.toString(), msg_in);
                return;
            } else {
                msg_in.payload = data;
                node.status({});
            }
            node.send(msg_in);
        };
    };
    // pass incoming 'msg' to lambda, and invoke callback with original msg...
    ec2.invoke(params, aws_cb(msg));
});

So each time a new msg arrives, we create a callback function by passing in the msg object: aws_cb(msg) That callback function contains the original msg object (and its "parts" property for the join node to work correctly).

Daniel-t commented 3 years ago

I expect this to be resolved in the recent update. (which changed the scope of the msg object)