apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [sink clickhouse] Apache SeaTunnel version 2.3.7: is there a version requirement for the ClickHouse sink? #7743

Open wuwu20048 opened 5 days ago

wuwu20048 commented 5 days ago

What happened

Importing from Kafka into ClickHouse reported an error. Is the ClickHouse version too old? The version is 20.3.19.4.

SeaTunnel Version

2.3.7

SeaTunnel Config

# Defining the runtime environment
env {
  parallelism = 2
  job.mode = "STREAMING"
  job.log_level = "DEBUG"

}
source {
  Kafka {
    schema = {
      fields {
        beanName = "string"
        method = "string"
        serverId = "string"
        serverIp = "string"
        domain = "string"
        logDate = "string"
        ceateDate = "string"
        ipAddress = "string"
        urt = "int"
        referer = "string"
        uri = "string"
        region = "string"
        clientId = "int"
        entrance = "string"
        source = "string"
        contentMd5 = "string"
        timeCost = "int"
        exceptionCode = "string"
        contentLength = "int"
        resultLength = "int"
        host = "string"
      }
    }

    format = json
    topic = "gateway_logs"
    bootstrap.servers = "172.16.22.200:9092"
    consumer.group = "test"
    result_table_name="gateway_logs"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "latest"
      enable.auto.commit = "true"
    }
  }
}
sink {
  Clickhouse {
    host = "172.22.1.73:8123"
    database = "default"
    table = "logs_local"
    username = "default"
    password = "XXXXXX"
    bulk_size = 1000
    source_table_name = "gateway_logs"
    clickhouse.config = {
      max_rows_to_read = "100"
      read_overflow_mode = "throw"
    }
  }
}

Running Command

./bin/seatunnel.sh --config ./config/kafka_ck.template -m local

Error Exception

2024-09-25 17:33:12,238 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel-997458] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-09-25 17:33:12,238 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed SeaTunnel client......
2024-09-25 17:33:12,238 INFO  [c.h.c.LifecycleService        ] [main] - [localhost]:5801 [seatunnel-997458] [5.1] [localhost]:5801 is SHUTTING_DOWN
2024-09-25 17:33:12,242 INFO  [c.h.i.p.i.MigrationManager    ] [hz.main.cached.thread-14] - [localhost]:5801 [seatunnel-997458] [5.1] Shutdown request of Member [localhost]:5801 - 3f92f308-4697-43a5-9ff3-caa86572cf5c this master is handled
2024-09-25 17:33:12,246 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-997458] [5.1] Shutting down connection manager...
2024-09-25 17:33:12,248 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-997458] [5.1] Shutting down node engine...
2024-09-25 17:33:12,253 INFO  [.c.c.DefaultClassLoaderService] [main] - close classloader service
2024-09-25 17:33:12,254 INFO  [o.a.s.e.s.TaskExecutionService] [event-forwarder-0] - [localhost]:5801 [seatunnel-997458] [5.1] Event forward thread interrupted
2024-09-25 17:33:15,262 INFO  [c.h.i.i.NodeExtension         ] [main] - [localhost]:5801 [seatunnel-997458] [5.1] Destroying node NodeExtension.
2024-09-25 17:33:15,263 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-997458] [5.1] Hazelcast Shutdown is completed in 3022 ms.
2024-09-25 17:33:15,263 INFO  [c.h.c.LifecycleService        ] [main] - [localhost]:5801 [seatunnel-997458] [5.1] [localhost]:5801 is SHUTDOWN
2024-09-25 17:33:15,263 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed HazelcastInstance ......
2024-09-25 17:33:15,263 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -

===============================================================================

2024-09-25 17:33:15,263 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error,

2024-09-25 17:33:15,263 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-09-25 17:33:15,263 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed

2024-09-25 17:33:15,267 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:211)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at com.clickhouse.client.data.ClickHouseSimpleRecord.getValue(ClickHouseSimpleRecord.java:86)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy.lambda$getClickhouseTableSchema$2(ClickhouseProxy.java:144)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy.getClickhouseTableSchema(ClickhouseProxy.java:142)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy.getClickhouseTableSchema(ClickhouseProxy.java:133)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSink.prepare(ClickhouseSink.java:145)
    at org.apache.seatunnel.engine.core.parse.JobConfigParser.parseSink(JobConfigParser.java:167)
    at org.apache.seatunnel.engine.core.parse.JobConfigParser.parseSinks(JobConfigParser.java:137)
    at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:560)
    at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:221)
    at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:114)
    at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:182)
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:158)
    ... 2 more

2024-09-25 17:33:15,267 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -
===============================================================================

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:211)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at com.clickhouse.client.data.ClickHouseSimpleRecord.getValue(ClickHouseSimpleRecord.java:86)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy.lambda$getClickhouseTableSchema$2(ClickhouseProxy.java:144)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy.getClickhouseTableSchema(ClickhouseProxy.java:142)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy.getClickhouseTableSchema(ClickhouseProxy.java:133)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSink.prepare(ClickhouseSink.java:145)
    at org.apache.seatunnel.engine.core.parse.JobConfigParser.parseSink(JobConfigParser.java:167)
    at org.apache.seatunnel.engine.core.parse.JobConfigParser.parseSinks(JobConfigParser.java:137)
    at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:560)
    at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:221)
    at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:114)
    at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:182)
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:158)
    ... 2 more
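
The stack trace points at ClickhouseSink.prepare calling ClickhouseProxy.getClickhouseTableSchema, i.e. the job dies while reading the table's column metadata, before any Kafka record is written. The ArrayIndexOutOfBoundsException: 2 means the connector read the third field (index 2) of a metadata row that had fewer than three fields. A minimal probe to see what the server actually returns, assuming the connector's schema query is equivalent to DESCRIBE TABLE (an assumption inferred from the trace, not confirmed against the connector source):

-- Run on the 20.3 server and check how many columns each row of the
-- result has; the third column of DESCRIBE output (default_type) is
-- presumably what index 2 maps to.
DESCRIBE TABLE default.logs_local;

-- system.columns is an alternative view of the same metadata.
SELECT name, type, default_kind
FROM system.columns
WHERE database = 'default' AND table = 'logs_local';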

Zeta or Flink or Spark Version

No response

Java or Scala Version

jdk 1.8

Screenshots

[screenshot attached]


YuriyGavrilov commented 4 days ago

Hi @wuwu20048, this works for me on SeaTunnel v2.3.8 with Azul Java 17:

CREATE TABLE default.sink_table
(
     id int, name String, out String
)
ENGINE = MergeTree ORDER BY id

# Set the basic configuration of the task to be performed
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 6
    schema = {
      fields {
        id = "int"
        name = "string"
        out = "string"
      }
    }
    rows = [
      {fields = [1, "Jia Fan", "ffrrr"], kind = INSERT}
      {fields = [2, "Hailin Wang", "ddd"], kind = INSERT}
      {fields = [3, "Tomas", "vfs"], kind = INSERT}
      {fields = [4, "Eric", "kfkd"], kind = INSERT}
      {fields = [5, "Guangdong Liu", "pew"], kind = INSERT}
      {fields = [6, "Юрий Гаврилов", "szs"], kind = INSERT}
    ]
  }
}

#transform {

#}

sink {
  Clickhouse {
    host = "some-clickhouse-server:8123"
    database = "default"
    table = "sink_table"
    username = ""
    password = ""
    bulk_size = 1000
    # source_table_name = "gateway_logs"
    clickhouse.config = {
      max_rows_to_read = "100"
      read_overflow_mode = "throw"
    }
  }
}

YuriyGavrilov commented 4 days ago

@wuwu20048 This also works.

Maybe the problem is in the data types, the schema, or the schema registry (try using one if it exists); a quick type check is sketched after the config below.

[Screenshots: 2024-09-25 at 23:12:21 and 23:12:47]

env {
  # You can set SeaTunnel environment configuration here
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  # This is an example source plugin, intended only to test and demonstrate the source plugin feature
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        id = "int"
        name = "string"
        out = "string"
      }
    }
  }

  # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/connector-v2/source
}

#transform {

#}

sink {
  Clickhouse {
    host = "some-clickhouse-server:8123"
    database = "default"
    table = "sink_table"
    username = ""
    password = ""
    bulk_size = 1000
    # source_table_name = "gateway_logs"
    clickhouse.config = {
      max_rows_to_read = "100"
      read_overflow_mode = "throw"
    }
  }
}
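
One way to run the data-type check suggested above: list the column types the server reports and compare them field by field with the fields block of the Kafka source schema in the original report. A sketch against the reporter's table:

-- Compare each type with the SeaTunnel schema, e.g. urt and clientId are
-- declared "int" on the SeaTunnel side and should map to Int32 columns here.
SELECT name, type
FROM system.columns
WHERE database = 'default' AND table = 'logs_local'
ORDER BY position;
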
wuwu20048 commented 1 day ago

The problem is that when I use a newer ClickHouse locally (version 24.1.1.2048), the above configuration imports normally. The ClickHouse version on the server is older, 20.3.19.4, but the table structure definition is the same as my local one:

create table default.logs_local
(
    beanName String,
    method String,
    serverId Nullable(String),
    serverIp Nullable(String),
    domain String,
    logDate DateTime,
    ceateDate DateTime,
    ipAddress Nullable(String),
    urt Nullable(Int32),
    referer Nullable(String),
    uri Nullable(String),
    region Nullable(String),
    clientId Nullable(Int32),
    entrance Nullable(String),
    source Nullable(String),
    contentMd5 Nullable(String),
    timeCost Int32,
    exceptionCode Nullable(String),
    contentLength Nullable(Int32),
    resultLength Nullable(Int32),
    host Nullable(String)
)
engine = MergeTree
PARTITION BY toYYYYMMDD(logDate)
ORDER BY (logDate, beanName, method)
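
Since the same DDL works against 24.1.1.2048 and fails against 20.3.19.4, the table definition itself looks fine; the remaining suspect is how the old server answers the connector's schema query during prepare. A last check worth running on both servers, under the same DESCRIBE-equivalence assumption as above:

-- Record the server version next to the shape of the DESCRIBE result.
-- If 20.3 returns rows with fewer than three fields, that would line up
-- with the index-2 failure; if it returns the usual seven metadata columns
-- (name, type, default_type, default_expression, comment, codec_expression,
-- ttl_expression), the incompatibility is more likely in how the bundled
-- clickhouse-java client handles the old server's responses.
SELECT version();
DESCRIBE TABLE default.logs_local;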