apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.44k stars 2.42k forks source link

[SUPPORT] Flink KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer #8164

Open geonyeongkim opened 1 year ago

geonyeongkim commented 1 year ago

Describe the problem you faced

Hello. I'm going to get the log data in json format from kafka and create an app that loads it into the hudi table using the hudi stream api.

Operation has been set to BULK_INSERT to load log data.

However, if you set it to BULK_INSERT, the casting problem will occur as follows.

KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer


This occurs during the opening of the Sort Operator class.

Flink uses Kryo as the default Serializer.

How can I use Sort Operator to perform BULK_INSERT?

Below is my code.

    @JvmStatic
    fun main(args: Array<String>) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        env.enableCheckpointing(5000)

        // kafka source with json format data
        val kafkaSource = KafkaSource.builder<CustomKafkaRecord>()
            .setBootstrapServers(bootstrapServers)
            .setTopics(topic)
            .setGroupId(StatusV2CowApp::class.java.name)
            .setClientIdPrefix(UUID.randomUUID().toString())
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setDeserializer(CustomKafkaRecordDeserializationSchema())
            .build()

        // json data to GenericRowData
        val stream: SingleOutputStreamOperator<GenericRowData> =
            env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "hudi_bmt_cow_status_v2_source")
                .map { DpObjectMapper.readValue(it.value!!, StatusV2VO::class.java) }
                .map { it.toStatusV2WithPartitionVO()}
                .map {
                    val dataSize = StatusV2Partition.values().size
                    val row = GenericRowData(dataSize)
                    HudiFieldUtils.rowSetAll(StatusV2Partition::class.java, it, row)
                    row
                }

        // sink Hudi Table
        HudiFieldUtils.addColumnAllNonDelete(StatusV2Partition::class.java, HoodiePipeline.builder("hudi_cow_bmt_status_v2"))
            .pk("key")
            .partition("partition_day", "partition_hour")
            .options(
                mapOf(
                    FlinkOptions.PATH.key() to "hdfs:///user/geonyeong.kim/hudi_bmt/status_v2/cow",
                    // bulk_insert
                    FlinkOptions.OPERATION.key() to WriteOperationType.BULK_INSERT.value(),
                    FlinkOptions.TABLE_TYPE.key() to HoodieTableType.COPY_ON_WRITE.name,
                    FlinkOptions.INDEX_GLOBAL_ENABLED.key() to "false",
                    FlinkOptions.WRITE_BATCH_SIZE.key() to "3072D",
                    FlinkOptions.WRITE_TASK_MAX_SIZE.key() to "4096D",
                    FlinkOptions.WRITE_MERGE_MAX_MEMORY.key() to "3072"
                )
            )
            .sink(stream as DataStream<RowData>, true)

        env.execute()
    }

Environment Description

danny0405 commented 1 year ago

The input element of the SortOperator should be a RowData, because the serializer is hard coded into BinaryRowDataSerializer.

geonyeongkim commented 1 year ago

Hello. I looked at HoodieFlinkStreamer in github and used JsonRowDataDeserializationSchema to troubleshoot SortOperator.

I have a few questions about it.

1. BULK_INSERT

If the operation is set to BULK_INSERT, there will be no error.

However, it only consumes kafka messages and does not actually create parquet files in hdfs.

My code simply writes kafka messages to the hudi table on the hdfs.

