airbytehq / airbyte

[source-kafka] Empty messages #29464

Open mk-nuccioc opened 1 year ago

mk-nuccioc commented 1 year ago

Connector Name

source-kafka

Connector Version

0.2.3

What step the error happened?

During the sync

Relevant information

Defined a source with the AVRO message format and Schema Registry credentials:

Protocol: SASL SSL
SASL Mechanism: PLAIN
Subscription method: Subscribe to all topics matching specified pattern
Auto offset Reset: latest or earliest (tried both)
Client ID and Group ID: both specified
Polling Time: 100 or 1000 (tried both)
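For anyone trying to reproduce this outside Airbyte, the settings above correspond roughly to the standard Kafka consumer properties below. This is only a sketch: it assumes the connector maps its form fields onto the usual Kafka client keys, and the class name, method name, client ID, and group ID are placeholders, not values from my setup. The polling time and topic pattern are not properties (they are passed to the consumer's poll and subscribe calls), so they are left out.

```java
import java.util.Properties;

public class KafkaSourceConfigSketch {
    // Builds consumer properties that roughly match the source settings above.
    // All values are placeholders (assumptions), not the real configuration.
    static Properties consumerProps(String offsetReset) {
        Properties p = new Properties();
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "PLAIN");
        p.setProperty("auto.offset.reset", offsetReset); // "latest" or "earliest"
        p.setProperty("client.id", "example-client");    // placeholder
        p.setProperty("group.id", "example-group");      // placeholder
        p.setProperty("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        p.setProperty("basic.auth.credentials.source", "USER_INFO");
        return p;
    }

    public static void main(String[] args) {
        // Print the resulting key/value pairs for inspection.
        consumerProps("earliest").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```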

Used Google BigQuery as the destination. I also tried GCS, with the same result: the rows are written, but _airbyte_data is {}.

Airbyte version v0.50.15

Relevant log output

2023-08-15 07:08:29 source > KafkaAvroDeserializerConfig values: 
    auto.register.schemas = true
    avro.reflection.allow.null = false
    avro.use.logical.type.converters = false
    basic.auth.credentials.source = USER_INFO
    basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    bearer.auth.token = [hidden]
    context.name.strategy = class io.confluent.kafka.serializers.context.NullContextNameStrategy
    id.compatibility.strict = true
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    latest.compatibility.strict = true
    max.schemas.per.subject = 1000
    normalize.schemas = false
    proxy.host = 
    proxy.port = -1
    schema.reflection = false
    schema.registry.basic.auth.user.info = [hidden]
    schema.registry.ssl.cipher.suites = null
    schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    schema.registry.ssl.endpoint.identification.algorithm = https
    schema.registry.ssl.engine.factory.class = null
    schema.registry.ssl.key.password = null
    schema.registry.ssl.keymanager.algorithm = SunX509
    schema.registry.ssl.keystore.certificate.chain = null
    schema.registry.ssl.keystore.key = null
    schema.registry.ssl.keystore.location = null
    schema.registry.ssl.keystore.password = null
    schema.registry.ssl.keystore.type = JKS
    schema.registry.ssl.protocol = TLSv1.3
    schema.registry.ssl.provider = null
    schema.registry.ssl.secure.random.implementation = null
    schema.registry.ssl.trustmanager.algorithm = PKIX
    schema.registry.ssl.truststore.certificates = null
    schema.registry.ssl.truststore.location = null
    schema.registry.ssl.truststore.password = null
    schema.registry.ssl.truststore.type = JKS
    schema.registry.url = [hidden]
    specific.avro.reader = false
    use.latest.version = false
    use.schema.id = -1
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
2023-08-15 07:08:36 INFO i.a.w.g.ReplicationWorkerHelper(endOfSource):161 - Total records read: 24 (48 bytes)
2023-08-15 07:08:36 INFO i.a.w.i.FieldSelector(reportMetrics):122 - Schema validation was performed to a max of 10 records with errors per stream.
2023-08-15 07:08:36 INFO i.a.w.g.BufferedReplicationWorker(readFromSource):324 - readFromSource: done. (source.isFinished:true, fromSource.isClosed:false)
2023-08-15 07:08:36 INFO i.a.w.g.BufferedReplicationWorker(processMessage):364 - processMessage: done. (fromSource.isDone:true, forDest.isClosed:false)
2023-08-15 07:08:36 INFO i.a.w.i.HeartbeatTimeoutChaperone(runWithHeartbeatThread):111 - thread status... heartbeat thread: false , replication thread: true
2023-08-15 07:08:36 INFO i.a.w.g.BufferedReplicationWorker(writeToDestination):396 - writeToDestination: done. (forDest.isDone:true, isDestRunning:true)

Contribute

mk-nuccioc commented 1 year ago

Solved by editing the source-kafka connector so that the record data wraps the output under a "value" key:

.withData(Jsons.jsonNode(ImmutableMap.builder().put("value", output).build())));
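To illustrate the shape this produces, here is a minimal standalone sketch using only java.util. It is not the connector code: `WrapValueSketch` and `wrapValue` are hypothetical names, a plain map stands in for Guava's `ImmutableMap` and Airbyte's `Jsons.jsonNode`, and the sample record is made up, since what `output` actually holds in the connector is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WrapValueSketch {
    // Wraps a payload under a single "value" key, mirroring the shape
    // of the map built in the one-line fix above.
    static Map<String, Object> wrapValue(Object output) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("value", output);
        return data;
    }

    public static void main(String[] args) {
        // Hypothetical decoded record, represented as a plain map.
        Map<String, Object> output = new LinkedHashMap<>();
        output.put("id", 1);
        output.put("name", "example");
        System.out.println(wrapValue(output)); // prints {value={id=1, name=example}}
    }
}
```

With this shape, _airbyte_data should carry a single "value" field holding the message content instead of {}.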

chokosabe commented 5 months ago

Getting this issue as well. The Kafka connector in general is in a sad state.