confluentinc / kafka-connect-bigquery

A Kafka Connect BigQuery sink connector
Apache License 2.0

Not able to send Kafka JSON messages to BigQuery #406

Open nassereddinebelghith opened 7 months ago

nassereddinebelghith commented 7 months ago

The problem I'm facing is a schema-compatibility issue in Kafka Connect. The error message says the top-level Kafka Connect schema must be of type 'struct', but the schema arriving with my records does not meet that requirement. This points to a mismatch between the schema the connector expects and the schema delivered with each Kafka message.
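For reference, this is my understanding of the two shapes that org.apache.kafka.connect.json.JsonConverter can hand to a sink connector (the record below is a made-up example). With `value.converter.schemas.enable=false`, the value arrives as a schemaless map, i.e. with no struct schema attached:

```json
{"id": 42, "price": 9.99}
```

With `value.converter.schemas.enable=true`, every message must carry the standard Connect JSON envelope, whose top-level schema type is `struct`:

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "id", "type": "int64", "optional": true},
      {"field": "price", "type": "double", "optional": true}
    ]
  },
  "payload": {"id": 42, "price": 9.99}
}
```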

Here's what I've tried:

- Checking the Kafka message schema: I've verified the schema of my Kafka messages to ensure they are correctly structured. However, the schema does not seem to comply with the 'struct' type requirement.
- Connector configuration: I've checked my connector configuration to ensure the schema-related parameters are set correctly; specifically, that `key.converter.schemas.enable` and `value.converter.schemas.enable` are both set to `true` when schemas are in use.
- Version compatibility: I've verified that the versions of my Kafka connector, Kafka cluster, and schema are compatible.
- Data examination: I've examined the incoming data to ensure it adheres to the expected schema, since malformed data can cause schema-conversion errors (see the consumer command after this list).

Despite these efforts the error persists, which tells me there is still a discrepancy between the schema the connector expects and the one delivered with each Kafka message. I likely need to dig further into the schema configuration and the format of the incoming data to make them match what the connector requires.
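For the data-examination step, I inspect raw records with the console consumer that ships with Kafka (bootstrap server and topic are the ones from my configuration below; this has to run from somewhere that can reach the cluster):

```bash
# Print one record, including its key, to check whether values carry the schema envelope
bin/kafka-console-consumer.sh \
  --bootstrap-server strimzi-cluster:9092 \
  --topic projects-indep-price-purchase-unit-test \
  --from-beginning \
  --max-messages 1 \
  --property print.key=true
```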

Here is an example of the KafkaConnect and KafkaConnector configuration:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-json-bigquery-sink
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  logging:
    type: external
    valueFrom:
      configMapKeyRef:
        name: strimzi-kafka-connect-log4j-properties
        key: log4j.properties
  bootstrapServers: strimzi-cluster:9092
  template:
    pod:
      imagePullSecrets:
        - name: $(AGILE_FABRIC_SECRET)
    serviceAccount:
      metadata:
        annotations:
          iam.gke.io/gcp-service-account: phx-kafka-bigquery-connector@my-gcp-project-id.iam.gserviceaccount.com
  version: 3.5.2
  replicas: 3
  image: phenix-docker-virtual.enterpriserepo.fr.carrefour.com/phenix/platform/phx-strimzi-kafka-connect:2.0.0-SNAPSHOT
  config:
    config.providers: env,secrets
    config.providers.env.class: io.strimzi.kafka.EnvVarConfigProvider
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: strimzi-connect-json-bigquery-sink
    offset.storage.topic: strimzi-connect-json-bigquery-sink-offsets
    config.storage.topic: strimzi-connect-json-bigquery-sink-configs
    status.storage.topic: strimzi-connect-json-bigquery-sink-status
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
  resources:
    limits:
      memory: 8Gi
    requests:
      cpu: 100m
      memory: 8Gi
  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: jmx-kafka-connect-prometheus
        key: jmx-kafka-connect-prometheus.yml
  externalConfiguration:
    env:
      - name: SCHEMA_REGISTRY_URL
        valueFrom:
          configMapKeyRef:
            name: kafka-config
            key: SCHEMA_REGISTRY
      - name: GCP_PROJECT
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: GCP_PROJECT
      - name: CONNECT_INTERNAL_KEY_CONVERTER
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_INTERNAL_KEY_CONVERTER
      - name: CONNECT_INTERNAL_VALUE_CONVERTER
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_INTERNAL_VALUE_CONVERTER
      - name: CONNECT_CONSUMER_MAX_POLL_RECORDS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONSUMER_MAX_POLL_RECORDS
      - name: CONNECT_CONSUMER_MAX_POLL_INTERVAL_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONSUMER_MAX_POLL_INTERVAL_MS
      - name: CONNECT_CONSUMER_ISOLATION_LEVEL
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONSUMER_ISOLATION_LEVEL
      - name: CONNECT_CONSUMER_AUTO_OFFSET_RESET
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONSUMER_AUTO_OFFSET_RESET
      - name: CONNECT_CONSUMER_SESSION_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONSUMER_SESSION_TIMEOUT_MS
      - name: CONNECT_SESSION_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_SESSION_TIMEOUT_MS
      - name: CONNECT_HEARTBEAT_INTERVAL_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_HEARTBEAT_INTERVAL_MS
      - name: CONNECT_METRICS_RECORDING_LEVEL
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_METRICS_RECORDING_LEVEL
      - name: CONNECT_OFFSET_FLUSH_INTERVAL_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_OFFSET_FLUSH_INTERVAL_MS
      - name: CONNECT_OFFSET_FLUSH_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_OFFSET_FLUSH_TIMEOUT_MS
      - name: CONNECTOR_RETRIES
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECTOR_RETRIES
      - name: CONNECTOR_TIMEOUT_SECONDS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECTOR_TIMEOUT_SECONDS
      - name: CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS
      - name: CONNECT_REQUEST_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_REQUEST_TIMEOUT_MS
      - name: CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY
      - name: CONNECT_WORKER_SYNC_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_WORKER_SYNC_TIMEOUT_MS

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connect-json-bigquery-sink-role
  namespace: platform-dev
rules:
  - apiGroups: [""]
    resources: ["secrets"]
    verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connect-json-bigquery-sink-role-binding
  namespace: platform-dev
subjects:
  - kind: ServiceAccount
    name: connect-json-bigquery-sink-connect
    namespace: platform-dev
roleRef:
  kind: Role
  name: connect-json-bigquery-sink-role
  apiGroup: rbac.authorization.k8s.io
---

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: project-indep-connector
  labels:
    strimzi.io/cluster: connect-json-bigquery-sink
spec:
  class: com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
  tasksMax: 1
  config:
    topics: projects-indep-price-purchase-unit-test
    project: "my-project-id"
    defaultDataset: project_indep_test
    keyfile: "${secrets:platform-dev/bigquery-connect-secrets:bigquery-connect.keyfile}"
    keySource: JSON
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    key.converter.schemas.enable: false
    bigquery.retry.wait.max.seconds: 600
    bigquery.retry.backoff.max.ms: 600000
    bigquery.retry.backoff.initial.delay.ms: 1000
    errors.tolerance: all
    errors.retry.delay.max.ms: 60000
    errors.log.enable: true
    errors.deadletterqueue.topic.name: "project_indep_dlq_bigquery_sink_dead_letter"
    errors.deadletterqueue.context.headers.enable: true

    allBQFieldsNullable: true
    sanitizeTopics: false
    autoCreateTables: false
    autoUpdateSchemas: false
    schemaRetriever: com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
    bufferSize: 100
    maxWriteSize: 100
    tableWriteWait: 1000
    timestamp: UTC
    bigQueryPartitionDecorator: false
```
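Re-reading this, two things in my connector config look inconsistent with what I'm trying to do (see the PS below): value schemas are disabled even though the sink seems to require a struct-typed top-level schema, and autoCreateTables is false even though I need tables created dynamically. A minimal sketch of the change I believe is needed, which I have not yet verified end-to-end:

```yaml
    value.converter: org.apache.kafka.connect.json.JsonConverter
    # JsonConverter only attaches a struct-typed top-level schema when the
    # messages carry the schema envelope and schemas.enable is true
    value.converter.schemas.enable: true
    # Let the connector create the table from each record's schema
    autoCreateTables: true
```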

PS: The data sent to Kafka must be JSON, and its schema is dynamic, so I cannot pre-create a static BigQuery table with a fixed schema. I need the connector to create the table on the fly while ingesting the JSON data. My Kafka messages have a string key and a JSON-formatted value.
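For anyone trying to reproduce this, here is how I send a test record with a string key and a JSON value using the console producer that ships with Kafka (bootstrap server and topic are taken from the configs above; the payload is a made-up example carrying the schema envelope):

```bash
# parse.key splits each input line into key and value; '|' is chosen as the
# separator so it does not clash with the colons inside the JSON value
bin/kafka-console-producer.sh \
  --bootstrap-server strimzi-cluster:9092 \
  --topic projects-indep-price-purchase-unit-test \
  --property parse.key=true \
  --property key.separator='|'
# then type one record per line, e.g.:
# my-key|{"schema":{"type":"struct","fields":[{"field":"id","type":"int64","optional":true}]},"payload":{"id":42}}
```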

Can anyone help, please?