lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

MQTT source: how to keep track of origin topic when using wildcard subscriptions #1307

Open dberardo-com opened 1 month ago

dberardo-com commented 1 month ago

I have an MQTT source connector that is correctly subscribed to a wildcard topic. The connector receives messages from all matching topics and writes them into a single Kafka topic. My questions are:

  1. How can I keep track of the original MQTT topic the message was sent to? Ideally I would like to include the topic name in the Kafka message body and as part of its key as well.
  2. Is it possible to write the messages to new Kafka topics named after the original MQTT topic? E.g. mqttTopics/# ---> message written to Kafka topic mqttTopics.myTopic.mySubtopic, etc.

stheppi commented 1 month ago

Hi @dberardo-com ,

At the moment there is no option for doing that. What you suggested in point 2, regarding the output topic, is potentially what you need.

What can be done quickly is to add the source MQTT topic/queue as a header, and then use an SMT to read the header, adapt it to your needs, and set the new target topic.
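
As a rough illustration of the "adapt to your needs" step, here is a minimal sketch of the name mapping such an SMT might apply. It assumes the connector already exposes the origin MQTT topic in a record header, which it does not do today. The class `MqttTopicNames` and its method `toKafkaTopic` are hypothetical names for this example and are not part of stream-reactor or Kafka Connect. Replacing `/` with `.` and substituting `_` for other characters is just one possible convention.

```java
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not part of stream-reactor or Kafka Connect. */
final class MqttTopicNames {
    // Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-'.
    private static final Pattern ILLEGAL = Pattern.compile("[^A-Za-z0-9._-]");
    // Kafka limits topic names to 249 characters.
    private static final int MAX_LENGTH = 249;

    private MqttTopicNames() {}

    /** Maps an MQTT topic such as "a/b/c" to a Kafka-safe topic name such as "a.b.c". */
    static String toKafkaTopic(String mqttTopic) {
        if (mqttTopic == null) {
            throw new IllegalArgumentException("MQTT topic must not be null");
        }
        // Drop leading and trailing slashes so the result does not start or end with '.'.
        String trimmed = mqttTopic.replaceAll("^/+|/+$", "");
        // MQTT level separators become dots; every other illegal character becomes '_'.
        String cleaned = ILLEGAL.matcher(trimmed.replace('/', '.')).replaceAll("_");
        // "." and ".." are reserved and cannot be used as Kafka topic names.
        if (cleaned.isEmpty() || cleaned.equals(".") || cleaned.equals("..")
                || cleaned.length() > MAX_LENGTH) {
            throw new IllegalArgumentException("Cannot derive a Kafka topic from: " + mqttTopic);
        }
        return cleaned;
    }
}
```

Inside a custom SMT (an implementation of Kafka Connect's `Transformation` interface), the `apply` method could read the header value, pass it through `toKafkaTopic`, and return a copy of the record created with `record.newRecord(...)` and the new topic name. The header name itself would depend on how the connector ends up exposing it.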

We welcome contributions if you are up for it.

dberardo-com commented 1 month ago

I have managed to partly achieve point 2 of my requirements above with:

INSERT INTO $ SELECT * FROM $share/kafka-prod/#

It would still be nice to know whether the created topic can be renamed during creation.

> What can be done quickly, is to add the source Mqtt topic/queue to be added as a header,

How can this be achieved?