rajasekarv / vega

A new arguably faster implementation of Apache Spark from scratch in Rust
Apache License 2.0
2.23k stars 206 forks

Fixes and improvements for distributed mode #75

Closed iduartgomez closed 4 years ago

iduartgomez commented 4 years ago

There are still a couple of changes I want to make:

iduartgomez commented 4 years ago

Note to self: check again whether, with the new knowledge and setup, I can make the capnp BufReader approach work so that we don't need to dump into an intermediate Vec (or read a bit more about the capnp library to find out whether the serialized size can be obtained easily; that would suffice too).

If everything fails, switching back to vanilla streams is a possibility.

iduartgomez commented 4 years ago

Almost done with this. It is better than what is currently in master (which is broken), so it could be merged (I still need to investigate the shuffle issues, though). Also, after the last changes the executor tests kind of broke, so I will look into that tomorrow. Aside from that, it is working fine in distributed mode now.

P.S.: It now deserializes the capnp_msg directly, so there is really no need for an intermediate allocation :)

iduartgomez commented 4 years ago

So, the problem with the shuffle tasks (or at least the one I have found so far) is not limited to this version; it is just an artifact of not running the whole application (scheduler included) in every worker (which is the correct behaviour).

In previous versions the scheduler was executed in the workers, so it was registering the output locations for the shuffle tasks (but it was also executing the whole application on every host!). Here is a partial log output of a typical execution at a worker, where you can see the registering of shuffle tasks:

09:45:39 [ INFO] inside get_shufflemap stage before
09:45:39 [ INFO] inside new stage
09:45:39 [ INFO] shuffle dependency and registering mapoutput tracker
09:45:39 [ INFO] inside register shuffle
09:45:39 [ INFO] server_uris after register_shuffle {0: [None, None, None, None]}
09:45:39 [ INFO] new stage tracker after
09:45:39 [ INFO] new stage id 1
--
09:45:39 [ INFO] inside get_shufflemap stage before
09:45:39 [ INFO] inside new stage
09:45:39 [ INFO] shuffle dependency and registering mapoutput tracker
09:45:39 [ INFO] inside register shuffle
09:45:39 [ INFO] server_uris after register_shuffle {0: [None, None, None, None], 1: [None, None, None, None]}
09:45:39 [ INFO] new stage tracker after
09:45:39 [ INFO] new stage id 2
--
09:45:42 [ INFO] running [1, 2]
09:45:42 [ INFO] waiting [0]
09:45:42 [ INFO] here before registering map outputs
09:45:42 [ INFO] stage output locs before register mapoutput tracker [["http://172.18.0.2:5353"], ["http://172.18.0.2:5353"], ["http://172.18.0.2:5353"], ["http://172.18.0.2:5353"]]
09:45:42 [ INFO] locs for shuffle id 0 [Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353")]
09:45:42 [ INFO] registering map outputs inside map output tracker for shuffle id 0 [Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353")]
09:45:42 [ INFO] here after registering map outputs
09:45:42 [ INFO] inside get missing parent stages
09:45:42 [ INFO] missing stages []
--
09:45:42 [ INFO] running [0, 2]
09:45:42 [ INFO] waiting []
09:45:42 [ INFO] here before registering map outputs
09:45:42 [ INFO] stage output locs before register mapoutput tracker [["http://172.18.0.2:5353"], ["http://172.18.0.2:5353"], ["http://172.18.0.2:5353"], ["http://172.18.0.2:5353"]]
09:45:42 [ INFO] locs for shuffle id 1 [Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353")]
09:45:42 [ INFO] registering map outputs inside map output tracker for shuffle id 1 [Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353"), Some("http://172.18.0.2:5353")]
09:45:42 [ INFO] here after registering map outputs
09:45:42 [ INFO] inside iterator cogrouprdd  shuffle dep agg {}
09:45:42 [ INFO] inside fetch function

My take is that there is a missing piece of implementation: the location of the shuffle output should be propagated to each worker.

rajasekarv commented 4 years ago

Locations of shuffle outputs were indeed propagated. The output of a shuffle task is nothing but the address of the executor that ran the task and holds the result. You can have a look at ShuffleMapTask. Old versions are working fine. Somewhere in the middle, we have missed something. I will also look into this.

iduartgomez commented 4 years ago

But how was it being propagated? There is no explicit sending of the output location addresses to the executors; it just registers the addresses locally on the master.

However, what I see in the debug output of old versions is the whole application and the scheduler running on the workers, which does not seem right to me (you can also see the results being printed multiple times because they are being executed in the workers). It looks like it was running fine by accident, not because it was being done correctly.

When and how is the driver supposed to send the output locations to workers?

rajasekarv commented 4 years ago

The result task and the shuffle task both return a serializable Any type, which is retrieved in the submit task method. For a result task, it is downcast directly to the appropriate type; for a shuffle task, it is downcast to a string representing the IP, based on which it fetches from the map output tracker of that particular IP.
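To illustrate the downcasting step described above, here is a minimal sketch. It assumes the task result arrives as a plain `Box<dyn Any>` from the standard library rather than Vega's own serializable trait object, and the function name `shuffle_result_to_uri` is hypothetical.

```rust
use std::any::Any;

/// Hypothetical helper: interprets a shuffle task's result as the address
/// of the executor holding the output. Returns `None` when the boxed value
/// is not a `String`, instead of panicking.
fn shuffle_result_to_uri(result: Box<dyn Any>) -> Option<String> {
    // `downcast` hands the original box back in `Err` when the type
    // does not match, so `.ok()` simply discards that case.
    result.downcast::<String>().ok().map(|boxed| *boxed)
}
```

Calling it with a boxed `String` yields `Some(address)`, while any other boxed type yields `None`, which the caller can turn into a proper error.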

iduartgomez commented 4 years ago

Yes, it returns the IP to the driver, but when a ShuffleTask is next run on an executor, it tries to get the IPs from the map output tracker, and the tracker there does not contain any address; the driver's does. That's what I mean: it's never propagated to the executors.

rajasekarv commented 4 years ago

The map output tracker searches locally and uses the IPs if it already has them; if not, it fetches them from the master. Have a look at the get_server_uri method and the client method in the map output tracker.
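The "local first, then master" lookup described here could be sketched roughly as follows. This is not Vega's actual code: the `Tracker` struct, the `get_server_uris` method and the `fetch_from_master` callback (standing in for the network request to the master) are all assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified tracker: shuffle id -> one optional URI per map output.
struct Tracker {
    server_uris: HashMap<usize, Vec<Option<String>>>,
}

impl Tracker {
    /// Returns the output locations for `shuffle_id`. The local entry is used
    /// only when it exists, is non-empty and has no `None` slots; otherwise
    /// `fetch_from_master` is called and its answer is cached locally.
    fn get_server_uris<F>(&mut self, shuffle_id: usize, fetch_from_master: F) -> Vec<String>
    where
        F: FnOnce(usize) -> Vec<String>,
    {
        let cached: Option<Vec<String>> = self
            .server_uris
            .get(&shuffle_id)
            // Collecting into `Option<Vec<_>>` yields `None` if any slot is `None`.
            .and_then(|uris| uris.iter().cloned().collect::<Option<Vec<String>>>())
            .filter(|uris| !uris.is_empty());
        if let Some(uris) = cached {
            return uris;
        }
        let fetched = fetch_from_master(shuffle_id);
        self.server_uris
            .insert(shuffle_id, fetched.iter().cloned().map(Some).collect());
        fetched
    }
}
```

Note that in this sketch a shuffle id that was never registered locally is treated the same as one whose locations are still unknown: both fall through to the master.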

iduartgomez commented 4 years ago

Well, this is where it crashes:

self
    .server_uris
    .get(&shuffle_id)
    .unwrap()
    .iter()
    .filter(|x| !x.is_none())
    .map(|x| x.clone().unwrap())
    .next()
    .is_none()

The shuffle_id is not present in the map, so it panics while unwrapping and fails. It should have been inserted with None by a previous task, but it is not. There are no calls to `register_shuffle` in the executor code, and that is the only way to register a shuffle_id (right now).
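One way to avoid the panic, independent of where the shuffle_id should be registered, is to treat a missing entry like an entry with no known locations. The sketch below is an assumption-laden illustration: the `ServerUris` alias and the `has_no_known_location` function are hypothetical names, and a plain `HashMap` stands in for whatever map type the tracker really uses.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the tracker's `server_uris` map:
/// shuffle id -> one optional server URI per map output.
type ServerUris = HashMap<usize, Vec<Option<String>>>;

/// Returns true when no output location is known for `shuffle_id`.
/// An unregistered shuffle id is handled like an all-`None` entry,
/// so the lookup never panics.
fn has_no_known_location(server_uris: &ServerUris, shuffle_id: usize) -> bool {
    server_uris
        .get(&shuffle_id)
        .map_or(true, |uris| uris.iter().all(Option::is_none))
}
```

With this shape, an executor that has never seen the shuffle_id would take the "fetch from master" path instead of crashing.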

AmbitionXiang commented 3 years ago

Hi there. When I run in distributed mode, the error "No message received" often occurs. How can I fix it?

rajasekarv commented 3 years ago

@AmbitionXiang can you tell us how you are running it? A reproducible example would be great.

AmbitionXiang commented 3 years ago

@rajasekarv Thank you. I have found the reason. Cap'n Proto limits the message size for security reasons, and workers in Vega send (serialized) results back to the scheduler in a single message. So when the results are large, the message is blocked. Could Vega support large amounts of data by splitting the data into several messages?
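The splitting idea can be sketched in a few lines. This is only an illustration of chunking an already serialized byte buffer and putting it back together on the receiving side; the function names and the `max_chunk_len` parameter are hypothetical, and the sketch says nothing about how Vega or Cap'n Proto would frame or order the individual messages.

```rust
/// Splits an already serialized payload into chunks of at most
/// `max_chunk_len` bytes, so each chunk can travel in its own message.
fn split_into_chunks(payload: &[u8], max_chunk_len: usize) -> Vec<Vec<u8>> {
    assert!(max_chunk_len > 0, "chunk length must be positive");
    payload
        .chunks(max_chunk_len)
        .map(|chunk| chunk.to_vec())
        .collect()
}

/// Rebuilds the original payload from chunks received in order.
fn reassemble(chunks: &[Vec<u8>]) -> Vec<u8> {
    chunks.concat()
}
```

A real implementation would also need to tell the receiver how many chunks to expect (or mark the last one), which this sketch leaves out.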