wepay / kafka-connect-bigquery

DEPRECATED. PLEASE USE https://github.com/confluentinc/kafka-connect-bigquery. A Kafka Connect BigQuery sink connector

BigQuery Exception, cannot write data to BigQuery using JSON Converter #178

Open bmd-benitaclarissa opened 5 years ago

bmd-benitaclarissa commented 5 years ago

Hi,

I am trying to write data from Kafka to BigQuery using the Kafka Connect JSON converter, but I always get an error. How can I solve this issue?

Here is the configuration:

{
    "name": "sink_bq_MyTable",
    "config": {
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schemas.enable": "true",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "project": "my-bq-project-id",
        "keyfile": "/home/ubuntu/key.json",
        "datasets": ".*=my-bq-dataset",
        "autoCreateTables": "false",
        "autoUpdateSchemas": "false",
        "topics": "cdc.dbo.MyTable",
        "topicsToTables": "cdc.dbo.(.*)=$1"
    }
}

Here is the exception:

java.lang.ClassCastException: [B cannot be cast to java.nio.ByteBuffer
    at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertObject(BigQueryRecordConverter.java:99)
    at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertStruct(BigQueryRecordConverter.java:129)
    at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:73)
    at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:51)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordRow(BigQuerySinkTask.java:143)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:165)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Thanks in advance

mtagle commented 5 years ago

What does the record you're attempting to write look like? It looks as if the converter is trying to convert a byte array but is receiving it in an unexpected format.
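
For illustration, the failing cast can be reproduced in isolation. This is a minimal sketch, not the connector's code: it assumes the JsonConverter delivers a "bytes" field as a raw byte[], while the record converter (per BigQueryRecordConverter.convertObject in the stack trace) casts the value to java.nio.ByteBuffer. Class and variable names are illustrative only.

public class ByteCastDemo {
    public static void main(String[] args) {
        // A "bytes" field decoded by JsonConverter arrives as a plain Java byte[].
        Object fieldValue = new byte[] {1, 2, 3};
        // Casting it to ByteBuffer, which is what the stack trace suggests the converter does,
        // throws: java.lang.ClassCastException: [B cannot be cast to java.nio.ByteBuffer
        java.nio.ByteBuffer buffer = (java.nio.ByteBuffer) fieldValue;
        System.out.println(buffer);
    }
}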

bmd-benitaclarissa commented 5 years ago

The first field is a timestamp that uses the bytes data type:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "bytes",
            "optional": false,
            "field": "timestamp"
          },
          {
            "type": "string",
            "optional": false,
            "field": "ID"
          },
          {
            "type": "string",
            "optional": false,
            "field": "My Field"
          },
...
...
...
bmd-benitaclarissa commented 5 years ago

Hi @mtagle,

I tried removing the timestamp field (whose data type is "bytes"), but there is a new error message, shown below:

[2019-08-16 06:56:35,801] ERROR WorkerSinkTask{id=sink_bigquery_bankAccount-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type 'struct'
    at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:70)
    at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:51)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordRow(BigQuerySinkTask.java:143)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:165)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Here is the data structure:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "No_"
      },
      {
        "type": "string",
        "optional": false,
        "field": "Name"
      },
      {
        "type": "string",
        "optional": false,
        "field": "__deleted"
      }
    ],
    "optional": false,
    "name": "TestTable.Value"
  },
  "payload": {
    "No_": "ABC123",
    "Name": "Here is the name of the record",
    "__deleted": "false"
  }
}

Could you help me solve this? Has anyone else run into the same issue? Could you tell me how to make it work?

Thanks in advance.

ricardodejuan commented 4 years ago

@bmd-benita Hi, were you able to write JSON data to BigQuery, or does this library only work with the Avro format?

michTalebzadeh commented 3 years ago

Hi,

I have a similar error writing to BigQuery.

This is the message I sent, which I can read with the console consumer:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094 --from-beginning --topic md --property print.key=true

Note that it also prints the Kafka key:

9485818a-e6c5-434d-9096-29c6e3f55148    {"schema": { "type": "struct", "fields": [ { "field": "rowkey", "type": "string", "optional": true}],"optional": false,"name": "BQ"}, "payload": {"rowkey": "9485818a-e6c5-434d-9096-29c6e3f55148"}}

The error thrown is

[2021-03-17 09:29:16,655] ERROR WorkerSinkTask{id=bigquery-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Top-level Kafka Connect schema must be of type 'struct' (org.apache.kafka.connect.runtime.WorkerSinkTask:612)

This is the standalone properties file:

bootstrap.servers=rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094
key.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsets
offset.flush.interval.ms=10000

And this is the sink properties file:

name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=axial-glow-224522
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60000
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null

Thanks

amithapa commented 2 years ago

@michTalebzadeh Any luck with the solution?

aakarshg commented 2 years ago

Any update on this? I'm facing the same issue when using org.apache.kafka.connect.storage.StringConverter for key.converter. Tagging @amithapa @michTalebzadeh since you were the last ones commenting with a similar issue.

ShahNewazKhan commented 2 years ago

+1

shivakumarss commented 2 years ago

+1

shivakumarss commented 2 years ago

This is my current config. With this, everything works fine, except that the key is not persisted in BigQuery.

CREATE SINK CONNECTOR `gcpbq52` WITH(
        "connector.class"='com.wepay.kafka.connect.bigquery.BigQuerySinkConnector',
        "keyfile"='/root/src/confluent/auth.json',
        "project"='dev-project-123',
        "defaultDataset"='dev_biqquery_poc',
        "topics"='topicNew01Dev',
        "timestampPartitionFieldName"='created_at',
        "bigQueryPartitionDecorator"='false',
        "timePartitioningType"='DAY',
        "sanitizeTopics"='false',
        "transforms"='tableChange',
        "transforms.tableChange.type"='org.apache.kafka.connect.transforms.RegexRouter',
        "transforms.tableChange.regex"='.+',
        "transforms.tableChange.replacement"='table_name_here',
        "queueSize"='5',
        "auto.create.tables"='true',
        "autoCreateTables"='true',
        "key.converter"='org.apache.kafka.connect.storage.StringConverter',
        "mergeIntervalMs"='5000',
        "bigQueryRetry"='2'
);

When I add the following config, the exception below is thrown:

"kafkaKeyFieldName"='request_id',

Currently, there is no schema present for the key; it is just a string.

Any help is appreciated.

com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type 'struct'
    at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:88)
    at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:49)
    at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.lambda$getRegularRow$2(SinkRecordConverter.java:135)
    at java.base/java.util.Optional.ifPresent(Optional.java:183)
    at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getRegularRow(SinkRecordConverter.java:134)
    at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getRecordRow(SinkRecordConverter.java:74)
    at com.wepay.kafka.connect.bigquery.write.batch.TableWriter$Builder.addRow(TableWriter.java:196)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:276)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
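
One possible workaround, offered here as an untested sketch rather than something confirmed in this thread: kafkaKeyFieldName requires the key to have a struct schema, so the plain string key could be wrapped into a single-field struct before the sink sees it, using Kafka Connect's built-in HoistField$Key transform. The transform alias and field name below are illustrative:

        "transforms"='tableChange,hoistKey',
        "transforms.hoistKey.type"='org.apache.kafka.connect.transforms.HoistField$Key',
        "transforms.hoistKey.field"='request_id',
        "kafkaKeyFieldName"='request_id',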
Pangstar commented 1 year ago

+1

bmd-benitaclarissa commented 1 year ago

@bmd-benita Hi, could you write JSON data to BigQuery? or this library only works with Avro format?

Yes, I use the io.confluent.connect.json.JsonSchemaConverter to write the data in JSON format from Kafka (CDC) to BigQuery. The connector is running in Kafka Connect v7.1.1 and it works properly now.
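
For reference, a minimal sketch of that converter configuration (the registry URL is illustrative; io.confluent.connect.json.JsonSchemaConverter requires a Schema Registry and the corresponding converter plugin installed on the Connect worker):

        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",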

slavab89 commented 10 months ago

Yes, I use the io.confluent.connect.json.JsonSchemaConverter to write the data in JSON format from Kafka (CDC) to BigQuery. The connector is running in Kafka Connect v7.1.1 and it works properly now.

@bmd-benitaclarissa May I ask whether you represent logical types in the JSON schema, and how? For instance, how do you represent a Timestamp, or a JSON type in BQ?