lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connector maintained by Lenses.io.
https://lenses.io
Apache License 2.0
996 stars 364 forks source link

How to support schema registry by AvroConverter #840

Open cdmikechen opened 2 years ago

cdmikechen commented 2 years ago

Issue Guidelines

Please review these questions before submitting any issue?

What version of the Stream Reactor are you reporting this issue for?

3.0.1

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Yes

Do you have a supported version of the data source/sink .i.e Cassandra 3.0.9?

Have you read the docs?

https://docs.lenses.io/5.0/integrations/connectors/stream-reactor/sources/mqttsourceconnector/

What is the expected behaviour?

I use JSON by default for kafka data storage. I've found that this project can support AvroConverter and I also found that we can use connect.converter.avro.schemas to specify avro file. But when pushing data to kafka as a source connector, we should support the schema registry mode and register the avro schema to the schema registry service.

What was observed?

Except for some descriptions in kudu, I didn't find how to configure it in other places.

What is your Connect cluster configuration (connect-avro-distributed.properties)?

What is your connector properties configuration (my-connector.properties)?

{
    "name": "mqtt-source-device",
    "config": {
        "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
        "tasks.max": "1",
        "connect.mqtt.kcql": "INSERT INTO mqtt-device-test SELECT * FROM `$device/update/request/+` WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`",
        "connect.mqtt.client.id": "kafka-connect-mqtt-test",
        "connect.mqtt.hosts": "tcp://xxx.xxx.svc.cluster.local:1883",
        "connect.mqtt.service.quality": "1"
    }
}

Please provide full log files (redact and sensitive information)

No

ethanttbui commented 2 years ago

maybe you can try using the confluent avro converter like in this example?

jclarysse commented 1 year ago

We have a similar question related to Avro events consumed by StreamReactor MQTT source (latest version):

Would it be reasonable and valuable to allow Schema Registry URLs as well? Since the PR will affect all connectors (kafka-connect-common), is there any performance or security concern?