flux-framework / flux-coral2

Plugins and services for Flux on CORAL2 systems
GNU Lesser General Public License v3.0
9 stars 7 forks source link

coral2-dws: workflow resource can become stranded with a finalizer when job is canceled #165

Closed garlick closed 4 months ago

garlick commented 6 months ago

Problem (reported in Slack by Dean R.): "Last week we noticed that if we canceled a job early, flux-coral2-dws could leave a Workflow resource stranded with a finalizer. Right now on elcap1 we're running a fix that has held up to early cancellation, and we've run it under stress when there is poor connectivity to the kube-apiserver. It is holding up much better now."

This is the diff (relative to 0.12.0):

--- /usr/bin/coral2_dws.py-orig 2024-05-16 12:26:43.291189496 -0700
+++ /usr/bin/coral2_dws.py  2024-05-22 12:12:55.412562876 -0700
@@ -10,7 +10,6 @@
 import os
 import syslog
 import json
-import re
 import functools
 import argparse
 import logging
@@ -18,6 +17,7 @@
 import time
 import pathlib
 import math
+import sys

 import kubernetes as k8s
 from kubernetes.client.rest import ApiException
@@ -68,7 +68,7 @@
     try:
         msg = rpc.get()
     except Exception as exc:
-        LOGGER.warning("RPC error %s", str(exc))
+        LOGGER.warning("RPC error %s: (type=%s)", str(exc), type(exc))
     else:
         if msg is not None:
             LOGGER.debug("RPC response was %s", msg)
@@ -111,13 +111,28 @@
             handle.log(syslog.LOG_ERR, f"{os.path.basename(__file__)}: {errstr}")
             handle.respond(msg, {"success": False, "errstr": errstr})
             LOGGER.error(
-                "Error in responding to %s RPC for %s: %s", topic, jobid, errstr
+                "Error in responding to %s RPC for %s: %s: (type=%s)", topic, jobid, errstr, type(exc)
             )
         else:
             handle.respond(msg, {"success": True})

     return wrapper

+def remove_finalizer(workflow_name, k8s_api, workflow=None):
+    """Remove the finalizer from the workflow so it can be deleted."""
+    if workflow is None:
+        workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, workflow_name)
+
+    try:
+        workflow["metadata"]["finalizers"].remove(_FINALIZER)
+    except ValueError:
+        pass
+    else:
+        k8s_api.patch_namespaced_custom_object(
+            *WORKFLOW_CRD,
+            workflow_name,
+            {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}},
+        )

 def move_workflow_desiredstate(workflow_name, desiredstate, k8s_api):
     """Helper function for moving workflow to a desiredState."""
@@ -292,6 +307,8 @@
         # the job hit an exception before beginning to run; transition
         # the workflow immediately to 'teardown'
         move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
+        # Remove the finalizer so the resource can be deleted.
+        remove_finalizer(winfo.name, k8s_api, workflow=None)
         winfo.toredown = True
     else:
         move_workflow_desiredstate(winfo.name, "PostRun", k8s_api)
@@ -304,6 +321,11 @@
         and workflow["status"]["ready"]
     )

+def state_active(workflow, state):
+    """Helper function for checking whether a workflow is working on a given state."""
+    return (
+        workflow["spec"]["desiredState"] == workflow["status"]["state"] == state
+    )

 def workflow_state_change_cb(event, handle, k8s_api):
     """Exception-catching wrapper around _workflow_state_change_cb_inner."""
@@ -324,17 +346,18 @@
         return
     try:
         _workflow_state_change_cb_inner(workflow, jobid, winfo, handle, k8s_api)
-    except Exception:
+    except Exception as exc:
         LOGGER.exception(
-            "Failed to process event update for workflow with jobid %s:", jobid
+            "Failed to process event update for workflow with jobid %s: (type=%s)", jobid, type(exc)
         )
         try:
             move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
-        except ApiException:
+            # Remove the finalizer so the resource can be deleted.
+            remove_finalizer(winfo.name, k8s_api, workflow=workflow)
+        except ApiException as exc2:
             LOGGER.exception(
-                "Failed to move workflow with jobid %s to 'teardown' "
-                "state after error: ",
-                jobid,
+                "Failed to move workflow with jobid %s to 'teardown' (type=%s)",
+                jobid, type(exc2)
             )
         else:
             winfo.toredown = True
@@ -349,18 +372,15 @@
     if winfo.deleted:
         # deletion request has been submitted, nothing to do
         return
