SCALE-MS / scale-ms

SCALE-MS design and development

Architectural discussion #11

Closed: eirrgang closed this issue 4 years ago

eirrgang commented 4 years ago

See "Specific comments" at issue #8

Notes

From @eirrgang

I am not sure the Google sketch diagram is helping me.

If we want a packaging diagram and/or deployment diagram, a starting point might be this translation.

If we want to identify the roles participating in use cases, this translation might be a template.

From @mturilli

Happy to go with what helps you the most. Your deployment diagram seems reasonable to me. This is a high-level representation meant for kick-starting our discussion. Based on that diagram:

Target resource

We can offer an implementation for the 'data stager', 'task scheduler', 'task placer', and 'launcher'. These components would be able to launch an executable-based task on a node with the required amount and type of resources. For function-based tasks, we would launch a worker that, in turn, would be able to schedule, place, and launch functions/methods.

Client side

We can offer an API to acquire and manage resources, and one to bulk-schedule executable-based tasks/workers on the target resources.
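Purely as a sketch of that client-side flow (the class and method names below are generic placeholders, not the actual RCT/RADICAL-Pilot API):

```python
# Hypothetical client-side flow: acquire a resource allocation, then
# bulk-schedule executable-based tasks or workers onto it. Illustrative only.
from dataclasses import dataclass, field


@dataclass
class TaskDescription:
    executable: str
    arguments: list = field(default_factory=list)
    processes: int = 1


class ExecutionClient:
    def acquire(self, resource: str, nodes: int, walltime_minutes: int):
        """Request an allocation (e.g. a pilot job) on the target resource."""
        return {"resource": resource, "nodes": nodes, "walltime": walltime_minutes}

    def submit(self, allocation, tasks):
        """Bulk-schedule tasks/workers onto the allocation."""
        return [{"allocation": allocation, "task": task} for task in tasks]


client = ExecutionClient()
allocation = client.acquire("hpc.example", nodes=4, walltime_minutes=60)
client.submit(allocation, [TaskDescription("scalems_worker", processes=4)])
```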

From @eirrgang

For function-based tasks, we would launch a worker that, in turn, would be able to schedule, place and launch functions/methods.

I think this is the primary use case, and I think that for simplicity and flexibility, we can (at least initially) implement new functionality in a single worker before generalizing or fully specifying things like the data persistence we've been talking about.

Is this worker for function-based tasks something that is already implemented?

From @mturilli

I think this is the primary use case,

We do not have use case documents, so I am not sure what the project's primary use case(s) is/are. Speaking with all the teams involved (Boulder, LSU, and Virginia), it is clear we have to support both function- and executable-based tasks. Based on our previous technical meeting and our follow-up discussions, it is also clear that Virginia is mostly interested in function-based tasks, and I am fine with that.

I think that for simplicity and flexibility, we can (at least initially) implement new functionality in a single worker before generalizing or fully specifying things like the data persistence we've been talking about.

I agree; I would be very happy with an incremental approach to implementing the worker and its functionalities. Would you initially want to focus on executing a 'hello world' function? If so, we would be interested in discussing how the function and its context should be passed from the user-facing 'client' to the worker. If you agree, should we start from Section 3 of your document, 'Serializable work'? @andre-merzky, would you agree with that?

Is this worker for function-based tasks something that is already implemented?

I don't think so. At the moment, it is not clear to me what the relationship is among the worker, gmxapi, and GROMACS. Without that clarity, I am not able to start envisioning a design for the worker. It is also not clear to me what the functional requirements for this worker are, as I do not have clear use cases for it. For example, I do not know how to answer questions like: How many functions? What rate? What minimum function execution time? Etc.

Rutgers has a worker prototype that we wrote as a feasibility study to execute Python functions, but I am not sure this is what you are looking for/need. If you think it would be useful, you would be welcome to have a look at the code and we would be happy to go through it together.

From @andre-merzky

@andre-merzky, would you agree with that?

Yes, and I also agree with your point about serialization and call context. What function call context is required and how it is specified/provided will determine how we are going to implement the worker. Our prototype might be a good starting point; we'll see.

I should add that we may also need changes to our client API and internal protocols, depending on how much and what kind of information we need to communicate for the call context.

From @eirrgang

we have to support both function- and executable-based tasks

I think we are in agreement to this point.

By my comment, I meant that it is preferable to make command-line executables a subset of Python function use cases than the other way around, and that it is sufficient to focus on the Python function use case. If we are not in agreement here, then I can table that point for the moment. Alternatively, if there is an issue of software structure, then I guess I am proposing that we identify the layer and interface at which an operation implemented by a command line tool is substantially the same as an operation implemented with a Python interface, and use that to anchor the top-down aspects of the design.

discussing how the function and its context should be passed from the user-facing 'client' to the worker

That sounds good. The serialized work graph was our proposal for conveying the client-requested work while allowing the actual executed tasks to be fully specified during execution. We have previously left the response unspecified, and I hope to clarify the runtime activity in issues such as #14.

gmxapi < 0.0.8 explored the serialization->deserialization->execution details more fully than gmxapi 0.0.8/0.1, but I can proceed with the next generation specification and prototyping in the context of this conversation.
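For concreteness, a purely hypothetical example of what such a serialized work record could contain (expressed as a Python dict; the keys are illustrative and do not reflect the actual gmxapi workspec schema):

```python
# Hypothetical serialized work record. Keys and values are illustrative only
# and are not the gmxapi serialization format.
work_record = {
    "version": "example-workspec-0",
    "elements": {
        "input_array": {
            "operation": "make_constant",
            "input": {"data": [1.0, 2.0, 3.0]},
            "depends": [],
        },
        "my_array": {
            "operation": "scale",
            "input": {"source": "input_array", "factor": 2.0},
            "depends": ["input_array"],
        },
    },
}
```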

How many functions?

Functions implemented: < 10 in the core package (or, likely, in any single client/extension package); infinitely extensible

Functions executed: likely 1-30 unique functions executed in a single script; likely 1-5 unique functions executed in a single worker; likely 1-100 function-based tasks executed in a worker lifetime / execution session.

What rate? What minimum function execution time?

Minimum function execution time could be microseconds, but we expect the most interesting work will be simulation tasks on the order of seconds to minutes, with less interesting work (from the standpoint of SCALE-MS value added opportunities) consisting of simulation operations on the order of hours to weeks. This is why we are interested in the overhead of data placement and task switching where we think we can reduce it from minutes to seconds or less.

A scenario we encounter regularly is an iteration sequence like 5-minute simulation -> 10-second data placement -> sub-second analysis -> 5-minute simulation. That sequence has already been manually optimized down from several minutes of data placement and analysis, but better API integration could reduce the data placement cost even further, which would in turn allow better statistics from a method by supporting more (shorter) iterations.

What function call context is required and how it is specified

I think this has been the goal of a lot of the work so far on our end.

Let me try to break apart some of these discussion points into separate topics. I'll update soon.

From @andre-merzky

By my comment, I meant that it is preferable to make command-line executables a subset of Python function use cases than the other way around, and that it is sufficient to focus on the Python function use case.

Functions and executables may not differ semantically, and I can see why the upper layers may want to treat them on an equal footing. On the execution layer, though, they are very different beasts and imply different implementations (and partially different architectures). For example, the notion of an execution environment is very different: for functions it refers to a call context (namespaces, available modules and entities); for executables it refers to things like the shell environment, I/O redirection, etc. Some of those may overlap (in principle a function call may look at os.environ and read from STDIN, but it's not common); for others it is strictly different. The main difference is process creation and placement: for function calls, we need to place a custom worker (our code) on the target core(s), which hosts the function call and ensures the right call context; for executables we do not need workers on the target nodes at all, but ensure process creation and execution context via system utilities. Isolation is another topic we need to consider: executables are usually well isolated; consecutive or concurrent Python functions running in the same worker are usually not.

For those reasons we would like to keep a somewhat separate discussion of both concepts, at least wrt. requirements and architecture / implementation details. How that distinction is rendered on the API level (if at all) is a different question.
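To make the distinction concrete, here is a minimal sketch of the two launch paths a worker might implement (everything below is illustrative; no existing SCALE-MS or RCT interface is implied):

```python
# Minimal sketch of the two execution paths discussed above. Illustrative only.
import importlib
import subprocess


def run_executable_task(executable, args, env, workdir):
    """Executable-based task: the execution environment is a process
    environment (shell env, working directory, I/O redirection)."""
    return subprocess.run(
        [executable, *args],
        env=env,
        cwd=workdir,
        capture_output=True,
        check=True,
    )


def run_function_task(module_name, function_name, call_context, *args, **kwargs):
    """Function-based task: the execution environment is a call context
    (importable modules, namespaces, and any resources the function needs)."""
    module = importlib.import_module(module_name)
    func = getattr(module, function_name)
    # The call context (e.g. a communicator, scratch paths, logging handles)
    # is passed explicitly rather than discovered from the process environment.
    return func(*args, context=call_context, **kwargs)
```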

Minimum function execution time could [anything]

I am rephrasing the above, obviously :-) but that's mostly what we expected. The overhead of running microsecond functions on a remote node is likely significant no matter what we do, and we would like to communicate that clearly to manage expectations regarding the execution layer. There is only so much we can do wrt. latency hiding of the communication required to get the function to the node and back again... In general, the expected execution time will put a limit on the efficiency at which we can execute at scale. We feel fairly comfortable with everything running longer than a couple of minutes, at scale.

From @eirrgang

In general, the expected execution time will put a limit on the efficiency at which we can execute at scale. We feel fairly comfortable with everything running longer than a couple of minutes, at scale.

That is our expectation, too, so we are trying to describe cases where we think we can turn groups of short tasks into longer workloads, and to identify the types of resource persistence or software interactions necessary. I think the next step for me, then, is to flesh out #12.

... for executables ...

Thank you for clarifying the distinctions. I'm not sure how the use cases handled by such a launch mechanism interact with any use case I've considered previously, so I'll have to chew on that...

eirrgang commented 4 years ago

I don't know how far we want to go with a "hello world" example. On the client side, it looks something like this, and in the runtime, it looks something like this.

Apologies... I just noticed that these links go directly to images, rather than to http://www.plantuml.com/plantuml/uml/ though the image URLs can be pasted there for editing in the web interface.

Unless this generates discussion in itself, I think #12, #15, and #16 will proceed quickly enough to provide the necessary stimulation for architecture discussions.

mturilli commented 4 years ago

Thank you for the diagrams. Looking at the diagram on the runtime side, and before we start to discuss it: how much of the worker have you already prototyped?

eirrgang commented 4 years ago

This is basically the execution model in gmxapi 0.1, but the worker as diagrammed has not really been encapsulated. The main functionality that is incomplete is workflow checkpointing. I think SCALE-MS architecture discussions will help me to identify appropriate encapsulation and fully factor the roles in the new development cycle. The worker was more strongly defined in gmxapi <= 0.0.7 as the Session, but the interface with the operation implementation was poorly defined and the design lacked important features like workflow checkpointing and data flow. Note, again, that "worker" is not a term we have previously used in gmxapi, and the analogy to existing gmxapi roles is not always strong, so we should be careful with our assumptions.

peterkasson commented 4 years ago

BTW just to make sure we all have our terminology consistent: Eric--I think the "Session" is the overall run environment, correct? All assigned workers collectively provide the "compute context" in earlier gmxapi specifications. So as Eric says the notion of an individual "worker" is (1) abstracted a fair amount and (2) was stepped around with the MPI implementation for the context executor.

Eric, correct me if I'm wrong above.

edited by Eric to remove special GitHub issue-referencing syntax

eirrgang commented 4 years ago

Yes, @peterkasson 's point is important. Previously, we have avoided any strong specification of how processes or other run-time resources were mapped to tasks, in part because it was not obvious whether and how to distinguish parallelism within tasks from parallelism between groups of tasks. Our strong mapping of processes / MPI ranks to ensemble members was a necessary implementation detail of our MPI ensemble executor and is explicitly non-normative.

mturilli commented 4 years ago

From our runtime point of view, we are also non-normative about the worker. It can use one or more cores/GPUs/nodes, and can be implemented as a single- or multi-thread/process, OpenMP, or MPI executable.

eirrgang commented 4 years ago

Okay, good. That should simplify the discussion. The Worker then looks like an implementation detail of a Session, or the concrete sub-session executing as implemented in a particular context.

For a single element of work: here (updated)

For multiple elements of work (but still focusing on just one of them): here

andre-merzky commented 4 years ago

Thanks @eirrgang for these UML docs. One question on them: am I correct to assume that the full scope shown in the diagrams is executed within a worker on the HPC resource? Or are those Checkpoint Facilities and Operation Implementation services / processes / ... running outside of the worker? Thanks!

eirrgang commented 4 years ago

Those boxes are natural API boundaries (and packaging boundaries, in extension cases), but within the scope of a worker (e.g. a single Python interpreter process, or a group of Python interpreter processes sharing a (sub)communicator). Here is a minor update to the first diagram (I'll update the edit link above...).

Note that gmxapi 0.0.7 behavior is very similar to the second diagram, but without the checkpointing. gmxapi 0.1 behavior is essentially a recursive version of the first (again without checkpointing), since the data model for resource subscription was not complete. Checkpointing is core to the new prototyping, though. I need to update #15...

andre-merzky commented 4 years ago

"my_array" node: I am not sure what this means, can you clarify please?

a worker [==] a single Python interpreter process: this implies that a worker instance is constrained to a single compute node, at most. Is this correct, and are we sure that this won't change? Constraining a worker to a single compute node will significantly simplify things (from our perspective), relaxing this at a later point will be very difficult.

I think we discussed this on a call before, but I would like to record this here to document requirements clearly.

eirrgang commented 4 years ago

"my_array" node: I am not sure what this means, can you clarify please?

I'm sorry it is a little unclear; the diagram does not yet show the levels of abstraction between abstract work and concrete work or results. Maybe it was not helpful to add those additional notes at this point. The idea, though, is that the serialized record of the work to perform is received. I referenced the node's label ("my_array") instead of a unique identifier because the diagram is not particularly clear about the fingerprinting yet.

this implies that a worker instance is constrained to a single compute node

This is important to clarify and I updated the comment.

The diagrams should apply equally well to a single-process task or to a multi-process task. If the worker may include more processes than are applied to a particular task, that is beyond the scope of the diagrams.

I believe we need to support tasks that are internally accelerated with multi-node MPI in both the short and long term in order to be able to execute useful work loads based on existing software. It sounds like the sensible way to do that is with a multi-node worker.

We can pursue abstractions to task-internal parallelization for software optimized for SCALE-MS execution, but I don't know what priority or time scale that would have.

andre-merzky commented 4 years ago

Thanks for clarifying (on both accounts)!

Re multinode worker: Understood. The problem will not be to spawn multinode workers, but to have a clear understanding of the implications of running multiple concurrent MPI tasks within such a multinode worker. If the tasks are expressed via a command line, then the MPI communicators of the different tasks will be (relatively) easy to isolate. If the tasks are method invocation, things are much less clear, and capabilities will likely depend on the underlying MPI layer.

There arises also a question of separation of concerns: my mental image up to now was that the pilot system (or execution system as a more general term) would be responsible for placing and starting workers, and workers would simply burn through tasks which get fed via some communication channel. If workers can span nodes and can potentially run concurrent MPI tasks, they themselves need to have mechanism for task placement and execution, thus repeating some of the functionality of the layer which started the workers in the first place.

eirrgang commented 4 years ago

running multiple concurrent MPI tasks

I don't know whether this needs to be possible in early versions, as long as there is some mechanism by which knowledge of worker and task parallelization can be reconciled. Mismatches would be reasonable places to put barriers between chunks of work that can be executed in a single worker lifetime. But we need to be able to use the worker parallelism to inform task-internal parallelism, and/or allow some mismatch/waste in order to insert light-weight tasks between big highly parallel tasks, or to append tasks.

If the tasks are method invocation, things are much less clear, and capabilities will likely depend on the underlying MPI layer.

I'm not sure why it is easier to run concurrent tasks from the command line... In either case, a Worker's ability to execute a multi-node MPI-enabled task would necessarily depend on the capabilities of the execution environment, so the ability to fill available ranks with independent concurrent tasks may depend on them as well.

Separately, we have the functional area of related parallel tasks that I wanted to elaborate on in #16. Peter and I consider a Session scope to be inclusive of, say, 16 simulation trajectories being produced in parallel by 4-node simulation tasks in a 64-rank MPI communicator. Each of these simulation tasks produces one of 16 uniquely fingerprinted sets of artifacts, but should be addressable at a high level in a unified way, such as subscripts on a symbol. And whether or not the simulations are coupled by MPI is definitely an implementation detail of the execution environment. I had assumed such a thing would mean a communication abstraction for coscheduled workers, but if it makes more sense to include all of that in the scope of a single worker, it becomes more important to be able to subdivide the worker resources and coschedule interstitial tasks, such as 4-node per-simulation analysis steps between ensemble simulation phases.

There arises also a question of separation of concerns: my mental image up to now was that the pilot system (or execution system as a more general term) would be responsible for placing and starting workers, and workers would simply burn through tasks which get fed via some communication channel. If workers can span nodes and can potentially run concurrent MPI tasks, they themselves need to have mechanism for task placement and execution, thus repeating some of the functionality of the layer which started the workers in the first place.

Yes, I think that is the crux of what we need to work out. But we can still impose a lot of the logic from outside of the worker, and limit the scope of what the worker can resolve on its own. Peter and I propose that a big and useful set of ensemble simulation protocols are effectively optimized if we can map worker scope to array dimensions and nesting scopes of algorithm expressions. If workers could communicate with each other, we could have smaller, longer-lived workers. Otherwise, we would need to relaunch workers of appropriate "size" when exiting certain algorithm scopes or when the work no longer matches the worker size.

In any case, I still think the worker can burn through tasks which get fed through some communication channel, but it may need to be given some additional hints on how to allocate resources to those tasks.

mrshirts commented 4 years ago

If workers can span nodes and can potentially run concurrent MPI tasks, they themselves need to have mechanism for task placement and execution, thus repeating some of the functionality of the layer which started the workers in the first place.

Large molecular systems will need to be running on multiple nodes, and they will be using MPI parallelization in many cases. Now, most of these MD codes have relatively robust ways of optimizing within the nodes they are assigned, so to some extent it's possible they could be treated as "black boxes", with only the overall performance noted, rather than the allocation of work within the node itself.

One question is how much we can assume homogeneity. In many cases (maybe most, maybe all?) one can assume that all jobs of a certain type will have the same requirements (same number of cores, same communication patterns). So all the 'compute' jobs, since they will be of the same system, will be similar; or, less restrictively, there will be 2-3 types of compute jobs, and within each category all the requirements (including number of nodes and parallelization) are the same. 'Analysis' jobs will have different requirements than compute jobs.

andre-merzky commented 4 years ago

Swapping workers depending on the arriving tasks will incur some significant inefficiencies at scale: assume 1000 nodes, and a typical worker launch rate of 10 to 20 workers per second. Swapping workers on all nodes would need 50 to 100 seconds (1000 nodes is less than a fourth of Summit, and about 10% of Frontier (est.)). I would suggest not planning for worker swapping if it can be avoided.

Sure, one can overlap worker startup with computation and/or data transfers - but at complete turnovers like switching the whole allocation from simulation to analysis mode, there are limited options to hide the swap delay.

And whether or not the simulations are coupled by MPI is definitely an implementation detail of the execution environment.

Isn't that an implementation aspect of the simulation engine being used? I am not sure if the execution management layer (which spawns MPI processes and/or spawns new MPI communicators, in my understanding) can influence the coupling of simulation components.

To elaborate on the difference between spawning MPI tasks as new processes and running the worker as MPI processes that execute the tasks within their MPI communicator: in the first case (processes), the workload (the simulation code) will be able to use MPI_COMM_WORLD as usual, and will see all MPI ranks, etc. When executing the tasks as method calls in the worker, and thus within an existing MPI communicator, the worker needs to spawn a sub-communicator, and the workload (the simulation code) cannot use MPI_COMM_WORLD but needs to use the spawned sub-communicator. Thus we need the capability to communicate to the simulation what communicator to use, and the simulation code needs to respect that information.

This at least is my understanding of the respective MPI mechanisms - happy to be corrected (preferably via some example code). Also note that, to my knowledge, not all MPI implementations have equal support for those mechanisms (I need to read up on this again, though). Last but not least, most MPI applications implicitly assume MPI_COMM_WORLD to be usable and don't easily work with other communicators. Are we sure this is not a problem for the target simulation codes (other than gmx, where you have control over this aspect)?
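Since example code was requested, here is a minimal mpi4py sketch of the sub-communicator approach described above (the task signature and group sizing are assumptions made only for illustration):

```python
# Minimal sketch, assuming mpi4py and a task function that accepts a `comm`
# argument instead of using MPI.COMM_WORLD directly.
from mpi4py import MPI


def run_in_subcommunicator(task_fn, ranks_per_task, *args, **kwargs):
    """Split COMM_WORLD into groups of `ranks_per_task` ranks and run the
    task on the sub-communicator this rank belongs to."""
    world = MPI.COMM_WORLD
    color = world.Get_rank() // ranks_per_task  # which task group this rank joins
    subcomm = world.Split(color=color, key=world.Get_rank())
    try:
        return task_fn(*args, comm=subcomm, **kwargs)
    finally:
        subcomm.Free()


def example_task(n_steps, comm):
    # Stand-in for a simulation step that respects the provided communicator.
    return comm.allreduce(n_steps)
```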

mrshirts commented 4 years ago

but at complete turnovers like switching the whole allocation from simulation to analysis mode, there are limited options to hide the swap delay.

I don't think we want to be swapping everything from one mode to another. I think we want both analysis and simulation running simultaneously, with analysis continually launching new simulations as old ones deposit their information with the analysis code and free up resources. This is the asynchronous model as I see it.

andre-merzky commented 4 years ago

Thanks @mrshirts, that's good to know. Nevertheless, I would be cautious about a potential turnover if we expect the resulting system to work with use cases where consecutive batches of tasks are not uniform. If ye'all are sure that this will remain out of scope, then the requirements do simplify.

eirrgang commented 4 years ago

Thanks for those numbers. That seems in line with what Peter and I figured we were up against. What Peter and I have been trying to specify are ways to identify or specify uniformity for chains of tasks so that we can intersperse simulation and analysis without relaunching workers. To clarify, though: is the worker launch rate you describe per node or per worker?

And whether or not the simulations are coupled by MPI is definitely an implementation detail of the execution environment.

Isn't that an implementation aspect of the simulation engine being used?

Today: yes. But the intention is to enable coupling between simulations in terms of ensemble API features. It needs to be irrelevant to the ensemble API whether reduce() is implemented in terms of MPI, but MPI would be a reasonable way to implement reduce() for an ensemble executing in a multi-node Worker implemented in terms of mpiexec.

Thus we need the capability to communicate to the simulation what communicator to use, and the simulation code needs to respect that information.

Yes. This is why I don't understand how it could be easier rather than harder to run concurrent command line tasks in a worker versus function-call-driven tasks.

For API-driven tasks (function calls), the worker can pass a sub-communicator. A multi-process MPI worker must use a task Builder that either allocates a subcommunicator, accounts for the task using all concurrency available in the worker, or launches a non-MPI instance of the task. (Ref "Feature: Environment provisioning", #3)

GROMACS can be an early adopter, demonstrating that large scale work can be managed more efficiently by accepting a subcommunicator to allow flexibility in external computing resource managers. But in order to allow more tools to get more benefit with less effort, I want us to consider how best to allow for coupled MPI contexts. Can we use "smaller" MPI workers (granting their tasks access to MPI_COMM_WORLD) by allowing some coupling between (groups of) workers through the framework? Can a multi-node worker span multiple MPI contexts? My expectation is that the latter does not make sense, since HPC jobs typically equate to MPI contexts, and jobs are the unit by which we can coschedule. If, however, the pilot job system is already able to pool running jobs into larger workers, it would be an option.

Are we sure this is not a problem for the target simulation codes

API-driven simulation codes will need adapters to express and handle the MPI resources granted through the worker, or we have to assume each task will try to grab COMM_WORLD.

For generic command line tools, we cannot infer whether they will try to use MPI or not, so I believe the gmxapi command line wrapping scheme needs to be extended to explicitly annotate how many processes to execute the command on, as well as to prohibit concurrent execution of command line tasks in MPI environments unless they are explicitly annotated as not using MPI.
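To make the proposal concrete, here is a hypothetical sketch of such an annotation. The `mpi_processes` keyword does not exist in gmxapi today, and the call shape only approximates gmxapi.commandline_operation; it is illustrative, not authoritative:

```python
# Hypothetical extension of the gmxapi command line wrapping scheme.
# The `mpi_processes` annotation is NOT an existing gmxapi feature.
import gmxapi as gmx

md = gmx.commandline_operation(
    executable='gmx_mpi',
    arguments=['mdrun', '-deffnm', 'run'],
    # Proposed annotation: how many MPI processes to launch the tool on.
    # Commands not explicitly annotated as non-MPI (mpi_processes=0) would
    # not be run concurrently with other command line tasks in MPI
    # environments.
    mpi_processes=4,
)
```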

eirrgang commented 4 years ago

If ye'all are sure that this will remain out of scope

I don't think we can say that non-uniform work will remain out-of-scope entirely, but I think we can identify that optimizing uniform work is very valuable, and explore and characterize work that can fit into that optimization.

andre-merzky commented 4 years ago

To clarify, though: is the worker launch rate you describe per node or per worker?

10/sec is a conservative estimate for placing and executing workers on any compute node. It does not matter (much) whether the workers are single-core or multi-node MPI: running 10 mpirun / aprun / srun / jsrun / ... launches per second is about the rate most clusters support out of the box. That limit is not specific to RCT; other HPC task managers (like Flux) show similar rates in their default configuration. On some machines we are able to use fine-tuned launch methods with higher throughput rates, but those efforts are not yet close to production, so I would not calculate with better numbers just yet.

And whether or not the simulations are coupled by MPI is definitely an implementation detail of the execution environment.

Isn't that an implementation aspect of the simulation engine being used?

Today: yes. But the intention is to enable coupling between simulations in terms of ensemble API features. It needs to be irrelevant to the ensemble API whether reduce() is implemented in terms of MPI, but MPI would be a reasonable way to implement reduce() for an ensemble executing in a multi-node Worker implemented in terms of mpiexec.

We are not yet in sync on terminology, I think; I am likely missing your point, apologies. I think we use the terms execution layer and execution environment differently? I am not sure. Let me try some clarification, with the request to correct me if I am off:

It needs to be irrelevant to the ensemble API whether reduce() is implemented in terms of MPI

Sure, I can see that, absolutely. The original statement though was:

And whether or not the simulations are coupled by MPI is definitely an implementation detail of the execution environment.

and here the terminology problem hits me: in my view, an execution environment has no implementation details. I would consider the question of whether a reduce() task uses MPI an implementation detail of the task executable or task function. And if MPI is used to couple different tasks, then that would be a shared implementation detail of those task implementations, not of the executor or execution environment. Or am I mis-parsing what you mean by simulations are coupled by MPI?

Thus we need the capability to communicate to the simulation what communicator to use, and the simulation code needs to respect that information.

Yes. This is why I don't understand how it could be easier rather than harder to run concurrent command line tasks in a worker versus function-call-driven tasks.

Sending that communication to the task is easy either way. For example, os.environ would work for both. Having code which can make use of that is what I consider the difficult part. Another question for clarification, though: are the tasks which are executed (such as reduce()) always code which is written (or modified) within this project, or do you expect reduce() methods or executables to run which are provided by end users? The latter case is what has me worried, because those are unlikely to be able to use sub-communicators out of the box.

must use a task Builder that either allocates a subcommunicator

Interesting! I don't yet see how that would work, do you have a pointer to some documentation or implementation of this? (I checked #3).

[combining MPI communicators]

Ha, that would be fun! :-) But that is not possible to my knowledge, and I don't see how that could be expressed in MPI :-(

For generic command line tools, we cannot infer whether they will try to use MPI or not, so I believe the gmxapi command line wrapping scheme needs to be extended to explicitly annotate how many processes to execute the command on, as well as to prohibit concurrent execution of command line tasks in MPI environments unless they are explicitly annotated as not using MPI.

Agree, that cannot be inferred.

The above sounds a bit like you would expect the mpiexec commands or equivalents to be run by the gmxapi command line wrapp[er] - is that the case? Please note that the exact mechanism to start an MPI executable likely differs from machine to machine, and also depends on how the execution layer is configured, i.e., how it previously placed the workers, etc. In some cases you won't be able to do that without having a communication protocol between RCT and that gmxapi command line wrapper.

For example, on Summit we use a system called PRRTE for launching RCT tasks (in this case, our worker processes). PRRTE creates a set of daemons on the compute nodes which it uses for process management and MPI setup. Any attempt to run mpiexec without talking to those daemons is bound to fail, as is an attempt to use Summit's native jsrun command. Talking to those daemons comes with constraints (who can talk, concurrent communication limits, etc.). PRRTE, though, is the only available option on Summit to spawn more than a couple hundred concurrent workers (LSF / jsrun is not able to do so).

We likely need to consolidate our understanding about what entity manages the MPI spawning and execution, what knowledge needs to be exposed, etc.

API-driven simulation codes will need adapters to express and handle the MPI resources granted through the worker, ...

Adapters to the MPI layer?

...or we have to assume each task will try to grab COMM_WORLD.

The latter (grabbing MPI_COMM_WORLD) is what works more easily with process-based execution, as the tasks will indeed get a new MPI_COMM_WORLD.

andre-merzky commented 4 years ago

If ye'all are sure that this will remain out of scope

I don't think we can say that non-uniform work will remain out-of-scope entirely, but I think we can identify that optimizing uniform work is very valuable, and explore and characterize work that can fit into that optimization

That makes sense to me - thanks.

eirrgang commented 4 years ago

The above sounds a bit like you would expect the mpiexec commands or equivalents to be run by the gmxapi command line wrapp[er] - is that the case?

No, I expect that the Python interpreter was launched with the appropriate MPI launcher, and that we don't have absolute control over whether a command line tool will try to pick up the MPI environment when it is forked. If there are ways to handle that differently or better, my expectation would be that such details would be taken care of by overriding aspects of the commandline_wrapper operation implementation for classes of execution environments.

  • execution layer: some entity which is able to dynamically, on request, place processes on compute nodes
  • execution environment: the OS environment a task will run in when being executed on a compute node (this includes information about MPI communicator, if needed)
  • worker: a proxy process running on one or more compute nodes, started by the execution layer, and responsible for preparing a task's execution environment and executing the task (either by spawning more processes or by calling methods from loaded Python modules or shared libraries)

That sounds right to me, except that a worker running on multiple compute nodes is necessarily multiple OS processes. But it sounds like these processes are launched such that the execution layer is already interacting with them as a multiprocess task in terms of the local job management system.

and here the terminology problem hits me: in my view, an execution environment has no implementation details.

Something has to account for the fact that some execution environments have MPI and some don't. I don't know where in the current stack that detail occurs or is abstracted. I expect that the packet of work given to a worker (JSON record, gRPC call, whatever) will not reflect such a detail, but it comes into play at some point as the worker is building the task.

I would consider the question of whether a reduce() task uses MPI an implementation detail of the task executable or task function.

Yes. The framework should provide an implementation of reduce() that works everywhere, and we would want at least a couple of implementations optimized for particular circumstances, such as a group of processes already running in an MPI subcommunicator or, more exotically, where a cloud computing API provides something similar.
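A minimal sketch of what such environment-dependent dispatch could look like (assuming mpi4py when an MPI sub-communicator is available; the function name is illustrative, not an existing API):

```python
# Illustrative only: an ensemble reduction with a portable fallback and an
# MPI-optimized path.
def ensemble_reduce(local_value, comm=None):
    """Sum a per-member value across the ensemble.

    If `comm` is an mpi4py communicator provided by the execution environment,
    the reduction is collective; otherwise the 'ensemble' is just this process.
    """
    if comm is not None:
        return comm.allreduce(local_value)  # mpi4py allreduce defaults to a sum
    return local_value
```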

And if MPI is used to couple different tasks, then that would be a shared implementation detail of those task implementations, not of the executor or execution environment. Or am I mis-parsing what you mean by simulations are coupled by MPI?

It is an implementation detail of the task, where the task implementation has to be chosen in terms of the facilities available in the execution environment.

Another question for clarification though: are the tasks which are executed (such as reduce()) always code which is written (or modified) within this project, or do you expect reduce() methods or executables to run which are provided by end users?

It would be great if we could work towards some abstractions of communication facilities that tool developers could code against, but in the short term it would be sufficient to allow an MPI communicator to be wrapped in the input resources that a task implementation can request, and I think there will be few early adopters of such functionality. Instead, I think a handful of data flow topology / "ensemble" operations need to be specified in the API and provided by the framework.

must use a task Builder that either allocates a subcommunicator

Interesting! I don't yet see how that would work, do you have a pointer to some documentation or implementation of this? (I checked #3).

The gist is to allow dispatching in terms of the Context, which encapsulates details of the execution environment, when getting Builders or Directors for operations and their resources. To minimize complexity in the code implementing the task, we can prefer to do most of the specialization in the builder for the resources that will be provided to the task.

This is an approximate diagram. (This is an older diagram that was intended to illustrate that a single model is used to construct a handle to an operation whether it is in the run time context or in an abstract high level context that defers execution, but that just means dispatching on a different aspect of the Context. It doesn't illustrate the more recent realization that it is necessary to dispatch builders for the input/output/resources as well.)

The implementations aren't clean yet, but the concepts are evident at https://github.com/kassonlab/gmxapi/blob/894531636b81554c6eea72593c32476d5511a250/python_packaging/src/gmxapi/simulation/mdrun.py#L560 and https://github.com/kassonlab/gmxapi/blob/894531636b81554c6eea72593c32476d5511a250/python_packaging/src/gmxapi/simulation/context.py#L409
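A stripped-down sketch of that dispatching idea (the class names below are hypothetical placeholders and far simpler than the linked gmxapi code):

```python
# Hypothetical illustration of dispatching resource builders on the Context.
class LocalContext:
    """Execution context without MPI facilities."""


class MPIContext:
    """Execution context wrapping an mpi4py-style communicator."""
    def __init__(self, comm):
        self.comm = comm


class LocalResourceBuilder:
    def build(self):
        return {"comm": None}


class SubcommResourceBuilder:
    def __init__(self, comm):
        self.comm = comm

    def build(self):
        # Carve out a sub-communicator for the task so the task implementation
        # never touches the parent communicator.
        return {"comm": self.comm.Split(color=0, key=self.comm.Get_rank())}


def resource_builder(context, needs_comm=False):
    """Choose a resource builder appropriate to the execution context, keeping
    the specialization out of the task implementation itself."""
    if needs_comm and isinstance(context, MPIContext):
        return SubcommResourceBuilder(context.comm)
    return LocalResourceBuilder()
```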

We likely need to consolidate our understanding about what entity manages the MPI spawning and execution, what knowledge needs to be exposed, etc.

Yes. Let me try...

It sounds to me like we both understand the MPI spawning to occur during worker launch. Is that correct? It sounds to me like the RADICAL worker equates to the process (group) associated with an MPI context, but I'm not completely clear on that. A worker never spans multiple MPI contexts, but it seems like maybe multiple workers might exist within an MPI context (not clear on that), though such workers could not be used for command line tasks because of the inability to restrict access to MPI_COMM_WORLD.

API-driven simulation codes will need adapters to express and handle the MPI resources granted through the worker, ...

Adapters to the MPI layer?

... Adapters between the existing tool and the SCALE-MS API facility that enables such enhanced use cases. In other words, someone with knowledge of both the tool and of SCALE-MS will need to write a little bit of glue to get an MPI communicator from SCALE-MS instead of from os.environ or from mpi4py.MPI.COMM_WORLD.
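For illustration, such glue might look roughly like this (the `resources` argument and its `ensemble_comm` key are hypothetical SCALE-MS-provided inputs, not an existing interface):

```python
# Hypothetical adapter: prefer a communicator granted by SCALE-MS resources,
# falling back to MPI.COMM_WORLD only when run outside the framework.
from mpi4py import MPI


def get_task_communicator(resources=None):
    """Return the communicator this task should use."""
    if resources is not None and resources.get("ensemble_comm") is not None:
        return resources["ensemble_comm"]
    return MPI.COMM_WORLD


def run_simulation(resources=None):
    comm = get_task_communicator(resources)
    # ... hand `comm` to the simulation library instead of letting it grab
    # COMM_WORLD internally ...
    return comm.Get_size()
```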

...or we have to assume each task will try to grab COMM_WORLD.

The latter (grabbing MPI_COMM_WORLD) is what works more easily with process-based execution, as the tasks will indeed get a new MPI_COMM_WORLD.

Okay. That is when the task spans the entire worker, which I understand to be typical, but I wasn't sure whether it was universal.

I am fine with that model, but the point is that it warrants some facility for operations that span workers, such as an "ensemble reduce," that couples the data flow between an ensemble of (chained) tasks frequently enough that we can't afford 50-100 seconds of turnover to switch to the reduce task and then again to resume the ensemble of simulations.

Peter and I propose that we can define the scope of such a group of tasks as outer array dimensions from the high level down to the execution layer, but we need some sort of run time facility by which we can enter a scope, within which the execution layer can keep the worker(s) alive for multiple intervals of (uncoupled) simulation interspersed with additional (coupling) logic, communication, or data placement. The scope needs to be a hint rather than a requirement, though, to allow graceful checkpoint / recovery, and flexibility in scheduling the uncoupled portions of the ensemble work.
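As a sketch of the kind of scope hint we have in mind (the `ensemble_scope` name and semantics are hypothetical, offered for discussion rather than as an existing API):

```python
# Hypothetical scope hint: within the `with` block, the execution layer is
# advised to keep workers of the requested shape alive across iterations.
import contextlib


@contextlib.contextmanager
def ensemble_scope(members, nodes_per_member):
    """Advise the runtime that `members` coupled tasks of `nodes_per_member`
    nodes each will be interleaved until the scope exits. A hint, not a
    requirement: the runtime may still checkpoint and reschedule."""
    # (A real implementation would notify the execution layer here.)
    yield
    # (And release the hint here so resources can be reshaped.)


def run_iteration(member_index):
    ...  # simulate, then lightweight coupled analysis


# Usage sketch: 16 ensemble members, 4 nodes each, alternating phases.
with ensemble_scope(members=16, nodes_per_member=4):
    for iteration in range(10):
        for member in range(16):
            run_iteration(member)
```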

peterkasson commented 4 years ago

Just to clarify here: I think a "worker" maps to an ensemble member in our framework. The worker may be wide and may have an MPI communicator or other parallelism. We of course perform some operations across ensemble members (and in gmxapi to date that has used MPI), but in our spec going forward we would not expect that to be MPI communication.

What we'd really like is for a set of workers to be allocated when a given ensemble comes into scope and for that set of workers to remain allocated until the ensemble goes out of scope. (Obviously, there could be worker failures, etc. but this is what it should look like to the calling code.)

For both control and data streams to/from allocated workers, we are tentatively proposing gRPC (https://grpc.io/) as a protocol that would provide bidirectional point-to-point streams.

eirrgang commented 4 years ago

Right. I'm responding at issue #16 with respect to the ensemble scoping points. I'll respond more in this issue when I have more to say about the control interface.

eirrgang commented 4 years ago

This conversation seems to have run its course, but I'm not sure how to translate it to some sort of issue resolution. I will try to distill the most salient points I observe into wiki entries, but it may take a while (other participants please feel free to do the same) and we can open more targeted issues as needed to follow up.

At least one key point that has been identified:

In RP, there is not an abstraction that links operations that are implemented differently for different run time environments. For SCALE-MS to allow user-facing work descriptors to be defined in terms of expected outcome rather than implementation details, user-level work needs to be mapped to implementation details such as LaunchMethod, executable / kernel, and other task description elements according to the targeted computing resources (and potentially other parameters).

eirrgang commented 4 years ago

In RP, there is not an abstraction that links operations that are implemented differently for different run time environments.

Possible correction: Is the kernel variant of a ComputeUnit intended to implement some sort of abstraction like this? Is the concretization of a kernel deferred at all relative to a ComputeUnit built from an executable-based task description?

eirrgang commented 4 years ago

I have attempted to record the major points at https://github.com/SCALE-MS/scale-ms/wiki/Execution-model#work-load-life-cycle