confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

JDBC Sink Connector - configuration validation runs before replacement when using a file ConfigProvider #1412

Open EnamCapgemini opened 4 months ago

Problem: I have created a Kafka sink connector for our use case using the Kafka Connect REST API, and we have already set the config provider to `file`.

Below is the configuration:

```shell
curl -i -X PUT -H "Content-Type:application/json" \
  http://kfk-conn-svc:8083/connectors/jdbc-schema-validation-reporting-sink-connector/config \
  -d '{
    "connection.password": "${file:/secret/kafka_con_db.properties:PASSWORD}",
    "connection.url": "jdbc: + ${file:/secret/kafka_con_db.properties:URL}",
    "connection.user": "${file:/secret/kafka_con_db.properties:USER}",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.deadletterqueue.topic.name": "SchemaValidation-Reporting-Failed",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.retry.delay.max.ms": "60000",
    "errors.retry.timeout": "300000",
    "errors.tolerance": "all",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "max.retries": "30",
    "name": "jdbc-schema-validation-reporting-sink-connector",
    "quote.sql.identifiers": "never",
    "retry.backoff.ms": "10000",
    "table.name.format": "SWIFT_INPUT",
    "topics": "SchemaValidation-Reporting",
    "transforms": "insertField",
    "transforms.insertField.timestamp.field": "timestamp!",
    "transforms.insertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "type": "sink",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }'
```
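For context, my understanding is that `${file:...}` placeholders are only resolved if the worker itself declares the file config provider. Our worker properties include lines to this effect (a minimal sketch, assuming the standard `FileConfigProvider` that ships with Kafka):

```properties
# Connect worker properties: register the built-in file config provider
# so that ${file:<path>:<key>} placeholders in connector configs are resolved.
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
```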

However, the connector is not running as expected; its status is as below:

```json
{
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.26.16:8083"
  },
  "name": "jdbc-schema-validation-reporting-sink-connector",
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Not a valid JDBC URL: ${file:/secret/kafka_con_db.properties:URL}\n\tat io.confluent.connect.jdbc.dialect.DatabaseDialects.extractJdbcUrlInfo(DatabaseDialects.java:175)\n\tat io.confluent.connect.jdbc.dialect.DatabaseDialects.findBestFor(DatabaseDialects.java:119)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.initWriter(JdbcSinkTask.java:54)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.start(JdbcSinkTask.java:46)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat ...",
      "worker_id": "192.168.26.16:8083"
    }
  ],
  "type": "sink"
}
```
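In case it is relevant, the secret file on the worker has this shape (values redacted here; the key names match the placeholders above):

```properties
# /secret/kafka_con_db.properties (actual values redacted)
URL=<oracle-host>:<port>:<sid>
USER=<db-user>
PASSWORD=<db-password>
```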

But when I provide a static/fixed "connection.url": "jdbc:oracle:thin:@myoracle.db.server:1521:mydb", with the other fields the same as above, it runs successfully.

Please help me resolve this, and let me know if any configuration is incorrect or if any other information is needed.