Project-MONAI / monai-deploy-workflow-manager

Repository containing the Workflow Manager sub-system of MONAI Deploy.
Apache License 2.0

Trigger new workflow #70

Closed joshliberty closed 2 years ago

joshliberty commented 2 years ago

When a new workflow request is received (see #51) the workflow manager component is notified and should begin processing a new workflow.

The following has been split out into separate subtasks (note: everything else is included as part of this task):

Implementation Notes

Dispatching tasks

To dispatch a task, the following steps need to be taken:

  1. Read the task definition to understand which artefacts are needed by the task
  2. Evaluate each artefact’s value field to understand where the data needs to come from
  3. Create temporary credentials for MinIO (see https://docs.min.io/docs/minio-sts-quickstart-guide.html), and give them permission to read all input artefacts.
  4. Provide the artefact list and MinIO credentials to the task executor:
     a. The artefact list should be saved in the task object in MongoDB.
     b. The MinIO credentials should be added to the event published to RabbitMQ.
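The four steps above can be sketched as a single function. This is a minimal illustration, not the real implementation: `task_definition`, `artefact_resolver`, and `credential_factory` are hypothetical stand-ins for the actual task schema, artefact-resolution logic, and MinIO STS call.

```python
import uuid


def build_task_dispatch_message(task_definition, artefact_resolver, credential_factory):
    """Resolve a task's input artefacts and attach temporary storage credentials.

    All three parameters are hypothetical stand-ins for the real components.
    """
    # Steps 1-2: read the task definition and evaluate each artefact's
    # "value" field to find where the data comes from.
    artefacts = {
        name: artefact_resolver(spec["value"])
        for name, spec in task_definition["artifacts"].items()
    }
    # Step 3: mint temporary credentials scoped to the resolved artefact paths.
    credentials = credential_factory(list(artefacts.values()))
    # Step 4b: the credentials travel with the event. Step 4a (persisting
    # the artefact list on the task object in MongoDB) happens elsewhere.
    return {
        "ExecutionId": str(uuid.uuid4()),
        "Artifacts": artefacts,
        "StorageInformation": credentials,
    }
```

Keeping resolution and credential creation behind injected callables keeps this testable without a live MinIO or MongoDB.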

Publishing task dispatch events

Queue: md.workflow.task_dispatch

{
  "ExecutionId": $execution_id,
  "WorkflowInstanceId": $workflow_instance_id,
  "StorageInformation": Object
}

The storage information object should contain the MinIO/storage engine credentials – exact schema depends on the storage back-end used.

Acceptance criteria

  1. Given a registered workflow for a specific AE title, when a payload is received from that AE title, then that workflow should be triggered.
  2. Given a registered workflow without a specified AE title, when a payload is received from any AE title, then that workflow should be triggered.
  3. Given multiple workflows match a specific payload, when that payload is processed, all matching workflows should be created and launched.
  4. Given multiple workflows match a specific payload and launching the second workflow failed, when the event is processed a second time, then the first workflow shouldn't be started a second time.
  5. Given a payload is received which contains specific workflow IDs, when it is processed, only those workflows should be launched, regardless of any other workflows that may match based on AE title.
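The matching rules in criteria 1, 2, and 5 can be sketched as a pure selection function. The payload and workflow field names (`calling_aetitle`, `workflows`, `ae_title`) are assumptions for illustration, not the confirmed schema.

```python
def match_workflows(payload, registered_workflows):
    """Select the workflows to launch for a payload (criteria 1, 2, and 5).

    The payload/workflow field names here are assumptions, not the real schema.
    """
    # Criterion 5: explicit workflow IDs in the payload override AE-title matching.
    if payload.get("workflows"):
        wanted = set(payload["workflows"])
        return [w for w in registered_workflows if w["id"] in wanted]
    # Criteria 1-2: match on the calling AE title; a workflow registered
    # without an AE title acts as a catch-all for any AE title.
    return [
        w for w in registered_workflows
        if w.get("ae_title") in (None, payload["calling_aetitle"])
    ]
```

Criteria 3 and 4 then apply downstream: every returned workflow is launched, and redeliveries must skip workflows that already have an instance.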

Database schema

For each workflow instance, a document such as the below should be created:

{
    "id": string - a newly generated unique ID,
    "workflow_id": reference to the workflow definition,
    "payload_id": string - payload ID as received from the informatics gateway,
    "start_time": datetime,
    "status": created,
    "bucket_id": string,
    "input_metadata": {
      "input_type": "dicom"
    },
    "tasks": [
        {
            "execution_id": string - newly generated UUID,
            "task_type": string - plug-in identifier, e.g. "argo",
            "task_plugin_arguments": {
              "workflow_id": "image_type_detector",
              "server_url": "https://argoserver:2220"
            },
            "task_id": "image_type_detector",
            "status": "created",
            "input_artifacts": {
                 "key": "path_to_file"
            },
             "output_directory": $bucketId/$execution_id,
            "metadata": {}            
        }
    ]
}
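A constructor for such a document might look like the sketch below. Field names follow the schema in this issue; the helper's name, the shape of the `workflow` argument, and its `tasks` entries are assumptions for illustration.

```python
import uuid
from datetime import datetime, timezone


def create_workflow_instance(workflow, payload_id, bucket_id):
    """Build a workflow-instance document in the shape described above.

    The `workflow` dict's shape (id, tasks with id/type/args) is an assumption.
    """
    tasks = []
    for task in workflow["tasks"]:
        execution_id = str(uuid.uuid4())
        tasks.append({
            "execution_id": execution_id,
            "task_type": task["type"],
            "task_plugin_arguments": task.get("args", {}),
            "task_id": task["id"],
            "status": "created",
            "input_artifacts": {},
            # Task outputs go under a per-execution subfolder, so the
            # shared input payload can stay read-only.
            "output_directory": f"{bucket_id}/{execution_id}",
            "metadata": {},
        })
    return {
        "id": str(uuid.uuid4()),
        "workflow_id": workflow["id"],
        "payload_id": payload_id,
        "start_time": datetime.now(timezone.utc),
        "status": "created",
        "bucket_id": bucket_id,
        "input_metadata": {"input_type": "dicom"},
        "tasks": tasks,
    }
```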

[flow chart image]

mocsharp commented 2 years ago

If a workflow request (with a single payload) matches multiple workflows, do you need to make multiple copies of the payload, one for each workflow/namespace?

joshliberty commented 2 years ago

No, the input data is read-only, so we can keep a single copy of it. The data generated within each workflow by its tasks will be stored in a subfolder.

nakfour commented 2 years ago

@joshliberty in the flow chart, can you explain the line "Add task to dispatch queue"? Also the last "Acknowledge" flow line too? Is there a high level design system architecture that shows how all the components interact? Thanks

joshliberty commented 2 years ago

@nakfour yes:

  1. Adding a task to the dispatch queue simply means publishing a new event, which the task manager picks up in order to process that task.
  2. Acknowledge is a RabbitMQ action that removes the event from the queue. It denotes success in handling the initial workflow request, after which the message can safely be removed from the queue.
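Acknowledging only after processing, combined with acceptance criterion 4, implies the handler must be idempotent: a redelivered message must not relaunch workflows that already started. A minimal sketch of that pattern, with `instance_store`, `launch`, and `ack` as hypothetical injected stand-ins for MongoDB, the launcher, and the RabbitMQ acknowledgement:

```python
def handle_workflow_request(message, instance_store, launch, ack):
    """Process a workflow request idempotently, then acknowledge it.

    `instance_store` maps (payload_id, workflow_id) -> instance; `launch`
    and `ack` are injected callables standing in for the real services.
    """
    for workflow_id in message["workflow_ids"]:
        key = (message["payload_id"], workflow_id)
        if key in instance_store:
            # Redelivery: this workflow was already launched; skip it.
            continue
        instance_store[key] = launch(workflow_id)
    # Acknowledge only once every matched workflow has been launched; if
    # anything above raised, the message stays queued and is redelivered.
    ack()
```

Under this sketch, two consumers racing on the same message would still need a uniqueness guarantee on (payload_id, workflow_id) in the store, which is the concern raised below about multiple Workflow Manager instances.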

As to a high-level architecture diagram, unfortunately not yet - once we have a clearer picture of how the various components interact I'll make sure it's all documented, but at the moment it's all a bit of a work in progress.

dbericat commented 2 years ago

I am very much in favor of drafting a high-level arch diagram to facilitate the discussion. A visual = 1000 words. As we make decisions, we can modify. It can be a collaborative effort so @joshliberty doesn't do all the work. WDYT?

nakfour commented 2 years ago

Thanks @joshliberty. I think we discussed this in our last meeting with respect to invoking a new task: if the Task Manager and Workflow Manager are part of the same process (i.e. in the same pod/container), then it doesn't make sense to use pub/sub for communication. Also, regarding the "Acknowledge": what if you have multiple instances of the Workflow Manager and two of them read the same message? I ask because you only send the "Acknowledge" for RabbitMQ to clear the message after a long list of tasks.

joshliberty commented 2 years ago

Babar, Jack, as discussed in our meeting – for the purpose of this ticket you can make the following assumptions: