DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

kuduSource to kuduSink #750

Open jctanking opened 2 years ago

jctanking commented 2 years ago

Kudu output-format writeSingleRecordInternal failed.
java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to com.dtstack.flinkx.element.ColumnRowData
    at com.dtstack.flinkx.connector.kudu.sink.KuduOutputFormat.writeSingleRecordInternal(KuduOutputFormat.java:77)
    at com.dtstack.flinkx.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:486)
    at com.dtstack.flinkx.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:246)
    at com.dtstack.flinkx.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:85)
    at com.dtstack.flinkx.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:117)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)

Line 77 of com.dtstack.flinkx.connector.kudu.converter.KuduColumnConverter.java converts the value to ColumnRowData.
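For readers unfamiliar with this kind of failure, here is a minimal sketch of the mechanics behind the exception. The nested types are stand-ins written for illustration only; they are not the real Flink or ChunJun classes, and `unsafeWrite` and `guardedWrite` are hypothetical names. The sketch assumes only what the stack trace states: a row whose runtime type is GenericRowData reaches code that downcasts it to ColumnRowData.

```java
public class CastSketch {
    // Stand-in types for illustration only; NOT the real Flink/ChunJun classes.
    interface RowData {}

    static final class GenericRowData implements RowData {}

    static final class ColumnRowData implements RowData {}

    // Mirrors an unchecked downcast of the kind named in the stack trace.
    static String unsafeWrite(RowData row) {
        // Throws ClassCastException when row is a GenericRowData.
        ColumnRowData columnRow = (ColumnRowData) row;
        return "wrote " + columnRow.getClass().getSimpleName();
    }

    // Hypothetical guard: check the runtime type and fail with a clearer message.
    static String guardedWrite(RowData row) {
        if (row instanceof ColumnRowData) {
            return "wrote ColumnRowData";
        }
        throw new IllegalArgumentException(
                "expected ColumnRowData but got " + row.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        try {
            unsafeWrite(new GenericRowData());
        } catch (ClassCastException e) {
            System.out.println("unsafe cast failed: " + e.getClass().getSimpleName());
        }
        System.out.println(guardedWrite(new ColumnRowData()));
    }
}
```

A guard like this only makes the failure easier to read. It does not explain why a GenericRowData was produced upstream in the first place, which is the question this issue is really asking.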

Paddy0523 commented 2 years ago

Could you please provide the JSON script of the job?

jctanking commented 2 years ago

@PaddyMelody

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "kudureader",
          "parameter": {
            "column": [
              { "name": "id", "type": "int" },
              { "name": "name", "type": "string" },
              { "name": "age", "type": "int" },
              { "name": "sex", "type": "int" }
            ],
            "masters": "kudu01:7051",
            "table": "Themis",
            "readMode": "READ_LATEST ",
            "workerCount": 6,
            "operationTimeout": 3000000,
            "adminOperationTimeout": 3000000,
            "queryTimeout": 30000,
            "batchSizeBytes": 104857600
          }
        },
        "writer": {
          "parameter": {
            "column": [
              { "name": "id", "type": "int32" },
              { "name": "name", "type": "string" },
              { "name": "age", "type": "int32" },
              { "name": "sex", "type": "int32" }
            ],
            "masters": "kudu02:7051",
            "table": "Themis_out",
            "flushMode": "manual_flush",
            "writeMode": "append",
            "batchSizeBytes": 1048576
          },
          "name": "kuduwriter"
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      },
      "restore": {
        "isRestore": false,
        "isStream": false
      }
    }
  }
}