@JvmStatic
fun main(args: Array<String>) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.enableCheckpointing(5000)

    val props = Configuration()
    props.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, "avro schema")

    val rowType = AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(props)).logicalType as RowType

    val kafkaSource = KafkaSource.builder<RowData>()
        .setBootstrapServers(bootstrapServers)
        .setTopics(topic)
        .setGroupId(SampleHudiApp::class.java.name)
        .setClientIdPrefix(UUID.randomUUID().toString())
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setDeserializer(CustomJsonRowDataDeserializationSchema(
            rowType,
            InternalTypeInfo.of(rowType),
            false,
            true,
            TimestampFormat.ISO_8601
        ))
        .build()
    HoodiePipeline.builder("hudi_test_table")
        .column("id BIGINT")
        .column("name STRING")
        .column("`partition_path` STRING")
        .column("ts BIGINT")
        .column("dc STRING")
        .column("op STRING")
        .pk("id")
        .partition("partition_path")
        .options(mapOf(
            FlinkOptions.PATH.key() to "hdfs:///user/geonyeong.kim/hudi_flink_test",
            FlinkOptions.TABLE_TYPE.key() to HoodieTableType.COPY_ON_WRITE.name,
            FlinkOptions.INDEX_GLOBAL_ENABLED.key() to "false"
        ))
        .sink(env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "hudi_source"), true)
    env.execute("HUDI STREAM SINK")
}

2. BULK_INSERT vs APPEND

I viewed org.apache.hudi.sink.utils.Pipelines class. And i confirmed that BulkInsertWriteFunction class is used for bulk_insert mode and AppendWriteFunction class is used for append mode.

However, if the index type is not Bucket in BulkInsertWriterFunction, BulkInsertWriterHelper is used.

AppendWriteFunction also uses BulkInsertWriterHelper. Then, if the index type is a FLINK_STATE, will the behavior of the two be the same?

3. Compress

I want to use Flink to apply Compress to the COW table when Parquet Write.

In Flink, Hudi Write created HoodieFlinkWriteClient in FlinkWriteClients based on the FlinkOptions value and confirmed that each WriteFunction uses it.

So I overrided the FlinkWriteClients class and added the parquetCompressionCodec("gzip") setting.

image

However, compress was not applied. Is this not applicable in Flink?


I want to actively introduce Hudi. I'd appreciate your help.

geonyeongkim commented 1 year ago

@danny0405 Hello. Could you answer the question above?

danny0405 commented 1 year ago
  1. Did you enable the ckp yet? Flink sink relies the ckp success event for Hudi trasanction commiting;
  2. Both bulk_insert and append_write use the BulkInsertWriterHelper to write the parquet files direcly, there is no UPSERTs, if FLINK_STATE is used, things are very diffrent, the StreamWriteFunction would kick in;
  3. You can just set up the compress options within the Flink SQL options, or the HoodiePipeline#options you have used:

e.g.

create table xxx(
) with (
  'connector' = 'hudi',
  'hoodie.parquet.compression.codec' = 'gzip'
);
HoodiePipeline.builder("xxx")
    .option("hoodie.parquet.compression.codec", "gzip")

The default codec is already gzip, probably that is the reason you do not perceive any difference

geonyeongkim commented 1 year ago

@danny0405 Thank you for your reply.

1. Did you enable the ckp yet? Flink sink relies the ckp success event for Hudi trasanction commiting;

Ckp means checkpoint, right?

As shown in the attached picture, checkpoint is performed normally.

But still no file in hdfs while consuming kafka message.

Moreover, the problem is that we are committing to the kafka broker.

checkpoint image

hdfs directory image

2. Both bulk_insert and append_write use the BulkInsertWriterHelper to write the parquet files direcly, there is no UPSERTs, if FLINK_STATE is used, things are very diffrent, the StreamWriteFunction would kick in;

Then, in case of FLINK_STATE, can you tell me the difference between bulk_insert and append in detail?

3. You can just set up the compress options within the Flink SQL options, or the HoodiePipeline

I tried to restart by adding the settings below as a guide.

HoodiePipeline.builder("xxx")
    .option("hoodie.parquet.compression.codec", "gzip")

However, gzip compression still does not apply.


I know that compression is difficult to apply in the stream associated with Hadoop.

But it's very strange that bulk_insert doesn't work.

danny0405 commented 1 year ago

Then, in case of FLINK_STATE, can you tell me the difference between bulk_insert and append in detail?

Flink state index only works for UPSERT operation, not BULK_INSERT.

But it's very strange that bulk_insert doesn't work.

Bulk insert only works in batch execution mode.