scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink
Apache License 2.0

Implement throttling queries to Scylla #67

Open Bouncheck opened 2 years ago

Bouncheck commented 2 years ago

It seems that the current behaviour of the connector is to send records as fast as possible, without waiting for futures to complete. In some configurations this can overwhelm the driver's connection pool.

For example, a simple test that sends 10k records failed unexpectedly when run by a GitHub Actions runner:

Error:  io.connect.scylladb.integration.ScyllaDbSinkConnectorIT.insert  Time elapsed: 32.609 s  <<< ERROR!
org.apache.kafka.connect.errors.RetriableException: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /0.0.0.0:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [/0.0.0.0] Pool is busy (no available connection and the queue has reached its max size 256)))
    at io.connect.scylladb.integration.ScyllaDbSinkConnectorIT.insert(ScyllaDbSinkConnectorIT.java:173)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /0.0.0.0:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [/0.0.0.0] Pool is busy (no available connection and the queue has reached its max size 256)))
    at io.connect.scylladb.integration.ScyllaDbSinkConnectorIT.insert(ScyllaDbSinkConnectorIT.java:173)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /0.0.0.0:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [/0.0.0.0] Pool is busy (no available connection and the queue has reached its max size 256)))
    at io.connect.scylladb.integration.ScyllaDbSinkConnectorIT.insert(ScyllaDbSinkConnectorIT.java:173)

This is the method to which the exception points. It looks like it does not check the current number of "in flight" queries before executing another.
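
One possible way to throttle is to cap the number of in-flight queries with a semaphore: take a permit before each asynchronous execute and release it when the future completes. Below is a minimal sketch of that idea, not the connector's actual code. `InFlightLimiter`, its methods, and the limit of 256 are hypothetical names and values chosen for illustration, and plain `CompletableFuture` stands in for the driver's result futures so the example runs without a cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper (not part of the connector): caps async operations in flight. */
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Blocks the caller until a permit is free, then starts the async operation. */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> operation) {
        permits.acquireUninterruptibly();
        CompletableFuture<T> future;
        try {
            future = operation.get();
        } catch (RuntimeException e) {
            // The operation never started, so give the permit back.
            permits.release();
            throw e;
        }
        // Release the permit whether the operation succeeded or failed.
        future.whenComplete((result, error) -> permits.release());
        return future;
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        // Assumed limit; a real value would be tuned to the driver's pool settings.
        InFlightLimiter limiter = new InFlightLimiter(256);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Integer>> futures = new ArrayList<>();

        for (int i = 0; i < 10_000; i++) {
            final int record = i;
            futures.add(limiter.submit(() -> {
                // Track how many simulated writes are outstanding at once.
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                return CompletableFuture.supplyAsync(() -> {
                    inFlight.decrementAndGet();
                    return record; // stands in for a completed write
                });
            }));
        }

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        System.out.println("peak in-flight: " + peak.get());
    }
}
```

With this approach the producing thread blocks once the limit is reached instead of pushing more requests into the driver's queue. In a sink task that would mean `put` slows down under load rather than failing with `BusyPoolException`, though whether blocking there is acceptable for the connector is a separate design question.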