confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
19 stars 955 forks source link

Schema issue with MicrosoftSqlServer Dialect | JDBC Sink Connector #1311

Open tushargoyal22 opened 1 year ago

tushargoyal22 commented 1 year ago

If I specify `table.name.format` as just the table name (`raft_tbl` in my case), the connector writes the data to that table under the default schema, `dbo`.

However, when I specify `table.name.format` as `guest.raft_tbl`, where `guest` is one of the schemas in my database, the connector fails with: com.microsoft.sqlserver.jdbc.SQLServerException: Database 'guest' does not exist. Make sure that the name is entered correctly. I am not sure why it is looking for a database named `guest` rather than a schema.

JDBC sink connector config, for reference:

{
    "name": "SinkConnector3",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "raft.public.goyaltu.subscription.manager.test_client",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://mwkafka-schema-registry-staging.deshaw.com:5036",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://mwkafka-schema-registry-staging.deshaw.com:5036",

        "transforms": "AlignSchemaWithRegistry",
        "transforms.AlignSchemaWithRegistry.type": "deshaw.raft.connect.transforms.AlignSchemaWithRegistry",
        "transforms.AlignSchemaWithRegistry.schema.registry.urls": "http://mwkafka-schema-registry-staging.deshaw.com:5036",

        "connection.url": "jdbc:sqlserver://riskdb;databaseName=sandbox;integratedSecurity=true;authenticationScheme=JavaKerberos;encrypt=true;trustServerCertificate=true;",

        "dialect.name": "SqlServerDatabaseDialect",

        "insert.mode": "upsert",
        "table.name.format": "guest.raft_tbl",
        "pk.mode": "record_key",
        "pk.fields": "id",
        "batch.size": "2",
        "poll.interval.ms": 60000,

        "tasks.recovery.mechanism": "STATE_BASED",

        "consumer.override.client.id": "raft.goyaltu.sinkconnector.jdbc.client",
        "consumer.override.group.id": "raft.test.sinkconnector.consumer",

        "consumer.override.java.security.auth.login.config": "/prod/tools/middleware/kafkautils/dist/etc/config/kafka/zookeeper_client_jaas.conf",
        "consumer.override.security.protocol": "SASL_PLAINTEXT",
        "consumer.override.sasl.mechanism": "GSSAPI",
        "consumer.override.sasl.kerberos.service.name": "kafka",
        "consumer.override.sun.security.jgss.native": true,
        "consumer.override.sun.security.jgss.lib": "/usr/libexec/libgsswrap.so",
        "consumer.override.javax.security.auth.useSubjectCredsOnly": false
    }
}