apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

After deleting a dag from the dags directory, it is still displayed in the UI #42542

Open shahar1 opened 1 day ago

shahar1 commented 1 day ago

Discussed in https://github.com/apache/airflow/discussions/40331

Originally posted by **tatyana12345**, June 19, 2024

### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.8.3

### What happened?

The dag has been removed from the airflow dags directory /opt/airflow/dags. The dag is not displayed when running the airflow dags list command, but it is still available in the Airflow UI, and it is also available in the Airflow postgres database.

### What you think should happen instead?

It is expected that the dag will be automatically removed from the web interface after deleting the dag from the dags directory.

### How to reproduce

Remove a dag from the /opt/airflow/dags directory; it will still be available in the web interface even after a few days.

### Operating System

Linux

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
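
For reference, the state described above (missing from `airflow dags list` but still in the UI and the Postgres metadata DB) can be checked directly against the metadata tables. A minimal sketch, assuming it is run inside the Airflow environment (the dag_id is a placeholder):

```python
# Minimal sketch: check whether a DAG removed from the dags folder still has
# rows in the metadata DB. Run inside the Airflow environment (e.g. a
# webserver or scheduler pod). The dag_id below is a placeholder.
from airflow.models import DagModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.session import create_session

dag_id = "my_deleted_dag"  # placeholder - replace with the real DAG id

with create_session() as session:
    dag_row = session.query(DagModel).filter(DagModel.dag_id == dag_id).one_or_none()
    ser_row = session.query(SerializedDagModel).filter(
        SerializedDagModel.dag_id == dag_id
    ).one_or_none()

# The UI lists DAGs from these tables, so if dag_row.is_active is still True
# and a serialized_dag row exists, the DAG keeps showing up even though the
# file is gone from the dags folder.
print("dag row:", None if dag_row is None else (dag_row.fileloc, dag_row.is_active))
print("serialized_dag row present:", ser_row is not None)
```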
shahar1 commented 1 day ago

Following the reports in the discussion #40331, I've reopened it as an issue. I didn't manage to reproduce it on a breeze deployment. I would like to ask the OP (@tatyana12345) and the commenters (@cjj1120, @falukelo, @gaelxcowi) to post the exact configuration, deployment environment, Airflow version, and steps to reproduce the issue (preferably with accompanying timestamps and images from the UI). Once we manage to reproduce the issue, I'll be happy to review PRs from the community that resolve it.

gaelxcowi commented 22 hours ago

@shahar1 Hopefully I can capture everything; if not, let me know.

Here is a minimal example; hopefully I have covered enough evidence.

Disclaimer: I wasn't yet able to reproduce my behaviour 100%, but I will come back to that later.

So, here are the configs:

[core]
dags_folder = /opt/airflow/dags/repo/
hostname_callable = airflow.utils.net.getfqdn
might_contain_dag_callable = airflow.utils.file.might_contain_dag_via_default_heuristic
default_timezone = utc
executor = KubernetesExecutor
auth_manager = airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
parallelism = 32
max_active_tasks_per_dag = 18
dags_are_paused_at_creation = True
max_active_runs_per_dag = 18
max_consecutive_failed_dag_runs_per_dag = 0
# mp_start_method =
load_examples = false
plugins_folder = /opt/airflow/plugins
execute_tasks_new_python_interpreter = False
fernet_key =<removed>
donot_pickle = True
dagbag_import_timeout = 100
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
dag_file_processor_timeout = 150
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = False
allowed_deserialization_classes = airflow.*
allowed_deserialization_classes_regexp =
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
dag_ignore_file_syntax = regexp
default_task_retries = 0
default_task_retry_delay = 300
max_task_retry_delay = 86400
default_task_weight_rule = downstream
task_success_overtime = 20
default_task_execution_timeout =
min_serialized_dag_update_interval = 30
compress_serialized_dags = False
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
hide_sensitive_var_conn_fields = True
sensitive_var_conn_names =
default_pool_task_slot_count = 128
max_map_length = 1024
daemon_umask = 0o077
# dataset_manager_class =
# dataset_manager_kwargs =
strict_dataset_uri_validation = False
database_access_isolation = False
# internal_api_url =
internal_api_secret_key = <removed>
test_connection = Disabled
max_templated_field_length = 4096
[database]
alembic_ini_file_path = alembic.ini
sql_alchemy_conn = postgresql+psycopg2://<removed>
# sql_alchemy_engine_args =
sql_engine_encoding = utf-8
# sql_engine_collation_for_ids =
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
# sql_alchemy_connect_args =
# sql_alchemy_session_maker =
load_default_connections = True
max_db_retries = 3
check_migrations = True
[logging]
base_log_folder = /opt/airflow/logs
remote_logging = False
remote_log_conn_id =
delete_local_logs = False
google_key_path =
remote_base_log_folder =
remote_task_handler_kwargs =
encrypt_s3_logs = False
logging_level = INFO
celery_logging_level =
fab_logging_level = WARNING
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
dag_processor_log_target = file
dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
secret_mask_adapter =
task_log_prefix_template =
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
dag_processor_manager_log_stdout = False
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793
trigger_log_server_port = 8794
# interleave_timestamp_parser =
file_task_handler_new_folder_permissions = 0o775
file_task_handler_new_file_permissions = 0o664
celery_stdout_stderr_separation = False
enable_task_context_logger = True
color_log_error_keywords = error,exception
color_log_warning_keywords = warn
[metrics]
metrics_use_pattern_match = False
metrics_allow_list =
metrics_block_list =
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
stat_name_handler =
statsd_datadog_enabled = False
statsd_datadog_tags =
statsd_datadog_metrics_tags = True
# statsd_custom_client_path =
statsd_disabled_tags = job_id,run_id
statsd_influxdb_enabled = False
otel_on = False
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 60000
otel_debugging_on = False
otel_ssl_active = False
[traces]
otel_on = False
otel_host = localhost
otel_port = 8889
otel_service = Airflow
otel_debugging_on = False
otel_ssl_active = False
otel_task_log_event = False
[secrets]
backend =
backend_kwargs =
use_cache = False
cache_ttl_seconds = 900
[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080
[debug]
fail_fast = False
[api]
enable_experimental_api = False
auth_backends = airflow.api.auth.backend.session
maximum_page_limit = 100
fallback_page_limit = 100
google_oauth2_audience =
google_key_path =
access_control_allow_headers =
access_control_allow_methods =
access_control_allow_origins =
enable_xcom_deserialize_support = False
[lineage]
backend =
[operators]
default_owner = airflow
default_deferrable = false
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
default_queue = default
allow_illegal_arguments = True
[webserver]
access_denied_message = Access is Denied
config_file = /opt/airflow/webserver_config.py
base_url = <removed>
default_ui_timezone = UTC
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
session_backend = database
web_server_master_timeout = 600
web_server_worker_timeout = 600
worker_refresh_batch_size = 1
worker_refresh_interval = 6000
reload_on_plugin_change = False
secret_key = THIS IS UNSAFE!
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
access_logformat =
expose_config = non-sensitive-only
expose_hostname = False
expose_stacktrace = False
dag_default_view = grid
dag_orientation = LR
grid_view_sorting_order = topological
log_fetch_timeout_sec = 5
log_fetch_delay_sec = 2
log_auto_tailing_offset = 30
log_animation_speed = 1000
hide_paused_dags_by_default = False
page_size = 100
navbar_color = #fff
navbar_text_color = #51504f
navbar_hover_color = #eee
navbar_text_hover_color = #51504f
navbar_logo_text_color = #51504f
default_dag_run_display_number = 25
enable_proxy_fix = True
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
proxy_fix_x_port = 1
proxy_fix_x_prefix = 1
cookie_secure = False
cookie_samesite = Lax
default_wrap = False
x_frame_enabled = True
# analytics_tool =
# analytics_id =
# analytics_url =
show_recent_stats_for_completed_runs = True
session_lifetime_minutes = 43200
# instance_name =
instance_name_has_markup = False
auto_refresh_interval = 3
warn_deployment_exposure = False
# audit_view_excluded_events =
# audit_view_included_events =
enable_swagger_ui = True
run_internal_api = False
caching_hash_method = md5
show_trigger_form_if_no_params = False
num_recent_configurations_for_trigger = 5
allow_raw_html_descriptions = False
allowed_payload_size = 1.0
require_confirmation_dag_change = False
[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True
# subject_template =
# html_content_template =
# from_email =
ssl_context = default
[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
# smtp_user =
# smtp_password =
smtp_port = 25
smtp_mail_from = airflow@example.com
smtp_timeout = 30
smtp_retry_limit = 5
[sentry]
sentry_on = false
sentry_dsn =
# before_send =
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
local_task_job_heartbeat_sec = 0
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 30
parsing_cleanup_interval = 30
stale_dag_threshold = 50
dag_dir_list_interval = 60
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
enable_health_check = False
scheduler_health_check_server_host = 0.0.0.0
scheduler_health_check_server_port = 8974
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
zombie_detection_interval = 10.0
catchup_by_default = True
ignore_first_depends_on_past_by_default = True
max_tis_per_query = 16
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_pre_import_modules = True
parsing_processes = 2
file_parsing_sort_mode = modified_time
standalone_dag_processor = False
max_callbacks_per_loop = 20
dag_stale_not_seen_duration = 600
use_job_schedule = True
allow_trigger_in_future = False
trigger_timeout_check_interval = 15
task_queued_timeout = 600.0
task_queued_timeout_check_interval = 120.0
allowed_run_id_pattern = ^[A-Za-z0-9_.~:+-]+$
create_cron_data_intervals = True
[triggerer]
default_capacity = 1000
job_heartbeat_sec = 5
triggerer_health_check_threshold = 30
[kerberos]
ccache = /tmp/airflow_krb5_ccache
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
forwardable = True
include_ip = True
[sensors]
default_timeout = 604800
[usage_data_collection]
enabled = True
[aws]
# session_factory =
cloudwatch_task_handler_json_serializer = airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy
[aws_batch_executor]
conn_id = aws_default
# region_name =
max_submit_job_attempts = 3
check_health_on_startup = True
# job_name =
# job_queue =
# job_definition =
# submit_job_kwargs =
[aws_ecs_executor]
conn_id = aws_default
# region_name =
assign_public_ip = False
# cluster =
# capacity_provider_strategy =
# container_name =
# launch_type =
platform_version = LATEST
# security_groups =
# subnets =
# task_definition =
max_run_task_attempts = 3
# run_task_kwargs =
check_health_on_startup = True
[aws_auth_manager]
enable = False
conn_id = aws_default
# region_name =
# saml_metadata_url =
# avp_policy_store_id =
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
[celery]
celery_app_name = airflow.providers.celery.executors.celery_executor
worker_concurrency = 16
# worker_autoscale =
worker_prefetch_multiplier = 1
worker_enable_remote_control = true
broker_url = redis://redis:6379/0
# result_backend =
result_backend_sqlalchemy_engine_options =
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_acks_late = True
task_track_started = True
task_publish_max_retries = 3
worker_precheck = False
[celery_broker_transport_options]
# visibility_timeout =
# sentinel_kwargs =
[local_kubernetes_executor]
kubernetes_queue = kubernetes
[kubernetes_executor]
api_client_retry_configuration =
logs_task_metadata = False
pod_template_file = /opt/airflow/pod_templates/pod_template.yaml
worker_container_repository = apache/airflow
worker_container_tag = 2.10.0-python3.11
namespace = airflow-alpha
delete_worker_pods = True
delete_worker_pods_on_failure = True
worker_pod_pending_fatal_container_state_reasons = CreateContainerConfigError,ErrImagePull,CreateContainerError,ImageInspectError, InvalidImageName
worker_pods_creation_batch_size = 10
multi_namespace_mode = False
multi_namespace_mode_namespace_list =
in_cluster = True
# cluster_context =
# config_file =
kube_client_request_args = {"_request_timeout": [240, 240]}
delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_queued_check_interval = 60
ssl_ca_cert =
task_publish_max_retries = 0
[common.io]
xcom_objectstorage_path =
xcom_objectstorage_threshold = -1
xcom_objectstorage_compression =
[elasticsearch]
host =
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
end_of_log_mark = end_of_log
frontend =
write_stdout = False
json_format = False
json_fields = asctime, filename, lineno, levelname, message
host_field = host
offset_field = offset
index_patterns = _all
index_patterns_callable =
[elasticsearch_configs]
http_compress = False
verify_certs = True
[fab]
auth_rate_limited = True
auth_rate_limit = 5 per 40 second
update_fab_perms = True
[imap]
# ssl_context =
[azure_remote_logging]
remote_wasb_log_container = airflow-logs
[openlineage]
disabled = False
disabled_for_operators =
selective_enable = False
# namespace =
# extractors =
custom_run_facets =
config_path =
transport =
disable_source_code = False
dag_state_change_process_pool_size = 1
execution_timeout = 10
include_full_task_info = False
[smtp_provider]
# ssl_context =
# templated_email_subject_path =
# templated_html_content_path =
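
Of the settings above, the `[scheduler]` ones (`parsing_cleanup_interval = 30`, `stale_dag_threshold = 50`, `dag_dir_list_interval = 60`) are the ones that should drive stale-DAG cleanup. As a rough sketch (an illustration, not Airflow's actual implementation), the cleanup amounts to something like:

```python
# Rough sketch of the stale-DAG deactivation the scheduler's DAG processor is
# expected to perform (illustration only, not Airflow's code): DAGs whose file
# has not been seen for more than stale_dag_threshold seconds get is_active
# flipped to False, which is what should hide a deleted DAG in the UI.
from datetime import timedelta

from airflow.models import DagModel
from airflow.utils import timezone
from airflow.utils.session import create_session

STALE_DAG_THRESHOLD = 50  # [scheduler] stale_dag_threshold from the config above

with create_session() as session:
    cutoff = timezone.utcnow() - timedelta(seconds=STALE_DAG_THRESHOLD)
    stale = (
        session.query(DagModel)
        .filter(DagModel.is_active, DagModel.last_parsed_time < cutoff)
        .all()
    )
    for dag in stale:
        dag.is_active = False  # deactivated DAGs should drop out of the UI listing
```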

Tried with Airflow 2.10.0 and 2.10.2 (but I have been observing the issue since 2.8.x). DAGs are kept in a git repository (Azure Repos) and deployed on a k8s cluster (AKS) using the community Helm chart. Not sure if you have all the info from the configs, or whether something more is needed from the values as well?

so here is the image of the commit deleting a dag, at around 21:01: Screenshot_1

here is the log from git sync:

Screenshot_3

here is the ui at around 21:02 - still present: Screenshot_2

here is the dag folder (scheduler) at 21:05:

Screenshot_4

here are the dag ui and dag table at 21:53:

Screenshot_6 Screenshot_7

here are the logs from the scheduler after ~1h, when trying to run the dag (after restarting the scheduler):

/home/airflow/.local/lib/python3.11/site-packages/airflow/plugins_manager.py:30 DeprecationWarning: 'cgitb' is deprecated and slated for removal in Python 3.13
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2024-09-27T20:08:09.049+0000] {_client.py:1026} INFO - HTTP Request: GET https://apacheairflow.gateway.scarf.sh/scheduler?version=2.10.0&python_version=3.11.9&platform=Linux&arch=x86_64&database=postgresql&db_version=14.9&executor=KubernetesExecutor "HTTP/1.1 200 OK"
/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:143 FutureWarning: The config section [kubernetes] has been renamed to [kubernetes_executor]. Please update your `conf.get*` call to use the new name
[2024-09-27T20:08:10.788+0000] {executor_loader.py:254} INFO - Loaded executor: KubernetesExecutor
[2024-09-27T20:08:10.901+0000] {scheduler_job_runner.py:935} INFO - Starting the scheduler
[2024-09-27T20:08:10.901+0000] {scheduler_job_runner.py:942} INFO - Processing each file at most -1 times
[2024-09-27T20:08:10.902+0000] {kubernetes_executor.py:287} INFO - Start Kubernetes executor
[2024-09-27T20:08:10.946+0000] {kubernetes_executor_utils.py:140} INFO - Event: and now my watch begins starting at resource_version: 0
[2024-09-27T20:08:10.990+0000] {kubernetes_executor.py:208} INFO - Found 0 queued task instances
[2024-09-27T20:08:10.999+0000] {manager.py:174} INFO - Launched DagFileProcessorManager with pid: 89
[2024-09-27T20:08:11.002+0000] {scheduler_job_runner.py:1843} INFO - Adopting or resetting orphaned tasks for active dag runs
[2024-09-27T20:08:11.011+0000] {settings.py:63} INFO - Configured default timezone UTC
[2024-09-27T20:08:21.101+0000] {scheduler_job_runner.py:423} INFO - 1 tasks up for execution:
        <TaskInstance: example_dag_to_be_deleted.task-kubernetes manual__2024-09-27T20:08:20.051996+00:00 [scheduled]>
[2024-09-27T20:08:21.102+0000] {scheduler_job_runner.py:495} INFO - DAG example_dag_to_be_deleted has 0/18 running and queued tasks
[2024-09-27T20:08:21.102+0000] {scheduler_job_runner.py:634} INFO - Setting the following tasks to queued state:
        <TaskInstance: example_dag_to_be_deleted.task-kubernetes manual__2024-09-27T20:08:20.051996+00:00 [scheduled]>
[2024-09-27T20:08:21.105+0000] {scheduler_job_runner.py:736} INFO - Trying to enqueue tasks: [<TaskInstance: example_dag_to_be_deleted.task-kubernetes manual__2024-09-27T20:08:20.051996+00:00 [scheduled]>] for executor: KubernetesExecutor(parallelism=32)
[2024-09-27T20:08:21.106+0000] {scheduler_job_runner.py:680} INFO - Sending TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1) to KubernetesExecutor with priority 1 and queue kubernetes
[2024-09-27T20:08:21.106+0000] {base_executor.py:168} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_dag_to_be_deleted', 'task-kubernetes', 'manual__2024-09-27T20:08:20.051996+00:00', '--local', '--subdir', 'DAGS_FOLDER/example_dag_to_be_deleted.py']
[2024-09-27T20:08:21.109+0000] {kubernetes_executor.py:326} INFO - Add task TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1) with command ['airflow', 'tasks', 'run', 'example_dag_to_be_deleted', 'task-kubernetes', 'manual__2024-09-27T20:08:20.051996+00:00', '--local', '--subdir', 'DAGS_FOLDER/example_dag_to_be_deleted.py']
[2024-09-27T20:08:21.145+0000] {kubernetes_executor_utils.py:426} INFO - Creating kubernetes pod for job is TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1), with pod name example-dag-to-be-deleted-task-kubernetes-tqqfawxa, annotations: <omitted>
[2024-09-27T20:08:21.199+0000] {scheduler_job_runner.py:764} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1)
[2024-09-27T20:08:21.206+0000] {kubernetes_executor_utils.py:265} INFO - Event: example-dag-to-be-deleted-task-kubernetes-tqqfawxa Pending, annotations: <omitted>
[2024-09-27T20:08:21.211+0000] {scheduler_job_runner.py:791} INFO - Setting external_id for <TaskInstance: example_dag_to_be_deleted.task-kubernetes manual__2024-09-27T20:08:20.051996+00:00 [queued]> to 10
[2024-09-27T20:08:21.215+0000] {kubernetes_executor_utils.py:265} INFO - Event: example-dag-to-be-deleted-task-kubernetes-tqqfawxa Pending, annotations: <omitted>
[2024-09-27T20:08:21.234+0000] {kubernetes_executor_utils.py:265} INFO - Event: example-dag-to-be-deleted-task-kubernetes-tqqfawxa Pending, annotations: <omitted>
[2024-09-27T20:08:22.202+0000] {kubernetes_executor_utils.py:265} INFO - Event: example-dag-to-be-deleted-task-kubernetes-tqqfawxa Pending, annotations: <omitted>
[2024-09-27T20:08:30.227+0000] {kubernetes_executor_utils.py:265} INFO - Event: example-dag-to-be-deleted-task-kubernetes-tqqfawxa Pending, annotations: <omitted>
[2024-09-27T20:08:31.230+0000] {kubernetes_executor_utils.py:299} INFO - Event: example-dag-to-be-deleted-task-kubernetes-tqqfawxa is Running, annotations: <omitted>
[2024-09-27T20:08:47.288+0000] {kubernetes_executor_utils.py:299} INFO - Event: example-dag-to-be-deleted-task-kubernetes-tqqfawxa is Running, annotations: <omitted>
[2024-09-27T20:08:48.480+0000] {kubernetes_executor_utils.py:269} ERROR - Event: example-dag-to-be-deleted-task-kubernetes-tqqfawxa Failed, annotations: <omitted>
[2024-09-27T20:08:48.764+0000] {kubernetes_executor.py:370} INFO - Changing state of (TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1), <TaskInstanceState.FAILED: 'failed'>, 'example-dag-to-be-deleted-task-kubernetes-tqqfawxa', 'airflow-alpha', '328757893') to failed
[2024-09-27T20:08:48.795+0000] {kubernetes_executor_utils.py:269} ERROR - Event: example-dag-to-be-deleted-task-kubernetes-tqqfawxa Failed, annotations: <omitted>
[2024-09-27T20:08:48.802+0000] {kubernetes_executor.py:475} INFO - Deleted pod associated with the TI TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1). Pod name: example-dag-to-be-deleted-task-kubernetes-tqqfawxa. Namespace: airflow-alpha
[2024-09-27T20:08:48.803+0000] {scheduler_job_runner.py:764} INFO - Received executor event with state failed for task instance TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1)
[2024-09-27T20:08:48.811+0000] {scheduler_job_runner.py:801} INFO - TaskInstance Finished: dag_id=example_dag_to_be_deleted, task_id=task-kubernetes, run_id=manual__2024-09-27T20:08:20.051996+00:00, map_index=-1, run_start_date=None, run_end_date=None, run_duration=None, state=queued, executor=KubernetesExecutor(parallelism=32), executor_state=failed, try_number=1, max_tries=0, job_id=None, pool=default_pool, queue=kubernetes, priority_weight=1, operator=PythonOperator, queued_dttm=2024-09-27 20:08:21.103374+00:00, queued_by_job_id=10, pid=None
[2024-09-27T20:08:48.812+0000] {scheduler_job_runner.py:907} ERROR - Executor KubernetesExecutor(parallelism=32) reported that the task instance <TaskInstance: example_dag_to_be_deleted.task-kubernetes manual__2024-09-27T20:08:20.051996+00:00 [queued]> finished with state failed, but the task instance's state attribute is queued. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
[2024-09-27T20:08:48.822+0000] {taskinstance.py:3303} ERROR - Executor KubernetesExecutor(parallelism=32) reported that the task instance <TaskInstance: example_dag_to_be_deleted.task-kubernetes manual__2024-09-27T20:08:20.051996+00:00 [queued]> finished with state failed, but the task instance's state attribute is queued. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
[2024-09-27T20:08:48.843+0000] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=example_dag_to_be_deleted, task_id=task-kubernetes, run_id=manual__2024-09-27T20:08:20.051996+00:00, execution_date=20240927T200820, start_date=, end_date=20240927T200848
[2024-09-27T20:08:49.127+0000] {kubernetes_executor_utils.py:269} ERROR - Event: example-dag-to-be-deleted-task-kubernetes-tqqfawxa Failed, annotations: <omitted>
[2024-09-27T20:08:49.955+0000] {dagrun.py:823} ERROR - Marking run <DagRun example_dag_to_be_deleted @ 2024-09-27 20:08:20.051996+00:00: manual__2024-09-27T20:08:20.051996+00:00, state:running, queued_at: 2024-09-27 20:08:20.084580+00:00. externally triggered: True> failed
[2024-09-27T20:08:49.956+0000] {dagrun.py:905} INFO - DagRun Finished: dag_id=example_dag_to_be_deleted, execution_date=2024-09-27 20:08:20.051996+00:00, run_id=manual__2024-09-27T20:08:20.051996+00:00, run_start_date=2024-09-27 20:08:20.303660+00:00, run_end_date=2024-09-27 20:08:49.956078+00:00, run_duration=29.652418, state=failed, external_trigger=True, run_type=manual, data_interval_start=2024-09-26 00:00:00+00:00, data_interval_end=2024-09-27 00:00:00+00:00, dag_hash=1a61ee6a34486df6240d5abb977f4507
[2024-09-27T20:08:49.982+0000] {kubernetes_executor.py:370} INFO - Changing state of (TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1), <TaskInstanceState.FAILED: 'failed'>, 'example-dag-to-be-deleted-task-kubernetes-tqqfawxa', 'airflow-alpha', '328757894') to failed
[2024-09-27T20:08:49.991+0000] {kubernetes_executor.py:475} INFO - Deleted pod associated with the TI TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1). Pod name: example-dag-to-be-deleted-task-kubernetes-tqqfawxa. Namespace: airflow-alpha
[2024-09-27T20:08:49.993+0000] {kubernetes_executor.py:370} INFO - Changing state of (TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1), <TaskInstanceState.FAILED: 'failed'>, 'example-dag-to-be-deleted-task-kubernetes-tqqfawxa', 'airflow-alpha', '328757895') to failed
[2024-09-27T20:08:50.000+0000] {kubernetes_executor.py:475} INFO - Deleted pod associated with the TI TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1). Pod name: example-dag-to-be-deleted-task-kubernetes-tqqfawxa. Namespace: airflow-alpha
[2024-09-27T20:08:50.002+0000] {scheduler_job_runner.py:764} INFO - Received executor event with state failed for task instance TaskInstanceKey(dag_id='example_dag_to_be_deleted', task_id='task-kubernetes', run_id='manual__2024-09-27T20:08:20.051996+00:00', try_number=1, map_index=-1)
[2024-09-27T20:08:50.010+0000] {scheduler_job_runner.py:801} INFO - TaskInstance Finished: dag_id=example_dag_to_be_deleted, task_id=task-kubernetes, run_id=manual__2024-09-27T20:08:20.051996+00:00, map_index=-1, run_start_date=None, run_end_date=2024-09-27 20:08:48.831070+00:00, run_duration=None, state=failed, executor=KubernetesExecutor(parallelism=32), executor_state=failed, try_number=1, max_tries=0, job_id=None, pool=default_pool, queue=kubernetes, priority_weight=1, operator=PythonOperator, queued_dttm=2024-09-27 20:08:21.103374+00:00, queued_by_job_id=10, pid=None
[2024-09-27T20:09:11.082+0000] {kubernetes_executor.py:208} INFO - Found 0 queued task instances
[2024-09-27T20:09:19.134+0000] {kubernetes_executor_utils.py:101} INFO - Kubernetes watch timed out waiting for events. Restarting watch.
[2024-09-27T20:09:20.136+0000] {kubernetes_executor_utils.py:140} INFO - Event: and now my watch begins starting at resource_version: 0

The difference between my original setup and this one is that in my original setup I run a KubernetesPodOperator, which, similar to the empty operator, actually runs successfully without any issues even though the dag is not there.

In the new setup I have an EmptyOperator and a PythonOperator: the EmptyOperator runs okay, but the PythonOperator fails, saying that the dag cannot be found.
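
For completeness, a hypothetical minimal DAG matching that description (the dag_id mirrors the scheduler log above; the schedule, the empty task id, and the callable are made up):

```python
# Hypothetical minimal DAG matching the setup described above (dag_id and
# "task-kubernetes" mirror the scheduler log; everything else is made up).
# With KubernetesExecutor the worker pod re-parses the DAG file from
# DAGS_FOLDER (see the "airflow tasks run ... --subdir DAGS_FOLDER/..."
# command in the log), so once the file is deleted the PythonOperator task
# can no longer find the DAG.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="example_dag_to_be_deleted",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="task-empty")
    run = PythonOperator(task_id="task-kubernetes", python_callable=lambda: print("hello"))
    start >> run
```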

In my original setup these are the logs I get, even though the dag is not there:

/home/airflow/.local/lib/python3.11/site-packages/airflow/plugins_manager.py:30 DeprecationWarning: 'cgitb' is deprecated and slated for removal in Python 3.13
[2024-09-27T19:09:22.402+0000] {dagbag.py:587} INFO - Filling up the DagBag from /opt/airflow/dags/repo/A235712_gea_test.py
[2024-09-27T19:09:22.404+0000] {cli.py:243} WARNING - Dag 'Example_dag' not found in path /opt/airflow/dags/repo/Example_dag.py; trying path /opt/airflow/dags/repo/
[2024-09-27T19:09:22.404+0000] {dagbag.py:587} INFO - Filling up the DagBag from /opt/airflow/dags/repo/
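
The cli.py warning above reflects a two-step lookup: the file path recorded for the DAG is tried first, then the whole dags folder. A rough illustration of that fallback (not the actual airflow.utils.cli code; the paths are the ones from the log):

```python
# Rough illustration (not the actual airflow.utils.cli implementation) of the
# fallback behind the "Dag 'Example_dag' not found in path ...; trying path ..."
# warning: parse the recorded file path first, then fall back to parsing the
# whole dags folder if the DAG is not found there.
from airflow.models.dagbag import DagBag


def find_dag(dag_id: str, fileloc: str, dags_folder: str):
    dagbag = DagBag(dag_folder=fileloc, include_examples=False)
    if dag_id not in dagbag.dags:
        # fallback, as the WARNING line in the log shows
        dagbag = DagBag(dag_folder=dags_folder, include_examples=False)
    return dagbag.dags.get(dag_id)


# e.g. find_dag("Example_dag", "/opt/airflow/dags/repo/Example_dag.py", "/opt/airflow/dags/repo/")
```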