thake / logminer-kafka-connect

CDC Kafka Connect source for Oracle Databases leveraging Oracle Logminer
Apache License 2.0
31 stars 20 forks

Database connected but getting column not found error #12

Open smasilamani-cfins opened 3 years ago

smasilamani-cfins commented 3 years ago

Hello

We are using an Oracle 19c database, and I am able to connect to it; however, I keep getting the error below. I made sure that the DBA followed all the steps mentioned and granted the required permissions to the user ID that I am using. Please advise.

Connector config submitted to the Connect worker:

{
    "name": "dev-submission-edev-logminer",
    "config": {
        "connector.class": "com.github.thake.logminer.kafka.connect.LogminerSourceConnector",
        "tasks.max": "1",
        "db.name": "edev",
        "db.hostname": "db_name.test.com",
        "db.port": "1521",
        "db.sid": "edev",
        "db.user": "username",
        "db.fetch.size":1,
        "db.user.password": "password",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "table.whitelist": "submission_object",
        "database.history.kafka.bootstrap.servers": "kafka-broker-1:9092",
        "database.history.kafka.topic": "schema-changes-submission-dev"
    }
}

[2021-02-25 19:59:24,680] INFO [Worker clientId=connect-1, groupId=kafka-connect] Starting task dev-submission-edev-logminer-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2021-02-25 19:59:24,680] INFO Creating task dev-submission-edev-logminer-0 (org.apache.kafka.connect.runtime.Worker)
[2021-02-25 19:59:24,681] INFO ConnectorConfig values: config.action.reload = restart connector.class = com.github.thake.logminer.kafka.connect.LogminerSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = dev-submission-edev-logminer tasks.max = 1 transforms = [] value.converter = class io.confluent.connect.avro.AvroConverter (org.apache.kafka.connect.runtime.ConnectorConfig)
[2021-02-25 19:59:24,681] INFO EnrichedConnectorConfig values: config.action.reload = restart connector.class = com.github.thake.logminer.kafka.connect.LogminerSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none header.converter = null key.converter = class org.apache.kafka.connect.storage.StringConverter name = dev-submission-edev-logminer tasks.max = 1 transforms = [] value.converter = class io.confluent.connect.avro.AvroConverter (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2021-02-25 19:59:24,681] INFO TaskConfig values: task.class = class com.github.thake.logminer.kafka.connect.SourceTask (org.apache.kafka.connect.runtime.TaskConfig)
[2021-02-25 19:59:24,681] INFO Instantiated task dev-submission-edev-logminer-0 with version 1.0 of type com.github.thake.logminer.kafka.connect.SourceTask (org.apache.kafka.connect.runtime.Worker)
[2021-02-25 19:59:24,681] INFO StringConverterConfig values: converter.encoding = UTF8 converter.type = key (org.apache.kafka.connect.storage.StringConverterConfig)
[2021-02-25 19:59:24,682] INFO AvroConverterConfig values: bearer.auth.token = [hidden] schema.registry.ssl.truststore.type = JKS schema.reflection = false auto.register.schemas = true basic.auth.credentials.source = URL schema.registry.ssl.keystore.password = [hidden] schema.registry.ssl.provider = schema.registry.ssl.endpoint.identification.algorithm = https schema.registry.ssl.truststore.location = value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy schema.registry.url = [http://schema-registry:8081] schema.registry.ssl.keystore.location = schema.registry.ssl.trustmanager.algorithm = PKIX schema.registry.ssl.key.password = [hidden] schema.registry.ssl.keystore.type = JKS proxy.port = -1 schema.registry.ssl.secure.random.implementation = schema.registry.ssl.cipher.suites = [] max.schemas.per.subject = 1000 schema.registry.ssl.truststore.password = [hidden] basic.auth.user.info = [hidden] proxy.host = use.latest.version = false schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] schema.registry.ssl.protocol = TLS schema.registry.basic.auth.user.info = [hidden] bearer.auth.credentials.source = STATIC_TOKEN schema.registry.ssl.keymanager.algorithm = SunX509 key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy (io.confluent.connect.avro.AvroConverterConfig)
[2021-02-25 19:59:24,682] INFO KafkaAvroSerializerConfig values: bearer.auth.token = [hidden] schema.registry.ssl.truststore.type = JKS schema.reflection = false auto.register.schemas = true basic.auth.credentials.source = URL schema.registry.ssl.keystore.password = [hidden] schema.registry.ssl.provider = schema.registry.ssl.endpoint.identification.algorithm = https schema.registry.ssl.truststore.location = value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy schema.registry.url = [http://schema-registry:8081] schema.registry.ssl.keystore.location = schema.registry.ssl.trustmanager.algorithm = PKIX schema.registry.ssl.key.password = [hidden] schema.registry.ssl.keystore.type = JKS proxy.port = -1 schema.registry.ssl.secure.random.implementation = schema.registry.ssl.cipher.suites = [] max.schemas.per.subject = 1000 schema.registry.ssl.truststore.password = [hidden] basic.auth.user.info = [hidden] proxy.host = use.latest.version = false schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] schema.registry.ssl.protocol = TLS schema.registry.basic.auth.user.info = [hidden] bearer.auth.credentials.source = STATIC_TOKEN schema.registry.ssl.keymanager.algorithm = SunX509 key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy (io.confluent.kafka.serializers.KafkaAvroSerializerConfig)
[2021-02-25 19:59:24,682] INFO KafkaAvroDeserializerConfig values: bearer.auth.token = [hidden] schema.registry.ssl.truststore.type = JKS schema.reflection = false auto.register.schemas = true basic.auth.credentials.source = URL schema.registry.ssl.keystore.password = [hidden] schema.registry.ssl.provider = schema.registry.ssl.endpoint.identification.algorithm = https schema.registry.ssl.truststore.location = specific.avro.reader = false value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy schema.registry.url = [http://schema-registry:8081] schema.registry.ssl.keystore.location = schema.registry.ssl.trustmanager.algorithm = PKIX schema.registry.ssl.key.password = [hidden] schema.registry.ssl.keystore.type = JKS proxy.port = -1 schema.registry.ssl.secure.random.implementation = schema.registry.ssl.cipher.suites = [] max.schemas.per.subject = 1000 schema.registry.ssl.truststore.password = [hidden] basic.auth.user.info = [hidden] proxy.host = use.latest.version = false schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] schema.registry.ssl.protocol = TLS schema.registry.basic.auth.user.info = [hidden] bearer.auth.credentials.source = STATIC_TOKEN schema.registry.ssl.keymanager.algorithm = SunX509 key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy (io.confluent.kafka.serializers.KafkaAvroDeserializerConfig)
[2021-02-25 19:59:24,682] INFO AvroDataConfig values: connect.meta.data = true enhanced.avro.schema.support = false schemas.cache.config = 1000 (io.confluent.connect.avro.AvroDataConfig)
[2021-02-25 19:59:24,682] INFO Set up the key converter class org.apache.kafka.connect.storage.StringConverter for task dev-submission-edev-logminer-0 using the connector config (org.apache.kafka.connect.runtime.Worker)
[2021-02-25 19:59:24,682] INFO Set up the value converter class io.confluent.connect.avro.AvroConverter for task dev-submission-edev-logminer-0 using the connector config (org.apache.kafka.connect.runtime.Worker)
[2021-02-25 19:59:24,682] INFO Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task dev-submission-edev-logminer-0 using the worker config (org.apache.kafka.connect.runtime.Worker)
[2021-02-25 19:59:24,687] INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker)
[2021-02-25 19:59:24,687] INFO ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [kafka-broker-1:9092] buffer.memory = 33554432 client.dns.lookup = default client.id = connector-producer-dev-submission-edev-logminer-0 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 2147483647 enable.idempotence = false interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer linger.ms = 0 max.block.ms = 9223372036854775807 max.in.flight.requests.per.connection = 1 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 2147483647 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer (org.apache.kafka.clients.producer.ProducerConfig)
[2021-02-25 19:59:24,698] INFO Kafka version: 5.5.3-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-25 19:59:24,698] INFO Kafka commitId: ceb4dd5f3aa4a84e (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-25 19:59:24,698] INFO Kafka startTimeMs: 1614283164698 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-25 19:59:24,710] INFO [Producer clientId=connector-producer-dev-submission-edev-logminer-0] Cluster ID: 6hAwpBviTnCy4OF2oPtVGw (org.apache.kafka.clients.Metadata)
[2021-02-25 19:59:24,713] INFO SourceConnectorConfig values: batch.size = 1000 db.attempts = 3 db.backoff.ms = 10000 db.fetch.size = 1 db.hostname = db_name.test.com db.logminer.dictionary = ONLINE db.name = dev db.port = 1521 db.sid = edev db.timezone = UTC db.user = username db.user.password = password poll.interval.ms = 2000 start.scn = 0 table.whitelist = submission_object.PRODCT tombstones.on.delete = true (com.github.thake.logminer.kafka.connect.SourceConnectorConfig)
[2021-02-25 19:59:24,854] INFO Connected to database at db_name.test.com:1521/edev (com.github.thake.logminer.kafka.connect.SourceConnectorConfig)
[2021-02-25 19:59:25,255] INFO Oracle Kafka Connector is starting (com.github.thake.logminer.kafka.connect.SourceTask)
[2021-02-25 19:59:25,265] INFO WorkerSourceTask{id=dev-submission-edev-logminer-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-02-25 19:59:25,284] INFO Determined current scn of database as 338391432131 (com.github.thake.logminer.kafka.connect.initial.SelectSource)
[2021-02-25 19:59:25,284] INFO Getting dictionary details for table : TableId(owner=submission_object, table=PRODCT) (com.github.thake.logminer.kafka.connect.SchemaService)
[2021-02-25 19:59:45,748] INFO WorkerSourceTask{id=dev-submission-edev-logminer-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-02-25 19:59:45,749] INFO WorkerSourceTask{id=dev-submission-edev-logminer-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-02-25 19:59:45,749] ERROR WorkerSourceTask{id=dev-submission-edev-logminer-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.IllegalStateException: Column PRODCT_NO does not exist in schema definition
    at com.github.thake.logminer.kafka.connect.initial.TableFetcher.poll(TableFetcher.kt:44)
    at com.github.thake.logminer.kafka.connect.initial.SelectSource.poll(SelectSource.kt:67)
    at com.github.thake.logminer.kafka.connect.StartedState$poll$2.invoke(SourceTask.kt:118)
    at com.github.thake.logminer.kafka.connect.StartedState.poll(SourceTask.kt:124)
    at com.github.thake.logminer.kafka.connect.SourceTask.poll(SourceTask.kt:190)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:276)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:243)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
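One quick check that can narrow this down: query the Oracle data dictionary directly as the connector user and see whether PRODCT_NO is visible at all for the table the connector resolved (TableId(owner=submission_object, table=PRODCT) in the log above). The sketch below is a hypothetical standalone diagnostic, not part of logminer-kafka-connect; it assumes an Oracle JDBC driver on the classpath and reuses the connection values from the config above.

    import java.sql.DriverManager

    // Hypothetical diagnostic, not part of the connector. Lists the columns the
    // Oracle data dictionary exposes to the connecting user for the failing table.
    fun main() {
        val url = "jdbc:oracle:thin:@//db_name.test.com:1521/edev" // values from the config above
        DriverManager.getConnection(url, "username", "password").use { conn ->
            conn.prepareStatement(
                "SELECT column_name, data_type FROM all_tab_columns " +
                "WHERE owner = ? AND table_name = ? ORDER BY column_id"
            ).use { stmt ->
                // Dictionary identifiers are upper-case unless the objects were created quoted.
                stmt.setString(1, "SUBMISSION_OBJECT")
                stmt.setString(2, "PRODCT")
                stmt.executeQuery().use { rs ->
                    while (rs.next()) {
                        println("${rs.getString(1)} : ${rs.getString(2)}")
                    }
                }
            }
        }
    }

If PRODCT_NO is missing from the output, or the owner/table only match under a different casing, the problem is dictionary visibility for the connector user rather than the fetch itself.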

thake commented 3 years ago

Thanks for raising this issue. Unfortunately, Oracle 19c is not supported as of now. But if you can contribute a PR that makes logminer-kafka-connect work with 19c, I would be very happy to review it.

PhilipTrauner commented 3 years ago

I'd be interested in making this work with 19c and up.

java.lang.IllegalStateException: Column <column> does not exist in schema definition

I'm also getting the same error right now. Do you have any pointers on where I should start looking? (@thake)
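Going purely by the stack trace above, the schema definition is built by com.github.thake.logminer.kafka.connect.SchemaService (the "Getting dictionary details for table" log line), and the exception is thrown from TableFetcher.poll (TableFetcher.kt:44) while resolving columns of the fetched rows against that definition. As a rough sketch of the failing lookup, inferred only from the exception message and stack trace (the names and types below are guesses, not the actual connector code):

    // Sketch inferred from the exception message and stack trace only; the real
    // code in TableFetcher.kt will differ.
    data class ColumnDef(val name: String, val type: String) // placeholder type, a guess

    fun requireColumnSchema(schemaDefinition: Map<String, ColumnDef>, columnName: String): ColumnDef =
        schemaDefinition[columnName]
            ?: throw IllegalStateException("Column $columnName does not exist in schema definition")

If that reading is right, any name or casing mismatch between what SchemaService reads from the 19c dictionary and the column names in the initial SELECT's result set would surface exactly as this error.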