mspass-team / mspass

Massive Parallel Analysis System for Seismologists
https://mspass.org
BSD 3-Clause "New" or "Revised" License

map_partition (MapPartition) subtle limitation #469

Open pavlis opened 8 months ago

pavlis commented 8 months ago

In rewriting write_distributed_data I ran across an incredibly subtle problem in the implementation. I am pretty sure the problem is fundamental, as it seems to be shared by both map_partitions in dask and mapPartitions in pyspark.

The problem came up in the revision because I'm trying to do a bulk write by partition in write_distributed_data to improve the I/O performance. I can make that work with the current algorithm, but I ran into this fundamental problem when I tried to allow a "return_data" option that would permit a parallel intermediate save. That is, with the option set, write_distributed_data would return a bag/rdd of data equivalent to what would have been created by a read_distributed_data of what was just saved.

The following small test program illustrates the limitation I discovered. You can run it yourself to verify what it does:

import dask.bag as dbg

def test(argval):
    # Partition-level function passed to map_partitions.  Prints the type of
    # the argument it receives and then tries to traverse it twice.
    print("Type argval=", type(argval))
    for x in argval:
        print(x)
    print("Trying to traverse argval a second time")
    for x in argval:
        print(2 * x)
    return argval

def set_value(d):
    # Element-level function passed to map.
    print("Type of d =", type(d))
    d += 1
    return d

def map_function_test(bg):
    # Builds the task graph:  an element-wise map followed by map_partitions.
    print("At top of map_test")
    print("Type of arg bg=", type(bg))
    bg = bg.map(set_value)
    bg = bg.map_partitions(test)
    return bg

b = dbg.from_sequence(range(1, 31), npartitions=3)
b = map_function_test(b)
print("Exited map_test.  Type of b=", type(b))
print("calling compute")
b.compute()

Run this and you should get this output:

At top of map_test
Type of arg bg= <class 'dask.bag.core.Bag'>
Exited map_test.  Type of b= <class 'dask.bag.core.Bag'>
calling compute
Type argval= <class 'dask.bag.core._MapChunk'>
Type of d = <class 'int'>
2
Type of d = <class 'int'>
3
Type of d = <class 'int'>
4
Type of d = <class 'int'>
5
Type of d = <class 'int'>
6
Type of d = <class 'int'>
7
Type of d = <class 'int'>
8
Type of d = <class 'int'>
9
Type of d = <class 'int'>
10
Type of d = <class 'int'>
11
Trying to traverse argval a second time
Type argval= <class 'dask.bag.core._MapChunk'>
Type of d = <class 'int'>
12
Type of d = <class 'int'>
13
Type of d = <class 'int'>
14
Type of d = <class 'int'>
15
Type of d = <class 'int'>
16
Type of d = <class 'int'>
17
Type of d = <class 'int'>
18
Type of d = <class 'int'>
19
Type of d = <class 'int'>
20
Type of d = <class 'int'>
21
Trying to traverse argval a second time
Type argval= <class 'dask.bag.core._MapChunk'>
Type of d = <class 'int'>
22
Type of d = <class 'int'>
23
Type of d = <class 'int'>
24
Type of d = <class 'int'>
25
Type of d = <class 'int'>
26
Type of d = <class 'int'>
27
Type of d = <class 'int'>
28
Type of d = <class 'int'>
29
Type of d = <class 'int'>
30
Type of d = <class 'int'>
31
Trying to traverse argval a second time

That is a lot of output, so here are the key points:

  1. The argument the partition function test receives is not a list; it is a dask.bag.core._MapChunk object.
  2. The first for loop over argval works, and the elements it yields are the values produced lazily by set_value.
  3. The second for loop prints nothing: the _MapChunk is a one-shot iterator, so the second traversal is silently empty.

This is very weird for several reasons. The argval argument obeys different rules in different contexts, which is not good. Maybe we should report this issue to the dask developers.

Why does this happen? The reason seems to be that when you pass a bag into a function that returns it before compute is called, all dask does is define the DAG that the function returns. Nothing executes until compute is called. That is why the type of argval differs with context: it is a list in one context and a _MapChunk in the other. A _MapChunk is a delayed (lazy) object that doesn't do anything until the compute method runs, and in that context it behaves like a generator, so the iterator can only be traversed once.
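If a partition function really does need to traverse its input more than once, the obvious workaround (a sketch only; I have not tested this inside the MsPASS code base) is to materialize the chunk into a list at the top of the function, at the cost of holding the whole partition in memory:

def test(argval):
    # argval arrives as a lazy, one-shot iterator (_MapChunk) when compute runs.
    # Copy it into a list so it can be traversed as many times as needed.
    items = list(argval)
    for x in items:
        print(x)
    print("Trying to traverse the materialized list a second time")
    for x in items:
        print(2 * x)
    return items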

This leaves me with a point for discussion with fellow developers. Our current version of write_distributed_data does not support what I'm trying to do with the return_data option. Do we need this option? I think I can still support it by reverting to atomic writes when return_data is set True, but I guess we just need to warn users that doing that will come at a cost. We should discourage intermediate saves where the workflow continues anyway. Opinions?

By the way, one reason it took me a long time to figure this out is that the documentation for dask's map_partitions and pyspark's mapPartitions is horrible in both cases. This is also a pretty subtle problem deep in the weeds of the way map/reduce works. One reason I'm posting this is to help some other poor schmuck who gets stumped by map_partitions.

pavlis commented 8 months ago

One other point about this issue is worth preserving for the record. It turns out to be very difficult to debug functions like read_distributed_data and write_distributed_data because what they work with are "delayed" (aka "futures", aka "lazy computation") objects. Most of what these functions do can only be tested with old-fashioned insertion of print statements where problems are suspected. Tools like pdb, and more advanced things like IDE debuggers, exist precisely because that approach is horribly slow compared to interactive testing with a debugger.

If anyone has a solution to this problem other than print statements, I would love to learn the trick.
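One partial workaround I'm aware of, though I haven't verified it against write_distributed_data itself, is dask's single-threaded "synchronous" scheduler: everything then runs in the calling process, so pdb breakpoints placed inside the mapped functions fire normally. A minimal sketch using the toy example above:

import pdb
import dask.bag as dbg

def set_value(d):
    # With the synchronous scheduler this breakpoint is hit in the main
    # process, so normal interactive debugging works.
    pdb.set_trace()
    d += 1
    return d

b = dbg.from_sequence(range(1, 31), npartitions=3)
b = b.map(set_value)
# Force the whole graph to run single-threaded in the calling process.
b.compute(scheduler="synchronous")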

pavlis commented 8 months ago

It dawned on me that an issue here is that my redesign of write_distributed_data has a fundamental flaw. Our prototype version of write_distributed_data in the current code base actually does something quite different: it returns a dataframe of the Metadata contents of all atomic data objects saved, and it has no support at all for ensembles. I was aiming to produce a single function that would handle a bag/rdd of any set of atomic data or ensembles, and I was thinking the same function could be used for both inline saves and terminal saves. That is problematic for several reasons. For that reason I am now planning to proceed as follows:

  1. This particular issue arises from trying to push intermediate saves and terminal saves through the same function. I am realizing that is a fundamental error. I think we should make write_distributed_data function ONLY as a terminator for a workflow.
  2. I propose we create a different function for handling intermediate saves that could be used in a basic map operator. It would be little more than a wrapper on the save_data method of Database that standardizes its use in a map operator. I'm not sure what to call it; maybe something like save_data_distributed. Then again, that maybe should be the name we use instead of write_distributed_data, and we should give this new function a different name, like save_data_and_continue. Opinions?

An issue this further simplifies is the handling of history. Cases 1 and 2 have to be handled differently in history management.

wangyinz commented 8 months ago

Right now, write_distributed_data is a terminator for a workflow. I think it does support using its output as input to read_distributed_data, so that a new workflow can be initiated immediately after the terminator. So, what you propose here is to implement a similar function that automatically does the save and then continues. The difference will be subtle, but I get the point that such a "save and continue" function will have much less overhead.

As for naming, why don't we still have one write_distributed_data function but add an argument to it that controls the two different branches? Internally, these can be two very different functions, but the user-facing API can be the same. What do you think? Otherwise, it will be confusing to have "save" vs. "write" in functions that do similar things.

p.s. What I am proposing is to have a default argument like continue=False. Then, it calls one of two different underlying functions according to that argument.
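Something like the following sketch is what I have in mind. The flag and internal function names are placeholders (and the flag can't literally be called continue, since that is a reserved word in Python):

def write_distributed_data(data, db, save_and_continue=False, **kwargs):
    # save_and_continue is a placeholder name for the proposed flag;
    # the two underlying functions below are hypothetical as well.
    if save_and_continue:
        # Intermediate save:  write each datum and return the bag/rdd
        # so the workflow can keep going.
        return _write_and_return_data(data, db, **kwargs)
    else:
        # Terminal save:  write everything and return only a summary.
        return _write_terminal(data, db, **kwargs)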

pavlis commented 8 months ago

Good points, @wangyinz. I also realized something else that makes the whole idea of using the function name write_distributed_data for an intermediate save a bit unnecessary. I sat down and thought about how I might implement an intermediate save function to be used in a map operator, and I came up with the following after cross-referencing the api for Database.save_data.

def save_data_distributed_inline(
    mspass_object,
    db,
    data_are_atomic=True,
    mode="promiscuous",
    storage_mode="gridfs",
    file_format=None,
    overwrite=False,
    exclude_keys=None,
    collection=None,
    data_tag=None,
    cremate=False,
    return_data=False,
    alg_name="save_data_distributed_inline",
    alg_id="0",
):
    mspass_object = db.save_data(
        mspass_object,
        return_data=True,
        mode=mode,
        storage_mode=storage_mode,
        format=file_format,  # namespace inconsistency here - distributed should call this "scheduler"
        overwrite=overwrite,
        exclude_keys=exclude_keys,
        collection=collection,
        data_tag=data_tag,
        cremate=cremate,
        alg_name=alg_name,
        alg_id=alg_id,
    )
    return mspass_object

To be clear, this function would be used in a map call like the following:

mybag = mybag.map(save_data_distributed_inline, db)   # plus optional args as needed

The reason it is so unnecessary is that this is an almost silly wrapper for Database.save_data. The only things it really does are: (1) set the (new) return_data boolean True (the current new default is False), and (2) fix an api symbol collision I carried forward with the keyword "format", which becomes ambiguous.

Note that the reason this wrapper is now so trivial is a fundamental change in the v2 api I'm finalizing: save_data handles all atomic and ensemble types through the same method.

Based on the above experience, I think the best approach is to:

  1. Retain the name write_distributed_data.
  2. Have this function work only as a terminator for a workflow.
  3. Add an example in the docstring and text in the user manual showing how to use Database.save_data within a workflow in a map operator for an intermediate save (roughly as sketched below).
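For item 3, the docstring example I have in mind would look roughly like the sketch below. It assumes the v2 save_data signature discussed above (return_data keyword) and that the Database handle serializes cleanly to the workers; some_later_processing_step and the write_distributed_data call are schematic placeholders:

# Intermediate save inside a workflow:  each datum is written and the
# (possibly annotated) datum is returned so the bag can keep flowing.
mybag = mybag.map(db.save_data, return_data=True, data_tag="intermediate_save")
mybag = mybag.map(some_later_processing_step)   # workflow continues
# Terminal save at the end of the workflow:
write_distributed_data(mybag, db)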

If you concur, I will proceed that way.

wangyinz commented 8 months ago

I see. Yes, that makes sense. I agree with the new design.