apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Code hangs when writing data to S3 using the Flink DataStream API, and the target path is empty #7953

Open longjuncai1994 opened 1 year ago

longjuncai1994 commented 1 year ago

Tips before filing an issue

Describe the problem you faced

I want to write data to S3 from my local IDE, and I set the AWS credentials in my code (I found the configuration keys in the Hudi source code). The configuration seems to work, because it no longer throws an exception. But my code hangs after the log line "s3a-file-system metrics system started".

    import java.util

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.data.{GenericRowData, RowData, StringData, TimestampData}
    import org.apache.hudi.common.model.HoodieTableType
    import org.apache.hudi.configuration.FlinkOptions
    import org.apache.hudi.util.HoodiePipeline

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Counter used as the record key (only safe with parallelism 1).
    var i = 0

    // Generate ten RowData records, all in partition "p1".
    val source =
      env.generateSequence(1, 10)
        .map(d => {
          i += 1
          val rowData = new GenericRowData(4)
          rowData.setField(0, StringData.fromString(i + ""))
          rowData.setField(1, StringData.fromString(d + ""))
          rowData.setField(2, StringData.fromString("p1"))
          rowData.setField(3, TimestampData.fromEpochMillis(1676427048588L))
          println("data:" + rowData.toString)
          rowData.asInstanceOf[RowData]
        }).javaStream

    val path = "s3a://test/hudi"
    val table = "huditable"

    // Table options; keys prefixed with "hadoop." are forwarded by Hudi into
    // the Hadoop configuration used by the S3A filesystem.
    val options = new util.HashMap[String, String]()
    options.put(FlinkOptions.PATH.key, path)
    options.put(FlinkOptions.TABLE_TYPE.key, HoodieTableType.MERGE_ON_READ.name)
    options.put("fs.defaultFS", "s3a://test") // note: no "hadoop." prefix, unlike the keys below
    options.put("hadoop.fs.s3a.access.key", "mykey")
    options.put("hadoop.fs.s3a.secret.key", "mykey")
    options.put("hadoop.fs.s3a.endpoint", "myendpoint")
    options.put("hadoop.fs.s3a.region", "myregion")

    // Build the Hudi sink: schema, primary key, and partition field.
    val builder = HoodiePipeline.builder(table)
      .column("uuid VARCHAR(20)")
      .column("content VARCHAR(255)")
      .column("ps VARCHAR(20)")
      .column("ts TIMESTAMP(3)")
      .pk("uuid")
      .partition("ps")
      .options(options)
    builder.sink(source, false) // bounded = false: run as an unbounded streaming job
    env.execute()

As you can see, I print the records in the map function, but stdout is empty, so no records were processed.

And the code does not throw any exceptions, but hangs here:

[INFO ] 2023-02-15 10:22:01,098:Registering TaskManager with ResourceID e8ee6bfd-f75d-4486-9b7a-45fdf0befd68 (akka://flink/user/rpc/taskmanager_0) at ResourceManager
[INFO ] 2023-02-15 10:22:01,100:Successful registration at resource manager akka://flink/user/rpc/resourcemanager_1 under registration id 7f51c4797923db47714b2ca1ab8d84ab.
[INFO ] 2023-02-15 10:22:01,100:Received JobGraph submission 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041).
[INFO ] 2023-02-15 10:22:01,101:Submitting job 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041).
[INFO ] 2023-02-15 10:22:01,116:Proposing leadership to contender LeaderContender: JobMasterServiceLeadershipRunner
[INFO ] 2023-02-15 10:22:01,128:Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_3 .
[INFO ] 2023-02-15 10:22:01,134:Initializing job 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041).
[INFO ] 2023-02-15 10:22:01,154:Using restart back off time strategy NoRestartBackoffTimeStrategy for Flink Streaming Job (f9929ff372b45550c0560a998e1a7041).
[INFO ] 2023-02-15 10:22:01,196:Running initialization on master for job Flink Streaming Job (f9929ff372b45550c0560a998e1a7041).
[INFO ] 2023-02-15 10:22:01,196:Successfully ran initialization on master in 0 ms.
[INFO ] 2023-02-15 10:22:01,292:Built 1 new pipelined regions in 1 ms, total 1 pipelined regions currently.
[INFO ] 2023-02-15 10:22:01,300:No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@6f4f4b24
[INFO ] 2023-02-15 10:22:01,300:State backend loader loads the state backend as HashMapStateBackend
[INFO ] 2023-02-15 10:22:01,303:Checkpoint storage is set to 'jobmanager'
[INFO ] 2023-02-15 10:22:01,339:No checkpoint found during restore.
[INFO ] 2023-02-15 10:22:01,346:Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@309d741 for Flink Streaming Job (f9929ff372b45550c0560a998e1a7041).
[INFO ] 2023-02-15 10:22:01,354:Received confirmation of leadership for leader akka://flink/user/rpc/jobmanager_3 , session=9934221e-e0be-4027-a8ce-975fedab52c5
[INFO ] 2023-02-15 10:22:01,357:Starting execution of job 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041) under job master id a8ce975fedab52c59934221ee0be4027.
[WARN ] 2023-02-15 10:22:13,961:Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
[INFO ] 2023-02-15 10:22:13,972:Scheduled Metric snapshot period at 10 second(s).
[INFO ] 2023-02-15 10:22:13,972:s3a-file-system metrics system started

I have tried writing the same data with Spark, and it worked well.
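
For comparison, a Spark write of this data typically looks like the sketch below. The exact Spark code is not in this issue, so the example DataFrame and option values here are assumptions based on the Flink schema above:

    import java.sql.Timestamp

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("hudi-s3-write").getOrCreate()
    import spark.implicits._

    // Example rows matching the Flink schema above (uuid, content, ps, ts).
    val df = Seq(("1", "1", "p1", new Timestamp(1676427048588L)))
      .toDF("uuid", "content", "ps", "ts")

    df.write.format("hudi")
      .option("hoodie.table.name", "huditable")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "ps")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .mode(SaveMode.Append)
      .save("s3a://test/hudi")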

So what is the problem with my Flink code?

I couldn't find more examples of using the Flink DataStream API in the Hudi documentation.

To Reproduce

Steps to reproduce the behavior:

1. Generate the records

2. Set the configurations

3. Sink to S3

Environment Description

Additional context

Stacktrace

yihua commented 1 year ago

@danny0405 could you help here on the Hudi Flink setup?

danny0405 commented 1 year ago

@longjuncai1994 Did you enable checkpointing? Did you use batch execution mode?
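
For context on why these two questions matter: in streaming mode, Hudi's Flink writer buffers incoming records and only commits them to the table when a Flink checkpoint completes, so with checkpointing disabled nothing is ever flushed, the target path stays empty, and the job appears to hang. A minimal sketch of the two settings being asked about (the checkpoint interval is an example value):

    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Option 1: stay in streaming mode but enable checkpointing, since the
    // Hudi streaming writer commits data on checkpoint completion.
    env.enableCheckpointing(30000L) // e.g. every 30 seconds

    // Option 2: for a bounded source such as generateSequence, run in batch
    // execution mode instead, and mark the Hudi sink as bounded:
    // env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH)
    // builder.sink(source, true) // bounded = true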