wepay / kafka-connect-bigquery

DEPRECATED. PLEASE USE https://github.com/confluentinc/kafka-connect-bigquery. A Kafka Connect BigQuery sink connector

Ignore additional field in Kafka record if not present in BQ table #306

Open subham611 opened 2 years ago

subham611 commented 2 years ago

I have a BQ table with fields {"id":"string", "amount":"long"}. I am passing a Kafka record {"id":"1", "amount":1, "extraField":"someField"}, and this results in the following error:

Task failed with java.lang.NullPointerException error: null (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:70)
Exception in thread "pool-5-thread-2" java.lang.NullPointerException
    at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertSchema(BigQuerySchemaConverter.java:104)
    at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertSchema(BigQuerySchemaConverter.java:46)
    at com.wepay.kafka.connect.bigquery.SchemaManager.getBigQuerySchema(SchemaManager.java:479)
    at com.wepay.kafka.connect.bigquery.SchemaManager.convertRecordSchema(SchemaManager.java:315)
    at com.wepay.kafka.connect.bigquery.SchemaManager.getAndValidateProposedSchema(SchemaManager.java:286)
    at com.wepay.kafka.connect.bigquery.SchemaManager.getTableInfo(SchemaManager.java:269)
    at com.wepay.kafka.connect.bigquery.SchemaManager.updateSchema(SchemaManager.java:242)
    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.attemptSchemaUpdate(AdaptiveBigQueryWriter.java:159)
    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:110)
    at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:118)
    at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:96)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2022-08-05 05:22:34,130] ERROR [client_event_receiver_bq_connector|task-0] Task failed with java.lang.NullPointerException error: null (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:70)

My connector config is as follows:

{
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "tasks.max": "1",
  "topics": "kafka_bq_receiver-1",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "key.converter.schemas.enable": "false",
  "errors.deadletterqueue.topic.name": "kafka-dq-sink",
  "errors.log.include.messages": "true",
  "sanitizeTopics": "true",
  "autoCreateTables": "false",
  "autoUpdateSchemas": "false",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "defaultDataset": "df",
  "allowNewBigQueryFields": "true",
  "allowBigQueryRequiredFieldRelaxation": "true",
  "keyfile": "key.json",
  "bigQueryPartitionDecorator": "false",
  "timestamp": "UTC",
  "transforms": "setTopic",
  "transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
  "transforms.setTopic.field": "id"
}

Is there any way to ignore this additional field in the Kafka record instead of raising an error?
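In the meantime, a possible workaround is to drop known extra fields before they reach the sink, using the stock ReplaceField SMT that ships with Kafka Connect. A sketch, chained onto the existing setTopic transform; the alias dropExtra is just an example, and on older Connect versions the exclude property is named blacklist:

  "transforms": "setTopic,dropExtra",
  "transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
  "transforms.setTopic.field": "id",
  "transforms.dropExtra.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.dropExtra.exclude": "extraField"

This only helps when the unwanted fields are known up front, though; it cannot ignore arbitrary unknown fields.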

subham611 commented 2 years ago
protected InsertAllRequest createInsertAllRequest(PartitionedTableId tableId,
                                                  Collection<InsertAllRequest.RowToInsert> rows) {
  return InsertAllRequest.newBuilder(tableId.getFullTableId(), rows)
      .setIgnoreUnknownValues(false)
      .setSkipInvalidRows(false)
      .build();
}

Here setIgnoreUnknownValues is hardcoded to false. Can we make it true, or add a config variable that the user can set? A sketch of the latter follows.
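A minimal sketch of the configurable variant, assuming a new, hypothetical connector config key (called ignoreUnknownValues here) that is wired into the writer; the config key and the field are assumptions, only setIgnoreUnknownValues itself is part of the BigQuery client API:

  // Hypothetical: populated from a new connector config key, e.g.
  // "ignoreUnknownValues", defaulting to false to preserve current behavior.
  private final boolean ignoreUnknownValues;

  protected InsertAllRequest createInsertAllRequest(PartitionedTableId tableId,
                                                    Collection<InsertAllRequest.RowToInsert> rows) {
    return InsertAllRequest.newBuilder(tableId.getFullTableId(), rows)
        // When true, BigQuery ignores row fields that are not in the table
        // schema instead of rejecting the insert.
        .setIgnoreUnknownValues(ignoreUnknownValues)
        .setSkipInvalidRows(false)
        .build();
  }

With the unknown fields dropped server-side, the insert would no longer fail, so the schema-update path that produces the NullPointerException in the stack trace above should not be triggered.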