bacalhau-project / bacalhau

Compute over Data framework for public, transparent, and optionally verifiable computation
https://docs.bacalhau.org
Apache License 2.0
649 stars 86 forks source link

Enhancements to Pluggable Executor Interface for Compute Node Restarts #2739

Open frrist opened 11 months ago

frrist commented 11 months ago

Problem:

When a compute node restarts, syncing the current reality (active executors) with the persisted state is essential. This ensures that ongoing tasks before the restart are handled correctly, regardless if the restart is from a VM, a crash, or another reason. The situation is especially prevalent when there are more pluggable executors. There's a need to ensure that the executing executors is either still running or has stopped.

Proposed Solution:

Additional Details:

rossjones commented 11 months ago

To help with generalising the design so that we can find the patterns in what we have to extrapolate out to future plugins ...

It queries the datastore for previous connections to executors.

At present, this is the Execution that is stored in the execution store. For docker the created container is tagged with info from that Execution and so that structure is all that should be necessary. At restart, we know which Executions we think are live, and so we should be able to check those specific Executions.

For the wasm plugin, the plugin process should be in the same process group as the compute node. In some cases, e.g. SIGINT the plugin process will be closed when the compute node is - if however the compute node is killed with SIGKILL (which is uncatchable) then the plugin will be attached to another PID (probably 1). In this case the reattach config (which contains the PID) is critical in being able to find the plugin process.

Given the above, perhaps we should attach a ReattachConfig (once known) to the Execution in the compute node. And this would probably solve the problem for WASM where the execution is happening in-process. For docker however it is entirely possible that the process has died, but the docker container is continuing and even when the reattachconfig fails (due to the missing plugin process) there will be no way to kill the docker container.

Perhaps the answer is to make the executor responsible for ensuring that it is idempotent wrt Executions. If asked to run an Execution which the plugin can determine is already running (within an instance of the plugin), it could return some form of success? For docker for instance it could be doing this early ...

container, err := e.client.FindContainer(ctx, labelExecutionID, e.labelExecutionValue(executionID))
if err == nil {  // no error means container exists 
     return ExecutionAlreadyRunning, nil  // or some other success identifier    
}

but this will only work if Run is async because the code above has no RunCommandResult to return.

rossjones commented 10 months ago

On a re-read I noticed the list of scenarios to manage, with these two mentioned:

If the entire node crashes, any ReattachConfigs from the query might be irrelevant since the executor is also likely dead. If only the compute node restarts, the executor(s) could still be operational, and we need a reattachment process.

I think there is also

After compute node restart, executor(s) no long running 
(as the pid with in the compute node's process group), 
but the underlying execution (docker) is still executing

So I think the logic is like the following:

flowchart TD
    State[For each live execution] --> HaveReattach
    HaveReattach{Have reattach details?} --No--> NewPlugin
    HaveReattach{Have reattach details?} --Yes-->CanReattach 
    CanReattach{Is plugin responding?} --No--> NewPlugin
    CanReattach{Is plugin responding?} --Yes--> Reattach 
    Reattach((Reattach)) --> IsLongLived
    NewPlugin((NewPlugin)) --> IsLongLived
    IsLongLived{Long running?} --No--> Cancel 
    IsLongLived{Long running?} --Yes--> LongLived
    LongLived(((Run)))
    Cancel(((Cancel)))

but there are still points in this flow where we may end up with a docker running a container and we have no connection to it. For these cases we should try to ensure that Run() is idempotent so that when the same execution it submitted it handles it appropriately, or we add a mechanism to the interface which (in some form) asks for a reset of the environment. For docker this might mean stopped/removing anything tagged with bacalhau.

frrist commented 10 months ago

Great point, I hadn't considered that. So basically everything dies except the docker daemon, and its containers(?) - Do you think it would be reasonable to include the containerID in the ReattachConfig of docker executor? (we'd be extending the go-plugin specific config here). For the docker executor specifically it could be used as a hint to say: "hey looks like you tipped over, here are the thing I last remember you running, do you still have it?"

we may end up with a docker running a container and we have no connection to it

Assuming we mean the docker daemon, or am I misunderstanding? If so, again, just for the case of docker, we could also include the details of the docker client we were talking to in the ReattachConfig. So:

  1. Compute Node starts
  2. Queries states and finds reattach config for docker executor
  3. reattaches or starts a new executor
  4. executor uses hint containerID hint to: a) reconnect to the docker daemon and 2) query the daemon for containers from previous execution

I don't understand this bit, are we saying we'll kill the cancel the job iff its not a long running job?

flowchart TD

    NewPlugin((NewPlugin)) --> IsLongLived
    IsLongLived{Long running?} --No--> Cancel 
    IsLongLived{Long running?} --Yes--> LongLived
    LongLived(((Run)))
    Cancel(((Cancel)))
rossjones commented 10 months ago

Do you think it would be reasonable to include the containerID in the ReattachConfig of docker executor

We already label the container in a deterministic way from the execution ID, so we can do something like the following in our Run() (or something similar to clean up independently of the Run())

containerID, _ := e.client.FindContainer(ctx, labelExecutionID, e.labelExecutionValue(request.ExecutionID))
if containerID != "" {
}

are we saying we'll kill the cancel the job iff its not a long running job?

Yes, for short lived batch jobs, we just want to Fail() it; we can't because this only currently happens synchronously (we have no Fail call) so we cancel it instead. Ideally we'd be able to determine for batch jobs current state and only Fail() if it wasn't running (e.g. wasm) but I think best to let the requester node reassess the evaluation and process the failure.

For long-running jobs, we want to make sure it's always running, and in some cases do so without access to the requester node, so in this case we need to either do an idempotent run (as for docker) or a new plugin launch (for wasm). We really have two plugin (executor types), in-process and external with slightly different behaviours that ideally we want to smooth over transparently.

I did wonder whether we could/should add a Recover(execution) call to the Executor interface, but that probably wouldn't make sense in the ExecutorBuffer (although I suspect that may have to go away during async work).