dmathieu / kafka-connect-opensearch

Kafka Connect plugin for sending events to OpenSearch

SASL_PLAINTEXT - Help Needed - Authentication getting failed with connector. #41

Open mann2108 opened 2 years ago

mann2108 commented 2 years ago
  1. I have configured SASL_PLAINTEXT.

  2. I updated the connect-distributed.properties file:

bootstrap.servers=10.30.1.101:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

listeners=HTTP://:8083

plugin.path=/home/devuser/connectors

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret";
  3. The plugin.path directory, /home/devuser/connectors, contains the kafka-connect-opensearch JAR file.

  4. Once that was done, I was able to start the Kafka Connect server successfully on port 8083.

  5. Then, using Postman, I tried to create my first connector.

Request: POST 10.30.1.101:8083/connectors/ with the following JSON body:

{
    "name": "first-opensearch-connector",
    "config": {
        "connector.class": "com.dmathieu.kafka.opensearch.OpenSearchSinkConnector",
        "type.name": "_doc",
        "connection.password": "admin",
        "tasks.max": "1",
        "topics": "my_topic",
        "name": "first-opensearch-connector",
        "connection.username": "admin",
        "value.converter.schemas.enable": "false",
        "connection.url": "http://10.30.1.101:9200",
        "key.ignore": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "schema.ignore": "true",
        "drop.invalid.message": "true",
        "behavior.on.malformed.documents": "fail",
        "write.method": "UPSERT",
        "read.timeout.ms": "10000",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "errors.tolerance": "all",
        "transforms": "AddPrefix,TimestampRouter,InsertField",
        "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.AddPrefix.regex": ".*",
        "transforms.AddPrefix.replacement": "acme_$0",
        "transforms.TimestampRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
        "transforms.TimestampRouter.topic.format": "foo-bar-${topic}-${timestamp}",
        "transforms.TimestampRouter.timestamp.format": "YYYYMMdd",
        "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertField.static.field": "MessageSource",
        "transforms.InsertField.static.value": "Kafka Connect framework",
        "connection-compression": "gzip",
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required",
        "username": "admin",
        "password": "admin-secret"
    }
}
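
One possible cause worth ruling out, offered as an assumption rather than a confirmed diagnosis: in Kafka Connect, the top-level security.protocol, sasl.mechanism, and sasl.jaas.config settings in the worker file apply to the worker's own clients, while the consumers used by sink tasks read settings carrying a consumer. prefix. In addition, sasl.jaas.config has to be a single value ending in a semicolon, so the separate "username" and "password" keys in the request body above do not become part of it. A minimal sketch of the extra worker settings, reusing the credentials from this post:

```properties
# Added to connect-distributed.properties, alongside the existing worker-level SASL settings.
# The consumer. prefix targets the consumers that sink tasks use to read from Kafka.
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret";
```

Per-connector settings in the request body would instead use the consumer.override. prefix, and whether they are accepted depends on the worker's connector.client.config.override.policy.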


dmathieu commented 2 years ago

Are you able to connect to the same OpenSearch database with the same configuration, using the Elasticsearch Java library (the one used in this package)? Also, does your configuration work with the Elasticsearch connector? AFAIK, we just pass the config options through, exactly as the Elasticsearch connector does.

mann2108 commented 2 years ago

I don't understand. I used the latest JAR for kafka-connect-opensearch from the releases tab. I am not using an Elasticsearch connector; I am only using the OpenSearch connector that you forked.

mann2108 commented 2 years ago

Can you confirm that the connect-distributed.properties configuration is correct? I have shared the file content above, along with the POST request body for creating the connector.

dmathieu commented 2 years ago

I can't give you any expert validation on that configuration, no. I forked the Elasticsearch connector, and I'm doing my best to maintain it, but I'm not an Elasticsearch, OpenSearch, or even Java expert.

I asked because, at the moment, this library still uses the Elasticsearch Java client.

mann2108 commented 2 years ago

I see. I have one question: is it safe to use the Elasticsearch Java client with OpenSearch as a sink? OpenSearch provides its own client as well: https://opensearch.org/docs/latest/clients/java-rest-high-level/.

dmathieu commented 2 years ago

It is safe for now, but not upgradable. I plan to switch to the OpenSearch client, but it doesn't provide access to the access/security areas, which are required for the tests.

See https://github.com/opensearch-project/opensearch-java/issues/59

XI1876-ManojSharma commented 2 years ago

@mann2108 Were you able to register the kafka-connect-opensearch connector? I am getting {"error_code":500,"message":"Failed to find any class that implements Connector and which name matches com.dmathieu.kafka.opensearch.OpenSearchSinkConnector, available connectors are... @dmathieu, can you help me with this?

dmathieu commented 2 years ago

This kind of error usually indicates that the jar isn't in the proper folder. The path to the folder containing the jar needs to be specified in the environment variable CONNECT_PLUGIN_PATH.
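
A minimal sketch of the equivalent setting, assuming the worker is started from a properties file rather than from a container image: CONNECT_PLUGIN_PATH is the form used by Docker images that translate CONNECT_-prefixed environment variables into worker properties, and the property it corresponds to is plugin.path. The directory below is the one from the original post and is only an example.

```properties
# Worker configuration (connect-distributed.properties or connect-standalone.properties).
# plugin.path is a comma-separated list of directories that Kafka Connect scans for plugins.
# Example directory taken from the original post; substitute the folder that holds the jar.
plugin.path=/home/devuser/connectors
```

After restarting the worker, a GET request to the /connector-plugins endpoint of the Connect REST API lists the connector classes the worker found, which is a quick way to check whether com.dmathieu.kafka.opensearch.OpenSearchSinkConnector was picked up.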

XI1876-ManojSharma commented 2 years ago

Hi, can you please add this to the README, as it is missing from there? It would be great if the complete steps were documented separately for distributed and standalone modes.

We are trying to configure this on AWS MSK. Could you please help us here?