apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Connector-V2] [connector-kafka] [KafkaSourceConfig] database.include and table.include cannot be set in the config file to consume specific tables from a shared Kafka topic #6870

Closed. KelvinChi closed this issue 2 months ago.

KelvinChi commented 4 months ago


What happened

I am using OGG (Oracle GoldenGate) to write the change data of two tables into the same Kafka topic. Setting ogg_json.table.include in the configuration file does not make SeaTunnel consume only the matching table from that topic, and the job fails with an error saying that the primary key column cannot be null.

SeaTunnel Version

2.3.5

SeaTunnel Config

env {
  parallelism = 2
  job.mode = "streaming"
  checkpoint.interval = 3000
}

source {
  Kafka {
    bootstrap.servers = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
    topic = "xx_topic"
    consumer.group = "xx_scandata"
    format = ogg_json
    ogg_json.database.include = "XX_XXXX"
    ogg_json.table.include = "SCANDATA"
    schema = {
      fields {
        TE_IMES07_SCANDATA_ID = "BIGINT"
        ORDER_NO = "STRING"
        VIN = "STRING"
        MATERIAL_NO = "STRING"
        PLANT_NO = "STRING"
        TPS = "STRING"
        ULOC_ID = "STRING"
        QUAN = "DECIMAL(38, 18)"
        SCAN_TIME = "DATE"
      }
    }
  }
}

sink {
  StarRocks {
    base-url = "jdbc:mysql://xx.xx.xx.xx:9030"
    nodeUrls = ["xx.xx.xx.xx:8030", "xx.xx.xx.xx:8030", "xx.xx.xx.xx:8030"]
    username = "root"
    password = "xxxxxxxx"
    database = "test"
    table = "SCANDATA"
    enable_upsert_delete = true
  }
}
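To make the expected behavior of ogg_json.database.include and ogg_json.table.include concrete, here is a minimal sketch of the filtering the reporter expected. OGG's JSON output names the source of each change record as SCHEMA.TABLE in its "table" field. The sketch assumes that both options are regular expressions matched against the schema part and the table part respectively. This is a hypothetical Java class, not SeaTunnel's implementation; the class name OggTableFilter and the table name OTHER_TABLE are made up for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical filter illustrating the behavior expected from
// ogg_json.database.include / ogg_json.table.include (NOT SeaTunnel code).
// Assumption: both options are regexes matched against the whole name.
public class OggTableFilter {
    private final Pattern databaseInclude;
    private final Pattern tableInclude;

    public OggTableFilter(String databaseInclude, String tableInclude) {
        this.databaseInclude = Pattern.compile(databaseInclude);
        this.tableInclude = Pattern.compile(tableInclude);
    }

    // qualifiedTable is the value of the OGG JSON "table" field, e.g. "XX_XXXX.SCANDATA".
    public boolean accept(String qualifiedTable) {
        int dot = qualifiedTable.lastIndexOf('.');
        if (dot < 0) {
            return false; // not schema-qualified, so the database pattern cannot be checked
        }
        String database = qualifiedTable.substring(0, dot);
        String table = qualifiedTable.substring(dot + 1);
        return databaseInclude.matcher(database).matches()
                && tableInclude.matcher(table).matches();
    }

    public static void main(String[] args) {
        OggTableFilter filter = new OggTableFilter("XX_XXXX", "SCANDATA");
        System.out.println(filter.accept("XX_XXXX.SCANDATA"));    // true
        System.out.println(filter.accept("XX_XXXX.OTHER_TABLE")); // false (hypothetical second table)
    }
}
```

With a filter like this applied before deserialization, change records from the second table in xx_topic would be dropped instead of being mapped onto the SCANDATA schema.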

Running Command

bin/seatunnel.sh -c scandata_streaming.config

Error Exception

logs/seatunnel-engine-server.log.2024-05-17-1:Error: NULL value in non-nullable column 'TE_IMES07_SCANDATA_ID'. Row: [NULL, NULL, NULL, 'S0BEFCL6F200400338', '5001', NULL, NULL, NULL, NULL]
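One plausible reading of this row is that only two of the nine columns are populated and the primary key is NULL. That is consistent with a change record from the other table in the topic being mapped onto the SCANDATA schema: every column that record does not contain becomes NULL, and StarRocks then rejects the row. The sketch below illustrates that mechanism under this assumption only. The class SchemaProjection and the sample record (with columns VIN and PLANT_NO and placeholder values V1 and P1) are hypothetical, not taken from the actual data or from SeaTunnel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: projecting a record onto the source schema
// declared in the config. Columns missing from the record become null.
public class SchemaProjection {
    // Field order as declared in the Kafka source schema of the config.
    static final List<String> SCHEMA = Arrays.asList(
            "TE_IMES07_SCANDATA_ID", "ORDER_NO", "VIN", "MATERIAL_NO",
            "PLANT_NO", "TPS", "ULOC_ID", "QUAN", "SCAN_TIME");

    static List<Object> project(Map<String, Object> after) {
        List<Object> row = new ArrayList<>();
        for (String field : SCHEMA) {
            row.add(after.get(field)); // Map.get returns null for a missing key
        }
        return row;
    }

    public static void main(String[] args) {
        // Hypothetical "after" image of a record from the second table,
        // sharing only two column names with SCANDATA.
        Map<String, Object> other = new HashMap<>();
        other.put("VIN", "V1");
        other.put("PLANT_NO", "P1");
        other.put("OTHER_COLUMN", 42); // not in the schema, silently ignored
        System.out.println(project(other));
        // prints [null, null, V1, null, P1, null, null, null, null]
    }
}
```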

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

Java

Screenshots

No response


github-actions[bot] commented 3 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 2 months ago

This issue has been closed because it has not received a response for a long time. You can reopen it if you encounter similar problems in the future.