alexandrainst / node-red-contrib-chunks-to-lines

Node-RED node to read line by line from a stream of chunks of text.
MIT License
6 stars 4 forks source link

Few observations when trying to use the chunk node as a queue for multiple batches of input. #4

Closed prathamesh-gharat closed 1 year ago

prathamesh-gharat commented 1 year ago

Consider a batch size of 3 with input as a CSV and output set to CSV.

Example input 1.

1,1
2,2
3,3
4,4

Example input 2.

5,5
6,6
7,7
8,8

The outputs from the chunk node look like this currently. First output after the first input

1,1
2,2

Injected 2nd input into the chunk node. Injected tick into the chunk node. (the number of rows changes for some reason)

3,3
4,4
A,B

Injected tick into the chunk node.

5,5
6,6
7,7

Injected tick into the chunk node.

8,8
Alkarex commented 1 year ago

Hello and sorry for the delay. I am not sure to understand whether there is any problem. Would you be able to share a flow with explanations regarding what you expect vs. what you get?

prathamesh-gharat commented 1 year ago

Here is the flow. Inject it and wait for it to finish (~5s), then run it again. It looks like the column name is cached in memory. Is there any way to reset chunk-to-lines node without restarting node-red?

[
    {
        "id": "2bf85398182f60ba",
        "type": "inject",
        "z": "469fbb318319d39f",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "",
        "payloadType": "date",
        "x": 100,
        "y": 120,
        "wires": [
            [
                "4ae4a2870c6b83b5",
                "c48671df1b145b59"
            ]
        ]
    },
    {
        "id": "4ae4a2870c6b83b5",
        "type": "template",
        "z": "469fbb318319d39f",
        "name": "",
        "field": "payload",
        "fieldType": "msg",
        "format": "text",
        "syntax": "plain",
        "template": "A,B\n1,1\n2,2\n3,3\n4,4\n",
        "output": "str",
        "x": 380,
        "y": 120,
        "wires": [
            [
                "a95b88abd723cbc8"
            ]
        ]
    },
    {
        "id": "a95b88abd723cbc8",
        "type": "chunks-to-lines",
        "z": "469fbb318319d39f",
        "name": "",
        "nbLines": "3",
        "linesFormat": "csv",
        "decoder": "UTF-8",
        "x": 660,
        "y": 120,
        "wires": [
            [
                "3716556fea9a444c",
                "c4c03dee6020e079"
            ]
        ]
    },
    {
        "id": "c48671df1b145b59",
        "type": "delay",
        "z": "469fbb318319d39f",
        "name": "",
        "pauseType": "delay",
        "timeout": "0.5",
        "timeoutUnits": "seconds",
        "rate": "1",
        "nbRateUnits": "1",
        "rateUnits": "second",
        "randomFirst": "1",
        "randomLast": "5",
        "randomUnits": "seconds",
        "drop": false,
        "allowrate": false,
        "outputs": 1,
        "x": 390,
        "y": 180,
        "wires": [
            [
                "293e24a8e48b275d"
            ]
        ]
    },
    {
        "id": "293e24a8e48b275d",
        "type": "template",
        "z": "469fbb318319d39f",
        "name": "",
        "field": "payload",
        "fieldType": "msg",
        "format": "text",
        "syntax": "plain",
        "template": "5,5\n6,6\n7,7\n8,8\n",
        "output": "str",
        "x": 380,
        "y": 220,
        "wires": [
            [
                "a95b88abd723cbc8"
            ]
        ]
    },
    {
        "id": "3716556fea9a444c",
        "type": "debug",
        "z": "469fbb318319d39f",
        "name": "debug 6",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "payload",
        "targetType": "msg",
        "statusVal": "",
        "statusType": "auto",
        "x": 860,
        "y": 120,
        "wires": []
    },
    {
        "id": "c4c03dee6020e079",
        "type": "delay",
        "z": "469fbb318319d39f",
        "name": "",
        "pauseType": "delay",
        "timeout": "1",
        "timeoutUnits": "seconds",
        "rate": "1",
        "nbRateUnits": "1",
        "rateUnits": "second",
        "randomFirst": "1",
        "randomLast": "5",
        "randomUnits": "seconds",
        "drop": false,
        "allowrate": false,
        "outputs": 1,
        "x": 640,
        "y": 220,
        "wires": [
            [
                "ed13c8191fda4636"
            ]
        ]
    },
    {
        "id": "ed13c8191fda4636",
        "type": "change",
        "z": "469fbb318319d39f",
        "name": "",
        "rules": [
            {
                "t": "set",
                "p": "tick",
                "pt": "msg",
                "to": "true",
                "tot": "bool"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 650,
        "y": 260,
        "wires": [
            [
                "a95b88abd723cbc8"
            ]
        ]
    }
]
Alkarex commented 1 year ago

At the moment, this node cannot be used to simultaneously work with multiple upstream sources sending some chunks at the same time.

However, I have made a new release 0.8.0, which fixes some problems with different sequences not using the msg.parts attribute. Now it is sufficient to add the msg.complete attribute to indicate the end of a sequence. Both are Node-RED conventions.

Tests welcome.

Please see this flow:

[
    {
        "id": "2bf85398182f60ba",
        "type": "inject",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "",
        "payloadType": "date",
        "x": 200,
        "y": 300,
        "wires": [
            [
                "4ae4a2870c6b83b5",
                "c48671df1b145b59"
            ]
        ]
    },
    {
        "id": "4ae4a2870c6b83b5",
        "type": "template",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "field": "payload",
        "fieldType": "msg",
        "format": "text",
        "syntax": "plain",
        "template": "A,B\n1,1\n2,2\n3,3\n4,4\n",
        "output": "str",
        "x": 540,
        "y": 260,
        "wires": [
            [
                "230fd29fd4470d36"
            ]
        ]
    },
    {
        "id": "a95b88abd723cbc8",
        "type": "chunks-to-lines",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "nbLines": "3",
        "linesFormat": "csv",
        "decoder": "UTF-8",
        "x": 1020,
        "y": 300,
        "wires": [
            [
                "3716556fea9a444c",
                "c4c03dee6020e079"
            ]
        ]
    },
    {
        "id": "c48671df1b145b59",
        "type": "delay",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "pauseType": "delay",
        "timeout": "5",
        "timeoutUnits": "seconds",
        "rate": "1",
        "nbRateUnits": "1",
        "rateUnits": "second",
        "randomFirst": "1",
        "randomLast": "5",
        "randomUnits": "seconds",
        "drop": false,
        "allowrate": false,
        "outputs": 1,
        "x": 400,
        "y": 340,
        "wires": [
            [
                "293e24a8e48b275d"
            ]
        ]
    },
    {
        "id": "293e24a8e48b275d",
        "type": "template",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "field": "payload",
        "fieldType": "msg",
        "format": "text",
        "syntax": "plain",
        "template": "C,D\n5,5\n6,6\n7,7\n8,8\n",
        "output": "str",
        "x": 540,
        "y": 340,
        "wires": [
            [
                "590c3536097aa3b6"
            ]
        ]
    },
    {
        "id": "3716556fea9a444c",
        "type": "debug",
        "z": "f6f2187d.f17ca8",
        "name": "Debug",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "true",
        "targetType": "full",
        "statusVal": "",
        "statusType": "auto",
        "x": 1230,
        "y": 300,
        "wires": []
    },
    {
        "id": "c4c03dee6020e079",
        "type": "delay",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "pauseType": "delay",
        "timeout": "1",
        "timeoutUnits": "seconds",
        "rate": "1",
        "nbRateUnits": "1",
        "rateUnits": "second",
        "randomFirst": "1",
        "randomLast": "5",
        "randomUnits": "seconds",
        "drop": false,
        "allowrate": false,
        "outputs": 1,
        "x": 1000,
        "y": 360,
        "wires": [
            [
                "ed13c8191fda4636"
            ]
        ]
    },
    {
        "id": "ed13c8191fda4636",
        "type": "change",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "rules": [
            {
                "t": "set",
                "p": "tick",
                "pt": "msg",
                "to": "true",
                "tot": "bool"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 1010,
        "y": 420,
        "wires": [
            [
                "a95b88abd723cbc8"
            ]
        ]
    },
    {
        "id": "230fd29fd4470d36",
        "type": "change",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "rules": [
            {
                "t": "set",
                "p": "complete",
                "pt": "msg",
                "to": "true",
                "tot": "bool"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 710,
        "y": 260,
        "wires": [
            [
                "a95b88abd723cbc8"
            ]
        ]
    },
    {
        "id": "590c3536097aa3b6",
        "type": "change",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "rules": [
            {
                "t": "set",
                "p": "complete",
                "pt": "msg",
                "to": "true",
                "tot": "bool"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 710,
        "y": 340,
        "wires": [
            [
                "a95b88abd723cbc8"
            ]
        ]
    }
]