@Toroidals Thanks for reporting this. It would be very helpful if you could also paste the detailed error stack trace.
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-base-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.2.jar:1.15.2]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_211]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.2.jar:1.15.2]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.2.jar:1.15.2]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.2.jar:1.15.2]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
... 14 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedConstructorAccessor81.newInstance(Unknown Source) ~[?:?]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_211]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_211]
at org.apache.hudi.sink.utils.PayloadCreation.createPayload(PayloadCreation.java:81) ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
at org.apache.hudi.sink.transform.RowDataToHoodieFunction.toHoodieRecord(RowDataToHoodieFunction.java:113) ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:98) ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
at org.apache.hudi.sink.transform.RowDataToHoodieFunctionWithRateLimit.map(RowDataToHoodieFunctionWithRateLimit.java:61) ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
at org.apache.hudi.sink.transform.RowDataToHoodieFunctionWithRateLimit.map(RowDataToHoodieFunctionWithRateLimit.java:34) ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.2.jar:1.15.2]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[hand_flink_kafka_to_hudi-1.0-SNAPSHOT.jar:?]
... 14 more
Caused by: java.lang.NullPointerException: null value for (non-nullable) string at hudi_user_cdc_record.role_id
at org.apache.hudi.org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:88) ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
at org.apache.hudi.org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:30) ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
at org.apache.hudi.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84) ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
at org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:184) ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
at org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:176) ~[hudi-flink1.15-bundle-0.15.0.jar:0.15.0]
at org.apache.hudi.common.model.BaseAvroPayload.
Upon inspection, we found that the original table does contain null values in the role_id field. However, when writing into Hudi, the primary key is set to user_id, role_id, and the combination of user_id + role_id is not null.
This is a requirement of primary-key semantics: when fields are declared as primary-key fields, all of them must be non-nullable. You may need to pad the null fields with placeholder values before ingestion.
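For reference, here is a minimal sketch of that padding step, assuming a DataStream pipeline with a simple POJO record type (UserCdcRecord and its fields are hypothetical, named after the schema in the stack trace):

```java
import org.apache.flink.api.common.functions.MapFunction;

public class PadNullKeyFields implements MapFunction<PadNullKeyFields.UserCdcRecord, PadNullKeyFields.UserCdcRecord> {

    /** Hypothetical CDC record; only the key fields matter for this sketch. */
    public static class UserCdcRecord {
        public String userId;
        public String roleId; // nullable in the source table
    }

    // Pick a sentinel that cannot collide with real role ids.
    private static final String ROLE_ID_DEFAULT = "__NULL__";

    @Override
    public UserCdcRecord map(UserCdcRecord record) {
        // The composite record key (user_id, role_id) must not contain nulls,
        // so replace a null role_id with a fixed placeholder.
        if (record.roleId == null) {
            record.roleId = ROLE_ID_DEFAULT;
        }
        return record;
    }
}
```

Applying it with `stream.map(new PadNullKeyFields())` just before the Hudi sink keeps every component of the composite record key non-null; in a SQL pipeline, COALESCE on the key columns would achieve the same effect.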
Closing this issue @Toroidals. Feel free to reopen it or create a new one for further questions on this. Thanks.
Tips before filing an issue
Have you gone through our FAQs? yes
Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
After upgrading from hudi-0.14.0 to 0.15.0, an error occurred: Caused by: java.lang.NullPointerException: null value for (non-nullable) string at table_name.role_id2. This table uses a composite primary key (role_id1, role_id2), and the error is triggered when role_id2 is null.
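For context, the failure ultimately comes from Avro refusing a null value in a field whose schema is a required (non-nullable) string, which is how the primary-key columns end up being modeled. The following standalone sketch reproduces the same kind of NullPointerException with plain Avro; the record and field names follow the report, everything else is illustrative and not the actual job code:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;

public class NullKeyFieldRepro {
    public static void main(String[] args) throws Exception {
        // Required (non-nullable) string fields, like key columns in the table schema.
        Schema schema = SchemaBuilder.record("table_name").fields()
                .requiredString("role_id1")
                .requiredString("role_id2")
                .endRecord();

        GenericRecord record = new GenericData.Record(schema);
        record.put("role_id1", "r1");
        record.put("role_id2", null); // null in a non-nullable key field

        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Throws a NullPointerException similar to the one in the stack trace
        // (the exact message depends on the Avro version in use).
        writer.write(record, EncoderFactory.get().binaryEncoder(out, null));
    }
}
```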
To Reproduce
Steps to reproduce the behavior:
Expected behavior
1. When the primary key is a composite key, allow some of its component fields to be null. 2. Allow a default value to be set for a field.
Environment Description
Hudi version : 0.15.0
Spark version :
Hive version :
Hadoop version :
Storage (HDFS/S3/GCS..) :
Running on Docker? (yes/no) :
Additional context
Stacktrace
See the stack trace pasted in the comments above.