Closed waytoharish closed 7 months ago
@waytoharish Can you let us know what error/issue you are facing? Or is it only a compilation error?
@ad1happy2go I am getting the following error when I tried with:

```java
DataTypes.FIELD("data", DataTypes.ARRAY(ROW(
    DataTypes.FIELD("test", DataTypes.VARCHAR(256)), // record key
    DataTypes.FIELD("type", DataTypes.VARCHAR(10))
)))
```

```
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Incorrect syntax near the keyword 'ARRAY' at line 6, column 8. Was expecting one of: "ROW" ...
```
Can you show us the complete SQL text?
Hi @danny0405, I don't see the SQL in Flink.
Here is the code I am trying in Java:
```java
public static final DataType ROW_DATA_TYPE = ROW(
        DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
        DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
        DataTypes.FIELD("age", DataTypes.INT()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("data", DataTypes.ARRAY(ROW(
            DataTypes.FIELD("test", DataTypes.VARCHAR(256)), // record key
            DataTypes.FIELD("type", DataTypes.VARCHAR(10))
        ))),
        DataTypes.FIELD("partition", DataTypes.VARCHAR(10))
    )
    .notNull();
```
Hmm, I guess it is because the HoodiePipeline class does not concatenate the fields in good shape; that might be a bug.
Thanks @danny0405.
I am able to map the array type using the code below, but I am not sure if I am doing it correctly:
```java
private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
    return HoodiePipeline.builder(targetTable)
        .column("uuid VARCHAR(256)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts TIMESTAMP(3)")
        .column("data ARRAY<ROW<measure_name VARCHAR(256), type VARCHAR(10)>>")
        .column("`partition` VARCHAR(20)")
        .pk("uuid")
        .partition("partition")
        .options(options);
}

// Define the schema for Hudi records
public static final DataType ROW_ARRAY_DATA_TYPE = ROW(
        DataTypes.FIELD("measure_name", DataTypes.VARCHAR(256)), // record key
        DataTypes.FIELD("type", DataTypes.VARCHAR(10)))
    .notNull();

public static final DataType ROW_DATA_TYPE = ROW(
        DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
        DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
        DataTypes.FIELD("age", DataTypes.INT()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("partition", DataTypes.VARCHAR(10)),
        DataTypes.FIELD("data", DataTypes.ARRAY(ROW_ARRAY_DATA_TYPE)))
    .notNull();
```
Now I am trying to figure out how to map it to RowData; I tried this:
```java
static class HudiDataSource implements MapFunction<Telemetry, RowData> {
    @Override
    public RowData map(Telemetry kafkaRecord) throws Exception {
        return insertRow(StringData.fromString(kafkaRecord.getCampaignName()), StringData.fromString("Danny"), 23,
            TimestampData.fromEpochMillis(1), StringData.fromString("par1"),
            insertRow(StringData.fromString(kafkaRecord.getCampaignName()), StringData.fromString("Danny"))
        );
    }
}
```
but it's failing with:

```
Caused by: java.lang.ClassCastException: class org.apache.flink.table.data.binary.BinaryRowData cannot be cast to class org.apache.flink.table.data.ArrayData (org.apache.flink.table.data.binary.BinaryRowData and org.apache.flink.table.data.ArrayData are in unnamed module of loader 'app')
    at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
    at com.hudi.flink.quickstart.Kafka2HudiPipelineNew.insertRow(Kafka2HudiPipelineNew.java:177)
    at com.hudi.flink.quickstart.Kafka2HudiPipelineNew.insertRow(Kafka2HudiPipelineNew.java:186)
    at com.hudi.flink.quickstart.Kafka2HudiPipelineNew$HudiDataSource.map(Kafka2HudiPipelineNew.java:194)
    at com.hudi.flink.quickstart.Kafka2HudiPipelineNew$HudiDataSource.map(Kafka2HudiPipelineNew.java:190)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    ... 22 more
```
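In case it helps with the trace above: the `data` position is declared as `ARRAY<ROW<...>>`, so the value handed to `insertRow` for that position has to be an `ArrayData`, and passing a nested `BinaryRowData` directly is what triggers the cast failure in `BinaryWriter.write`. A minimal, hedged sketch of one way to build such a value (the helper name is hypothetical; it mirrors the `measure_name`/`type` element declared above):

```java
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;

// Hypothetical helper: builds the value for the ARRAY<ROW<measure_name, type>> field
// as ArrayData, which is what the binary writer expects for an ARRAY-typed position.
static ArrayData buildDataField(String measureName, String type) {
    GenericRowData element = new GenericRowData(2);
    element.setField(0, StringData.fromString(measureName));
    element.setField(1, StringData.fromString(type));
    return new GenericArrayData(new Object[] {element});
}
```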
@waytoharish As discussed on the call, can you provide the latest code and the exception we were getting?
Thanks for your time @ad1happy2go @danny0405. Here is the error I am getting after the code change:
```
14:38:06,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka Source -> (Map -> row_data_to_hoodie_record, Sink: Print to Std. Out) (6/10) (b41f7215661da4d1d3d1c157a58c57e4) switched from RUNNING to FAILED on 8d293143-6737-4451-a2c5-4e4f6f85cbd4 @ localhost (dataPort=-1).
java.io.IOException: Failed to deserialize consumer record due to
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-base-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    ... 14 more
Caused by: java.lang.IndexOutOfBoundsException
    at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:787) ~[flink-core-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.binary.BinarySegmentUtils.getInt(BinarySegmentUtils.java:633) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.binary.BinaryArrayData.pointTo(BinaryArrayData.java:143) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.binary.BinarySegmentUtils.readArrayData(BinarySegmentUtils.java:1110) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.binary.BinaryRowData.getArray(BinaryRowData.java:376) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$10(RowData.java:265) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.hudi.util.RowDataToAvroConverters$11.convert(RowDataToAvroConverters.java:271) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.util.RowDataToAvroConverters$10.convert(RowDataToAvroConverters.java:239) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.sink.transform.RowDataToHoodieFunction.toHoodieRecord(RowDataToHoodieFunction.java:109) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:97) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:46) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    ... 14 more
```
Here is the latest code:
```java
package com.hudi.flink.quickstart;

import com.test.gen.Datum;
import com.test.gen.Telemetry;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.*;
import org.apache.flink.table.data.binary.BinaryArrayData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryArrayWriter;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.data.writer.BinaryWriter;
import org.apache.flink.table.runtime.typeutils.ArrayDataSerializer;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.table.api.DataTypes.ROW;

/**
 * A Flink program that ingests data from Kafka and writes it to Apache Hudi.
 */
public class Kafka2HudiPipelineNew {

    public static void main(String[] args) throws Exception {
        String pipelineName = "TestKafkaPipelIneTest";
        String hudiBasePath = "s3a://awshksharma/" + pipelineName;
        String kafkaGroupId = "myTest";
        String kafkaTopicName = "hksharma";

        // Create a Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure checkpointing
        configureCheckpointing(env);

        // Set up Kafka source
        KafkaSource<Telemetry> kafkaConsumer = createKafkaConsumer(kafkaTopicName, kafkaGroupId);

        // Create a Kafka stream
        DataStream<Telemetry> kafkaStream = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Transform Kafka data to Hudi records
        DataStream<RowData> transformedStream = kafkaStream.map(new HudiDataSource());

        // Define Hudi target table and options
        String targetTable = "hudi_hksharma_table_test";
        Map<String, String> options = createHudiOptions(hudiBasePath, targetTable);

        // Define HoodiePipeline.Builder for configuring the Hudi write
        HoodiePipeline.Builder builder = createHudiPipeline(targetTable, options);
        kafkaStream.print();

        // Write to Hudi
        builder.sink(transformedStream, false);

        // Execute the Flink job
        env.execute(pipelineName);
    }

    // Configure Flink checkpointing settings
    private static void configureCheckpointing(StreamExecutionEnvironment env) {
        env.enableCheckpointing(10000); // Checkpoint every 10 seconds
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMinPauseBetweenCheckpoints(1000); // Minimum time between checkpoints
        checkpointConfig.setCheckpointTimeout(60000); // Checkpoint timeout in milliseconds
        // checkpointConfig.setCheckpointStorage(checkpointLocation);
    }

    // Create a Kafka consumer with specified properties
    private static KafkaSource<Telemetry> createKafkaConsumer(String kafkaTopicName, String kafkaGroupId) {
        Properties properties = new Properties();
        return KafkaSource.<Telemetry>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("hksharma")
            .setGroupId("flinkgroup")
            .setProperties(properties)
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new TelemetryEventDeserializationSchema())
            .build();
    }

    // Create Hudi options for the data sink
    private static Map<String, String> createHudiOptions(String basePath, String tableName) {
        Map<String, String> options = new HashMap<>();
        options.put("path", basePath);
        options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
        options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "glue");
        options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
        options.put(FlinkOptions.HIVE_SYNC_DB.key(), "default");
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
        options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
        return options;
    }

    // Create a HoodiePipeline.Builder with specified target table and options
    private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
        return HoodiePipeline.builder(targetTable)
            .column("uuid VARCHAR(256)")
            .column("name VARCHAR(10)")
            .column("age INT")
            .column("ts TIMESTAMP(3)")
            .column("data ARRAY<ROW<measure_name VARCHAR(256)>>")
            .column("`partition` VARCHAR(20)")
            .pk("uuid")
            .partition("partition")
            .options(options);
    }

    // Define the schema for Hudi records
    public static final DataType ROW_ARRAY_DATA_TYPE = ROW(
            DataTypes.FIELD("measure_name", DataTypes.VARCHAR(256)))
        .notNull();

    public static final DataType ROW_DATA_TYPE = ROW(
            DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
            DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
            DataTypes.FIELD("age", DataTypes.INT()),
            DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
            DataTypes.FIELD("partition", DataTypes.VARCHAR(10)),
            DataTypes.FIELD("data", DataTypes.ARRAY(ROW_ARRAY_DATA_TYPE)))
        .notNull();

    // Create a Hudi record from specified fields
    public static BinaryArrayData insertArrayRow() {
        BinaryArrayData array = new BinaryArrayData();
        BinaryArrayWriter arrayWriter =
            new BinaryArrayWriter(
                array,
                1,
                BinaryArrayData.calculateFixLengthPartSize(DataTypes.STRING().getLogicalType()));
        arrayWriter.writeString(0, StringData.fromString("par1"));
        arrayWriter.complete();
        return array;
    }

    public static BinaryRowData insertRow(RowType rowType, Object... fields) {
        LogicalType[] types = rowType.getFields().stream().map(RowType.RowField::getType)
            .toArray(LogicalType[]::new);
        BinaryRowData row = new BinaryRowData(fields.length);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.reset();
        for (int i = 0; i < fields.length; i++) {
            Object field = fields[i];
            System.out.println("field>>>>>>>>>>>>>" + field.toString());
            System.out.println("Type>>>>>>>>>>>>>" + types[i]);
            if (field == null) {
                writer.setNullAt(i);
            } else if (field instanceof org.apache.flink.table.data.binary.BinaryArrayData) {
                BinaryWriter.write(writer, i, field, types[i], new ArrayDataSerializer(types[0]));
            } else {
                BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i]));
            }
        }
        writer.complete();
        return row;
    }

    // Overloaded method for creating a Hudi record using the default schema
    public static BinaryRowData insertRow(Object... fields) {
        return insertRow((RowType) ROW_DATA_TYPE.getLogicalType(), fields);
    }

    // Mapper to convert Kafka data to Hudi records
    static class HudiDataSource implements MapFunction<Telemetry, RowData> {
        @Override
        public RowData map(Telemetry kafkaRecord) throws Exception {
            return insertRow(StringData.fromString(kafkaRecord.getCampaignName()), StringData.fromString("Danny"), 23,
                TimestampData.fromEpochMillis(1), StringData.fromString("par1"), insertArrayRow()
            );
        }
    }
}
```
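One thing that stands out in this listing (an observation, not a confirmed root cause): `insertArrayRow()` builds the array with a `STRING` element type, while the declared column is `data ARRAY<ROW<measure_name VARCHAR(256)>>`, and that kind of element-type mismatch could explain the `IndexOutOfBoundsException` raised when Hudi later reads the field as an array of rows. A hedged sketch of a writer that uses the ROW element type instead, reusing only APIs already present in the code above (method name hypothetical, untested):

```java
// Hypothetical variant of insertArrayRow(): writes one ROW element whose type matches
// ROW_ARRAY_DATA_TYPE, instead of a plain string element.
public static BinaryArrayData insertRowArray(RowData element) {
    LogicalType elementType = ROW_ARRAY_DATA_TYPE.getLogicalType();
    BinaryArrayData array = new BinaryArrayData();
    BinaryArrayWriter arrayWriter = new BinaryArrayWriter(
        array, 1, BinaryArrayData.calculateFixLengthPartSize(elementType));
    BinaryWriter.write(arrayWriter, 0, element, elementType, InternalSerializers.create(elementType));
    arrayWriter.complete();
    return array;
}
```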
@danny0405 @ad1happy2go Please can you help me understand whether this is a bug or whether I am doing something wrong.
Another way is to use GenericRowData, which is much simpler, or just use Row and convert it back to RowData with a utility.
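For what it's worth, a minimal sketch of the GenericRowData approach against the six-column schema above (the classes come from `org.apache.flink.table.data`; all field values other than `getCampaignName()` are placeholders, and the nested element layout assumes the single `measure_name` column declared in `createHudiPipeline`):

```java
// Sketch only: build the whole record with GenericRowData and the nested array
// with GenericArrayData, so the "data" field really is an ArrayData.
static class HudiDataSource implements MapFunction<Telemetry, RowData> {
    @Override
    public RowData map(Telemetry kafkaRecord) {
        GenericRowData element = new GenericRowData(1);
        element.setField(0, StringData.fromString("measure1")); // measure_name (placeholder)

        GenericRowData row = new GenericRowData(6);
        row.setField(0, StringData.fromString(kafkaRecord.getCampaignName())); // uuid
        row.setField(1, StringData.fromString("name9"));                        // name (placeholder)
        row.setField(2, 23);                                                    // age (placeholder)
        row.setField(3, TimestampData.fromEpochMillis(1));                      // ts (placeholder)
        row.setField(4, StringData.fromString("p1"));                           // partition (placeholder)
        row.setField(5, new GenericArrayData(new Object[] {element}));          // data
        return row;
    }
}
```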
@waytoharish Did you get a chance to try out GenericRowData? Are you still facing the issue?
Hi @ad1happy2go, still facing the issue. I am not able to figure out how to do it with GenericRowData.
Hi @ad1happy2go, I have added the code below. After that I can see data pushed to S3, but no table has been created in Glue.
```java
public RowData map(Telemetry kafkaRecord) throws Exception {
    GenericRowData row = new GenericRowData(6);
    row.setField(0, StringData.fromString(kafkaRecord.getCampaignName()));
    row.setField(1, StringData.fromString("name9"));
    row.setField(2, 23);
    row.setField(3, TimestampData.fromEpochMillis(1));
    row.setField(4, StringData.fromString("p1"));
    row.setField(5, kafkaRecord.getData());
    return row;
}
```
I keep getting:
```
java.lang.ClassCastException: class org.apache.flink.table.data.binary.BinaryStringData cannot be cast to class org.apache.flink.table.data.ArrayData (org.apache.flink.table.data.binary.BinaryStringData and org.apache.flink.table.data.ArrayData are in unnamed module of loader 'app')
    at org.apache.flink.table.data.GenericRowData.getArray(GenericRowData.java:195) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$10(RowData.java:265) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.hudi.util.RowDataToAvroConverters$11.convert(RowDataToAvroConverters.java:271) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.util.RowDataToAvroConverters$10.convert(RowDataToAvroConverters.java:239) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.sink.transform.RowDataToHoodieFunction.toHoodieRecord(RowDataToHoodieFunction.java:109) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:97) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:46) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
```
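The cast error suggests field 5 ended up holding a string value rather than an ArrayData. A hedged sketch of converting the nested payload before setting the field, assuming `getData()` returns a `List<Datum>` and that `Datum` exposes a `getMeasureName()` accessor (both are assumptions, since those generated classes are not shown here):

```java
// Assumption: kafkaRecord.getData() returns List<Datum> and Datum has getMeasureName().
// Each Datum becomes a one-field GenericRowData, and the list becomes a GenericArrayData.
List<Datum> data = kafkaRecord.getData();
GenericRowData[] elements = new GenericRowData[data.size()];
for (int i = 0; i < data.size(); i++) {
    GenericRowData element = new GenericRowData(1);
    element.setField(0, StringData.fromString(data.get(i).getMeasureName()));
    elements[i] = element;
}
row.setField(5, new GenericArrayData(elements));
```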
Thanks @ad1happy2go @danny0405, it worked for me after using GenericRowData.
I am closing the issue.
Hi Team, I am trying to insert nested object data into a Hudi table but am not able to figure out how to do that.
Here is my JSON input:
{ "uuid": "Hello", "name": "Hk", "age" : 10, "ts": "1396014602", "data": [ { "test": "Test", "type": "Test"
] }
Here is my Java code:

```java
public static final DataType ROW_DATA_TYPE = ROW(
        DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
        DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
        DataTypes.FIELD("age", DataTypes.INT()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("data", DataTypes.ARRAY(ROW(
            DataTypes.FIELD("test", DataTypes.VARCHAR(256)), // record key
            DataTypes.FIELD("type", DataTypes.VARCHAR(10))
        )))
    );
```
I am not able to figure out how to map the value in:

```java
static class HudiDataSource implements MapFunction<Telemetry, RowData> {
    @Override
    public RowData map(Telemetry kafkaRecord) throws Exception {
```