confluentinc / kafka-connect-bigquery

A Kafka Connect BigQuery sink connector
Apache License 2.0

JSON data type is not working #370

Open kgw7401 opened 11 months ago

kgw7401 commented 11 months ago

I am currently trying to load data using the BigQuery Sink Connector. I have defined the schema for the data I want to load into BigQuery, and the schema looks like the following:

[
  {
    "name": "device_id",
    "type": "STRING",
    "mode": "REQUIRED"
  },
  {
    "name": "version",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "device",
    "type": "RECORD",
    "mode": "NULLABLE",
    "fields": [
      {
        "name": "category",
        "type": "STRING",
        "mode": "NULLABLE"
      },
      {
        "name": "os",
        "type": "STRING",
        "mode": "NULLABLE"
      }
    ]
  },
  {
    "name": "parameters",
    "type": "JSON",
    "mode": "NULLABLE"
  }
]

The issue I’m facing is that when loading a field with the JSON data type in BigQuery, the field is recognized as a RECORD type, causing the operation to fail. The error message I’m encountering is as follows:

(location parameters, reason: invalid): This field: parameters is not a record.

Additionally, the Connector YAML is as follows:

---
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: bigquery-event-log-sink-connector
spec:
  taskMax: 1
  configs:
    defaultDataset: event_log
    project: <project_name>
    topics: <topic_name>
    autoCreateTables: "false"
    autoUpdateSchemas: "false"
    bigQueryPartitionDecorator: "false"
    timestampPartitionFieldName: timestamp
    topic2TableMap: <topic_name>:<table_name>
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: "false"
    transforms: "TimestampIntCast,TimestampConverter"
    transforms.TimestampIntCast.type: org.apache.kafka.connect.transforms.Cast$Value
    transforms.TimestampIntCast.spec: timestamp:int64
    transforms.TimestampConverter.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.TimestampConverter.target.type: string
    transforms.TimestampConverter.field: timestamp
    transforms.TimestampConverter.format: "yyyy-MM-dd HH:mm:ss"
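The two SMTs in this config first cast the `timestamp` field to int64 and then render it as a formatted string. As a minimal sketch (not the connector's actual code), the equivalent conversion in Python looks like this, assuming the field holds epoch milliseconds, which is TimestampConverter's default interpretation of an int64 value:

```python
from datetime import datetime, timezone

def cast_and_format(record: dict) -> dict:
    # Mimics Cast$Value (spec: timestamp:int64) followed by
    # TimestampConverter$Value (target.type=string,
    # format="yyyy-MM-dd HH:mm:ss"). Assumes epoch milliseconds.
    epoch_ms = int(record["timestamp"])  # Cast step: coerce to int64
    dt = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    record["timestamp"] = dt.strftime("%Y-%m-%d %H:%M:%S")  # Converter step
    return record

event = {"device_id": "d1", "timestamp": "1700000000000"}
print(cast_and_format(event)["timestamp"])  # → 2023-11-14 22:13:20
```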

Could you help me with this issue, please?

b-goyal commented 11 months ago

How are you sending the JSON input? Could you share a sample record? Here is a sample record from my testing that worked fine:

{
  "device_id": "device_id1223",
  "version": "1.1.x",
  "device": {
    "category": "C11",
    "os": "Windows"
  },
  "parameters": "{\"city\": \"Mountain View\",\"state\": \"CA\",\"zipcode\": 94041}"
}
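Note that `parameters` in the record above is a string containing serialized JSON, not a nested object. A hypothetical producer-side sketch of building such a record in Python: serializing the object to a string before encoding the message means the converter sees a STRING value for the JSON column, whereas a nested object would be treated as a RECORD.

```python
import json

parameters = {"city": "Mountain View", "state": "CA", "zipcode": 94041}

record = {
    "device_id": "device_id1223",
    "version": "1.1.x",
    "device": {"category": "C11", "os": "Windows"},  # stays nested: RECORD column
    # Serialize to a string so the BigQuery JSON column receives text,
    # not a nested object (which would be mapped to RECORD and fail).
    "parameters": json.dumps(parameters),
}

payload = json.dumps(record)  # the bytes sent as the Kafka message value
print(payload)
```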

BQ table -

[Screenshot, 2023-12-20: BigQuery table showing the sample record loaded successfully]