-    if state_complete(workflow, "Teardown"):
-        # delete workflow object and tell DWS jobtap plugin that the job is done
-        try:
-            workflow["metadata"]["finalizers"].remove(_FINALIZER)
-        except ValueError:
-            pass
-        else:
-            k8s_api.patch_namespaced_custom_object(
-                *WORKFLOW_CRD,
-                winfo.name,
-                {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}},
-            )
+    if state_active(workflow, "Teardown") and not state_complete(workflow, "Teardown"):
+        # Remove the finalizer as soon as the workflow begins working on its
+        # teardown state.
+        remove_finalizer(winfo.name, k8s_api, workflow=workflow)
+    elif state_complete(workflow, "Teardown"):
+        # Delete workflow object and tell DWS jobtap plugin that the job is done.
+        # Attempt to remove the finalizer again in case the state transitioned
+        # too quickly for it to be noticed earlier.
+        remove_finalizer(winfo.name, k8s_api, workflow=workflow)
         k8s_api.delete_namespaced_custom_object(*WORKFLOW_CRD, winfo.name)
         winfo.deleted = True
         handle.rpc("job-manager.dws.epilog-remove", payload={"id": jobid}).then(
roehrich-hpe commented 6 months ago

See also https://github.com/flux-framework/flux-coral2/issues/159

roehrich-hpe commented 5 months ago

Today (May 31) there's a new version of the plugin installed on elcap1. Here's an updated version of my patch for that new plugin. This is not yet tested:

--- elcap-coral2_dws.py-20240531    2024-05-31 12:56:32
+++ elcap-coral2_dws.py-20240531-mine   2024-05-31 13:21:17
@@ -11,7 +11,6 @@
 import sys
 import syslog
 import json
-import re
 import functools
 import argparse
 import logging
@@ -118,6 +117,23 @@
             handle.respond(msg, {"success": True})

     return wrapper
+
+
+def remove_finalizer(workflow_name, k8s_api, workflow=None):
+    """Remove the finalizer from the workflow so it can be deleted."""
+    if workflow is None:
+        workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, workflow_name)
+
+    try:
+        workflow["metadata"]["finalizers"].remove(_FINALIZER)
+    except ValueError:
+        pass
+    else:
+        k8s_api.patch_namespaced_custom_object(
+            *WORKFLOW_CRD,
+            workflow_name,
+            {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}},
+        )

 def move_workflow_desiredstate(workflow_name, desiredstate, k8s_api):
@@ -293,6 +309,8 @@
         # the job hit an exception before beginning to run; transition
         # the workflow immediately to 'teardown'
         move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
+        # Remove the finalizer so the resource can be deleted.
+        remove_finalizer(winfo.name, k8s_api, workflow=None)
         winfo.toredown = True
     else:
         move_workflow_desiredstate(winfo.name, "PostRun", k8s_api)
@@ -305,6 +323,13 @@
         and workflow["status"]["ready"]
     )

+
+def state_active(workflow, state):
+    """Helper function for checking whether a workflow is working on a given state."""
+    return (
+        workflow["spec"]["desiredState"] == workflow["status"]["state"] == state
+    )
+ 

 def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion):
     """Exception-catching wrapper around _workflow_state_change_cb_inner."""
@@ -333,6 +358,8 @@
         )
         try:
             move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
+            # Remove the finalizer so the resource can be deleted.
+            remove_finalizer(winfo.name, k8s_api, workflow=workflow)
         except ApiException:
             LOGGER.exception(
                 "Failed to move workflow with jobid %s to 'teardown' "
@@ -354,18 +381,15 @@
     if winfo.deleted:
         # deletion request has been submitted, nothing to do
         return
-    if state_complete(workflow, "Teardown"):
-        # delete workflow object and tell DWS jobtap plugin that the job is done
-        try:
-            workflow["metadata"]["finalizers"].remove(_FINALIZER)
-        except ValueError:
-            pass
-        else:
-            k8s_api.patch_namespaced_custom_object(
-                *WORKFLOW_CRD,
-                winfo.name,
-                {"metadata": {"finalizers": workflow["metadata"]["finalizers"]}},
-            )
+    if state_active(workflow, "Teardown") and not state_complete(workflow, "Teardown"):
+        # Remove the finalizer as soon as the workflow begins working on its
+        # teardown state.
+        remove_finalizer(winfo.name, k8s_api, workflow=workflow)
+    elif state_complete(workflow, "Teardown"):
+        # Delete workflow object and tell DWS jobtap plugin that the job is done.
+        # Attempt to remove the finalizer again in case the state transitioned
+        # too quickly for it to be noticed earlier.
+        remove_finalizer(winfo.name, k8s_api, workflow=workflow)
         k8s_api.delete_namespaced_custom_object(*WORKFLOW_CRD, winfo.name)
         winfo.deleted = True
         handle.rpc("job-manager.dws.epilog-remove", payload={"id": jobid}).then(
@@ -422,6 +446,8 @@
     elif state_complete(workflow, "DataOut"):
         # move workflow to next stage, teardown
         move_workflow_desiredstate(winfo.name, "Teardown", k8s_api)
+        # Remove the finalizer so the resource can be deleted.
+        remove_finalizer(winfo.name, k8s_api, workflow=workflow)
         winfo.toredown = True
     if workflow["status"].get("status") == "Error":
         # a fatal error has occurred