confluentinc / kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector

Can't create a connector even if it's loaded in Strimzi #730

Open victorcasignia opened 10 months ago

victorcasignia commented 10 months ago

I deployed a Kafka cluster in the cloud using Strimzi.

I used this KafkaConnect config to build an image that includes the Elasticsearch plugin files:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: es-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.6.0
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: registry/elasticsearch-kafka-connect:latest
      pushSecret: es-connect-secret
    plugins:
      - name: elasticsearch-connector
        artifacts:
          - type: maven
            repository: https://packages.confluent.io/maven
            group: io.confluent
            artifact: kafka-connect-elasticsearch
            version: 14.0.3
  template:
    pod:
      imagePullSecrets:
        - name: es-connect-secret

The resulting Connect cluster contains the JAR files needed for the Elasticsearch connector. I can also see the plugin loaded when I query the REST API:

[kafka@es-connect-cluster-connect-0 kafka]$ curl localhost:8083/connector-plugins
[{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"14.0.3"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector","type":"sink","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSinkConnector","type":"sink","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.ErrantRecordSinkConnector","type":"sink","version":"some great version"},{"class":"org.apache.kafka.connect.integration.MonitorableSinkConnector","type":"sink","version":"some great version"},{"class":"org.apache.kafka.connect.runtime.TestSinkConnector","type":"sink","version":"some great version"},{"class":"org.apache.kafka.connect.tools.MockSinkConnector","type":"sink","version":"3.6.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSourceConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$ConfigBlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$InitializeBlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSourceConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.BlockingConnectorTest$ValidateBlockingConnector","type":"source","version":"0.0.0"},{"class":"org.apache.kafka.connect.integration.MonitorableSourceConnector","type":"source","version":"an entirely different version"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.runtime.TestSourceConnector","type":"source","version":"an entirely different version"},{"class":"org.apache.kafka.connect.runtime.WorkerTest$WorkerTestConnector","type":"source","version":"1.0"},{"class":"org.apache.kafka.connect.runtime.WorkerWithTopicCreationTest$WorkerTestConnector","type":"source","version":"1.0"},{"class":"org.apache.kafka.connect.tools.MockSourceConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.tools.SchemaSourceConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.tools.VerifiableSinkConnector","type":"source","version":"3.6.0"},{"class":"org.apache.kafka.connect.tools.VerifiableSourceConnector","type":"source","version":"3.6.0"}]
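Since the response is long, the check for the Elasticsearch class can be scripted rather than read by eye. This is only a minimal sketch: `SAMPLE` is a shortened copy of the response above, and `find_plugin` is a hypothetical helper name, not part of any Kafka Connect tooling.

```python
import json

# Shortened copy of the /connector-plugins response shown above (assumption:
# the full response has been captured as a string in the same JSON shape).
SAMPLE = """[
  {"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
   "type": "sink", "version": "14.0.3"},
  {"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
   "type": "source", "version": "3.6.0"}
]"""


def find_plugin(response_text, class_name):
    """Return the plugin entry whose "class" equals class_name, or None."""
    for plugin in json.loads(response_text):
        if plugin.get("class") == class_name:
            return plugin
    return None


entry = find_plugin(
    SAMPLE, "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
)
print(entry)
```

A `None` result would mean the worker that answered the request does not list the class at all.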

But whenever I create the KafkaConnector resource, I get this error:

org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='3.6.0', encodedVersion=3.6.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='3.6.0', encodedVersion=3.6.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='3.6.0', encodedVersion=3.6.0, type=source, typeName='source', location='classpath'}
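The error carries its own list of available connectors, so it can be compared directly against the class being requested. A rough sketch, assuming the message is available as a string: `ERROR` below is a shortened excerpt containing a single `PluginDesc` entry, and `requested_class` and `available_connectors` are hypothetical helper names.

```python
import re

# Shortened excerpt of the ConnectException message above (one PluginDesc only).
ERROR = (
    "Failed to find any class that implements Connector and which name matches "
    "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, "
    "available connectors are: "
    "PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, "
    "name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='3.6.0', "
    "encodedVersion=3.6.0, type=source, typeName='source', location='classpath'}"
)


def requested_class(message):
    """Extract the class name that the worker failed to find."""
    match = re.search(r"name matches ([\w.$]+)", message)
    return match.group(1) if match else None


def available_connectors(message):
    """Extract the name='...' field of every PluginDesc in the message."""
    # \b keeps the pattern from matching inside typeName='...'.
    return re.findall(r"\bname='([^']+)'", message)


wanted = requested_class(ERROR)
available = available_connectors(ERROR)
print(wanted, "listed:", wanted in available)
```

Run against the full message, this makes it easy to see that the worker raising the error reports a different set of connectors than the `/connector-plugins` response shown earlier.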

This is the connector config I used:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: es-connector
  labels:
    strimzi.io/cluster: es-connect-cluster
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  config:
    topics: topic_here
    connection.url: elastic_host_here
    connection.username: elastic
    connection.password: password_here

I have also tried creating the connector manually through the REST API, but I still get the same error.
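For reference, the manual attempt amounts to a `POST` to the worker's `/connectors` endpoint with the connector name and a `config` map that includes `connector.class`. The following is a minimal sketch of that request, not the exact command I ran: the base URL `http://localhost:8083` is taken from the curl call above, `build_create_request` is a hypothetical helper, and the config values are the same placeholders as in the KafkaConnector resource.

```python
import json
import urllib.request


def build_create_request(base_url, name, config):
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values copied from the KafkaConnector resource above.
config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "topic_here",
    "connection.url": "elastic_host_here",
    "connection.username": "elastic",
    "connection.password": "password_here",
}

req = build_create_request("http://localhost:8083", "es-connector", config)
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))

# To actually send it from inside the Connect pod (not executed here):
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```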