exasol / kafka-connect-jdbc-exasol

Exasol dialect for the Kafka Connect JDBC Connector
Apache License 2.0

Batch mode not supported for exasol sink connector #5

Closed: kamrankhan1 closed this issue 2 years ago

kamrankhan1 commented 5 years ago

We have implemented connectors to sync data from MySQL to Exasol. However, we found that kafka-connect-jdbc-exasol does not support batch inserts/updates. If we increase the "batch.size" property to more than 1, we start getting errors. Keeping the batch size at 1 makes the sink connector really slow. Can you please fix this issue?

morazow commented 5 years ago

Hello,

Thanks for reporting the issue!

Could you please provide more information on the error?

Please be aware that the batch.size parameter is used for the producers. From your comment, I assume that Exasol is used as a consumer, so maybe it is not affecting the inserts.

You can check out the Kafka Connect configuration parameters here: https://kafka.apache.org/documentation/#connectconfigs.

Interesting parameters might be these:

  • max.message.bytes
  • fetch.max.bytes
  • max.partition.fetch.bytes
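For illustration, a minimal sketch of where the consumer-side settings above would go, assuming a distributed Connect worker: Kafka Connect forwards worker properties prefixed with consumer. to the consumers it creates for sink tasks. The values here are placeholders, not recommendations:

# connect-distributed.properties (worker config), placeholder values.
# The consumer. prefix passes these to the sink tasks' consumers.
consumer.fetch.max.bytes=52428800
consumer.max.partition.fetch.bytes=1048576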

kamrankhan1 commented 5 years ago

Hi Muhammet,

Thanks for the quick turnaround. I might have misunderstood the use of the batch.size parameter, but what baffles me is that if I change batch.size to 2, the inserts/updates to Exasol stop working.

However, the important thing for me right now is to do batch inserts/updates to Exasol via this connector. Below is a sample config for one of my connectors. Currently, with the batch.size property set to 1, the inserts/updates happen one by one, which makes them very slow when the connector is started for the first time. Can you help me with a sample sink connector config that does batch inserts/updates?

Sample Sink Connector Config

"config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.password": "*", "tasks.max": "1", "topics": "users", "batch.size": "1", "transforms": "unwrap", "auto.evolve": "true", "connection.user": "", "name": "connector_users_sink", "auto.create": "true", "connection.url": "jdbc:exa:127.0.0.1", "insert.mode": "upsert", "pk.mode": "record_value", "pk.fields": "ID" }

Regards, Kamran


morazow commented 5 years ago

Hello @kamrankhan1,

The batching is not done on the connector side but by Kafka itself. There are many parameters you can try out. I agree it can be frustrating, though.

Please have a look at this discussion, which asks a similar question: https://github.com/confluentinc/kafka-connect-jdbc/issues/290. Additionally, you can check out these parameters (see https://docs.confluent.io/2.0.0/connect/connect-jdbc/docs/index.html#configuration-options and https://docs.confluent.io/current/installation/configuration/consumer-configs.html):

  • batch.max.rows
  • poll.interval.ms

I hope these are helpful (a per-connector override sketch follows below).

Best
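As a side note, on newer Kafka versions (2.3+, and only if connector.client.config.override.policy=All is set on the worker), consumer settings can also be overridden per connector from the sink config itself. A hedged sketch based on the sample config above: max.poll.records bounds how many records the framework hands to the sink task per poll, so it effectively caps the batch size seen by the connector. The value 500 is a placeholder:

"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "users",
    "connection.url": "jdbc:exa:127.0.0.1",
    "consumer.override.max.poll.records": "500"
}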

kamrankhan1 commented 5 years ago

Thanks, Muhammet. I will give it a try and update you.

Regards, Kamran


smaspe commented 4 years ago

Hi,

We've been investigating what appears to be the same issue, so I can add some of our findings.

Based on those, I believe the MERGE query cannot be used in a batched prepared statement (see below for a reproduction that should be enough proof of that).

There doesn't seem to be an easy way to work around this.

Reproduction

The error is easy to reproduce. Assuming you have a table SOME_TABLE in a schema SOME_SCHEMA with columns "id" and "value":

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Load the Exasol JDBC driver.
try {
    Class.forName("com.exasol.jdbc.EXADriver");
} catch (ClassNotFoundException e) {
    e.printStackTrace();
}
try (
        Connection connection = DriverManager.getConnection(
                "jdbc:exa:localhost:8888;schema=SOME_SCHEMA",
                "sys",
                "exasol"
        );
        PreparedStatement statement = connection.prepareStatement(
                "merge into SOME_SCHEMA.SOME_TABLE as target using (select ? as \"id\", ? as \"value\") as incoming on target.\"id\" = incoming.\"id\"\n" +
                        "    when matched then update set target.\"value\" = incoming.\"value\"\n" +
                        "    when not matched then insert (\"id\", \"value\") values (incoming.\"id\", incoming.\"value\")"
        )
) {
    // Bind both placeholders for the first row and queue it in the batch.
    statement.setObject(1, "1");
    statement.setObject(2, "a");
    statement.addBatch();
    // Queuing a second row is what triggers the failure: Exasol rejects a
    // prepared MERGE that would return multiple row count results.
    statement.setObject(1, "2");
    statement.setObject(2, "b");
    statement.addBatch();
    // Throws java.sql.SQLException: Feature not supported: Prepared statement
    // with multiple row count results.
    statement.executeBatch();
} catch (SQLException e) {
    e.printStackTrace();
}

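By contrast, the same MERGE succeeds when executed one row at a time, which is effectively what the connector does with batch.size set to 1. A minimal self-contained sketch, reusing the placeholder schema, table, and credentials from above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Minimal sketch: executing the prepared MERGE once per row works, because each
// executeUpdate() produces exactly one row count result. This is the slow path
// the sink connector takes when batch.size is 1.
public class MergePerRow {
    public static void main(String[] args) throws Exception {
        Class.forName("com.exasol.jdbc.EXADriver");
        try (Connection connection = DriverManager.getConnection(
                "jdbc:exa:localhost:8888;schema=SOME_SCHEMA", "sys", "exasol");
             PreparedStatement statement = connection.prepareStatement(
                     "merge into SOME_SCHEMA.SOME_TABLE as target"
                             + " using (select ? as \"id\", ? as \"value\") as incoming"
                             + " on target.\"id\" = incoming.\"id\""
                             + " when matched then update set target.\"value\" = incoming.\"value\""
                             + " when not matched then insert (\"id\", \"value\")"
                             + " values (incoming.\"id\", incoming.\"value\")")) {
            String[][] rows = {{"1", "a"}, {"2", "b"}};
            for (String[] row : rows) {
                statement.setObject(1, row[0]);
                statement.setObject(2, row[1]);
                statement.executeUpdate(); // one statement, one row count: supported
            }
        }
    }
}
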
morazow commented 4 years ago

Hey @smaspe,

Thanks for the feedback!

Sorry for the delayed reply. I am going to try to reproduce your findings and see if we can fix it.

morazow commented 4 years ago

Hey @smaspe,

I have discussed this with the colleague responsible for interfaces. Unfortunately, MERGE with batches is indeed not supported.

I am going to update the readme to mention this. If you need this feature, you can open a ticket with us on the IDEAs page.

Thanks for reporting it!

Best

avinash0720 commented 3 years ago

Hi @morazow,

I am also facing this issue. I am unable to use 'upsert' mode for the Exasol sink connector.

I too get the error below:

[2020-12-02 17:35:55,464] WARN Write of 4 records failed, remainingRetries=9 (io.confluent.connect.jdbc.sink.JdbcSinkTask:76)
java.sql.SQLException: Feature not supported: Prepared statement with multiple row count results (Session: 1684988813808230386)

The MERGE query generated by Kafka Connect is correct, but I still get this error.

Are you aware of any workaround for this upsert issue? Could you please suggest an alternative?

Thanks, Avinash

morazow commented 3 years ago

Hello @avinash0720,

Yes, unfortunately it is not possible to use the Exasol MERGE statement with batches.

One alternative is to use the UDF-based import from Kafka into Exasol; a sketch follows below.
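For reference, a hedged sketch of what that UDF-based import looks like with the exasol/kafka-connector-extension, assuming the extension and its UDF scripts are already deployed; the exact script location and parameter names should be checked against that project's README:

-- Hedged sketch: imports messages from the 'users' topic into the target table.
-- KAFKA_CONSUMER is assumed to be deployed in SOME_SCHEMA; adjust to your setup.
IMPORT INTO SOME_SCHEMA.SOME_TABLE
FROM SCRIPT SOME_SCHEMA.KAFKA_CONSUMER WITH
  BOOTSTRAP_SERVERS = 'localhost:9092'
  TOPIC_NAME        = 'users'
  TABLE_NAME        = 'SOME_SCHEMA.SOME_TABLE';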

At the moment, I am not aware of any ongoing work to support this feature. Please ask in the community or open an IDEA ticket so that our Product Management can assess the request.

morazow commented 2 years ago

This project has been discontinued. Please use exasol/kafka-connector-extension for integration with Apache Kafka.