confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

[source in timestamps mode] unable to fix ambiguous column in the query on Oracle or MySQL #691

Open david-maumenee opened 5 years ago

david-maumenee commented 5 years ago

I want to define a JDBC source connector (v5.2.2) in timestamp mode from a query that joins two tables on Oracle. Both tables have a column named LAST_UPD, and I want to use the one from the table with the "ind" alias.

SQL Query

SELECT ind.row_id, ind.LAST_UPD last_update, usr.LOGIN, ind.FST_NAME prenom, ind.LAST_NAME nom FROM siebesr.s_contact ind INNER JOIN siebesr.s_user usr ON usr.par_row_id = ind.row_id

JdbcSourceConnector Config

{
  "name": "oracle_ambiguous_column_test4",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "ZZZZ",
    "connection.user": "YYYYY",
    "connection.password": "XXXX",
    "mode": "timestamp",
    "poll.interval.ms": "3600000",
    "batch.max.rows": "1000",
    "topic.prefix": "oracle_ambiguous_column_test-topic",
    "transforms":"createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "ROW_ID",

    "query": "SELECT ind.row_id, ind.LAST_UPD last_update, usr.LOGIN, ind.FST_NAME prenom, ind.LAST_NAME nom FROM siebesr.s_contact ind
INNER JOIN siebesr.s_user usr ON usr.par_row_id = ind.row_id",

    "timestamp.column.name": "ind.LAST_UPD",
    "quote.sql.identifiers" : "never"
  }
}

Generated SQL query

SELECT ind.row_id, ind.LAST_UPD last_update, usr.LOGIN, ind.FST_NAME prenom, ind.LAST_NAME nom FROM siebesr.s_contact ind INNER JOIN siebesr.s_user usr ON usr.par_row_id = ind.row_id WHERE ind.LAST_UPD > ? AND ind.LAST_UPD < ? ORDER BY ind.LAST_UPD ASC

This query runs fine in SQL Developer!

Exception

Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.DataException: ind.LAST_UPD is not a valid field name
        at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
        at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
        at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractOffsetTimestamp(TimestampIncrementingCriteria.java:217)
        at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractValues(TimestampIncrementingCriteria.java:187)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:192)
        at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:315)

The query is not sent to Oracle; the exception comes from Kafka Connect itself, see: https://github.com/apache/kafka/blob/2.2/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java

Other tests

With the default value, quote.sql.identifiers = ALWAYS:

TEST1

TEST2

TEST3

TEST5

Question

Did I miss something (a config option or something in the SQL query)? If I don't set "quote.sql.identifiers": "never" (TEST3), the query is sent to Oracle and I get an Oracle exception caused by the quotes around the identifiers.

Does Kafka Connect need a fix to allow prefixing a column name with a table alias?

wicknicks commented 5 years ago

Can you share logs (possibly at TRACE level for the io.confluent package) of what happens in TEST2? I think that scenario should have worked. Also, what version of Oracle are you using?

One workaround might be to store the query in the database (as a logical view) and then query the view from the connector instead (without using the query config).
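
A minimal sketch of that workaround (the view name s_contact_user_v is hypothetical):

CREATE VIEW siebesr.s_contact_user_v AS
SELECT ind.row_id, ind.LAST_UPD, usr.LOGIN, ind.FST_NAME prenom, ind.LAST_NAME nom
FROM siebesr.s_contact ind
INNER JOIN siebesr.s_user usr ON usr.par_row_id = ind.row_id;

The connector then reads the view like an ordinary table, so timestamp.column.name no longer needs a table prefix:

"table.whitelist": "S_CONTACT_USER_V",
"timestamp.column.name": "LAST_UPD"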

the4thamigo-uk commented 5 years ago

We have this issue as well with Postgres. I can capture the full SQL with DEBUG logging, starting from the line INFO Begin using SQL query:, and this query runs successfully in pgAdmin.

the4thamigo-uk commented 5 years ago

Note that when I use the equivalent of TEST3, I get a different error, namely:

org.apache.kafka.connect.errors.DataException: my_table.my_field is not a valid field name
    at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
    at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
    at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractOffsetTimestamp(TimestampIncrementingCriteria.java:217)
    at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractValues(TimestampIncrementingCriteria.java:187)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:192)
    at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:315)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

So when I use TEST2, the WHERE clause is constructed using the alias, which obviously fails. When I use TEST3, the WHERE clause is correct, but the timestamp field can't be found by Kafka Connect because the column is named by the alias.
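
In other words, the two generated queries look roughly like this (a sketch with hypothetical names, inferred from the errors above):

-- TEST2: the alias appears in the WHERE clause, which the database rejects
SELECT t.my_field my_alias FROM my_table t WHERE my_alias > ? AND my_alias < ? ORDER BY my_alias ASC
-- TEST3: the prefixed column appears in the WHERE clause, which the database accepts,
-- but "my_table.my_field" is then not a field name in the resulting Connect record
SELECT t.my_field my_alias FROM my_table t WHERE t.my_field > ? AND t.my_field < ? ORDER BY t.my_field ASC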

wicknicks commented 5 years ago

Yeah, would be great if you could share logs.

Re: the error message for TEST3, it's probably the same error message (it just came from a different version). But yeah, we cannot have field names with "." in them.

david-maumenee commented 5 years ago

> Can you share logs (possibly at TRACE level for the io.confluent package) of what happens in TEST2? I think that scenario should have worked. Also, what version of Oracle are you using?
>
> One workaround might be to store the query in the database (as a logical view) and then query the view from the connector instead (without using the query config).

Thanks for your response.

I thought TEST2 would work too, but SQL allows an alias to be used to refer to a column ONLY in GROUP BY, ORDER BY, or HAVING clauses, NOT in WHERE. https://stackoverflow.com/questions/28802134/sql-not-recognizing-column-alias-in-where-clause
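
For example, against the tables from this issue (a sketch), the alias is rejected in WHERE but accepted in ORDER BY:

SELECT ind.LAST_UPD last_update FROM siebesr.s_contact ind WHERE last_update > SYSDATE - 1   -- fails: ORA-00904 invalid identifier
SELECT ind.LAST_UPD last_update FROM siebesr.s_contact ind ORDER BY last_update              -- works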

I'm using Oracle 11.2.0.4.0.

We first tried using a view, but the table is huge (10,000,000 rows), so each query on the view is very slow: the filter on the indexed LAST_UPD column is applied only after the view has been evaluated. That is why we would like to query the tables directly, which is faster.

david-maumenee commented 5 years ago

> Note that when I use the equivalent of TEST3, I get a different error, namely: org.apache.kafka.connect.errors.DataException: my_table.my_field is not a valid field name [stack trace as above]
>
> So when I use TEST2, the WHERE clause is constructed using the alias, which obviously fails. When I use TEST3, the WHERE clause is correct, but the timestamp field can't be found by Kafka Connect because the column is named by the alias.

I think you got a different message because of your version of the JDBC Kafka Connect connector. There is a new option, "quote.sql.identifiers", not yet documented at https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html but described in this pull request: https://github.com/confluentinc/kafka-connect-jdbc/pull/572.

The only difference between TEST3 and TEST4 (which corresponds to the detailed test at the beginning of this issue) is the value of the quote.sql.identifiers parameter. In TEST4 the generated query is correct but is never sent to Oracle.

grange74 commented 4 years ago

I'm having the same problem, but with MySQL. I'm trying to LEFT JOIN two tables that both have a datetime column of the same name, which I'm using in timestamp mode; I select one of them in the query. If I use the column name without the table prefix in timestamp.column.name, I get a MySQL error about an ambiguous column. If I add the table prefix, I get "Unknown column in where clause". If I set quote.sql.identifiers to never, then I get "DataException: … is not a valid field name". I haven't been able to find a solution.

david-maumenee commented 4 years ago

As @wicknicks said, one workaround is to store the query in the database (as a logical view) and then query the view from the connector instead (without using the query config).

aliasbadwolf commented 4 years ago

@david-maumenee Change the query to an inner query (a subselect) and then use the alias name for the timestamp column:

query = "SELECT * from (SELECT ind.row_id, ind.LAST_UPD last_update, usr.LOGIN, ind.FST_NAME prenom, ind.LAST_NAME nom FROM siebesr.s_contact ind
INNER JOIN siebesr.s_user usr ON usr.par_row_id = ind.row_id) a"
timestamp.column.name="last_update"

Also, if you want to keep the original column name, i.e. LAST_UPD, then add a duplicate column for LAST_UPD, use that as the timestamp column, and later remove that column using an SMT. Configuration below:

query = "SELECT * from (SELECT ind.row_id, ind.LAST_UPD, ind.LAST_UPD TMP_LAST_UPD, usr.LOGIN, ind.FST_NAME prenom, ind.LAST_NAME nom FROM siebesr.s_contact ind
INNER JOIN siebesr.s_user usr ON usr.par_row_id = ind.row_id) a"
timestamp.column.name="TMP_LAST_UPD"
"transforms": "ReplaceField",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.blacklist": "TMP_LAST_UPD"

ignesious commented 3 years ago

This issue happens on the DB2 side as well. As suggested by @aliasbadwolf, using an alias resolves it. It would be good if this were fixed properly.

Drugoy commented 3 years ago

We experienced a similar error with an Oracle DB. The logs also contained this hint at WARN level: WARN || JDBC type -101 (TIMESTAMP WITH TIME ZONE) not currently supported [io.confluent.connect.jdbc.dialect.OracleDatabaseDialect]
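
A common way around that dialect limitation is to convert the column to a plain TIMESTAMP inside a custom query (a sketch; my_table and ts_col are hypothetical names):

SELECT SYS_EXTRACT_UTC(ts_col) ts_col_utc, other_col FROM my_table   -- SYS_EXTRACT_UTC returns the UTC value as a plain TIMESTAMP

The plain TIMESTAMP is a JDBC type the OracleDatabaseDialect does support.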