apache / drill

Apache Drill is a distributed MPP query layer for self describing data
https://drill.apache.org/
Apache License 2.0

DRILL-8484: HashJoinPOP memory leak is caused by an oom exception wh… #2889

Closed shfshihuafeng closed 3 months ago

shfshihuafeng commented 3 months ago

DRILL-8484: HashJoinPOP memory leak is caused by an oom exception when read data from Stream with container

Description

When traversing fieldList while reading data from a stream with a container, if an exception is thrown partway through, the previously constructed vectors are never released. This results in a memory leak.
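
The leak pattern described above can be illustrated with a minimal, self-contained sketch. It does not use Drill's classes and is not the patch in this PR: `Allocator`, `Vector`, `loadAll`, and the `cleanUp` flag are hypothetical stand-ins invented for illustration. The only point it makes is that when a failure occurs in the middle of a loop, the vectors built in earlier iterations must be cleared before the exception is rethrown.

```java
import java.util.ArrayList;
import java.util.List;

public class LeakSketch {
  /** Hypothetical stand-in for a buffer allocator; it only tracks outstanding bytes. */
  static final class Allocator {
    private long outstanding;

    long outstanding() {
      return outstanding;
    }

    Vector newVector(int bytes) {
      outstanding += bytes;
      return new Vector(this, bytes);
    }
  }

  /** Hypothetical stand-in for a value vector that owns allocator memory. */
  static final class Vector {
    private final Allocator owner;
    private int bytes;

    Vector(Allocator owner, int bytes) {
      this.owner = owner;
      this.bytes = bytes;
    }

    /** Returns this vector's memory to the allocator; safe to call twice. */
    void clear() {
      owner.outstanding -= bytes;
      bytes = 0;
    }
  }

  /**
   * Builds one vector per entry in sizes and simulates a failure on the
   * failAt-th field (1-based). With cleanUp=false, the vectors built in
   * earlier iterations are left allocated, which reproduces the leak.
   */
  static List<Vector> loadAll(Allocator allocator, int[] sizes, int failAt, boolean cleanUp) {
    List<Vector> loaded = new ArrayList<>();
    for (int i = 0; i < sizes.length; i++) {
      Vector current = allocator.newVector(sizes[i]);
      try {
        if (i + 1 == failAt) {
          throw new IllegalStateException("simulated out-of-memory on field " + failAt);
        }
      } catch (RuntimeException e) {
        current.clear(); // release the vector that was under construction
        if (cleanUp) {
          for (Vector earlier : loaded) {
            earlier.clear(); // release vectors from earlier iterations
          }
        }
        throw e;
      }
      loaded.add(current);
    }
    return loaded;
  }

  public static void main(String[] args) {
    for (boolean cleanUp : new boolean[] {false, true}) {
      Allocator allocator = new Allocator();
      try {
        loadAll(allocator, new int[] {8, 16, 32}, 3, cleanUp);
      } catch (IllegalStateException e) {
        System.out.println("cleanUp=" + cleanUp + " outstanding=" + allocator.outstanding());
      }
    }
  }
}
```

With these inputs the failure hits the third field, so the run without cleanup leaves 24 bytes outstanding (the first two vectors), while the run with cleanup leaves 0.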

Documentation

(Please describe user-visible changes similar to what should appear in the Drill documentation.)

Testing

You can reproduce this scenario by adding the debugging code below, or by running a TPC-H test as in DRILL-8483.

(1) Debug code

  public void readFromStreamWithContainer(VectorContainer myContainer, InputStream input) throws IOException {
    final VectorContainer container = new VectorContainer();
    final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
    recordCount = batchDef.getRecordCount();
    if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {

      if (sv2 == null) {
        sv2 = new SelectionVector2(allocator);
      }
      sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
      sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
    }
    final List<ValueVector> vectorList = Lists.newArrayList();
    final List<SerializedField> fieldList = batchDef.getFieldList();
    int i = 0;
    for (SerializedField metaData : fieldList) {
      i++;
      final int dataLength = metaData.getBufferLength();
      final MaterializedField field = MaterializedField.create(metaData);
      final DrillBuf buf = allocator.buffer(dataLength);
      ValueVector vector = null;
      try {
        buf.writeBytes(input, dataLength);
        vector = TypeHelper.getNewVector(field, allocator);
        // DEBUG: force an exception on the third field to reproduce the leak
        if (i == 3) {
          logger.warn("shf test memory except");
          throw new OutOfMemoryException("test memory except");
        }
        vector.load(metaData, buf);
      } catch (Exception e) {
        if (vectorList.size() > 0 ) {
          for (ValueVector valueVector : vectorList) {
            DrillBuf[] buffers = valueVector.getBuffers(false);
            logger.warn("shf leak buffers " + Arrays.asList(buffers));
            // valueVector.clear();
          }
        }
        throw e;
      } finally {
        buf.release();
      }
      vectorList.add(vector);
    }

(2) Run the following SQL (tpch8)

select
o_year,
sum(case when nation = 'CHINA' then volume else 0 end) / sum(volume) as mkt_share
from (
select
extract(year from o_orderdate) as o_year,
l_extendedprice * 1.0 as volume,
n2.n_name as nation
from hive.tpch1s.part, hive.tpch1s.supplier, hive.tpch1s.lineitem, hive.tpch1s.orders, hive.tpch1s.customer, hive.tpch1s.nation n1, hive.tpch1s.nation n2, hive.tpch1s.region
where
p_partkey = l_partkey
and s_suppkey = l_suppkey
and l_orderkey = o_orderkey
and o_custkey = c_custkey
and c_nationkey = n1.n_nationkey
and n1.n_regionkey = r_regionkey
and r_name = 'ASIA'
and s_nationkey = n2.n_nationkey
and o_orderdate between date '1995-01-01'
and date '1996-12-31'
and p_type = 'LARGE BRUSHED BRASS') as all_nations
group by o_year
order by o_year;

(3) You will see leaked memory even though no SQL query is running

cgivre commented 3 months ago

@shfshihuafeng Can you please resolve merge conflicts.

shfshihuafeng commented 3 months ago

@shfshihuafeng Can you please resolve merge conflicts.

it is done