deephaven / deephaven-core

Deephaven Community Core
Other
255 stars 81 forks source link

Implement more efficient shift processing for column sources #6161

Open rcaudy opened 2 weeks ago

rcaudy commented 2 weeks ago

(This is from a discussion in community Slack.)

I think we can hybridize the approach used in update with the one used in update_by.

Something like this, maybe:

            if (upstream.shifted().nonempty()) {
                // Shift the non-redirected output sources now, after parallelPopulation.
                final int chunkSize = (int) Math.min(ArrayBackedColumnSource.BLOCK_SIZE, aggRecorder.getParent().size());
                try (final ChunkSource.FillContext srcContext = columnCopy.makeFillContext(chunkSize);
                     final ChunkSink.FillFromContext destContext = columnCopy.makeFillFromContext(chunkSize);
                     final WritableChunk<Values> chunk = columnCopy.getChunkType().makeWritableChunk(chunkSize);
                     final RowSet prevMinusRemoves = aggRecorder.getParent().getRowSet().prev().minus(upstream.removed())) {
                    upstream.shifted().apply((begin, end, delta) -> {
                        try (final RowSet preShiftKeys = prevMinusRemoves.subSetByKeyRange(begin, end);
                             final RowSequence.Iterator preShiftKeysIter = preShiftKeys.getRowSequenceIterator();
                             final RowSet postShiftKeys = preShiftKeys.shift(delta);
                             final RowSequence.Iterator postShiftKeysIter = postShiftKeys.getRowSequenceIterator()) {
                            while (preShiftKeysIter.hasMore()) {
                                final RowSequence preShiftKeysSlice = preShiftKeysIter.getNextRowSequenceWithLength(chunkSize);
                                final RowSequence postShiftKeysSlice = postShiftKeysIter.getNextRowSequenceWithLength(chunkSize);
                                columnCopy.fillPrevChunk(srcContext, chunk, preShiftKeysSlice);
                                columnCopy.fillFromChunk(destContext, chunk, postShiftKeysSlice);
                            }
                        }
                    });
                }
            }

For whatever reason we have two parallel tools for the null-filling. WritableColumnSource.setNull(RowSequence), and io.deephaven.engine.table.impl.util.ChunkUtils#fillWithNullValue. I slightly prefer the implementation in the second, which of course suggests that it should be the implementation of the first. The easier way is actually to do the null filling at the end, to minimize the work. You don’t want to null-fill as you shift, since you may be null-filling slots that are going to be overwritten anyway. Really, just take the previous row set minus the current row set (final WritableRowSet toClear = resultRowSet.prev().minus(resultRowSet);), and null-fill that.

rcaudy commented 2 weeks ago

User supplied this version of same:

    TableUpdate aggUpdates = this.aggRecorder.getUpdate();
    if (aggUpdates == null) {
      aggUpdates = new TableUpdateImpl(RowSetFactory.empty(), RowSetFactory.empty(), RowSetFactory.empty(),
          RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY);
    }

    // We need to ensure we have enough capacity (not size) to handle up to the very
    // last row key in our parent.
    this.columnCopy.ensureCapacity(this.aggRecorder.getParent().getRowSet().lastRowKey() + 1);

    // Null out any removed keys.
    final TrackingRowSet resultRowSet = aggRecorder.getParent().getRowSet();
    try (final WritableRowSet toClear = resultRowSet.prev().minus(resultRowSet)) {
      ChunkUtils.fillWithNullValue(this.columnCopy, toClear);
    }

    // Handle shifts
    if (aggUpdates.shifted().nonempty()) {
      // Shift the non-redirected output sources now, after parallelPopulation.
      final int chunkSize = (int) Math.min(ArrayBackedColumnSource.BLOCK_SIZE, aggRecorder.getParent().size());
      try (final ChunkSource.FillContext srcContext = columnCopy.makeFillContext(chunkSize);
          final ChunkSink.FillFromContext destContext = columnCopy.makeFillFromContext(chunkSize);
          final WritableChunk<Values> chunk = columnCopy.getChunkType().makeWritableChunk(chunkSize);
          final RowSet prevMinusRemoves = aggRecorder.getParent().getRowSet().prev().minus(aggUpdates.removed())) {
        aggUpdates.shifted().apply((begin, end, delta) -> {
          try (final RowSet preShiftKeys = prevMinusRemoves.subSetByKeyRange(begin, end);
              final RowSequence.Iterator preShiftKeysIter = preShiftKeys.getRowSequenceIterator();
              final RowSet postShiftKeys = preShiftKeys.shift(delta);
              final RowSequence.Iterator postShiftKeysIter = postShiftKeys.getRowSequenceIterator()) {
            while (preShiftKeysIter.hasMore()) {
              final RowSequence preShiftKeysSlice = preShiftKeysIter.getNextRowSequenceWithLength(chunkSize);
              final RowSequence postShiftKeysSlice = postShiftKeysIter.getNextRowSequenceWithLength(chunkSize);
              columnCopy.fillPrevChunk(srcContext, chunk, preShiftKeysSlice);
              columnCopy.fillFromChunk(destContext, chunk, postShiftKeysSlice);
            }
          }
        });
      }
    }

    // Copy in any adds/modifies from our upstream to our custom column source.
    try (final RowSet added = aggUpdates.added().union(aggUpdates.modified())) {
      ChunkUtils.copyData(this.column, added, this.columnCopy, added, false);
    }

This had two flaws:

  1. You need to do the null filling after you apply the shifts.
  2. Oh, and the way you have this written, you can remove the mods from the row set to shift.
        final RowSet prevMinusRemoves = aggRecorder.getParent().getRowSet().prev().minus(aggUpdates.removed());
        final RowSet prevMinusRemovesMinusMods = prevMinusRemoves.minus(aggUpdates.getModifiedPreShift())) {
            aggUpdates.shifted().apply((begin, end, delta) -> {
                        try (final RowSet preShiftKeys = prevMinusRemovesMinusMods.subSetByKeyRange(begin, end);