yugabyte / yb-kafka-connector

Kafka Connect YugabyteDB Connector
Apache License 2.0

'Value should be of type Integer' exception for `int` type column. #9

Open ximik3 opened 4 years ago

ximik3 commented 4 years ago

I am replaying the tutorial at https://docs.yugabyte.com/latest/reference/connectors/kafka-connect-yugabytedb/.

It works fine with the original schema and payload:

CREATE TABLE demo.test_table (key text, value bigint, ts timestamp, PRIMARY KEY (key));
{"key" : "A", "value" : 1, "ts" : 1541559411000}

However, after removing the timestamp column and changing the value type from bigint to int:

CREATE TABLE demo.test_table (key text, value int, PRIMARY KEY (key));
{"key" : "A", "value" : 1}

the connector fails with java.lang.IllegalArgumentException: Value should be of type Integer.

Logs:

[2020-04-27 14:56:12,830] INFO Processing 1 records from Kafka. (com.yb.connect.sink.YBSinkTask:95)
[2020-04-27 14:56:12,831] INFO Add column value of type int (com.yb.connect.sink.YBSinkTask:229)
[2020-04-27 14:56:12,831] INFO Add column key of type text (com.yb.connect.sink.YBSinkTask:229)
[2020-04-27 14:56:12,845] INFO Insert INSERT INTO yugabyte.table3(value,key) VALUES (?,?) (com.yb.connect.sink.YBSinkTask:439)
[2020-04-27 14:56:12,846] INFO Prepare SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='topic1', kafkaPartition=0, key=null, keySchema=Schema{STRING}, value={value=1, key=A}, valueSchema=null, timestamp=1587993155158, headers=ConnectHeaders(headers=)} Key/Schema=null/Schema{STRING} Value/Schema={value=1, key=A}/null (com.yb.connect.sink.YBSinkTask:441)
[2020-04-27 14:56:12,848] INFO Bind 'value' of type int (com.yb.connect.sink.YBSinkTask:255)
[2020-04-27 14:56:12,849] ERROR WorkerSinkTask{id=yugabyte-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
java.lang.IllegalArgumentException: Value should be of type Integer.
        at com.yb.connect.sink.YBSinkTask.bindByColumnType(YBSinkTask.java:259)
        at com.yb.connect.sink.YBSinkTask.bindFields(YBSinkTask.java:401)
        at com.yb.connect.sink.YBSinkTask.getStatements(YBSinkTask.java:449)
        at com.yb.connect.sink.YBSinkTask.put(YBSinkTask.java:96)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
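
For reference, the log line Value/Schema={value=1, key=A}/null above shows the record arrived without a value schema. With schemaless JSON, Kafka Connect's JsonConverter maps integral numbers to INT64, so the value 1 is deserialized as a java.lang.Long; a strict check against Integer for an int column then fails even though the value fits, while a bigint column works because it binds a Long. A minimal Java sketch of that mismatch (hypothetical method name; the actual check is in YBSinkTask.bindByColumnType):

public class BindSketch {
    // Mimics a strict bind-by-column-type check for a CQL int column.
    static void bindIntColumn(Object value) {
        if (!(value instanceof Integer)) {
            throw new IllegalArgumentException("Value should be of type Integer.");
        }
    }

    public static void main(String[] args) {
        // Assumption: schemaless JSON integers arrive as Long, per
        // JsonConverter's mapping of integral numbers to INT64.
        Object fromSchemalessJson = 1L;
        bindIntColumn(fromSchemalessJson); // throws, although 1 fits in an int
    }
}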

Env details:

P.S.: This issue was reproduced by @rahuldesirazu (from the Slack conversation: https://yugabyte-db.slack.com/archives/CG0KQF0GG/p1588075753476800).

vitorcarra commented 3 years ago

Any news on this? I am facing the same issue.

suranjan commented 3 years ago

@vitorcarra We have fixed these issues in our newer version.

The following steps build the jars required by Kafka Connect. We are planning to publish these jars to Maven so that all these steps can be avoided.

git clone https://github.com/yugabyte/dsbulk.git
cd dsbulk
# install locally
mvn clean install -DskipTests

git clone https://github.com/yugabyte/messaging-connectors-commons.git
cd messaging-connectors-commons
# install locally
mvn clean install -DskipTests

git clone https://github.com/yugabyte/yb-kafka-sink.git
cd yb-kafka-sink
mvn clean package -DskipTests

The package step ends by replacing the plain jar with the shaded one:

[INFO] Replacing /home/centos/yb-kafka-sink/dist/target/kafka-connect-yugabytedb-sink-1.4.1-SNAPSHOT.jar with /home/centos/yb-kafka-sink/dist/target/kafka-connect-yugabytedb-sink-distribution-1.4.1-SNAPSHOT-shaded.jar

You can copy kafka-connect-yugabytedb-sink-1.4.1-SNAPSHOT.jar into the Kafka Connect classpath. A sample configuration file looks like this:

{
  "name": "connect-avro",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "12",
    "maxNumberOfRecordsInBatch": "100",
    "topics": "avro-stream",
    "contactPoints": "17.15.18.15,17.15.22.135,17.151.23.28",
    "loadBalancing.localDc": "us-west-2",
    "topic.avro-stream.stocks.ticks.mapping": "name=key, symbol=value.symbol, ts=value.datetime, exchange=value.exchange, industry=value.industry, value=value.value",
    "topic.avro-stream.stocks.ticks.consistencyLevel": "QUORUM"
  }
}
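
Assuming the fork keeps the DataStax sink's convention (an assumption based on the connector class above), the key topic.avro-stream.stocks.ticks.mapping means topic avro-stream, keyspace stocks, table ticks, and each entry pairs a table column with a path into the Kafka record (key, or value.<field>). A rough, illustrative Java sketch of how such a mapping string resolves columns to record paths (hypothetical helper, not the connector's actual code):

import java.util.LinkedHashMap;
import java.util.Map;

public class MappingSketch {
    public static void main(String[] args) {
        // Hypothetical parse of a mapping entry: column -> record path.
        String mapping = "name=key, symbol=value.symbol, ts=value.datetime";
        Map<String, String> columnToPath = new LinkedHashMap<>();
        for (String pair : mapping.split("\\s*,\\s*")) {
            String[] kv = pair.split("=", 2);
            columnToPath.put(kv[0].trim(), kv[1].trim());
        }
        // Columns on the left feed the INSERT; paths on the right are read
        // from the record key ("key") or from fields of the record value.
        System.out.println("INSERT INTO stocks.ticks("
                + String.join(",", columnToPath.keySet()) + ") VALUES (?,?,?)");
    }
}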

We are in the process of updating our documentation; please report any issues you face.

vitorcarra commented 3 years ago

Hi,

Thank you for the information! I've followed all the steps, but the last one failed. Do you have any idea what might be wrong?

I can't reach this URL: https://repo.sjc.dsinternal.org/artifactory/datastax-releases-local/io/confluent/kafka-connect-avro-converter/5.3.1/kafka-connect-avro-converter-5.3.1.pom

[INFO] -------------< com.yugabyte:kafka-connect-yugabytedb-sink >-------------
[INFO] Building YugabyteDB Kafka Sink Connector - Sink 1.4.1-SNAPSHOT    [2/3]
[INFO] --------------------------------[ jar ]---------------------------------
Downloading from datastax-releases-local: https://repo.sjc.dsinternal.org/artifactory/datastax-releases-local/io/confluent/kafka-connect-avro-converter/5.3.1/kafka-connect-avro-converter-5.3.1.pom
Downloading from datastax-releases-local: https://repo.sjc.dsinternal.org/artifactory/datastax-releases-local/io/confluent/kafka-streams-avro-serde/5.3.1/kafka-streams-avro-serde-5.3.1.pom
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for YugabyteDB Kafka Sink Connector 1.4.1-SNAPSHOT:
[INFO]
[INFO] YugabyteDB Kafka Sink Connector .................... SUCCESS [ 1.343 s]
[INFO] YugabyteDB Kafka Sink Connector - Sink ............. FAILURE [ 0.485 s]
[INFO] YugabyteDB Kafka Sink Connector - Distribution ..... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.928 s
[INFO] Finished at: 2021-05-11T17:41:42-03:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project kafka-connect-yugabytedb-sink: Could not resolve dependencies for project com.yugabyte:kafka-connect-yugabytedb-sink:jar:1.4.1-SNAPSHOT: Failed to collect dependencies at io.confluent:kafka-connect-avro-converter:jar:5.3.1: Failed to read artifact descriptor for io.confluent:kafka-connect-avro-converter:jar:5.3.1: Could not transfer artifact io.confluent:kafka-connect-avro-converter:pom:5.3.1 from/to datastax-releases-local (https://repo.sjc.dsinternal.org/artifactory/datastax-releases-local/): transfer failed for https://repo.sjc.dsinternal.org/artifactory/datastax-releases-local/io/confluent/kafka-connect-avro-converter/5.3.1/kafka-connect-avro-converter-5.3.1.pom: Unknown host repo.sjc.dsinternal.org: nodename nor servname provided, or not known -> [Help 1]

suranjan commented 3 years ago

@vitorcarra Could you please check if it is a temporary network problem? I just tried again on a fresh machine and it worked. I can remove the dependency on https://repo.sjc.dsinternal.org/artifactory/datastax-releases-local repository in case it still doesn't work for you. Please let me know.

vitorcarra commented 3 years ago

@suranjan Thank you for checking.

Unfortunately, I've got the same error again:

Unknown host repo.sjc.dsinternal.org: nodename nor servname provided, or not known -> [Help 1]

suranjan commented 3 years ago

@vitorcarra

Ok, this looks like an issue with your firewall. Could you please apply the patch from https://gist.github.com/suranjan/cb261c2b5af2ea4a0bdc11f7de0aa505 and try once again? It removes the dependency on repo.sjc.dsinternal.org, and I want to confirm that's the only issue at your end.

vitorcarra commented 3 years ago

Hi @suranjan! I've tried, but got another network issue. I am using my personal computer, and there is no custom firewall configuration. Admittedly, I am new to the mvn and Java world, so maybe I am doing something really wrong, but the first two mvn commands from the steps you sent earlier worked just fine.

[ERROR] Failed to execute goal on project kafka-connect-yugabytedb-sink: Could not resolve dependencies for project com.yugabyte:kafka-connect-yugabytedb-sink:jar:1.4.1-SNAPSHOT: Failed to collect dependencies at io.confluent:kafka-connect-avro-converter:jar:5.3.1: Failed to read artifact descriptor for io.confluent:kafka-connect-avro-converter:jar:5.3.1: Could not transfer artifact io.confluent:kafka-connect-avro-converter:pom:5.3.1 from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [Confluent (http://packages.confluent.io/maven/, default, releases)] -> [Help 1]

suranjan commented 3 years ago

@vitorcarra The "Blocked mirror for repositories" error comes from newer versions of Maven (3.8.1 and later), which block plain-HTTP repositories by default; that is the maven-default-http-blocker mirror in your log. Changing the Confluent repository URL in the pom from http://packages.confluent.io/maven/ to https://packages.confluent.io/maven/ should work. I will push the change once it is reviewed internally; in the meantime, you can try the same edit locally.

vitorcarra commented 3 years ago

Hello @suranjan. Changing the URL to HTTPS fixed the issue. Thank you so much!