n8n-io / n8n

Free and source-available fair-code licensed workflow automation tool. Easily automate tasks across different services.
https://n8n.io

Kafka Trigger doesn't listen but emits an error #10933

Open bastianfbr opened 1 month ago

bastianfbr commented 1 month ago

Bug Description

When I enable my Kafka Trigger workflow, it never receives my messages, and I only get this error when I activate or test it:

2024-09-23 14:24:49 {"level":"ERROR","timestamp":"2024-09-23T12:24:49.860Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"hidden for privacy","clientId":"n8n","error":"The group is rebalancing, so a rejoin is needed","correlationId":978,"size":10}

To Reproduce

  1. Create a Kafka Trigger workflow.
  2. Test or activate it.
  3. Send a message.

Expected behavior

The Kafka Trigger should fire for each message published to the configured topic.

Operating System

Windows 11

n8n Version

1.57.0

Node.js Version

18

Database

SQLite (default)

Execution mode

main (default)

Joffcom commented 1 month ago

Hey @bastianfbr,

We have created an internal ticket to look into this, which we will be tracking as "GHC-249".

Joffcom commented 1 month ago

Hey @bastianfbr,

It looks like Kafka is returning the error "The group is rebalancing, so a rejoin is needed". Are you using a Kafka cluster, and what is its general health like? That message typically only appears when a consumer is added to or removed from a consumer group. If the error keeps occurring, it would suggest the cluster is unhappy with something, since group membership shouldn't be changing all the time.

bastianfbr commented 1 month ago

I'm using a Kafka cluster, and everything is healthy, but the trigger doesn't receive anything; it is never triggered.

Joffcom commented 1 month ago

Hey @bastianfbr,

That kind of makes sense, because when n8n tries to connect to the cluster, the cluster returns an error message:

The group is rebalancing, so a rejoin is needed

I am not sure why this is happening, but a quick search online suggested it is likely an issue with the cluster. I will see if I can set up a Kafka cluster and reproduce the issue.

bastianfbr commented 1 month ago

I understand what you're saying, so I tested it myself with a small kafkajs consumer/producer script, and I also get

The group is rebalancing, so a rejoin is needed

but afterwards everything works, so it behaves more like a warning than an error.
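
The small kafkajs test suggests the heartbeat response is transient. If that holds, one way to keep it from reading as a fatal failure is to classify kafkajs log entries before alerting on them. The sketch below is hypothetical (`classifyKafkaLog` is not part of n8n or kafkajs): it assumes log lines in the JSON shape quoted in this issue and matches only the error text, which is the message Kafka defines for the REBALANCE_IN_PROGRESS (code 27) response. It only changes how the message is reported; it does not address a trigger that stops receiving messages.

```typescript
// Shape of the kafkajs JSON log entries quoted in this issue (assumed, not exhaustive).
interface KafkaJsLogEntry {
  level: string;
  timestamp: string;
  logger: string;
  message: string;
  broker: string;
  clientId: string;
  error: string;
}

type Severity = "transient" | "error" | "other";

// Text of Kafka's REBALANCE_IN_PROGRESS (code 27) error, as it appears in the logs above.
const REBALANCE_TEXT = "The group is rebalancing, so a rejoin is needed";

// Hypothetical helper: decide whether a log line signals a real failure.
function classifyKafkaLog(line: string): Severity {
  let parsed: unknown;
  try {
    parsed = JSON.parse(line);
  } catch {
    return "other"; // not JSON, e.g. "[ScalingService] Added job ..."
  }
  if (parsed === null || typeof parsed !== "object") return "other";

  const entry = parsed as Partial<KafkaJsLogEntry>;
  if (entry.level !== "ERROR") return "other";

  // A heartbeat answered with REBALANCE_IN_PROGRESS asks the consumer to rejoin;
  // on its own it is expected whenever group membership changes.
  const isHeartbeat =
    typeof entry.message === "string" && entry.message.indexOf("Heartbeat") !== -1;
  if (isHeartbeat && entry.error === REBALANCE_TEXT) return "transient";

  return "error";
}

const sample =
  '{"level":"ERROR","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","error":"The group is rebalancing, so a rejoin is needed"}';
console.log(classifyKafkaLog(sample)); // transient
console.log(classifyKafkaLog("[ScalingService] Added job 45210 (execution 45365)")); // other
```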

Joffcom commented 1 month ago

Hey @bastianfbr,

I wonder if our Kafka package treats it as an error because the cluster reports it as one 🤔 I would also not expect it to happen every time, which is strange. In n8n, when the activation fails, does it work if you try to activate it again, or does it always get that error and fail?

bastianfbr commented 1 month ago

The activation doesn't fail. The error is just a log entry on the Kafka side, and the trigger keeps looping without receiving anything, even when I send a message to the topic. If I activate it again and again, it still doesn't receive a single message.

maipal-c commented 1 month ago

@Joffcom I'm facing the same issue on n8n 1.56.2 with the Bitnami Kafka Helm chart (on k8s), v3.8.0.

Steps to reproduce:

  1. Install Bitnami Kafka on a k8s cluster with 1 controller.
  2. Install the n8n Helm chart on the same cluster.
  3. Create a workflow with a Kafka Trigger on an existing topic (with JSON parsing enabled) and the group ID "n8n-admin-workflows".
  4. Wait for a few workflow executions and make sure the consumer lag is 0.
  5. Create another workflow with a Kafka Trigger on a different topic (with JSON parsing enabled) using the same group ID "n8n-admin-workflows".

Now both workflows stop executing: Kafka starts showing consumer lag on the first trigger and no consumer group for the trigger in the second workflow. When I changed the second workflow's consumer group to "n8n-admin-workflows-2", it worked instantly, and both triggers resumed.
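
The reproduction points to a specific configuration pattern: two Kafka Trigger workflows subscribing to different topics under the same group ID. Until the underlying behaviour is fixed, a check like the hypothetical one below could flag that pattern before workflows are activated. The `TriggerConfig` shape, the `findSharedGroupIds` function, and the workflow and topic names are illustrative assumptions, not n8n's export format or API.

```typescript
// Illustrative description of one Kafka Trigger node; this is NOT n8n's workflow JSON format.
interface TriggerConfig {
  workflow: string;
  topic: string;
  groupId: string;
}

// Hypothetical check: find consumer group IDs shared by triggers that subscribe
// to different topics, and list the workflows involved for each one.
function findSharedGroupIds(triggers: TriggerConfig[]): Map<string, string[]> {
  const topicsByGroup = new Map<string, Set<string>>();
  const workflowsByGroup = new Map<string, string[]>();

  for (const t of triggers) {
    const topics = topicsByGroup.get(t.groupId) ?? new Set<string>();
    topics.add(t.topic);
    topicsByGroup.set(t.groupId, topics);

    const workflows = workflowsByGroup.get(t.groupId) ?? [];
    workflows.push(t.workflow);
    workflowsByGroup.set(t.groupId, workflows);
  }

  const conflicts = new Map<string, string[]>();
  topicsByGroup.forEach((topics, groupId) => {
    // More than one topic under a single group ID is the pattern from the reproduction.
    if (topics.size > 1) conflicts.set(groupId, workflowsByGroup.get(groupId) ?? []);
  });
  return conflicts;
}

const triggers: TriggerConfig[] = [
  { workflow: "workflow-1", topic: "topic-a", groupId: "n8n-admin-workflows" },
  { workflow: "workflow-2", topic: "topic-b", groupId: "n8n-admin-workflows" },
];
findSharedGroupIds(triggers).forEach((workflows, groupId) => {
  console.log(`${groupId}: ${workflows.join(", ")}`);
});
// n8n-admin-workflows: workflow-1, workflow-2
```

Giving each trigger its own group ID is what resolved the stall in this reproduction, so a flagged group ID is a candidate for splitting.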

I'm attaching the logs below.

n8n logs:

[ScalingService] Added job 45210 (execution 45365)
[ScalingService] Added job 45211 (execution 45366)
{"level":"ERROR","timestamp":"2024-09-27T17:05:44.418Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"kafka-controller-0.kafka-controller-headless.core.svc.cluster.local:9092","clientId":"in-cluster-n8n","error":"The group is rebalancing, so a rejoin is needed","correlationId":2138,"size":10}
[ScalingService] Added job 45213 (execution 45369)
{"level":"ERROR","timestamp":"2024-09-27T17:08:38.726Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"kafka-controller-0.kafka-controller-headless.core.svc.cluster.local:9092","clientId":"in-cluster-n8n","error":"The group is rebalancing, so a rejoin is needed","correlationId":2214,"size":10}
{"level":"ERROR","timestamp":"2024-09-27T17:09:18.747Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"kafka-controller-0.kafka-controller-headless.core.svc.cluster.local:9092","clientId":"in-cluster-n8n","error":"The group is rebalancing, so a rejoin is needed","correlationId":2235,"size":10}
Removed triggers and pollers for workflow "MfdONxFJzUL71juW"
{"level":"ERROR","timestamp":"2024-09-27T17:12:23.812Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"kafka-controller-0.kafka-controller-headless.core.svc.cluster.local:9092","clientId":"in-cluster-n8n","error":"The group is rebalancing, so a rejoin is needed","correlationId":2276,"size":10}
[ScalingService] Added job 45220 (execution 45376)
[ScalingService] Added job 45221 (execution 45377)

Kafka logs:

[2024-09-27 17:08:34,497] INFO [GroupCoordinator 0]: Preparing to rebalance group n8n-admin-workflows in state PreparingRebalance with old generation 58 (__consumer_offsets-23) (reason: Removing member in-cluster-n8n-8fb134dc-1668-4ea3-a3fd-6f282a2a770b on LeaveGroup; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:08:34,497] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=in-cluster-n8n-8fb134dc-1668-4ea3-a3fd-6f282a2a770b, groupInstanceId=None, clientId=in-cluster-n8n, clientHost=/10.42.0.47, sessionTimeoutMs=30000, rebalanceTimeoutMs=60000, supportedProtocols=List(RoundRobinAssigner)) has left group n8n-admin-workflows through explicit `LeaveGroup`; client reason: not provided (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:08:38,728] INFO [GroupCoordinator 0]: Stabilized group n8n-admin-workflows generation 59 (__consumer_offsets-23) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:08:38,730] INFO [GroupCoordinator 0]: Assignment received from leader in-cluster-n8n-8793f868-dc5c-44ea-a109-29f69e0371f5 for group n8n-admin-workflows for generation 59. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:09:14,382] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group n8n-admin-workflows in Stable state. Created a new member id in-cluster-n8n-73c702cb-2ad2-4a09-aff3-ecffe7b197a7 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:09:14,385] INFO [GroupCoordinator 0]: Preparing to rebalance group n8n-admin-workflows in state PreparingRebalance with old generation 59 (__consumer_offsets-23) (reason: Adding new member in-cluster-n8n-73c702cb-2ad2-4a09-aff3-ecffe7b197a7 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:09:18,748] INFO [GroupCoordinator 0]: Stabilized group n8n-admin-workflows generation 60 (__consumer_offsets-23) with 2 members (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:09:18,750] INFO [GroupCoordinator 0]: Assignment received from leader in-cluster-n8n-8793f868-dc5c-44ea-a109-29f69e0371f5 for group n8n-admin-workflows for generation 60. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:09:18,750] WARN [GroupCoordinator 0]: Setting empty assignments for members Set(in-cluster-n8n-8793f868-dc5c-44ea-a109-29f69e0371f5) of n8n-admin-workflows for generation 60 (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:09:18,751] WARN [GroupCoordinator 0]: Sending empty assignment to member in-cluster-n8n-8793f868-dc5c-44ea-a109-29f69e0371f5 of n8n-admin-workflows for generation 60 with no errors (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:12:18,810] INFO [GroupCoordinator 0]: Preparing to rebalance group n8n-admin-workflows in state PreparingRebalance with old generation 60 (__consumer_offsets-23) (reason: Removing member in-cluster-n8n-73c702cb-2ad2-4a09-aff3-ecffe7b197a7 on LeaveGroup; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:12:18,810] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=in-cluster-n8n-73c702cb-2ad2-4a09-aff3-ecffe7b197a7, groupInstanceId=None, clientId=in-cluster-n8n, clientHost=/10.42.0.47, sessionTimeoutMs=30000, rebalanceTimeoutMs=60000, supportedProtocols=List(RoundRobinAssigner)) has left group n8n-admin-workflows through explicit `LeaveGroup`; client reason: not provided (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:12:18,832] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group n8n-admin-workflows-2 in Empty state. Created a new member id in-cluster-n8n-a21c913d-e02b-4cc9-865c-437928d6edbd and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:12:18,833] INFO [GroupCoordinator 0]: Preparing to rebalance group n8n-admin-workflows-2 in state PreparingRebalance with old generation 0 (__consumer_offsets-12) (reason: Adding new member in-cluster-n8n-a21c913d-e02b-4cc9-865c-437928d6edbd with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:12:21,833] INFO [GroupCoordinator 0]: Stabilized group n8n-admin-workflows-2 generation 1 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:12:21,836] INFO [GroupCoordinator 0]: Assignment received from leader in-cluster-n8n-a21c913d-e02b-4cc9-865c-437928d6edbd for group n8n-admin-workflows-2 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:12:23,813] INFO [GroupCoordinator 0]: Stabilized group n8n-admin-workflows generation 61 (__consumer_offsets-23) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:12:23,816] INFO [GroupCoordinator 0]: Assignment received from leader in-cluster-n8n-8793f868-dc5c-44ea-a109-29f69e0371f5 for group n8n-admin-workflows for generation 61. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2024-09-27 17:16:36,183] INFO [SnapshotGenerator id=0] Creating new KRaft snapshot file snapshot 00000000000005500166-0000000074 because we have waited at least 60 minute(s). (org.apache.kafka.image.publisher.SnapshotGenerator)
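
In the broker log above, generation 60 stabilizes with 2 members, and the original member, which is also the group leader, is sent an empty assignment (no partitions); that would be consistent with the consumer lag reported on the first trigger. When reading longer logs, a hypothetical helper like `parseGroupEvents` below can pull out just those two signals. Its regular expressions are written against the exact GroupCoordinator lines quoted in this issue and may need adjusting for other Kafka versions or log formats.

```typescript
// One extracted event per matching GroupCoordinator log line (illustrative shape).
interface GroupEvent {
  time: string;
  group: string;
  generation: number;
  kind: "stabilized" | "empty-assignment";
  detail: string; // member count for "stabilized", member id for "empty-assignment"
}

// Patterns matched against the broker lines quoted in this issue (assumed format).
const STABILIZED =
  /^\[([^\]]+)\] INFO \[GroupCoordinator \d+\]: Stabilized group (\S+) generation (\d+) .* with (\d+) members/;
const EMPTY =
  /^\[([^\]]+)\] WARN \[GroupCoordinator \d+\]: Sending empty assignment to member (\S+) of (\S+) for generation (\d+)/;

function parseGroupEvents(lines: string[]): GroupEvent[] {
  const events: GroupEvent[] = [];
  for (const line of lines) {
    let m = STABILIZED.exec(line);
    if (m) {
      events.push({ time: m[1], group: m[2], generation: Number(m[3]), kind: "stabilized", detail: `${m[4]} members` });
      continue;
    }
    m = EMPTY.exec(line);
    if (m) {
      events.push({ time: m[1], group: m[3], generation: Number(m[4]), kind: "empty-assignment", detail: m[2] });
    }
    // Any other line (rebalance preparation, KRaft snapshots, ...) is ignored.
  }
  return events;
}

const sampleLines = [
  "[2024-09-27 17:09:18,748] INFO [GroupCoordinator 0]: Stabilized group n8n-admin-workflows generation 60 (__consumer_offsets-23) with 2 members (kafka.coordinator.group.GroupCoordinator)",
  "[2024-09-27 17:09:18,751] WARN [GroupCoordinator 0]: Sending empty assignment to member in-cluster-n8n-8793f868-dc5c-44ea-a109-29f69e0371f5 of n8n-admin-workflows for generation 60 with no errors (kafka.coordinator.group.GroupCoordinator)",
];
for (const e of parseGroupEvents(sampleLines)) {
  console.log(`${e.time} ${e.group} gen ${e.generation} ${e.kind}: ${e.detail}`);
}
// 2024-09-27 17:09:18,748 n8n-admin-workflows gen 60 stabilized: 2 members
// 2024-09-27 17:09:18,751 n8n-admin-workflows gen 60 empty-assignment: in-cluster-n8n-8793f868-dc5c-44ea-a109-29f69e0371f5
```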

Joffcom commented 1 month ago

Perfect, thanks. I have updated the internal ticket, so we just need someone to pick it up and work on it.