apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[VL] Error during spilling: The current running driver and the request driver must be from the same task #6864

Closed: zhztheplayer closed this issue 3 months ago

zhztheplayer commented 3 months ago

Velox has a restriction that disallows spilling one task from operations running in another task. Gluten used to honor this restriction, but at some point we lost it, so the error "The current running driver and the request driver must be from the same task" now occurs.

It was found in a local TPC-DS test.
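
For readers unfamiliar with the restriction, here is a minimal sketch of the kind of check involved. It is modeled only on the `Expression` field of the error reported in this issue (`runningDriver->task()->taskId() == opDriver->task()->taskId()`). The `Task` and `Driver` structs, the `rejects` helper, and the null-driver early return are hypothetical simplifications, not Velox's actual types or implementation.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for a task and a driver; only the task ID matters here.
struct Task {
  std::string taskId;
};

struct Driver {
  const Task* task;
};

// The driver currently executing on this thread (what the check calls
// `runningDriver`). Null when the thread is not running any driver.
thread_local const Driver* runningDriver = nullptr;

// Simplified precondition: the operator that requests memory arbitration
// must belong to the same task as the driver running on this thread.
void enterArbitration(const Driver& opDriver) {
  if (runningDriver == nullptr) {
    return;  // Assumption: nothing to compare against off a driver thread.
  }
  if (runningDriver->task->taskId != opDriver.task->taskId) {
    // The ordering of the two IDs in this message is illustrative only.
    throw std::runtime_error(
        "(" + runningDriver->task->taskId + " vs. " + opDriver.task->taskId +
        ") The current running driver and the request driver must be from "
        "the same task");
  }
}

// Hypothetical helper: returns true when the check rejects the request.
bool rejects(const Driver& running, const Driver& requester) {
  runningDriver = &running;
  bool rejected = false;
  try {
    enterArbitration(requester);
  } catch (const std::runtime_error&) {
    rejected = true;
  }
  runningDriver = nullptr;
  return rejected;
}
```

In this toy model, two drivers of the same task pass the check, while a request on behalf of a driver from a different task is rejected, which mirrors the `INVALID_STATE` failure in the call stack.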

zhztheplayer commented 3 months ago

Error call stack:

Caused by: org.apache.gluten.exception.GlutenException: Error during calling Java code from native code: org.apache.gluten.exception.GlutenException: org.apache.gluten.exception.GlutenException: Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: (Gluten_Stage_371_TID_21258_VTID_1061 vs. Gluten_Stage_371_TID_21258_VTID_1060) The current running driver and the request driver must be from the same task
Retriable: False
Expression: runningDriver->task()->taskId() == opDriver->task()->taskId()
Context: Top-level Expression: multiply(try_cast((n0_0) as DECIMAL(10, 0)), n0_1)
Additional Context: Operator: FilterProject[1] 1
Function: enterArbitration
File: /opt/gluten/ep/build-velox/build/velox_ep/velox/exec/Operator.cpp
Line: 571
Stack trace:
# 0  facebook::velox::process::StackTrace::StackTrace(int)
# 1  facebook::velox::VeloxException::VeloxException(char const*, unsigned long, char const*, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, bool, facebook::velox::VeloxException::Type, std::basic_string_view<char, std::char_traits<char> >)
# 2  void facebook::velox::detail::veloxCheckFail<facebook::velox::VeloxRuntimeError, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&>(facebook::velox::detail::VeloxCheckFailArgs const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)
# 3  facebook::velox::exec::Operator::MemoryReclaimer::enterArbitration()
# 4  facebook::velox::memory::MemoryPoolImpl::growCapacity(facebook::velox::memory::MemoryPool*, unsigned long)
# 5  facebook::velox::memory::MemoryPoolImpl::incrementReservationThreadSafe(facebook::velox::memory::MemoryPool*, unsigned long)
# 6  facebook::velox::memory::MemoryPoolImpl::incrementReservationThreadSafe(facebook::velox::memory::MemoryPool*, unsigned long)
# 7  facebook::velox::memory::MemoryPoolImpl::incrementReservationThreadSafe(facebook::velox::memory::MemoryPool*, unsigned long)
# 8  facebook::velox::memory::MemoryPoolImpl::incrementReservationThreadSafe(facebook::velox::memory::MemoryPool*, unsigned long)
# 9  facebook::velox::memory::MemoryPoolImpl::reserveThreadSafe(unsigned long, bool)
# 10 facebook::velox::memory::MemoryPoolImpl::reserve(unsigned long, bool)
# 11 facebook::velox::memory::MemoryPoolImpl::allocate(long)
# 12 boost::intrusive_ptr<facebook::velox::Buffer> facebook::velox::AlignedBuffer::allocate<long>(unsigned long, facebook::velox::memory::MemoryPool*, std::optional<long> const&)
# 13 facebook::velox::FlatVector<long>::mutableRawValues()
# 14 facebook::velox::functions::sparksql::(anonymous namespace)::DecimalBaseFunction<long, long, long, facebook::velox::functions::sparksql::(anonymous namespace)::Multiply>::apply(facebook::velox::SelectivityVector const&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&, std::shared_ptr<facebook::velox::Type const> const&, facebook::velox::exec::EvalCtx&, std::shared_ptr<facebook::velox::BaseVector>&) const
# 15 facebook::velox::exec::Expr::applyFunction(facebook::velox::SelectivityVector const&, facebook::velox::exec::EvalCtx&, std::shared_ptr<facebook::velox::BaseVector>&)
# 16 facebook::velox::exec::Expr::evalAllImpl(facebook::velox::SelectivityVector const&, facebook::velox::exec::EvalCtx&, std::shared_ptr<facebook::velox::BaseVector>&)
# 17 facebook::velox::exec::Expr::evalAll(facebook::velox::SelectivityVector const&, facebook::velox::exec::EvalCtx&, std::shared_ptr<facebook::velox::BaseVector>&)
# 18 facebook::velox::exec::Expr::evalWithNulls(facebook::velox::SelectivityVector const&, facebook::velox::exec::EvalCtx&, std::shared_ptr<facebook::velox::BaseVector>&)
# 19 facebook::velox::exec::Expr::evalEncodings(facebook::velox::SelectivityVector const&, facebook::velox::exec::EvalCtx&, std::shared_ptr<facebook::velox::BaseVector>&)
# 20 facebook::velox::exec::Expr::eval(facebook::velox::SelectivityVector const&, facebook::velox::exec::EvalCtx&, std::shared_ptr<facebook::velox::BaseVector>&, facebook::velox::exec::ExprSet const*)
# 21 facebook::velox::exec::ExprSet::eval(int, int, bool, facebook::velox::SelectivityVector const&, facebook::velox::exec::EvalCtx&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&)
# 22 facebook::velox::exec::FilterProject::project(facebook::velox::SelectivityVector const&, facebook::velox::exec::EvalCtx&)
# 23 facebook::velox::exec::FilterProject::getOutput()
# 24 facebook::velox::exec::Driver::runInternal(std::shared_ptr<facebook::velox::exec::Driver>&, std::shared_ptr<facebook::velox::exec::BlockingState>&, std::shared_ptr<facebook::velox::RowVector>&)
# 25 facebook::velox::exec::Driver::next(std::shared_ptr<facebook::velox::exec::BlockingState>&)
# 26 facebook::velox::exec::Task::next(folly::SemiFuture<folly::Unit>*)
# 27 gluten::WholeStageResultIterator::next()
# 28 Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeHasNext
# 29 0x00007f8edcb29ea8

    at org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:39)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
    at org.apache.gluten.utils.iterator.IteratorsV1$InvocationFlowProtection.hasNext(IteratorsV1.scala:159)
    at org.apache.gluten.utils.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:71)
    at org.apache.gluten.utils.iterator.IteratorsV1$PayloadCloser.hasNext(IteratorsV1.scala:37)
    at org.apache.gluten.utils.iterator.IteratorsV1$LifeTimeAccumulator.hasNext(IteratorsV1.scala:100)
    at org.apache.gluten.utils.iterator.IteratorsV1$ReadTimeAccumulator.hasNext(IteratorsV1.scala:127)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
    at org.apache.gluten.vectorized.GeneralInIterator.hasNext(GeneralInIterator.java:31)
    at org.apache.gluten.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
    at org.apache.gluten.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:61)
    at org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
    at org.apache.gluten.utils.iterator.IteratorsV1$ReadTimeAccumulator.hasNext(IteratorsV1.scala:127)
    at org.apache.gluten.utils.iterator.IteratorsV1$PayloadCloser.hasNext(IteratorsV1.scala:37)
    at org.apache.gluten.utils.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:71)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:122)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:239)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

zhztheplayer commented 3 months ago

This is probably because one Velox task is extending a vector that was created in another Velox task (see the mutableRawValues() call in the stack trace).

zhztheplayer commented 3 months ago

Related to https://github.com/facebookincubator/velox/pull/10705

FelixYBW commented 3 months ago

So it's the Velox task threads, not Spark's, right? How are multiple threads triggered during a spill?

zhztheplayer commented 3 months ago

> So it's the Velox task threads, not Spark's, right? How are multiple threads triggered during a spill?

The two Velox tasks run on the same Spark thread, although they are created by different WholeStageTransformer plans. This means data goes from C++ to Java and then back to C++, which triggers the issue.

https://github.com/apache/incubator-gluten/pull/6932 will fix this.
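
The scenario described in this comment can be modeled with a small sketch: a batch produced by one task keeps allocating from the pool it was created with, so when a second task on the same thread grows it, the memory request is made on behalf of the first task. Everything here (`Pool`, `Vec`, `grow`, `growFailsWhenConsumedBy`, and the task IDs) is hypothetical and only illustrates the suspected mechanism. It does not reproduce Velox or Gluten code, and it does not show how the linked PR resolves the problem.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical model: a memory pool remembers which task owns it.
struct Pool {
  std::string ownerTaskId;
  std::size_t reservedBytes;
};

// ID of the task whose driver is running on this thread (empty if none).
thread_local std::string runningTaskId;

// A vector keeps allocating from the pool it was created with, even after
// it has been handed over to a different task.
struct Vec {
  Pool* pool;
  std::vector<long> values;

  // Stand-in for a call such as mutableRawValues() that may need more memory.
  void grow(std::size_t n) {
    if (!runningTaskId.empty() && runningTaskId != pool->ownerTaskId) {
      throw std::runtime_error(
          "The current running driver and the request driver must be from "
          "the same task");
    }
    pool->reservedBytes += n * sizeof(long);
    values.resize(values.size() + n);
  }
};

// Produces a batch in "task-A", then grows it while `consumerTaskId` is the
// running task on the same thread. Returns true if the second grow fails.
bool growFailsWhenConsumedBy(const std::string& consumerTaskId) {
  Pool producerPool{"task-A", 0};

  runningTaskId = "task-A";
  Vec batch{&producerPool, {}};
  batch.grow(4);  // Same task as the pool owner: allowed.

  // The batch now leaves native code (C++ -> Java -> C++) and is picked up
  // by another task running on the same thread.
  runningTaskId = consumerTaskId;
  bool failed = false;
  try {
    batch.grow(4);  // Still charged to producerPool, owned by "task-A".
  } catch (const std::runtime_error&) {
    failed = true;
  }
  runningTaskId.clear();
  return failed;
}
```

Under these assumptions, `growFailsWhenConsumedBy("task-B")` reports a failure, while `growFailsWhenConsumedBy("task-A")` does not, because only the cross-task case asks the producer's pool for memory while a different task is running.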