exasol / kafka-connect-jdbc-exasol

Exasol dialect for the Kafka Connect JDBC Connector
Apache License 2.0

mysql-to-exasol/kafka-connect jdbc #15

Closed avinash0720 closed 3 years ago

avinash0720 commented 3 years ago

Hi,

I am trying to integrate MySQL with Exasol using the Kafka connector. I am able to create a topic from the MySQL data, but when I try to consume the messages in Exasol, the log says records were inserted, yet I don't see any data in the Exasol table. Could you please help me here?

The JSON for the MySQL source connector is as below:

{ "name": "mysql-source-connector",
"config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "2", "connection.url": "jdbc:mysql:**", "connection.user": "", "connection.password": "**", "mode": "incrementing", "table.whitelist": "test_kafka", "incrementing.column.name": "testid", "topic.prefix": "exa" } }

The JSON for the Exasol sink connector is as below:

{ "name": "mysql-exa-sink-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "connection.url": "jdbc:exa:exasol-db:8563;schema=DWH_TEST", "connection.user": "****", "connection.password": "****", "topics": "exa_test_kafka", "auto.create": "true", "insert.mode": "insert" } }

And below is the Kafka Connect log:

" 2020-11-23 12:35:04,493] INFO Attempting to open connection #1 to Exasol (io.confluent.connect.jdbc.util.CachedConnectionProvider:87) [2020-11-23 12:35:04,558] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:49) [2020-11-23 12:35:04,651] INFO Checking Exasol dialect for existence of table "exa_test_kafka" (com.exasol.connect.jdbc.dialect.ExasolDatabaseDialect:511) [2020-11-23 12:35:04,667] INFO Using Exasol dialect table "exa_test_kafka" present (com.exasol.connect.jdbc.dialect.ExasolDatabaseDialect:519) [2020-11-23 12:35:04,707] INFO Setting metadata for table "exa_test_kafka" to Table{name='"exa_test_kafka"', columns=[Column{'last_update', isPrimaryKey=false, allowsNull=false, sqlType=TIMESTAMP}, Column{'company_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'test_id', isPrimaryKey=false, allowsNull=false, sqlType=DECIMAL}, Column{'location_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}]} (io.confluent.connect.jdbc.util.TableDefinitions:64) [2020-11-23 12:35:04,708] INFO Closing BufferedRecords with updatePreparedStatement: null deletePreparedStatement: null (io.confluent.connect.jdbc.sink.BufferedRecords:242) [2020-11-23 12:35:04,746] INFO INSERT records:3 , but no count of the number of rows it affected is available (io.confluent.connect.jdbc.sink.BufferedRecords:193) [2020-11-23 12:35:04,746] INFO Closing BufferedRecords with updatePreparedStatement: com.exasol.jdbc.EXAPreparedStatement@1734371b deletePreparedStatement: null (io.confluent.connect.jdbc.sink.BufferedRecords:242) [2020-11-23 12:35:07,696] INFO WorkerSourceTask{id=mysql-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:478) [2020-11-23 12:35:07,697] INFO WorkerSourceTask{id=mysql-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:495)

morazow commented 3 years ago

Hello @avinash0720,

Thanks for the feedback.

The INSERT log line is not very accurate; it only reflects the size of the BufferedRecords buffer in memory.

But the actual insert into Exasol seems to be failing. Could you please set the log level to DEBUG and try again? Maybe it will show the error or more information.

avinash0720 commented 3 years ago

hi @morazow ,

Thanks for the quick response.

I changed the log level to DEBUG as below; I assume this is the setting you were suggesting. In "/kafka/connect-log4j.properties" I changed the root logger from INFO to DEBUG, i.e. the line

    log4j.rootLogger=INFO, stdout

became

    log4j.rootLogger=DEBUG, stdout

And after making the DEBUG change, attached is the log. This log file (exasol_connect_log.txt) contains only the part of the complete log around the exact timestamp at which I posted the Exasol sink connector; the complete log is pretty huge (50 MB), so I cannot paste it in full.

But looking at the DEBUG logs I am still unable to figure out what the error is. Please let me know if you can spot the issue in the new log file.

exasol_connect_log.txt

In the attached log file, I assume the relevant information starts at line 389. I included the log content before line 389 just in case it helps you understand the context better.

Thanks, Avinash

avinash0720 commented 3 years ago

Also, below are the 3 records which are present in the topic and need to be loaded into Exasol; the JSON data looks correct.

    /istores/etl/kafka $ ./bin/kafka-console-consumer.sh --topic exa_test_kafka --from-beginning --bootstrap-server localhost:9092

    {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"test_id"},{"type":"string","optional":false,"field":"location_name"},{"type":"string","optional":false,"field":"company_name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"last_update"}],"optional":false,"name":"test_kafka"},"payload":{"test_id":1,"location_name":"stuttgart","company_name":"ISG","last_update":1605708606000}}
    {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"test_id"},{"type":"string","optional":false,"field":"location_name"},{"type":"string","optional":false,"field":"company_name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"last_update"}],"optional":false,"name":"test_kafka"},"payload":{"test_id":9,"location_name":"esslingen","company_name":"ISG","last_update":1605799111000}}
    {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"test_id"},{"type":"string","optional":false,"field":"location_name"},{"type":"string","optional":false,"field":"company_name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"last_update"}],"optional":false,"name":"test_kafka"},"payload":{"test_id":10,"location_name":"esslingen","company_name":"ISG","last_update":1605885721000}}

Thanks, Avinash

morazow commented 3 years ago

Hello @avinash0720,

Thanks for the logs.

Indeed, it did not reveal more than before. I am not sure what might be the issue at the moment.

Could you please try out the prepared statement Kafka Connect uses (line 395 in the logs) from a SQL client (DBVis, DataGrip, etc.)?

INSERT INTO "exa_test_kafka"("test_id","location_name","company_name","last_update") VALUES(?,?,?,?)

avinash0720 commented 3 years ago

Hi @morazow ,

INSERT INTO "exa_test_kafka"("test_id","location_name","company_name","last_update") VALUES(?,?,?,?)

The INSERT command will not work because Exasol is case-sensitive and stores all table metadata in capital letters, so it cannot resolve the lower-case table and column names. If the column and table names are written in lower case without quotes, Exasol implicitly converts them to capital letters and finds the metadata. But since the query puts the table and columns in double quotes ("), Exasol looks for the lower-case names literally, and that is why it fails, as illustrated below.
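To illustrate (a minimal sketch, not taken from my actual setup; error behaviour described in the comments):

    -- Unquoted identifiers are folded to upper case by Exasol,
    -- so this table is stored as EXA_TEST_KAFKA with column TEST_ID:
    CREATE TABLE exa_test_kafka (test_id DECIMAL(18,0));

    -- Quoted identifiers are case-sensitive, so this lookup fails:
    -- no object is stored under the lower-case name "exa_test_kafka".
    INSERT INTO "exa_test_kafka" ("test_id") VALUES (1);

    -- Unquoted (or upper-case quoted) identifiers resolve correctly:
    INSERT INTO EXA_TEST_KAFKA (TEST_ID) VALUES (1);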

Another thing is that there is no schema mentioned for the table in the query, even though I provided the schema name when posting the sink connector, as in the example on GitHub below: https://github.com/exasol/kafka-connect-jdbc-exasol/blob/master/exasol-sink.json

    {
      "name": "mysql-exa-sink-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:exa:exasol-db:8563;schema=DWH_TEST",
        "connection.user": "**",
        "connection.password": "***",
        "topics": "exa_test_kafka",
        "auto.create": "true",
        "insert.mode": "insert"
      }
    }

So I am not sure how I should handle this case-sensitivity problem; shouldn't it be handled automatically by the JDBC driver?

Thanks in advance, Avinash

morazow commented 3 years ago

Hey @avinash0720,

Since you already specified the schema in the connection string, the query runs inside this Exasol schema.

Yes, unfortunately the JDBC driver does not handle it automatically. In addition, the Kafka Exasol connector escapes the identifiers with double quotes.

Since I do not see any special characters in the names, you can set the topic to all caps ("EXA_TEST_KAFKA") in the sink configuration file.
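For example, the sink configuration would then look roughly like this (a sketch; only the topics value changes compared to your configuration above, and the Kafka topic itself would need to exist under the upper-case name):

    {
      "name": "mysql-exa-sink-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:exa:exasol-db:8563;schema=DWH_TEST",
        "connection.user": "****",
        "connection.password": "****",
        "topics": "EXA_TEST_KAFKA",
        "auto.create": "true",
        "insert.mode": "insert"
      }
    }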

Still, it would be good to know what went wrong in the previous runs. Was another table created, so that we were expecting the data in the wrong table?

avinash0720 commented 3 years ago

hi @morazow ,

I was able to solve the issue. I created the table beforehand with the table and column names in small letters (roughly as sketched below) and disabled auto.create in the sink connector POST command.
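For reference, a rough sketch of the pre-created table (the column types are assumed from the table metadata in the log above, not my exact DDL):

    CREATE TABLE DWH_TEST."exa_test_kafka" (
        "test_id"       DECIMAL(18,0),
        "location_name" VARCHAR(2000),
        "company_name"  VARCHAR(2000),
        "last_update"   TIMESTAMP
    );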

So now I can see the data in the Exasol database; the only issue is that the table and columns are in small letters. Maybe you have a solution for that; if so, that would be great.

Thanks in advance. Regards, Avinash

avinash0720 commented 3 years ago

Hi @morazow ,

Regarding capital letters for the topic: I already tried creating the topic in capital letters yesterday, and it was still failing due to the case sensitivity of the columns. So I tried to create the source columns (from the MySQL source connector) in capital letters, but then the source connector failed with a very strange issue.

Anyway, we now know that the issue was case sensitivity, and I have at least one solution.

Thanks for the continuous support, much appreciated. I was not expecting such fast responses, but you kept replying; thank you so much.

If I am able to find a solution to the case-sensitivity issue I will write to you; if not, I will bug you again :)

Thanks and Regards, Avinash


morazow commented 3 years ago

Hey @avinash0720,

Thanks for the feedback! We always appreciate contributions!

Please let us know how it goes. It would be very helpful if you could send a reproducible issue; then we can investigate it on our side.

avinash0720 commented 3 years ago

hi @morazow ,

You are welcome. The issue was definitely the case sensitivity. I wrote a customised query in the source connector (MySQL) and gave the columns aliases in capital letters (roughly as sketched below), and when I ran the sink connector, Kafka was able to insert the data into the Exasol table.
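Roughly, the source configuration then looks like this (a hypothetical sketch; the mode and the column and table names are assumed rather than copied from my actual config):

    {
      "name": "mysql-source-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql:**",
        "connection.user": "**",
        "connection.password": "**",
        "mode": "bulk",
        "topic.prefix": "EXA_TEST_KAFKA",
        "query": "SELECT testid AS TEST_ID, location_name AS LOCATION_NAME, company_name AS COMPANY_NAME, last_update AS LAST_UPDATE FROM test_kafka"
      }
    }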

Thanks, Avinash

avinash0720 commented 3 years ago

hi @morazow,

I am done with my POC and am trying to implement the same in production. I get an error when I try to make a JDBC source connection to the MySQL DB.

The error is as below:

    org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
    Invalid value java.sql.SQLException: Access denied for user '**'@'*' (using password: YES) for configuration Couldn't open connection to jdbc:mysql://***:3306/orders

Below are my source connection details:

    {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max": "2",
      "connection.url": "jdbc:mysql://**:3306/orders",
      "connection.user": "**",
      "connection.password": "**",
      "mode": "bulk",
      "topic.prefix": "MYSQL_ORDER_KAFKA",
      "poll.interval.ms": "1000",
      "query": "select orders_id as ORDER_SRC_ID, shops_orders_id as ORDER_NUMBER, orders_date as ORDER_DATE, date_last_modified as DATE_LAST_MODIFIED from orders.order where orders_date > '2021-01-01'"
    }

I read in the issue linked below that this error can occur if the MySQL password has special characters. My MySQL password does have a special character, but I am passing it in 'connection.password' as per the solution mentioned there.

https://github.com/confluentinc/kafka-connect-jdbc/issues/422

So I am now clueless about how to proceed.

I also tried running the connector in DEBUG mode, but the log does not contain any useful information. Can you think of any other possible root cause?

Thank you in advance, Avinash

morazow commented 3 years ago

Hello @avinash0720,

Sorry for the late reply. And thanks for asking it in both repositories!

I also see that your username is empty; do you have to set it? I am not sure whether the default user is used by the Kafka JDBC connector in this case.

avinash0720 commented 3 years ago

Hi @morazow, yes, it is set; I just removed it for privacy (I should have filled in '*'). Anyway, a proper username is given. Some more information: when I connect to the MySQL DB directly from the Kafka server, I am able to connect, but when I try to create a JDBC source connection to the same MySQL DB, I get the error.

" org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s): Invalid value java.sql.SQLException: Access denied for user ''@'' (using password: YES) for configuration Couldn't open connection to jdbc:mysql://****:3306/orders "

Thanks, Avinash

morazow commented 3 years ago

Hello @avinash0720,

Could you please try escaping the non-alphanumeric characters? If possible, please also try changing the user password.
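For illustration only (this assumes the special character is one that JSON itself treats specially, which may not be your exact case): a double quote or backslash in the password has to be escaped in the connector JSON, e.g. for a hypothetical password p@ss"w0rd\1:

    {
      "connection.password": "p@ss\"w0rd\\1"
    }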

avinash0720 commented 3 years ago

Hi @morazow, escaping the special characters worked. But I really struggled with this issue; it should be documented somewhere on the Exasol side. Thank you so much for the support. Regards, Avinash

morazow commented 3 years ago

Hey @avinash0720,

Great to hear that it worked for you!

Since this is more of a MySQL Kafka Connect issue, could you please also comment on https://github.com/confluentinc/kafka-connect-jdbc/issues/422 that escaping works?

I have added it to the next documentation todo so that we do not forget.

Thanks and best regards, Muhammet Orazov

avinash0720 commented 3 years ago

Hi @morazow, I have added a comment to confluentinc/kafka-connect-jdbc#422 as well. Thanks again. Regards, Avinash

avinash0720 commented 3 years ago

hi @morazow ,

I have another production issue, and after detailed analysis I am still unable to find the root cause; I have no clue. I have a sink connector which loads data into Exasol. The connector was working fine for a month, but now it has failed with the below error message:

" [2021-02-23 20:44:27,799] ERROR WorkerSinkTask{id=sink-connector-exasol} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain: java.sql.SQLException: Not implemented for CharColumn - setScale() "

I have some decimal fields in the target table; I thought it might be a precision issue, so I changed all columns to VARCHAR(500), but I still get the same error message. Note that all fields in Kafka which I am loading into Exasol are either INTEGER or DECIMAL, so character truncation is out of scope.

I tried DEBUG mode (connect-log4j.properties) for the connector logs as well; it does not give much more information and shows the same error as mentioned above.

Could you please suggest possible causes of the issue?

Thanks, Avinash

morazow commented 3 years ago

Hello @avinash0720,

The error indicates that the connector wants to set a scale, but the column type is char. You should also set the table column to a decimal data type.

I assume your source is still MySQL? Kafka Connect JDBC serializes decimals as bytes when reading from the source, but you can change this using the numeric.mapping property. However, there are still some issues with MySQL.
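For illustration, the property goes into the source connector configuration, for example (a sketch, and not guaranteed to help with every MySQL decimal type):

    "numeric.mapping": "best_fit"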

I think you should check whether the source decimal values are converted to BigDecimal before sinking to Exasol.

Please have a look into these references:

avinash0720 commented 3 years ago

Hi @morazow, thanks for the quick response. Yes, the issue seems to be with DECIMAL, and yes, my source is MySQL. I tried making the Exasol columns DECIMAL earlier, but it was still failing with the same error.

I will go through your suggestions and the links and try again. I will keep you informed. Thanks, Avinash

morazow commented 3 years ago

@avinash0720,

Great, thanks! Please let us know how it goes.

avinash0720 commented 3 years ago

Hi @morazow, indeed the issue is the same as mentioned in the ticket you referenced (https://github.com/confluentinc/kafka-connect-jdbc/issues/563). I see these values for the decimal fields:

    "PRODUCTS_PRICE_BACKEND": "Atrc",
    "PURCHASE_PRICE_NET": "AWC8",
    "PRODUCTS_PRICE_SHOP": "Atrc",
    "DISCOUNT_PERCENTAGE": "AA==",
    "FIFO_SUPPLIERS_PRICE_EUR": "AWC8",

I tried to set the "numeric.mapping" property, still I get same issue (as mentioned for Mysql DECIMAL fields, it still does not work). So I assume I am left with the only option to handle it within the query: CAST the Decimal fields to CHAR fields. I tried casting DECIMAL to NUMERIC but Mysql does not support CAST to NUMERIC.

I hope this issue gets resolved quickly. Let me know if you have a better solution. Thank you for the support. Regards, Avinash

avinash0720 commented 2 years ago

Hi @morazow, do we have a solution for this issue now (confluentinc/kafka-connect-jdbc#563)? My production Kafka connector is failing with this error: java.sql.SQLException: Exception chain: java.sql.SQLException: Not implemented for CharColumn - setScale()

My source is MySQL and the target is Exasol. The setup was working for 8 months but has now failed again with this issue.

Thanks, Avinash