apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Executor reports task instance (...) finished (failed) although the task says it's queued #39717

Open andreyvital opened 1 month ago

andreyvital commented 1 month ago

Apache Airflow version

2.9.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

[2024-05-20T12:03:24.184+0000] {task_context_logger.py:91} ERROR - Executor reports task instance
<TaskInstance: (...) scheduled__2024-05-20T11:00:00+00:00 map_index=15 [queued]> 
finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?

What you think should happen instead?

No response

How to reproduce

I am not sure, unfortunately. But every day I see my tasks being killed randomly, with no clear reason why they were killed or failed.

Operating System

Ubuntu 22.04.4 LTS

Versions of Apache Airflow Providers

apache-airflow==2.9.1
apache-airflow-providers-amazon==8.20.0
apache-airflow-providers-celery==3.6.2
apache-airflow-providers-cncf-kubernetes==8.1.1
apache-airflow-providers-common-io==1.3.1
apache-airflow-providers-common-sql==1.12.0
apache-airflow-providers-docker==3.10.0
apache-airflow-providers-elasticsearch==5.3.4
apache-airflow-providers-fab==1.0.4
apache-airflow-providers-ftp==3.8.0
apache-airflow-providers-google==10.17.0
apache-airflow-providers-grpc==3.4.1
apache-airflow-providers-hashicorp==3.6.4
apache-airflow-providers-http==4.10.1
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-azure==10.0.0
apache-airflow-providers-mongo==4.0.0
apache-airflow-providers-mysql==5.5.4
apache-airflow-providers-odbc==4.5.0
apache-airflow-providers-openlineage==1.7.0
apache-airflow-providers-postgres==5.10.2
apache-airflow-providers-redis==3.6.1
apache-airflow-providers-sendgrid==3.4.0
apache-airflow-providers-sftp==4.9.1
apache-airflow-providers-slack==8.6.2
apache-airflow-providers-smtp==1.6.1
apache-airflow-providers-snowflake==5.4.0
apache-airflow-providers-sqlite==3.7.1
apache-airflow-providers-ssh==3.10.1

Deployment

Docker-Compose

Deployment details

Client: Docker Engine - Community
 Version:    26.1.3
 Context:    default
 Debug Mode: false
 Plugins:
  buildx: Docker Buildx (Docker Inc.)
    Version:  v0.14.0
    Path:     /usr/libexec/docker/cli-plugins/docker-buildx
  compose: Docker Compose (Docker Inc.)
    Version:  v2.27.0
    Path:     /usr/libexec/docker/cli-plugins/docker-compose
  scan: Docker Scan (Docker Inc.)
    Version:  v0.23.0
    Path:     /usr/libexec/docker/cli-plugins/docker-scan

Server:
 Containers: 30
  Running: 25
  Paused: 0
  Stopped: 5
 Images: 36
 Server Version: 26.1.3
 Storage Driver: overlay2
  Backing Filesystem: btrfs
  Supports d_type: true
  Using metacopy: false
  Native Overlay Diff: true
  userxattr: false
 Logging Driver: json-file
 Cgroup Driver: systemd
 Cgroup Version: 2
 Plugins:
  Volume: local
  Network: bridge host ipvlan macvlan null overlay
  Log: awslogs fluentd gcplogs gelf journald json-file local splunk syslog
 Swarm: inactive
 Runtimes: io.containerd.runc.v2 runc
 Default Runtime: runc
 Init Binary: docker-init
 containerd version: e377cd56a71523140ca6ae87e30244719194a521
 runc version: v1.1.12-0-g51d5e94
 init version: de40ad0
 Security Options:
  apparmor
  seccomp
   Profile: builtin
  cgroupns
 Kernel Version: 5.15.0-107-generic
 Operating System: Ubuntu 22.04.4 LTS
 OSType: linux
 Architecture: x86_64
 CPUs: 80
 Total Memory: 62.33GiB
 Name: troy
 ID: UFMO:HODB:7MRE:7O2C:FLWN:HE4Y:EZDF:ZGNF:OZRW:BUTZ:DBQK:MFR2
 Docker Root Dir: /var/lib/docker
 Debug Mode: false
 Experimental: false
 Insecure Registries:
  127.0.0.0/8
 Live Restore Enabled: false
OS: Ubuntu 22.04.4 LTS x86_64
Kernel: 5.15.0-107-generic
Uptime: 1 day, 23 hours, 12 mins
Packages: 847 (dpkg), 4 (snap)
Shell: fish 3.7.1
Resolution: 1024x768
Terminal: /dev/pts/0
CPU: Intel Xeon Silver 4316 (80) @ 3.400GHz
GPU: 03:00.0 Matrox Electronics Systems Ltd. Integrated
Memory: 24497MiB / 63830MiB

Anything else?

No response

nathadfield commented 1 month ago

I'm not sure there's an Airflow issue here.

My initial thought is that you are experiencing issues related to your workers, and perhaps they are falling over due to resource issues, e.g. disk or RAM?

I can see that you are using dynamic task mapping which, depending on what you are asking the workers to do, how many parallel tasks and the number of workers you have, could be overloading your capacity.

andreyvital commented 1 month ago

Not sure...it seems related to redis? I have seen other people report similar issues:

Also, a lot of DAGs are failing for the same reason, so it's not tied to Task Mapping at all. Some tasks fail very early. This server also has a lot of RAM, of which I've granted ~12 GB to each worker, and the task is very simple, just HTTP requests. All of them run in less than 2 minutes when they don't fail.

RNHTTR commented 1 month ago

I think the log you shared (source) erroneously replaced the "stuck in queued" log somehow. Can you check your scheduler logs for "stuck in queued"?
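
A quick way to run this check over a scheduler log is sketched below. It is only an illustration: the two marker strings are copied from the messages quoted in this thread, while the helper name and the shortened sample lines are made up for the example.

```python
from collections import Counter

# Marker substrings copied from the messages quoted in this thread.
MARKERS = {
    "executor_mismatch": "Executor reports task instance",
    "stuck_in_queued": "stuck in queued",
}


def classify_scheduler_lines(lines):
    """Count how many log lines contain each marker (hypothetical helper)."""
    counts = Counter()
    for line in lines:
        for name, marker in MARKERS.items():
            if marker in line:
                counts[name] += 1
    return counts


# Shortened sample lines, for illustration only.
sample = [
    "ERROR - Executor reports task instance <TaskInstance: ... [queued]> "
    "finished (failed) although the task says it's queued.",
    "WARNING - Marking task instance <TaskInstance: ... [queued]> "
    "stuck in queued as failed.",
    "INFO - Filling up the DagBag from ...",
]
print(dict(classify_scheduler_lines(sample)))
```

To scan a real log, pass an open file object (for example `open(path, encoding="utf-8", errors="replace")`) as `lines`.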

andreyvital commented 1 month ago

@RNHTTR there's nothing stating "stuck in queued" on scheduler logs.

nghilethanh-atherlabs commented 1 month ago

same issue here

mikolololoay commented 1 month ago

I had the same issue when running hundreds of sensors in reschedule mode: a lot of the time they got stuck in the queued status and raised the same error that you posted. It turned out that the Redis pod used by Celery restarted quite often and lost the info about queued tasks. Adding persistence to Redis seems to have helped. Do you have persistence enabled?
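
One way to answer that question is to look at the `persistence` section of the Redis `INFO` command. The sketch below only interprets such a dictionary; the helper name and the sample values are hypothetical, and the field names are the ones Redis reports in that section.

```python
def summarize_persistence(info):
    """Pick out the INFO persistence fields that matter after a restart.

    `info` is a dict such as the one redis-py returns from
    `redis.Redis(...).info("persistence")`.
    """
    return {
        # 1 means the append-only file is enabled.
        "aof_enabled": int(info.get("aof_enabled", 0)) == 1,
        # Unix timestamp of the last successful RDB save.
        "rdb_last_save_time": int(info.get("rdb_last_save_time", 0)),
        # "ok" means the last background save did not fail.
        "rdb_last_bgsave_ok": info.get("rdb_last_bgsave_status") == "ok",
        # Writes that would be lost if Redis stopped right now without saving.
        "unsaved_changes": int(info.get("rdb_changes_since_last_save", 0)),
    }


# Made-up sample values, for illustration only.
sample_info = {
    "aof_enabled": 0,
    "rdb_last_save_time": 1716206604,
    "rdb_last_bgsave_status": "ok",
    "rdb_changes_since_last_save": 12,
}
print(summarize_persistence(sample_info))
```

If both `aof_enabled` is false and no `save` rules are configured, nothing is written to disk and a restart loses the queue contents.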

nghilethanh-atherlabs commented 1 month ago

Can you help me with how to add this persistence?

andreyvital commented 1 month ago

Hi @nghilethanh-atherlabs I've been experimenting with those configs as well:

# airflow.cfg

[celery]
# https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/configurations-ref.html#task-acks-late
# https://github.com/apache/airflow/issues/16163#issuecomment-1563704852
task_acks_late = False
# https://github.com/apache/airflow/blob/2b6f8ffc69b5f34a1c4ab7463418b91becc61957/airflow/providers/celery/executors/default_celery.py#L52
# https://github.com/celery/celery/discussions/7276#discussioncomment-8720263
# https://github.com/celery/celery/issues/4627#issuecomment-396907957
[celery_broker_transport_options]
visibility_timeout = 300
max_retries = 120
interval_start = 0
interval_step = 0.2
interval_max = 0.5
# sentinel_kwargs = {}
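
In a Docker Compose deployment these options are often easier to set as environment variables. Airflow reads `AIRFLOW__{SECTION}__{KEY}` variables, so the names can be derived mechanically. The sketch below assumes `task_acks_late` lives in the `[celery]` section, as the linked Celery provider reference suggests; the helper name is made up.

```python
def airflow_env_var(section, key):
    """Build the environment variable name Airflow uses for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Options taken from the airflow.cfg snippet above.
settings = [
    ("celery", "task_acks_late", "False"),
    ("celery_broker_transport_options", "visibility_timeout", "300"),
]

for section, key, value in settings:
    print(f"{airflow_env_var(section, key)}={value}")
```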

For Redis persistence, you can refer to their config file to enable it. Not sure it will sort things out, but let's keep trying, folks.

# redis.conf
bind 0.0.0.0

protected-mode no

requirepass REDACTED

maxmemory 6gb
# https://redis.io/docs/manual/eviction/
maxmemory-policy noeviction

port 6379

tcp-backlog 511

timeout 0

tcp-keepalive 300

daemonize no
supervised no

pidfile /var/run/redis.pid

loglevel notice

logfile ""

databases 16

always-show-logo no

save 900 1
save 300 10
save 60 10000

stop-writes-on-bgsave-error yes

rdbcompression yes
rdbchecksum yes

dbfilename dump.rdb

dir /bitnami/redis/data

appendonly no
appendfilename "appendonly.aof"
appendfsync everysec
# appendfsync no
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
aof-load-truncated yes
aof-use-rdb-preamble no
aof-rewrite-incremental-fsync yes

lua-time-limit 5000

slowlog-log-slower-than 10000
slowlog-max-len 128

latency-monitor-threshold 0
notify-keyspace-events ""

hash-max-ziplist-entries 512
hash-max-ziplist-value 64

list-max-ziplist-size -2
list-compress-depth 0

set-max-intset-entries 512

zset-max-ziplist-entries 128
zset-max-ziplist-value 64

hll-sparse-max-bytes 3000

activerehashing yes

client-output-buffer-limit normal 0 0 0
client-output-buffer-limit slave 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60

hz 10

# docker-compose.yml
redis:
  image: bitnami/redis:7.2.5
  container_name: redis
  environment:
    - REDIS_DISABLE_COMMANDS=CONFIG
    # The password will come from the config file, but we need to bypass the validation
    - ALLOW_EMPTY_PASSWORD=yes
  ports:
    - 6379:6379
  # command: /opt/bitnami/scripts/redis/run.sh --maxmemory 2gb
  command: /opt/bitnami/scripts/redis/run.sh
  volumes:
    - ./redis/redis.conf:/opt/bitnami/redis/mounted-etc/redis.conf
    - redis:/bitnami/redis/data
  restart: always
  healthcheck:
    test:
      - CMD
      - redis-cli
      - ping
    interval: 5s
    timeout: 30s
    retries: 10

seanmuth commented 1 month ago

Seeing this issue on 2.9.1 as well, also only with sensors.

We've found that the DAG is timing out while trying to fill up the DagBag on the worker. Even with debug logs enabled I don't have a hint about where in the import it's hanging.

[2024-05-31 18:00:01,335: INFO/ForkPoolWorker-63] Filling up the DagBag from <redacted dag file path>
[2024-05-31 18:00:01,350: DEBUG/ForkPoolWorker-63] Importing <redacted dag file path>
[2024-05-31 18:00:31,415: ERROR/ForkPoolWorker-63] Process timed out, PID: 314

On the scheduler the DAG imports in less than a second.

Not all the tasks from this DAG fail to import; many import just fine, at the same time on the same Celery worker. Below is the same DAG file as above, importing fine:

[2024-05-31 18:01:52,911: INFO/ForkPoolWorker-3] Filling up the DagBag from <redacted dag file path>
[2024-05-31 18:01:52,913: DEBUG/ForkPoolWorker-3] Importing <redacted dag file path>
[2024-05-31 18:01:54,232: WARNING/ForkPoolWorker-3] /usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py:484: RemovedInAirflow3Warning: The 'task_concurrency' parameter is deprecated. Please use 'max_active_tis_per_dag'.
  result = func(self, **kwargs, default_args=default_args)

[2024-05-31 18:01:54,272: DEBUG/ForkPoolWorker-3] Loaded DAG <DAG: redacted dag>

One caveat: it looks like the second run/retry of each sensor is the one that runs just fine.

We've also confirmed this behavior was not present on Airflow 2.7.3, and only started occurring since upgrading to 2.9.1.
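
To compare import times between the scheduler and a worker, the DAG file can be timed directly on each host. The sketch below is an approximation under stated assumptions: it imports the file with plain `importlib` rather than through Airflow's DagBag, the helper name is made up, and 30 seconds is Airflow's default `dagbag_import_timeout`, which matches the gap between the "Importing" and "Process timed out" lines above.

```python
import importlib.util
import time

# Airflow's default for [core] dagbag_import_timeout, in seconds.
DEFAULT_DAGBAG_IMPORT_TIMEOUT = 30.0


def time_import(path, module_name="dag_under_test"):
    """Import a Python file once and return the elapsed seconds."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    start = time.perf_counter()
    spec.loader.exec_module(module)  # runs the top-level code of the DAG file
    return time.perf_counter() - start


# Usage on a worker (the path is a placeholder):
#   elapsed = time_import("/path/to/dag_file.py")
#   print(f"{elapsed:.2f}s of {DEFAULT_DAGBAG_IMPORT_TIMEOUT}s budget")
```

Running it several times in a row on the worker may show whether only the first import is slow, which would fit the observation that retries succeed.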

nghilethanh-atherlabs commented 1 month ago

@andreyvital thank you so much for your response. I have set it up and it works really great :)

petervanko commented 1 month ago

I was working on the issue with @seanmuth and increasing parsing time solved the issue. It does not fix the root cause, but as a workaround it can save your night...

AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT = 120

Lee-W commented 1 month ago

Hello everyone,

I'm currently investigating this issue, but I haven't been able to replicate it yet. Could you please try setting AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=True [1] to see if we can generate more error logs? It seems that _execute_in_subprocess generates more error logs compared to _execute_in_fork, which might provide us with some additional clues.

https://github.com/apache/airflow/blob/2d53c1089f78d8d1416f51af60e1e0354781c661/airflow/providers/celery/executors/celery_executor_utils.py#L187-L188

[1] https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#execute-tasks-new-python-interpreter
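
To illustrate why the subprocess path can surface more detail, here is a standalone sketch. It is not the provider's code; it assumes `_execute_in_subprocess` wraps `subprocess.check_output` with stderr merged into stdout, so that a failing command's output travels with the raised `CalledProcessError`.

```python
import subprocess
import sys


def run_and_capture(command):
    """Run a command and return (exit_code, combined stdout and stderr)."""
    try:
        output = subprocess.check_output(
            command, stderr=subprocess.STDOUT, close_fds=True
        )
        return 0, output.decode(errors="replace")
    except subprocess.CalledProcessError as exc:
        # exc.output holds everything the child wrote before it failed.
        return exc.returncode, exc.output.decode(errors="replace")


# A child process that writes to stderr and exits with status 1.
failing = [sys.executable, "-c", "import sys; print('boom', file=sys.stderr); sys.exit(1)"]
code, text = run_and_capture(failing)
print(code, text.strip())  # prints "1 boom"
```

With a fork-based runner the parent only sees the child's wait status, so there is no captured output to log.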

niegowic commented 1 month ago

Spotted the same problem with Airflow 2.9.1. The problem didn't occur earlier, so it's strictly related to this version. It happens randomly on random task executions. Restarting the scheduler and triggerer helps, but this is only our temporary workaround.

Lee-W commented 4 weeks ago

We've released apache-airflow-providers-celery 3.7.2 with enhanced logging. Could you please update the provider version and check the debug log for any clues? Additionally, what I mentioned in https://github.com/apache/airflow/issues/39717#issuecomment-2148697763 might give us some clues as well. Thanks!

trlopes1974 commented 2 weeks ago

Following... and adding some spice.

We have just upgraded to Airflow 2.9.2 and also have the (same) issue. Yet we had already seen the problem in Airflow 2.8. In our case the Celery task fails with airflow.exceptions.AirflowException: Celery command failed on host: slautop02 with celery_task_id 5d7f577d-3e89-4867-8481-24df778346ae (PID: 815333, Return Code: 256), but the Airflow tasks did not fail.

After reading this issue I also caught this in the scheduler logs:

[2024-06-20T17:45:58.167+0100] {processor.py:161} INFO - Started process (PID=830424) to work on /home/ttauto/airflow/dags/ITSM_DAILY_INFORM_CLOSE_TASKS.py
[2024-06-20T17:45:58.169+0100] {processor.py:830} INFO - Processing file /home/ttauto/airflow/dags/ITSM_DAILY_INFORM_CLOSE_TASKS.py for tasks to queue
[2024-06-20T17:45:58.170+0100] {logging_mixin.py:188} INFO - [2024-06-20T17:45:58.170+0100] {dagbag.py:545} INFO - Filling up the DagBag from /home/ttauto/airflow/dags/ITSM_DAILY_INFORM_CLOSE_TASKS.py
[2024-06-20T17:46:28.174+0100] {logging_mixin.py:188} INFO - [2024-06-20T17:46:28.173+0100] {timeout.py:68} ERROR - Process timed out, PID: 830424

Although these timeouts appear on several DAGs, we see no errors in the Airflow UI or on the Airflow tasks. We also cannot match the PID in these logs with the PID mentioned in the Celery task errors (PID XXX, Return Code 256).

We are experiencing Celery task failures with the following stack trace:

Traceback (most recent call last):
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/celery/app/trace.py", line 453, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/celery/app/trace.py", line 736, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 136, in execute_command
    _execute_in_fork(command_to_exec, celery_task_id)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 151, in _execute_in_fork
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Celery command failed on host: slautop02 with celery_task_id 5d7f577d-3e89-4867-8481-24df778346ae (PID: 815333, Return Code: 256)

Most of the time this does not cause any issues and the DAG's tasks complete without problems: even if the Celery task is marked as failed, the Airflow task completes successfully. Today we had a DAG failure on the very first task (an EmptyOperator) with the exact same problem in the Celery task, so the problem is now a real issue for us.

Found local files:
  * /opt/tkapp/airflow/logs/dag_id=CSDISPATCHER_SIMPLES/run_id=scheduled__2024-06-20T16:34:00+00:00/task_id=Start/attempt=1.log.SchedulerJob.log
[2024-06-20, 17:39:30 WEST] {scheduler_job_runner.py:843} ERROR - Executor reports task instance <TaskInstance: CSDISPATCHER_SIMPLES.Start scheduled__2024-06-20T16:34:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?
[2024-06-20, 17:50:47 WEST] {event_scheduler.py:40} WARNING - Marking task instance <TaskInstance: CSDISPATCHER_SIMPLES.Start scheduled__2024-06-20T16:34:00+00:00 [queued]> stuck in queued as failed. If the task instance has available retries, it will be retried.
[2024-06-20, 17:50:48 WEST] {scheduler_job_runner.py:843} ERROR - Executor reports task instance <TaskInstance: CSDISPATCHER_SIMPLES.Start scheduled__2024-06-20T16:34:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?

We have investigated the (Return Code: 256) but without success, the "best" reason would be memory contention on the server but we also do not observe that.
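
One possible reading of "Return Code: 256", offered as an assumption rather than a confirmed diagnosis: if the value is the raw wait status that `os.waitpid` returns for the forked child (the `_execute_in_fork` frame in the trace suggests a fork), then the exit code sits in the high byte and 256 means the child exited with code 1. A process killed by the kernel for memory reasons would normally show a signal instead. The helper name below is made up, and the `os.W*` functions are Unix-only.

```python
import os


def decode_wait_status(status):
    """Translate a raw wait status, as returned by os.waitpid, into words."""
    if os.WIFEXITED(status):
        return f"exited with code {os.WEXITSTATUS(status)}"
    if os.WIFSIGNALED(status):
        return f"killed by signal {os.WTERMSIG(status)}"
    return f"unrecognised status {status}"


print(decode_wait_status(256))  # exited with code 1
print(decode_wait_status(9))    # killed by signal 9 (SIGKILL)
```

Under that assumption the child command itself returned a non-zero exit status, so its own output is the place to look for the cause.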

Our server status, no exhaustion of resources. image

Versions in use:

apache-airflow==2.9.2
apache-airflow-providers-celery==3.7.2
apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-common-sql==1.14.0
apache-airflow-providers-fab==1.1.1
apache-airflow-providers-ftp==3.9.1
apache-airflow-providers-hashicorp==3.7.1
apache-airflow-providers-http==4.11.1
apache-airflow-providers-imap==3.6.1
apache-airflow-providers-postgres==5.11.1
apache-airflow-providers-sftp==4.10.1
apache-airflow-providers-smtp==1.7.1
apache-airflow-providers-sqlite==3.8.1
apache-airflow-providers-ssh==3.11.1

We have just set AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=True to try to get some more info.

potiuk commented 2 weeks ago

Can you try to set https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#schedule-after-task-execution to False and see if it helps @trlopes1974 ?

vizeit commented 2 weeks ago

I see the same issue, with dynamic task mapping in multiple instances of a DAG. All the pods have enough CPU and memory.

Executor: CeleryKubernetes
Airflow version: 2.9.1
Redis persistence: enabled
DAG: Dynamic task group with dynamic tasks; multiple instances of the DAG may run at a time

When I re-run the tasks that failed with this error, they go through and finish successfully.

potiuk commented 1 week ago

@vizeit and anyone looking here and tempted to report "I have the same issue": PLEASE, before doing so, upgrade to 2.9.2 and the latest Celery provider. When you do, report here whether things are fixed, and if not, add logs from the Celery executor.

If you actually look at the discussion, some of the related issues were fixed in 2.9.2, and Celery logging has been improved in the latest provider to add more information. So the best thing you can do is not to post "I have the same issue" but to upgrade and let us know if it helped. The second best thing is to upgrade the Celery provider and post the relevant logs.

Just posting "I have the same issue in 2.9.1" does not move the needle when it comes to investigating and fixing such a problem.

vizeit commented 1 week ago

Sure, I can upgrade and check. I believe others here already tested on 2.9.2 reporting the same issue

Lee-W commented 1 week ago

@trlopes1974 Does setting the log level to debug help? We might be able to get the log here.

potiuk commented 1 week ago

Sometimes similar issues are not the same issue. Upgrading to the latest version of Airflow and checking there saves a lot of effort for the volunteers who want to help find issues, in case the issue has already been solved, so it is the least you can do to help.

Not to mention that the latest versions (including 2.9.2) have the latest fixes (including security fixes), so it's in your best interest to upgrade.

trlopes1974 commented 1 week ago

Some more info:

We did not set schedule_after_task_execution=False, as we had already changed the setting execute_tasks_new_python_interpreter = True and wanted to see if it helped.

We had this info. DAG run: CS00007002_Correcao_Dados_Oracle_WO0000000808061_2024-06-27T16:19:17.211167+01:00

Error in Celery (Flower): image

Traceback (most recent call last):
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 192, in _execute_in_subprocess
    subprocess.check_output(command_to_exec, stderr=subprocess.STDOUT, close_fds=True, env=env)
  File "/usr/lib64/python3.9/subprocess.py", line 424, in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
  File "/usr/lib64/python3.9/subprocess.py", line 528, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['airflow', 'tasks', 'run', 'CS00007002_Correcao_Dados_Oracle', 'remote_actions.ssh_command_remove_operator', 'CS00007002_Correcao_Dados_Oracle_WO0000000808061_2024-06-27T16:19:17.211167+01:00', '--local', '--subdir', 'DAGS_FOLDER/CS00007002_Correcao_Dados_Oracle.py']' returned non-zero exit status 1.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/celery/app/trace.py", line 453, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/celery/app/trace.py", line 736, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 134, in execute_command
    _execute_in_subprocess(command_to_exec, celery_task_id)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 197, in _execute_in_subprocess
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Celery command failed on host: slautop02 with celery_task_id d5489483-fbfc-4943-868d-f058c8c0d8d0

DAG status is OK: no failure, and all tasks completed successfully. image

The corresponding Airflow Task log: remote_actions.ssh_command_remove_operator

Task Instance: remote_actions.ssh_command_remove_operator at 2024-06-27, 16:19:17

*** Found local files:
***   * /opt/tkapp/airflow/logs/dag_id=CS00007002_Correcao_Dados_Oracle/run_id=CS00007002_Correcao_Dados_Oracle_WO0000000808061_2024-06-27T16:19:17.211167+01:00/task_id=remote_actions.ssh_command_remove_operator/attempt=1.log
[2024-06-27, 16:21:07 WEST] {local_task_job_runner.py:120} ▼ Pre task execution logs
[2024-06-27, 16:21:07 WEST] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: CS00007002_Correcao_Dados_Oracle.remote_actions.ssh_command_remove_operator CS00007002_Correcao_Dados_Oracle_WO0000000808061_2024-06-27T16:19:17.211167+01:00 [queued]>
[2024-06-27, 16:21:07 WEST] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: CS00007002_Correcao_Dados_Oracle.remote_actions.ssh_command_remove_operator CS00007002_Correcao_Dados_Oracle_WO0000000808061_2024-06-27T16:19:17.211167+01:00 [queued]>
[2024-06-27, 16:21:07 WEST] {taskinstance.py:2306} INFO - Starting attempt 1 of 1
[2024-06-27, 16:21:07 WEST] {taskinstance.py:2330} INFO - Executing <Task(SSHOperator): remote_actions.ssh_command_remove_operator> on 2024-06-27 15:19:17.213772+00:00
[2024-06-27, 16:21:07 WEST] {standard_task_runner.py:63} INFO - Started process 2545260 to run task
[2024-06-27, 16:21:07 WEST] {standard_task_runner.py:90} INFO - Running: ['airflow', 'tasks', 'run', 'CS00007002_Correcao_Dados_Oracle', 'remote_actions.ssh_command_remove_operator', 'CS00007002_Correcao_Dados_Oracle_WO0000000808061_2024-06-27T16:19:17.211167+01:00', '--job-id', '369573', '--raw', '--subdir', 'DAGS_FOLDER/CS00007002_Correcao_Dados_Oracle.py', '--cfg-path', '/tmp/tmpsp8huiqu']
[2024-06-27, 16:21:07 WEST] {standard_task_runner.py:91} INFO - Job 369573: Subtask remote_actions.ssh_command_remove_operator
[2024-06-27, 16:21:07 WEST] {task_command.py:426} INFO - Running <TaskInstance: CS00007002_Correcao_Dados_Oracle.remote_actions.ssh_command_remove_operator CS00007002_Correcao_Dados_Oracle_WO0000000808061_2024-06-27T16:19:17.211167+01:00 [running]> on host SERVERNAME
[2024-06-27, 16:21:07 WEST] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='TESTEMAIL@EMAILTEST>' AIRFLOW_CTX_DAG_OWNER='ttauto' AIRFLOW_CTX_DAG_ID='CS00007002_Correcao_Dados_Oracle' AIRFLOW_CTX_TASK_ID='remote_actions.ssh_command_remove_operator' AIRFLOW_CTX_EXECUTION_DATE='2024-06-27T15:19:17.213772+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='CS00007002_Correcao_Dados_Oracle_WO0000000808061_2024-06-27T16:19:17.211167+01:00'
[2024-06-27, 16:21:07 WEST] {taskinstance.py:430} ▲▲▲ Log group end
[2024-06-27, 16:21:07 WEST] {ssh.py:151} INFO - Creating ssh_client
[2024-06-27, 16:21:07 WEST] {ssh.py:302} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
[2024-06-27, 16:21:07 WEST] {transport.py:1909} INFO - Connected (version 2.0, client OpenSSH_8.0)
[2024-06-27, 16:21:07 WEST] {transport.py:1909} INFO - Auth banner: b"################################### AVISO #########################################\n\nOs sistemas internos TARGET SYSTEM so' devem ser usados para realizar atividades de \nnegocio do TARGET SYSTEM ou outros fins autorizados pela Direcao do TARGET SYSTEM \xe2\x80\x93 \nDepartamento de Sistemas de Informacao (DSI)\n\n===============================================================================\n=             O DIREITO DE ACESSO A ESTE SISTEMA E' RESERVADO !!              =\n===============================================================================\n=                                                                             =\n=      Este sistema deve ser utilizado apenas em actividades de negocio       =\n=                       autorizadas pela Gestao do TARGET SYSTEM .          =\n=                                                                             =\n===============================================================================\n=    Este sistema esta' sujeito a auditorias efectuadas a qualquer momento.   =\n===============================================================================\n"
[2024-06-27, 16:21:07 WEST] {transport.py:1909} INFO - Authentication (publickey) successful!
[2024-06-27, 16:21:07 WEST] {ssh.py:483} INFO - Running command: sudo -s --  eval 'su - SOMEUSER -c "/home/SOMEUSER/correcao_dados/correcao_dados.sh WO0000000808061 REMOVE"'
[2024-06-27, 16:21:07 WEST] {ssh.py:529} INFO - a remover /tmp/correcao_dados/WO0000000808061
[2024-06-27, 16:21:07 WEST] {ssh.py:529} INFO - ACCAO 'REMOVE' EXECUTADA PARA A WORKORDERID:'WO0000000808061'
[2024-06-27, 16:21:07 WEST] {taskinstance.py:441} ▼ Post task execution logs
[2024-06-27, 16:21:08 WEST] {taskinstance.py:1206} INFO - Marking task as SUCCESS. dag_id=CS00007002_Correcao_Dados_Oracle, task_id=remote_actions.ssh_command_remove_operator, run_id=CS00007002_Correcao_Dados_Oracle_WO0000000808061_2024-06-27T16:19:17.211167+01:00, execution_date=20240627T151917, start_date=20240627T152107, end_date=20240627T152108
[2024-06-27, 16:21:08 WEST] {local_task_job_runner.py:240} INFO - Task exited with return code 0
[2024-06-27, 16:21:08 WEST] {local_task_job_runner.py:222} ▲▲▲ Log group end

Version: v2.9.2
Git Version: .release:f56f13442613912725d307aafc537cc76277c2d1

details: image

In the same dag run another celery task also has a failed state with the exact same error...

Traceback (most recent call last):
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 192, in _execute_in_subprocess
    subprocess.check_output(command_to_exec, stderr=subprocess.STDOUT, close_fds=True, env=env)
  File "/usr/lib64/python3.9/subprocess.py", line 424, in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
  File "/usr/lib64/python3.9/subprocess.py", line 528, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['airflow', 'tasks', 'run', 'CS00007002_Correcao_Dados_Oracle', 'remote_actions.get_files_from_sftp', 'CS00007002_Correcao_Dados_Oracle_WO0000000808061_2024-06-27T16:19:17.211167+01:00', '--local', '--subdir', 'DAGS_FOLDER/CS00007002_Correcao_Dados_Oracle.py']' returned non-zero exit status 1.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/celery/app/trace.py", line 453, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/celery/app/trace.py", line 736, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 134, in execute_command
    _execute_in_subprocess(command_to_exec, celery_task_id)
  File "/opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 197, in _execute_in_subprocess
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Celery command failed on host: slautop02 with celery_task_id d97fac72-aed7-49a5-8fe0-b4696d418431

We are observing something interesting. It might be a coincidence, but all these errors seem to be related to direct usage of SSHOperator or SFTPOperator. We have other DAGs where the SSHOperator or SFTPOperator is used inside a PythonOperator, and we see no errors on those tasks (in Celery; remember that Airflow is not complaining about these).

Anything else I should look for?

vova-navirego commented 1 week ago

Hello everyone,

I'm currently investigating this issue, but I haven't been able to replicate it yet. Could you please try setting AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=True [1] to see if we can generate more error logs? It seems that _execute_in_subprocess generates more error logs compared to _execute_in_fork, which might provide us with some additional clues.

https://github.com/apache/airflow/blob/2d53c1089f78d8d1416f51af60e1e0354781c661/airflow/providers/celery/executors/celery_executor_utils.py#L187-L188

[1] https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#execute-tasks-new-python-interpreter
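For anyone unsure how the option name above maps to an environment variable: Airflow reads overrides from variables named AIRFLOW__{SECTION}__{KEY}, with double underscores around the section. A minimal sketch follows; the helper name airflow_env_var is hypothetical and only illustrates the naming rule, it is not part of Airflow.

```python
import os


def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name for an Airflow config option.

    Hypothetical helper: it only applies the AIRFLOW__{SECTION}__{KEY}
    naming convention and does not talk to Airflow itself.
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


name = airflow_env_var("core", "execute_tasks_new_python_interpreter")

# Setting it here affects only the current process. In a real deployment the
# variable would go into the worker's environment (systemd unit, Helm values,
# docker-compose file, and so on).
os.environ[name] = "True"
print(f"{name}={os.environ[name]}")
```

Note the double underscores: a name written with single or no separators is not picked up as a config override.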

Man, you saved my day. I don't know how, but your recommendation to set AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER to True fixed my problem. I am running Airflow locally on my Apple M3 Pro machine. I noticed that when I use boto3.client(...) in one of the tasks within my DAG, even a very simple PythonOperator that only prints to the console throws the error:

[2024-06-30T15:16:06.022+0200] {scheduler_job_runner.py:843} ERROR - Executor reports task instance <TaskInstance: my_dag.get_params manual__2024-06-30T13:15:42+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?

I am using LocalExecutor with Postgres. The boto3.client code is used in a downstream task, but the upstream task, which is a very simple print statement, failed with that error. I had no meaningful logs and ChatGPT started to hate me for my questions. Then I started searching the web and landed on your comment. Now that I've applied your suggestion, I don't have that issue anymore. Thank you!

trlopes1974 commented 1 week ago

This is driving me nuts...

Airflow Task Status: Success

CS00007002_Correcao_Dados_Oracle | 2024-07-01T10:54:20.981538+00:00 | remote_actions.ssh_command_remove_operator | success | 2024-07-01T10:55:56.350026+00:00 | 2024-07-01T10:55:57.277247+00:00

Celery Task Status: Failure

How can the celery task be marked as failed but the airflow task has a success status?

potiuk commented 1 week ago

How can the celery task be marked as failed but the airflow task has a success status?

There are a few things that happen in the Celery process AFTER the task is marked as successful. One of them is controlled by https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#schedule-after-task-execution, which is the "mini-scheduling" that happens in this process. So one way to see if this is the cause is to disable it. Also, there was a lock problem detected and fixed in one of the most recent versions of Airflow, which was starting to happen more for dynamically mapped tasks, so upgrading Airflow might help as well.

trlopes1974 commented 1 week ago

@potiuk, I'll try this on our dev environment. We are already at Airflow 2.9.2...

NBardelot commented 6 days ago

We do see a few errors of this kind too, with an Airflow v2.9.2 in Kubernetes + Celery workers + Redis OSS 7.0.7 (AWS Elasticache).

potiuk commented 6 days ago

We do see a few errors of this kind too, with an Airflow v2.9.2 in Kubernetes + Celery workers + Redis OSS 7.0.7 (AWS Elasticache).

Does it help if you disable "schedule after task execution"?

NBardelot commented 6 days ago

Does it help if you disable "schedule after task execution"?

Unfortunately, in our case we rely on the feature for some DAGs with many sequential tasks, and the tradeoff would not be welcomed by our IT teams (schedule_after_task_execution was a v1 -> v2 migration seller, on top of the security incentive, for our IT teams :) ).

potiuk commented 6 days ago

Unfortunately, in our case we rely on the feature for some DAGs with many sequential tasks, and the tradeoff would not be welcomed by our IT teams (schedule_after_task_execution was a v1 -> v2 migration seller, on top of the security incentive, for our IT teams :) ).

Any particularities/findings/correlated logs and events that happen around the failures then? Just knowing it happens does not bring us any closer to diagnosing it.

vizeit commented 6 days ago

I did a few tests with the new version 2.9.2 and have the following details, with logs.

Configuration

Airflow version: 2.9.2
Compute: GKE
Executor: CeleryKubernetesExecutor
AIRFLOW__CORE__PARALLELISM: 160
AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY: 0
AIRFLOW__CELERY__WORKER_CONCURRENCY: 60
Worker replicas: 4
Scheduler replicas: 2
AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT: 3600

I am running multiple instances of a DAG with dynamic task mapping that expands into hundreds of tasks. The log shows that the task gets scheduled and queued (at 2024-07-05T00:01:59.683) but does not get executed within the task queued timeout period, resulting in the reported error (at 2024-07-05T01:02:09.431).

{
  "textPayload": "\t<TaskInstance: dynamic-map-group.supplier.agent manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [scheduled]>\u001b[0m",
  "insertId": "5fqi9x47wvvla4jh",
  "resource": {
    "type": "k8s_container",
    "labels": {
      "container_name": "airflow-scheduler",
      "namespace_name": "mynamespacedevdev",
      "location": "us-central1",
      "project_id": "mygcp-project",
      "cluster_name": "mygkecluster",
      "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs"
    }
  },
  "timestamp": "2024-07-05T00:01:50.234024733Z",
  "severity": "INFO",
  "labels": {
    "k8s-pod/release": "mygkecluster",
    "k8s-pod/component": "scheduler",
    "k8s-pod/pod-template-hash": "6b77fc67d",
    "compute.googleapis.com/resource_name": "gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
    "k8s-pod/app": "airflow"
  },
  "logName": "projects/mygcp-project/logs/stdout",
  "receiveTimestamp": "2024-07-05T00:01:52.886420247Z"
}

{
  "textPayload": "[2024-07-05 00:01:59,683: INFO/ForkPoolWorker-49] Running <TaskInstance: dynamic-map-group.supplier.agent manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> on host mygkecluster-worker-1.mygkecluster-worker.mynamespacedevdev.svc.cluster.local",
  "insertId": "4o989t3huassb59y",
  "resource": {
    "type": "k8s_container",
    "labels": {
      "container_name": "airflow-worker",
      "project_id": "mygcp-project",
      "location": "us-central1",
      "pod_name": "mygkecluster-worker-1",
      "namespace_name": "mynamespacedevdev",
      "cluster_name": "mygkecluster"
    }
  },
  "timestamp": "2024-07-05T00:01:59.683635856Z",
  "severity": "INFO",
  "labels": {
    "k8s-pod/apps_kubernetes_io/pod-index": "1",
    "k8s-pod/release": "mygkecluster",
    "k8s-pod/component": "worker",
    "k8s-pod/app": "airflow",
    "k8s-pod/statefulset_kubernetes_io/pod-name": "mygkecluster-worker-1",
    "k8s-pod/controller-revision-hash": "mygkecluster-worker-87b575989",
    "compute.googleapis.com/resource_name": "gk3-mygkecluster-nap-c58g4osx-19a59e8c-kdqm"
  },
  "logName": "projects/mygcp-project/logs/stderr",
  "receiveTimestamp": "2024-07-05T00:02:03.578550371Z"
}

{
  "textPayload": "[\u001b[34m2024-07-05T01:01:58.407+0000\u001b[0m] {\u001b[34mtask_context_logger.py:\u001b[0m91} WARNING\u001b[0m - Marking task instance <TaskInstance: dynamic-map-group.supplier.agent manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> stuck in queued as failed. If the task instance has available retries, it will be retried.\u001b[0m",
  "insertId": "11k0z0jmz77mlcu6",
  "resource": {
    "type": "k8s_container",
    "labels": {
      "container_name": "airflow-scheduler",
      "project_id": "mygcp-project",
      "namespace_name": "mynamespacedevdev",
      "cluster_name": "mygkecluster",
      "location": "us-central1",
      "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs"
    }
  },
  "timestamp": "2024-07-05T01:01:58.409116538Z",
  "severity": "INFO",
  "labels": {
    "k8s-pod/release": "mygkecluster",
    "k8s-pod/app": "airflow",
    "k8s-pod/component": "scheduler",
    "compute.googleapis.com/resource_name": "gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
    "k8s-pod/pod-template-hash": "6b77fc67d"
  },
  "logName": "projects/mygcp-project/logs/stdout",
  "receiveTimestamp": "2024-07-05T01:02:02.907406580Z"
}

{
  "textPayload": "[\u001b[34m2024-07-05T01:02:09.431+0000\u001b[0m] {\u001b[34mtask_context_logger.py:\u001b[0m91} ERROR\u001b[0m - Executor reports task instance <TaskInstance: dynamic-map-group.supplier.agent manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?\u001b[0m",
  "insertId": "pldcpv17g4ggyycu",
  "resource": {
    "type": "k8s_container",
    "labels": {
      "project_id": "mygcp-project",
      "location": "us-central1",
      "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs",
      "cluster_name": "mygkecluster",
      "container_name": "airflow-scheduler",
      "namespace_name": "mynamespacedevdev"
    }
  },
  "timestamp": "2024-07-05T01:02:09.431825344Z",
  "severity": "INFO",
  "labels": {
    "k8s-pod/app": "airflow",
    "compute.googleapis.com/resource_name": "gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
    "k8s-pod/component": "scheduler",
    "k8s-pod/release": "mygkecluster",
    "k8s-pod/pod-template-hash": "6b77fc67d"
  },
  "logName": "projects/mygcp-project/logs/stdout",
  "receiveTimestamp": "2024-07-05T01:02:12.928364383Z"
}

I believe increasing the AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT value might fix the issue. However, the root cause is still unknown: why did the worker not process the task for an entire hour? An interesting observation is that the task that fails with this error is from the first instance of the DAG, e.g. if I have 8 instances of the DAG running, the error shows up in the 1st instance. This indicates that tasks from all the instances are running and being processed, but somehow, sometimes, certain (not all) tasks from the 1st instance never get executed. It may have to do with overall throughput, which would need to be higher so that tasks never stay in the queue for that long, and I am not sure yet how to increase it. I have enough CPU and memory for the worker replicas as well as the scheduler. Any ideas?
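To make the timing explicit, here is a small sketch that compares the two timestamps quoted in this comment against the configured task queued timeout of 3600 seconds. The helper name seconds_between is hypothetical; the timestamps and the timeout are taken from the comment, nothing is read from Airflow.

```python
from datetime import datetime

# Value of AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT from the configuration above.
TASK_QUEUED_TIMEOUT = 3600  # seconds


def seconds_between(start: str, end: str) -> float:
    """Return the elapsed seconds between two ISO-8601 timestamps."""
    delta = datetime.fromisoformat(end) - datetime.fromisoformat(start)
    return delta.total_seconds()


# Timestamps as quoted in the comment: task picked up as queued, then the error.
elapsed = seconds_between("2024-07-05T00:01:59.683", "2024-07-05T01:02:09.431")
overrun = elapsed - TASK_QUEUED_TIMEOUT

print(f"elapsed: {elapsed:.3f}s, past the timeout by {overrun:.3f}s")
```

The gap is just over one hour, which is consistent with the "stuck in queued" warning in the third log entry firing shortly before the error.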

vizeit commented 5 days ago

I take back my previous comment: the issue occurs without reaching the task queued timeout. I did a few more tests and also turned on the debug log level. Please find the log from the worker:

[2024-07-05 21:34:55,178: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[c7d43e18-44af-4524-b83d-237e6b4d6a39] received
[2024-07-05 21:34:55,178: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x7c4547b2bb50> (args:('airflow.providers.celery.executors.celery_executor_utils.execute_command', 'c7d43e18-44af-4524-b83d-237e6b4d6a39', {'lang': 'py', 'task': 'airflow.providers.celery.executors.celery_executor_utils.execute_command', 'id': 'c7d43e18-44af-4524-b83d-237e6b4d6a39', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'c7d43e18-44af-4524-b83d-237e6b4d6a39', 'parent_id': None, 'argsrepr': "[['airflow', 'tasks', 'run', 'dynamic-map', 'consumer', 'manual__2024-07-05T21:23:23.580900+00:00', '--local', '--subdir', 'DAGS_FOLDER/dynamic_task_mapping.py', '--map-index', '243']]", 'kwargsrepr': '{}', 'origin': 'gen9371@mygkecluster-scheduler-545bb869c4-td9p7', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': 'c7d43e18-44af-4524-b83d-237e6b4d6a39', 'reply_to': '941f6db4-8ed8-3a87-b084-d800ec5eec6f', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
[2024-07-05 21:34:55,180: DEBUG/MainProcess] Task accepted: airflow.providers.celery.executors.celery_executor_utils.execute_command[c7d43e18-44af-4524-b83d-237e6b4d6a39] pid:98
[2024-07-05 21:34:55,201: INFO/ForkPoolWorker-31] [c7d43e18-44af-4524-b83d-237e6b4d6a39] Executing command in Celery: ['airflow', 'tasks', 'run', 'dynamic-map', 'consumer', 'manual__2024-07-05T21:23:23.580900+00:00', '--local', '--subdir', 'DAGS_FOLDER/dynamic_task_mapping.py', '--map-index', '243']
[2024-07-05 21:34:55,210: DEBUG/ForkPoolWorker-31] calling func 'task_run' with args Namespace(subcommand='run', dag_id='dynamic-map', task_id='consumer', execution_date_or_run_id='manual__2024-07-05T21:23:23.580900+00:00', cfg_path=None, depends_on_past='check', force=False, ignore_all_dependencies=False, ignore_dependencies=False, ignore_depends_on_past=False, interactive=False, job_id=None, local=True, map_index=243, mark_success=False, shut_down_logging=False, pickle=None, pool=None, raw=False, read_from_db=False, ship_dag=False, subdir='DAGS_FOLDER/dynamic_task_mapping.py', verbose=False, func=<function lazy_load_command.<locals>.command at 0x7c454c769480>, external_executor_id='c7d43e18-44af-4524-b83d-237e6b4d6a39')
[2024-07-05 21:35:48,192: ERROR/ForkPoolWorker-31] [c7d43e18-44af-4524-b83d-237e6b4d6a39] Failed to execute task.
[2024-07-05 21:35:48,547: ERROR/ForkPoolWorker-31] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[c7d43e18-44af-4524-b83d-237e6b4d6a39] raised unexpected: AirflowException('Celery command failed on host: mygkecluster-worker-2.mygkecluster-worker.mygkeclusterdev.svc.cluster.local with celery_task_id c7d43e18-44af-4524-b83d-237e6b4d6a39 (PID: 26675, Return Code: 256)')
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/celery/app/trace.py", line 453, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/celery/app/trace.py", line 736, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 136, in execute_command
    _execute_in_fork(command_to_exec, celery_task_id)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 151, in _execute_in_fork
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Celery command failed on host: mygkecluster-worker-2.mygkecluster-worker.mygkeclusterdev.svc.cluster.local with celery_task_id c7d43e18-44af-4524-b83d-237e6b4d6a39 (PID: 26675, Return Code: 256)

vizeit commented 5 days ago

@Lee-W your suggestion of testing with AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=True is not practical because the processing is 5-6 times slower. I have hundreds of dynamically mapped tasks running with multiple DAG runs.

Lee-W commented 5 days ago

@Lee-W your suggestion of testing with AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=True is not practical because the processing is 5-6 times slower. I have hundreds of dynamically mapped tasks running with multiple DAG runs.

Yep, that's more about getting a log or a temporary workaround than using it in production.

vizeit commented 5 days ago

@Lee-W this line may require more changes than just returning status code?

https://docs.python.org/3/library/os.html#os.waitpid
https://www.gnu.org/software/libc/manual/html_node/Process-Completion-Status.html
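As a sketch of what the two links above describe: os.waitpid returns a packed wait status, not an exit code, and it has to be decoded with the os.W* helpers. The function name describe_wait_status is hypothetical, and it is an assumption here that the "Return Code: 256" in the worker log is such a raw status. This is Unix-only.

```python
import os


def describe_wait_status(status: int):
    """Decode a raw wait status as returned by os.waitpid (Unix only).

    Returns a (kind, value) pair: ("exited", exit_code) for a normal exit,
    ("signaled", signal_number) if the child was killed by a signal.
    """
    if os.WIFEXITED(status):
        return ("exited", os.WEXITSTATUS(status))
    if os.WIFSIGNALED(status):
        return ("signaled", os.WTERMSIG(status))
    return ("other", status)


# 256 is the value shown as "Return Code" in the worker log above.
print(describe_wait_status(256))
```

Read this way, 256 would be a normal exit with exit code 1 rather than a kill by signal, which matches the "returned non-zero exit status 1" seen in the subprocess variant of the traceback earlier in the thread.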

trlopes1974 commented 5 days ago

some more info:

Despite the fact that we do have an external task id:

Dependencies Blocking Task From Getting Scheduled

Dependency | Reason
Dagrun Running | Task instance's dagrun was not in the 'running' state but in the state 'failed'.

...

external_executor_id | 4f9de4af-fba0-4d6a-852e-3a82d32bc721

It does not appear in Flower, so it seems that it never got into Celery (in this case).

Also, in this case, the time lag between this failed task and its "sister" task is 10 minutes (task_adoption_timeout = 600). These 2 tasks run at the same time.

1st task log:
Found local files:
* /opt/tkapp/airflow/logs/dag_id=CSDISPATCHER_OTHERS/run_id=scheduled__2024-07-05T15:37:00+00:00/task_id=dispatch_restores/attempt=1.log
[2024-07-05, 16:42:11 WEST] {taskinstance.py:1042} DEBUG - previous_execution_date was called

2nd task log:
Found local files:
* /opt/tkapp/airflow/logs/dag_id=CSDISPATCHER_OTHERS/run_id=scheduled__2024-07-05T15:37:00+00:00/task_id=dispatch_correcao_dados/attempt=1.log.SchedulerJob.log
[2024-07-05, 16:52:37 WEST] {event_scheduler.py:40} WARNING - Marking task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_correcao_dados scheduled__2024-07-05T15:37:00+00:00 [queued]> stuck in queued as failed. If the task instance has available retries, it will be retried.
[2024-07-05, 16:52:37 WEST] {scheduler_job_runner.py:843} ERROR - Executor reports task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_correcao_dados scheduled__2024-07-05T15:37:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?

trlopes1974 commented 2 days ago

Detailed log sequence of the failing task. Note that the task 'dispatch_restores' was scheduled/queued at 2024-07-07 13:12:03,993 and marked as failed 10 minutes later, at 2024-07-07 13:22:59,976.

I cannot find the '9a60d4ed-09b2-4e83-9b1d-58c47839a068' task id in flower/celery and have no other logs to search in.

Any ideas?

scheduler.stdout

    <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [scheduled]>
    <TaskInstance: CSDISPATCHER_OTHERS.dispatch_correcao_dados scheduled__2024-07-07T12:07:00+00:00 [scheduled]>
    <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [scheduled]>
    <TaskInstance: CSDISPATCHER_OTHERS.dispatch_correcao_dados scheduled__2024-07-07T12:07:00+00:00 [scheduled]>

2024-07-07 13:12:03,851 INFO - Sending TaskInstanceKey(dag_id='CSDISPATCHER_OTHERS', task_id='dispatch_restores', run_id='scheduled__2024-07-07T12:07:00+00:00', try_number=1, map_index=-1) to executor with priority 2 and queue default
2024-07-07 13:12:03,851 INFO - Adding to queue: ['airflow', 'tasks', 'run', 'CSDISPATCHER_OTHERS', 'dispatch_restores', 'scheduled__2024-07-07T12:07:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/CSDISPATCHER_OTHERS.py']
2024-07-07 13:12:03,852 INFO - Sending TaskInstanceKey(dag_id='CSDISPATCHER_OTHERS', task_id='dispatch_correcao_dados', run_id='scheduled__2024-07-07T12:07:00+00:00', try_number=1, map_index=-1) to executor with priority 2 and queue default
2024-07-07 13:12:03,852 INFO - Adding to queue: ['airflow', 'tasks', 'run', 'CSDISPATCHER_OTHERS', 'dispatch_correcao_dados', 'scheduled__2024-07-07T12:07:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/CSDISPATCHER_OTHERS.py']
2024-07-07 13:12:03,984 INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='CSDISPATCHER_OTHERS', task_id='dispatch_restores', run_id='scheduled__2024-07-07T12:07:00+00:00', try_number=1, map_index=-1)
2024-07-07 13:12:03,985 INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='CSDISPATCHER_OTHERS', task_id='dispatch_correcao_dados', run_id='scheduled__2024-07-07T12:07:00+00:00', try_number=1, map_index=-1)
2024-07-07 13:12:03,985 INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='CSDISPATCHER_OTHERS', task_id='Start', run_id='scheduled__2024-07-07T12:07:00+00:00', try_number=1, map_index=-1)
2024-07-07 13:12:03,993 INFO - Setting external_id for <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [queued]> to 9a60d4ed-09b2-4e83-9b1d-58c47839a068
2024-07-07 13:12:03,993 INFO - Setting external_id for <TaskInstance: CSDISPATCHER_OTHERS.dispatch_correcao_dados scheduled__2024-07-07T12:07:00+00:00 [queued]> to 2418fdad-ed1e-440d-afb5-1c79b52e1390
2024-07-07 13:12:03,994 INFO - TaskInstance Finished: dag_id=CSDISPATCHER_OTHERS, task_id=Start, run_id=scheduled__2024-07-07T12:07:00+00:00, map_index=-1, run_start_date=2024-07-07 12:12:03.327772+00:00, run_end_date=2024-07-07 12:12:03.703420+00:00, run_duration=0.375648, state=success, executor_state=success, try_number=1, max_tries=0, job_id=411092, pool=default_pool, queue=default, priority_weight=4, operator=EmptyOperator, queued_dttm=2024-07-07 12:12:00.189483+00:00, queued_by_job_id=408194, pid=2687251
2024-07-07 13:12:04,038 DEBUG - <TaskInstance: CSDISPATCHER_OTHERS.End scheduled__2024-07-07T12:07:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 2 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0, success_setup=0, skipped_setup=0), upstream_task_ids={'dispatch_correcao_dados', 'dispatch_restores'}
2024-07-07 13:12:04,039 DEBUG - Dependencies not met for <TaskInstance: CSDISPATCHER_OTHERS.End scheduled__2024-07-07T12:07:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 2 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0, success_setup=0, skipped_setup=0), upstream_task_ids={'dispatch_correcao_dados', 'dispatch_restores'}

...

2024-07-07 13:22:53,014 DEBUG - <TaskInstance: CSDISPATCHER_OTHERS.End scheduled__2024-07-07T12:07:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
2024-07-07 13:22:53,014 DEBUG - <TaskInstance: CSDISPATCHER_OTHERS.End scheduled__2024-07-07T12:07:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
2024-07-07 13:22:53,016 DEBUG - <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
2024-07-07 13:22:53,016 DEBUG - <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.

...

2024-07-07 13:22:59,976 ERROR - Executor reports task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?
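The scheduler lines above can be correlated mechanically. The following is a minimal sketch, not an Airflow tool: the regular expression and the parse helper are hypothetical and only assume the "timestamp LEVEL - message" layout of scheduler.stdout shown here. It measures the gap between the external_id being set and the executor error for dispatch_restores.

```python
import re
from datetime import datetime

# Two lines copied from the scheduler.stdout excerpt above.
LOG_LINES = [
    "2024-07-07 13:12:03,993 INFO - Setting external_id for <TaskInstance: "
    "CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 "
    "[queued]> to 9a60d4ed-09b2-4e83-9b1d-58c47839a068",
    "2024-07-07 13:22:59,976 ERROR - Executor reports task instance <TaskInstance: "
    "CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 "
    "[queued]> finished (failed) although the task says it's queued. "
    "(Info: None) Was the task killed externally?",
]

# Hypothetical pattern: timestamp, level, then the first <TaskInstance: ...> tag.
LINE_RE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>\w+) - "
    r".*?<TaskInstance: (?P<task>\S+) (?P<run_id>\S+) \[(?P<state>\w+)\]>"
)


def parse(line: str):
    """Return (timestamp, level, task, run_id) or None if the line does not match."""
    match = LINE_RE.match(line)
    if match is None:
        return None
    ts = datetime.strptime(match["ts"], "%Y-%m-%d %H:%M:%S,%f")
    return ts, match["level"], match["task"], match["run_id"]


events = [event for event in map(parse, LOG_LINES) if event is not None]
queued_at, failed_at = events[0][0], events[1][0]
gap = (failed_at - queued_at).total_seconds()

print(f"{events[0][2]}: {gap:.3f}s between external_id assignment and the error")
```

Running the same parsing over the full scheduler log, grouped by task and run_id, would show whether every failing task sits in queued for roughly the same interval.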

dag processor manager log:

[2024-07-07T13:22:59.985+0100] {manager.py:564} DEBUG - Received {'full_filepath': '/home/ttauto/airflow/dags/CSDISPATCHER_OTHERS.py', 'processor_subdir': '/home/ttauto/airflow/dags', 'msg': "Executor reports task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f13fd3bc550>, 'is_failure_callback': True} signal from DagFileProcessorAgent
[2024-07-07T13:22:59.986+0100] {manager.py:719} DEBUG - Queuing TaskCallbackRequest CallbackRequest: {'full_filepath': '/home/ttauto/airflow/dags/CSDISPATCHER_OTHERS.py', 'processor_subdir': '/home/ttauto/airflow/dags', 'msg': "Executor reports task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f13fd3bc550>, 'is_failure_callback': True}

scheduler logs:

[2024-07-07T13:23:00.026+0100] {processor.py:161} INFO - Started process (PID=2692915) to work on /home/ttauto/airflow/dags/CSDISPATCHER_OTHERS.py
[2024-07-07T13:23:00.027+0100] {processor.py:830} INFO - Processing file /home/ttauto/airflow/dags/CSDISPATCHER_OTHERS.py for tasks to queue
[2024-07-07T13:23:00.028+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.028+0100] {dagbag.py:545} INFO - Filling up the DagBag from /home/ttauto/airflow/dags/CSDISPATCHER_OTHERS.py
[2024-07-07T13:23:00.030+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.030+0100] {dagbag.py:337} DEBUG - Importing /home/ttauto/airflow/dags/CSDISPATCHER_OTHERS.py
[2024-07-07T13:23:00.033+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.032+0100] {h_secrets_backend.py:208} DEBUG - _get_secret: airflow_prod , variables/CSDISPATCHER_OTHERS_SCHEDULE
[2024-07-07T13:23:00.033+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.033+0100] {h_secrets_backend.py:117} DEBUG - init backend with url:https://vault.kyndryl.net proxy:http://129.41.90.15:443 auth_type:approle
[2024-07-07T13:23:00.035+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.035+0100] {cachevaultmemcache.py:57} DEBUG - CA : /opt/tkapp/ssl/slautop02_knb_combined.pem Host: memcache.slautop02.knb
[2024-07-07T13:23:00.704+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.703+0100] {h_secrets_backend.py:219} DEBUG - Got response data (len): 1
[2024-07-07T13:23:00.704+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.704+0100] {h_secrets_backend.py:185} DEBUG - get_variableCSDISPATCHER_OTHERS_SCHEDULE:{'value': '2,7,12,17,22,27,32,37,42,47,52,57 '}
[2024-07-07T13:23:00.709+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.709+0100] {GetText.py:65} DEBUG - /opt/tkapp/env_airflow/lib64/python3.9/site-packages/cron_descriptor/locale/en_US.mo Loaded
[2024-07-07T13:23:00.728+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.728+0100] {dagbag.py:511} DEBUG - Loaded DAG
[2024-07-07T13:23:00.728+0100] {processor.py:840} INFO - DAG(s) 'CSDISPATCHER_OTHERS' retrieved from /home/ttauto/airflow/dags/CSDISPATCHER_OTHERS.py
[2024-07-07T13:23:00.729+0100] {processor.py:706} DEBUG - Processing Callback Request: {'full_filepath': '/home/ttauto/airflow/dags/CSDISPATCHER_OTHERS.py', 'processor_subdir': '/home/ttauto/airflow/dags', 'msg': "Executor reports task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f13fd3bc550>, 'is_failure_callback': True}
[2024-07-07T13:23:00.759+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.759+0100] {taskinstance.py:2907} ERROR - Executor reports task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?
[2024-07-07T13:23:00.767+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.767+0100] {taskinstance.py:562} DEBUG - Task Duration set to None
[2024-07-07T13:23:00.769+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.769+0100] {taskinstance.py:584} DEBUG - Clearing next_method and next_kwargs.
[2024-07-07T13:23:00.773+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.772+0100] {plugins_manager.py:332} DEBUG - Plugins are already loaded. Skipping.
[2024-07-07T13:23:00.773+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.773+0100] {plugins_manager.py:516} DEBUG - Integrate DAG plugins
[2024-07-07T13:23:00.789+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.788+0100] {taskinstance.py:1042} DEBUG - previous_execution_date was called
[2024-07-07T13:23:00.800+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.800+0100] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=CSDISPATCHER_OTHERS, task_id=dispatch_restores, run_id=scheduled__2024-07-07T12:07:00+00:00, execution_date=20240707T120700, start_date=, end_date=20240707T122300
[2024-07-07T13:23:00.805+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.805+0100] {taskinstance.py:1042} DEBUG - previous_execution_date was called
[2024-07-07T13:23:00.815+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.815+0100] {h_secrets_backend.py:208} DEBUG - _get_secret: airflow_prod , connections/smtp_default
[2024-07-07T13:23:00.816+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.815+0100] {h_secrets_backend.py:219} DEBUG - Got response data (len): 4
[2024-07-07T13:23:00.816+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.816+0100] {h_secrets_backend.py:258} DEBUG - a obter connection para smtp_default : {'conn_type': 'Email', 'extra': {'disable_ssl': True, 'disable_tls': False, 'from_email': 'nb-prd-noreply@kyndryl.com', 'retry_limit': 5, 'ssl_context': 'default', 'timeout': 20}, 'host': '158.98.137.90', 'port': '25'}
[2024-07-07T13:23:00.817+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.816+0100] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2024-07-07T13:23:00.818+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.818+0100] {base.py:84} INFO - Using connection ID 'smtp_default' for task execution.
[2024-07-07T13:23:00.820+0100] {logging_mixin.py:188} WARNING - /opt/tkapp/env_airflow/lib64/python3.9/site-packages/airflow/utils/email.py:154 RemovedInAirflow3Warning: Fetching SMTP credentials from configuration variables will be deprecated in a future release. Please set credentials using a connection instead.
[2024-07-07T13:23:00.820+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.820+0100] {configuration.py:1050} WARNING - section/key [smtp/smtp_user] not found in config [2024-07-07T13:23:00.820+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.820+0100] {email.py:267} DEBUG - No user/password found for SMTP, so logging in with no authentication. [2024-07-07T13:23:00.821+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.821+0100] {email.py:271} INFO - Email alerting: attempt 1 [2024-07-07T13:23:00.824+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.824+0100] {email.py:282} INFO - Sent an alert email to ['{{ var.json.DAG_FAILURE_EMAIL }}'] [2024-07-07T13:23:00.850+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.850+0100] {h_secrets_backend.py:208} DEBUG - _get_secret: airflow_prod , variables/RABBIT [2024-07-07T13:23:00.851+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.851+0100] {h_secrets_backend.py:219} DEBUG - Got response data (len): 9 [2024-07-07T13:23:00.851+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.851+0100] {h_secrets_backend.py:185} DEBUG - get_variableRABBIT:{'rabbitExchange': 'amq.topic', 'rabbitHost': 'rabbit.slautop02.knb', 'rabbitHost_OLD': 'slautop02-rabbit01', 'rabbitPassword': 'xxxxxxxxxxxx', 'rabbitPort': '5672', 'rabbitUserName': 'airflow', 'rabbitconnection_attempts': '3', 'rabbitsocket_timeout': '5000', 'rabbitvhost': 'airflowhost'} [2024-07-07T13:23:00.853+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.852+0100] {h_secrets_backend.py:208} DEBUG - _get_secret: airflow_prod , variables/TKAPP [2024-07-07T13:23:00.853+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.853+0100] {h_secrets_backend.py:219} DEBUG - Got response data (len): 10 [2024-07-07T13:23:00.853+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.853+0100] {h_secrets_backend.py:185} DEBUG - get_variableTKAPP:{'backupdir': 'backups', 'db_purge_interval': '90', 'dbname': 'tkapp', 'host': 
'postgres.slautop02.knb', 'host_OLD': 'slautop02-postgres01', 'options': '-c search_path=tkapp', 'password': 'xxxxxxxxxx', 'port': '5432', 'retention': '5', 'user': 'ttauto'} [2024-07-07T13:23:00.899+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.899+0100] {CSDISPATCHER_OTHERS.py:63} DEBUG - dag: CSDISPATCHER_OTHERS, task: End, state: failed, url: https://airflow.slautop02.knb:8080/dags/CSDISPATCHER_OTHERS/grid?dag_run_id=scheduled__2024-07-07T12%3A07%3A00%2B00%3A00&task_id=End&map_index=-1&tab=logs, Exception: Executor reports task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled2024-07-07T12:07:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally? [2024-07-07T13:23:00.900+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.900+0100] {CSDISPATCHER_OTHERS.py:63} DEBUG - dag: CSDISPATCHER_OTHERS, task: Start, state: failed, url: https://airflow.slautop02.knb:8080/dags/CSDISPATCHER_OTHERS/grid?dag_run_id=scheduled2024-07-07T12%3A07%3A00%2B00%3A00&task_id=Start&map_index=-1&tab=logs, Exception: Executor reports task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled2024-07-07T12:07:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally? [2024-07-07T13:23:00.900+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.900+0100] {CSDISPATCHER_OTHERS.py:63} DEBUG - dag: CSDISPATCHER_OTHERS, task: dispatch_correcao_dados, state: failed, url: https://airflow.slautop02.knb:8080/dags/CSDISPATCHER_OTHERS/grid?dag_run_id=scheduled2024-07-07T12%3A07%3A00%2B00%3A00&task_id=dispatch_correcao_dados&map_index=-1&tab=logs, Exception: Executor reports task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled2024-07-07T12:07:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally? 
[2024-07-07T13:23:00.901+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.901+0100] {CSDISPATCHER_OTHERS.py:63} DEBUG - dag: CSDISPATCHER_OTHERS, task: dispatch_restores, state: failed, url: https://airflow.slautop02.knb:8080/dags/CSDISPATCHER_OTHERS/grid?dag_run_id=scheduled__2024-07-07T12%3A07%3A00%2B00%3A00&task_id=dispatch_restores&map_index=-1&tab=logs, Exception: Executor reports task instance <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled2024-07-07T12:07:00+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally? [2024-07-07T13:23:00.901+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.901+0100] {CSDISPATCHER_OTHERS.py:69} ERROR - A terminar callback de erro! [2024-07-07T13:23:00.908+0100] {processor.py:791} INFO - Executed failure callback for <TaskInstance: CSDISPATCHER_OTHERS.dispatch_restores scheduled__2024-07-07T12:07:00+00:00 [failed]> in state failed [2024-07-07T13:23:00.909+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.909+0100] {dagbag.py:667} DEBUG - Running dagbag.sync_to_db with retries. Try 1 of 3 [2024-07-07T13:23:00.910+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.910+0100] {dagbag.py:672} DEBUG - Calling the DAG.bulk_sync_to_db method [2024-07-07T13:23:00.912+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.912+0100] {serialized_dag.py:166} DEBUG - Checking if DAG (CSDISPATCHER_OTHERS) changed [2024-07-07T13:23:00.930+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.930+0100] {serialized_dag.py:177} DEBUG - Serialized DAG (CSDISPATCHER_OTHERS) is unchanged. Skipping writing to DB [2024-07-07T13:23:00.931+0100] {logging_mixin.py:188} INFO - [2024-07-07T13:23:00.931+0100] {dag.py:3096} INFO - Sync 1 DAGs [2024-07-07T13:23:00.963+0100] {processor.py:183} INFO - Processing /home/ttauto/airflow/dags/CSDISPATCHER_OTHERS.py took 0.959 seconds

NBardelot commented 2 days ago

Any particularities/findings/correlated logs and events that happen around the failures then? Just knowing it happens does not bring us any closer to diagnosing it.

Nothing obvious or pointing to a specific direction, I'd have given more correlation if I had some to share. We're not staffed to make a deep dive analysis into such intricate behaviour sadly.

But collecting enough positive cases means you know it's probably not config specific. And it hardly looks like a race condition, because we have the error occurring several times a day on some days. That said, it also does not happen on every Airflow instance we have, but I haven't figured out what makes the instance where it happens special.

potiuk commented 2 days ago

But collecting enough positive cases means you know it's probably not config specific. And it hardly looks like a race condition, because we have the error occurring several times a day on some days. That said, it also does not happen on every Airflow instance we have, but I haven't figured out what makes the instance where it happens special.

The fact that it does not happen everywhere actually suggests that it IS config specific. If we have a few reports like that with tens of thousands of installations out there, rather than having the issue flooded with "things stopped working in the same way" reports, that is even more indication that it's not a widespread issue. This one seems to happen a bit more frequently, but it's not a "widespread issue" - it is somewhat configuration specific. I know it looks different to you because you are actually experiencing it, as are a few other people, but this is often survivorship bias: you don't account for all the others who do not have a similar issue and do not come here to report that "it works fine for me".

Unfortunately, without knowing possible causes, it is extremely difficult to diagnose and reproduce. And here there are only volunteers working on such issues, people who decided to look at those issues in - basically - their free time. And if the diagnosis of some correlation is not done on the users' side, those volunteers have basically no chance to figure out what the conditions are, other than guessing. And they are not even as lucky as you, with paid staff to actually run your pipelines AND the ability to dig deeper :). There is no paid staff here and there is no way to dig deeper.

That's why we usually ask users to do as much digging and diagnosing as they can spare and to provide the results here - this might (or might not) make it easier to guess what is wrong (no guarantees), but it's the best way for our users to increase the chances (and a nice way to contribute back for the free software they get as well).
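One low-effort way to start collecting correlations is to pull every occurrence of the error out of the scheduler logs and group it by task, so the timestamps can be lined up against worker restarts, evictions, or load spikes. The sketch below is only an illustration: the regular expression is written against the log lines pasted in this issue, and the function names `find_reports` and `count_by_task` are made up for the example.

```python
import re
from collections import Counter

# Matches the scheduler error quoted in this issue. The optional map_index
# part covers mapped tasks, as in the original report.
REPORT = re.compile(
    r"\[(?P<ts>[^\]]+)\] \{[^}]+\} ERROR - Executor reports task instance\s+"
    r"<TaskInstance: (?P<ti>\S+) (?P<run_id>\S+)"
    r"(?: map_index=(?P<map_index>-?\d+))? \[(?P<state>\w+)\]> "
    r"finished \((?P<reported>\w+)\)"
)


def find_reports(lines):
    """Return one dict per matching error found in the given log lines."""
    found = []
    for line in lines:
        for match in REPORT.finditer(line):
            found.append(match.groupdict())
    return found


def count_by_task(reports):
    """Count how often each 'dag_id.task_id' shows up in the reports."""
    return Counter(report["ti"] for report in reports)


if __name__ == "__main__":
    sample = (
        "[2024-07-07T13:23:00.759+0100] {taskinstance.py:2907} ERROR - "
        "Executor reports task instance <TaskInstance: "
        "CSDISPATCHER_OTHERS.dispatch_restores "
        "scheduled__2024-07-07T12:07:00+00:00 [queued]> finished (failed) "
        "although the task says it's queued. (Info: None) "
        "Was the task killed externally?"
    )
    for report in find_reports([sample]):
        print(report["ts"], report["ti"], report["run_id"], report["state"])
```

Feeding it the scheduler log files line by line gives a per-task count plus a list of timestamps to compare with whatever else was happening in the deployment at those moments.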

NBardelot commented 1 day ago

When I say "configuration specific", I mean it from the point-of-view of it being specific to the use of Kubernetes, or Redis OSS, or a config value. We use very straightforward configurations. If I had to take a bet, I'd wager on something specific to the DAG/tasks load (and maybe that enters your definition of "config specific").

Could you point to some statsd metrics that would help analyze the issue? (queue sizes, timeouts, something related to the scheduler load...)

Also, we're currently migrating our observability stack to Datadog, which provides out-of-the-box Airflow dashboards that might be more useful for such analysis than the ones we made ourselves. We're new to the tool and need some time to integrate it, but maybe you're familiar with their dashboards and know something that could be useful there too.

potiuk commented 1 day ago

(and maybe that enters your definition of "config specific").

Yes. The thing is "whatever makes it possible to reproduce the issue". If the maintainers are not able to reproduce the issue reliably, they can only make guesses, and even worse, even if they come up with some hypothesis and implement a supposed fix, they are not able to confirm or even confidently say that it fixes the issue. Having a minimal reproduction scenario is absolutely key to diagnosing and fixing the issue. It's just a fact of "life" - fixing a problem at some remote, inaccessible installation where you are not able to dig deeper and look around (and nobody pays for the time spent on it) is all about that. "Having a reproducible scenario" increases the chances of fixing the problem by at least a few orders of magnitude. That's simply how it works.

Could you point to some statsd metrics that would help analyze the issue? (queue sizes, timeouts, something related to the scheduler load...)

No - nothing specific comes to my mind - the problem with such issues is that if we knew what exactly to look at, then we would know what the cause of the issue is - which we don't - so we try to get enough signals to be able to deduce it. Like with everything in complex systems, there are no easy recipes to follow for diagnosing "rare" issues that are unexpected (the sheer fact that they are unexpected makes them difficult to diagnose, because you have no idea what to look at and you need to look at the "whole" system and try to spot anomalies).

However, soon (likely in 2.10.0) there will be open-telemetry tracing integrated into Airflow's observability. It will give much more detailed information on what's going on with each task, and I'd strongly recommend integrating it into your observability stack - especially since a number of tools that use it will have an option to export such traces and make them available to someone other than those who manage the installation, so that they can dig deeper. Until then we can mostly say "dig deeper".

There is a monthly town hall tomorrow https://www.linkedin.com/feed/update/urn:li:activity:7216205556301090817 where @howardyoo is going to talk about it.

trlopes1974 commented 1 day ago

Humm. I'm not sure I agree with the "configuration specific" targeting problem/issue/ whatever.

It is clear now that this happens with several configurations: Kubernetes, Celery / Redis (we have RabbitMQ). Some have clearly stated that messing with the task_adoption_timeout (increasing it to 2H or so) has fixed their issue, and this gives me migraines as it makes no sense (in my mind) how a timeout value can interact with the scheduling/executing of tasks. In my last provided logs you can see that after 10 minutes the task is marked as failed, but there is no evidence that it left the queued state... could it be some logic failure in the scheduler/worker? (I see no concurrency or exhaustion issues on our setup.)

I see 2 different problems in this issue:

1 - the task is never executed (it is queued but the scheduler does not launch it), and this is the case where you have an external_task_id but you see no reference to it in the worker (celery/flower);

2 - the task is executed, the worker "tries" to launch it or launches it, but something in the execution (either in fork or in new process) messes up the return value in the os.waitpid(). The curious part here is that for Airflow the task was executed with success even though we see the failure in celery/flower.

Yes, it seems that this is one of those that keeps hiding in several places and it will be hard to find it. The good news is that (in our case) it keeps happening from time to time, randomly on different tasks. One curious thing is that, in our case, it is affecting only a few DAGs and not others....

potiuk commented 1 day ago

I'm not sure I agree with the "configuration specific" targeting problem/issue/ whatever.

Yes. And as I mentioned before with "survivorship bias", you think that it affects everyone, but that's not the case (otherwise we would be flooded with people stating it, it would show up in our tests, and some of the service companies - Astronomer for one, who runs the latest version - would also have experienced it).

So the best you can do is to provide some guesses and results of digging that will narrow down the problem - ideally by bisecting things and disabling them. For example, if someone (one of the teams here) experiences this problem on a staging system, turning selected parameters on and off and modifying them one by one to see what fixes it is a great engineering practice. I use it daily for Airflow CI to narrow down and find out what the problem is by trying things out and guessing.

And you can only, and exclusively, use that technique when you have a somewhat reproducible issue. That narrows down the list of people who can actually do such a check to those who experience it (or, when we have an easily reproducible case, anyone can do it). If you don't experience the problem, there is very little chance that you can guess what configuration or DAG might cause it. Especially when you have no access to the DAG and the environment where Airflow is run. On the other hand, when you DO experience it and DO have DAG access and CAN change and play with the configuration (for example on a staging system) and DO have a somewhat reproducible case, then yes - you can do bisecting and test the results of individual configuration changes.

And this is all we are asking for here: if any of the people experiencing it have a somewhat reproducible case and are able to change various configuration parameters (by looking at the discussion above you will find a few hypotheses), creatively dig, find correlations, and maybe even try different hypotheses - this might ACTUALLY help with diagnosis.

Yes. It will require time, dedication and focused effort from someone. And this is the smallest thing the community can ask our users to contribute back as a sign of gratitude and a thank you for the software the community releases for free. I think that's generally a small price to pay for all the cost saved (tens of thousands of dollars a month sometimes) compared with similar commercial solutions.
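The "change one parameter at a time" approach described above can be written down as a small experiment plan before touching a staging system. This is only a sketch: the helper name `one_at_a_time` is made up, the two parameters are ones mentioned in this thread, and the values are placeholders rather than recommendations.

```python
from typing import Any, Dict, Iterator, List, Tuple


def one_at_a_time(
    baseline: Dict[str, Any],
    candidates: Dict[str, List[Any]],
) -> Iterator[Tuple[str, Any, Dict[str, Any]]]:
    """Yield (changed_key, new_value, full_settings) for each trial.

    Every trial differs from the baseline in exactly one setting, so a change
    in how often the error appears can be attributed to that setting.
    """
    for key, values in candidates.items():
        for value in values:
            if baseline.get(key) == value:
                continue  # identical to the baseline, nothing to learn
            trial = dict(baseline)
            trial[key] = value
            yield key, value, trial


if __name__ == "__main__":
    # Placeholder values only; use what your own deployment runs today.
    baseline = {"parallelism": 32, "task_adoption_timeout": 600}
    candidates = {
        "parallelism": [32, 300],
        "task_adoption_timeout": [7200],
    }
    for key, value, settings in one_at_a_time(baseline, candidates):
        print(f"trial: {key} -> {value}; full settings: {settings}")
```

Each trial then needs to run long enough on staging to compare the error rate with the baseline; reporting the list of trials together with the observed counts is the kind of result that helps here.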

potiuk commented 1 day ago

I see 2 different problems in this issue:

1 - the task is never executed (it is queued but the scheduler does not launch it), and this is the case where you have an external_task_id but you see no reference to it in the worker (celery/flower);

This might indicate that somewhere along the way the task has been lost. I think there might be a small race condition between celery acknowledging the task and running it, but that would be correlated with - for example - the celery worker being killed (for example by ephemeral machine eviction etc.), and that would be a correlated event somewhere in the deployment.

2 - the task is executed, the worker "tries" or launches it but something in the execution ( either in fork or in new process ) messes up the return value in the os.waitpid(). The curious part here is that for Airflow the task was executed with success despite that we see the failure in celery/flower.

And here there is the "mini-scheduler" that runs after the state of the task is set to "true" (which is an easy thing to disable - "schedule-after-task-execution"). That is another thing that might be checked. But if you can't disable it, then it's a good idea to see if there is any log in the celery task or deployment that would indicate that THIS is the reason - because it's a hypothesis. Again, it could for example be a celery task being evicted for whatever reason before saving the state and returning with "success", and before the celery master is able to record the success. In this case MAYBE a solution to avoid the celery status is to ignore errors coming from the "mini-scheduler" - but in order to do that, we need to have some indication of the error. Then we can ignore it, and users experiencing it may apply a patch and (following my bisection technique) check whether it fixes the problem.

So ... overall - we need to combine findings and hypotheses coming from digging deeper (by our users) with attempts to apply hypothetical fixes (by our users) if we can come up with some. This is pretty much the only way to be able to address the issue if we cannot easily guess the cause or reproduce it.

trlopes1974 commented 1 day ago

Well.. I'm up for it! I can (try to) help diagnose it, although my lack of knowledge may limit me in certain spots.

Maybe we could try to drill down into this one:

airflow.exceptions.AirflowException: Celery command failed on host: slautop02 with celery_task_id 884985ad-0ae3-46b6-83b3-36ae84e584b3 (PID: 3631594, Return Code: 256)

I have a lot of these...
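A small aid for reading that message: if the "Return Code" printed there is the raw status returned by os.waitpid() (an assumption, based on the os.waitpid() call mentioned earlier in this thread), then 256 is not an exit code by itself. On POSIX the exit code sits in the high byte of the status, so 256 decodes to exit code 1. The helper name `describe_wait_status` is made up for this sketch.

```python
import os


def describe_wait_status(status: int) -> str:
    """Describe a raw os.waitpid() status in readable form (POSIX only)."""
    if os.WIFEXITED(status):
        # The child called exit(); the code is stored in the high byte.
        return f"exited with code {os.WEXITSTATUS(status)}"
    if os.WIFSIGNALED(status):
        # The child was terminated by a signal (for example SIGKILL = 9).
        return f"killed by signal {os.WTERMSIG(status)}"
    return f"unrecognised status {status}"


if __name__ == "__main__":
    print(describe_wait_status(256))  # → exited with code 1
```

Under that assumption, the forked process exited by itself with code 1 rather than being killed by a signal, which would point at an error raised inside the task process rather than an external kill such as the OOM killer.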

viitoare commented 18 hours ago

I also encountered the same problem. During debugging, I found a very strange phenomenon. In the airflow.cfg file, I set the parallelism to 300, but when I printed the value of self.parallelism in the base_executor.py file, it was 32. It seems that my configuration is not taking effect.

potiuk commented 18 hours ago

I found a very strange phenomenon. In the airflow.cfg file, I set the parallelism to 300, but when I printed the value of self.parallelism in the base_executor.py file, it was 32. It seems that my configuration is not taking effect.

You forgot to mention which executor you have, but maybe it's overridden somewhere else in your configuration, or maybe it's overridden via ENV_VARIABLE - debugging it up the stack should help you to find it out.

(and it's a bit tangential to the problem - I suggest to open a separate discussion about it if you want to continue)

viitoare commented 18 hours ago

I found a very strange phenomenon. In the airflow.cfg file, I set the parallelism to 300, but when I printed the value of self.parallelism in the base_executor.py file, it was 32. It seems that my configuration is not taking effect.

You forgot to mention which executor you have, but maybe it's overridden somewhere else in your configuration, or maybe it's overridden via ENV_VARIABLE - debugging it up the stack should help you to find it out.

(and it's a bit tangential to the problem - I suggest to open a separate discussion about it if you want to continue)

Sorry, I didn't describe it clearly. I am using the CeleryExecutor. When I set parallelism=300 only in airflow.cfg and do not add AIRFLOW__CORE__PARALLELISM in docker-compose.yaml, the value of self.parallelism printed in base_executor.py is 32. When I add AIRFLOW__CORE__PARALLELISM: 300 in docker-compose.yaml, the value of self.parallelism printed in base_executor.py is the configured 300.

Apache Airflow version: 2.9.1
Celery version: 5.4.0
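For anyone checking the same thing: Airflow reads overrides from environment variables named `AIRFLOW__{SECTION}__{KEY}` (double underscores), and these take precedence over airflow.cfg. The sketch below only builds that name and reports whether it is set in the current process; the helper names `airflow_env_var` and `explain_source` are made up for the example.

```python
import os
from typing import Mapping


def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name Airflow checks for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def explain_source(section: str, key: str, environ: Mapping[str, str] = os.environ) -> str:
    """Say whether the option is overridden by an environment variable."""
    name = airflow_env_var(section, key)
    if name in environ:
        return f"{name}={environ[name]} (set in the environment)"
    return f"{name} is not set; the value comes from airflow.cfg or the default"


if __name__ == "__main__":
    print(explain_source("core", "parallelism"))
```

Since 32 is the default for `[core] parallelism`, seeing 32 without the environment variable suggests (as a guess, not a diagnosis) that the process is not reading the edited airflow.cfg at all, for example because the file is not mounted into the container. Running `airflow config get-value core parallelism` inside the scheduler container shows the value Airflow actually resolves.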