confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Sybase ASE Sink Connector - Not able to transfer data (TEXT datatype) from Debezium Postgres source to Sybase ASE sink (jconn). #1363

Open hotpotato0 opened 1 year ago

hotpotato0 commented 1 year ago

I am trying to transfer data from a Debezium Postgres source to Sybase ASE. But in the target Sybase ASE (using the jconn4 Sybase ASE JDBC driver), TEXT data types do not work.

Environment:
- Kafka: docker.io/bitnami/kafka:2.7.0-debian-10-r68
- Zookeeper: docker.io/bitnami/zookeeper:3.8.0-debian-10-r20
- Debezium PostgreSQL source connector: https://debezium.io/documentation/reference/stable/connectors/postgresql.html (version 2.2.1.Final)
- JdbcSinkConnector: 10.7.3
- Sink DBMS: Sybase ASE 16
- JDBC driver: jconn4-7.07-SP110

Error message:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class com.sybase.jdbc4.jdbc.SybLob (java.lang.String is in module java.base of loader 'bootstrap'; com.sybase.jdbc4.jdbc.SybLob is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @6be7bf6d)
    at com.sybase.jdbc4.tds.TdsParam.prepareForSend(TdsParam.java:230)
    at com.sybase.jdbc4.jdbc.ParamManager.checkParams(ParamManager.java:1180)
    at com.sybase.jdbc4.tds.Tds.sendDynamicExecuteParams(Tds.java:1500)
    at com.sybase.jdbc4.tds.Tds.dynamicExecute(Tds.java:1404)
    at com.sybase.jdbc4.jdbc.SybPreparedStatement.sendQuery(SybPreparedStatement.java:2883)
    at com.sybase.jdbc4.jdbc.SybStatement.sendBatch(SybStatement.java:1923)
    at com.sybase.jdbc4.jdbc.SybStatement.executeBatch(SybStatement.java:1882)
    at com.sybase.jdbc4.jdbc.SybStatement.executeBatch(SybStatement.java:1800)
    at com.sybase.jdbc4.jdbc.SybPreparedStatement.executeBatch(SybPreparedStatement.java:1832)
    at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    ... 10 more
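Going by the stack trace, jconn4's TdsParam.prepareForSend expects a com.sybase.jdbc4.jdbc.SybLob for the TEXT/UNITEXT parameters, while the sink connector binds the Connect STRING values with PreparedStatement.setString(), so the driver's cast from String to SybLob fails. A minimal standalone JDBC check along these lines can show whether binding the value as a character stream instead avoids the cast. This is only a sketch: the connection URL, table, and column names are copied from this report and are assumptions about the actual environment.

import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SybTextBindCheck {

  public static void main(String[] args) throws Exception {
    // Hypothetical connection details; adjust to your environment.
    try (Connection conn = DriverManager.getConnection(
        "jdbc:sybase:Tds:localhost:3000/sink", "sd", "******")) {

      String text = "67e9f30be4b4e85fb3b60c6997f8f740";

      // Variant A: plain setString(), the way the sink connector binds
      // Connect STRING values. Against a TEXT column this appears to be
      // what triggers the String -> SybLob ClassCastException in jconn4.
      try (PreparedStatement ps = conn.prepareStatement(
          "INSERT INTO tbl_tst_cdc_05_text_sink (seq, pg_text_to_ase_text) VALUES (?, ?)")) {
        ps.setInt(1, 5);
        ps.setString(2, text);
        ps.executeUpdate();
      }

      // Variant B: stream the value instead, so the driver can treat the
      // parameter as a LOB rather than casting a String to SybLob.
      try (PreparedStatement ps = conn.prepareStatement(
          "INSERT INTO tbl_tst_cdc_05_text_sink (seq, pg_text_to_ase_text) VALUES (?, ?)")) {
        ps.setInt(1, 6);
        ps.setCharacterStream(2, new StringReader(text), text.length());
        ps.executeUpdate();
      }
    }
  }
}

If variant B succeeds where variant A fails, the issue is in how the connector binds STRING values for LOB-typed columns under jconn4 rather than in the driver or connection configuration.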

JDBC sink connector config (jTDS)

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "dialect.name": "SybaseDatabaseDialect",
    "table.name.format": "tbl_tst_cdc_05_text_sink",
    "connection.password": "******",
    "tasks.max": "1",
    "topics": "debezium_source.connect_dev.tbl_tst_cdc_05_text",
    "delete.enabled": "true",
    "connection.user": "sd",
    "name": "sybase_jdbc_sink_tbl_tst_cdc_05_text",
    "connection.url": "jdbc:jtds:sybase://localhost:3000/kis_temp",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "pk.fields": "seq"
}

-> This works. But the jTDS driver has performance issues (less than 30% of the jconn4 JDBC driver's throughput).

JDBC sink connector config (jconn4)

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "dialect.name": "SybaseDatabaseDialect",
    "table.name.format": "tbl_tst_cdc_05_text_sink",
    "connection.password": "******",
    "tasks.max": "1",
    "topics": "debezium_source.connect_dev.tbl_tst_cdc_05_text",
    "delete.enabled": "true",
    "connection.user": "sd",
    "name": "sybase_jdbc_sink_tbl_tst_cdc_05_text",
    "connection.url": "jdbc:sybase:Tds:localhost:3000/sink",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "pk.fields": "seq"
}

-> This does not work.
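If the stream-based bind from the check above works, one possible workaround is a custom dialect that extends SybaseDatabaseDialect and binds STRING values as character streams instead of plain strings. The sketch below is an assumption-heavy illustration, not a confirmed fix: the override point (maybeBindPrimitive) and its signature follow GenericDatabaseDialect as found in the 10.x sources and may differ in the version you run, and registering the custom dialect through the connector's dialect provider mechanism is not shown.

import java.io.StringReader;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;

import io.confluent.connect.jdbc.dialect.SybaseDatabaseDialect;

/**
 * Sketch of a custom dialect that binds STRING values as character streams
 * so jconn4 can handle TEXT/UNITEXT parameters as LOBs. The override point
 * and signature are assumptions based on GenericDatabaseDialect; verify
 * against the connector version in use. Dialect registration is omitted.
 */
public class JconnSybaseDialect extends SybaseDatabaseDialect {

  public JconnSybaseDialect(AbstractConfig config) {
    super(config);
  }

  @Override
  protected boolean maybeBindPrimitive(
      PreparedStatement statement,
      int index,
      Schema schema,
      Object value
  ) throws SQLException {
    if (schema.type() == Schema.Type.STRING && value != null) {
      String s = (String) value;
      // Stream the string instead of calling setString(), which jconn4
      // appears to cast to SybLob for TEXT columns.
      statement.setCharacterStream(index, new StringReader(s), s.length());
      return true;
    }
    return super.maybeBindPrimitive(statement, index, schema, value);
  }
}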

Topic (debezium_source.connect_dev.tbl_tst_cdc_05_text) message

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "default": 0,
                "field": "seq"
            },
            {
                "type": "string",
                "optional": true,
                "field": "pg_text_to_ase_text"
            },
            {
                "type": "string",
                "optional": true,
                "field": "pg_text_to_ase_unitext"
            }
        ],
        "optional": false,
        "name": "debezium_source.connect_dev.tbl_tst_cdc_05_text.Value"
    },
    "payload": {
        "seq": 5,
        "pg_text_to_ase_text": "67e9f30be4b4e85fb3b60c6997f8f740",
        "pg_text_to_ase_unitext": null
    }
}
sangeet259 commented 1 month ago

Hi @hotpotato0, can you share the schemas at the source (Postgres) and sink (Sybase)?