argoproj / argo-workflows

Workflow Engine for Kubernetes
https://argo-workflows.readthedocs.io/
Apache License 2.0

Option to have workflow with single available lock run #13630

Open tooptoop4 opened 1 month ago

tooptoop4 commented 1 month ago

Summary

The multiple mutex/semaphore feature introduces a restriction that only becomes apparent with that feature. Previously a workflow could only ever hold one mutex or semaphore, so if a high-priority and a low-priority workflow both wanted the same lock, the high-priority one would get it. But now consider a high-priority workflow that wants 2 mutexes (one available, the other held by some other workflow) and a different low-priority workflow that wants only 1 mutex (the available one): the low-priority workflow is unnecessarily blocked from running, even though the high-priority workflow can't run anyway (since not ALL of its required locks are available).

see https://github.com/argoproj/argo-workflows/pull/13358#discussion_r1681710287

Use Cases

This is similar to 'short query acceleration', where small tasks are allowed to grab a slice of compute whenever the heavy hitters don't have enough headroom to run anyway.

Joibel commented 1 month ago

Do you have a design for what this would look like?

Currently the only strategy is that the workflow at the front of the queue gets to run. Would an alternative (controller global) strategy of "first runnable workflow" in queue be an alternative to this proposal for you?
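
Roughly, the difference between the two strategies (illustrative types and names only, not the controller's actual code):

// Illustrative sketch only; pendingWorkflow and canAcquireAll are placeholders,
// not argo-workflows types.
type pendingWorkflow struct {
    requiredLocks []string
}

// frontOfQueue is today's behaviour: only the head of the priority queue may run;
// if it is blocked on any of its locks, nothing behind it runs either.
func frontOfQueue(queue []pendingWorkflow, canAcquireAll func(pendingWorkflow) bool) *pendingWorkflow {
    if len(queue) == 0 || !canAcquireAll(queue[0]) {
        return nil
    }
    return &queue[0]
}

// firstRunnable is the proposed alternative: scan in priority order and run the
// first workflow whose required locks are all currently available.
func firstRunnable(queue []pendingWorkflow, canAcquireAll func(pendingWorkflow) bool) *pendingWorkflow {
    for i := range queue {
        if canAcquireAll(queue[i]) {
            return &queue[i]
        }
    }
    return nil
}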

tooptoop4 commented 1 month ago

> Would an alternative (controller global) strategy of "first runnable workflow" in queue be an alternative to this proposal for you?

yes this exactly!

tooptoop4 commented 5 days ago

I guess https://github.com/argoproj/argo-workflows/blob/v3.6.0-rc4/workflow/sync/semaphore.go#L178-L195 would be the section to change

draft idea (not complete):

// Draft change to checkAcquire: instead of only considering the workflow at the
// front of the pending queue, scan the pending queue for the first workflow whose
// required locks are ALL currently available ("first runnable workflow").
//
// Assumed additions to the existing code (not implemented yet):
//   - the requiredLockNames parameter (the holder's own required locks), unused in this draft
//   - a requiredLockNames field on the pending queue's item type
//   - an areAllLocksAvailable helper that checks every named lock for free capacity
func (s *prioritySemaphore) checkAcquire(holderKey string, requiredLockNames []string) (bool, bool, string) {
    if holderKey == "" {
        return false, false, "bug: attempt to check semaphore with empty holder key"
    }

    if _, ok := s.lockHolder[holderKey]; ok {
        s.log.Debugf("%s is already holding a lock", holderKey)
        return false, true, ""
    }

    waitingMsg := fmt.Sprintf("Waiting for %s lock(s). Lock status: %d/%d", s.name, s.limit-len(s.lockHolder), s.limit)

    // Check that this semaphore has capacity for more holders
    if len(s.lockHolder) < s.limit {
        // Find the first pending workflow that can acquire all of its required locks.
        // Note: ranging over the queue's backing slice may not visit items in strict
        // priority order; a complete implementation would walk it in priority order.
        for _, item := range s.pending.items {
            if s.areAllLocksAvailable(item.requiredLockNames) {
                // If that workflow is the one being checked here, let it proceed
                if isSameWorkflowNodeKeys(holderKey, item.key) {
                    return true, false, ""
                }
                // Otherwise, re-queue that workflow so it gets a chance to run,
                // and keep the current holder waiting
                s.nextWorkflow(workflowKey(item))
                s.log.Infof("Allowed %s to proceed due to available locks", item.key)
                return false, false, waitingMsg
            }
        }
    }

    // No pending workflow can acquire all of its locks right now
    s.log.Debugf("Current semaphore holders: %v", s.lockHolder)
    return false, false, waitingMsg
}
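
The draft above leans on two things that don't exist in the codebase yet: pending items carrying their full set of required lock names, and an areAllLocksAvailable helper that can see across locks. A rough sketch of what that helper might look like, with the caveat that lockAvailabilityChecker, hasCapacity, and the otherLocks field are all assumptions about how it could be wired up, not existing argo-workflows APIs:

// Hypothetical sketch only; these names do not exist in the codebase today.

// lockAvailabilityChecker would be supplied by the sync manager, which is the
// component that knows every semaphore and mutex by name.
type lockAvailabilityChecker interface {
    // hasCapacity reports whether the named lock could admit one more holder
    // (mutexes: no current holder; semaphores: holders < limit).
    hasCapacity(lockName string) bool
}

// areAllLocksAvailable reports whether every lock the pending workflow needs has
// free capacity right now, so letting it acquire this semaphore would not leave it
// holding one lock while blocked on another. Assumes prioritySemaphore gains an
// otherLocks lockAvailabilityChecker field, and that item.requiredLockNames is
// populated when the workflow is added to the queue.
func (s *prioritySemaphore) areAllLocksAvailable(lockNames []string) bool {
    for _, name := range lockNames {
        if name == s.name {
            // This semaphore itself: the caller already verified its capacity.
            continue
        }
        if !s.otherLocks.hasCapacity(name) {
            return false
        }
    }
    return true
}

The open question is mostly plumbing: the sync manager is the component that knows every lock by name, so either it hands each semaphore a view of the other locks like this, or the "first runnable" scan moves up into the manager itself.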