n8n-io / n8n

Free and source-available fair-code licensed workflow automation tool. Easily automate tasks across different services.
https://n8n.io

RabbitMQ Workflow had to be deactivated #9499

Open barn4k opened 4 months ago

barn4k commented 4 months ago

Bug Description

Yesterday I found that one of my workflows had started sending workflow errors with the message:

There was a problem with the trigger node "Start", for that reason did the workflow had to be deactivated

I dug into the issue and didn't find anything suspicious. There was no failed execution either, but the workflow keeps sending that error every 30 minutes. I have a Wait node configured to wait for 1 minute while fetching data from an external service; if the result is still pending, the workflow asks for the status again and waits another minute. There is a hard limit of 30 "waits", after which the workflow finishes and treats the result as timed out.

I suppose the issue is with the RabbitMQ node, which is set to delete the message after the execution finishes. Because the Wait node only waits for 1 minute at a time, the workflow is never offloaded, and the RabbitMQ node "keeps" the message (it remains in the unacked state in the broker). After 30 minutes the broker terminates the session (considering it stuck), so even if the workflow finishes successfully, the RabbitMQ node fails to delete the message from the queue because the connection has already been terminated.
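To make the suspected sequence concrete, here is a minimal toy model of a broker that requeues a message once it has stayed unacknowledged for too long. It is only a sketch: `ToyBroker`, `deliver`, `ack`, `tick` and `simulate` are hypothetical names, the clock is simulated in minutes, and none of this is the RabbitMQ or amqplib API.

```typescript
// Toy model of a broker with a delivery acknowledgement timeout.
// Every name below is hypothetical; this is not RabbitMQ or amqplib code.

type Delivery = { tag: number; body: string; deliveredAt: number };

class ToyBroker {
  private ready: string[] = [];
  private unacked: Delivery[] = [];
  private nextTag = 1;
  private now = 0; // simulated clock, in minutes
  private ackTimeoutMinutes: number;

  constructor(ackTimeoutMinutes: number) {
    this.ackTimeoutMinutes = ackTimeoutMinutes;
  }

  publish(body: string): void {
    this.ready.push(body);
  }

  // Hand the next ready message to a consumer; it stays unacked until ack().
  deliver(): Delivery | undefined {
    const body = this.ready.shift();
    if (body === undefined) return undefined;
    const delivery: Delivery = { tag: this.nextTag++, body, deliveredAt: this.now };
    this.unacked.push(delivery);
    return delivery;
  }

  // Returns false when the delivery is no longer tracked (it already timed out).
  ack(tag: number): boolean {
    const before = this.unacked.length;
    this.unacked = this.unacked.filter((d) => d.tag !== tag);
    return this.unacked.length < before;
  }

  // Advance the clock and requeue deliveries that stayed unacked for too long.
  tick(minutes: number): void {
    this.now += minutes;
    const expired = this.unacked.filter(
      (d) => this.now - d.deliveredAt >= this.ackTimeoutMinutes,
    );
    this.unacked = this.unacked.filter(
      (d) => this.now - d.deliveredAt < this.ackTimeoutMinutes,
    );
    expired.forEach((d) => this.ready.unshift(d.body));
  }
}

function simulate(workMinutes: number, ackTimeoutMinutes: number) {
  const broker = new ToyBroker(ackTimeoutMinutes);
  broker.publish("device-scan");
  const first = broker.deliver()!;
  broker.tick(workMinutes); // the workflow is still looping through Wait nodes
  const ackAccepted = broker.ack(first.tag); // "delete after execution finishes"
  const redelivered = broker.deliver(); // a reconnected trigger asks for work again
  return {
    ackAccepted,
    redeliveredBody: redelivered ? redelivered.body : undefined,
  };
}
```

In this model, `simulate(31, 30)` reports a rejected ack and hands the same body out a second time, while `simulate(5, 30)` acks normally and leaves the queue empty, which matches the behavior described above.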

Error message ('Start' node here is the RabbitMQ node):

[
  {
    "trigger": {
      "error": {
        "message": "There was a problem with the trigger node \"Start\", for that reason did the workflow had to be deactivated",
        "timestamp": 1716456899271,
        "name": "WorkflowActivationError",
        "context": {},
        "cause": {}
      },
      "mode": "trigger"
    },
    "workflow": {
      "id": "fyg5VC8YuQnsibw9",
      "name": "0029 ATP - Sub - Scanned device status"
    }
  }
]

There was only one message in the queue, but it seems that after 30 minutes (the default timeout for the broker) the RabbitMQ node reconnected to the broker and grabbed the same message once more.

To Reproduce

Copy the workflow and send a message to the RabbitMQ queue:

{
  "meta": {
    "templateCredsSetupCompleted": true,
    "instanceId": "b2b258e8cf8e4b51c801b3bc5a9906c37aad0545d9cdbdf096a6542b5d192385"
  },
  "nodes": [
    {
      "parameters": {
        "queue": "test_queue30m",
        "options": {
          "assertQueue": true,
          "acknowledge": "executionFinishes"
        }
      },
      "id": "f0b44c22-09df-4bad-8dac-47903fd4ea28",
      "name": "RabbitMQ Trigger",
      "type": "n8n-nodes-base.rabbitmqTrigger",
      "typeVersion": 1,
      "position": [
        820,
        380
      ],
      "credentials": {
        "rabbitmq": {
          "id": "aS74IeKoi3bRoXCp",
          "name": "RabbitMQ account"
        }
      }
    },
    {
      "parameters": {},
      "id": "483097bf-cc14-4c4d-83af-b6723a6f088c",
      "name": "End",
      "type": "n8n-nodes-base.noOp",
      "typeVersion": 1,
      "position": [
        1760,
        360
      ]
    },
    {
      "parameters": {},
      "id": "0940be7e-d70a-462e-a9be-0ee0cb43b8c5",
      "name": "Do Smth",
      "type": "n8n-nodes-base.noOp",
      "typeVersion": 1,
      "position": [
        1040,
        380
      ]
    },
    {
      "parameters": {
        "amount": 1,
        "unit": "minutes"
      },
      "id": "6e734c0e-9429-4a6f-9920-c3f1ece2dd59",
      "name": "Wait 1m",
      "type": "n8n-nodes-base.wait",
      "typeVersion": 1.1,
      "position": [
        1240,
        380
      ],
      "webhookId": "9bde1337-f90a-4922-8609-84e051492d5e"
    },
    {
      "parameters": {
        "conditions": {
          "options": {
            "caseSensitive": true,
            "leftValue": "",
            "typeValidation": "strict"
          },
          "conditions": [
            {
              "id": "e159b403-3981-4349-bc37-681015a6132e",
              "leftValue": "={{ $runIndex }}",
              "rightValue": 31,
              "operator": {
                "type": "number",
                "operation": "gt"
              }
            }
          ],
          "combinator": "and"
        },
        "options": {}
      },
      "id": "ba006715-40cd-4364-b3c9-e7fa7152df65",
      "name": "If 31 runs",
      "type": "n8n-nodes-base.if",
      "typeVersion": 2,
      "position": [
        1460,
        380
      ]
    }
  ],
  "connections": {
    "RabbitMQ Trigger": {
      "main": [
        [
          {
            "node": "Do Smth",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Do Smth": {
      "main": [
        [
          {
            "node": "Wait 1m",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Wait 1m": {
      "main": [
        [
          {
            "node": "If 31 runs",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "If 31 runs": {
      "main": [
        [
          {
            "node": "End",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Wait 1m",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  },
  "pinData": {}
}

After 30 minutes an error is sent to the error trigger, but the execution remains in the In Progress state and doesn't fail. The message is grabbed again and the workflow starts once more.
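As a rough check that the repro workflow really outlives a 30-minute broker timeout, the loop can be counted by hand. The sketch below assumes that `$runIndex` is zero-based and grows by one on every pass through the "If 31 runs" node, and it ignores node execution time; `waitsBeforeExit` is a hypothetical helper, not part of n8n.

```typescript
// Hypothetical helper: count how many times "Wait 1m" runs before the
// "If 31 runs" node lets the workflow reach "End".
// Assumption: $runIndex starts at 0 and grows by 1 on every pass through the If node.
function waitsBeforeExit(rightValue: number): number {
  let runIndex = 0;
  let waits = 1; // "Wait 1m" runs once before the first If check
  while (!(runIndex > rightValue)) {
    waits += 1; // the false branch loops back into "Wait 1m"
    runIndex += 1;
  }
  return waits;
}

const waitMinutes = 1; // "amount": 1, "unit": "minutes" in the workflow JSON
const brokerTimeoutMinutes = 30; // default timeout mentioned in the report
const totalMinutes = waitsBeforeExit(31) * waitMinutes;
const exceedsTimeout = totalMinutes > brokerTimeoutMinutes;
```

Under these assumptions the loop performs 33 one-minute waits, so the message stays unacked for longer than the 30-minute timeout before the execution can finish.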

Expected behavior

There should be no duplicated messages: during the first execution the RabbitMQ node should keep the message (and keep the connection up), and after the execution finishes the node should delete the message from the queue. That is not happening right now, because the first execution has released the message (the broker forcibly terminated the connection on the server side) and the next execution has picked it up again.

The current workaround is to set the node to delete the message Immediately. But in that case the Parallel Message Processing Limit option can't be set (it is removed from the option list).

Operating System

Docker

n8n Version

1.41.1

Node.js Version

docker

Database

PostgreSQL

Execution mode

main (default)

Joffcom commented 4 months ago

Hey @barn4k,

Thanks for the report, I am testing this now.

Joffcom commented 4 months ago

Hey @barn4k,

So it looks to be as you said: RabbitMQ is timing out, so the initial node can't use the connection to delete the message. In cases like this, if you are expecting a long-running connection, using the RMQ node with the delete option would be the best approach.

Then add the delete node to the end and you are good to go.

barn4k commented 4 months ago

Actually, I think that one won't help 😄

When the connection drops, the message becomes available again and will be grabbed by a new execution. So when it's time for the Delete node to step in, there will be no message available to delete (it will be in use by the next execution). And the Delete node will execute successfully even if it doesn't delete anything :)

I've found only one way to avoid it: using the "Immediately" option to delete the message from the queue. But in that case, I can't set the "Parallel message processing limit".

So maybe it's possible to set the execution into the error state when the connection is dropped?

Nevertheless, I'll try the Delete option, maybe it behaves differently somehow.
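The idea of failing the execution when the connection drops could look roughly like the sketch below. Everything in it is hypothetical: `ChannelLike`, `FakeChannel` and `TrackedExecution` are stand-ins invented for illustration, not the n8n execution API or the amqplib channel API.

```typescript
// Hypothetical types for illustration only.
type ExecutionStatus = "running" | "success" | "error";

interface ChannelLike {
  onClose(listener: () => void): void;
}

// Stand-in for a broker channel that can be closed from the server side.
class FakeChannel implements ChannelLike {
  private listeners: Array<() => void> = [];

  onClose(listener: () => void): void {
    this.listeners.push(listener);
  }

  // Simulates the broker closing the channel.
  close(): void {
    this.listeners.forEach((listener) => listener());
  }
}

class TrackedExecution {
  status: ExecutionStatus = "running";
  reason: string | undefined;

  constructor(channel: ChannelLike) {
    // If the channel closes while the message is still unacked, fail the
    // execution instead of leaving it "In Progress" next to a redelivery.
    channel.onClose(() => {
      if (this.status === "running") {
        this.status = "error";
        this.reason = "broker channel closed before the message was acknowledged";
      }
    });
  }

  // Called when the last node of the workflow completes.
  finish(): void {
    if (this.status === "running") this.status = "success";
  }
}

// Usage: the channel is closed while the workflow is still looping.
const droppedChannel = new FakeChannel();
const staleExecution = new TrackedExecution(droppedChannel);
droppedChannel.close();
staleExecution.finish(); // has no effect, the execution already failed
```

With this shape the first execution would end in the error state as soon as the channel is closed, so only the redelivered message keeps running. Whether the real trigger node can observe the close and reach the running execution is an open question here.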

barn4k commented 4 months ago

Hmm... it seems that with the Delete RMQ node the situation is a little different: the messages have been read but they never leave the Unacked state, as the n8n node never reports that it finished processing the message.

Joffcom commented 4 months ago

@barn4k that is interesting, in my test case it worked. Do you have the main node set to use the delete node, a bit like with Webhook / Respond to Webhook?

barn4k commented 4 months ago

@Joffcom yep, same issue with it.

It says that it has deleted the message but keeps fetching it constantly.

The only way it can be resolved is by using the "Immediately" Delete option.

Joffcom commented 4 months ago

Hey @barn4k,

That is odd. Is the message actually being deleted?

barn4k commented 4 months ago

I suppose the logic behind it is the same:

image

barn4k commented 4 months ago

From my perspective, the possible ways to eliminate the issue are below:

  1. Set a longer timeout for the broker service (e.g. 1 hour)
  2. Set the Delete RMQ node somewhere before the long-term chain
  3. Set the RMQ node to remove the message Immediately
  4. Refactor the RMQ trigger behavior so it keeps sending keep-alive packets until the execution is done.

The first three options are not actually a "fix"; they just work around the way the node behaves. So in that case the docs should be updated to describe that behavior.

Joffcom commented 4 months ago

I wonder if this comes down to how we execute nodes internally which may mean option 4 would be a fairly large core change (in theory).

I will do some digging in the amqplib docs to see if there is an option we can pass to the connection. I think a warning in the node about the broker timeout could be a good starting point.