#### 1. Steps to reproduce the bug
1) Create a new table with the storage format set to parquet and the table structure Schema { Struct{ int, String }, int, int, int };
2) Use org.apache.iceberg.io.TaskWriter to write 10,000 rows of data, then delete those 10,000 rows using the Struct{ int, String } column as the primary key (i.e., via equality deletes);
3) Use org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles to compact the files.
4) Read the table again: we expect 0 rows, but 9,999 rows are returned. (A sketch of these steps is shown below.)
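
For reference, below is a minimal sketch of steps 1, 3 and 4 on the Spark side, not the exact test code from my environment. The catalog name `local`, the warehouse path, and the table name `local.db.t` are placeholder assumptions, and step 2 (writing the 10,000 rows with a TaskWriter and committing the matching equality-delete file) is only summarized in a comment.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class RewriteShallowCopyRepro {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-equality-delete-shallow-copy")
        .master("local[*]")
        // placeholder Hadoop catalog named "local"; adjust to your environment
        .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.local.type", "hadoop")
        .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
        .getOrCreate();

    // Step 1: parquet table whose first column is a nested struct used as the equality key.
    spark.sql("CREATE TABLE local.db.t "
        + "(pk STRUCT<id: INT, name: STRING>, c1 INT, c2 INT, c3 INT) "
        + "USING iceberg "
        + "TBLPROPERTIES ('write.format.default'='parquet', 'format-version'='2')");

    Table table = Spark3Util.loadIcebergTable(spark, "local.db.t");

    // Step 2 (not shown): append 10,000 rows with a TaskWriter, then write and commit
    // an equality-delete file keyed on `pk` that covers the same 10,000 rows.

    // Step 3: compaction reads the data files, applies the equality deletes,
    // and rewrites the data as parquet.
    SparkActions.get(spark).rewriteDataFiles(table).execute();

    // Step 4: 0 rows are expected, but 9,999 come back because the materialized
    // delete records all alias the last row read from the delete file.
    System.out.println(spark.table("local.db.t").count());
  }
}
```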
#### 2. Reason
1) In Iceberg 1.5.2, SparkActions rewrites data files by reading the DataFiles of the current snapshot, applying their DeleteFiles, and writing the result back as parquet. This process involves applying equality deletes.
2) To apply equality deletes, Spark must read all delete records through DeleteFilter. In Iceberg 1.5.2, DeleteFilter loads the records of equality DeleteFiles through BaseDeleteLoader, materializes them into memory, and caches them in a Set for subsequent lookups. When BaseDeleteLoader reads a parquet DeleteFile, it uses org.apache.iceberg.data.parquet.GenericParquetReaders.RecordReader, and that reader reuses a single GenericRecord template while reading; therefore each record is copied via Record::copy before it is materialized.
```java
public class BaseDeleteLoader implements DeleteLoader {
  ...
  protected Iterable<StructLike> readEqDeletes(DeleteFile deleteFile, Schema projection) {
    CloseableIterable<Record> deletes = openDeletes(deleteFile, projection);
    CloseableIterable<Record> copiedDeletes = CloseableIterable.transform(deletes, Record::copy);
    CloseableIterable<StructLike> copiedDeletesAsStructs = toStructs(copiedDeletes, projection);
    return materialize(copiedDeletesAsStructs);
  }
  ...
  // materializes the iterable and releases resources so that the result can be cached
  private <T> Iterable<T> materialize(CloseableIterable<T> iterable) {
    try (CloseableIterable<T> closeableIterable = iterable) {
      return ImmutableList.copyOf(closeableIterable);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close iterable", e);
    }
  }
  ...
}
```
```java
public class GenericParquetReaders extends BaseParquetReaders<Record> {
  ...
  private static class RecordReader extends ParquetValueReaders.StructReader<Record, Record> {
    private final GenericRecord template;
    ...
  }
  ...
}
```
> 3) GenericRecord#copy is a shallow copy: the copy constructor duplicates the internal Object[] values array, but the objects it references (in particular nested Records) are not copied.
```java
public class GenericRecord implements Record, StructLike {
  ...
  public static GenericRecord create(Schema schema) {
    return new GenericRecord(schema.asStruct());
  }

  public static GenericRecord create(StructType struct) {
    return new GenericRecord(struct);
  }

  private final StructType struct;
  private final int size;
  private final Object[] values;
  private final Map<String, Integer> nameToPos;
  ...
  private GenericRecord(GenericRecord toCopy) {
    this.struct = toCopy.struct;
    this.size = toCopy.size;
    // copies the array itself, but not the nested values it references
    this.values = Arrays.copyOf(toCopy.values, toCopy.values.length);
    this.nameToPos = toCopy.nameToPos;
  }
  ...
  @Override
  public GenericRecord copy() {
    return new GenericRecord(this);
  }
  ...
}
```
> **4) After this analysis, the problem becomes clear: because GenericRecord#copy is shallow with respect to nested values, every copied delete record placed into the list for the equality-delete lookup still references the same nested struct instance that RecordReader reuses as its template. As RecordReader keeps loading new rows from the DeleteFile into that template, the nested key values referenced by the records already in the list are overwritten as well. So after the 10,000 delete rows have been materialized into the list, they are effectively all duplicates of the last row read; only one row is actually deleted, and after Spark finishes applying the equality deletes the read returns 10,000 - 1 = 9,999 rows.** (A minimal stand-alone demonstration of this sharing follows point 5 below.)
> 5) Since DeleteLoader is also used when applying equality deletes during normal reads of parquet tables, the same data-correctness bug appears on the read path whenever a client uses a Struct-typed column as the primary key of an equality delete.
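
To make the sharing concrete, here is a small stand-alone snippet (a hypothetical illustration, not the repro above) showing that a copy of a GenericRecord with a nested struct field still observes later mutations of the reused nested record:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class ShallowCopyDemo {
  public static void main(String[] args) {
    // Schema mirroring the repro: a nested struct key plus an int column.
    Schema schema = new Schema(
        Types.NestedField.required(1, "pk", Types.StructType.of(
            Types.NestedField.required(3, "id", Types.IntegerType.get()),
            Types.NestedField.required(4, "name", Types.StringType.get()))),
        Types.NestedField.required(2, "c1", Types.IntegerType.get()));

    // The nested record plays the role of the reused reader template.
    GenericRecord nested = GenericRecord.create(schema.findField("pk").type().asStructType());
    nested.setField("id", 1);
    nested.setField("name", "a");

    GenericRecord template = GenericRecord.create(schema);
    template.setField("pk", nested);
    template.setField("c1", 100);

    // Record::copy duplicates the top-level Object[] but keeps the reference
    // to the nested record.
    Record copied = template.copy();

    // Simulate the reader loading the next row into the reused nested template.
    nested.setField("id", 2);

    // The previously copied record now reports the new key: prints 2, not 1.
    System.out.println(((Record) copied.getField("pk")).getField("id"));
  }
}
```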
#### 3. Solution
**From my perspective, changing GenericRecord#copy from a shallow copy to a deep copy solves this bug; it requires adjusting the code of org.apache.iceberg.data.GenericRecord#GenericRecord(GenericRecord toCopy). I have verified the feasibility and correctness of this change locally.** (A sketch of such a deep copy follows the note on negative impact below.)
> Negative impact:
> 1) When the schema contains only primitive field types, the deep copy may cost more than the previous shallow copy; the extent of this overhead has not yet been fully measured.
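
One possible shape for such a deep copy is sketched below. This is an illustration only, not necessarily the exact change I verified locally: field names follow the class quoted above, and list/map fields containing nested Records would need analogous handling.

```java
// Sketch only: a deep-copying constructor for GenericRecord.
private GenericRecord(GenericRecord toCopy) {
  this.struct = toCopy.struct;
  this.size = toCopy.size;
  this.nameToPos = toCopy.nameToPos;
  this.values = new Object[toCopy.values.length];
  for (int i = 0; i < toCopy.values.length; i += 1) {
    Object value = toCopy.values[i];
    // Recursively copy nested structs so that reusing the reader template can no
    // longer mutate records that were already copied into the delete set.
    this.values[i] = (value instanceof GenericRecord) ? ((GenericRecord) value).copy() : value;
  }
}
```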
#### 4. Other notes
If this bug is caused by a deficiency in my demo case, or if the problem has already been fixed in a newer Iceberg version, I hope readers or community members will let me know. Thank you very much.
### Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [X] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time