apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Can't read data from kafka in avro format and write them to hudi table #12324

Open langelloantoine opened 1 week ago

langelloantoine commented 1 week ago

Hi guys, I'm trying to sink data from my Kafka (Redpanda) topics into a Hudi table, using local file storage to start with. I can't figure out what is wrong with my sink config: when the connector tries to write, it fails saying it can't deserialize the message. Here is an example of the error:

```
2024-11-23 21:45:51,310 WARN || Error received while writing records for transaction 20241123214451614 in partition 1 [org.apache.hudi.connect.transaction.ConnectTransactionParticipant]
java.lang.ClassCastException: class org.apache.kafka.connect.data.Struct cannot be cast to class org.apache.avro.generic.GenericRecord (org.apache.kafka.connect.data.Struct and org.apache.avro.generic.GenericRecord are in unnamed module of loader 'app')
    at org.apache.hudi.connect.writers.AbstractConnectWriter.writeRecord(AbstractConnectWriter.java:73)
    at org.apache.hudi.connect.transaction.ConnectTransactionParticipant.writeRecords(ConnectTransactionParticipant.java:219)
    at org.apache.hudi.connect.transaction.ConnectTransactionParticipant.processRecords(ConnectTransactionParticipant.java:137)
    at org.apache.hudi.connect.HoodieSinkTask.put(HoodieSinkTask.java:114)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
```

Here is my Kafka sink connector config:

```json
{
  "name": "hudi-sink",
  "config": {
    "bootstrap.servers": "redpanda-0:9092",
    "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
    "tasks.max": "6",

    "key.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://redpanda-0:8090",
    "value.converter.schema.registry.url": "http://redpanda-0:8090",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",

    "topics": "dev.employees.employees",

    "hoodie.write.payload.class": "org.apache.hudi.common.model.HoodieAvroPayload",
    "hoodie.write.payload.serializer.class": "org.apache.hudi.common.model.HoodieAvroPayloadSerializer",
    "hoodie.table.name": "dev_employees_employees",
    "hoodie.table.type": "MERGE_ON_READ",
    "hoodie.base.path": "file:///tmp/hoodie/dev_employees_employees",

    "hoodie.datasource.write.recordkey.field": "payload.after.emp_no",
    "hoodie.datasource.write.partitionpath.field": "payload.after.birth_date",

    "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
    "hoodie.streamer.schemaprovider.registry.url": "http://redpanda-0:8090/subjects/dev.employees.employees-value/versions/latest"
  }
}
```
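If I'm reading the stack trace right, `io.confluent.connect.avro.AvroConverter` hands the sink a Kafka Connect `Struct`, while the failing cast at `AbstractConnectWriter.writeRecord(AbstractConnectWriter.java:73)` expects an Avro `GenericRecord`, which is exactly what the `ClassCastException` says. For comparison, the converter block I've seen in Hudi's Kafka Connect demo config looks like the sketch below (quoted from memory, so treat it as an assumption to verify, and it seems aimed at JSON-encoded topics rather than Avro ones like mine):

```json
{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter.schemas.enable": "false"
}
```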

Here is my MySQL source Kafka Connect config:

{ "name": "mysql-source-connector-employees", "config": { "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://redpanda-0:8090", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://redpanda-0:8090", "value.converter.schemas.enable": "true", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.add.fields": "before,after,op", "transforms.unwrap.drop.tombstone": "false", "transforms.unwrap.operation.header": "true", "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "group.id": "connect-cluster", "database.hostname": "host.docker.internal", "incremental.snapshot.chunk.size": "100000", "max.batch.size": "50000", "max.queue.size": "1000000", "connectionTimeZone": "Europe/Paris", "database.port": "3306", "database.user": "dbz", "database.password": "****", "database.server.id": "5", "database.include.list": "employees", "database.history.kafka.bootstrap.servers": "redpanda-0:9092", "database.history.kafka.topic": "schema-changes.employees", "include.schema.changes": "true", "topic.prefix": "dev", "snapshot.mode": "initial", "schema.history.internal.kafka.bootstrap.servers": "redpanda-0:9092", "schema.history.internal.kafka.topic": "schema-internal-changes.employees", "topic.creation.enable": "true", "topic.creation.default.replication.factor": "3", "topic.creation.default.partitions": "12", "topic.creation.default.cleanup.policy": "compact", "topic.creation.default.compression.type": "lz4", "producer.override.compression.type": "lz4", "offset.flush.timeout.ms": "10000", "offset.flush.interval.ms": "5000", "producer.override.linger.ms": "500", "producer.override.batch.size": "2000", "producer.override.acks": "1", "poll.interval.ms": "50"

} }
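A side note on this source config: the `transforms.unwrap.*` properties are set, but there is no `"transforms": "unwrap"` entry declaring the transform chain, so Kafka Connect will ignore those properties and publish the full Debezium envelope as-is. If unwrapping is intended, the relevant lines would look roughly like the sketch below; note that the Debezium option appears to be spelled `drop.tombstones` (plural), so that is corrected here too:

```json
{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.add.fields": "before,after,op",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.operation.header": "true"
}
```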

I'm using Debezium to ingest, and the data is stored in Avro format in my topics.

Thanks for any help and tips. I'm a newbie to Hudi, so I'm not aware of everything.

Environment Description

rangareddy commented 2 days ago

Hi @langelloantoine

Could you please try adding the `value.deserializer.specific.avro.reader=true` parameter to the Hudi sink connector?
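In sink-config terms, that would sit next to the existing deserializer settings, e.g. (untested sketch):

```json
{
  "key.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "value.deserializer.specific.avro.reader": "true"
}
```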