Aiven-Open / gcs-connector-for-apache-kafka

Aiven's GCS Sink Connector for Apache Kafka®
Apache License 2.0

GCS Files Overwritten every minute by kafka sink connector and some messages missing #347

Closed suisenkotoba closed 6 months ago

suisenkotoba commented 7 months ago

Some of the files written by the Kafka sink connector to the bucket have 0 bytes. Is this normal behaviour?

Screenshot 2024-02-09 at 11 00 43

In another hour, all of the files have 0 bytes, and I notice they keep getting overwritten every minute until the next hour, when it starts happening again.

Screenshot 2024-02-09 at 10 58 53

I also found this error in the connector logs; I am not sure whether it is related to the issue:

2024-02-09 04:06:01,665 ERROR [de-lsm-gcs-connector|task-1] WorkerSinkTask{id=de-lsm-gcs-connector-1} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-de-lsm-gcs-connector-1]
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException
    at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:131)
    at java.base/java.util.HashMap.forEach(HashMap.java:1421)
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
    at io.aiven.kafka.connect.gcs.GcsSinkTask.flush(GcsSinkTask.java:114)
    at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:407)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException
...
2024-02-09 04:06:01,665 ERROR [de-lsm-gcs-connector|task-1] WorkerSinkTask{id=de-lsm-gcs-connector-1} Commit of offsets threw an unexpected exception for sequence number 124: null (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-de-lsm-gcs-connector-1]
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException
    at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:131)
    at java.base/java.util.HashMap.forEach(HashMap.java:1421)
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
    at io.aiven.kafka.connect.gcs.GcsSinkTask.flush(GcsSinkTask.java:114)
    at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:407)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException
...

Aiven connector version: 0.13.0
Kafka version: 3.5.1

This is my connector configuration:

{
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "tasks.max": "4",
  "topics": "loan_logs",
  "gcs.credentials.path": "<credspath>",
  "gcs.bucket.name": "datalake-raw",
  "file.name.prefix": "lsm/",
  "file.name.timestamp.timezone": "Asia/Jakarta",
  "file.name.template": "{{topic}}/sink_date={{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}/sink_hour={{timestamp:unit=HH}}/p{{partition:padding=false}}-{{start_offset:padding=true}}.snappy.parquet",
  "file.compression.type": "snappy",
  "format.output.fields": "key,value,offset,timestamp,headers",
  "format.output.type": "parquet",
  "format.output.envelope": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter.schemas.enable": "false",
  "behavior.on.null.values": "ignore"
}

I am using the same configuration on a different Kafka cluster with Kafka 3.3 and Aiven connector 0.9.0, and there has not been any similar issue there. Is there a new configuration option I am missing?

suisenkotoba commented 7 months ago

topic config

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1beta2","kind":"KafkaTopic","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"kafka01"},"name":"loan-logs---184077157bfc68094e73c105123319efa806b1f3","namespace":"kafka-prod"},"spec":{"config":{"cleanup.policy":"delete","max.message.bytes":"1048588","min.insync.replicas":"1","retention.bytes":"-1","retention.ms":"1209600000"},"partitions":10,"replicas":3,"topicName":"loan_logs"}}
  creationTimestamp: "2023-11-09T03:34:54Z"
  generation: 5
  labels:
    strimzi.io/cluster: kafka01
  name: loan-logs---184077157bfc68094e73c105123319efa806b1f3
  namespace: kafka-prod
  resourceVersion: "141694147"
  uid: 1936b625-2603-402c-be22-1d7333cfd029
spec:
  config:
    cleanup.policy: delete
    max.message.bytes: "1048588"
    message.format.version: 3.0-IV1
    min.insync.replicas: "1"
    retention.bytes: "-1"
    retention.ms: "1209600000"
  partitions: 10
  replicas: 3
  topicName: loan_logs
status:
  conditions:
  - lastTransitionTime: "2023-11-22T09:28:54.111074277Z"
    status: "True"
    type: Ready
  observedGeneration: 5
  topicName: loan_logs

sink connector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: gcs-connect-cluster
  name: de-lsm-gcs-connector
  namespace: kafka-operator
spec:
  class: io.aiven.kafka.connect.gcs.GcsSinkConnector
  config:
    topics: loan_logs
    file.name.prefix: lsm/
    gcs.credentials.path: /opt/kafka/external-configuration/gcs-auth-secret/google_credentials.json
    gcs.bucket.name: datalake-raw
    file.name.timestamp.timezone: Asia/Jakarta
    file.name.template: "{{topic}}/sink_date={{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}/sink_hour={{timestamp:unit=HH}}/p{{partition:padding=false}}-{{start_offset:padding=true}}.snappy.parquet"
    file.compression.type: snappy
    format.output.fields: key,value,offset,timestamp,headers
    format.output.type: parquet
    format.output.envelope: true
    behavior.on.null.values: ignore
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter.schemas.enable: false
  tasksMax: 4

I forgot to mention earlier: we set up cron jobs to pause and resume the Kafka connector every 15 minutes to avoid the small-file problem in GCS. These are the cron jobs:

---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: de-lsm-gcs-connector-pause
  namespace: kafka-prod
spec:
  schedule: "15,30,45,00 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: de-lsm-gcs-connector-pause
            image: quay.io/curl/curl:8.6.0
            imagePullPolicy: IfNotPresent
            command:
            - /usr/bin/curl
            - -s
            - -X
            - PUT
            - http://gcs-connect-cluster-connect-api:8083/connectors/de-lsm-gcs-connector/pause
          restartPolicy: OnFailure
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: de-lsm-gcs-connector-resume
  namespace: kafka-prod
spec:
  schedule: "14,29,44,57 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: de-lsm-gcs-connector-resume
            image: quay.io/curl/curl:8.6.0
            imagePullPolicy: IfNotPresent
            command:
            - /usr/bin/curl
            - -s
            - -X
            - PUT
            - http://gcs-connect-cluster-connect-api:8083/connectors/de-lsm-gcs-connector/resume
          restartPolicy: OnFailure

Even though there is a job to pause the connector, it did not seem to affect the overwriting. The connector would be in the PAUSED state briefly, then start running again before the resume job started, and the overwriting continued.

I did say there were overwritten files with a size of 0 bytes, but that was the case in the dev environment. In production the files are not 0 bytes, but they still get overwritten every minute. Since the filename starts with the partition number and ends with the offset, I noticed that the same offset is repeated for the same partition in a different hour, and when I checked the content there were indeed duplicate keys with the same offset. The files in the GCS bucket are organized by sink date and sink hour, so a different hour means a different object prefix. I think that without the sink date/hour prefix there would not be any duplicates.

Screenshot 2024-02-15 at 14 04 17

(files in sink hour 9)

Screenshot 2024-02-15 at 14 02 50

(files in sink hour 10)

suisenkotoba commented 6 months ago

@jeqo is there anything else I can provide?

jeqo commented 6 months ago

@suisenkotoba just trying to find some time to look into this :) let me see what's possible tomorrow

suisenkotoba commented 6 months ago

alright thanks @jeqo

jeqo commented 6 months ago

@suisenkotoba for completeness, could you also share the connect worker configurations?

suisenkotoba commented 6 months ago

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: "true"
  name: gcs-connect-cluster
  namespace: kafka-prod
spec:
  bootstrapServers: kafka01-kafka-bootstrap:9092
  config:
    config.storage.replication.factor: 3
    config.storage.topic: gcs-connect-cluster-configs
    group.id: gcs-connect-cluster
    max.retries: 100
    offset.storage.replication.factor: 3
    offset.storage.topic: gcs-connect-cluster-offsets
    retry.backoff.ms: 5000
    status.storage.replication.factor: 3
    status.storage.topic: gcs-connect-cluster-status
  jvmOptions:
    "-Xms": "8g"
    "-Xmx": "8g"
  externalConfiguration:
    volumes:
    - name: gcs-auth-secret
      secret:
        secretName: gcs-auth-secret
  image: asia.gcr.io/amf-prod/amf/kafka-connect:0.0.2
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        key: metrics-config.yml
        name: connect-metrics
  replicas: 1
  template:
    pod:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: name
                operator: In
                values:
                - infra-workers
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchLabels:
                strimzi.io/cluster: gcs-connect-cluster
            topologyKey: kubernetes.io/hostname
      tolerations:
      - effect: NoSchedule
        key: dedicated
        operator: Equal
        value: infra
  version: 3.5.1

is this what you mean? @jeqo

jeqo commented 6 months ago

@suisenkotoba thanks for sharing this info! Just be careful not to share too much info on a public issue :)

Let me start by trying to separate the issues and check whether I understood your context:

The "empty files" issue

Seems to happen only on dev. It's not expected behavior and seems to be related to the NPE you shared, though the stack trace is lacking details. Could you share what comes after:

Caused by: java.lang.NullPointerException
... <--

That should tell us what is causing the NPE.

Overwritten files

By overwritten, I think you mean that files from the same partition/offset are present in different hours (i.e. duplicated). I'm getting this idea because you are sharing images of 2 different hours and highlighting files with the same partition/offset. Is this understanding correct?

I see why you may want to fake this "batching" behavior on connectors by using pause/resume; but I don't think it helps to achieve what you are looking for.

The GCS connector does batching itself in what's internally called the "RecordGrouper". This record grouper is defined by the configured path pattern, and keeps a map of [target path, records] in memory until records are flushed to GCS. The flush frequency is defined by the worker config offset.flush.interval.ms (defaults to 1 minute). This is why you see new files written every minute or so.

If I'm reading your pause/resume cron jobs right, the connector is paused at minute 00, resumed at minute 14, then paused at minute 15 again. Is this what you are observing as well? If so, we could replace this by setting offset.flush.interval.ms to 15 minutes, so all events from a 15-minute window are kept in memory until offsets are flushed and files are written to GCS. This way pausing/resuming won't be needed. The only consideration is having enough memory to hold 15 minutes of events; but as you are already doing something similar by resuming the connector every ~15 minutes, it should work similarly.

Back to "the same files duplicated between hours": it may be caused by tasks being paused after files are written but before offsets are committed; then, when resumed (in the next hour), the connector starts from the previous offsets and stores the records under the new hour.

I'd suggest keeping the connector running, tuning the flush interval, and removing the pause/resume jobs, to see if this is enough to solve the issue.
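
For reference, a minimal sketch of how the suggested flush interval might be set on the Strimzi KafkaConnect resource. It assumes the worker option is passed through spec.config alongside the other worker settings; 900000 ms is 15 minutes, and only the relevant fields are shown, so this is a fragment rather than a complete resource.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: gcs-connect-cluster
spec:
  config:
    # Flush offsets (and therefore write files) every 15 minutes
    # instead of the 1-minute default.
    offset.flush.interval.ms: 900000
```

With a longer interval, records stay in worker memory for the whole window, so heap usage is worth watching after the change.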

suisenkotoba commented 6 months ago

@jeqo thanks for your feedback

Overwritten files

Ignoring the duplicates in different hours, the files are overwritten every minute in an infinite loop. Actually, the error is similar to the error in dev, which has 0-byte files.

I've tried turning off the cron jobs before, yet the files were still overwritten. I'll try creating a new connector with the worker config offset.flush.interval.ms as you suggested.

suisenkotoba commented 6 months ago

@jeqo I re-created the Kafka Connect cluster with the additional config offset.flush.interval.ms=900000, yet the sink worker still failed to commit offsets. However, the error log seems a bit different: this time the NPE is related to the key schema.

2024-02-16 10:31:50,545 INFO [de-gcs-connector|task-3] [Consumer clientId=connector-consumer-de-gcs-connector-3, groupId=connect-de-gcs-connector] Seeking to offset 21927 for partition loan_logs-9 (org.apache.kafka.clients.consumer.KafkaConsumer) [task-thread-de-gcs-connector-3]
2024-02-16 10:31:50,545 INFO [de-gcs-connector|task-3] [Consumer clientId=connector-consumer-de-gcs-connector-3, groupId=connect-de-gcs-connector] Seeking to offset 22487 for partition loan_logs-8 (org.apache.kafka.clients.consumer.KafkaConsumer) [task-thread-de-gcs-connector-3]
2024-02-16 10:31:50,545 ERROR [de-gcs-connector|task-3] WorkerSinkTask{id=de-gcs-connector-3} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-de-gcs-connector-3]
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because the return value of "org.apache.kafka.connect.data.Schema.keySchema()" is null
    at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:131)
    at java.base/java.util.HashMap.forEach(HashMap.java:1421)
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
    at io.aiven.kafka.connect.gcs.GcsSinkTask.flush(GcsSinkTask.java:114)
    at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:407)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because the return value of "org.apache.kafka.connect.data.Schema.keySchema()" is null
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:894)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:772)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:757)
    at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.headersSchema(SinkSchemaBuilder.java:153)
    at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.outputFieldSchema(SinkSchemaBuilder.java:166)
    at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.avroSchemaFor(SinkSchemaBuilder.java:97)
    at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.buildSchema(SinkSchemaBuilder.java:86)
    at io.aiven.kafka.connect.common.output.parquet.ParquetOutputWriter.writeRecords(ParquetOutputWriter.java:62)
    at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:129)
    ... 16 more
2024-02-16 10:31:50,545 ERROR [de-gcs-connector|task-0] WorkerSinkTask{id=de-gcs-connector-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-de-gcs-connector-0]
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because the return value of "org.apache.kafka.connect.data.Schema.keySchema()" is null
    at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:131)
    at java.base/java.util.HashMap.forEach(HashMap.java:1421)
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
    at io.aiven.kafka.connect.gcs.GcsSinkTask.flush(GcsSinkTask.java:114)
    at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:407)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because the return value of "org.apache.kafka.connect.data.Schema.keySchema()" is null
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:894)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:772)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:757)
    at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.headersSchema(SinkSchemaBuilder.java:153)
    at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.outputFieldSchema(SinkSchemaBuilder.java:166)
    at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.avroSchemaFor(SinkSchemaBuilder.java:97)
    at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.buildSchema(SinkSchemaBuilder.java:86)
    at io.aiven.kafka.connect.common.output.parquet.ParquetOutputWriter.writeRecords(ParquetOutputWriter.java:62)
    at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:129)
    ... 16 more

At the time we did not employ a schema registry; was that a problem? We use org.apache.kafka.connect.storage.StringConverter for both key.converter and value.converter; is it still necessary to have a schema inside each message?

jeqo commented 6 months ago

Interesting. In this case it's not failing because of input value conversion, but while building the output struct based on:

  "format.output.fields": "key,value,offset,timestamp,headers",
  "format.output.type": "parquet",
  "format.output.envelope": "true",

Could you try without headers? It's failing when building the schema for headers.
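
As a rough illustration of that test, the connector config from earlier in the thread with headers dropped from the output fields might look like the fragment below. Only the three format settings are shown; everything else is assumed to stay unchanged.

```yaml
spec:
  config:
    # "headers" removed from the output fields; the other fields are as before.
    format.output.fields: key,value,offset,timestamp
    format.output.type: parquet
    format.output.envelope: true
```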

suisenkotoba commented 6 months ago

@jeqo I tried setting header.converter to org.apache.kafka.connect.storage.StringConverter, and it seemed to work. The default header converter in Kafka Connect is org.apache.kafka.connect.storage.SimpleHeaderConverter, right? Does that mean the problem is with this converter? According to this, the schema is inferred, so it is not necessary to provide a schema beforehand; do I get this right? If, in one topic, one message has headers and another does not, would that be a problem with SimpleHeaderConverter?

jeqo commented 6 months ago

Not a problem with the converter per se, but with how the connector expects the headers to be typed. The connector builds a schema per batch of records to write to GCS. This schema is inferred from the first record. For headers, the schema is either null or a map typed by the first element in the first record's headers. If another header element is typed differently, the connector will fail.

So, if you have headers typed differently, I can see how using the String converter would help your pipeline work: everything is typed as string, so the schema can be built properly.

Looking further into this: if the type changes between records, it may fail when building the Avro/Parquet output, or miss headers. This will require further testing. An improvement path that could be explored is to base the map type not only on the first record in the batch but on all records, so we can guarantee that defining the schema will either fail per batch or be valid for all records. This will be a bit more expensive but will improve safety and data quality.
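
A minimal sketch of the workaround described above, assuming header.converter is set as a per-connector override in the KafkaConnector resource (the thread does not say whether it was set there or on the worker). Only the added line is shown.

```yaml
spec:
  config:
    # Convert every header value to a string so the header map
    # ends up with a single value type.
    header.converter: org.apache.kafka.connect.storage.StringConverter
```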

Let me know if the current workaround is good enough for you. I can create another issue for header schema improvements.

suisenkotoba commented 6 months ago

thanks @jeqo for your answer. yes current workaround is good enough for now

jeqo commented 6 months ago

Will close this one then, and follow up the investigation here https://github.com/Aiven-Open/commons-for-apache-kafka-connect/issues/227