airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Source Kafka: add support to avro format #13220

Open mattkohl-flex opened 2 years ago

mattkohl-flex commented 2 years ago

Tell us about the problem you're trying to solve

I am trying to enable a Kafka source with topics on which messages are published in Avro format.

Although I am able to set up the source successfully, running a connection fails due to a JSON parsing exception. It looks like the Kafka source connector expects messages in JSON format only.

Describe the solution you’d like

When setting up a Kafka source (or a Connection, whichever makes more sense), I would like to be able to specify the format of the source messages, with Avro as a supported option.
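For illustration, here is a minimal sketch of what reading Avro could look like on the consumer side, assuming the connector delegated decoding to Confluent's KafkaAvroDeserializer backed by a Schema Registry. The broker address, registry URL, group ID and topic below are placeholders, and this is not how Airbyte implements the connector today:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class AvroConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-source-sketch");       // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Instead of a JSON deserializer, look up the writer schema in the Schema Registry
        // and decode each value into an Avro GenericRecord.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");             // placeholder

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-avro-topic"));                      // placeholder topic
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                // GenericRecord renders as JSON-like text; a real connector would map it
                // to an Airbyte record message instead of printing it.
                System.out.println(record.value());
            }
        }
    }
}
```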

Describe the alternative you’ve considered or used

As the S3 source just added support for Avro, I am considering sinking my Avro messages to S3, then using Airbyte's S3 connector as a Source for my connection.

Thank you for this tool & the great work you do! ❤️🙏

marcosmarxm commented 2 years ago

@mattkohl-flex did you try to read from a Kafka topic that has Avro messages? The read function uses the default KafkaConsumer, so maybe the transformation happens automatically. Any error logs would help here too.
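For what it's worth, the default KafkaConsumer does no automatic format detection or conversion: each record's raw bytes go straight to whatever value deserializer is configured, which per the stack trace below is the Connect JsonDeserializer. A rough sketch of that setup (broker address, group ID and key deserializer are placeholders, not the connector's actual settings):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;

public class JsonConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json-source-sketch");       // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());

        // poll() hands every record's bytes verbatim to JsonDeserializer.deserialize(),
        // so Avro-encoded payloads surface as a SerializationException during consumption.
        try (KafkaConsumer<String, JsonNode> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-avro-topic"));                      // placeholder topic
            consumer.poll(Duration.ofSeconds(5));
        }
    }
}
```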

mattkohl-flex commented 2 years ago

Hi @marcosmarxm - yes, I tried to run a sync using a Kafka source connector on a topic with Avro-formatted messages.

Here's an excerpt of the logs showing what looks to me like an attempt to parse the Avro as JSON:

2022-05-25 16:03:53 source > 2022-05-25 16:03:53 INFO i.a.i.s.k.KafkaSource(check):57 - Successfully connected to Kafka brokers for topic 'i.redacted.this'.
2022-05-25 16:03:53 source > 2022-05-25 16:03:53 INFO i.a.i.s.k.KafkaSourceConfig(propertiesByProtocol):81 - Kafka protocol config: {"sasl_mechanism":"PLAIN","sasl_jaas_config":"org.apache.kafka.common.security.plain.PlainLoginModule   required username=\"i.redacted.this.too\"   password=\"and.this.too\";","security_protocol":"SASL_SSL"}
2022-05-25 16:03:53 source > 2022-05-25 16:03:53 INFO i.a.i.s.k.KafkaSourceConfig(getConsumer):105 - Kafka subscribe method: {"topic_pattern":"us-east-2.*","subscription_type":"subscribe"}
2022-05-25 16:03:54 source > Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition i.redacted.this-4 at offset 2047. If needed, please seek past the record to continue consumption.
2022-05-25 16:03:54 source > Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
2022-05-25 16:03:54 source >  at [Source: (byte[])"\u0000\u0000\u0001��\u0002\u0002��\u0018\u0002 3653d1012b16b300\u0002\u0000\u0000\u0000\u0000"; line: 1, column: 2]
2022-05-25 16:03:54 source > Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
2022-05-25 16:03:54 source >  at [Source: (byte[])"\u0000\u0000\u0001��\u0002\u0002��\u0018\u0002 3653d1012b16b300\u0002\u0000\u0000\u0000\u0000"; line: 1, column: 2]
2022-05-25 16:03:54 source >   at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
2022-05-25 16:03:54 source >   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:735)
2022-05-25 16:03:54 source >   at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:713)
2022-05-25 16:03:54 source >   at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2462)
2022-05-25 16:03:54 source >   at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:698)
2022-05-25 16:03:54 source >   at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4704)
2022-05-25 16:03:54 source >   at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3090)
2022-05-25 16:03:54 source >   at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
2022-05-25 16:03:54 source >   at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:32)
2022-05-25 16:03:54 source >   at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
2022-05-25 16:03:54 source >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1386)
2022-05-25 16:03:54 source >   at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133)
2022-05-25 16:03:54 source >   at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1617)
2022-05-25 16:03:54 source >   at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453)
2022-05-25 16:03:54 source >   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686)
2022-05-25 16:03:54 source >   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637)
2022-05-25 16:03:54 source >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
2022-05-25 16:03:54 source >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
2022-05-25 16:03:54 source >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
2022-05-25 16:03:54 source >   at io.airbyte.integrations.source.kafka.KafkaSource.read(KafkaSource.java:94)
2022-05-25 16:03:54 source >   at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:153)
2022-05-25 16:03:54 source >   at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:105)
2022-05-25 16:03:54 source >   at io.airbyte.integrations.source.kafka.KafkaSource.main(KafkaSource.java:135)
2022-05-25 16:03:54 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):337 - Source has no more messages, closing connection.
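For context on the payload in that stack trace: the bytes beginning with \u0000 look like the Confluent Avro wire format, where byte 0 is a magic byte, bytes 1-4 are the Schema Registry schema ID, and the Avro binary payload follows. A JSON parser rejects the leading NUL immediately, which matches the "Illegal character (CTRL-CHAR, code 0) ... line: 1, column: 2" error above. Here is a tiny, self-contained reproduction using made-up bytes (not Airbyte code):

```java
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.connect.json.JsonDeserializer;

public class AvroBytesVsJsonDeserializer {
    public static void main(String[] args) {
        // Hypothetical Confluent Avro wire-format message:
        // magic byte 0x00, 4-byte schema ID, then Avro binary data.
        byte[] avroWireFormat = {
                0x00,                   // magic byte
                0x00, 0x00, 0x00, 0x01, // schema ID 1 (made up)
                0x02, 0x04, 0x68, 0x69  // Avro-encoded payload bytes (made up)
        };

        JsonDeserializer jsonDeserializer = new JsonDeserializer();
        try {
            // Fails the same way as the sync above: the leading NUL byte is not valid JSON.
            jsonDeserializer.deserialize("some-topic", avroWireFormat);
        } catch (SerializationException e) {
            System.out.println("Deserialization failed as expected: " + e.getMessage());
        } finally {
            jsonDeserializer.close();
        }
    }
}
```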