bytedance / bitsail

BitSail is a distributed high-performance data integration engine which supports batch, streaming and incremental scenarios. BitSail is widely used to synchronize hundreds of trillions of data every day.
https://bytedance.github.io/bitsail/
Apache License 2.0
1.62k stars 331 forks source link

[Bug][Connector] KafkaConnector not registered #497

Closed lfyzjck closed 10 months ago

lfyzjck commented 10 months ago

What happened

When run hive2kafka job, bitsail raise following exceptions:

2023-11-28 11:58:44,855 WARN  com.bytedance.bitsail.base.packages.PluginStore              [] - Class com.bytedance.bitsail.connector.kafka.sink.KafkaSink not register in mapping file.
2023-11-28 11:58:44,855 WARN  com.bytedance.bitsail.base.packages.PluginStore              [] - Class com.bytedance.bitsail.connector.kafka.sink.KafkaSink not register in mapping file.
2023-11-28 11:58:44,855 ERROR com.bytedance.bitsail.core.flink116.bridge.program.FlinkDAGBuilderFactory [] - failed to create writer DAG builder
2023-11-28 11:58:44,857 ERROR com.bytedance.bitsail.core.Engine                            [] -

The cause of the job failure maybe due to:
java.lang.RuntimeException: com.bytedance.bitsail.common.BitSailException: PLUGIN_NOT_FOUND_ERROR - The class com.bytedance.bitsail.connector.kafka.sink.KafkaSink not exists in plugin store.
    at com.bytedance.bitsail.core.flink116.bridge.program.FlinkDAGBuilderFactory.lambda$getDataWriterDAGBuilders$1(FlinkDAGBuilderFactory.java:102)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at com.bytedance.bitsail.core.flink116.bridge.program.FlinkDAGBuilderFactory.getDataWriterDAGBuilders(FlinkDAGBuilderFactory.java:105)
    at com.bytedance.bitsail.core.api.program.UnifiedProgram.prepare(UnifiedProgram.java:105)
    at com.bytedance.bitsail.core.api.program.UnifiedProgram.configure(UnifiedProgram.java:88)
    at com.bytedance.bitsail.core.Engine.run(Engine.java:93)
    at com.bytedance.bitsail.core.Engine.start(Engine.java:65)
    at com.bytedance.bitsail.core.Engine.main(Engine.java:55)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:843)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1087)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1165)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1772)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1165)
Caused by: com.bytedance.bitsail.common.BitSailException: PLUGIN_NOT_FOUND_ERROR - The class com.bytedance.bitsail.connector.kafka.sink.KafkaSink not exists in plugin store.
    at com.bytedance.bitsail.common.BitSailException.asBitSailException(BitSailException.java:46)
    at com.bytedance.bitsail.base.packages.LocalFSPluginFinder.loadPlugin(LocalFSPluginFinder.java:116)
    at com.bytedance.bitsail.base.packages.LocalFSPluginFinder.findPluginInstance(LocalFSPluginFinder.java:101)
    at com.bytedance.bitsail.base.packages.LocalFSPluginFinder.findPluginInstance(LocalFSPluginFinder.java:84)
    at com.bytedance.bitsail.core.flink116.bridge.program.FlinkDAGBuilderFactory.construct(FlinkDAGBuilderFactory.java:166)
    at com.bytedance.bitsail.core.flink116.bridge.program.FlinkDAGBuilderFactory.getDataWriterDAGBuilder(FlinkDAGBuilderFactory.java:114)
    at com.bytedance.bitsail.core.flink116.bridge.program.FlinkDAGBuilderFactory.lambda$getDataWriterDAGBuilders$1(FlinkDAGBuilderFactory.java:99)
    ... 29 more

What do you expect to happen

No response

How to reproduce

job config:

{
  "job": {
    "common": {
      "job_name": "hive2kafka_test",
      "user_name": "",
      "optional": {
    "batch.size": 10000,
    "buffer.memory": 67108864
      }
    },
    "reader": {
      "class": "com.bytedance.bitsail.connector.legacy.hive.source.HiveInputFormat",
      "db_name": "db1",
      "table_name": "tbl1",
      "metastore_properties": {
        "hive.metastore.uris": ""
      },
      "partition": "p_date=2023-11-22",
      "format_type": "json",
      "columns": [
        { "index": 0, "name": "job_id", "type": "bigint" }
      ]
    },
    "writer": {
      "class": "com.bytedance.bitsail.connector.kafka.sink.KafkaSink",
      "kafka_servers": "...",
      "topic_name": "hive2kafka-test",
      "columns": [
        { "index": 0, "name": "job_id", "type": "BIGINT" }
      ]
    }
  }
}

Build Environment

No response

Execution Environment

No response

BitSail version

master

BitSail Component or Code Module

BitSail Connector

Are you willing to submit PR?

Code of Conduct