lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

MQTT Connector: Can't use _key in WITHTARGET #968

Open jadamon42 opened 1 year ago

jadamon42 commented 1 year ago

What version of the Stream Reactor are you reporting this issue for?

MQTT Sink Connector

What is the expected behaviour?

I want to use KCQL like the following, given that the key of my Kafka record is an Avro object:

INSERT INTO `some/mqtt/topic/$_key.uniqueKey`
SELECT * 
FROM some.kafka.topic
WITHTARGET _key.uniqueKey

What was observed?

It doesn't work. We can use INSERT INTO _key, but dynamic targets only work if the target is a field within the record value.
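
To make the expectation concrete, here is a minimal Python sketch of how I would expect the target in the KCQL above to resolve from the record key. It is only an illustration of the desired result, not the connector's code: the `resolve_target` helper, the placeholder pattern, and the sample key value are all hypothetical.

```python
import re

def resolve_target(template, key):
    """Replace each `$_key.<path>` placeholder with the value found in the record key.

    Hypothetical helper: it only illustrates the expected outcome and does not
    mirror how the connector parses KCQL.
    """
    def lookup(match):
        value = key
        # Walk the dotted path (e.g. "uniqueKey" or "a.b") through the key.
        for part in match.group(1).split("."):
            value = value[part]  # raises KeyError if the key has no such field
        return str(value)

    return re.sub(r"\$_key\.([A-Za-z0-9_]+(?:\.[A-Za-z0-9_]+)*)", lookup, template)

# Made-up record key, standing in for the deserialised Avro key.
record_key = {"uniqueKey": "device-42"}
topic = resolve_target("some/mqtt/topic/$_key.uniqueKey", record_key)
print(topic)
```

With the made-up key above, the record would be published to some/mqtt/topic/device-42.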

EDIT

Also found out this fails too:

INSERT INTO `some/static/mqtt/topic`
SELECT _key.uniqueKey, someField, someField2
FROM some.kafka.topic

causes the following:

Encountered error A KCQL error occurred.Can't find the field '_key' 

jadamon42 commented 1 year ago

I'm having all sorts of issues with this connector. My Avro schema has a field that is an array of records ([{}, {}, {}]). I want to include that in what I'm sending to my MQTT topic, but I get: A KCQL error occurred.Can't flatten from schema:Schema{ARRAY} by selecting 'myFieldThatIsAnArray'

I tried looking into this on my own and came across WITHSTRUCTURE as a possible solution. This is essentially what I tried:

INSERT INTO blah
SELECT field1.subfield AS alias, field2.subfield AS alias2, field2.someArray
FROM blah WITHSTRUCTURE

But I got a really strange error: Encountered error A KCQL error occurred.alias2 can't be found in field1,field2
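
For reference, the payload I'm hoping to end up with looks roughly like what this Python sketch produces. The record contents and the `project` helper are made up for illustration, and this says nothing about how the connector works internally or whether WITHSTRUCTURE would nest the output differently.

```python
import json

def project(value, selections):
    """Build an output dict from (dotted_path, output_name) pairs.

    Hypothetical helper that mimics the SELECT ... AS ... projection I want.
    """
    out = {}
    for path, name in selections:
        node = value
        for part in path.split("."):
            node = node[part]
        out[name] = node  # arrays are passed through as-is, not flattened
    return out

# Made-up record value with the same shape as the fields in the query above.
record_value = {
    "field1": {"subfield": "a"},
    "field2": {"subfield": "b", "someArray": [{"id": 1}, {"id": 2}]},
}

payload = project(record_value, [
    ("field1.subfield", "alias"),
    ("field2.subfield", "alias2"),
    ("field2.someArray", "someArray"),
])
print(json.dumps(payload))
```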