
[Bug] When Flink writes Map<K,V> data through flink-connector-doris, the write fails if K is of Timestamp type #21832

Closed imx7 closed 1 year ago

imx7 commented 1 year ago

Search before asking

Version

Apache Doris 2.0-beta (Latest)
"org.apache.doris" % "flink-doris-connector-1.17" % "1.4.0"
Apache Flink 1.17.1

What's Wrong?

Flink writes MAP<K,V> data via stream load; when K is of Timestamp type the write fails. Writing with INSERT INTO works fine, and the 2.0 alpha release also works fine.

What You Expected?

Fix this bug.

How to Reproduce?


Util.scala

import org.apache.doris.flink.cfg.{DorisExecutionOptions, DorisOptions, DorisReadOptions}
import org.apache.doris.flink.sink.DorisSink
import org.apache.doris.flink.sink.writer.RowDataSerializer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.data.RowData
import org.apache.flink.table.types.DataType

import java.util.Properties
import java.util.regex.Pattern

object Util {

def geneKafkaSource(bootstrap:String,groupId:String,topic:String,offset:OffsetsInitializer):KafkaSource[String]={
  KafkaSource
    .builder[String]()
    .setBootstrapServers(bootstrap)
    .setGroupId(groupId)
    .setTopicPattern(Pattern.compile(topic))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .setStartingOffsets(offset)
    .setProperty("partition.discovery.interval.ms", "10000")
    .setProperty("metadata.max.age.ms", "1000")
    .setProperty("max.request.size", "10000000")
    .setProperty("session.timeout.ms","600000")
    .setProperty("request.timeout.ms","600000")
    .setProperty("enable.auto.commit","true")
    .setProperty("auto.commit.interval.ms","512000")
    .build()
}

val fieldsInfo:Array[(Int,String,DataType)] = Array(
  (0,"p_date",DataTypes.DATE),
  (1,"data_code",DataTypes.VARCHAR(32)),
  (2,"sensor_code",DataTypes.VARCHAR(128)),
  (3,"post_time",DataTypes.TIMESTAMP),
  (4,"test_col_1",DataTypes.MAP(DataTypes.TIMESTAMP,DataTypes.INT())),
  (5,"test_col_2",DataTypes.MAP(DataTypes.STRING,DataTypes.INT())),
  (6,"DORIS_DELETE_SIGN",DataTypes.INT)
)

def generateDorisSink(fe:String,username:String,password:String,table_name:String,prefix:String):DorisSink[RowData] = {
val builder = DorisSink.builder[RowData]
val dorisBuilder = DorisOptions.builder
dorisBuilder
  .setFenodes(fe)
  .setTableIdentifier(table_name)
  .setUsername(username)
  .setPassword(password)

val properties = new Properties()
properties.setProperty("format", "json")
properties.setProperty("read_json_by_line", "true")
val executionBuilder = DorisExecutionOptions.builder
executionBuilder
  .setLabelPrefix(prefix)
  .setCheckInterval(2)
  .setBufferCount(32)
  .setMaxRetries(4)
  .setStreamLoadProp(properties)

val fields=fieldsInfo.map(_._2)
val types=fieldsInfo.map(_._3)

builder
  .setDorisReadOptions(DorisReadOptions.builder.build)
  .setDorisExecutionOptions(executionBuilder.build)
  .setSerializer(
    RowDataSerializer
      .builder
      .setFieldNames(fields)
      .setType("json").setFieldType(types).build)
  .setDorisOptions(dorisBuilder.build)
  .build()

}

}

StreamLoadApp.scala

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration._
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.data.{GenericMapData, GenericRowData, RowData, StringData, TimestampData}
import org.apache.flink.util.Collector

import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.{Locale, Properties, UUID}
import scala.util.Try
import scala.collection.JavaConverters._

object StreamLoadApp {
  def main(args: Array[String]): Unit = {

val params = ParameterTool.fromArgs(args)
val bootstrap=params.get("bootstrap")
val groupId=params.get("group_id")
val topic=params.get("topic")
val offset=params.get("offset").toLowerCase match {
  case "earliest"=>OffsetsInitializer.earliest()
  case _ =>OffsetsInitializer.latest()
}

println("bootstrap : "+ bootstrap)
println("groupId : "+ groupId)
println("topic : "+ topic)
println("offset : "+ params.get("offset").toLowerCase)

val fe=params.get("fe")
val username=params.get("username")
val password=params.get("password")
val table_name=params.get("table_name")

println("fe : "+ fe)
println("username : "+ username)
println("password : "+ password)
println("table_name : "+ table_name)

val config = new org.apache.flink.configuration.Configuration()
config.setString(RestOptions.BIND_PORT, "6062-6092")
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)

// val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(8000)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(999999, 32*1000L))

val logSource =Util.geneKafkaSource(bootstrap = bootstrap,groupId = groupId,topic = topic,offset = offset)
val kafkaStream = env.fromSource(logSource,WatermarkStrategy.noWatermarks(),"KafkaSourceLog")
val logInfoStream = kafkaStream.process(
  new ProcessFunction[String,RowData]() {
    val dateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.CHINA)

    val logCount = new LongCounter()
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      getRuntimeContext.addAccumulator("log-count",logCount)
    }

    override def processElement(value: String, ctx: ProcessFunction[String, RowData]#Context, out: Collector[RowData]): Unit = {
      val mapper = new ObjectMapper()
      try {
        val root = mapper.readTree(value)
        val dataCode = root.path("dataCode").asText
        val sensorCode = Try{
          root.path("sensorId").asText
        }.getOrElse(root.path("deviceId").asText)
        val postTime = root.path("postTime").asText

        val p_date= LocalDate.parse(postTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).toEpochDay.toInt
        val data_code=StringData.fromString(dataCode)
        val sensor_code=StringData.fromString(sensorCode)
        val post_time_tmp=new java.sql.Timestamp(dateFormat.parse(postTime).getTime)
        val post_time=TimestampData.fromTimestamp(post_time_tmp)

        // val test_col_1=Map(post_time_tmp.asInstanceOf[AnyRef] -> value.length.asInstanceOf[AnyRef])
        val test_col_1=Map(post_time_tmp -> value.length)
        val test_col_2=Map(post_time_tmp.getTime.toString -> value.length)

        val genericRowData = new GenericRowData(7)
        genericRowData.setField(0,p_date)
        genericRowData.setField(1,data_code)
        genericRowData.setField(2,sensor_code)
        genericRowData.setField(3,post_time)
        /*** The following line fails to execute */
        genericRowData.setField(4,new GenericMapData(test_col_1.asJava))
        genericRowData.setField(5,new GenericMapData(test_col_2.asJava))
        genericRowData.setField(6,0)

        println(test_col_1)
        println(test_col_2)

        // println(genericRowData)
        out.collect(genericRowData)
        /* increment the counter by 1 */
        logCount.add(1)
      } catch {
        case e: Exception =>
          println(s"exception-time: ${dateFormat.format(System.currentTimeMillis())}\n")
          println(value)
          e.printStackTrace()
      }

    }
  }
).name("log-process")

logInfoStream.print()

logInfoStream.sinkTo( 
  Util.generateDorisSink(fe=fe,username = username,password = password,table_name = table_name,prefix = groupId+"_"+System.currentTimeMillis())
).name("log-to-doris")

env.execute("log-sink")

  }
}

Launch arguments: --group_id iot-2023-0717-1000 --bootstrap 10.255.60.54:9092,10.255.60.56:9092 --topic iot_device_log.* --offset latest --username root --password 123456 --fe 10.21.221.18:8030 --table_name ods_iot.test_doris_v1

Sample data from the topic: {"data":{"wsLat":30.918466},"dataCode":"FOH8U310XA4EORK","userCode":"user_gis","sensorId":"dvc_20220712_1633","postTime":"2023-05-01 00:00:00","commissionCode":"WB510100000001"}

Anything Else?

Nothing else.

Are you willing to submit PR?

Code of Conduct

amorynan commented 1 year ago

Is there any info log or coredump? And please show the table schema in Doris.

imx7 commented 1 year ago

@amorynan

drop table if exists ods_iot.test_doris_v1;

CREATE TABLE IF NOT EXISTS ods_iot.test_doris_v1 (
    p_date datev2 not null comment 'day partition',
    data_code varchar(32) not null comment 'product code',
    sensor_code varchar(128) not null comment 'sensor code',
    post_time datetime not null comment 'log report time',
    test_col_1 map<datetimev2,int> not null comment 'test 1',
    test_col_2 map<varchar(32),int> not null comment 'test 2'
)
ENGINE=OLAP
UNIQUE KEY(p_date, data_code, sensor_code, post_time)
PARTITION BY RANGE(p_date)()
DISTRIBUTED BY HASH(data_code, sensor_code) BUCKETS 8
PROPERTIES (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.create_history_partition" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-360",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "16",
    "dynamic_partition.replication_num" = "1"
);

-- This SQL executes successfully.
insert into ods_iot.test_doris_v1(
    p_date, data_code, sensor_code, post_time, test_col_1, test_col_2
) values (
    '2023-07-13',
    'test_data_code_1',
    'test_sensor_code_1',
    '2023-07-13 13:13:13',
    {'2023-07-13 13:13:13':12, '2023-07-13 13:13:15':15},
    {'str_2023-07-13 13:13:13':12, 'str_2023-07-13 13:13:15':15}
)

Data sent to Kafka:
{"data":{"wsLat":30.918466},"dataCode":"FOH8U310XA4EORK","userCode":"user_gis","sensorId":"dq_20220712_1632","postTime":"2023-07-18 02:01:01","commissionCode":"WB510100000001"}

Program output:
2023-07-17 10:51:08
java.lang.Exception: Could not perform checkpoint 5 for operator Source: KafkaSourceLog -> log-process -> (Sink: Print to Std. Out, log-to-doris: Writer -> log-to-doris: Committer) (12/16)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1184)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$13(StreamTask.java:1131)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: stream load error: [INTERNAL_ERROR]too many filtered rows, see more in http://10.21.221.18:8040/api/_load_error_log?file=__shard_9/error_log_insert_stmt_b480845759a6b07-a2db1414732dcc91_b480845759a6b07_a2db1414732dcc91
    at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:158)
    at org.apache.flink.streaming.api.transformations.SinkV1Adapter$SinkWriterV1Adapter.prepareCommit(SinkV1Adapter.java:151)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:196)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1172)
    ... 14 more


http://10.21.221.18:8040/api/_load_error_log?file=__shard_9/error_log_insert_stmt_b480845759a6b07-a2db1414732dcc91_b480845759a6b07_a2db1414732dcc91
Reason: column(test_col_1) values is null while columns is not nullable. src line [2023-07-18 FOH8U310XA4EORK dq_20220712_1632 2023-07-18 02:01:01.0 {\"2023-07-17T18:01:01.000+00:00\":176} {\"1689616861000\":176} 0];
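Reading that error-log line: the row reaches Doris with the test_col_1 key already rendered as "2023-07-17T18:01:01.000+00:00" (ISO-8601 with a zone offset), which datetimev2 apparently cannot parse, so the whole map becomes null and the NOT NULL constraint filters the row. Below is a minimal, untested workaround sketch: pre-format the Timestamp key into the "yyyy-MM-dd HH:mm:ss" form that the working INSERT INTO example uses, the same thing the reproduction already does for test_col_2. The object and method names here are made up for illustration, and the test_col_1 entry in fieldsInfo would have to become DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()) for this to line up.

import java.text.SimpleDateFormat
import java.util.Locale

// Workaround sketch only (hypothetical, not verified against flink-doris-connector 1.4.0):
// render the map key as the "yyyy-MM-dd HH:mm:ss" string Doris datetimev2 accepts,
// instead of letting a java.sql.Timestamp be serialized as "2023-07-17T18:01:01.000+00:00".
object MapKeyFormatSketch {
  private val dorisDatetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.CHINA)

  def toDorisMapKey(ts: java.sql.Timestamp): String = dorisDatetimeFormat.format(ts)

  def main(args: Array[String]): Unit = {
    val ts = java.sql.Timestamp.valueOf("2023-07-18 02:01:01")
    // prints "2023-07-18 02:01:01", the form the working INSERT INTO example uses
    println(toDorisMapKey(ts))
  }
}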

amorynan commented 1 year ago

It seems you loaded null data, but the column test_col_1 is defined as NOT NULL: test_col_1 map<datetimev2,int> not null comment 'test 1',

imx7 commented 1 year ago

test_col_1 shows on the page as {"2023-07-17T18:01:01.000+00:00":176}, not null. It works on the Doris 2.0 alpha release; the problem only appears on the beta release. (screenshot attached)

amorynan commented 1 year ago

test_col_1 shows on the page as {"2023-07-17T18:01:01.000+00:00":176}, not null. It works on the Doris 2.0 alpha release; the problem only appears on the beta release. (screenshot attached)

Why are there 7 columns here? Your table schema only defines 6 columns.

imx7 commented 1 year ago

Because of an error reported earlier, the DORIS_DELETE_SIGN column has to be added when writing.
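For context, a Doris unique-key table keeps a hidden delete-sign column, and the connector decides per row whether it is an upsert or a delete. An alternative sketch (an assumption, not verified for flink-doris-connector 1.4.0) would be to switch delete handling off in the execution options instead of carrying an extra DORIS_DELETE_SIGN field in the RowData; the corresponding entry in fieldsInfo would then be dropped as well.

import java.util.Properties
import org.apache.doris.flink.cfg.DorisExecutionOptions

// Sketch: treat every incoming row as an upsert so no delete-sign field is needed.
// Assumes the builder exposes setDeletable in this connector version.
object ExecutionOptionsSketch {
  def build(prefix: String): DorisExecutionOptions = {
    val properties = new Properties()
    properties.setProperty("format", "json")
    properties.setProperty("read_json_by_line", "true")
    DorisExecutionOptions.builder
      .setLabelPrefix(prefix)
      .setDeletable(false)
      .setStreamLoadProp(properties)
      .build
  }
}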

amorynan commented 1 year ago

Can you add me on WeChat? ID: amorynan