rajasekarv / vega

A new, arguably faster, implementation of Apache Spark from scratch in Rust
Apache License 2.0

Deadlock while partitioning #41

Closed: iduartgomez closed this issue 4 years ago

iduartgomez commented 4 years ago

As discussed on Gitter, while developing union I found a problem where the application enters a deadlock while resolving the partitioning or computation of a DAG. The working branch is: https://github.com/iduartgomez/native_spark/tree/dev

The error is reproducible by executing:

#[test]
fn test_error() {
    let sc = CONTEXT.clone();
    let join = || {
        let col1 = vec![
            (1, ("A".to_string(), "B".to_string())),
            (2, ("C".to_string(), "D".to_string())),
            (3, ("E".to_string(), "F".to_string())),
            (4, ("G".to_string(), "H".to_string())),
        ];
        let col1 = sc.parallelize(col1, 4);
        let col2 = vec![
            (1, "A1".to_string()),
            (1, "A2".to_string()),
            (2, "B1".to_string()),
            (2, "B2".to_string()),
            (3, "C1".to_string()),
            (3, "C2".to_string()),
        ];
        let col2 = sc.parallelize(col2, 4);
        col2.join(col1.clone(), 4)
    };
    let join1 = join();
    let join2 = join();
    let res = join1.union(join2).unwrap().collect().unwrap();
    assert_eq!(res.len(), 12);
}

Inside some executor there is a thread panic here:

let mut stream_r = std::io::BufReader::new(&mut stream);
let message_reader = serialize_packed::read_message(&mut stream_r, r).unwrap();
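
For reference, a minimal sketch of how that unwrap could surface the failure instead of killing the executor thread (hypothetical handling; it assumes the capnp serialize_packed API and the same stream and r values as the snippet above):

let mut stream_r = std::io::BufReader::new(&mut stream);
// A hung or deadlocked peer typically surfaces here as an unexpected EOF
// or I/O error rather than a valid message, so report it and bail out
// instead of panicking (hypothetical handling, not the repo's code).
let message_reader = match serialize_packed::read_message(&mut stream_r, r) {
    Ok(reader) => reader,
    Err(e) => {
        eprintln!("failed to read task message: {}", e);
        return;
    }
};
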
rajasekarv commented 4 years ago

Will check it out asap @iduartgomez

iduartgomez commented 4 years ago

There is another problem I ran into while implementing this (same branch), but it is unrelated (I think). With this test:

let partitioner = HashPartitioner::<i32>::new(2);
let co_grouped = || {
    let rdd = vec![
        (1i32, "A".to_string()),
        (2, "B".to_string()),
        (3, "C".to_string()),
        (4, "D".to_string()),
    ];
    let rdd0 = SerArc::new(sc.parallelize(rdd.clone(), 2)) as SerArc<dyn RddBase>;
    let rdd1 = SerArc::new(sc.parallelize(rdd, 2)) as SerArc<dyn RddBase>;
    CoGroupedRdd::<i32>::new(vec![rdd0, rdd1], Box::new(partitioner.clone()))
};
let rdd0 = co_grouped();
let rdd1 = co_grouped();
let res = rdd0.union(rdd1)?.collect()?;
assert_eq!(res.len(), 8);

it fails at local_scheduler::get_preferred_locs because of an out-of-bounds panic. Inside submit_missing_tasks (using the local scheduler) it computes 4 total output parts, but the final_rdd being iterated only has two splits (which is probably wrong; the final count should indeed be 4 in this case).
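
As a rough illustration of that mismatch (hypothetical names, not the actual scheduler or RDD code): a union of two RDDs with two splits each should expose four splits, so a wrapper that reports only one parent's count makes any loop over the scheduler's four output partitions index past the end:

// Hypothetical sketch of the split-count mismatch, not the actual code.
struct UnionSketch {
    parent_split_counts: Vec<usize>, // e.g. vec![2, 2] for the two CoGroupedRdds
}

impl UnionSketch {
    // A union should expose the sum of its parents' splits (4 here).
    fn number_of_splits(&self) -> usize {
        self.parent_split_counts.iter().sum()
    }
}

fn main() {
    let union = UnionSketch { parent_split_counts: vec![2, 2] };
    assert_eq!(union.number_of_splits(), 4);
    // If number_of_splits() reported only 2 while submit_missing_tasks
    // still computed 4 output parts, indexing splits 2 and 3 would panic
    // out of bounds, matching the failure in get_preferred_locs.
}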

iduartgomez commented 4 years ago

Writing it here for the record too: after the changes to make Rdd object-safe, I updated the branch with those changes but am still encountering both of these problems.

rajasekarv commented 4 years ago

@iduartgomez Remove that number_of_splits definition in union_rdd. It is already a provided method in the RddBase trait and there is no need to implement it again.
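
As a toy illustration of why the extra definition is harmful (hypothetical trait, not the actual RddBase): re-implementing a provided method overrides the trait's default, so a stale or wrong copy silently shadows the correct behaviour:

// Hypothetical toy trait mirroring the situation, not the actual RddBase.
trait Splits {
    fn splits(&self) -> Vec<usize>;

    // Provided method: derives the count from splits().
    fn number_of_splits(&self) -> usize {
        self.splits().len()
    }
}

struct Union;

impl Splits for Union {
    fn splits(&self) -> Vec<usize> {
        vec![0, 1, 2, 3] // four splits in total
    }

    // Redundant override: shadows the correct default above.
    fn number_of_splits(&self) -> usize {
        2
    }
}

fn main() {
    let u = Union;
    assert_eq!(u.splits().len(), 4); // the default would have returned 4
    assert_eq!(u.number_of_splits(), 2); // but the override returns 2
}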

rajasekarv commented 4 years ago

Apart from Gitter, just mentioning it here for completeness. The issue occurs due to the incomplete implementation of dependencies in UnionRdd. It leads to an improper dependency graph and ultimately to a panic (as shuffle tasks are never performed to update server_uris). Please open a separate branch where I can commit the patch, or if you want to resolve it yourself, that is also fine. Let me know your decision.
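
For context, Apache Spark's UnionRDD handles this by registering one RangeDependency per parent, offsetting each parent's partitions into the union's partition space. A minimal sketch of that idea (hypothetical types, not this repo's Dependency API):

// Hypothetical sketch of union dependencies in the style of Spark's
// UnionRDD, which registers one range dependency per parent RDD.
struct RangeDependencySketch {
    parent_index: usize, // which parent RDD this dependency points at
    out_start: usize,    // first partition of the union this parent maps to
    length: usize,       // number of partitions contributed by this parent
}

fn union_dependencies(parent_partition_counts: &[usize]) -> Vec<RangeDependencySketch> {
    let mut deps = Vec::new();
    let mut pos = 0;
    for (i, &n) in parent_partition_counts.iter().enumerate() {
        deps.push(RangeDependencySketch { parent_index: i, out_start: pos, length: n });
        pos += n;
    }
    deps
}

fn main() {
    // Two join results with 4 partitions each -> a union with 8 partitions.
    let deps = union_dependencies(&[4, 4]);
    assert_eq!(deps.len(), 2);
    assert_eq!(deps[1].out_start, 4);
    // Without dependencies like these the scheduler never sees the parents'
    // shuffle stages, so server_uris is never updated and tasks hang.
}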

iduartgomez commented 4 years ago

@rajasekarv I pushed the dev branch with the union changes from my fork to the main repo.

iduartgomez commented 4 years ago

I think we can close this for now.