confluentinc / kafka-connect-bigquery

A Kafka Connect BigQuery sink connector
Apache License 2.0

kafkaDataFieldName causes table creation failure if empty, even if includeKafkaData is false #134

Open ideasculptor opened 3 years ago

ideasculptor commented 3 years ago

I sent the following config:

{
  "name": "MyDomainEventBigQuerySinkConnector",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topics": "MyDomainEvent",
    "batchLoadIntervalSec": "120",
    "project": "REDACTED",
    "defaultDataset": "REDACTED",
    "keyfile": "REDACTED",
    "keySource": "JSON",
    "sanitizeTopics": "true",
    "sanitizeFieldNames": "true",
    "kafkaDataFieldName": "",
    "autoCreateTables": "true",
    "allowNewBigQueryFields": "true",
    "allowBigQueryRequiredFieldRelaxation": "true",
    "allowSchemaUnionization": "true",
    "upsertEnabled": "false",
    "deleteEnabled": "false",
    "timePartitioningType": "HOUR",
    "bigQueryMessageTimePartitioning": "false",
    "includeKafkaData": "false",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "bigQueryPartitionDecorator": "true"
  }
}

Even though includeKafkaData is false, the connector still attempted to create a table with the kafkadata field, setting its name to an empty string. Basic sanity checking of inputs doesn't seem unreasonable, especially since Control Center will not let you unset a config value once it has been included in an uploaded config, even if that value was an empty string. It passes the empty string along no matter what you do in the UI. So maybe the connector should treat an empty string the same as null?

connect            | [2021-09-03 04:07:20,626] INFO Attempting to create table `REDACTED`.`MyDomainEvent` with schema Schema{fields=[Field{name=metadata, type=RECORD, mode=NULLABLE, description=null, policyTags=null}, Field{name=customStringField, type=STRING, mode=NULLABLE, description=null, policyTags=null}, Field{name=customLongField, type=INTEGER, mode=NULLABLE, description=null, policyTags=null}, Field{name=, type=RECORD, mode=NULLABLE, description=null, policyTags=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager)
connect            | [2021-09-03 04:07:20,770] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: Failed to create table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=temporal, tableId=MyDomainEvent}} (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor)
connect            | Exception in thread "pool-5-thread-1" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to create table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=temporal, tableId=MyDomainEvent}}
connect            | Caused by: Field missing name
connect            |    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.attemptTableCreate(AdaptiveBigQueryWriter.java:170)
connect            |    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:115)
connect            |    at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:118)
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:96)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

ideasculptor commented 3 years ago

Having investigated transformations, I find myself wondering why the kafkaData thing is even included in the connector, since it can be handled much more flexibly via transformations. An easier fix is probably to remove it and update the documentation to explain how to do the same thing via a transformation. Same goes for setting up message timestamp based partitioning, since the message timestamp must be included in the value via a transformation in order to use it. That one really needs some documentation.

C0urante commented 3 years ago

> Even though includeKafkaData is false, it still attempted to create a table with the kafkadata field

includeKafkaData is not a supported property of the connector. I'll try to update the docs to remove it, as it appears nowhere in the code base (at least, not that I've been able to find).

> Basic sanity checking of inputs doesn't seem unreasonable

Agree that preflight validation to ensure that the property isn't given an empty value or one that can't be used for a BigQuery field name (when sanitization is disabled) would be useful; however...

> especially since control center will not allow you to unset a value of a config if it was included in an uploaded config, even if the value was an empty string.

... this is an issue of the UI you're working with, not of the connector.
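
The preflight validation mentioned above could look roughly like the following. This is only a minimal sketch, not code from the connector: the class `KafkaDataFieldNameCheck` and its methods are hypothetical, and the name pattern assumes the classic BigQuery column-name rule (letters, digits and underscores, starting with a letter or underscore). It treats a null or blank kafkaDataFieldName as "not set" and only rejects unusable names when sanitizeFieldNames is disabled.

```java
import java.util.Optional;
import java.util.regex.Pattern;

/**
 * Hypothetical helper, not part of the connector: treats a blank
 * kafkaDataFieldName as "not set" and rejects names BigQuery would refuse.
 */
final class KafkaDataFieldNameCheck {

    // Assumption: the classic BigQuery column-name rule (letters, digits and
    // underscores, starting with a letter or underscore).
    private static final Pattern VALID_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    private KafkaDataFieldNameCheck() {}

    /** Returns the trimmed field name, or empty if the property is null or blank. */
    static Optional<String> normalize(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(raw.trim());
    }

    /**
     * Validates the property before any table is created. When sanitizeFieldNames
     * is true the name is assumed to be cleaned up later, so only the blank check
     * applies.
     */
    static Optional<String> validate(String raw, boolean sanitizeFieldNames) {
        Optional<String> name = normalize(raw);
        if (name.isPresent() && !sanitizeFieldNames
                && !VALID_NAME.matcher(name.get()).matches()) {
            throw new IllegalArgumentException(
                "kafkaDataFieldName '" + name.get() + "' is not a valid BigQuery field name");
        }
        return name;
    }

    public static void main(String[] args) {
        // A blank value, as sent by the UI, is treated as unset.
        System.out.println(validate("", true));
        // A usable name is kept as-is.
        System.out.println(validate("kafkaData", false));
    }
}
```

With a check like this in place, the config from the original report would simply skip the Kafka data field instead of failing table creation with "Field missing name".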

> I find myself wondering why the kafkaData thing is even included in the connector since it can be handled much more flexibly via transformations.

That's a fair point. We can probably do this for the 3.0 release and make life easier for users and maintainers of the connector alike. The one thing that gives me pause is that wall-clock time is included in the Kafka data field, which may be a little trickier to reproduce since, AFAIK, Connect doesn't come with an SMT that does that out of the box. That said, I don't know how useful it really is, and it would probably be worth the tradeoff to remove it along with the rest of the kafkaDataFieldName functionality.

> Same goes for setting up message timestamp based partitioning, since the message timestamp must be included in the value via a transformation in order to use it.

This is actually incorrect; message timestamp partitioning works fine without being included in the record value as long as these conditions are met:

I believe this is a valid feature of the connector as it allows users to target partitions based on Kafka record timestamps without having to create column-partitioned tables (which may be difficult if multiple sources are writing to the table and ingestion time-partitioning is desirable for most of them).

james-johnston-thumbtack commented 9 months ago

If anyone else is wondering about this:

The includeKafkaData config option was removed entirely some time ago: https://github.com/confluentinc/kafka-connect-bigquery/commit/5dbec3327deeb305d50185fc1896b91a9f694b2a

Setting kafkaDataFieldName appears to be the replacement.

The Confluent documentation at https://docs.confluent.io/kafka-connectors/bigquery/current/kafka_connect_bigquery_config.html is definitely not up-to-date.

james-johnston-thumbtack commented 9 months ago

> I find myself wondering why the kafkaData thing is even included in the connector since it can be handled much more flexibly via transformations.

Also, it would seem that the standard transformation to look for is InsertField (https://docs.confluent.io/platform/current/connect/transforms/insertfield.html), which could be used instead of includeKafkaData.
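
For anyone wanting to try the InsertField route, here is a rough sketch of how a connector config might be rewritten. It is an illustration under assumptions, not something taken from the connector: the class `InsertFieldConfigSketch`, the alias `insertKafkaMetadata`, and the target field names (`kafka_topic` and so on) are all made up for the example. The `transforms.*` keys and the `topic.field`, `partition.field`, `offset.field` and `timestamp.field` options are the standard InsertField settings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: rewrites a connector config so that Kafka metadata is
 * added by the InsertField transformation instead of kafkaDataFieldName.
 */
final class InsertFieldConfigSketch {

    // Alias and target field names are illustrative choices, not connector defaults.
    private static final String ALIAS = "insertKafkaMetadata";

    private InsertFieldConfigSketch() {}

    /** Returns a modified copy; the map passed in is left untouched. */
    static Map<String, String> withInsertField(Map<String, String> original) {
        Map<String, String> config = new LinkedHashMap<>(original);
        // Drop the connector-specific option so no empty-named field is generated.
        config.remove("kafkaDataFieldName");

        // Append our alias to any transformations that are already configured.
        String existing = config.get("transforms");
        boolean noneYet = existing == null || existing.trim().isEmpty();
        config.put("transforms", noneYet ? ALIAS : existing + "," + ALIAS);

        String prefix = "transforms." + ALIAS + ".";
        config.put(prefix + "type", "org.apache.kafka.connect.transforms.InsertField$Value");
        config.put(prefix + "topic.field", "kafka_topic");
        config.put(prefix + "partition.field", "kafka_partition");
        config.put(prefix + "offset.field", "kafka_offset");
        config.put(prefix + "timestamp.field", "kafka_timestamp");
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> original = new LinkedHashMap<>();
        original.put("topics", "MyDomainEvent");
        original.put("kafkaDataFieldName", "");
        // Print the rewritten config as key=value pairs.
        withInsertField(original).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Note that this adds the metadata as flat top-level fields rather than as a single nested record, and it does not cover the wall-clock insert time mentioned earlier in the thread.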