apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Commit failed: Local: No offset stored while using AwaitMessageTriggerFunctionSensor #32585

Closed ikholodkov closed 9 months ago

ikholodkov commented 1 year ago

Apache Airflow version

2.6.3

What happened

While trying to use AwaitMessageTriggerFunctionSensor with an increased number of dagruns, I encountered the exception cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}. I tried setting the consumer count to less than, equal to, and greater than the number of partitions, but the error occurred every time. Here is a log:

[2023-07-13, 14:37:07 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1327} INFO - Executing <Task(AwaitMessageTriggerFunctionSensor): await_message> on 2023-07-13 14:35:00+00:00
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:57} INFO - Started process 8918 to run task
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'kafka_test_dag', 'await_message', 'scheduled__2023-07-13T14:35:00+00:00', '--job-id', '629111', '--raw', '--subdir', 'DAGS_FOLDER/dags/kafka_consumers_dag.py', '--cfg-path', '/tmp/tmp3de57b65']
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:85} INFO - Job 629111: Subtask await_message
[2023-07-13, 14:37:08 UTC] {task_command.py:410} INFO - Running <TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [running]> on host airflow-worker-1.airflow-worker.syn-airflow-dev.svc.opus.s.mesh
[2023-07-13, 14:37:08 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kafka_test_dag' AIRFLOW_CTX_TASK_ID='await_message' AIRFLOW_CTX_EXECUTION_DATE='2023-07-13T14:35:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-13T14:35:00+00:00'
[2023-07-13, 14:37:09 UTC] {taskinstance.py:1415} INFO - Pausing task as DEFERRED. dag_id=kafka_test_dag, task_id=await_message, execution_date=20230713T143500, start_date=20230713T143707
[2023-07-13, 14:37:09 UTC] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1306} INFO - Resuming after deferral
[2023-07-13, 14:38:44 UTC] {taskinstance.py:1327} INFO - Executing <Task(AwaitMessageTriggerFunctionSensor): await_message> on 2023-07-13 14:35:00+00:00
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:57} INFO - Started process 9001 to run task
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'kafka_test_dag', 'await_message', 'scheduled__2023-07-13T14:35:00+00:00', '--job-id', '629114', '--raw', '--subdir', 'DAGS_FOLDER/dags/kafka_consumers_dag.py', '--cfg-path', '/tmp/tmpo6xz234q']
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:85} INFO - Job 629114: Subtask await_message
[2023-07-13, 14:38:45 UTC] {task_command.py:410} INFO - Running <TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow-dev.svc.opus.s.mesh
[2023-07-13, 14:38:46 UTC] {taskinstance.py:1598} ERROR - Trigger failed:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 537, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 615, in run_trigger
    async for event in trigger.run():
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/apache/kafka/triggers/await_message.py", line 114, in run
    await async_commit(asynchronous=False)
  File "/home/airflow/.local/lib/python3.11/site-packages/asgiref/sync.py", line 479, in __call__
    ret: _R = await loop.run_in_executor(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/asgiref/sync.py", line 538, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}
[2023-07-13, 14:38:47 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
airflow.exceptions.TaskDeferralError: Trigger failure
[2023-07-13, 14:38:47 UTC] {taskinstance.py:1345} INFO - Marking task as FAILED. dag_id=kafka_test_dag, task_id=await_message, execution_date=20230713T143500, start_date=20230713T143707, end_date=20230713T143847
[2023-07-13, 14:38:48 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 629114 for task await_message (Trigger failure; 9001)
[2023-07-13, 14:38:48 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2023-07-13, 14:38:48 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

What you think should happen instead

The sensor should receive messages without errors, and each message should be committed exactly once.

How to reproduce

Example of a DAG:

from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.providers.apache.kafka.sensors.kafka import \
    AwaitMessageTriggerFunctionSensor

import uuid

def check_message(message):
    if message:
        return True

def trigger_dag(**context):
    TriggerDagRunOperator(
        trigger_dag_id='triggerer_test_dag',
        task_id=f"triggered_downstream_dag_{uuid.uuid4()}"
    ).execute(context)

@dag(
    description="This DAG listens kafka topic and triggers DAGs "
                "based on received message.",
    schedule_interval='* * * * *',
    start_date=days_ago(2),
    max_active_runs=4,
    catchup=False
)
def kafka_test_dag():
    AwaitMessageTriggerFunctionSensor(
        task_id="await_message",
        topics=['my_test_topic'],
        apply_function="dags.kafka_consumers_dag.check_message",
        event_triggered_function=trigger_dag
    )

kafka_test_dag()
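
Note: trigger_dag above fires a DAG with dag_id triggerer_test_dag, which is not shown in this issue. A minimal placeholder DAG like the following (contents assumed, only the dag_id matters) is enough to make the reproduction self-contained:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(
    schedule_interval=None,  # only runs when triggered by kafka_test_dag
    start_date=days_ago(2),
    catchup=False,
)
def triggerer_test_dag():
    @task
    def log_trigger():
        # Placeholder work for the DAG triggered on each Kafka message.
        print("Triggered by a Kafka message")

    log_trigger()

triggerer_test_dag()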

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-apache-kafka==1.1.2

Deployment

Other 3rd-party Helm chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

hussein-awala commented 1 year ago

What do you have in the extra of your connection?

ikholodkov commented 1 year ago

@hussein-awala something like this:

{
    "bootstrap.servers": "server1:9093,server2:9093",
    "group.id": "test-consumer-group",
    "auto.offset.reset": "latest",
    "ssl.endpoint.identification.algorithm": "https",
    "sasl.username": "user",
    "sasl.password": "password",
    "sasl.mechanism": "SCRAM-SHA-256",
    "security.protocol": "SASL_PLAINTEXT"
}
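
(For reference, such an extra is attached to an Airflow connection that the sensor resolves via its kafka_config_id; the minimal sketch below assumes the default connection id kafka_default and registers it through an environment variable.)

import json
from airflow.models import Connection

# Sketch: the Kafka provider reads the consumer config from the "extra"
# field of an Airflow connection. "kafka_default" is assumed here because
# it is the sensor's default kafka_config_id.
kafka_conn = Connection(
    conn_id="kafka_default",
    conn_type="kafka",
    extra=json.dumps({
        "bootstrap.servers": "server1:9093,server2:9093",
        "group.id": "test-consumer-group",
        "auto.offset.reset": "latest",
        # ...remaining SASL/SSL keys as in the extra above
    }),
)

# One way to register the connection is via an environment variable:
print(f"AIRFLOW_CONN_KAFKA_DEFAULT={kafka_conn.get_uri()}")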

hussein-awala commented 1 year ago

I asked to check if you set the group id.

I tried to set consumers count less, equal and more than partitions but every time the error happened.

When you tried setting fewer consumers than partitions, how many consumers did you use and how many partitions do you have in the topic? Do you have multiple running triggerers or just a single one?

ikholodkov commented 1 year ago

I have 3 partitions in the topic and tested this with 1, 2, 3, and 4 dagruns. Initially I wanted to check the #31803 fix with one dagrun; after that I checked whether it works with more than one dagrun.

Do you have multiple running triggerers or just a single one?

I've tried with only one Triggerer instance and only one DAG with multiple dagruns.

hussein-awala commented 1 year ago

I'll try to reproduce the issue. Which version of confluent-kafka and Kafka (in the cluster) are you using?

ikholodkov commented 1 year ago

confluent-kafka was installed as a dependency of apache-airflow-providers-apache-kafka==1.1.2, and its version is 2.2.0. The Kafka cluster version is 3.2.x.
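
(A quick way to confirm which client is actually installed in the environment, assuming the standard confluent-kafka package:)

import confluent_kafka

# Prints the Python client version and the bundled librdkafka version.
print(confluent_kafka.version())
print(confluent_kafka.libversion())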

ikholodkov commented 10 months ago

Airflow 2.7.2, apache-airflow-providers-apache-kafka==1.2.0. Unfortunately, the problem still occurs.

timonviola commented 10 months ago

I can confirm that this is still happening.

name = "apache-airflow"
version = "2.7.2"

name = "apache-airflow-providers-apache-kafka"
version = "1.2.0"

name = "confluent-kafka"
version = "2.3.0"

I also have 3 partitions and a single consumer group, and I define group.id in the connection definition. I can trigger this bug consistently by pushing a new message from a console. @hussein-awala, would some debug logs help find the root cause?

(The Consume and Produce operators work well from the provider package.)
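
(For anyone reproducing this, a minimal producer to push such a test message; the topic name is taken from the DAG above, the broker address is assumed:)

from confluent_kafka import Producer

# Sketch: push a single message to the topic watched by the sensor so the
# deferred trigger wakes up and attempts its offset commit.
producer = Producer({"bootstrap.servers": "localhost:9092"})  # adjust to your cluster
producer.produce("my_test_topic", value=b"hello")
producer.flush()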

hussein-awala commented 10 months ago

I will try to reproduce the issue with the provided versions

ddione84 commented 9 months ago

I am also experiencing the same problem with the same provider version: Airflow 2.7.2 with apache-airflow-providers-apache-kafka==1.2.0. I have tested it on my local Docker environment and on an on-prem VM deployment with Ubuntu 20.04.

hussein-awala commented 9 months ago

@ikholodkov @ddione84 I just created #36272 to fix the issue. Could you please test it?

Also, I suggest testing the current version of the operator with the config enable.auto.commit: false. As you can see in my PR, we commit the consumed messages manually, so if auto-commit is enabled, the consumer will also try to commit the consumed offsets periodically, which may be the reason for your issue (the automatic commit doesn't find any offset to commit).
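
For illustration, the manual-commit pattern described above looks roughly like this in plain confluent-kafka (a sketch with assumed broker and topic names, not the provider's actual code):

from confluent_kafka import Consumer

# Sketch only: with auto-commit disabled, the application stores and commits
# offsets explicitly after processing a message, so no periodic background
# commit runs while nothing has been stored yet.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker
    "group.id": "test-consumer-group",
    "enable.auto.commit": False,
})
consumer.subscribe(["my_test_topic"])

msg = consumer.poll(timeout=10)
if msg is not None and msg.error() is None:
    # ...process the message...
    consumer.commit(message=msg, asynchronous=False)  # commit exactly this offset

consumer.close()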

ddione84 commented 9 months ago

Hi, I have tested this for my pipelines and it seems to be working fine. Thank you.

timonviola commented 9 months ago

Hello, I can confirm that it fixes the issue. Thanks @hussein-awala.

I can also trigger the error again by:

  1. reverting the fix
  2. adding "enable.auto.offset.store": false to the consumer connection config

Would it make sense to add another integration test to cover "enable.auto.offset.store": false? I can make a PR. (See the sketch below for why this setting reproduces the error.)
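
(For context on why step 2 reproduces it: with "enable.auto.offset.store": false, librdkafka no longer records consumed offsets automatically after poll, so a bare commit() has nothing stored and raises _NO_OFFSET unless the application stores or passes the offsets itself. A sketch with an assumed broker address:)

from confluent_kafka import Consumer, KafkaException

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker
    "group.id": "test-consumer-group",
    "enable.auto.commit": False,
    "enable.auto.offset.store": False,
})
consumer.subscribe(["my_test_topic"])

msg = consumer.poll(timeout=10)
if msg is not None and msg.error() is None:
    try:
        consumer.commit(asynchronous=False)  # nothing stored -> _NO_OFFSET
    except KafkaException as exc:
        print(f"bare commit fails: {exc}")
    consumer.store_offsets(message=msg)      # record the offset explicitly
    consumer.commit(asynchronous=False)      # now the commit succeeds

consumer.close()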

hussein-awala commented 9 months ago

Feel free to open a PR. Here you can find some examples of the integration tests and how we set up the Kafka configurations.