robinhood / faust

Python Stream Processing

How to read confluent avro topic. #622

Open abhisam opened 4 years ago

abhisam commented 4 years ago

How can I read a Confluent Kafka Avro topic using Faust? Can anyone provide an example?

afausti commented 4 years ago

@abhisam I did some research on this a while ago and found at least two packages of interest for enabling Avro in Faust applications.

I first looked at the python-schema-registry-client package, which implements a synchronous Confluent Schema Registry client and a codec for Faust. See more about Faust codecs in the docs.

The other package out there is faust-avro.

Both repos mentioned above provide examples of using Faust with Avro-encoded messages.

Looking at the code, faust-avro seems more attractive because it can generate an Avro schema from a Faust record and, in addition, it is asyncio-based.

python-schema-registry-client, by contrast, assumes the Avro schema is provided in addition to the Faust record.

I decided to go with faust-avro for my project https://kafka-aggregator.lsst.io/

marcosschroh commented 4 years ago

Hi,

@afausti is right. The mechanism to get Avro schemas should be async, but when I created the python-schema-registry-client project, the schemas abstraction was not there yet. After the schemas abstraction was added, I created a PR to Faust to add async operations over schemas, so that schemas can be fetched asynchronously and different schemas can be used according to the values in the headers. You can take a look at the PR description.

Regarding parsing Faust records into Avro schemas, I am planning to add the feature. I have dataclasses-avroschema, which has the required functionality.

SeanZicari commented 3 years ago

I recently used faust-avro-serializer. I found it the most seamless of the options mentioned: it has Schema Registry support integrated, takes very little configuration to work with existing schemas (which it looks up in Schema Registry), and registers new schemas automatically when needed.

marcosschroh commented 3 years ago

I have added new functionality to python-schema-registry-client, and the dataclasses-avroschema package does the trick, so it is now much easier to integrate with Avro schemas since you do not have to provide the schema yourself. Here is the documentation and this is an example.
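
To illustrate what "not having to provide the schema yourself" means, here is a minimal standard-library sketch of deriving an Avro record schema from a plain Python dataclass. It is not how dataclasses-avroschema is implemented and it does not use that package's API; the function name `avro_schema_for`, the `UserEvent` class, and the small type mapping are all hypothetical and cover only a few primitive types.

```python
import dataclasses
import json
from typing import Any, Dict

# Assumption: only these primitive Python types are supported in this sketch.
AVRO_PRIMITIVES = {
    str: "string",
    int: "long",
    float: "double",
    bool: "boolean",
    bytes: "bytes",
}


def avro_schema_for(cls: type) -> Dict[str, Any]:
    """Build an Avro record schema (as a dict) from a dataclass definition."""
    if not dataclasses.is_dataclass(cls):
        raise TypeError(f"{cls!r} is not a dataclass")
    fields = []
    for field in dataclasses.fields(cls):
        if field.type not in AVRO_PRIMITIVES:
            raise TypeError(f"unsupported type for field {field.name!r}")
        fields.append({"name": field.name, "type": AVRO_PRIMITIVES[field.type]})
    return {"type": "record", "name": cls.__name__, "fields": fields}


@dataclasses.dataclass
class UserEvent:  # hypothetical record used only for illustration
    user_id: int
    name: str
    score: float


if __name__ == "__main__":
    print(json.dumps(avro_schema_for(UserEvent), indent=2))
```

A real library also has to handle optional fields, defaults, nested records, enums and logical types, so consult the package documentation for the actual API.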

afausti commented 3 years ago

@marcosschroh the new feature to generate Avro Schemas from Faust records looks really nice, thanks for doing this.

marcosschroh commented 3 years ago

Can we close this issue? I think the general question should be "How to use Avro schemas with Faust?", and I think it has been answered.

  1. With dataclasses-avroschema you can generate Avro schemas from Python dataclasses and Faust records.
  2. With libraries like python-schema-registry-client, faust-avro and faust-avro-serializer it is possible to integrate Faust with Confluent Schema Registry. The libraries use Faust codecs (the dumps and loads methods) to make the interaction possible.
  3. Once Faust has async schemas (https://github.com/robinhood/faust/pull/516), we won't need to use Faust codecs any more because the schemas can be fetched in the Schema layer.
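
For context on what such a codec has to do in its dumps and loads steps: messages written by Confluent serializers are framed with a magic byte (0) and a 4-byte big-endian schema ID, followed by the Avro-encoded body. The sketch below shows only that framing, using the standard library. The function names `frame` and `unframe` are hypothetical, and the Avro encoding itself and the Schema Registry lookup are deliberately left out.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0
# Header layout: 1-byte magic value, then 4-byte big-endian schema ID.
HEADER = struct.Struct(">bI")


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an already Avro-encoded payload with the Confluent header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message is shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER.size:]


if __name__ == "__main__":
    schema_id, payload = unframe(frame(42, b"avro-bytes"))
    print(schema_id, payload)
```

In a Faust codec, `unframe` would sit at the start of the loads step: the returned schema ID is used to fetch the writer schema from Schema Registry, and the remaining bytes are then decoded with an Avro library. The libraries listed above handle all of this for you.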