apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

Acero's Execution Plan never finishes. #33593

Open asfimport opened 1 year ago

asfimport commented 1 year ago

We have observed that sometimes an execution plan with a small input never finishes (the future returned by the ExecPlan::finished() method is never marked as finished), even though the generator in the sink node is exhausted and has returned nullopt.

This issue seems to happen at random: the same plan with the same input sometimes works (the plan is marked finished) and sometimes it doesn't. Since the ExecPlanImpl destructor forces the executing thread to wait for the plan to finish (when the plan has not yet finished), we enter a deadlock waiting for a plan that never finishes.

Since this has only happened with small inputs, and not in a deterministic way, we believe the issue might be in the ExecPlan::StartProducing method.

Our hypothesis is that, after the plan starts producing on each node, each node schedules its tasks and they finish immediately (due to the small input), and somehow the callback that marks the future finished_ as finished is never executed.



Status StartProducing() {
  ...
  Future<> scheduler_finished =
      util::AsyncTaskScheduler::Make([this](util::AsyncTaskScheduler* async_scheduler) {
  ...
  scheduler_finished.AddCallback([this](const Status& st) { finished_.MarkFinished(st); });
  ...
}
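
For reference, this is roughly the destructor behaviour we are referring to (a sketch of the pattern, not the actual Arrow source); if the callback above never runs, the wait below never returns:


~ExecPlanImpl() {
  if (started_ && !finished_.is_finished()) {
    StopProducing();
    // finished_ is only marked finished by the scheduler callback shown above,
    // so if that callback never fires this wait blocks forever.
    finished_.Wait();
  }
}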

Reporter: Pau Garcia Rodriguez
Assignee: Weston Pace / @westonpace

Note: This issue was originally created as ARROW-18431. Please see the migration documentation for further details.

asfimport commented 1 year ago

Weston Pace / @westonpace: Are you able to provide some more information on the structure of the plan? Can you print the plan? What is your input? Are you reading from files or in-memory tables or some kind of record batch reader?

My guess would be some kind of deadlock in mutexes from callbacks firing more quickly than we'd expect. It should be fixable if we can find a way to reproduce semi-reliably.
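
For illustration, here is the generic shape of the problem I have in mind (a sketch, not Arrow source): a callback added to an already-finished future runs synchronously on the adding thread, which may still be holding a lock.


#include <mutex>

#include <arrow/status.h>
#include <arrow/util/future.h>

// Sketch only: with a tiny input the future can already be finished when
// AddCallback is called, so the callback runs right here on this thread,
// which still holds mutex_, and re-locking the non-recursive mutex hangs.
struct Node {
  std::mutex mutex_;

  void Schedule(arrow::Future<> fut) {
    std::lock_guard<std::mutex> outer(mutex_);
    fut.AddCallback([this](const arrow::Status&) {
      std::lock_guard<std::mutex> inner(mutex_);  // never acquired
    });
  }
};

int main() {
  Node node;
  node.Schedule(arrow::Future<>::MakeFinished());  // hangs immediately
  return 0;
}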

asfimport commented 1 year ago

Pau Garcia Rodriguez: Hello, thank you for your quick answer.


The explicit plan we use is as follows:


ExecPlan with 3 nodes:
:SinkNode{}
  :GroupByNode{keys=["bearer.cell"], aggregates=[
        hash_sum(interference-events-samples),
  ]}
    :SourceNode{} 

The SourceNode uses a reader generator built from a RecordBatchReader; that reader comes from a dataset scanner (the dataset contains just a single Parquet file).

We are using the dataset scanner instead of the scan node because, in earlier versions, we saw it was faster. I believe this has changed and there have been improvements in recent versions, so maybe we should just use the scan node again.

As a matter of fact, while analysing the OpenTelemetry traces I saw another plan printed:


        plan: ExecPlan with 4 nodes:
:SinkNode{}
  :ProjectNode{projection=[bearer.cell, interference-events-samples, __fragment_index, __batch_index, __last_in_fragment, __filename]}
    :FilterNode{filter=((timestamp >= 2017-05-05 06:00:00.000000000) and (timestamp <= 2017-05-05 08:59:49.999999000))}
      :SourceNode{} 

Could it be that the dataset scanner implementation is in fact another Acero plan? (I have no idea about the implementation details, sorry.) If so, we could just use a single plan with the scan node and forget the dataset scanner.
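
In case it helps, this is roughly what we would try (a sketch, under the assumption that the dataset exec node factories are registered via arrow::dataset::internal::Initialize() and that dataset is the same dataset we currently hand to the scanner; the aggregate output name is illustrative):


#include <optional>

#include <arrow/compute/api.h>
#include <arrow/compute/exec/exec_plan.h>
#include <arrow/compute/exec/expression.h>
#include <arrow/dataset/plan.h>
#include <arrow/dataset/scanner.h>

namespace cp = ::arrow::compute;
namespace ds = ::arrow::dataset;

arrow::Result<std::shared_ptr<cp::ExecPlan>> makeScanPlan(
    std::shared_ptr<ds::Dataset> dataset,
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> *sink_gen)
{
  // Registers the "scan" (and other dataset) exec node factories.
  ds::internal::Initialize();

  auto scan_options = std::make_shared<ds::ScanOptions>();
  scan_options->projection = cp::project({}, {});  // empty projection, as in the Arrow examples

  auto sequence = cp::Declaration::Sequence(
    {
      {"scan", ds::ScanNodeOptions{dataset, scan_options}},
      {"aggregate", cp::AggregateNodeOptions{{cp::Aggregate{"hash_sum", nullptr, "interference-events-samples", "interference-sum"}}, {"bearer.cell"}}},
      {"sink", cp::SinkNodeOptions{sink_gen}}
    }
  );

  ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make());
  ARROW_RETURN_NOT_OK(sequence.AddToPlan(plan.get()));
  ARROW_RETURN_NOT_OK(plan->Validate());
  return plan;
}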

As for the parquet file we are using, the metadata header is as follows:


File Name: data_1661435498522945115.29477.parquet
Version: 2.6
Created By: parquet-cpp-arrow version 9.0.0
Total rows: 470
Number of RowGroups: 1
Number of Real Columns: 12
Number of Columns: 12
Number of Selected Columns: 12
Column 0: whateverthisis (BYTE_ARRAY / String / UTF8)
Column 1: bearer.cell (BYTE_ARRAY / String / UTF8)
Column 2: timestamp (INT64 / Timestamp(isAdjustedToUTC=true, timeUnit=nanoseconds, is_from_converted_type=false, force_set_converted_type=false))
Column 3: bearer.rat (BYTE_ARRAY / String / UTF8)
Column 4: cqi (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 5: cqi-samples (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 6: ta (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 7: ta-samples (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 8: interference-events-samples (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 9: poor_signal-events-samples (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 10: ping_pong-events-samples (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 11: events-samples (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
--- Row Group: 0 ---
--- Total Bytes: 44362 ---
--- Total Compressed Bytes: 20399 ---
--- Rows: 470 --- 

As you can see it's only 470 rows, so the execution plan finishes very quickly.


I hope this information helps. On our side, we'll replace the dataset scanner and record batch reader shenanigans with the scan node, to see whether the deadlock still occurs, or at least to have cleaner code.


Best regards,

Pau.

asfimport commented 1 year ago

Pau Garcia Rodriguez: I managed to reproduce it with this simple program:


#include <iostream>
#include <optional>
#include <vector>

#include <arrow/array.h>
#include <arrow/builder.h>
#include <arrow/compute/api.h>
#include <arrow/compute/api_vector.h>
#include <arrow/compute/exec/exec_plan.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/vector.h>

namespace cp = ::arrow::compute;

template<typename TYPE,
         typename = typename std::enable_if<arrow::is_number_type<TYPE>::value ||
                                            arrow::is_boolean_type<TYPE>::value ||
                                            arrow::is_temporal_type<TYPE>::value>::type>
arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(const std::vector<typename TYPE::c_type> &values)
{
  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;

  ArrowBuilderType builder;

  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));

  return builder.Finish();
}

class TestBatchReader : public arrow::RecordBatchReader
{
  public:
    TestBatchReader() : exhausted_{false}, schema_(std::make_shared<arrow::Schema>(arrow::FieldVector{arrow::field("a", arrow::int32())}))
    {}

    std::shared_ptr<arrow::Schema> schema() const override
    {
      return schema_;
    }

    arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch> *batch) override
    {
      if (not exhausted_) 
      {
        exhausted_ = true;
        ARROW_ASSIGN_OR_RAISE(auto data_sample, GetArrayDataSample<arrow::Int32Type>({0}));
        *batch = arrow::RecordBatch::Make(schema_, 1, {data_sample});
      }
      else
      {
        *batch = nullptr;
      }

      return arrow::Status::OK();
    }

  private:
    bool exhausted_;
    std::shared_ptr<arrow::Schema> schema_;
};

arrow::Result<std::function<arrow::Future<std::optional<cp::ExecBatch>>()>>
    makeSourceGenerator(std::shared_ptr<arrow::RecordBatchReader> stream)
{
  ARROW_ASSIGN_OR_RAISE(auto io_executor, arrow::internal::ThreadPool::Make(1));
  ARROW_ASSIGN_OR_RAISE(auto sourceGenerator, cp::MakeReaderGenerator(stream, io_executor.get()));

  // Capture the thread pool by value so it stays alive while the generator is in use.
  return [io_executor, sourceGenerator] { return sourceGenerator(); };
}

arrow::Status test()
{
  auto test_reader = std::make_shared<TestBatchReader>();
  ARROW_ASSIGN_OR_RAISE(auto source_generator, makeSourceGenerator(test_reader));

  auto function_option = std::make_shared<cp::CountOptions>(cp::CountOptions::CountMode::ALL);
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto sequence = cp::Declaration::Sequence(
    {
      {"source", cp::SourceNodeOptions{test_reader->schema(), source_generator}},
      {"aggregate", cp::AggregateNodeOptions{{cp::Aggregate{"hash_count", function_option, "a", "total"}}, {"a"}}},
      {"sink", cp::SinkNodeOptions{&sink_gen}}
    }
  );

  ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make());

  ARROW_RETURN_NOT_OK(sequence.AddToPlan(plan.get()));
  ARROW_RETURN_NOT_OK(plan->Validate());

  std::cout << "Start producing" << std::endl;
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  bool finished = false;
  while (not finished)
  {
    auto future = sink_gen();
    ARROW_ASSIGN_OR_RAISE(auto maybe_batch, future.result());

    // The sink generator is exhausted once it returns nullopt.
    finished = not maybe_batch.has_value();
  }

  std::cout << "Stop producing" << std::endl;
  plan->StopProducing();

  std::cout << "Waiting to finish..." << std::endl;
  auto future = plan->finished();
  future.Wait();

  std::cout << "Finished" << std::endl;
  return future.status(); 
}

int main ()
{
  bool working = true;
  while (working)
  {
    auto status = test();

    working = status.ok();
    if (not working)
    {
      std::cout << "Test failed: " << status.message() << std::endl;
    }
  }
}


The program is just an infinite loop that makes and executes the same simple plan with a single row of input. Usually, after a few iterations, the program enters a deadlock waiting for the plan to finish.

I hope this helps to find the root cause.
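
If it is useful for debugging, the wait at the end of test() can be bounded so the reproducer reports the hang instead of blocking forever (a sketch, assuming the Wait(double seconds) overload of arrow::Future is available in the version in use):


  std::cout << "Waiting to finish..." << std::endl;
  auto future = plan->finished();
  // Wait returns false if the future is not marked finished within the timeout.
  if (not future.Wait(/*seconds=*/10))
  {
    std::cout << "Deadlock suspected: the plan did not finish within 10 seconds" << std::endl;
    return arrow::Status::Cancelled("ExecPlan::finished() never completed");
  }

  std::cout << "Finished" << std::endl;
  return future.status();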