Open peterkasson opened 4 years ago
As we talk, member the worker checkpointing would require two or three operations: reload_all_from_checkpoint() - resets all ensemble members to the latest checkpoint also some capability to control checkpointing (checkpoint on operation finish vs. checkpoint at time interval) and some capacity to detect worker failures / checkpoint auto-reloads (heartbeat signals or some such, perhaps with a hash signature of state)
One final comment: the g_rpc control channel bundling will ultimately limit the scalability of ensemble size. If we can also schedule ensemble members that can themselves launch ensembles, then we can overcome this. In that case, a 100k-member logical ensemble would be handled as an ensemble of 100 "sub-master" members, each of which launches a 1000-member ensemble. This has been pretty well documented in Dave Konerding's Exacycle paper (as well as elsewhere).
@peterkasson , @eirrgang : Thanks for this proposal! Matteo and I discussed this, sorry if the answer below is somewhat long:
In principle, we could work towards providing such a CPI.
Specifically:
Note that the configuration of the workers during launch will define (and also limit) the type of workloads one can run on the workers: workers which span a single node cannot be combined to run MPI-communication across multiple nodes (multiple workers). If a change of configuration is required, we would want to receive a new CPI request to tear down the old worker and to start a new one with a different layout. It's our responsibility to ensure that those CPI requests are enacted timely, but a trade-off between worker-lifetime and worker-throughput will likely remain (longer living workers which can be reused w/o layout change will result in higher overall utilization at scale).
On establishing RPC connectivity:
We read your proposal/request the following way. Please correct if this is incorrect:
We find this difficult for two reasons:
As an alternative, we would propose the following scheme:
This way, the RPC bootstrapping remains in the ensemble layer (no layer violation), is natively async (individual workers send separate RPC messages), and dynamic (workers can be used as they come up and register).
Questions
@peterkasson , @eirrgang : Thanks for this proposal! Matteo and I discussed this, sorry if the answer below is somewhat long:
In principle, we could work towards providing such a CPI.
Specifically:
- We can provide the capability to ensemble masters to request the startup for ensemble workers.
Is "ensemble master" something that exists now or is already documented, or is this a new concept?
- Those workers can in turn invoke the CPI and request further workers to be spawned. The CPI request would include resource requirements for the workers, such as: number of cores, gpus, nodes; amount of memory, local disk space, etc. type of worker (MPI communicator or not, OpenMP env or not, etc. executable (image?) to be loaded by each worker
Interesting! Would the new workers be hierarchically managed by the workers that requested them, or would they be incorporated into a flat pool in which workers are aware of each other?
- The CPI would further provide means to manage (terminate, inspect) workers, or to replace workers with others of different type or layout.
- Worker state changes and failure would be reported via callbacks.
Excellent!
Note that the configuration of the workers during launch will define (and also limit) the type of workloads
Yes. This is true today, but without any ability to reuse workers. We believe that discussions to date indicate that we can optimize a lot of ensemble work within this constraint.
This way, the RPC bootstrapping remains in the ensemble layer (no layer violation), is natively async (individual workers send separate RPC messages), and dynamic (workers can be used as they come up and register).
Your proposal sounds good. I may be missing some of the subtle distinctions. Do the ensemble workers create service end points or initiate client connections to a CPI service on the ensemble master? Is the ensemble master part of the work placement system? or a special worker? Again, is this part of an architecture that is already documented, or a new idea? I apologize, but I'm still catching up on SAGA and RADICAL docs.
- restore of checkpoint: We would assume that this is a RPC request from ensemble master to ensemble worker, and not part of the runtime system behind the CPI?
I think that data placement should happen in separate calls than work placement for many reasons, but it is the responsibility of the upstream framework to either place the data that an operation needs to start or confirm that it has already been placed. Local checkpoint state should be clarified as the worker launches a new task, and the worker needs to communicate updates to graph state.
If you are talking specifically about Peter's mention of reload_all_from_checkpoint()
, then, I expect this actually means one of several scenarios.
In the simplest case, work that was previously launched needs to be launched again, and all of the ensemble workers receive a task they have seen before.
In one variant, each worker may be able to resume execution of the task from local checkpoint information.
In another variant, the workers may be required to discard some local state and/or reacquire previous state data to restore a globally consistent graph state.
In any case, a task that is already executing may need to be canceled, either because there is no way to preserve graph consistency (the results of the task are preemptively invalidated), or to resynchronize the ensemble quickly.
- data bundles: what are those specifically?
For gmxapi 0.0.7, each worker received the same instruction and was configured at launch with an ensemble member ID that could serve to index the outer dimension of array data that should be decomposed across the ensemble. The input may be a list of files or something lighter weight than the actual data, which we can place per worker. gmxapi 0.1 development illustrated that we need stronger specification of data type, shape, uniqueness, and locality.
I described some data reference and decomposition semantics in the serialization proposals, and the basic ideas still apply, with some extension.
I have proposed that we define some basic types and fingerprinting rules for those types, so that a static node for an NxM Integer64 array can have the same fingerprint whether it is stored in a JSON file or numpy file. In general, when a resource reference is passed between components, an accompanying (fingerprinted) reference to the understood state of the resource is needed, and additional messages may be needed to confirm that both components either have or know how to get a state matching the reference. We can use things like request IDs and ETags in the RPC part, but we need at least to be able to reconstruct standardized resource identifiers across components and layers, and to provide for the failure cases in which a reference cannot be resolved: either the graph needs to be reverted to a check point, or maybe we decide to hope for cache hits and resolve misses instead of checking what resources need to be placed before they are referenced.
Data could be placed via RPC in the context of the CPI, or data could be placed (via RPC or through some other mechanism) and just announced through the CPI (e.g. a CPI instruction that amounts to {"uid": "e89c...df01", "operation": ["scalems", "details", "read_npz_file"], "input": "e89c...df01.npz"}
). If the CPI is based on a network protocol stack, like gRPC, the existing standard influences how we may (but are not required to) buffer data directly to the run time, and we don't need to specify at the execution layer whether the data ever hits the filesystem.
I think the other point that Peter is drawing attention to is that the higher level API may represent a data resource as an N-dimensional object, but the ensemble execution may involve decomposing (and/or composing) to/from (N-1)-dimensional objects for individual ensemble members, and that operations may have multiple data dependencies that are individually represented in the work graph. It is not important to the run time how these resources arrive, as long as references to the locally-relevant data can be resolved as the task placed by the CPI is translated for execution.
Is "ensemble master" something that exists now or is already documented, or is this a new concept?
I may have worded that badly, apologies. You wrote:
... If we can also schedule ensemble members that can themselves launch ensembles ...
I meant the task which launches an ensemble. I named that ensemble master
, and the tasks spawned (forming the ensemble) ensemble worker
. If that is confusing please let us know, and I'll forget the terms quickly again :-)
Interesting! Would the new workers be hierarchically managed by the workers that requested them, or would they be incorporated into a flat pool in which workers are aware of each other?
They are inherently in the global namespace the execution layer has of all tasks. operations (state checks, cancellation etc) can be directed to individual tasks, to sets of tasks, or to all known tasks.
Do the ensemble workers create service end points or initiate client connections to a CPI service on the ensemble master? Is the ensemble master part of the work placement system? or a special worker? Again, is this part of an architecture that is already documented, or a new idea? I apologize, but I'm still catching up on SAGA and RADICAL docs.
I also apologize, it is not unlikely that I am still mixing up terms and concepts.
Inherently, we try to consider the execution management and the ensemble or MD management as separate layers. RP is an execution layer: it uses it's own network overlay to coordinate different components which ultimately all manage tasks. My understanding from your input is that the GMX layer, i.e., the layer which executes MD simulations in ensembles, coordinates work items between workers within an ensemble via and RPC protocol - is that correct? I also understood from comments above that you were asking if the execution layer can created the respective RPC communication endpoints for the workers and pass them back to whoever requested the woerkers to spawn (master). Is that also correct? If so, then the reply means: rather than the execution layer attempting to create RPC endpoints of the ensemble layer, I would suggest to keep RPC connection management completely in the ensemble layer, and suggested a mechanism how that information could be isolated during RPC connection bootstrap.
Again, is this part of an architecture that is already documented, or a new idea?
I do not know if the RPC layer is implemented, and how it is currently bootstrapped :-)
I am afraid that the explanation about data bundles and data placement still leaves me confused, as does this paragraph:
Data could be placed via RPC in the context of the CPI, or data could be placed (via RPC or through some other mechanism) and just announced through the CPI (e.g. a CPI instruction that amounts to {"uid": "e89c...df01", "operation": ["scalems", "details", "read_npz_file"], "input": "e89c...df01.npz"}). If the CPI is based on a network protocol stack, like gRPC, the existing standard influences how we may (but are not required to) buffer data directly to the run time, and we don't need to specify at the execution layer whether the data ever hits the filesystem.
I think I understand how and why data items need to be announced, but I do not yet understand why that would be part of the CPI we provide. I likely have a more limited mental image of the scope of the CPI, the RPC protocol, and the workers, and I may have a different separation of concerns in mind than you.
Our suggestion would be to implement a quick experimental prototype of the CPI and some kind of worker skeleton over the next weeks, not necessarily as an attempt at a product, but rather as a discussion input to get more clarity. Matteo and I discussed that, and we should be able to do so within the next 4 weeks. If that sounds like a viable plan to (a) have something to toy with and (b) get more clarity into our discussion by demonstrating how we envision CPI and worker scope, and thus separation of concerns, let us know, and we'll give it a try.
I can work with Eric to help clarify the ensemble master concept. That is familiar to me. We'll have to see whether the master is also a task scheduled on RP or remains external. But the idea would be a (local) tree structure for communication between ensemble members, over grpc.
On Sun, Feb 9, 2020, 5:27 PM Andre Merzky notifications@github.com wrote:
I am afraid that the explanation about data bundles and data placement still leaves me confused, as does this paragraph:
Data could be placed via RPC in the context of the CPI, or data could be placed (via RPC or through some other mechanism) and just announced through the CPI (e.g. a CPI instruction that amounts to {"uid": "e89c...df01", "operation": ["scalems", "details", "read_npz_file"], "input": "e89c...df01.npz"}). If the CPI is based on a network protocol stack, like gRPC, the existing standard influences how we may (but are not required to) buffer data directly to the run time, and we don't need to specify at the execution layer whether the data ever hits the filesystem.
I think I understand how and why data items need to be announced, but I do not yet understand why that would be part of the CPI we provide. I likely have a more limited mental image of the scope of the CPI, the RPC protocol, and the workers, and I may have a different separation of concerns in mind than you.
Our suggestion would be to implement a quick experimental prototype of the CPI and some kind of worker skeleton over the next weeks, not necessarily as an attempt at a product, but rather as a discussion input to get more clarity. Matteo and I discussed that, and we should be able to do so within the next 4 weeks. If that sounds like a viable plan to (a) have something to toy with and (b) get more clarity into our discussion by demonstrating how we envision CPI and worker scope, and thus separation of concerns, let us know, and we'll give it a try.
— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/SCALE-MS/scale-ms/issues/23?email_source=notifications&email_token=ACRWZNM3AAJV7VNYB4DWXK3RCB7N5A5CNFSM4KQLXXBKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOELG2B6Q#issuecomment-583901434, or unsubscribe https://github.com/notifications/unsubscribe-auth/ACRWZNPQ4ROOJTYXHVUIO73RCB7N5ANCNFSM4KQLXXBA .
Andre: Let us look at the gRPC spec and see if we can make it work that way.
I think we need the ability to have workers all online because synchronous communication algorithms will block all the running workers while waiting for the other ones; however, spinning up workers as they come available is certainly something we want to do since many of the communication algorithms will be asynchronous.
As I mentioned previously, the "ensemble master" might be a high-priority worker on RP, or it might live off RP. I'm not a priori attached to either one (although if it lives within RP it will probably need a similar gRPC channel out of RP).
Andre: Let us look at the gRPC spec and see if we can make it work that way.
I did look at the spec, and that prompted me to suggest the scheme. We'll include that setup in the prototype, lets see how it goes and if I'm overlooking something...
we want to do since many of the communication algorithms will be asynchronous.
From my perspective, the most interesting ones are asynchronous!
CPI interface proposal #23
operations (state checks, cancellation etc) can be directed to individual tasks, to sets of tasks, or to all known tasks.
Perfect!
GMX layer, i.e., the layer which executes MD simulations in ensembles, coordinates work items between workers within an ensemble via and RPC protocol - is that correct?
We have not yet isolated an RPC component, but yes, there is a pair of layers such that ensemble aspects are managed above the executing simulations. We are looking forward to moving this out of the GROMACS-centric regime to a facility that can be provided through the SCALE-MS specification.
asking if the execution layer can created the respective RPC communication endpoints for the workers and pass them back to whoever requested the woerkers to spawn (master). Is that also correct?
I think so, insofar as asking
why that would be part of the CPI we provide
My comment was with regard to the interface that supports the placement of a "node" of work.
If the "node" is an instruction that includes symbolic references, it is helpful if the same interface allows definition of those references, which is trivial if both calculations and resources are representable as nodes. (And I have advocated for a "node" as a generalization of what was described as an "executable".)
I see now that you guys are thinking of the CPI scope as limited to launching and tearing down workers, separating dynamic task placement to a distinct RPC facility, whereas I was thinking of the CPI as something richer that could be implemented in terms of an RPC facility. I am open to separate interfaces for "worker management" and "adaptive ensemble task placement"... are we calling these CPI and RPC, respectively? What does CPI stand for, exactly?
I may have a different separation of concerns in mind than you.
Yes, something worth exploring. Does clarification / convergence on this warrant an action item of its own?
experimental prototype of the CPI and some kind of worker skeleton
Great! I am doing some similar prototyping with gRPC and the entry point indicated at https://scale-ms.readthedocs.io/en/scale-ms-15/executor.html
Our suggestion would be to implement a quick experimental prototype of the CPI and some kind of worker skeleton
Do you have an expectation at this point of whether the worker is implemented in python or c++?
Currently, it looks like the near-term runtime interface consists of radical.pilot.raptor.Master
, radical.pilot.raptor.Worker
, and some additional semantics in Task descriptions.
scheduler
may name a rp.Task
that creates an instance of Master
, uses Master.submit()
to launch one or more workers, and calls Master.start()
.Worker
and calls Worker.start()
.scheduler
.Master.request()
.Task.description.arguments[0].mode
to the function registered for the mode.Master.result_callback()
with the result of the function dispatched for mode.With some caveats, an RPC protocol can be implemented between the client and the Pilot environment in terms of Task.description.arguments[0].data
, corresponding Task results, and Pilot and Task data staging directives. A new stage_on_error
directive allows a consistent protocol to be built on top of the Master/ Worker tasks and Tasks targeting the Master as scheduler, but we need to establish the protocol for response/error encodings and data staging directives. (#92, #107)
(Note that the Task schema will be updated at some point to better accommodate "raptor" tasks. For the moment, the semantics of arguments are overloaded such that the first element is assumed to be a string-encoded JSON object with (minimally) mode and data keys, and (optionally) some TaskDescription-like fields.)
The ensemble handling and data scoping described above in this issue require some additional functionality by which
The resource requirements for traditional rp Tasks function the same for Master and Worker tasks. Raptor requests must fit in the resources allocated to the available Worker(s).
This update does not mention communication channels. There has been some discussion about providing ZMQ access through the RP framework to the Master and Worker instances (and, thus, to the functions executed by the Worker), but this is still somewhat futuristic. (@andre-merzky or @mturilli, could you comment or link to RP discussion?)
The design direction is taking a rather different direction than initially proposed, but seems like it could be coming together. I think we should revisit where we are and where we would like to be, and try to resolve this issue in the next few weeks.
For reference, the small number of "commands" that scalems uses to direct the raptor Master task are documented with the scalems.messages module. Additionally, we almost certainly need at least a pair of commands to start and stop Workers, and at least one command to allow some sort of a query of status (of Master, Worker, Task, or data). Though it would make sense to implement this message-passing with ZMQ, implementation is currently in terms of specially constructed Task submissions, and could probably remain that way indefinitely.
edit: per https://github.com/SCALE-MS/scale-ms/issues/151#issuecomment-1377196318, we might want an additional control to allow tasks to be canceled more gently (or equivocally) in line with common Future patterns.
Most additional controls can be implemented on the "client" side.
The other aspect of a CPI interface, I think, is a formalism for how tasks can/should be packaged and receive resources. Currently, there are some conventions for either serializing or discovering functions to be executed, and a somewhat fluid API by which a rp.raptor.Worker can be subclassed and extended to customize "executor" functions, and the rp.raptor.Master can be subclassed to override a result_cb
(call back) handler for completed functions. Currently, the executor function signature allows arbitrary positional and key-word arguments of msgpack
-serializable values, and an optional comm
argument, accepting a mpi4py.MPI.Comm
object.
Any other details are left (by RP) to the client (e.g. scalems), which can use other extension features of the Master to intercept, rewrite, and generate Tasks at the execution site. This allows final customization of executor function arguments.
Once scalems development returns to the raptor backend, we can resume work on dynamically generating (in the Master) TaskDescriptions and function arguments with compatible representations of resources. As mentioned, once a raptor task is dispatched to an executor function at the Worker, it has no way to query TaskDescription or RP details directly, other than through inspection of the (optional) MPI communicator (unless there are useful environment variables that could be relied upon). One exception is the (unofficial?) presence of a file (I forget the name... something.sl
?) in the worker task directory, with some potentially useful hwloc/cgroups sort of data.
In all cases, handling of results (Python objects or filesystem artifacts), exceptions, and log messages is, at best, weakly defined and/or evolving. Hopefully, we can nail that down a lot more in the next week or two. Currently, most of the details will have to fall in scalems space or be deferred to future work, though we should be able to enumerate the necessary protocols (for scalems-rp interaction, such as file staging) and document long-term constraints (requirements for serializability, for instance).
Rough proposal for CPI: Key capabilities/operations: Allocate ensemble (number of members, compute capability spec for each member, image that each member should load identically, data bundle that could be mapped to each member) -> return bundle of g_rpc channels corresponding to each member
All ensemble operations would be conducted using the g_rpc channels (passing commands and data). Question of checkpointing (i.e. if a member is lost while the ensemble is active, ideally we'd like to be able to restore that from a checkpoint image, but this is negotiable)
Destroy ensemble (close channels, tear down members)
Eric and I think this should be sufficient to perform all the operations we need (and everything we've seen thus far in the use cases). Is this possible?