apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
8.1k stars 1.84k forks source link

[Bug] [DolphinSchduler3.2.1 调度 seatunel2.3.8 时好时坏] null point exception #8189

Open yaoquanxin opened 2 days ago

yaoquanxin commented 2 days ago

Search before asking

What happened

同步数据从postgresql 到 starrocks3.2.10. 使用海豚3.2.1 调度seatunnel节点,报错: 这种情况上周出现过好几次,即在local模式下,同一个sseatunnel 节点时好时坏.

Caused by: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed] at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:377) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242) at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:499) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.NullPointerException at org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog.tableExists(StarRocksCatalog.java:478) at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.tableExists(DefaultSaveModeHandler.java:138) at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:107) at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69) at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38) at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36) at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:375) ... 19 more Suppressed: java.lang.NullPointerException at org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog.close(StarRocksCatalog.java:435) at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.close(DefaultSaveModeHandler.java:229) at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:374) ... 19 more

    at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:506)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:829)

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  parallelism = 4
  job.mode = "BATCH"
}

source {
    Jdbc {
        url = "xxx"
        driver = "org.postgresql.Driver"
        user = "yqx"
        password = "xxx"
        query = """
          select date_trunc('day', a.modify_on) book_date, a.code, 
          max(a.op_org_code) org_code, 
          sum(COALESCE(c.face_value, 0)) face_value
          from hy_card_op_order a, hy_card_op_order_detail b, hy_card c
          where a.id = b.card_op_order_id and b.cardno = c.cardno
          group by date_trunc('day', a.modify_on), a.code;
        """
        partition_num = 5
    }
}

transform {}

sink {
  starrocks {
    nodeUrls = ["xxx"]
    base-url = "jdbc:mysql://172.16.0.170:9030/"
    username = root
    password = "xxx("
    database = "test"
    table = "seatunel_join_table"
    batch_max_rows = 8000
  }
}

Running Command

no, use dolphinScheduler do it.

Error Exception

Caused by: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:377)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:499)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog.tableExists(StarRocksCatalog.java:478)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.tableExists(DefaultSaveModeHandler.java:138)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:107)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69)
        at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38)
        at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:375)
        ... 19 more
        Suppressed: java.lang.NullPointerException
            at org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog.close(StarRocksCatalog.java:435)
            at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.close(DefaultSaveModeHandler.java:229)
            at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:374)
            ... 19 more

        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:506)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(Thread.java:829)

Zeta or Flink or Spark Version

Zeta Engine

Java or Scala Version

open jdk11

Screenshots

image image

Are you willing to submit PR?

Code of Conduct

yaoquanxin commented 2 days ago

have some big brother help me? if you give me some guide , may be I can fix the bug for commity.

liunaijie commented 2 days ago

Looks it a bug, StarRocks Jdbc Connection is not init. You can check the code in this class StarRocksCatalog cc @Hisoka-X

yaoquanxin commented 22 hours ago

Looks it a bug, StarRocks Jdbc Connection is not init. You can check the code in this class StarRocksCatalog cc @Hisoka-X

have some idea about it,my bro ? that sound I can do some Contribution for our seatunnel community. if you want, welecome to Communication use wechat, my wx: 18770170039