apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink incremental read task: 'payload.class' configuration does not take effect #10351

Open · punish-yh opened 10 months ago

punish-yh commented 10 months ago

Describe the problem you faced

In a Flink task, I set 'payload.class' to a custom payload class, but it does not take effect.

How can I configure a custom payload for the Flink read task only, without changing the Flink write task and without changing the 'hoodie.compaction.payload.class' setting? I only want to change the read task's payload merge logic.
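For context, a read-side payload plugs into Hudi's merge API. Below is a minimal sketch of what a class like xxxx.CustomPayload might look like; the class body is an assumption for illustration (the real xxxx.CustomPayload is not shown in the issue), but the constructors and the combineAndGetUpdateValue signature follow Hudi's OverwriteWithLatestAvroPayload base class:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

// Hypothetical stand-in for xxxx.CustomPayload, extending the stock
// overwrite-with-latest payload. Only the merge hook is overridden.
public class CustomPayload extends OverwriteWithLatestAvroPayload {

    public CustomPayload(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
    }

    public CustomPayload(Option<GenericRecord> record) {
        super(record);
    }

    @Override
    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
            throws IOException {
        // Custom merge logic would go here; this sketch simply defers to the
        // parent, which keeps the incoming (latest) record.
        return super.combineAndGetUpdateValue(currentValue, schema);
    }
}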

To Reproduce

create Flink sql table:

CREATE TEMPORARY TABLE IF NOT EXISTS t1
(
  `id` BIGINT,
  `name` STRING,
  PRIMARY KEY(id) NOT ENFORCED
)
WITH (
  'hoodie.datasource.write.payload.class' = 'xxxx.CustomPayload',
  'payload.class' = 'xxxx.CustomPayload',
  'metadata.enabled' = 'false',
  'compaction.async.enabled' = 'false',
  'hoodie.datasource.query.type' = 'incremental',
  'connector' = 'hudi',
  'index.type' = 'BUCKET',
  'hoodie.bucket.index.num.buckets' = '8',
  'read.data.skipping.enabled' = 'true',
  'read.streaming.enabled' = 'false',
  'path' = 'hdfs://xxxx/path',
  'read.tasks' = '4',
  'read.start-commit' = '20231212004636502',
  'read.end-commit' = '20231218001637223',
  'table.type' = 'MERGE_ON_READ',
  'compaction.schedule.enabled' = 'false',
  'changelog.enabled' = 'true',
  'hoodie.bucket.index.hash.field' = 'id'
)
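The issue does not show the query that triggers the read; presumably a plain bounded SELECT over the table. A hypothetical driver (the class name and the trimmed-down option list are assumptions for illustration) that would exercise the incremental read path:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical repro driver; the issue omits the actual query, so this assumes
// a bounded SELECT is what triggers the incremental read.
public class IncrementalReadRepro {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Same DDL as above, trimmed to the options relevant to this issue.
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE IF NOT EXISTS t1 (\n"
              + "  `id` BIGINT,\n"
              + "  `name` STRING,\n"
              + "  PRIMARY KEY(id) NOT ENFORCED\n"
              + ") WITH (\n"
              + "  'connector' = 'hudi',\n"
              + "  'path' = 'hdfs://xxxx/path',\n"
              + "  'table.type' = 'MERGE_ON_READ',\n"
              + "  'payload.class' = 'xxxx.CustomPayload',\n"
              + "  'hoodie.datasource.query.type' = 'incremental',\n"
              + "  'read.start-commit' = '20231212004636502',\n"
              + "  'read.end-commit' = '20231218001637223'\n"
              + ")");

        // Bounded incremental scan between the two commits; the merge behavior
        // of payload.class is what the issue reports as not being applied.
        tEnv.executeSql("SELECT * FROM t1").print();
    }
}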

hoodie.properties:

hoodie.table.precombine.field=_time
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.type=MERGE_ON_READ
hoodie.archivelog.folder=archived
hoodie.table.cdc.enabled=false
hoodie.compaction.payload.class=org.apache.hudi.common.model.EventTimeAvroPayload
hoodie.timeline.layout.version=1
hoodie.table.version=5
hoodie.table.recordkey.fields=id
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.name=t1
hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator
hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
....
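Note that hoodie.compaction.payload.class here is the payload persisted at table creation time (EventTimeAvroPayload), a table-level property rather than a per-job option. A sketch of how to confirm what the table itself has persisted, assuming the standard HoodieTableMetaClient builder API (the class name InspectPayloadClass is made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;

// Hypothetical helper: prints the payload class persisted in hoodie.properties.
public class InspectPayloadClass {

    public static void main(String[] args) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(new Configuration())
                .setBasePath("hdfs://xxxx/path")
                .build();
        // For this table, expected to print
        // org.apache.hudi.common.model.EventTimeAvroPayload.
        System.out.println(metaClient.getTableConfig().getPayloadClass());
    }
}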

Expected behavior

Presumably the read should merge records using the custom payload and output that result, instead of using EventTimeAvroPayload.


danny0405 commented 10 months ago

It should work if the payloads are not already merged by the writer; otherwise, the writer just takes on the onus of merging.
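Reading between the lines: if the write side (precombine, or compaction via hoodie.compaction.payload.class) has already merged the records into base files, the read-side 'payload.class' has nothing left to merge, so its logic never fires. One way to check this empirically is to instrument the payload; a hypothetical variant of the CustomPayload sketch above (the logging is illustrative, not Hudi API):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical instrumented payload: if no log line shows up during the
// incremental read, the records were already merged on the write side and the
// read-side payload.class never had a chance to run.
public class TracingPayload extends CustomPayload {

    private static final Logger LOG = LoggerFactory.getLogger(TracingPayload.class);

    public TracingPayload(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
    }

    public TracingPayload(Option<GenericRecord> record) {
        super(record);
    }

    @Override
    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
            throws IOException {
        LOG.info("combineAndGetUpdateValue invoked on the read path");
        return super.combineAndGetUpdateValue(currentValue, schema);
    }
}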