
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Serialization of the org.apache.iceberg.io.WriteResult class. #10710

Open xavifeds8 opened 4 months ago

xavifeds8 commented 4 months ago

Query engine

I am using Apache Flink version 1.16.

Question

Currently I am unable to fetch the TypeInformation of org.apache.iceberg.io.WriteResult when using Iceberg's FlinkSink in a streaming job. For performance reasons I have disabled Kryo serialization with env.getConfig().disableGenericTypes();

When executing the program I currently get this exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.iceberg.io.WriteResult is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
    at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:427)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:399)
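For reference, here is a minimal sketch of the kind of setup that triggers this (the source stream and the table path are placeholders standing in for my actual job):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.FlinkSink;
import org.apache.iceberg.flink.TableLoader;

public class WriteResultRepro {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Forbid the Kryo fallback: any type without explicit TypeInformation now fails fast.
        env.getConfig().disableGenericTypes();

        DataStream<RowData> rows = buildSource(env);                                 // placeholder source
        TableLoader loader = TableLoader.fromHadoopTable("/tmp/warehouse/db/table"); // placeholder path

        // The sink internally connects writer and committer operators that exchange WriteResult
        // records; without TypeInformation for WriteResult, job-graph translation throws the
        // UnsupportedOperationException shown above.
        FlinkSink.forRowData(rows).tableLoader(loader).append();

        env.execute("iceberg-write-result-repro");
    }

    // Placeholder for the real RowData source of the job.
    private static DataStream<RowData> buildSource(StreamExecutionEnvironment env) {
        throw new UnsupportedOperationException("replace with your actual RowData source");
    }
}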

simonykq commented 4 months ago

+1

pvary commented 4 months ago

With a well configured IcebergSink, the number of WriteResults is quite low compared to the number of records, so we did not spend the resources on writing a serializer/deserializer for them. Also, WriteResult contains DataFiles, which are themselves complicated objects, so it seemed like a serious effort for relatively low gains.

Did you see any performance issues that could be solved by this?

ms1111 commented 4 months ago

disableGenericTypes() is useful for catching missing type information during development. (Not necessarily for WriteResult, but for any other user type that's used in a job.)

As it stands, with the Iceberg sink you can't use disableGenericTypes(), so changes in user code that accidentally introduce generic serialization could be missed.

simonykq commented 1 month ago

@pvary I looked a bit into the implementation around WriteResult.class. For DataFiles and DeleteFiles, it seems to use org.apache.iceberg.BaseFile as the implementation. Could you make the following changes so that it gets serialized using POJO serialization, as documented in Flink (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos)?

For the CharSequence[], I guess you could just serialize it as an array of String?

  1. Make that class and all subclasses (GenericDataFile and GenericDeleteFile) public
  2. Create a public constructor with no args
  3. Make sure all non-static and non-transient fields in those classes are non-final, and create a getter and setter for them following the Java bean convention

I believe if you do that, WriteResult.class would be treated as a POJO and therefore picked up by Flink's PojoTypeInfo?
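To illustrate those three rules, here is a hypothetical class shaped the way Flink's POJO serializer expects (the fields are made up and not the real BaseFile layout):

// Hypothetical shape only; the real BaseFile/GenericDataFile have many more fields.
public class PojoStyleDataFile {

    private String filePath;
    private long recordCount;

    // Flink's POJO rules need a public no-arg constructor ...
    public PojoStyleDataFile() {}

    // ... and public getters/setters (or public fields) for every non-static, non-transient field.
    public String getFilePath() { return filePath; }
    public void setFilePath(String filePath) { this.filePath = filePath; }

    public long getRecordCount() { return recordCount; }
    public void setRecordCount(long recordCount) { this.recordCount = recordCount; }
}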

pvary commented 1 month ago

If we make these classes public, they would become part of the Iceberg core API. Also, they would not conform to the coding guidelines. So it is not an option to make them serializable this way. We need to write our own serializers if we want to do this.

simonykq commented 1 month ago

@pvary Hi, thanks for your reply.

Not sure if I understand this correctly, but if you want to write your own serializer, I believe you need to do:

see.getConfig().registerTypeWithKryoSerializer(WriteResult.class, YourWriteResultSerializer.class);

which I believe would still require generic types to be enabled to take effect, no?

pvary commented 1 month ago

In Flink, it is possible to create a new type, like:

class WriteResultType extends TypeInformation<WriteResult>

This can implement the createSerializer method, like:

public TypeSerializer<WriteResult> createSerializer(SerializerConfig serializerConfig) {

And this type can be used like returns(new WriteResultType()).

This way the user can specify the type/serializer which is used for the stream. While this is far from easy, it is doable. Such a serializer would also be a nice addition, so we could control/understand what is travelling on the wire, and avoid or highlight potential situations where the serialized data changes. At the same time, we should respect the Iceberg community's decision about the layout of the core API.
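A rough skeleton of what such a dedicated type could look like (only a sketch: the hand-written TypeSerializer, which would have to cover the data files, delete files and referenced data file paths, is left unimplemented here):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.iceberg.io.WriteResult;

public class WriteResultType extends TypeInformation<WriteResult> {

    @Override public boolean isBasicType() { return false; }
    @Override public boolean isTupleType() { return false; }
    @Override public int getArity() { return 3; }
    @Override public int getTotalFields() { return 3; }
    @Override public Class<WriteResult> getTypeClass() { return WriteResult.class; }
    @Override public boolean isKeyType() { return false; }

    @Override
    public TypeSerializer<WriteResult> createSerializer(ExecutionConfig config) {
        // The hand-written TypeSerializer<WriteResult> would go here.
        throw new UnsupportedOperationException("serializer not implemented in this sketch");
    }

    @Override public String toString() { return "WriteResultType"; }
    @Override public boolean equals(Object obj) { return obj instanceof WriteResultType; }
    @Override public int hashCode() { return WriteResultType.class.hashCode(); }
    @Override public boolean canEqual(Object obj) { return obj instanceof WriteResultType; }
}

The stream that produces WriteResult would then declare it with returns(new WriteResultType()).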

simonykq commented 3 weeks ago

@pvary

Thanks for the info. Very informative.

I am more or less thinking of letting the Flink runtime treat it as a POJO type using PojoTypeInfo. Something like this:

@TypeInfo(WriteResult.WriteResultTypeInfoFactory.class)
public class WriteResult {

  private DataFile[] dataFiles;
  private DeleteFile[] deleteFiles;
  private CharSequence[] referencedDataFiles;

  public static class WriteResultTypeInfoFactory extends TypeInfoFactory<WriteResult> {
    @Override
    public TypeInformation<WriteResult> createTypeInfo(Type t,
        Map<String, TypeInformation<?>> genericParameters) {
      return Types.POJO(WriteResult.class, Map.of(
          "dataFiles", Types.OBJECT_ARRAY(TypeInformation.of(BaseFile.class)),
          "deleteFiles", Types.OBJECT_ARRAY(TypeInformation.of(BaseFile.class)),
          "referencedDataFiles", Types.OBJECT_ARRAY(Types.STRING)));
    }
  }

}

And then maybe doing something similar for BaseFile.class. Is that feasible?

pvary commented 2 weeks ago

This would mean that iceberg-core (which contains WriteResult) would need to depend on Flink jars... which is also not something good.

Another question (I'm not familiar with this type of serialization): how does this handle inheritance?

simonykq commented 2 weeks ago

Another question (I'm not familiar with this type of serialization): how does this handle inheritance?

Do you mean how to write an instance of TypeInfoFactory if the target class inherits from another class? I'm not really sure what you mean; could you clarify?

pvary commented 2 weeks ago

Another question (I'm not familiar with this type of serialization): how does this handle inheritance?

Do you mean how to write an instance of TypeInfoFactory if the target class inherits from another class? I'm not really sure what you mean; could you clarify?

Yes: DataFile and DeleteFile are interfaces. Is that handled by this serialization?

simonykq commented 2 weeks ago

I believe if you give the correct fields that are actually contained in DataFile and DeleteFile (they seem to extend BaseFile from the core library), then it should work. For example:

@TypeInfo(DataFile.BaseFileTypeInfoFactory.class)
public class DataFile {
   ... 
   ...
   ...

  public static class BaseFileTypeInfoFactory extends TypeInfoFactory<DataFile> { 

        @Override
        public TypeInformation<DataFile> createTypeInfo(Type t,
                Map<String, TypeInformation<?>> genericParameters) {

            Map<String, TypeInformation<?>> types = new HashMap<>();

            types.put("fromProjectionPos", Types.PRIMITIVE_ARRAY(Types.INT));
            types.put("fileOrdinal", Types.LONG);
            types.put("partitionSpecId", Types.INT);
            types.put("content", Types.ENUM(FileContent.class));
            types.put("filePath", Types.STRING);
            types.put("format", Types.ENUM(FileFormat.class));
            types.put("recordCount", Types.LONG);
            types.put("fileSizeInBytes", Types.LONG);
            types.put("dataSequenceNumber", Types.LONG);
            types.put("fileSequenceNumber", Types.LONG);
            types.put("columnSizes", Types.MAP(Types.INT, Types.LONG));
            types.put("valueCounts", Types.MAP(Types.INT, Types.LONG));
            types.put("nullValueCounts", Types.MAP(Types.INT, Types.LONG));
            types.put("nanValueCounts", Types.MAP(Types.INT, Types.LONG));
            types.put("lowerBounds", Types.MAP(Types.INT, Types.PRIMITIVE_ARRAY(Types.BYTE)));
            types.put("upperBounds", Types.MAP(Types.INT, Types.PRIMITIVE_ARRAY(Types.BYTE)));
            types.put("splitOffsets", Types.PRIMITIVE_ARRAY(Types.LONG));
            types.put("equalityIds", Types.PRIMITIVE_ARRAY(Types.INT));
            types.put("keyMetadata", Types.PRIMITIVE_ARRAY(Types.BYTE));

            return Types.POJO(DataFile.class, types);
        }

  }

}

just to give an example

simonykq commented 2 weeks ago

If you cannot touch DataFile, DeleteFile, or WriteResult, you could also register it using the Flink config:

pipeline.serialization-config:
  -  org.apache.iceberg.io.WriteResult: {type: typeinfo, class: org.apache.iceberg.io.WriteResultSerializer}

That way, you do not need to touch WriteResult.class to add the annotation at the class level.

simonykq commented 2 weeks ago

Btw, I found a way to get this to work (without enabling generic types, but still using Kryo to serialize the write result under the hood).

First create a class called WriteResultTypeInformation:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.iceberg.io.WriteResult;

import java.lang.reflect.Type;
import java.util.Map;

public class WriteResultTypeInformation extends TypeInfoFactory<WriteResult> {

    @Override
    public TypeInformation<WriteResult> createTypeInfo(Type t,
            Map<String, TypeInformation<?>> genericParameters) {

        return new TypeInformation<>() {
            @Override
            public boolean isBasicType() {
                return false;
            }

            @Override
            public boolean isTupleType() {
                return false;
            }

            @Override
            public int getArity() {
                return 3;
            }

            @Override
            public int getTotalFields() {
                return 3;
            }

            @Override
            public Class<WriteResult> getTypeClass() {
                return WriteResult.class;
            }

            @Override
            public boolean isKeyType() {
                return false;
            }

            @Override
            public TypeSerializer<WriteResult> createSerializer(ExecutionConfig config) {
                // Still Kryo under the hood, but registered explicitly, so
                // disableGenericTypes() no longer rejects WriteResult.
                return new KryoSerializer<>(this.getTypeClass(), config);
            }

            @Override
            public String toString() {
                return "WriteResultTypeInformation";
            }

            @Override
            public boolean equals(Object obj) {
                return obj instanceof TypeInformation
                        && ((TypeInformation<?>) obj).getTypeClass() == WriteResult.class;
            }

            @Override
            public int hashCode() {
                return WriteResult.class.hashCode();
            }

            @Override
            public boolean canEqual(Object obj) {
                return obj instanceof TypeInformation;
            }
        };
    }
}

and then in the config, put:

pipeline.serialization-config:
  -  org.apache.iceberg.io.WriteResult: {type: typeinfo, class: org.apache.iceberg.io.WriteResultTypeInformation}

or programmatically:

  Configuration config = new Configuration();
  config.set(PipelineOptions.SERIALIZATION_CONFIG,
          List.of("org.apache.iceberg.io.WriteResult: {type: typeinfo, class: org.apache.iceberg.io.WriteResultTypeInformation}"));
  see = StreamExecutionEnvironment.getExecutionEnvironment(config);

pvary commented 2 weeks ago

I think we could add the type information when defining the streams.

simonykq commented 1 week ago

@pvary Yeah, if you have the WriteResultTypeInformation I wrote above, you could just add that type information in the place where you build the stream; or, if someone doesn't want to change the source code, they can do it through the config. Either way is fine.
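For the first option, a sketch of how the explicit type could be attached where the writer stream is built (the helper and the operator argument are hypothetical, not the current FlinkSink internals):

import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.io.WriteResult;

public final class WriteResultStreams {

    private WriteResultStreams() {
    }

    // Hypothetical helper: build the writer stage with an explicit WriteResult TypeInformation,
    // so Flink never falls back to GenericTypeInfo even when disableGenericTypes() is set.
    public static SingleOutputStreamOperator<WriteResult> writerStream(
            DataStream<RowData> rows,
            OneInputStreamOperator<RowData, WriteResult> writerOperator) {

        TypeInformation<WriteResult> writeResultType =
                new WriteResultTypeInformation().createTypeInfo(WriteResult.class, Map.of());

        return rows.transform("IcebergStreamWriter", writeResultType, writerOperator);
    }
}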

pvary commented 1 week ago

I don't think the config solution is something we would like to support in the Iceberg connector. Setting it when creating the operators and connecting them during the sink generation would be fine - it could work for the old FlinkSink. I'm not sure how it would work with the SinkV2 (IcebergSink).

I still have my doubts that the complexity would be worth the performance gains.