apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

ConsumeFromTopicOperator does not fail even if wrong credentials are given #43569

Open rawwar opened 3 weeks ago

rawwar commented 3 weeks ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I created a Kafka connection with dummy values, and yet the operator succeeded with the following task log:

[2024-11-01, 07:35:06 IST] {local_task_job_runner.py:123} ▼ Pre task execution logs
[2024-11-01, 07:35:06 IST] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: kafka_issue.consume_task manual__2024-11-01T02:05:05.395642+00:00 [queued]>
[2024-11-01, 07:35:06 IST] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: kafka_issue.consume_task manual__2024-11-01T02:05:05.395642+00:00 [queued]>
[2024-11-01, 07:35:06 IST] {taskinstance.py:2866} INFO - Starting attempt 1 of 2
[2024-11-01, 07:35:06 IST] {taskinstance.py:2889} INFO - Executing <Task(ConsumeFromTopicOperator): consume_task> on 2024-11-01 02:05:05.395642+00:00
[2024-11-01, 07:35:06 IST] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', 'kafka_issue', 'consume_task', 'manual__2024-11-01T02:05:05.395642+00:00', '--job-id', '35', '--raw', '--subdir', 'DAGS_FOLDER/kafka_min.py', '--cfg-path', '/tmp/tmpflvhwk6b']
[2024-11-01, 07:35:06 IST] {standard_task_runner.py:105} INFO - Job 35: Subtask consume_task
[2024-11-01, 07:35:06 IST] {logging_mixin.py:190} WARNING - /usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py:70 DeprecationWarning: This process (pid=217) is multi-threaded, use of fork() may lead to deadlocks in the child.
[2024-11-01, 07:35:06 IST] {standard_task_runner.py:72} INFO - Started process 218 to run task
[2024-11-01, 07:35:06 IST] {task_command.py:467} INFO - Running <TaskInstance: kafka_issue.consume_task manual__2024-11-01T02:05:05.395642+00:00 [running]> on host 65fc791ee8f4
[2024-11-01, 07:35:06 IST] {taskinstance.py:3132} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kafka_issue' AIRFLOW_CTX_TASK_ID='consume_task' AIRFLOW_CTX_EXECUTION_DATE='2024-11-01T02:05:05.395642+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-01T02:05:05.395642+00:00'
[2024-11-01, 07:35:06 IST] {taskinstance.py:731} ▲▲▲ Log group end
[2024-11-01, 07:35:06 IST] {base.py:84} INFO - Retrieving connection 'kafka_connection'
[2024-11-01, 07:36:06 IST] {consume.py:167} INFO - Reached end of log. Exiting.
[2024-11-01, 07:36:06 IST] {consume.py:182} INFO - committing offset at end_of_batch
[2024-11-01, 07:36:06 IST] {taskinstance.py:340} ▼ Post task execution logs
[2024-11-01, 07:36:06 IST] {taskinstance.py:352} INFO - Marking task as SUCCESS. dag_id=kafka_issue, task_id=consume_task, run_id=manual__2024-11-01T02:05:05.395642+00:00, execution_date=20241101T020505, start_date=20241101T020506, end_date=20241101T020606
[2024-11-01, 07:36:06 IST] {local_task_job_runner.py:266} INFO - Task exited with return code 0
[2024-11-01, 07:36:06 IST] {taskinstance.py:3901} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-11-01, 07:36:06 IST] {local_task_job_runner.py:245} ▲▲▲ Log group end

What you think should happen instead?

When the credentials are wrong, the operator should fail.

How to reproduce

Connection I used:

{
  "bootstrap.servers": "hello.com:9092",
  "group.id": "ea",
  "auto.offset.reset": "earliest",
  "sasl.mechanism": "PLAIN",
  "sasl.username": "ewfew",
  "sasl.password": "Owefw"
}

I used the following DAG:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from datetime import datetime, timedelta
import json

KAFKA_TOPIC = "random"

def print_msg(msg):
    print(msg)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('kafka_issue',
         start_date=datetime(2024, 1, 20),
         max_active_runs=1,
         schedule_interval=None,
         default_args=default_args,
         ) as dag:

    consumer_topic = ConsumeFromTopicOperator(
        task_id="consume_task",
        kafka_config_id="kafka_connection",        
        topics=[KAFKA_TOPIC],
        apply_function="kafka_min.print_msg",
        commit_cadence="end_of_batch",
        max_messages=5,
        max_batch_size=2,
    )
    testconsume = DummyOperator(
        task_id='test_consume_kafka'
    )

    testconsume >> consumer_topic 

To reproduce the issue, name your DAG file kafka_min.py or update apply_function accordingly.

Operating System

MacOS - 15.0.1 (24A348)

Versions of Apache Airflow Providers

confluent-kafka apache-airflow-providers-apache-kafka

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

VikSil commented 2 weeks ago

We were able to reproduce the issue with guidance from @potiuk during Man's hackathon. The environment was set up with the Breeze Kafka integration and started by running:

breeze start-airflow --integration kafka --load-example-dags --load-default-connections

We also tried to add Kafka credentials in the /airflow/scripts/ci/docker-compose/integration-kafka.yml file. There appear to be two ways to do it:

  1. as suggested here:
       KAFKA_USERNAME: 'admin'
       KAFKA_PASSWORD: 'secret'
       KAFKA_SASL_MECHANISM: SCRAM-SHA-256
  2. as suggested here:
       KAFKA_SASL_MECHANISM: PLAIN
       KAFKA_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka-user\" password=\"kafka-password\";"

With either configuration, the Kafka container starts up and Airflow marks the ConsumeFromTopicOperator task as success.

Furthermore, we noticed that the bootstrap server "hello.com:9092" that we used does not lead anywhere. When entered into a browser, the lookup never completes. https://hello.com/ is a project that has been discontinued.
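
Whether a bootstrap address accepts connections at all can be checked without any Kafka client. The following is a minimal sketch using only the Python standard library; the function name unreachable_bootstrap_servers and the 3-second default timeout are assumptions made for illustration. It only tests TCP reachability and says nothing about whether credentials would be accepted.

```python
import socket


def unreachable_bootstrap_servers(bootstrap_servers: str, timeout: float = 3.0) -> list[str]:
    """Return the host:port entries that do not accept a TCP connection.

    `bootstrap_servers` uses the same comma-separated "host:port" format
    as the "bootstrap.servers" setting.
    """
    failed = []
    for entry in bootstrap_servers.split(","):
        entry = entry.strip()
        # Split on the last colon so the port is always the final component.
        host, _, port = entry.rpartition(":")
        try:
            with socket.create_connection((host, int(port)), timeout=timeout):
                pass  # The connection opened, so the address is reachable.
        except (OSError, ValueError):
            # OSError covers DNS failures, refusals and timeouts;
            # ValueError covers a missing or non-numeric port.
            failed.append(entry)
    return failed


if __name__ == "__main__":
    # The result depends on the network the check runs from.
    print(unreachable_bootstrap_servers("hello.com:9092"))
```

A check like this could run before the consume task to separate "the broker is not reachable" from "the broker rejected the credentials".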

SuccessMoses commented 2 weeks ago

@VikSil are you working on this?

potiuk commented 2 weeks ago

Hey @dylanbstorey and maybe @aritra24: we've been working with @KS0107 and @VikSil over the weekend during the Man's hackathon, trying to reproduce and test this one, and we have some interesting findings. Maybe you will be able to take a look and confirm them, or help guide the rest of the team (and I see @SuccessMoses is interested as well) to nail and fix it.

The first problem: we had a bit of a hard time seeing what was going on in our integration tests when we tried to introduce a user/password and configure our "integration" Kafka compose setup with authentication enabled.

No matter what we did, some of the settings, for example the bootstrap server, did not seem to matter, as if they were not used (or maybe we did not understand how it works). But the original error report does seem to be correct: the Kafka consumer does not fail when wrong credentials are used.

Also, @mrk-andreev had some comments about the bootstrap server, but I am unfortunately not too well versed in Kafka. So @dylanbstorey @aritra24, maybe you can take a close look and share some thoughts?

aritra24 commented 2 weeks ago

I can spend some time looking at this later today, let me get back to you in a few hours. 🤔

rawwar commented 2 weeks ago

@aritra24, the consumer config also takes error_cb. I was able to make the Consumer fail by using an error_cb. Consider looking in this direction if there is no better way:

from confluent_kafka import KafkaError


class KafkaAuthenticationError(Exception):
    """Custom exception for Kafka authentication failures."""


def error_callback(err):
    """Callback function to handle Kafka errors."""
    if err.code() == KafkaError._AUTHENTICATION:
        raise KafkaAuthenticationError(f"Authentication failed: {err}")
    # Any other error is only printed, not raised.
    print("Exception received: ", err)

aritra24 commented 2 weeks ago

Are you seeing this on main? Or a specific version of airflow?

VikSil commented 2 weeks ago

@aritra24 I had the main branch cloned.

dylanbstorey commented 1 week ago

IIRC, much of the library used doesn't require an external Kafka service to be available when the code is interpreted, so I believe it is silently failing by design and needs callbacks implemented to take action on those interfaces/activities. rawwar's comment is the correct line of inquiry for changing the default behavior and escalating failures through the provider.
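
The behavior described above can be illustrated with a toy model. FakeConsumer, FakeKafkaError, KafkaClientError, raise_on_error and run_task are all hypothetical stand-ins written for this sketch, not confluent-kafka or provider classes. The only assumption carried over from the thread is that the client reports connection problems through error_cb and otherwise returns an empty batch.

```python
class FakeKafkaError:
    """Stand-in for a client error object (not the confluent-kafka class)."""

    def __init__(self, name, reason):
        self._name = name
        self._reason = reason

    def name(self):
        return self._name

    def __str__(self):
        return f"{self._name}: {self._reason}"


class FakeConsumer:
    """Toy consumer whose broker is never reachable."""

    def __init__(self, config):
        # Errors are delivered only if the caller registered a callback.
        self._error_cb = config.get("error_cb")

    def consume(self, num_messages=1, timeout=1.0):
        err = FakeKafkaError("_TRANSPORT", "connection setup timed out")
        if self._error_cb is not None:
            self._error_cb(err)
        # Without a callback the error is dropped and the batch is just empty.
        return []


class KafkaClientError(Exception):
    """Raised by the callback so the error reaches the caller."""


def raise_on_error(err):
    raise KafkaClientError(str(err))


def run_task(config):
    """Mimic a task that treats an empty batch as 'nothing left to read'."""
    consumer = FakeConsumer(config)
    msgs = consumer.consume(num_messages=2, timeout=1.0)
    return f"SUCCESS ({len(msgs)} messages)"


if __name__ == "__main__":
    print(run_task({}))  # no callback: the failure is invisible
    try:
        run_task({"error_cb": raise_on_error})
    except KafkaClientError as exc:
        print("FAILED:", exc)
```

In this model the task without a callback cannot tell an unreachable broker from an empty topic, which matches the "Reached end of log. Exiting." line in the original task log.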

SuccessMoses commented 3 days ago

@rawwar I created a PR to try to fix this issue, but it is not complete yet. I wanted to hear your thoughts.

I managed to make the task fail using error_cb in the Consumer.

[2024-11-23, 13:24:36 UTC] {local_task_job_runner.py:121} ▼ Pre task execution logs
[2024-11-23, 13:24:36 UTC] {taskinstance.py:2403} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: kafka_issue.consume_task manual__2024-11-23T13:24:25.243056+00:00 [queued]>
[2024-11-23, 13:24:36 UTC] {taskinstance.py:2403} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: kafka_issue.consume_task manual__2024-11-23T13:24:25.243056+00:00 [queued]>
[2024-11-23, 13:24:36 UTC] {taskinstance.py:2654} INFO - Starting attempt 1 of 2
[2024-11-23, 13:24:36 UTC] {taskinstance.py:2677} INFO - Executing <Task(ConsumeFromTopicOperator): consume_task> on 2024-11-23 13:24:25.243056+00:00
[2024-11-23, 13:24:36 UTC] {standard_task_runner.py:131} INFO - Started process 15454 to run task
[2024-11-23, 13:24:36 UTC] {standard_task_runner.py:160} INFO - Running: ['airflow', 'tasks', 'run', 'kafka_issue', 'consume_task', 'manual__2024-11-23T13:24:25.243056+00:00', '--raw', '--subdir', 'DAGS_FOLDER/kafka_min.py', '--cfg-path', '/tmp/tmpmivb_e_5']
[2024-11-23, 13:24:36 UTC] {standard_task_runner.py:161} INFO - Subtask consume_task
[2024-11-23, 13:24:36 UTC] {task_command.py:446} INFO - Running <TaskInstance: kafka_issue.consume_task manual__2024-11-23T13:24:25.243056+00:00 [running]> on host 2c7e65d643ac
[2024-11-23, 13:24:36 UTC] {taskinstance.py:2910} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kafka_issue' AIRFLOW_CTX_TASK_ID='consume_task' AIRFLOW_CTX_LOGICAL_DATE='2024-11-23T13:24:25.243056+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-23T13:24:25.243056+00:00'
[2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO - Task instance is in running state
[2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO -  Previous state of the Task instance: queued
[2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO - Current task name:consume_task state:running start_date:2024-11-23 13:24:36.673356+00:00
[2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO - Dag name:kafka_issue and current dag run status:running
[2024-11-23, 13:24:36 UTC] {taskinstance.py:723} ▲▲▲ Log group end
[2024-11-23, 13:24:36 UTC] {base.py:66} INFO - Retrieving connection 'kafka_connection'
[2024-11-23, 13:25:07 UTC] {logging_mixin.py:191} INFO - Exception received:  KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
[2024-11-23, 13:25:36 UTC] {taskinstance.py:3097} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 759, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 725, in _execute_callable
    return ExecutionCallableRunner(
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 268, in run
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/baseoperator.py", line 375, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/providers/src/airflow/providers/apache/kafka/operators/consume.py", line 162, in execute
    msgs = consumer.consume(num_messages=batch_size, timeout=self.poll_timeout)
  File "/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py", line 31, in error_callback
    print("Exception received: ", err)
  File "/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py", line 32, in error_callback
    raise KafkaAuthenticationError(f"Authentication failed: {err}")
airflow.providers.apache.kafka.hooks.consume.KafkaAuthenticationError: Authentication failed: KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
[2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - Task instance in failure state
[2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - Task start:2024-11-23 13:24:36.673356+00:00 end:2024-11-23 13:25:36.809691+00:00 duration:60.136335
[2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - Task:<Task(ConsumeFromTopicOperator): consume_task> dag:<DAG: kafka_issue> dagrun:<DagRun kafka_issue @ 2024-11-23 13:24:25.243056+00:00: manual__2024-11-23T13:24:25.243056+00:00, state:running, queued_at: 2024-11-23 13:24:25.252615+00:00. externally triggered: True>
[2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - Failure caused by Authentication failed: KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
[2024-11-23, 13:25:36 UTC] {taskinstance.py:1139} INFO - Marking task as UP_FOR_RETRY. dag_id=kafka_issue, task_id=consume_task, run_id=manual__2024-11-23T13:24:25.243056+00:00, logical_date=20241123T132425, start_date=20241123T132436, end_date=20241123T132536
[2024-11-23, 13:25:36 UTC] {taskinstance.py:346} ▼ Post task execution logs
[2024-11-23, 13:25:36 UTC] {standard_task_runner.py:178} ERROR - Failed to execute task_id=consume_task pid=15454
Traceback (most recent call last):
  File "/opt/airflow/airflow/task/standard_task_runner.py", line 171, in _start_by_fork
    ret = args.func(args, dag=self.dag)
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 462, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 257, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 325, in _run_raw_task
    return ti._run_raw_task(
  File "/opt/airflow/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 2790, in _run_raw_task
    return _run_raw_task(
  File "/opt/airflow/airflow/models/taskinstance.py", line 279, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/opt/airflow/airflow/models/taskinstance.py", line 2937, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/opt/airflow/airflow/models/taskinstance.py", line 2961, in _execute_task
    return _execute_task(self, context, task_orig)
  File "/opt/airflow/airflow/models/taskinstance.py", line 759, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 725, in _execute_callable
    return ExecutionCallableRunner(
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 268, in run
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/baseoperator.py", line 375, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/providers/src/airflow/providers/apache/kafka/operators/consume.py", line 162, in execute
    msgs = consumer.consume(num_messages=batch_size, timeout=self.poll_timeout)
  File "/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py", line 31, in error_callback
    print("Exception received: ", err)
  File "/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py", line 32, in error_callback
    raise KafkaAuthenticationError(f"Authentication failed: {err}")
airflow.providers.apache.kafka.hooks.consume.KafkaAuthenticationError: Authentication failed: KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
[2024-11-23, 13:25:36 UTC] {local_task_job_runner.py:263} INFO - Task exited with return code 1
[2024-11-23, 13:25:36 UTC] {local_task_job_runner.py:242} ▲▲▲ Log group end

This is the connection I used:

{
  "bootstrap.servers": "hello.com:9092",
  "group.id": "ea"
}
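
In the log above, the task fails with KafkaAuthenticationError although the underlying error is code=_TRANSPORT, a connection timeout rather than a rejected login. One possible refinement, sketched below under assumptions, is to map error names to separate exception types. The grouping of names, the KafkaConnectionError class and the StubError helper are hypothetical and exist only for this illustration; the callback relies solely on the error object exposing a name() method.

```python
class KafkaAuthenticationError(Exception):
    """The broker was reached but rejected the credentials."""


class KafkaConnectionError(Exception):
    """The broker could not be reached at all."""


# Assumed grouping of error names; adjust to the errors that matter in practice.
AUTH_ERRORS = {"_AUTHENTICATION"}
CONNECTION_ERRORS = {"_TRANSPORT", "_ALL_BROKERS_DOWN", "_RESOLVE"}


def error_callback(err):
    """Raise a specific exception depending on the kind of error reported."""
    name = err.name()
    if name in AUTH_ERRORS:
        raise KafkaAuthenticationError(f"Authentication failed: {err}")
    if name in CONNECTION_ERRORS:
        raise KafkaConnectionError(f"Connection failed: {err}")
    # Anything else is logged but does not fail the task.
    print("Kafka error received:", err)


class StubError:
    """Minimal error object used only to exercise the callback here."""

    def __init__(self, name, message):
        self._name = name
        self._message = message

    def name(self):
        return self._name

    def __str__(self):
        return self._message


if __name__ == "__main__":
    try:
        error_callback(StubError("_TRANSPORT", "Connection setup timed out"))
    except KafkaConnectionError as exc:
        print(type(exc).__name__, "-", exc)
```

With this split, the failure reason in the task log would name a connection problem for an unreachable host and reserve the authentication error for rejected credentials.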