eirrgang opened 4 years ago
The merged examples note that the combination of "desequence" and "map" looks a lot like `asyncio.as_completed`, but the latter is part of an API that may not represent a set of idioms appropriate for a high-level user-facing interface. In discussion with @mrshirts, it was observed that the purpose of `desequence` is to convert something array-like (sequenced) to something set-like (un-sequenced). As such, maybe "set" is the Pythonic idiom we should be presenting to users, specifying that `scalems.Set`s are inherently unordered, and therefore inherently asynchronously iterable.
Also worth further discussion: there are potential costs associated with different sorts of asynchronous or unordered data flow. That is, we assume we will use "slicing" semantics to optimize scheduling, synchronization, and data placement for work and data that can be treated as decomposable array dimensions, but structures like shared asynchronous Queues, or dynamically dispatched work that processes just-completed results on first-available resources, will require more coordination in some cases. Presumably, this is the sort of thing that RADICAL uses its MongoDB instance for, and the topic will require input from all project participants to find a reasonable stack of interfaces that can effectively map user-level work expression to efficient technical implementation solutions.
"Conditional execution" may be the one item here that is clearly "adaptive" from the perspective of a higher-level user.
The other cases are formally dynamic in that the DAG is not exactly knowable in advance, or they simply illustrate workloads that are more dynamic than what RP has traditionally dealt with.
In its purest form, without any optimization, "adaptive" work would imply that a task (or post-task logic) could add arbitrary work to the workflow.
We probably need to consider (sooner rather than later) how to approach that most general case (and sketch out the path to optimization), rather than just enumerating the special cases that we already have a good idea how to optimize.
A list like the above is useful, but it should be more clearly expressed as the functional requirements for specific high-level "adaptive workflow" elements.
Goal: Characterize ensemble use cases in which data does not simply flow in consistent parallel lanes through generalized (higher-dimensional) operations in a well-defined and deterministic sequence.
These are use cases for which the data flow operations or graph topology is not completely defined until run time. Either the graph is incomplete, the graph may be altered, or there exist sets of edges and sets of nodes whose mapping is not determined until some time during graph execution.
Deferred mapping
There are many use cases in which the details of an array or iterable are not well-defined until run time. We can use `scalems.map` to add an element to the workflow that will dynamically generate (one or more) operation instances when executed.
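A minimal sketch of the pattern using plain `asyncio`, since the `scalems.map` signature is still speculative; `generate_inputs` and `simulate` are hypothetical stand-ins for scalems operations:

```python
import asyncio

# Illustrative stand-ins: in a real workflow these would be scalems
# operations; here they are plain coroutines so the sketch runs as-is.
async def generate_inputs():
    """Produce inputs whose number and values are unknown until run time."""
    await asyncio.sleep(0)  # placeholder for upstream work
    return [0.1, 0.2, 0.3]

async def simulate(x):
    """Placeholder for an operation applied to one input element."""
    await asyncio.sleep(0)
    return 2 * x

async def main():
    # Deferred mapping: the operation instances are created only after
    # the upstream iterable resolves, analogous to a scalems.map element
    # that generates work when executed.
    inputs = await generate_inputs()
    results = await asyncio.gather(*(simulate(x) for x in inputs))
    print(results)  # [0.2, 0.4, 0.6]

asyncio.run(main())
```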
Unordered mapping

By default, parallel data is addressable in well-defined arrays, and parallel operations can be applied implicitly across "wide" data flows. This can allow us to exploit data locality (or even resident memory), but is not always appropriate.

For instance, we may launch tasks in batches but consume their results as they become available, in an unordered way. Examples include adaptive sampling, in which a Markov State Model accumulates data from completed simulations and is used to determine the inputs for additional simulations with which to refine the model.

We speculatively described `scalems.desequence` as a data shaping function to explicitly remove the implicit ordering of "ensemble dimensions", converting the outer array dimensions from list-like to set-like. This may not be the best idiom, but in any case we can assume that `scalems.map` can be applied to sequenced or unsequenced iterables, and scheduled opportunistically as iteration becomes possible.
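For illustration, here is a runnable `asyncio` sketch of unordered consumption in the spirit of `asyncio.as_completed`; `simulate` is a placeholder for an ensemble member simulation:

```python
import asyncio
import random

async def simulate(i):
    # Simulations finish in an unpredictable order.
    await asyncio.sleep(random.random())
    return i

async def main():
    tasks = [asyncio.create_task(simulate(i)) for i in range(4)]
    # Consume results as they become available, ignoring launch order,
    # e.g. feeding each completed trajectory to an accumulating model.
    for earliest in asyncio.as_completed(tasks):
        result = await earliest
        print("completed:", result)

asyncio.run(main())
```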
Uncoupled parallel pipelines

We can use data scoping heuristics to decouple or decompose some parallel data flow. Then we can rely on data flow dependencies to govern function execution. Loops expressed at a high level may implicitly iterate asynchronously for different ensemble members in the absence of data flow coupling.
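As a rough illustration (not scalems syntax), plain coroutines already show how uncoupled per-member loops interleave when nothing synchronizes them:

```python
import asyncio
import random

async def step(member, value):
    # One iteration of a pipeline stage for a single ensemble member.
    await asyncio.sleep(random.random() * 0.1)
    return value + 1

async def pipeline(member):
    # Nothing couples the loop bodies across members, so the loops
    # iterate asynchronously, each at its own pace.
    value = 0
    for _ in range(3):
        value = await step(member, value)
    return member, value

async def main():
    results = await asyncio.gather(*(pipeline(m) for m in range(4)))
    print(results)  # [(0, 3), (1, 3), (2, 3), (3, 3)]

asyncio.run(main())
```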
Conditional execution
For control flow logic that must be evaluated at run time, we need to be able to delegate evaluation as close as possible to the data and runtime environment. This means that the higher-level work expression needs to provide a description of the work that may be performed, an abstraction of the criteria for run-time workflow modification, and a mechanism for adding work to the workflow.
As with `scalems.subgraph`, we can define a `scalems.conditional` context manager to insert a meta-function with a runtime-conditional code path. `scalems.map` and `scalems.Queue` may provide additional essential abstractions.
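To make the proposed shape concrete, here is a small, self-contained stand-in (not the scalems API; `conditional` and `ConditionalBlock` are defined locally for illustration) in which a context manager records work whose execution is deferred behind a run-time condition:

```python
import contextlib

class ConditionalBlock:
    """Record callables; execute them later only if the condition holds."""

    def __init__(self, condition):
        self._condition = condition
        self._work = []

    def add(self, func, *args):
        # Describe work that *may* be performed.
        self._work.append((func, args))

    def run(self):
        # The condition is evaluated at "run time", close to the data.
        if self._condition():
            return [func(*args) for func, args in self._work]
        return []

@contextlib.contextmanager
def conditional(condition):
    block = ConditionalBlock(condition)
    # A real workflow manager would insert the block into the work
    # graph on exit instead of leaving execution to the caller.
    yield block

# Usage: refinement work is added to the plan, but only runs if the
# convergence check fails when the workflow executes.
converged = lambda: False
with conditional(lambda: not converged()) as block:
    block.add(print, "running additional refinement")
block.run()
```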
Asynchronous queuing

:py:class:`asyncio.Queue`, :py:func:`asyncio.wait`, and :py:func:`asyncio.as_completed` illustrate schemes for decoupling parallel result production from result processing. For adaptive, asynchronous, (naively) parallel work, we could pop tasks and results from a queue (or queues), perform analysis, then queue additional tasks.
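A runnable sketch of that adaptive loop with a plain `asyncio.Queue`; the doubling "simulation" and the `max(results) < 10` analysis criterion are arbitrary placeholders:

```python
import asyncio

async def worker(queue, results):
    while True:
        x = await queue.get()
        results.append(2 * x)  # placeholder "simulation"
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]

    # Seed the queue with an initial batch of work.
    for x in range(3):
        queue.put_nowait(x)
    await queue.join()

    # Adaptive step: analysis of completed results generates more work.
    if max(results) < 10:  # arbitrary convergence criterion
        for x in results:
            queue.put_nowait(x)
        await queue.join()

    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    print(sorted(results))

asyncio.run(main())
```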
Directly exposing any of the above `asyncio` interfaces may not be appropriate for the user-facing interface. We should discuss whether a more elaborate `scalems.map` interface or some other alternative might be preferable. We should also consider the syntax choices for similar functionality in other modern tools, such as Parsl and Dask.
Others
We can refine this list as distinguishing features of use cases or functional requirements become clearer.
Terminology
We are still a bit uncertain about our terminology for elements of work and the calls that define them (variously: "function," "command," "operation," "operation instance," "graph node," and "task"). Python's :py:mod:`asyncio` module provides some precedent that may guide refinement of our terminology. A "coroutine" or "coroutine function" is a callable (defined with `async def`) that can be used to create "tasks" and "futures". A "coroutine object" is what is returned when a coroutine function is called. A "task" is the handle to a scheduled coroutine. "Coroutine function" and "coroutine object" map exactly to the concepts we have struggled to describe as operation factories versus operation instances.
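The mapping can be demonstrated directly with standard `asyncio` (the hypothetical `coro_func` is for illustration only):

```python
import asyncio

async def coro_func(x):
    """A coroutine function: analogous to an operation factory."""
    await asyncio.sleep(0)
    return x + 1

async def main():
    coro = coro_func(41)              # coroutine object ~ operation instance
    task = asyncio.create_task(coro)  # task ~ a *scheduled* operation
    result = await task               # `await` in place of .run()/.result()
    print(result)                     # 42

asyncio.run(main())
```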
The `asyncio` definition of "task" indicates we should probably reserve the term for the more concrete form of an operation that has been (or will imminently be) dispatched for execution. The `await` syntax may provide an appropriate alternative to our `.run()` and `.result()` methods (or may simply be something we should try to support as an optional syntax). Further investigation may help us figure out how to disambiguate some of the challenges we have faced with static type checking and with scoped distinctions between code that can or cannot modify the work graph.

Tracking