IntelPython / DPPY-Spec

Draft specifications of DPPY

__partitioned__ protocol for partitioned and distributed data containers #3

Open fschlimb opened 3 years ago

fschlimb commented 3 years ago

The current state of the specification can be found here: https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md

fschlimb commented 3 years ago

@YarShev @SmirnovEgorRu @rlnx @shssf @diptorupd @oleksandr-pavlyk I'd like to get your feedback/thoughts on this.

fschlimb commented 3 years ago

@PivovarA This should be of interest to your distributed sklearn work

fschlimb commented 3 years ago

@adarshyoga

YarShev commented 3 years ago

The first thing I would like to point out is that the examples map shape to a list instead of a tuple. The second is that references to data for the Dask backend should be dask.distributed.Future, not dask.Delayed. Please fix these.

The third thing that would be good to have is metadata on: 1) the partitions themselves (in case partitions are DataFrames it would be good to know their index, columns, dtypes) and 2) the shape of the partitions themselves.

That could look as follows: 2d-structure, 2d-partition-grid, 4 partitions, partitions are of type ray.ObjectRef that refer to pandas DataFrames with shape (5, 5), Ray.

__partitioned_interface__ = {
    "shape": (2, 2),
    "partitions": {
        "partition_id0": {
            "data": ObjectRef0,
            "position": (0, 0),
            "node_id": '1.1.1.1',
            "metadata": {
                "shape": (5, 5),
                "index": None,    # or the actual index if "partition_id0" is a DataFrame
                "columns": None,  # or the actual columns if "partition_id0" is a DataFrame
                "dtypes": None,   # or the actual dtypes if "partition_id0" is a DataFrame
            },
        },
        "partition_id1": {
            "data": ObjectRef1,
            "position": (0, 1),
            "node_id": '1.1.1.2',
            "metadata": {
                "shape": (5, 5),
                "index": None,    # or the actual index if "partition_id1" is a DataFrame
                "columns": None,  # or the actual columns if "partition_id1" is a DataFrame
                "dtypes": None,   # or the actual dtypes if "partition_id1" is a DataFrame
            },
        },
        "partition_id2": {
            "data": ObjectRef2,
            "position": (1, 0),
            "node_id": '1.1.1.3',
            "metadata": {
                "shape": (5, 5),
                "index": None,    # or the actual index if "partition_id2" is a DataFrame
                "columns": None,  # or the actual columns if "partition_id2" is a DataFrame
                "dtypes": None,   # or the actual dtypes if "partition_id2" is a DataFrame
            },
        },
        "partition_id3": {
            "data": ObjectRef3,
            "position": (1, 1),
            "node_id": '1.1.1.4',
            "metadata": {
                "shape": (5, 5),
                "index": None,    # or the actual index if "partition_id3" is a DataFrame
                "columns": None,  # or the actual columns if "partition_id3" is a DataFrame
                "dtypes": None,   # or the actual dtypes if "partition_id3" is a DataFrame
            },
        },
    },
}

  • For SPMD-MPI-like backends: partitions which are not locally available may be None. This is the recommended behavior unless the underlying backend supports references such as promises to avoid unnecessary data movement.

I am not sure I understand that case. Where can we retrieve data for those positions from? I don't see how they are represented in the interface.

fschlimb commented 3 years ago

@YarShev Thanks for your comments. I corrected the tuples and am now using dask.Future.

With respect to the metadata: I think I do understand the motivation for including such information. However, the information given in your examples is not really about the partitioning. I think we should cleanly separate the partitioning interface from anything that is related to the underlying data/data-structure itself. That's also why I suggested not providing the global shape of the data, only the shape of the partition-grid.

I do agree we should allow any given implementation (like modin) to add more data to the dictionary than what's specified in the protocol. I am just not convinced that what you suggest should be a mandatory part of the protocol, because a) it's a different kind of information and b) it is not generally useful across different container types.

About the SPMD question: The last example assumes two ranks. The 'partitions' entries are for rank 0. The comments show what would be present on rank 1. The protocol does not provide means to get partitions which live on other ranks. If one needs to get a partition from a different rank one needs to explicitly use any available communication infrastructure (like MPI). Does this make sense?
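
For illustration, here is a minimal sketch (not part of the spec) of such an explicit exchange using mpi4py, assuming two ranks and a hypothetical partitions dict shaped like the proposal, where non-local entries are None:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# hypothetical 1d-partitioned container: rank 0 owns (0,), rank 1 owns (1,)
partitions = {
    (0,): {'data': [0, 1, 2] if rank == 0 else None, 'location': 0},
    (1,): {'data': [3, 4, 5] if rank == 1 else None, 'location': 1},
}

# rank 0 needs the partition owned by rank 1: the protocol does not provide it,
# so the owner sends it explicitly and the requester receives it via MPI
if rank == 1:
    comm.send(partitions[(1,)]['data'], dest=0, tag=42)
elif rank == 0:
    remote = comm.recv(source=partitions[(1,)]['location'], tag=42)
    print(remote)  # [3, 4, 5]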

YarShev commented 3 years ago

@fschlimb, about the metadata I specified: speaking of Modin, if we know that information, we can avoid triggering any computation on the Modin side when creating a Modin DataFrame. Thus, it allows us to avoid undesired data movement when running remote calls, even though Ray and Dask try to perform calls as close to the data as possible. However, I agree with points a) and b). Probably, we may want to generalize that information somehow.

"metadata": {
    "shape": (5, 5),
    "dimensions": {
        "index/height/or just 0": values of dim0,
        "columns/width/or just 1": values of dim1,
    },
},

I am not sure about dtypes, but probably we can get rid of it because dtypes doesn't affect triggering computations on the Modin side when creating a Modin DataFrame. But if we then call dtypes (like df.dtypes), this will run remote calls to compute the actual dtypes.

About the SPMD, I see, thanks! Do we want to use futures/promises for SPMD-like frameworks like it is done in MPI4PY, for instance?

fschlimb commented 3 years ago

Corrected the dask example to return futures (not arrays).

@YarShev I understand additional information can speed up processing and consumption of the data.

I am getting convinced that shape per partition is information about the partitioning and that it probably belongs here.

When it comes to labels of dims I see no relation to the partitioning, these are attributes of the global data-structure itself.

Similar is the dtype: it's not related to the partitioning.

I prefer the grid-index as the key in the partition dict because a partition-label carries no meaning in the general case.

Wouldn't it be sufficient to make information like the dim-labels optional? There would be nothing meaningful in these fields if the data comes from numpy or alike anyway.

So what about the following

'partitions': {
    (0, 0): {
        'location': '1.1.1.4',
        'data': ObjRef/Future/array/df,
        'shape': (n, m),
        # more keys allowed
    },
    ...
}

With respect to futures and SPMD, I like the idea of being able to use almost any type as the data handle. If some data container stores futures of some kind (and not ndarrays or DFs), even in SPMD, this should definitely be allowed. Similarly, it's up to the consumer to decide which types of data handles it can (or wants to) support. Maybe we will need to extend this protocol with a mechanism which allows consumers to convert/materialize the handles into actual data containers; e.g. we could require an extra entry which returns a function to materialize a future/ObjRef and which just returns the identity if the handles are concrete containers already. With such a mechanism, and with futures being used, a remote SPMD handle might then not be None.

fschlimb commented 3 years ago

@carsonwang

YarShev commented 3 years ago

When it comes to labels of dims I see no relation to the partitioning, these are attributes of the global data-structure itself.

Similar is the dtype: it's not related to the partitioning.

I agree, but only in the case where we don't care about them, i.e. global labels can be default values. However, it becomes meaningful when partitions are pandas DataFrames (or something else that has similar metadata) and we need to form global labels from the partition labels.

Wouldn't it be sufficient to make information like the dim-labels optional? There would be nothing meaningful in these fields if the data comes from numpy or alike anyway.

Yes, such information can be optional. If the data comes from numpy or alike, we can provide a default value/None/something else, but if the data does have meaningful metadata, I think the producer should provide it to the consumer.

I prefer the grid-index as the key in the partition dict because a partition-label carries no meaning in the general case.

No objections.

About the mechanism to get actual data from handles: that sounds good, but so far I am not sure what it should look like.

fschlimb commented 3 years ago

@PivovarA Here's an example code of how such an interface could be consumed by daal4py. Just for demonstration.

import numpy as np
import pandas as pd
import ray
import dask.distributed

def extract_input(obj):
    '''Identify if this is a supported input data type.
       Return a tuple of (data-handles, launch-method) or None.'''
    parted = getattr(obj, '__partitioned_interface__', None)
    if parted:
        local_keys = parted.get('locals')
        if local_keys:
            # this is SPMD mode
            # caller will use daal4py in its native SPMD mode
            handles = [parted['partitions'][x]['data'] for x in local_keys]
            if getattr(handles[0], '__array_interface__', None):
                return (np.concatenate(handles), 'SPMD')
            if isinstance(handles[0], pd.DataFrame):
                return (pd.concat(handles), 'SPMD')
        else:
            # this is controller-worker
            # caller will create ray/dask actors and issue 'remote calls'
            # we return not only the handles, but also locality/node information
            handles = [p['data'] for p in parted['partitions'].values()]
            if isinstance(handles[0], ray.ObjectRef):
                return (parted['partitions'], 'Ray')
            if isinstance(handles[0], dask.distributed.Future):
                return (parted['partitions'], 'Dask')
            # maybe other types can be supported
        return None
    else:
        # not a partitioned interface
        # possibly check for supported types etc.
        return (obj, 'batch')

fschlimb commented 3 years ago

@YarShev with respect to materializing data from handles we could require 2 additional entries next to 'partitions'

  1. 'ptype' providing the type of an actual partition (e.g. the type of what future.get() would return), or the actual partition type if 'data' does not refer to futures.
  2. 'materialize' providing a callable which returns the actual data for any entry in 'data'. For non-future types this would be lambda x: x, for Ray ObjectRefs it would be lambda x: ray.get(x), etc. (see the sketch below)
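
A hedged sketch of how a producer might fill these two proposed entries; the make_partitioned helper and its signature are purely illustrative, not part of the spec:

import numpy as np

def make_partitioned(parts, backend):
    # 'parts' is an already-built 'partitions' dict; 'backend' is illustrative
    if backend == 'ray':
        import ray
        return {
            'partitions': parts,
            'ptype': np.ndarray,     # what ray.get(handle) would return
            'materialize': ray.get,  # resolves an ObjectRef into actual data
        }
    # handles are already concrete containers (e.g. the SPMD/local case)
    return {
        'partitions': parts,
        'ptype': np.ndarray,
        'materialize': lambda x: x,  # identity for non-future handles
    }

# a consumer can then stay backend-agnostic:
# data = parted['materialize'](parted['partitions'][(0, 0)]['data'])
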
DrTodd13 commented 3 years ago

@fschlimb I'm a little confused. Could you go through an example of some large 2D array distributed over like 4 nodes? It seems to me that we need to know the overall data structure size and which portion is stored on each node. That is handled out-of-band for you?

fschlimb commented 3 years ago

@sklam @stuartarchibald I'll look into your suggestion about arrow. Any further feedback would be very much appreciated.

fschlimb commented 3 years ago

@fschlimb I'm a little confused. Could you go through an example of some large 2D array distributed over like 4 nodes? It seems to me that we need to know the overall data structure size and which portion is stored on each node. That is handled out-of-band for you?

Our messages overlapped, I just updated the proposal (top comment) which now includes the shape of each partition.

I wonder if it is better to use an offset-tuple instead of a shape-tuple.

DrTodd13 commented 3 years ago

@fschlimb I'm a little confused. Could you go through an example of some large 2D array distributed over like 4 nodes? It seems to me that we need to know the overall data structure size and which portion is stored on each node. That is handled out-of-band for you?

Our messages overlapped, I just updated the proposal (top comment) which now includes the shape of each partition.

I wonder if it is better to use an offset-tuple instead of a shape-tuple.

I think shape should be the start and stop indices in the global address space for each dimension. From that you can compute the local shape but you can't go in the reverse direction.

YarShev commented 3 years ago

@YarShev with respect to materializing data from handles we could require 2 additional entries next to 'partitions'

  1. 'ptype' providing the type of an actual partition (e.g. the type of what future.get() would return), or the actual partition type if 'data' does not refer to futures.

@fschlimb, it seems to me there will be an ambiguity between the type of the data entry and ptype itself. Maybe we should have two entries, referring to the type of data and the underlying type of partitions. For example, like this:

__partitioned_interface__ = {
  'shape': (2, 2),
  'partitions': {...},
  'ptype': ...,             # type of the `data` field
  'underlying_ptype': ...,  # type of the actual data
}

2. 'materialize' providing a callable which returns the actual data for any entry in 'data'. For non-future types this would be lambda x: x, for Ray ObjectRefs it would be lambda x: ray.get(x), etc.

It looks good to me. Probably, with this entry we could provide a bit more advanced callable that can concatenate partitions in any direction.

fschlimb commented 3 years ago

@YarShev 'ptype' does not seem to add anything new, since you can always do type(partitions[x]['data']) and all partitions are required to be of the same type.

I like the idea of requesting different partitioning types. I am not sure if this should go here or elsewhere, like in an extra package or alike. It seems difficult to draw a line what should be supported and what not. At least for now it seems clean to limit the protocol to describing the state (in contrast to triggering a re-partitioning). Let's keep brainstorming, do you have a more concrete suggestion?

fschlimb commented 3 years ago

@DrTodd13

@fschlimb I'm a little confused. Could you go through an example of some large 2D array distributed over like 4 nodes? It seems to me that we need to know the overall data structure size and which portion is stored on each node. That is handled out-of-band for you?

Our messages overlapped, I just updated the proposal (top comment) which now includes the shape of each partition. I wonder if it is better to use an offset-tuple instead of a shape-tuple.

I think shape should be the start and stop indices in the global address space for each dimension. From that you can compute the local shape but you can't go in the reverse direction.

With local shapes and positions you can compute the position of a partition in the global index space as the tuple of sums of shapes-per-dimension of all predecessors in each dimension. For example, given a partition at position (x, y) in a 2d grid, the global start index gp[x, y] would be gp[x, y] = (sum(shape[i, y][0] for i in 0..x-1), sum(shape[x, j][1] for j in 0..y-1)).
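
A small sketch of that computation, assuming a regular 2d grid (every cut spans the full other dimension); the helper name is made up:

def starts_from_shapes(shapes, grid_shape):
    # shapes maps grid position (i, j) -> local shape (rows, cols)
    row_offsets = [0]
    for i in range(grid_shape[0] - 1):
        row_offsets.append(row_offsets[-1] + shapes[(i, 0)][0])
    col_offsets = [0]
    for j in range(grid_shape[1] - 1):
        col_offsets.append(col_offsets[-1] + shapes[(0, j)][1])
    return {(i, j): (row_offsets[i], col_offsets[j])
            for i in range(grid_shape[0]) for j in range(grid_shape[1])}

shapes = {(0, 0): (5, 5), (0, 1): (5, 5), (1, 0): (5, 5), (1, 1): (5, 5)}
print(starts_from_shapes(shapes, (2, 2)))
# {(0, 0): (0, 0), (0, 1): (0, 5), (1, 0): (5, 0), (1, 1): (5, 5)}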

Since this is not particularly convenient, providing the global start indices (offsets) instead of the shape would probably be preferred. The stop indices are implicitly provided by the start indices of the successor partition (all position indices incremented by one). I would like to avoid providing the global size of the data structure, but we might need to add it, because otherwise we are back to forcing the consumer to understand the underlying data structure and how to get the global size.

Not sure what is better, I see three possibilities

I tend to prefer the latter since it reduces redundancies.

Thoughts?

DrTodd13 commented 3 years ago

@fschlimb In our exploratory work with distributed arrays in ramba, we send the entry in the driver's partitioning data structure for a given worker to that worker so that worker knows what it owns. That makes things a bit easier since there is just one representation in driver and workers. Each worker just knows its own portion but has no idea what other workers own. I don't know to what degree we should take such convenience into account. Yes, with your latter approach the redundancy on the driver side is minimized but then there's a calculation to be made to let an individual worker know its start index.

fschlimb commented 3 years ago

@DrTodd13 Is your system SPMD-style, e.g. will any user-code ever run on a rank/process other than the controller/root process? If not, then the __partitioned_interface__ will only be evaluated on the controller/root node anyway.

DrTodd13 commented 3 years ago

@DrTodd13 Is your system SPMD-style, e.g. will any user-code ever run on a rank/process other than the controller/root process? If not, then the partitioned_interface will only be evaluated on the controller/root node anyway.

@fschlimb It is a matter of perspective. For instance, we support the ability to initialize a distributed array with a lambda and that lambda is given the global indices. In a very simple form, this is used to implement arange. Obviously, you want the code to execute where the data is and so in that sense, yes, code will be run on a rank/process/remote, even for something like ones() or zeros(). We also have a map_index() skeleton that is like a map but is also given the global indices and since the code runs on the remotes, the remotes need to know the global indices of their local portion. We haven't developed our SPMD/SIMT skeleton much and I think our only example doesn't need the global indices but if you can do it with map_index, you should be able to implement the same with SIMT model so I can imagine we'd need the ability to query the global indices of the local portion of the array.

YarShev commented 3 years ago

@fschlimb

@YarShev 'ptype' does not seem to add anything new, since you can always do type(partitions[x]['data']) and all partitions are required to be of the same type.

Yes, that is true. My concern is that if we keep only the ptype entry next to partitions, there might be an ambiguity about what ptype means. So I think renaming it to underlying_ptype or something like that makes sense.

I like the idea of requesting different partitioning types. I am not sure if this should go here or elsewhere, like in an extra package or alike. It seems difficult to draw a line what should be supported and what not. At least for now it seems clean to limit the protocol to describing the state (in contrast to triggering a re-partitioning). Let's keep brainstorming, do you have a more concrete suggestion?

I like the idea as well. Probably, that should go as part of another package or alike. Let's say we have a distributed array from which we want to create a distributed DataFrame. I believe we will always rely on the underlying structure of the array to create the DataFrame, and we won't need to reshuffle the array's partitions to create it. I think that should be a responsibility of the distributed array and distributed DataFrame implementations. By the way, there is a method in Modin that is kind of similar to that reshuffle (requesting a different partitioning type) of partitions (link).

fschlimb commented 3 years ago

@fschlimb It is a matter of perspective. For instance, we support the ability to initialize a distributed array with a lambda and that lambda is given the global indices. In a very simple form, this is used to implement arange. Obviously, you want the code to execute where the data is and so in that sense, yes, code will be run on a rank/process/remote, even for something like ones() or zeros(). We also have a map_index() skeleton that is like a map but is also given the global indices and since the code runs on the remotes, the remotes need to know the global indices of their local portion. We haven't developed our SPMD/SIMT skeleton much and I think our only example doesn't need the global indices but if you can do it with map_index, you should be able to implement the same with SIMT model so I can imagine we'd need the ability to query the global indices of the local portion of the array.

@DrTodd13 In my mind the question is where (on which process/rank) __partitioned_interface__ is called from user-code, not that any remote will execute some code (how else would we get distributed execution?). If the code that is executed on the remote ranks is controlled by a single process, then you can do whatever you like to make it work on the remotes (e.g. by adjusting the input).

So, now I am a little confused. How do your above comments relate to __partitioned_interface__? The protocol is meant to serve for collaboration with external packages. What happens internally to your data structure is a separate question. I don't see why constructing a new data-structure the way you describe (using lambdas) should not be possible (or particularly difficult) from the suggested __partitioned_interface__. Similarly, I do not understand why it should be difficult to provide it to external consumers. Can you elaborate?

If your point is that, depending on the internal architecture, producing a __partitioned_interface__ might include communication of meta-data - then yes, that is true.

DrTodd13 commented 3 years ago

@fschlimb It is a matter of perspective. For instance, we support the ability to initialize a distributed array with a lambda and that lambda is given the global indices. In a very simple form, this is used to implement arange. Obviously, you want the code to execute where the data is and so in that sense, yes, code will be run on a rank/process/remote, even for something like ones() or zeros(). We also have a map_index() skeleton that is like a map but is also given the global indices and since the code runs on the remotes, the remotes need to know the global indices of their local portion. We haven't developed our SPMD/SIMT skeleton much and I think our only example doesn't need the global indices but if you can do it with map_index, you should be able to implement the same with SIMT model so I can imagine we'd need the ability to query the global indices of the local portion of the array.

@DrTodd13 In my mind the question is where (on which process/rank) __partitioned_interface__ is called from user-code, not that any remote will execute some code (how else would we get distributed execution?). If the code that is executed on the remote ranks is controlled by a single process, then you can do whatever you like to make it work on the remotes (e.g. by adjusting the input).

So, now I am a little confused. How do your above comments relate to __partitioned_interface__? The protocol is meant to serve for collaboration with external packages. What happens internally to your data structure is a separate question. I don't see why constructing a new data-structure the way you describe (using lambdas) should not be possible (or particularly difficult) from the suggested __partitioned_interface__. Similarly, I do not understand why it should be difficult to provide it to external consumers. Can you elaborate?

If your point is that, depending on the internal architecture, producing a __partitioned_interface__ might include communication of meta-data - then yes, that is true.

@fschlimb I just think that the structure of __partitioned_interface__ should match as closely as possible how the users are likely to use it. If a common thing would be to tell a worker to do something on its portion, and that requires start and stop, then I'd rather store those than have them continually recalculated.

I guess it is orthogonal but knowing the machine name is not really sufficient is it? Is this assuming there's only one way that things are shared? If you accessed remote data with Ray or some other mechanism then is that kind of information necessary to have in the partition information?

fschlimb commented 3 years ago

@fschlimb I just think that the structure of __partitioned_interface__ should match as closely as possible how the users are likely to use it. If a common thing would be to tell a worker to do something on its portion, and that requires start and stop, then I'd rather store those than have them continually recalculated.

I guess it is orthogonal but knowing the machine name is not really sufficient is it? Is this assuming there's only one way that things are shared? If you accessed remote data with Ray or some other mechanism then is that kind of information necessary to have in the partition information?

@DrTodd13

fschlimb commented 3 years ago

HeAT team (@ClaudiaComito @coquelin77 @Markus-Goetz @Cdebus @bhagemeier @krajsek): Your feedback would be very much appreciated.

fschlimb commented 3 years ago

@DrTodd13 @Markus-Goetz I updated the proposal to include start and shape of each partition.

pspillai commented 3 years ago

I think the minimum information needed in such an interface is the number of partitions, the starting (global) index and size/shape of each partition, and an indication of where the partition resides / remote reference / URL. I think the proposal covers these.

But I am not sure it is necessary to limit the partitions to being in a regular grid arrangement. Particularly looking beyond dense arrays, it may be useful to allow a more general partitioning (though still with the limitation that each piece is an axis-aligned hyper-rectangle).

Actually, is there any requirement that the index range of a particular part has any correlation with its id tuple?

Finally, should location also support additional possibilities, like "on GPU", "on disk", "in NVM", etc.?

DrTodd13 commented 3 years ago

If you have starting global index and size/shape of each local partition then you already have the ability to express non-regular grids. I don't think anything should be imposed through the API or convention that enforces regularity.

fschlimb commented 3 years ago

@pspillai

Actually, is there any requirement that the index range of a particular part has any correlation with its id tuple?

The only correlation is that the tuples have the same length. The dimensionalities of the partition grid and the partitions themselves must be the same.

Finally, should location also support additional possibilities, like "on GPU", "on disk", "in NVM", etc.?

I see that this could be convenient. However, with currently available data-structures which support GPUs etc., this information is already available through the underlying data-structure. These data-structures must be able to handle this no matter what, and without an understanding of the underlying structure the data will not be consumable anyway. Could you give an example where duplicating the information here would significantly help?

pspillai commented 3 years ago

I did not have a specific case in mind for the device location. However, where on the node the data resides (main memory, GPU memory, disk, etc.) will affect the latency to fetch it, and thus may influence how one accesses it (e.g. granularity of access). If the underlying data structures all provide this in a uniform way, then it does not need to be replicated here. However, if this information can only be accessed at the remote node, then we may still want to replicate it here so that any node that needs to access remote data can optimize its access pattern.

coquelin77 commented 3 years ago

After reading over the conversation, as well as a discussion with @Markus-Goetz, we have a few thoughts. We both like this plan from a general perspective, as we think it is intuitive. We have a few questions and comments, which I collected. I'm not going to quote everyone because it would take too long.

Firstly, we think that the name __partitioned_interface__ is a bit too long and cumbersome. Some alternatives could be __dist_plan__ or __parts__ or __portions__ or __chunks__ or __chunk_map__ etc.

Secondly, there seems to be some dissonance with who would be using the __partitioned_interface__. We believe that it should be something intended for developers and not so much for users. We think that once a user moves to this level, they can read the docstring for it and learn how things are used there. This does have certain implications which I will get into later.

ambiguity of the term shape

In the development of Heat, we found that, although the local and global shapes can each be determined, both are very useful to have during development. We think that calling it simply shape makes it easy to misunderstand which shape is meant in the code. We think that a top-level global shape attribute (i.e. shape) is useful for both user and developer, but the local shape is very, very useful for developers. Having the local shape for all the tiles means that communication can be planned and pre-determined.

In the example below, there is the shape attribute of the __partitioned_interface__, which corresponds to the global shape, and each tile has its own shape attribute as well. This allows each process to know how much data it will receive from a tile that is not local.

offset-tuple v. shape-tuple

The starting index of the data-block is very useful in functions like __getitem__ and __setitem__, as well as in the backend of other functions. Also, it does not greatly clutter the dictionary.

distribution and balancing

There was a mention of irregular distribution schemes. We would heavily warn against this. We think everything would be fine so long as the data partitions run through the entire dataset, i.e. the divisions occur on global indexes.

The assumption that the data is regularly distributed amongst the available resources makes development much easier and faster. It also reduces the amount of data which needs to be sent over the comm network.

grid keys

Is there a set order in which the grid is defined? C-order (clockwise) or F-order (counter-clockwise)?

materialize

In our opinion, it is best to make each of the tiles a class which has all of the attributes we desire. This would make __getitem__ something which we can set for many different packages. This is not shown in the example as it would become unwieldy.

The current dict format has the downside that calls into it can get large and can make the code less clear.

dtype/ptype, labels

In the interest of creating comm buffers, each tile/chunk should have its own dtype info in the 'partitions' dictionary. To receive data, a process should know how much data it will receive; for that we need shape and dtype.

Devices/dtype

In our view, the dtype and device should ideally be global array attributes; however, each tile would have an attribute with the device and dtype info. For pd.DataFrames this would also let us have the local column names at hand easily.

Pandas / arrays with differing datatypes or column names

If data is partitioned along columns, then each partition should know which column titles / metadata it has.

locals

Maybe the locals parameter should return an iterator over the local tile/chunk keys and data, for simplicity.


I compiled these into an example below. I took the "2d-structure (64 elements), 1d-partition-grid, 4 partitions on 2 ranks, row-block-cyclic distribution, ..." example as a guide but the others would look similar

Example

__dtype__ = dppy.dtype  # possibly a mirror of dpnp or np or ...      or    dict -> pd.DataFrame column_names/dtypes
__device__ = dppy.device  # if all on same device
__partitioned_interface__ = {
  'shape': (64, 64),
  'partition_order': 'C',  # clockwise (C-type) or 'F' counter-clockwise (F-type)
  'partition_tiling': (4, 1),  # name t
  'partitions': {
      (0,0):  {
        'start': (0, 0),
        'shape': (16, 64),
        'data': df0,   # reference to local data
        'data.__getitem__': ray.get(df0) if ray else df0 ... ,
        'location': 0,  # location of data 
        'dtype': None,  # None -> global dtype     or    dict -> pd.DataFrame column_names/dtypes
        'device': None,  # None -> global device
      },
      (1,0):  {
        'start': (16, 0),
        'shape': (16, 64),
        'data': None,   # reference to local data
        'data.__getitem__': ray.get(df0) if ray else df0 ... ,
        'location': 1,  # location of data 
        'dtype': None,  # None -> global dtype     or    dict -> pd.DataFrame column_names/dtypes
      },
      (2,0):  {
        'start': (32, 0),
        'shape': (16, 64),
        'data': df2,   # reference to local data
        'data.__getitem__': ray.get(df0) if ray else df0 ... ,
        'location': 0,  # location of data 
        'dtype': None,  # None -> global dtype     or    dict -> pd.DataFrame column_names/dtypes
      },
      (3,0):  {
        'start': (48, 0),
        'shape': (16, 64),
        'data': None,  # reference to local data
        'data.__getitem__': ray.get(df0) if ray else df0 ... ,
        'location': 1,  # location of data
        'dtype': None,  # None -> global dtype     or    dict -> pd.DataFrame column_names/dtypes
      },
  },
  'locals': iter([ [(0, 0), (0, 0).data], [(2, 0), (2, 0).data] ])  # this is for rank 0, for rank 1 it'd be [(1,0), (3,0)],
}
fschlimb commented 3 years ago

Thanks so much for your comments, @coquelin77 @Markus-Goetz. I mostly agree. Here are some additional and/or clarifying thoughts.

What about __partitioned__ instead of __partitioned_interface__?

Yes, I think there is a broad agreement that this protocol is intended for library/package developers, not for "end-users".

I think we need to be careful with overloading this interface. The goal is to provide meta-information related to the partitioning/distribution of the data, not to describe the global data fully within the interface. For example, I can see the argument that the global shape is directly related. However, the dtype is not directly related and can be easily extracted from the underlying data.

I strongly suggest limiting this interface to a single property and not require more than __partitioned__ in the global structure.

The iterator for locals looks a bit odd to me, mostly because it suggests redundant information which is super-easy to access with a simple list.

I am not sure I understand why C/Fortran grid order matters at this level. This is about Python and the dimensions of the grid correspond/map to the dimensions of the underlying structure (through start/shape of each partition). Can you explain why we need the partition order or how it could be useful?

I am not sure we can address devices here in a meaningful way. There is a lot of fragmentation in this space and we probably better handle this as an independent issue. I suggest we start with referring developers to using __usm_array_interface__, __dlpack__, and/or __cuda_array_interface__ as provided by the underlying data (@pspillai).
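
For example, a consumer could probe the (materialized) partition data for those interchange protocols instead of expecting device information in __partitioned__ itself; a rough sketch, with a made-up helper name:

def device_hint(arr):
    # illustrative only: checks for the interchange protocols mentioned above
    if hasattr(arr, '__usm_array_interface__'):
        return 'sycl-usm'
    if hasattr(arr, '__cuda_array_interface__'):
        return 'cuda'
    if hasattr(arr, '__dlpack_device__'):
        return arr.__dlpack_device__()  # (device_type, device_id) per DLPack
    return 'host/unknown'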

coquelin77 commented 3 years ago

What about __partitioned__ instead of __partitioned_interface__?

I think __partitions__ would also work, but this is a very minor difference. Whichever works best for the most people is fine with me.

I think we need to be careful with overloading this interface. The goal is to provide meta-information related to the partitioning/distribution of the data, not to describe the global data fully within the interface. For example, I can see the argument that the global shape is directly related. However, the dtype is not directly related and can be easily extracted from the underlying data.

I agree that we should be careful to avoid overloading this interface. Our idea was to use __partitions__ to create correctly sized buffers to receive data from other processes as well as for the setters and getters. And for that, we need to know the shape and dtype of the data. So if the dtype of all tiles is determined when __partitioned__ is created then it doesn't need to be sent before every single communication function.

The iterator for locals looks a bit odd to me, mostly because it suggests redundant information which is super-easy to access with a simple list.

The idea was to handle the most common case that when you get locals, you probably also want to get the data from the processes.

I am not sure I understand why C/Fortran grid order matters at this level. This is about Python and the dimensions of the grid correspond/map to the dimensions of the underlying structure (through start/shape of each partition). Can you explain why we need the partition order or how it could be useful?

This references how the tiling is determined in a general sense. It can be determined based on the starting indexes and the tile keys, but I think it would be useful to know it from the start.

C:         F:
1 | 2       1 | 3
-----       -----
3 | 4       2 | 4

I am not sure we can address devices here in a meaningful way. There is a lot of fragmentation in this space and we probably better handle this as an independent issue. I suggest we start with referring developers to using __usm_array_interface__, __dlpack__, and/or __cuda_array_interface__ as provided by the underlying data (@pspillai).

Yes, this is another issue. For one, if there are tiles which move between devices it will become difficult to communicate to the other processes. However, we should decide now (or soon): we can either assume that the data exists on the corresponding devices now (i.e. all on GPU) and make the compatibility change later, or we can leave this open and allow the data to be on any device and transfer the data between devices at the top of a function within a check.

DrTodd13 commented 3 years ago

distribution and balancing

There was a mention of irregular distribution schemes. We would heavily warn against this. We think everything would be fine so long as the data partitions run through the entire dataset, i.e. the divisions occur on global indexes.

The assumption that the data is regularly distributed amongst the available resources makes development much easier and faster. It also reduces the amount of data which needs to be sent over the comm network.

Are you referring to some comments that I made? I don't think the term "irregular" was used in this thread and that term could be defined in multiple ways. Let's say I have a 2x3 array and want to partition it 3 ways. Here are some various ways to do that. How would you label them?

1) Node 1 gets [0,0] and [1,0] Node 2 gets [0,1] and [1,1] Node 3 gets [0,2] and [1,2]

2) Node 1 gets [0,0] and [1,0] Node 2 gets [0,1] and [0,2] Node 3 gets [1,1] and [1,2]

3) Node 1 gets [0,0] and [0,2] Node 2 gets [0,1] and [1,0] Node 3 gets [1,1] and [1,2]

In case #1, everything is partitioned across only one dimension. I think everyone would call that a "regular" distribution. As the number of dimensions or the size of dimensions grows, this principle can be extended such that all partitions have the same shape except maybe those at the maximum edges of the array, which may be smaller.

In case #2, the first partition is across one dimension but then subsequent partitions are across the other dimension. To generalize this to more dimensions or larger dimensions, all the partitions are hyperrectangles, but theoretically each hyperrectangle could have a different shape. I think you would get different answers here as to whether this is "regular" or not. An individual hyperrectangle is a pretty "regular" shape, but is a set of hyperrectangles with different shapes "irregular"? What do you think?

In case #3, we're saying that one partition on one node gets disjoint sets of elements (that don't share an edge). I think everyone would call this irregular. There is a 4th option similar to #3 where we split the array into 6 partitions but still assign each partition to one of 3 nodes. In this case, each partition could be the same shape but more than 1 partition is assigned to each node. I haven't heard anything in this thread that would prohibit that. I would still call that "regular", would you? We were talking about that just yesterday with the nums-package folks, about how a block-cyclic distribution (with multiple disjoint blocks assigned to the same node) like this is useful for load balancing in things like LU-decomposition.

My position is that we should support #2 with the ability to assign multiple disjoint partitions to the same node. #2 can still represent what you could call a "more regular" distribution like #1 but not vice versa. My fear is that #1 is overly restrictive. A compromise position could be a flag to indicate whether all hyperrectangles are the same size except at the edges.

fschlimb commented 3 years ago

I agree that we should be careful to avoid overloading this interface. Our idea was to use __partitions__ to create correctly sized buffers to receive data from other processes as well as for the setters and getters. And for that, we need to know the shape and dtype of the data. So if the dtype of all tiles is determined when __partitioned__ is created then it doesn't need to be sent before every single communication function.

I see. The proposal here suggests you add more information if you want. So if your implementation provides the dtype, that's absolutely fine. I am not sure if this should be a required field, though. What do others think?

The iterator for locals looks a bit odd to me, mostly because it suggests redundant information which is super-easy to access with a simple list.

The idea was to handle the most common case that when you get locals, you probably also want to get the data from the processes.

I prefer not to add redundancies just because we think we know what the most common case is. I am not even sure I would agree with your assessment, since I think in most cases you will require at least some of the other partition information as well.

I am not sure I understand why C/Fortran grid order matters at this level. This is about Python and the dimensions of the grid correspond/map to the dimensions of the underlying structure (through start/shape of each partition). Can you explain why we need the partition order or how it could be useful?

This references how the tiling is determined in a general sense. It can be determined based on the starting indexes and the tile keys, but I think it would be useful to know it from the start.

C:         F:
1 | 2       1 | 3
-----       -----
3 | 4       2 | 4

I am confused. What do the numbers denote in this diagram? Following the proposal, you'd need to have a 2-tuple for each partition, not a scalar. Maybe the disconnect here is that I purposely proposed a dictionary with grid-locations and not partition-ids. There is no implicit or explicit order, only locations. There is no need to assume any order and no need to think about an order.

I am not sure we can address devices here in a meaningful way. There is a lot of fragmentation in this space and we probably better handle this as an independent issue. I suggest we start with referring developers to using __usm_array_interface__, __dlpack__, and/or __cuda_array_interface__ as provided by the underlying data (@pspillai).

Yes, this is another issue. For one, if there are tiles which move between devices it will become difficult to communicate to the other processes. However, we should decide now (or soon): we can either assume that the data exists on the corresponding devices now (i.e. all on GPU) and make the compatibility change later, or we can leave this open and allow the data to be on any device and transfer the data between devices at the top of a function within a check.

I would make no assumptions other than that the underlying data-structure needs to provide the necessary information to access/use the data. Whether it's on GPU's or CPU's only or if it's a mixed CPU/GPU distribution and how to access it depends on the implementation. Without adding any new requirements we can push this to the consumer. There is no new requirement because even if the consumed data-structure is non-distributed the consumer will need to know how to handle the device business.

coquelin77 commented 3 years ago

@DrTodd13 I agree with everything that you said. My apologies for not more clearly defining irregular; I was already becoming concerned about the length of my comment. I think that option #3 might cause issues down the road in the communications structures. When I was picturing irregular grids I was thinking of things like this:

x x x x x x | x x x x
x x x x x x | x x x x
---------------------
x x x | x x x x x x x
x x x | x x x x x x x

When I was talking about the tile/chunk divisions crossing all tiles, I imagined something like this:

x x x x x x | x x x x
x x x x x x | x x x x
---------------------
x x x x x x | x x x x
x x x x x x | x x x x

If I remember correctly, this should still allow for many load balancing techniques, but it has been some time since I looked over everything.

EDIT: the second distribution scheme puts no limits on which blocks are on which processes

coquelin77 commented 3 years ago

@fschlimb for the dtypes: this could very well be grouped into the implementation details

For the locals attribute, that is a good point. I find that I keep going back towards the idea of a class which would be the value in the partition dictionary. The locals could return the key and the class for the corresponding tile. Whether it is an iterator can also be an implementation detail.

As for the tiling in C vs F layout, I was thinking about these tiles numbered from 1 to 4 as partition-ids instead of as their coordinate-ids. I think this happened when I was thinking about the partitioning schemes to allow and my wires were crossed.

For the devices, it makes a lot of sense to leave the control up to the user/implementation. If a choice doesn't need to be made, then it might be best to leave the devices for the user to keep track of.

DrTodd13 commented 3 years ago

@DrTodd13 I agree with everything that you said. My apologies for not more clearly defining irregular; I was already becoming concerned about the length of my comment. I think that option #3 might cause issues down the road in the communications structures. When I was picturing irregular grids I was thinking of things like this:

x x x x x x | x x x x
x x x x x x | x x x x
---------------------
x x x | x x x x x x x
x x x | x x x x x x x

The example above is #2 and you could say it is less regular than the example below, which is #1. It still isn't clear to me if you want to allow or disallow the above example.

When I was talking about the tile/chunk divisions crossing all tiles, I imagined something like this:

x x x x x x | x x x x
x x x x x x | x x x x
---------------------
x x x x x x | x x x x
x x x x x x | x x x x

If I remember correctly, this should still allow for many load balancing techniques, but it has been some time since I looked over everything.

If we just consider the number of elements per partition, the more flexible arbitrary hyperrectangles can more evenly split the array in some cases. (Of course, #3 is what would be required to get a truly maximally balanced split but we don't want to go there and in practice I don't think that people would use the #4 approach to get that level of balancing as it is probably not worth it.)

I guess the question is that some packages might natively want to split across only one certain dimension, and then they decide they want to accept external arrays with partitionings that could come from a package with a different philosophy. So, what are we saying to packages that produce or consume this interface? I really hesitate to start dictating a one-size-fits-all partitioning scheme, and at the same time I don't think a package should be forced to accept all possible partitioning schemes. As a package writer, it would be nice to be able to quickly tell if a given partitioning scheme is one that I'm prepared to support (versus having to inspect the partitioning in detail). If we err on the flexible side and let people support what they want to support, then I think we may get convergence through time and the marketplace of ideas, but again I hesitate to force convergence at this point.

fschlimb commented 3 years ago

@DrTodd13 I fully agree with your last comments. We should allow flexibility on both ends - we neither want to limit how an implementation (producer) partitions the data, nor can we demand that a consumer must digest all possible partitionings.

At the same time it is worth mentioning explicitly that simple partitionings will be easier to consume and most likely by more packages.

I guess that brings back the idea (@YarShev) of providing a mechanism to request a certain partitioning and/or a library which can do so. I tend to prefer a re-partitioning library because a) the proposal here allows zero-copy on the consumer side and b) there is no point in asking every producer to implement re-partitioning strategies and c) it can easily grow over time.

coquelin77 commented 3 years ago

I think that the point about flexibility is very valid. Allowing hyperrectangles is reasonable for some application use cases. However, the examples which @DrTodd13 gave previously (specifically case 2) show that the concept of coordinate-ids breaks down at this point. Alternatively, the tiles can be forced into a more regular grid, at which point two or more tiles can sit on a single process. Notably, matching these tiles for networked communication, especially in SPMD-like processing, will be challenging. However, this challenge can be pushed to the framework which uses the interface.

This leads to the next thought: a semi-regular grid, similar to what I have drawn previously. In this scheme, the divisions between tiles would occur on global indices; however, all tiles need not be the same size. Furthermore, the rows and columns can all be different from one another; they only must be globally defined, similar to a spreadsheet in Excel (for a 2D array). This is the first case for which coordinate-ids of the tiles can be used in all cases. As previously stated by @fschlimb and myself, the tiles on a process do not need to share an axis. Beyond this step, the constraints are too restrictive for what we seek.

In my opinion, the last option is the best bet. It can allow for a form of hyperrectangles when data sits on a single process, coordinate-id referencing, and it can easily grow over time. I do not propose that we outlaw free-form distribution schemes; however, I believe that it should be a more specific option and not the default.

as an example:

To modify the previous cases shown by @DrTodd13: instead of a 2x3 array it will be a 2x3 array of tiles. The values at the top center indicate the process they are on:

1.                     2.                       3.
x 0 | x 1 x | x 2 x    x 0 x | x 1 x | x 1 x    x 0 x | x 1 x | x 0 x
x x | x x x | x x x    x x x | x x x | x x x    x x x | x x x | x x x
x x | x x x | x x x    ---------------------    ---------------------
-------------------    x 0 x | x 2 x | x 2 x    x 1 x | x 2 x | x 2 x
x 0 | x 1 x | x 2 x    x x x | x x x | x x x    x x x | x x x | x x x

All three of these examples would be allowed.

I agree with @fschlimb that encapsulating partitioning into a library makes sense. Especially, when other packages should or could make use of it.

What I find unclear is the future usage of this spec. There are two clear avenues: a) a first (perhaps future-feature-incomplete) piece of software, or b) a general-purpose standard to be released to the world. If a) is the case, I would go with a simple partitioning model that is easy to grasp, quick to implement and useful, i.e. covering most application use cases. This would mean that hyperrectangles on non-aligned grids would be disallowed, but hyperrectangles using multiple tiles on a semi-regular grid would be allowed. If b) is the case, we would need to rethink tile coordinates and carefully evaluate whether a first implementation needs to provide this feature.

fschlimb commented 3 years ago

Hi @coquelin77 I am sorry, but I am having trouble following your argument. You seem to suggest that you would like to express things, which the current proposal disallows. If that's a correct understanding, could you please provide a concrete example of what you are missing and maybe even a suggestion on how to address the gap?

With respect to where this should go, we'd like this to become some kind of a standard. Of course a standard is useful only when it's being used. So we need to:

  1. Get input from stakeholders in the abstract to understand differences early on
  2. Implement prototypes in at least two (complementary) packages to gain practical insights

In my mind good candidates for early prototypes of __partitioned__ would be daal4py, modin and HeAT. If all three teams (@YarShev @coquelin77 @PivovarA) are on board with the concept/idea I would volunteer to spend time on this.

pspillai commented 3 years ago

If we allow only the partitions like the examples @coquelin77 shows, or example 1 @DrTodd13 showed, then there is a lot of redundant information in the partition data structure. It would be far simpler to indicate the global range, list the split points for each axis, and have a simple convention of mapping tile id to the grid of tiles formed by the axis split points. Then, we would need to map tile ids to locations.
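
Something along those lines, with made-up key names, for the row-block example discussed above (a (64, 64) array split into 4 row blocks placed on 2 ranks):

compact = {
    'shape': (64, 64),             # global range
    'splits': ((16, 32, 48), ()),  # interior split points per axis
    'locations': {(0, 0): 0, (1, 0): 1, (2, 0): 0, (3, 0): 1},  # tile -> rank
}

def tile_bounds(desc, pos):
    # start/stop of a tile in the global index space, derived from the splits
    bounds = []
    for dim, p in enumerate(pos):
        edges = (0,) + desc['splits'][dim] + (desc['shape'][dim],)
        bounds.append((edges[p], edges[p + 1]))
    return tuple(bounds)

print(tile_bounds(compact, (1, 0)))  # ((16, 32), (0, 64))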

YarShev commented 3 years ago

@fschlimb

As for the name of the protocol, I like __partitions__, __shards__, __chunks__, __dist_plan__.

I agree that we should be careful to avoid overloading this interface. Our idea was to use __partitions__ to create correctly sized buffers to receive data from other processes as well as for the setters and getters. And for that, we need to know the shape and dtype of the data. So if the dtype of all tiles is determined when __partitioned__ is created then it doesn't need to be sent before every single communication function.

I see. The proposal here suggests you add more information if you want. So if your implementation provides the dtype, that's absolutely fine. I am not sure if this should be a required field, though. What do others think?

dtype should probably be an optional field. If a producer provides it, a consumer won't need to calculate it by scheduling remote operations. Also, I agree this might be helpful for better scheduling of work. In addition to dtype, the labels of the partitions themselves should be optional fields, so that a consumer is able to construct the global labels of the underlying data.

As for the location field, we should provide a list of IPs/hostnames/ranks (not a string) because multiple workers can own data.

I don't fully understand what the start field is required for. Can you explain or point me to the comment above, if it exists (probably, I missed something)?

I agree we should try implementing the protocol at least in Modin and DAAL4PY soon. A good candidate for seeing the protocol usage would be the PR related to distributed KMeans in DAAL4PY (link).

fschlimb commented 3 years ago

@YarShev The start can be seen as convenience information. It can be derived from the shapes alone, but it is such fundamental information that several people think it should be there directly (rather than asking consumers to compute it each time).

Could you explain what you mean by

multiple workers can own data.

?

YarShev commented 3 years ago

@fschlimb , thanks!

Could you explain what you mean by

multiple workers can own data.

?

Let's look at these examples:

Dask

from distributed.client import Client
c = Client()
c.scheduler_info()["workers"].keys()
dict_keys(['tcp://1.1.1.1:54547', 'tcp://1.1.1.2:54550', 'tcp://1.1.1.1:54553', 'tcp://1.1.1.2:54556'])
f = c.scatter(1, broadcast=True)
c.who_has(f)
{'int-58e78e1b34eb49a68c65b54815d1b158': ('tcp://1.1.1.1:54553',
  'tcp://1.1.1.2:54550',
  'tcp://1.1.1.2:54556',
  'tcp://1.1.1.1:54547')}

Ray

import ray
ray.init()
ref = ray.put(1)
# When scheduling a remote call Ray can submit the task to a different node and copy `ref` to it if necessary.
# Then, the following call may show two nodes that own data.
ray.objects(ref.hex())
{'ObjectRef': 'ffffffffffffffffffffffff0100000001000000',
 'Locations': ['5fdec4674c145fa35efc5df817c2fbb7c9eb0730', '5fdec4604c145fa15efc2df817c2fbb7c9eb1732']}
# Each value of the `Locations` field matches respective node IP address.

That's why we should provide a list of IPs/hostnames/ranks (not a string) for the location field.

fschlimb commented 2 years ago

I created a new branch and put the above spec into a new file which will allow tracking changes to the spec: https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md

Some more experiments made apparent that the above discussed callable to resolve a future/handle is needed. I added it to the spec (https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md).

Also, I think we should probably get rid of the partition-grid idea altogether. As discussed with @pspillai, @DrTodd13, @coquelin77 and @Markus-Goetz it doesn't seem to add anything and would rather limit applicability. Additionally, it seems more practically useful to map locations to partition-lists - and not positions to partitions.

Thoughts? Objections?

YarShev commented 2 years ago

Also, I think we should probably get rid of the partition-grid idea altogether. As discussed with @pspillai, @DrTodd13, @coquelin77 and @Markus-Goetz it doesn't seem to add anything and would rather limit applicability.

Are you talking about getting rid of the 'shape' field that is next to 'partitions'?

Additionally, it seems more practically useful to map locations to partition-lists - and not positions to partitions.

If we want to unify implementations across different execution backends (Ray, Dask, MPI), we should provide uniform objects for both futures and locations. Consumers themselves can map a list of partitions to a specific location.

Some more experiments made apparent that the above discussed callable to resolve a future/handle is needed.

That callable should be kind of generic, able to materialize a list of futures, not just one.

fschlimb commented 2 years ago

Also, I think we should probably get rid of the partition-grid idea altogether. As discussed with @pspillai, @DrTodd13, @coquelin77 and @Markus-Goetz it doesn't seem to add anything and would rather limit applicability.

Are you talking about getting rid of the 'shape' field that is next to 'partitions'?

Maybe yes, maybe no. As discussed earlier, the current grid-shape implies that every cut of any dimension fully spans over all other dimensions. In other words, no partition will ever have more than one neighbor in every dimension. @DrTodd13 seems to propose being more flexible, which would mean we interpret the grid-shape more flexibly or we need a different (or no) scheme to describe the grid-space.

I tend to agree with @coquelin77 and @Markus-Goetz that a regular grid-shape is most likely all we need. If we can agree on that, we should keep the grid-shape.

Changing the dict to mapping locations to list of partitions I think is a good idea in either case.

Additionally, it seems more practically useful to map locations to partition-lists - and not positions to partitions.

If we want to unify implementations across different execution backends (Ray, Dask, MPI), we should provide uniform objects for both futures and locations. Consumers themselves can map a list of partitions to a specific location.

Not sure I understand your last comment. Could you elaborate? Also, could you give an example of how such a uniform location/future object could look and how this would be beneficial in a potential use case? Note we cannot assume that we have a uniform runtime that everybody uses, e.g. we explicitly target the real world where different packages use different backends.

In any case, I like uniform objects, but for this we should not define new data types/structures. I think we need to describe distribution etc. in a uniform way, but not invent new types or alike. I think of this as a protocol, not a library/package.

Some more experiments made apparent that the above discussed callable to resolve a future/handle is needed.

That callable should be kind of generic, able to materialize a list of futures, not just one.

Are you suggesting this to allow parallel execution?

YarShev commented 2 years ago

I tend to agree with @coquelin77 and @Markus-Goetz that a regular grid-shape is most likely all we need. If we can agree on that, we should keep the grid-shape.

I am okay with this.

Changing the dict to mapping locations to list of partitions I think is a good idea in either case.

Additionally, it seems more practically useful to map locations to partition-lists - and not positions to partitions.

If we want to unify implementations across different execution backends (Ray, Dask, MPI), we should provide uniform objects for both futures and locations. Consumers themselves can map a list of partitions to a specific location.

Not sure I understand your last comment. Could you elaborate? Also, could you give an example of how such a uniform location/future object could look and how this would be beneficial in a potential use case? Note we cannot assume that we have a uniform runtime that everybody uses, e.g. we explicitly target the real world where different packages use different backends.

Don't we lose the original positioning of the partitions in the grid in that case? That might be useful for consumers if they are more concerned about the positioning rather than the mapping of locations to lists of partitions. As for uniform objects for futures and locations, consumers may or may not want to check the types of these futures and locations and call the respective APIs on them. Consumers may call future.result() (in the case of a Dask env) as well as ray.get(future) (in the case of a Ray env). Something similar relates to locations. However, if we require consumers to throw an exception in case they do not support information provided by the protocol (explicit checks), that would be ok.

Some more experiments made apparent that the above discussed callable to resolve a future/handle is needed.

That callable should be kind of generic, able to materialize a list of futures, not just one.

Are you suggesting this to allow parallel execution?

Something similar to ray.get(list_of_futures) and dask_client.gather(list_of_futures).
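
A rough sketch of what such a generic callable could look like; the factory and its names are illustrative, not part of the spec:

def make_get(backend, client=None):
    # returns a callable suitable for the proposed 'materialize'/'get' entry
    if backend == 'ray':
        import ray
        return ray.get        # accepts a single ObjectRef or a list of them
    if backend == 'dask':
        return client.gather  # accepts a single future or a list of futures
    return lambda x: x        # SPMD/local: handles already are the data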