confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

JDBC Kafka Connector Not Sending Messages in Timestamp mode, only in Bulk #1005

Open aleks-myplanet opened 3 years ago

aleks-myplanet commented 3 years ago

Hi,

I'm encountering an issue where a specific query, when deployed with a JDBC Kafka connector in timestamp mode, never sends any messages. The exact same connector, with the exact same query and the exact same connector config, works perfectly fine when deployed in bulk mode.

I believe the query is causing this, since changing it to a query I know works in timestamp mode resolves the problem.

An example of the query causing the issue (with sensitive data such as table names and column names removed):

SELECT * FROM (
  SELECT COLUMN1, COLUMN2, COLUMN3, COLUMN4, COLUMN5, COLUMN6, COLUMN7, COLUMN8, COLUMN9
  FROM TABLE1
  INNER JOIN (SELECT COLUMN1, COLUMN5 FROM TABLE2)
    ON TABLE2.COLUMN1 = TABLE1.COLUMN1
  INNER JOIN (SELECT COLUMN5, COLUMN3, COLUMN10, STATUS FROM TABLE3)
    ON TABLE3.COLUMN5 = TABLE2.COLUMN5 AND TABLE3.COLUMN3 = TABLE1.COLUMN11
  INNER JOIN (SELECT COLUMN5, COLUMN10 FROM TABLE4)
    ON TABLE4.COLUMN5 = TABLE3.COLUMN5
  INNER JOIN (SELECT COLUMN12, COLUMN8, COLUMN3 FROM TABLE5)
    ON TABLE5.COLUMN12 = TABLE4.COLUMN12 AND TABLE5.COLUMN3 = TABLE3.COLUMN10
  INNER JOIN (SELECT COLUMN12, COLUMN2, COLUMN5 FROM TABLE6)
    ON TABLE6.COLUMN12 = TABLE5.COLUMN12
  WHERE TABLE6.COLUMN2 = 'VALUE'
)

That said, I don't believe the query is malformed, since it executes correctly and sends messages to the topic when deployed with a connector in bulk mode.
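
For context on why the two modes can behave differently with the same SQL: in timestamp mode the connector does not execute the configured query as-is; it appends timestamp criteria and an ORDER BY to it (which is also why the query above is wrapped in an outer SELECT). A rough sketch of the effective statement, where UPDATED_AT is only a placeholder column name:

    -- Rough sketch of what timestamp mode executes; the exact SQL is built by
    -- TimestampIncrementingCriteria in this repo. UPDATED_AT is a placeholder,
    -- and the two ? parameters are the last stored offset (beginTime) and the
    -- current DB time (endTime).
    SELECT * FROM (
      -- ... configured query ...
    )
    WHERE UPDATED_AT > ? AND UPDATED_AT < ?
    ORDER BY UPDATED_AT ASC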

Some things I've already tried:

  1. Increased the log level in Kafka Connect to look for errors. No errors or relevant logs; the connector just always flushes 0 messages, nothing special (see the logging sketch after this list).
  2. Double-checked the connector offsets in case the stored timestamps were wrong and resulting in no messages. According to the extra logs, the timestamps are correct.
  3. Tested every documented Kafka Connect connector parameter. I spent a long time trying the different configuration options Kafka Connect provides, and can't find anything that addresses the problem.
  4. Reduced the number of joins in the connector query. This did have an impact: messages began to be sent after limiting it to a single inner join. BUT I have other connectors with over 8 inner joins that do send messages in timestamp mode, so the issue doesn't appear to be linked to data size or join count.
  5. Increased the resources given to Kafka Connect. No difference.
  6. Searched GitHub issues on the kafka-connect-jdbc repo; this problem doesn't appear to be documented as far as I can tell.
  7. Tried different topics in case the problem was limited to a specific topic; it happens no matter the topic.
  8. Updated the drivers, in case it was an issue on the database side caused by outdated drivers. No difference.
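
Regarding item 1: the log-level change can be scoped to just the connector package, and at DEBUG/TRACE the JDBC source should log the statement and the timestamp bounds it is polling with. A sketch, assuming a log4j-based Connect worker (the file name varies by distribution):

    # connect-log4j.properties: raise verbosity for the JDBC connector only
    log4j.logger.io.confluent.connect.jdbc=TRACE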

Has anyone encountered a similar problem before? Any suggestions on how to resolve it?

vtypal commented 3 years ago

I also had the same issue with both timestamp and timestamp+incrementing mode. Only bulk mode worked properly.

zak-n commented 3 years ago

I am also experiencing the exact same issue on a project of mine. We're also using Kafka to poll an Oracle DB, and at some point it just stops polling messages in timestamp mode. Currently we're running that same connector in bulk mode, which seems to have mitigated the issue. However, running that connector in bulk isn't really ideal for us, so we're hoping for any input that may help us eliminate this problem altogether.

SpenserZC commented 3 years ago

We also use timestamp mode to poll an Oracle DB. It doesn't work, but bulk mode is OK. Can anyone resolve this?

SpenserZC commented 3 years ago

It worked for me. Use this post: https://github.com/confluentinc/kafka-connect-jdbc/issues/408#issuecomment-389176015
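
A recurring suggestion in threads like the one linked is a timezone mismatch between the connector and the database clock; the connector exposes a db.timezone property for exactly this. A minimal sketch in properties form (the zone and column name are only examples, not taken from this thread):

    # db.timezone should generally match the zone of the values in the
    # timestamp column; Asia/Shanghai is only an example value.
    mode=timestamp
    timestamp.column.name=LAST_UPDATED_TIME
    db.timezone=Asia/Shanghai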

harshrajsinha commented 3 years ago

I am also facing the same issue, with mode set to timestamp. The topic offset does not advance on inserting a new row.

evertonpavan commented 3 years ago

Same issue here; only mode=bulk works correctly.

ferchu commented 3 years ago

I had the same problem, and in my case the issue was that the timestamp column had the type INT instead of TIMESTAMP.
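
A quick way to check the column's actual type on Oracle is the data dictionary (MY_TABLE is a placeholder table name):

    -- Lists each column with its declared type; the timestamp column should
    -- report a TIMESTAMP/DATE type, not a numeric one.
    SELECT column_name, data_type
      FROM user_tab_columns
     WHERE table_name = 'MY_TABLE';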

atrbgithub commented 3 years ago

@zak-n were you able to resolve this?

We're seeing exactly the same. We even have 2 connectors set up querying the same table, both in incrementing mode. Suddenly one of them will stop producing new messages to the topic.

aakashchhabra1989 commented 3 years ago

I set up a JDBC source connector using timestamp mode. The configuration is straightforward, as below. This works fine in Postgres, but when using MSSQL with a datetime-type column, instead of picking up only the changed rows (CDC-style), all the data gets picked up from the table, just like bulk mode. There is no error in the log either.

{ "name": "JdbcSourceConnector_1", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.user": "postgres", "connection.password": "xxxxxxx", "connection.url": "jdbc:postgresql://postgres:5432/postgres", "tasks.max": "1", "topic.prefix": "connectbulk", "table.whitelist": "timestamp_table", "mode": "timestamp", "timestamp.column.name": "last_updated_time", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schema.enable": "false", "poll.interval.ms": "12000" } }

aleks-myplanet commented 3 years ago

Nope, this hasn't been solved in any way.

teimyBr commented 2 years ago

I know the source of the problem: it's the cached connection. I forked the connector so that every new run gets a new connection and closes it afterwards, and I also set the connection as read-only. And don't call the stmt.close method, because according to the Oracle documentation ojdbc caches the results.

YeonghyeonKO commented 10 months ago

Did you check the time of your DB? SELECT SYSDATE FROM DUAL;

JdbcSourceConnector uses two timestamps (beginTime and endTime) before running tasks in timestamp mode.

ref: https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteria.java#L169
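
In other words, rows whose timestamp is ahead of the database clock fall outside the beginTime/endTime window and won't be picked up until the DB time passes them; a permanently skewed clock (or a wrong db.timezone) can therefore look exactly like "flushes 0 messages" forever. A quick comparison, with MY_TABLE and LAST_UPDATED as placeholders:

    -- If newest_row_time is ahead of db_time, timestamp mode will skip those
    -- rows until the database clock catches up.
    SELECT SYSDATE           AS db_time,
           MAX(LAST_UPDATED) AS newest_row_time
      FROM MY_TABLE;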

ketirani commented 10 months ago

I am new to Kafka, and this post is very old, but I was facing the same issue until I discovered that using timestamp mode requires timestamp.column.name to be specified in uppercase. I am really speechless; I spent a couple of hours until I stumbled onto it via Splunk. I hope this helps some other poor, stray soul like me who ends up on this page. I am using Oracle, and object names in Oracle are uppercase in the data dictionary.
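
So on Oracle the relevant setting ends up looking like this, even when the column was created in lowercase (LAST_UPDATED_TIME is a placeholder name):

    # Oracle's data dictionary reports unquoted identifiers in uppercase, so
    # the connector config has to match that casing.
    timestamp.column.name=LAST_UPDATED_TIME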

YeonghyeonKO commented 10 months ago

@ketirani yes, it's because Oracle always uses uppercase for object names, which matters when Kafka tries to poll the metadata (such as column names).