facebookincubator / velox

A composable and fully extensible C++ execution engine library for data management systems.
https://velox-lib.io/
Apache License 2.0

Incompatibility between Spark and Velox Data Types Causing Runtime Failures #5770

Open srinivasst opened 1 year ago

srinivasst commented 1 year ago

Problem Description

When Velox is used (through Gluten) to read Parquet data for Spark, we've observed that certain data types are not represented identically in Spark and in the Parquet files. This discrepancy causes a runtime error whenever the type returned by the Parquet reader differs from the type Spark expects. We've identified the following types as problematic:

  1. u8 -> i16
  2. u16 -> i32
  3. u32 -> i64
  4. u64 -> decimal(20, 0)
  5. DateType ignores rebaseMode conf
  6. TimeStampType ignores rebaseMode conf

For instance, Gluten creates the Velox scan node from the types Spark expects. Because the Parquet reader returns a different representation, the scan fails with an error such as the one in the following log:

E0607 16:44:56.883301 11636 Exceptions.h:68] Line: /root/Velox/velox/vector/ComplexVector.h:68, Function:RowVector, Expression: child->type()->kindEquals(type->childAt(i)) Got type BIGINT for field `n0_0` at position 0, but expected DECIMAL(20,0)., Source: RUNTIME, ErrorCode: INVALID_STATE
23/06/07 16:44:56 ERROR TaskResources: Task 4 failed by error: 
java.lang.RuntimeException: Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: Got type BIGINT for field `n0_0` at position 0, but expected DECIMAL(20,0).
Retriable: False
Expression: child->type()->kindEquals(type->childAt(i))
Context: Split [file file:///root/Gluten/backends-velox/target/scala-2.12/test-classes/data-type-validation-data/type3/primitive_types_parquet_file.parquet 0 - 7295] Task gluten task 1
Top-Level Context: Same as context.
Function: RowVector
File: /root/Velox/velox/vector/ComplexVector.h
Line: 68
Stack trace:
# 0  std::shared_ptr<facebook::velox::VeloxException::State const> facebook::velox::VeloxException::State::make<facebook::velox::VeloxException::make(char const*, unsigned long, char const*, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, bool, facebook::velox::VeloxException::Type, std::basic_string_view<char, std::char_traits<char> >)::{lambda(auto:1&)#1}>(facebook::velox::VeloxException::Type, facebook::velox::VeloxException::make(char const*, unsigned long, char const*, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, bool, facebook::velox::VeloxException::Type, std::basic_string_view<char, std::char_traits<char> >)::{lambda(auto:1&)#1})
# 1  facebook::velox::VeloxException::VeloxException(char const*, unsigned long, char const*, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, bool, facebook::velox::VeloxException::Type, std::basic_string_view<char, std::char_traits<char> >)
# 2  facebook::velox::VeloxRuntimeError::VeloxRuntimeError(char const*, unsigned long, char const*, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, bool, std::basic_string_view<char, std::char_traits<char> >)
# 3  void facebook::velox::detail::veloxCheckFail<facebook::velox::VeloxRuntimeError, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&>(facebook::velox::detail::VeloxCheckFailArgs const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)
# 4  facebook::velox::RowVector::RowVector(facebook::velox::memory::MemoryPool*, std::shared_ptr<facebook::velox::Type const>, boost::intrusive_ptr<facebook::velox::Buffer>, unsigned long, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >, std::optional<int>)
# 5  void __gnu_cxx::new_allocator<facebook::velox::RowVector>::construct<facebook::velox::RowVector, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&>(facebook::velox::RowVector*, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>&&, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&)
# 6  void std::allocator_traits<std::allocator<facebook::velox::RowVector> >::construct<facebook::velox::RowVector, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&>(std::allocator<facebook::velox::RowVector>&, facebook::velox::RowVector*, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>&&, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&)
# 7  std::_Sp_counted_ptr_inplace<facebook::velox::RowVector, std::allocator<facebook::velox::RowVector>, (__gnu_cxx::_Lock_policy)2>::_Sp_counted_ptr_inplace<facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&>(std::allocator<facebook::velox::RowVector>, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>&&, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&)
# 8  std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<facebook::velox::RowVector, std::allocator<facebook::velox::RowVector>, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&>(facebook::velox::RowVector*&, std::_Sp_alloc_shared_tag<std::allocator<facebook::velox::RowVector> >, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>&&, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&)
# 9  std::__shared_ptr<facebook::velox::RowVector, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<facebook::velox::RowVector>, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&>(std::_Sp_alloc_shared_tag<std::allocator<facebook::velox::RowVector> >, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>&&, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&)
# 10 std::shared_ptr<facebook::velox::RowVector>::shared_ptr<std::allocator<facebook::velox::RowVector>, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&>(std::_Sp_alloc_shared_tag<std::allocator<facebook::velox::RowVector> >, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>&&, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&)
# 11 std::shared_ptr<facebook::velox::RowVector> std::allocate_shared<facebook::velox::RowVector, std::allocator<facebook::velox::RowVector>, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&>(std::allocator<facebook::velox::RowVector> const&, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>&&, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&)
# 12 std::shared_ptr<facebook::velox::RowVector> std::make_shared<facebook::velox::RowVector, facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&>(facebook::velox::memory::MemoryPool*&, std::shared_ptr<facebook::velox::RowType const> const&, boost::intrusive_ptr<facebook::velox::Buffer>&&, int&, std::vector<std::shared_ptr<facebook::velox::BaseVector>, std::allocator<std::shared_ptr<facebook::velox::BaseVector> > >&)
# 13 facebook::velox::connector::hive::HiveDataSource::next(unsigned long, folly::SemiFuture<folly::Unit>&)
# 14 facebook::velox::exec::TableScan::getOutput()
# 15 facebook::velox::exec::Driver::runInternal(std::shared_ptr<facebook::velox::exec::Driver>&, std::shared_ptr<facebook::velox::exec::BlockingState>&, std::shared_ptr<facebook::velox::RowVector>&)
# 16 facebook::velox::exec::Driver::next(std::shared_ptr<facebook::velox::exec::BlockingState>&)
# 17 facebook::velox::exec::Task::next(folly::SemiFuture<folly::Unit>*)
# 18 gluten::WholeStageResultIterator::next()
# 19 gluten::ResultIterator::getNext()
# 20 gluten::ResultIterator::hasNext()
# 21 Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeHasNext
# 22 0x00007fc7dd020848

    at io.glutenproject.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:45)
    at io.glutenproject.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
    at io.glutenproject.backendsapi.glutendata.GlutenIteratorApi$$anon$2.hasNext(GlutenIteratorApi.scala:240)
    at io.glutenproject.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:41)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:400)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:897)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:897)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:366)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

This error states that a BIGINT type was returned for the field n0_0 at position 0, while a DECIMAL(20,0) was expected.

The actual Parquet type is not known at plan time, so this conversion must be done at runtime. The conversions Spark performs in ParquetVectorUpdaterFactory.java need to be honored.

Proposed Solution

As part of this enhancement, I have identified three changes required to add unsigned type support to Velox when it is used by Gluten / Spark.

Task List

srinivasst commented 1 year ago

I have identified the changes and will open a PR for each task.

srinivasst commented 1 year ago

@Yuhta @rui-mo @majetideepak Please review these changes

gaoyangxiaozhu commented 1 year ago

I would like to join the ongoing code work related to this issue.