dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

[dagster-k8s] Retried k8s job is reported as failure #6236

Open alex-treebeard opened 2 years ago

alex-treebeard commented 2 years ago

Summary

A Kubernetes job that ultimately succeeds is reported as a failure by dagster-k8s when one of its pods is retried because backoff_limit > 0.

dagster_k8s.client.DagsterK8sError: Encountered failed job pods for job dagster-job-4fa7db7aa93211d03f3eca0e2acff339 with status: {'active': 1,
'completion_time': None,
'conditions': None,
'failed': 1,
'start_time': datetime.datetime(2022, 1, 18, 15, 14, 30, tzinfo=tzlocal()),
'succeeded': None}, in namespace faculty-dagster
  File "/usr/local/lib/python3.7/site-packages/dagster_celery_k8s/executor.py", line 457, in _execute_step_k8s_job
    wait_timeout=job_wait_timeout,
  File "/usr/local/lib/python3.7/site-packages/dagster_k8s/utils.py", line 41, in wait_for_job_success
    num_pods_to_wait_for,
  File "/usr/local/lib/python3.7/site-packages/dagster_k8s/client.py", line 278, in wait_for_job_success
    job_name=job_name, status=status, namespace=namespace

A failed pod does not imply a failed job. I believe this is a bug.

Reproduction

If you create a solid that fails on its first attempt and set backoff_limit to 1, the Kubernetes job succeeds on the retried pod, but Dagster reports the run as a failure.
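
Roughly, such a reproduction could look like the sketch below. It uses the legacy solid API current around 0.12/0.13 and sets backoff_limit through the dagster-k8s/config tag (the same mechanism confirmed later in this thread; the original setup may have passed backoff_limit differently). The shared marker path is purely an assumption for illustration, used so that only the first pod attempt fails.

    from pathlib import Path

    from dagster import pipeline, solid

    # Assumption for illustration: a path visible to every retried pod
    # (e.g. on a mounted volume) marks whether an attempt has already failed.
    MARKER = Path("/mnt/shared/first_attempt_done")

    @solid(
        tags={
            # Ask Kubernetes to retry this step's pod once on failure.
            "dagster-k8s/config": {"job_spec_config": {"backoff_limit": 1}}
        }
    )
    def fails_once(_context):
        if not MARKER.exists():
            MARKER.touch()
            raise Exception("failing the first attempt on purpose")
        return "ok"

    @pipeline
    def repro_pipeline():
        fails_once()

Running this with the celery_k8s_job_executor, the retried pod succeeds and the Kubernetes Job completes, but the run is still reported as failed.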

I hit this issue on 0.12.5, but I believe it is still present in 0.13.14.

alex-treebeard commented 2 years ago

This seems to fix our use case: https://github.com/facultyai/dagster/commit/0f770784b771b39956b2741859a7fb3cfdd71dd5

Still not sure about:

  1. Whether extra work is required to avoid retrying non-retryable errors (e.g. a bug in the solid logic)
  2. How we could/should accommodate num_pods_to_wait_for != 1
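
For reference, the rough idea behind this kind of fix (a sketch only, not the actual dagster-k8s code): decide failure from the Job's own terminal condition rather than from the presence of a failed pod, since pods can fail and be replaced within backoff_limit.

    from kubernetes.client import V1JobStatus

    def job_has_failed(status: V1JobStatus) -> bool:
        # A failed pod does not imply a failed job: the Job controller may
        # still retry it within backoff_limit. Only a terminal "Failed"
        # condition (e.g. reason BackoffLimitExceeded) is conclusive.
        for condition in status.conditions or []:
            if condition.type == "Failed" and condition.status == "True":
                return True
        return False

    def job_has_succeeded(status: V1JobStatus, num_pods_to_wait_for: int = 1) -> bool:
        return (status.succeeded or 0) >= num_pods_to_wait_for
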
yuhan commented 2 years ago

cc @johannkm

johannkm commented 2 years ago

Hi @alex-treebeard, what run launcher/executor are you using? I believe the celery_k8s_job_executor?

Thanks for catching this! Part of why this has slipped by is that the majority of users don't configure the backoff_limit on K8s Jobs. What we actually recommend is using https://docs.dagster.io/_apidocs/ops#dagster.RetryPolicy, which can be configured on Ops and Jobs. With this retry configured, Dagster will launch a whole new K8s Job after a failure.
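
For reference, a minimal sketch of that recommendation (the retry parameters here are just illustrative values):

    from dagster import Backoff, RetryPolicy, job, op

    @op(
        # A Dagster-level retry: after a failure, the executor launches a
        # whole new Kubernetes Job for this step rather than relying on the
        # Job's own backoff_limit.
        retry_policy=RetryPolicy(max_retries=3, delay=10, backoff=Backoff.EXPONENTIAL)
    )
    def flaky_op():
        ...

    @job
    def my_job():
        flaky_op()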

I'm curious whether RetryPolicies cover your use case for backoff_limit. If so, I think we may consider deprecating the backoff_limit knob. We should definitely improve the documentation around this.

alex-treebeard commented 2 years ago

@johannkm Yes, we are using celery_k8s_job_executor. I'm not 100% sure of this, but I believe RetryPolicy did not help in the case where the k8s API server fails to create a pod to run the solid.

We were on 0.12.5 when I patched this; now we're on 0.13. I expect to revisit this issue in the coming weeks.

alex-treebeard commented 2 years ago

@johannkm Having tested this again, I can confirm that Dagster's RetryPolicy only retries when business logic fails, whereas backoff_limit lets us retry API server failures/timeouts, so I would like to upstream this fix if possible.

gjeusel commented 2 years ago

I also believe backoff_limit should be used to retry in case of API server failures/timeouts, for example when auto-scaling is triggered and the pod status is marked as OutOfcpu.

Milias commented 1 year ago

Hello, I have tried a number of things but could not find a workaround or solution.

Edit: forgot to mention we are on dagster version 1.4.7.

We are in the same situation (using celery_k8s_job_executor) and sometimes pods are deleted outside of dagster, for example by the autoscaler removing the node. We have not been able to find a reliable way of restarting the specific op that failed when this happens.

I have tried adding dagster/max_retries and dagster/retry_policy: FROM_FAILURE to the job; however, in that case not all steps in the job are executed: specifically, the failed step is not re-executed, only the downstream ones, which of course crash because the expected input is not present. In our particular case we have many dynamic steps, which may be the reason this retry policy is not working as expected. Using the ALL_STEPS retry policy works, but it is very resource intensive, especially if the failed step happens near the end of the pipeline, which is very often the case.

I have also tried setting dagsterDaemon.runMonitoring.maxResumeAttempts in the Helm chart, although its documentation clearly states it only works with k8s_job_executor. Setting runLauncher.config.celeryK8sRunLauncher.failPodOnRunFailure=true does not change anything either, as far as I could tell.

Additionally, I have added backoff_limit: 3 to the dagster-k8s/config tag, under job_spec_config, which correctly increases the backoff limit of the step jobs. It looks like this hits an edge case the executor does not handle: even though the Kubernetes job correctly starts a new pod, that pod is not recognized by the run executor, which simply reports: Step <op-name> finished without success or failure event. Downstream steps will not execute.
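
For concreteness, attaching that tag in code looks roughly like this (equivalent to setting it in the launchpad; everything except the tag structure and backoff_limit is illustrative):

    from dagster import op

    @op(
        tags={
            "dagster-k8s/config": {
                # Raise the step Job's backoff limit so Kubernetes replaces
                # the pod if it is evicted, e.g. when the autoscaler removes
                # its node.
                "job_spec_config": {"backoff_limit": 3},
            }
        }
    )
    def my_step():
        ...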

When backoff_limit is zero, the error originally reported above is thrown:

dagster_k8s.client.DagsterK8sError: Encountered failed job pods for job dagster-step-7ba8de678dc53724aac3de45df947f59 with status: {'active': None,
 'completed_indexes': None,
 'completion_time': None,
 'conditions': [{'last_probe_time': datetime.datetime(2023, 8, 20, 12, 19, 5, tzinfo=tzlocal()),
                 'last_transition_time': datetime.datetime(2023, 8, 20, 12, 19, 5, tzinfo=tzlocal()),
                 'message': 'Job has reached the specified backoff limit',
                 'reason': 'BackoffLimitExceeded',
                 'status': 'True',
                 'type': 'Failed'}],
 'failed': 1,
 'ready': 0,
 'start_time': datetime.datetime(2023, 8, 20, 12, 18, 51, tzinfo=tzlocal()),
 'succeeded': None,
 'uncounted_terminated_pods': {'failed': None, 'succeeded': None}}, in namespace dagster-dta

  File "/usr/local/lib/python3.10/site-packages/dagster_celery_k8s/executor.py", line 450, in _execute_step_k8s_job
    api_client.wait_for_job_success(
  File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 350, in wait_for_job_success
    self.wait_for_running_job_to_succeed(
  File "/usr/local/lib/python3.10/site-packages/dagster_k8s/client.py", line 399, in wait_for_running_job_to_succeed
    raise DagsterK8sError(

However, when backoff_limit is non-zero and the pod is deleted (I'm manually deleting the pod to simulate the original problem), then even if an exception is triggered in the business logic (typically dagster._core.errors.DagsterExecutionInterruptedError, though not always) and the step is retried, several things go wrong:

A minor thing to mention: I have run these tests with Op Retries configured as described in the documentation. However, as @alex-treebeard mentioned above, adding this makes no difference because the error originates from the executor and not from the business logic.

I don't know whether this stems from using celery_k8s_job_executor instead of k8s_job_executor. I cannot easily test without Celery right now; however, switching executors is not a blocker for us if that solves the issue.

Do you have any suggestions on how to move forward? Does this need to be fixed in dagster-k8s? My impression from this quick investigation is that the logic for dealing with job failures caused by Kubernetes-level circumstances could be improved. What is your opinion?

Thank you very much for reading!