apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

HashJoin Operation, need help performing concat of record batches #33834

Open ava6969 opened 1 year ago

ava6969 commented 1 year ago

Describe the usage question you have. Please include as many useful details as possible.

So I implemented a very inefficient operation to concatenate two or more record batches, each of which carries an array holding its own index. I have been reading about hash operations. I need help improving my current implementation by using Arrow's built-in functions.

pd::DataFrame Concatenator::concatenateColumns(
    bool intersect,
    bool ignore_index,
    bool sort)
{
    auto newIndexes = mergeIndexes(makeJoinIndexes(objs, AxisType::Columns), intersect);
    const size_t numRows = newIndexes->length();

    if (sort)
    {
        auto sort_indices = ReturnOrThrowOnFailure(arrow::compute::SortIndices(
            newIndexes,
            arrow::compute::SortOptions{}));

        newIndexes =
            arrow::compute::Take(newIndexes, sort_indices)->make_array();
    }

    std::vector<size_t> index_offset;
    index_offset.reserve(objs.size());
    auto newColumnLength = accumulate(
        objs.begin(),
        objs.end(),
        0UL,
        [&index_offset](size_t total, DataFrame const& df)
        {
            index_offset.push_back(total);
            return total + df.num_columns();
        });

    arrow::FieldVector fieldVector(newColumnLength);
    arrow::ArrayDataVector arrayVectors(newColumnLength);

    for(size_t i = 0UL; i < objs.size(); i++)
    {
        const auto& df = objs[i];
        auto schema = df.array()->schema();
        auto df_index = df.index();
        auto fields = schema->fields();
        auto offset = index_offset[i];
        for (size_t j = 0UL; j < fields.size(); j++)
        {
            std::shared_ptr<arrow::Field> const& columnPerDF = fields[j];
            auto col_name = columnPerDF->name();
            auto array = df.m_array->GetColumnByName(col_name);
            auto array_data = array->data();

            if (not array->Equals(newIndexes))
            {
                auto null = arrow::MakeNullScalar(columnPerDF->type());

                arrow::ScalarVector scalars(newIndexes->length(), null);

                for (int64_t k = 0; k < newIndexes->length(); k++)
                {
                    auto idx = newIndexes->GetScalar(k).MoveValueUnsafe();
                    auto result = df_index.index(idx);
                    if (result != -1)
                    {
                        scalars[k] = array->GetScalar(result).MoveValueUnsafe();
                    }
                }
                ASSIGN_OR_ABORT(
                    auto builder,
                    arrow::MakeBuilder(columnPerDF->type()));

                ABORT_NOT_OK(builder->AppendScalars(scalars));

                ABORT_NOT_OK(builder->FinishInternal(&array_data));
            }

            auto flat_index = offset + j;
            fieldVector[flat_index] = ignore_index ?
                arrow::field(
                    std::to_string(flat_index),
                    columnPerDF->type()) :
                columnPerDF;

            arrayVectors[flat_index] = array_data;
        }
    }

    return { arrow::schema(fieldVector),
             static_cast<int64_t>(numRows),
             arrayVectors,
             newIndexes };

}

The alternative below is fast, as expected, but it loses the index information of each batch:

pd::DataFrame concatColumnsUnsafe(std::vector<pd::DataFrame> const& objs)
{
    auto df = objs.at(0).array();
    auto N = df->num_columns();

    for (size_t i = 1; i < objs.size(); i++)
    {
        for (auto const& field : objs[i].array()->schema()->fields())
        {
            auto result =
                df->AddColumn(N++, field, objs[i][field->name()].m_array);
            if (result.ok())
            {
                df = result.MoveValueUnsafe();
            }
            else
            {
                throw std::runtime_error(result.status().ToString());
            }
        }
    }
    return { df, objs.at(0).indexArray() };
}

Component(s)

C++

westonpace commented 1 year ago

There is a hash-join operation in Acero if you want to use that (note: this example uses some convenience utilities that were only added in 11.0.0; the operation is possible in 10.0.0 or earlier, but is more complex):

  cp::Declaration left{"table_source", cp::TableSourceNodeOptions{left_table}};
  cp::Declaration right{"table_source", cp::TableSourceNodeOptions{right_table}};

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"index"},
      /*right_keys=*/{"index"}, cp::literal(true), "l_", "r_"};

  cp::Declaration hashjoin{
      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> joined_table, cp::DeclarationToTable(std::move(hashjoin)));

A full working example is in https://github.com/apache/arrow/blob/master/cpp/examples/arrow/execution_plan_documentation_examples.cc