kestra-io / plugin-kafka

https://kestra.io/plugins/plugin-kafka/
Apache License 2.0

Kafka trigger timeout #93

Open nikolicdragoslav opened 1 month ago

nikolicdragoslav commented 1 month ago

Describe the issue

Hi,

I am seeing a strange issue when using the Kafka RealtimeTrigger as a trigger for my workflow.

This is my trigger:

triggers:
  - id: kafkaTrigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    keyDeserializer: JSON
    groupId: "kestraConsumer"
    properties:
      auto.offset.reset: earliest
      bootstrap.servers: "{{envs.kafka_bootstrap_servers}}"
      sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
      sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required;
      sasl.mechanism: AWS_MSK_IAM
      security.protocol: SASL_SSL
    topic: test-topic
    valueDeserializer: JSON

The trigger works great for a couple of days, but at some point it stops polling the Kafka topic. The messages are there and can be read by other consumers; it just seems that Kestra stops consuming them and therefore does not kick off any executions.

The workaround at the moment is to edit the flow code by simply adding or removing a blank line and saving it. Once saved, all missed messages are read at once, which starts a bunch of executions at the same time.

So far, no log shows an error, either in Kafka or in Kestra. It looks as if the trigger is disabled, but it is in fact enabled.

Is there some setting in Kafka or in Kestra that causes this timeout and needs to be altered? If not, is this simply a bug in the plugin?

Best, Dragoslav

tchiotludo commented 1 month ago

Can you provide the scheduler log, please?

nikolicdragoslav commented 1 month ago

Sure, I will provide a log when I encounter the issue again, which should be in the next couple of days.

nikolicdragoslav commented 1 month ago

Hi @tchiotludo,

The issue happened again, here is the scheduler log:

2024-09-23 11:43:52,827 INFO  main         org.flywaydb.core.FlywayExecutor Database: jdbc:postgresql://url:5432/postgres (PostgreSQL 14.10)
2024-09-23 11:43:53,252 INFO  main         o.f.core.internal.command.DbValidate Successfully validated 20 migrations (execution time 00:00.172s)
2024-09-23 11:43:53,394 INFO  main         o.f.core.internal.command.DbMigrate Current version of schema "public": 1.21
2024-09-23 11:43:53,442 INFO  main         o.f.core.internal.command.DbMigrate Schema "public" is up to date. No migration necessary.
2024-09-23 11:44:04,945 INFO  scheduler    io.kestra.cli.AbstractCommand Starting Kestra with environments [k8s, cloud, cli]
2024-09-23 11:44:05,881 INFO  scheduler    i.kestra.core.plugins.PluginScanner Registered 78 core plugins (scan done in 795ms)
2024-09-23 11:44:14,092 INFO  scheduler    i.kestra.core.plugins.PluginScanner Registered 466 plugins from 95 groups (scan done in 8150ms)
2024-09-23 11:44:16,896 INFO  scheduler    io.kestra.cli.AbstractCommand Server Running: http://kestra-scheduler-56b9994b77-gm2xk:8080, Management server on port http://kestra-scheduler-56b9994b77-gm2xk:8081/health
2024-09-23 11:44:34,638 INFO  scheduler    i.k.c.c.servers.SchedulerCommand Scheduler started

nikolicdragoslav commented 1 month ago

I also captured the scheduler log from a period when the trigger was working. It looks like this:

2024-09-12 09:59:37,766 INFO  main         org.flywaydb.core.FlywayExecutor Database: jdbc:postgresql://url:5432/postgres (PostgreSQL 14.10)
2024-09-12 09:59:38,478 INFO  main         o.f.core.internal.command.DbValidate Successfully validated 20 migrations (execution time 00:00.562s)
2024-09-12 09:59:38,508 INFO  main         o.f.core.internal.command.DbMigrate Current version of schema "public": 1.21
2024-09-12 09:59:38,639 INFO  main         o.f.core.internal.command.DbMigrate Schema "public" is up to date. No migration necessary.
2024-09-12 09:59:50,018 INFO  scheduler    io.kestra.cli.AbstractCommand Starting Kestra with environments [k8s, cloud, cli]
2024-09-12 09:59:51,197 INFO  scheduler    i.kestra.core.plugins.PluginScanner Registered 78 core plugins (scan done in 997ms)
2024-09-12 09:59:59,217 INFO  scheduler    i.kestra.core.plugins.PluginScanner Registered 466 plugins from 95 groups (scan done in 8007ms)
2024-09-12 10:00:01,959 INFO  scheduler    io.kestra.cli.AbstractCommand Server Running: http://kestra-scheduler-8555dd45d9-9x7xb:8080, Management server on port http://kestra-scheduler-8555dd45d9-9x7xb:8081/health
2024-09-12 10:00:16,224 INFO  scheduler    i.k.c.c.servers.SchedulerCommand Scheduler started
2024-09-13 00:00:02,166 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 4kqbBIIdenwKP3rM5RpLhZ at '2024-09-13T00:00Z' started at '2024-09-13T00:00:02Z[Etc/UTC]'
2024-09-14 00:00:01,954 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 7OJ9pJQIYOpCIil9yKzFK1 at '2024-09-14T00:00Z' started at '2024-09-14T00:00:01Z[Etc/UTC]'
2024-09-15 00:00:01,935 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 77lD4hRQ8C8R15pwYNvflu at '2024-09-15T00:00Z' started at '2024-09-15T00:00:01Z[Etc/UTC]'
2024-09-16 00:00:01,935 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution K3CYv9o9FHWFnE3sX4h9O at '2024-09-16T00:00Z' started at '2024-09-16T00:00:01Z[Etc/UTC]'
2024-09-17 00:00:01,928 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 1cFqwzYMtJisnPC3hvgRH at '2024-09-17T00:00Z' started at '2024-09-17T00:00:01Z[Etc/UTC]'
2024-09-18 00:00:01,923 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 5CYcYKIcFbefmK1TbtZZqz at '2024-09-18T00:00Z' started at '2024-09-18T00:00:01Z[Etc/UTC]'
2024-09-18 00:00:01,955 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespace_sst] [trigger: daily_trigger] Scheduled execution 7N0rCt8emqKTfWZdIr7OPn at '2024-09-18T00:00Z' started at '2024-09-18T00:00:01Z[Etc/UTC]'
2024-09-19 00:00:01,924 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 5ULFFwfGuHVIlKkue592J3 at '2024-09-19T00:00Z' started at '2024-09-19T00:00:01Z[Etc/UTC]'
2024-09-19 00:00:01,979 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespace_sst] [trigger: daily_trigger] Scheduled execution 2kEo25r8LGsMrh9SEl1DWd at '2024-09-19T00:00Z' started at '2024-09-19T00:00:01Z[Etc/UTC]'

tchiotludo commented 1 month ago

Do you have the executor log, please? The scheduler log didn't give any clue :cry:

nikolicdragoslav commented 1 month ago

@tchiotludo here is the executor log (executor.txt). It seems the triggers never switched on.

nikolicdragoslav commented 1 month ago

Maybe a side note: it seems that when the scheduler Kubernetes pod is recreated, it does not pick up the existing triggers and does not restart polling from Kafka. When the flows are manually edited, the triggers start again.

tchiotludo commented 1 month ago

OK, so now I will need the worker log :sweat_smile: I was wondering if there is any error that blocked the scheduler.

nikolicdragoslav commented 1 month ago

@tchiotludo We have three workers and the logs are quite extensive, they also contain some sensitive information. What should I be looking for?

tchiotludo commented 1 month ago

Since it's a bug, I don't know what the main issue could be. Searching for stack traces is probably the best option if you don't have any other choice.
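For anyone doing the same search across large worker logs, here is a minimal sketch in Python. It assumes the logs have been exported to plain text files; the patterns are illustrative heuristics for Java-style errors and stack traces, not an official Kestra log format, and the function name `find_suspicious_lines` is hypothetical.

```python
import re
import sys
from typing import Iterable, List, Tuple

# Heuristic patterns (assumptions, not a Kestra-defined format):
SUSPICIOUS = re.compile(
    r"(\sERROR\s"                      # log lines at ERROR level
    r"|level=error"                    # logfmt-style lines (e.g. from the dind container)
    r"|^\s+at\s[\w.$]+\("              # stack frames: "    at pkg.Class.method(File.java:42)"
    r"|^Caused by:"                    # chained exceptions
    r"|\b[\w.]+(Exception|Error)\b)"   # exception class names
)


def find_suspicious_lines(lines: Iterable[str]) -> List[Tuple[int, str]]:
    """Return (1-based line number, text) for lines that look like errors or stack traces."""
    hits = []
    for number, line in enumerate(lines, start=1):
        if SUSPICIOUS.search(line):
            hits.append((number, line.rstrip("\n")))
    return hits


if __name__ == "__main__":
    # Usage: python scan_logs.py worker-1.log worker-2.log ...
    for path in sys.argv[1:]:
        with open(path, encoding="utf-8", errors="replace") as handle:
            for number, text in find_suspicious_lines(handle):
                print(f"{path}:{number}: {text}")
```

Only matching lines are printed, which makes it easier to review the output and redact sensitive values before sharing it.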

nikolicdragoslav commented 1 month ago

Hi @tchiotludo,

I found lines like:

kestra-worker-docker-dind
time="2024-09-24T08:15:28.897147549Z" level=error msg="loading cgroup for 1606" error="cgroups: cgroup deleted"
kestra-worker-docker-dind
time="2024-09-24T08:15:28.936165515Z" level=error msg="loading cgroup for 1606" error="cgroups: cgroup deleted"

As a result, I get an empty execution with 0s of runtime and no trigger details.

In the worker logs I also see:

kestra-worker
2024-09-24 08:08:44,809 ERROR worker_318  f.cdmOrchestratorKafka.kafkaTrigger [namespace: orchestrators] [flow: cdmOrchestratorKafka] [execution: 6mQ4zhZ8Itsv2r2eya78ol] [date: 2024-09-24T00:00:43.448Z] Realtime trigger failed to be created in the worker with error: unknown

These log lines appear when I save the flow with an added blank line; after that, everything works properly.

Other messages that arrived while the executions were halted produced no error, warning, or in fact any log for the affected flows.

To me it looks like the Realtime triggers are not restarted after a pod failure, but I have one more idea about what could affect this. We also use io.kestra.plugin.git.SyncFlows, scheduled every day at midnight, which copies flow files from Git to Kestra. Maybe it interferes with the Realtime triggers and causes a mismatch somewhere. I noticed the 0s executions happening after the sync as well.

Looking forward to hearing from you, let me know if you need any additional information.

Thanks

nikolicdragoslav commented 1 month ago

Hi @tchiotludo,

Any ideas on how to proceed?

Best, Dragoslav

romangoldberg commented 1 month ago

Hi @tchiotludo, we are planning to go to production soon. Could you please assist us with this bug?

tchiotludo commented 1 month ago

As I see it, we need the full stack trace to understand this; an "unknown" error doesn't help. It should be on the Flows > Logs page.

nikolicdragoslav commented 1 month ago

Hi @tchiotludo ,

As I explained earlier, there is no error, stack trace, or log of any kind, not even an info log, related to the affected flows.

It seems that the Kafka realtime trigger gets disabled at some point and Kestra stops polling for new messages. It looks like some kind of timeout somewhere is affecting Kestra as a consumer.

tchiotludo commented 1 month ago

"Realtime trigger failed to be created in the worker with error: unknown"

This message comes from here: https://github.com/kestra-io/kestra/blob/7b73eed06830d7d13cc7e6c6ca57d9fe8eee2369/core/src/main/java/io/kestra/core/runners/Worker.java#L419-L430

You could increase the log level to TRACE to capture it.

nikolicdragoslav commented 1 month ago

@tchiotludo I will try it and get back to you, thanks