SCALE-MS / scale-ms

SCALE-MS design and development

Allow direct task submission #345

Open eirrgang opened 1 year ago

eirrgang commented 1 year ago

We need to normalize the low-level submit() schemes again. For convenience and agility, we should formalize the user interface that tests and adventurous users can use to submit rp.TaskDescriptions or higher-level work representations directly to the workflow manager. (This may turn out to be an important use case for existing workflows that are hard to migrate, and which instead need a more procedural API.)

eirrgang commented 1 year ago

Some points

eirrgang commented 1 year ago

I'm currently thinking we should offer a submit(func, *args, **kwargs) instead of a submit(func, *args, requirements: dict, **kwargs) or submit(func, *, args: tuple, kwargs: dict, requirements: dict). Instead, requirements can be provided when initializing a context manager in which submit is called.

Something more like:

    with get_executor(requirements={}) as executor:
        my_task = executor.submit(func, *args, **kwargs)
    try:
        value = my_task.result()
    except:
        ...

There are other things that we might want to be able to set on a Task, which had placeholders or proposals in earlier drafts. I think we can inherit most of them through the managed context and the clear API provider (the executor interface, or whatever).

The main exception is label, which we had proposed as a way for users to add arbitrary, scalems-semantics-free identifiers (as long as they are unique) that scalems would then help cross-reference with native identifiers (which are ugly, but can embed scalems semantics) to improve usability.

  1. We can probably just allow label to be set as an attribute on the object returned by submit().
  2. We can follow the model of ThreadPoolExecutor and allow a naming prefix to be specified for the enclosing context. (Both options are sketched below.)
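A rough sketch of those two options (get_executor() is reused from the snippet above; the label_prefix keyword is a hypothetical name, not an existing parameter):

    # Option 1: set a label attribute on the handle returned by submit().
    with get_executor(requirements={}) as executor:
        task = executor.submit(func, *args, **kwargs)
        task.label = "my-unique-nickname"

    # Option 2: follow ThreadPoolExecutor's thread_name_prefix convention and
    # let the enclosing context supply a naming prefix for submitted tasks
    # (`label_prefix` is a hypothetical keyword).
    with get_executor(requirements={}, label_prefix="my-ensemble-") as executor:
        task = executor.submit(func, *args, **kwargs)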

Similarly, we could allow for preparing the filesystem environment with a context manager layer.

Also, we probably can and should follow the convention of copying the contextvars.Context for submitted tasks. This would make it easier to implicitly manage module state, like a stateful WorkflowManager or RuntimeManager, in a narrow scope around the submit call without letting references go stale while Tasks are in non-final states. But we need to make sure to release the references held in the Context object once the Task is done, so that we have the best chance of properly shutting down resources like the scalems FileStore.
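A minimal local sketch of that convention, following the asyncio.to_thread() pattern (the helper and the task registry here are hypothetical, not existing scalems API): snapshot the contextvars.Context at submission time and drop the snapshot once the task reaches a final state.

    import contextvars

    # Hypothetical registry: task handle -> contextvars.Context snapshot.
    _task_contexts = {}

    def _submit_with_context(executor, fn, *args, **kwargs):
        # Copy the current context so the task runs against a stable view of
        # module state (e.g. the active WorkflowManager or RuntimeManager).
        ctx = contextvars.copy_context()
        future = executor.submit(ctx.run, fn, *args, **kwargs)
        _task_contexts[future] = ctx
        # Release the snapshot (and the references it holds) once the Task is
        # done, so resources like the scalems FileStore can shut down cleanly.
        future.add_done_callback(lambda f: _task_contexts.pop(f, None))
        return future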

eirrgang commented 1 year ago

Updated design description

Previously, the WorkflowManager was sort of the "root" object and primary interface. I was never quite comfortable with this because it gave the WorkflowManager too many responsibilities and made some circular dependencies seemingly unavoidable.

In an updated design, the WorkflowManager is only responsible for maintaining the state of the workflow (the data flow topology, task state, and (locations of) results / filesystem artifacts), which it does in collaboration with a DataStore that keeps the non-volatile filesystem backup of the workflow state and metadata.

We can manage the WorkflowManager implicitly, but I show it explicitly below.

The RuntimeManager is reworked a little bit to work more like a "multiprocessing context" (ref the multiprocessing module and concurrent.futures.ProcessPoolExecutor). It, too, could be managed implicitly, but is shown below for clarity.

A new ScalemsExecutor consolidates some of the former responsibilities of RuntimeManager and various scattered code. It corresponds to the scope in which task resource requirements are known, so that we can provision an RP Raptor session (we know the Worker requirements, including critical details like cores-per-process) and unambiguously schedule a task for a submit() call.

    workflow = scalems.radical.workflow_manager(loop)
    # Explicitly activate and deactivate a managed workflow with filesystem-backed metadata,
    # providing a clear point for filesystem flushing before interpreter shutdown.
    with scalems.workflow.scope(workflow, close_on_exit=True):
        # Explicitly select the RP back-end and configure the Pilot session.
        with scalems.radical.runtime.launch(workflow, runtime_config) as runtime_context:
            # Provision Raptor with known (or knowable) Worker resource requirements.
            # Defines a scope for tasks with mostly homogeneous runtime requirements,
            # and allows for an "ensemble scope" in which tasks and data can be more tightly and clearly coupled,
            # or in which data and compute placement could be better optimized.
            async with scalems.execution.executor(runtime_context, requirements={...}) as executor:
                # Following the native model, we submit a picklable function and arguments to get a thread-like Future.
                task1: concurrent.futures.Future = executor.submit(fn, *args, **kwargs)
                # The concurrent.futures.Future object could be wrapped in an asyncio Future, or
                # an asyncio Future could be acquired directly through the event loop's run_in_executor().
                task2: asyncio.Future = loop.run_in_executor(executor, fn, *args)

ScalemsExecutor is a concurrent.futures.Executor, so ScalemsExecutor.submit() produces a concurrent.futures.Future. For scalems, we prefer to wrap this in a more flexible asyncio.Task.

    workflow = scalems.radical.workflow_manager(loop)
    with scalems.workflow.scope(workflow, close_on_exit=True):
        with scalems.radical.runtime.launch(workflow, runtime_config) as runtime_context:
            async with scalems.execution.executor(runtime_context, requirements={...}) as executor:
                # The concurrent.futures.Future object could be wrapped in an asyncio Future, or
                # an asyncio Future could be acquired directly through the event loop's run_in_executor().
                task2: asyncio.Future = loop.run_in_executor(executor, fn, *args)

To minimize the syntax, to allow for extra features on a class scalemsTask(asyncio.Task), and to provide a stable interface through future enhancements, I expect we will immediately provide abstraction through scalems.submit().
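A hypothetical sketch of that abstraction (the module-level ContextVar and the details of the wrapper are assumptions for illustration, not settled API):

    import asyncio
    import concurrent.futures
    import contextvars

    # Hypothetical module-level state; the executor() context manager would
    # set this on entry and reset it on exit.
    _current_executor: contextvars.ContextVar = contextvars.ContextVar("scalems_executor")

    def submit(fn, *args, **kwargs) -> asyncio.Future:
        """Submit *fn* to the active executor and return an awaitable handle."""
        executor = _current_executor.get()
        cf_future: concurrent.futures.Future = executor.submit(fn, *args, **kwargs)
        # Wrap the concurrent.futures.Future so it can be awaited in the event
        # loop; a scalemsTask subclass could add label handling and other extras.
        return asyncio.wrap_future(cf_future)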

We can use contextvars.ContextVar instances to hold some state at the module namespace level. A snapshot of the necessary state will be bound to tasks at the time of submit(). This is already standard behavior for the high-level Task and callback patterns in asyncio and concurrent.futures.

    workflow = scalems.radical.workflow_manager(loop)
    # Explicitly activate and deactivate a managed workflow with filesystem-backed metadata,
    # providing a clear point for filesystem flushing before interpreter shutdown.
    with scalems.workflow.scope(workflow, close_on_exit=True):
        # The scalems.workflow module now tracks the currently active WorkflowManager.

        # Explicitly select the RP back-end and configure the Pilot session.
        with scalems.radical.runtime.launch(workflow, runtime_config) as runtime_context:
            # The scalems.execution module now tracks the default RuntimeManager,
            # bound to the provided WorkflowManager.

            # Provision Raptor with known (or knowable) Worker resource requirements.
            # Defines a scope for tasks with mostly homogeneous runtime requirements,
            # and allows for an "ensemble scope" in which tasks and data can be more tightly and clearly coupled,
            # or in which data and compute placement could be better optimized.
            async with scalems.execution.executor(runtime_context, worker_requirements=[], task_requirements={...}) as executor:
                task3: scalems.Task = scalems.submit(fn, *args, **kwargs)
                # task3 has a snapshot (`contextvars.copy_context()`) of any module-global variables needed
                # to support asynchronous scheduling and Future fulfillment outside of the `with` block,
                # allowing us to use this context manager hierarchy for scripting interface / program scoping
                # without unnecessarily strong coupling to the underlying runtime management.
                task3.label = "a nickname that makes sense to me"
            # Task handles must still be valid outside of the executor context manager in order to schedule
            # chains of tasks with different resource requirements. Unlike concurrent.futures.Executor, then,
            # `ScalemsExecutor.__exit__()` should probably default to `self.shutdown(wait=False)`.

Refinement of the requirements is still the subject of separate discussions. For the immediate future, we can assume that worker_requirements will be TaskDescription fields for the Raptor Worker(s), and task_requirements will include TaskDescription fields, including staging directives, to be used for the RP tasks submitted in that scope. It would be nice either to collapse these together or to split them into separate scopes, but I don't have a good idea of how to approach that without deviating from the concurrent.futures.Executor pattern. At the moment, I think it is more important to reuse an existing and well-documented pattern (as long as it works) and defer innovation to future refinement.
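For concreteness, a hedged sketch of what the two mappings might contain, assuming rp.TaskDescription field names (the exact set of supported keys is part of the ongoing requirements discussion):

    # One mapping per Raptor Worker to provision in this scope.
    worker_requirements = [
        {"ranks": 4, "cores_per_rank": 2, "pre_exec": ["module load openmpi"]},
    ]
    # Fields applied to the RP tasks submitted in this scope, including staging
    # directives.
    task_requirements = {
        "ranks": 2,
        "cores_per_rank": 1,
        "input_staging": ["topol.tpr"],
    }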

eirrgang commented 1 year ago

MPI details

Functions

With the current version of RP, if task_requirements["ranks"] > 1, the rp.raptor.MPIWorker would prepare an mpi4py sub-communicator to provide to the function.

In some near-future version, RP will be able to provide a comm even when ranks == 1, if a (TBD) flag is set.

Currently, scalems has a thin wrapper around serialized functions and arguments, so there is room to adjust the above behavior to some extent, if necessary. What we cannot do, however, is support functions that directly use mpi4py.MPI.COMM_WORLD as their communicator.
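For illustration, a minimal sketch of the supported calling convention. How the communicator is delivered to the function (here, as a comm argument) is an assumption about the Worker dispatching, not confirmed RP behavior:

    def analyze(data, comm=None):
        # Use the sub-communicator provided by the MPI-aware Worker instead of
        # importing mpi4py.MPI.COMM_WORLD directly. (The `comm` parameter name
        # is an assumption.)
        rank = comm.Get_rank() if comm is not None else 0
        size = comm.Get_size() if comm is not None else 1
        return {"rank": rank, "size": size, "payload": data}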

Command line tools

MPI-enabled command line tools are fundamentally incompatible with the RP Raptor MPIWorker. We will submit traditional RP executable tasks for command line programs. worker_requirements should be set to None, and task_requirements should include appropriate TaskDescription fields for an MPI rp.Task.
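A hedged sketch of that configuration under the scoping scheme above (the key names follow rp.TaskDescription conventions and are assumptions pending the requirements discussion):

    # No Raptor Workers are provisioned; tasks in this scope dispatch as
    # traditional RP executable tasks.
    async with scalems.execution.executor(
        runtime_context,
        worker_requirements=None,
        task_requirements={"ranks": 8, "cores_per_rank": 1, "pre_exec": ["module load gromacs"]},
    ) as executor:
        ...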

Allocated pilot job resources aren't as easy to share or re-use between Raptor and non-Raptor tasks, so an rp.TASK_EXECUTABLE task may have to wait for a previously provisioned Raptor session to release enough Worker or Master resources.

Submitting CLI tasks

If necessary, I can point out how to access the rp.TaskManager for submitting a traditional rp.Task within the framework described above.

In the next iteration, though, we can reintroduce scalems.executable() with similar under-the-hood dispatching semantics to scalems.submit(), but with its own syntax and feature evolution.
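A hypothetical sketch of how that might read (the signature shown is an assumption, not a commitment):

    async with scalems.execution.executor(
        runtime_context,
        worker_requirements=None,
        task_requirements={"ranks": 8},
    ) as executor:
        # scalems.executable() would dispatch like scalems.submit(), but wrap a
        # command line invocation instead of a Python callable. (Hypothetical
        # signature.)
        md = scalems.executable(("gmx_mpi", "mdrun", "-deffnm", "em"))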