
Why call deleteKey for Insert and Update After in Flink BaseDeltaTaskWriter? #11081

Open SML0127 opened 1 week ago

SML0127 commented 1 week ago

Query engine

Flink

Question

Hi there, I am currently sinking MySQL change events (binlog) into an Iceberg table. While digging into the Iceberg code to figure out how it works, I found some ambiguous points.

  public void write(RowData row) throws IOException {
    RowDataDeltaWriter writer = route(row);

    switch (row.getRowKind()) {
      case INSERT:
      case UPDATE_AFTER:
        if (upsert) {
          writer.deleteKey(keyProjection.wrap(row));
        }
        writer.write(row);
        break;

      case UPDATE_BEFORE:
        if (upsert) {
          break; // UPDATE_BEFORE is not necessary for UPSERT, we do nothing to prevent delete one
          // row twice
        }
        writer.delete(row);
        break;
      case DELETE:
        if (upsert) {
          writer.deleteKey(keyProjection.wrap(row));
        } else {
          writer.delete(row);
        }
        break;

      default:
        throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
    }
  }

link: https://github.com/apache/iceberg/blob/7830a3b938a2f7b74ada46c21d14d12a6bb9c6e0/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java#L79-L109

pvary commented 1 week ago

There are 2 different types of CDC streams:

- changelog streams, which carry UPDATE_BEFORE/UPDATE_AFTER (and DELETE) rows, so the old row is removed by the explicit retraction; and
- upsert streams, which only carry the new row for a key, so the writer never sees the previous row it has to replace.

So in upsert mode, an equality delete is always needed.

I hope this helps, Peter
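
To make the upsert case concrete, here is a minimal sketch (not code from the Iceberg repository; the class name and values are made up) of the two row kinds an upsert stream produces for the same key, and what the write() method above does with them:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class UpsertWriteTrace {
  public static void main(String[] args) {
    // An upsert stream carries no UPDATE_BEFORE, so the writer never sees the old row
    // it has to replace.
    GenericRowData insert = GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("a"));
    GenericRowData updateAfter = GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1, StringData.fromString("b"));

    // write(insert):      deleteKey(id=1) -> no matching row yet, then appends (1, "a")
    // write(updateAfter): deleteKey(id=1) -> writes an equality delete for id=1
    //                     (removes the earlier (1, "a") at read time), then appends (1, "b")
    // Without the deleteKey call there is no retraction at all, and both (1, "a")
    // and (1, "b") would remain visible after the update.
    System.out.println(insert.getRowKind() + " then " + updateAfter.getRowKind());
  }
}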

SML0127 commented 1 week ago

@pvary Thanks for the answer, pvary. Now I'm checking some code in ColumnarBatchReader to see whether it works as follows:

  1. apply eq delete files first, from oldest snapshot
  2. then apply data file and pos delete files
  3. reapply from step 1 for the next snapshot.

I'm still looking into it, so this could be wrong.

ColumnarBatch loadDataToColumnBatch() {
  int numRowsUndeleted = initRowIdMapping();

  ColumnVector[] arrowColumnVectors = readDataToColumnVectors();  

  ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);  

  newColumnarBatch.setNumRows(numRowsUndeleted);  

  if (hasEqDeletes()) {  
    applyEqDelete(newColumnarBatch);  
  }  

  if (hasIsDeletedColumn && rowIdMapping != null) {  
    // reset the row id mapping array, so that it doesn't filter out the deleted rows  
    for (int i = 0; i < numRowsToRead; i++) {  
      rowIdMapping[i] = i;  
    }  
    newColumnarBatch.setNumRows(numRowsToRead);  
  }  

  return newColumnarBatch;  
}

pvary commented 1 week ago

I'm not sure what the actual question is, but when reading a data file we apply equality deletes up to the previous snapshot and positional deletes up to the current snapshot.
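
Restating that rule as a tiny sketch (the class and method names are made up for illustration; the real decision is made during scan planning and follows the spec's sequence-number comparison):

public class DeleteApplicability {
  // equality deletes only remove rows written with a strictly smaller data sequence number
  static boolean eqDeleteApplies(long dataSeq, long deleteSeq) {
    return deleteSeq > dataSeq;
  }

  // positional deletes also apply to rows written with the same data sequence number
  static boolean posDeleteApplies(long dataSeq, long deleteSeq) {
    return deleteSeq >= dataSeq;
  }

  public static void main(String[] args) {
    long dataSeq = 5;
    System.out.println(eqDeleteApplies(dataSeq, 5));  // false: an eq delete from the same commit is skipped
    System.out.println(posDeleteApplies(dataSeq, 5)); // true: a pos delete from the same commit applies
    System.out.println(eqDeleteApplies(dataSeq, 6));  // true: an eq delete from a later commit applies
  }
}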

SML0127 commented 1 week ago

@pvary Sorry for the vague question. But thanks to your answer, I now understand how data files and delete files work (maybe..?)

That is,

- when reading a data file, equality delete files from previous snapshots are applied to it, and
- positional delete files up to the current snapshot are applied.

Please check if I understand correctly 🙏🙏

pvary commented 1 week ago

Please check if I understand correctly 🙏🙏

Your understanding is correct

SML0127 commented 1 week ago

@pvary Thank you for your support. It really helped me a lot. Lastly, I have one more question. I am looking at the code in ColumnarBatchReader.java. Where can I see that the equality delete files are applied up to the previous snapshot and the positional delete files up to the current snapshot? 🙇🏻‍♂️🙇🏻‍♂️🙇🏻‍♂️

pvary commented 6 days ago

@SML0127: The decision is made based on DeleteFileIndex.java
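
If you want to see the outcome of that decision for a concrete table, one way (a rough sketch, assuming you already have a loaded Table handle) is to print the delete files that scan planning attaches to each data file:

import java.io.IOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class PrintAttachedDeletes {
  static void printDeletes(Table table) throws IOException {
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println(task.file().path());
        // the delete files below are the ones the index decided apply to this data file
        task.deletes().forEach(delete ->
            System.out.println("  " + delete.content() + " seq=" + delete.dataSequenceNumber()));
      }
    }
  }
}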

SML0127 commented 4 days ago

@pvary Thanks, pvary! I have one question.

As I understand the code linked below, the EqualityDeletes.filter() function does not filter out delete files that have the same sequence number as the data file.

For example, if the sequence numbers of the delete files are [100, 101, 102, 103, 104, 105] and the data file's sequence number is 102, then start is 2 and the for loop runs from index 2 to 5 (for (int index = 2; index < 6; index++)).

But as I understand the spec quoted below, it should start from 3, not 2 (see the sketch after the linked code). I would appreciate it if you could let me know what I have misunderstood.

- An _equality_ delete file must be applied to a data file when all of the following are true:
    - The data file's data sequence number is _strictly less than_ the delete's data sequence number

https://github.com/apache/iceberg/blob/0747b604413fa6d3c663ec850dbe6f50a647ca04/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java#L670-L689
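
Applying the spec rule quoted above to my example (just a plain sketch, not the actual EqualityDeletes.filter() code):

public class SpecRuleExample {
  public static void main(String[] args) {
    long dataSeq = 102;
    long[] deleteSeqs = {100, 101, 102, 103, 104, 105};

    for (int index = 0; index < deleteSeqs.length; index++) {
      // per the spec, an equality delete applies only when dataSeq < deleteSeq
      boolean applies = dataSeq < deleteSeqs[index];
      System.out.println("index " + index + " (seq " + deleteSeqs[index] + "): " + applies);
    }
    // prints false for indexes 0-2 (seq 100-102) and true for indexes 3-5 (seq 103-105),
    // i.e. under the spec rule the first applicable delete file would be at index 3
  }
}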

pvary commented 4 days ago

@SML0127: I would suggest writing some unit tests, or choosing an existing one, to test the behaviour. I have not seen complaints about this yet.