rajasekarv / vega

A new arguably faster implementation of Apache Spark from scratch in Rust
Apache License 2.0
2.23k stars 207 forks source link

Core dump as the number of rdds grows #130

Open AmbitionXiang opened 3 years ago

AmbitionXiang commented 3 years ago

Recently, I found that each of the following three code snippets causes a core dump.

let mut tc = sc.parallelize(vec![1], 1);
let tc0 = tc.clone();
let mut next_count = tc.count().unwrap();
while idx < 100 {
    tc = tc.union(tc0.clone().into());
    next_count = tc.count().unwrap();
}
let mut tc = sc.parallelize(vec![1], 1);
let mut next_count = tc.count().unwrap();
while idx < 100 {
    tc = tc.distinct();
    next_count = tc.count().unwrap();
}
let mut tc = sc.parallelize(vec![1], 1);
let mut next_count = tc.count().unwrap();
while idx < 100 {
    tc = tc.group_by_key(1)
        .map(Fn!(|i: (u32, Vec<u32>)| (i.0, i.1[0])));
    next_count = tc.count().unwrap();
}

I ran bt in gdb and got the following backtrace for the crash (only the outermost frames, #11919 to #11953, are shown):

#11919 0x0000557a28dfdf69 in <erased_serde::de::erase::Visitor<T> as erased_serde::de::Visitor>::erased_visit_enum ()
#11920 0x0000557a28e19c82 in <erased_serde::de::erase::Deserializer<T> as erased_serde::de::Deserializer>::erased_deserialize_enum ()
#11921 0x0000557a28e19e9f in <erased_serde::de::erase::Deserializer<T> as erased_serde::de::Deserializer>::erased_deserialize_enum ()
#11922 0x0000557a28e15dc0 in <erased_serde::de::erase::Visitor<T> as erased_serde::de::Visitor>::erased_visit_newtype_struct ()
#11923 0x0000557a28e16539 in <erased_serde::de::erase::Visitor<T> as erased_serde::de::Visitor>::erased_visit_newtype_struct ()
#11924 0x0000557a28e1c43b in <erased_serde::de::erase::Deserializer<T> as erased_serde::de::Deserializer>::erased_deserialize_newtype_struct ()
#11925 0x0000557a28e1c2cc in <erased_serde::de::erase::Deserializer<T> as erased_serde::de::Deserializer>::erased_deserialize_newtype_struct ()
#11926 0x0000557a28e4e8f1 in <T as serde_traitobject::deserialize::Sealed>::deserialize_erased ()
#11927 0x0000557a28dc00a7 in <erased_serde::de::erase::DeserializeSeed<T> as erased_serde::de::DeserializeSeed>::erased_deserialize_seed ()
#11928 0x0000557a28e1670b in <erased_serde::de::erase::SeqAccess<T> as erased_serde::de::SeqAccess>::erased_next_element ()
#11929 0x0000557a28de32b5 in <erased_serde::de::erase::Visitor<T> as erased_serde::de::Visitor>::erased_visit_seq ()
#11930 0x0000557a28e1ab53 in <erased_serde::de::erase::Deserializer<T> as erased_serde::de::Deserializer>::erased_deserialize_tuple ()
#11931 0x0000557a28e3a7d0 in serde_traitobject::deserialize ()
#11932 0x0000557a28dc45b6 in <erased_serde::de::erase::DeserializeSeed<T> as erased_serde::de::DeserializeSeed>::erased_deserialize_seed ()
#11933 0x0000557a28e1670b in <erased_serde::de::erase::SeqAccess<T> as erased_serde::de::SeqAccess>::erased_next_element ()
#11934 0x0000557a28de85d6 in <erased_serde::de::erase::Visitor<T> as erased_serde::de::Visitor>::erased_visit_seq ()
#11935 0x0000557a28e1b1f7 in <erased_serde::de::erase::Deserializer<T> as erased_serde::de::Deserializer>::erased_deserialize_struct ()
#11936 0x0000557a28ea6877 in <T as serde_traitobject::deserialize::Sealed>::deserialize_erased ()
#11937 0x0000557a28eab37f in <&mut bincode::de::Deserializer<R,O> as serde::de::Deserializer>::deserialize_tuple ()
#11938 0x0000557a28e44407 in vega::scheduler::task::_::<impl serde::de::Deserialize for vega::scheduler::task::TaskOption>::deserialize ()
#11939 0x0000557a28e2cd1a in bincode::internal::deserialize ()
#11940 0x0000557a28e4094b in vega::scheduler::local_scheduler::LocalScheduler::run_task ()
#11941 0x0000557a28e2c517 in tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut ()
#11942 0x0000557a28e268e5 in tokio::runtime::task::core::Core<T,S>::poll ()
#11943 0x0000557a28e7d0b6 in <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once ()
#11944 0x0000557a28e7b09d in tokio::runtime::task::harness::Harness<T,S>::poll ()
#11945 0x0000557a29165267 in tokio::runtime::blocking::pool::Inner::run ()
#11946 0x0000557a2916e1e6 in tokio::runtime::context::enter ()
#11947 0x0000557a2917494e in std::sys_common::backtrace::__rust_begin_short_backtrace ()
#11948 0x0000557a2916f031 in core::ops::function::FnOnce::call_once{{vtable-shim}} ()
#11949 0x0000557a291b4e9a in <alloc::boxed::Box<F> as core::ops::function::FnOnce<A>>::call_once ()
    at /rustc/ffa2e7ae8fbf9badc035740db949b9dae271c29f/library/alloc/src/boxed.rs:1042
#11950 <alloc::boxed::Box<F> as core::ops::function::FnOnce<A>>::call_once () at /rustc/ffa2e7ae8fbf9badc035740db949b9dae271c29f/library/alloc/src/boxed.rs:1042
#11951 std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:89
#11952 0x00007f783216b6db in start_thread (arg=0x7f7813386700) at pthread_create.c:463
#11953 0x00007f78318f2a3f in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

However, this snippet runs successfully.

let mut tc = sc.parallelize(vec![1], 1);
let mut next_count = tc.count().unwrap();
while idx < 100 {
    tc = tc.map(Fn!(|i: (u32, u32)| (i.0, i.1)));
    next_count = tc.count().unwrap();
}

The problem seems to be related to serde_traitobject, but I don't know why.