peterprib / node-red-contrib-kafka-manager

Implement
GNU General Public License v3.0
22 stars 12 forks source link

Using Avro schemas ? #27

Closed TommyHalvorsen closed 3 years ago

TommyHalvorsen commented 3 years ago

Hi. Ive been looking into using this node which looks good. How ever, I'm dependent on using Avro schemas to serialize / deserialize the messages. Is this possible in some way using this node as-is ?

peterprib commented 3 years ago

The code is based on kafka-node which doesn't have this feature. Could add this feature in pub/sub or by a separate node to do serialisation and deserialization as I note there doesn't appear to be a transform for arvo messages.

peterprib commented 3 years ago

Have updated https://flows.nodered.org/node/node-red-contrib-prib-functions transform node with JSON to ARVO and ARVO to JSON plus snappy compress and uncompress transforms. Hopefully this is a work around for ARVO. Must admit haven't tested with Kafka. Payload may fail as it is type buffer so may have to fix this aspect for kafka manager and/or transform.

TommyHalvorsen commented 3 years ago

Sorry for late respons. The idea sounds good. How ever, when I add the transform node I get an error (when deploying) saying "Flows stopped due to missing node types.", and pointing to the Transform node.. My mistake somehow ? Also, it spells AVRO, not ARVO, but thats of minor importance

peterprib commented 3 years ago

Sorry and the spelling issue. Fixed. Am a little dyslexic which can be annoying at times.. If there is a issue with transform it should have messages in the log so would need to check the log for error messages. Should be errors before node started message if missing nodes. May be due to the snappy additional model which may have to build some code.

TommyHalvorsen commented 3 years ago

Looks to be unable to update and even install it at the moment. Seems to be related to snappy some how. Logfile attached.

2020-10-30T06_48_12_455Z-debug.log

peterprib commented 3 years ago

What I will do is take out snappy on dependencies but leave in the logic so it can be used if someone installs snappy separately. From what I have seen snappy is sensitive to requiring a few dependencies that are only used for install. I have had to issue rebuild it get it working. Once when I upgraded npm.

peterprib commented 3 years ago

Have just deployed a new version making snappy require deferred unless used and removed dependency so should install. If you want to use snappy then this has to be installed separately

TommyHalvorsen commented 3 years ago

Deployment working good. Transform node giving this error : "AVRO to JSON this.buf.utf8Slice is not a function"

From log : 30 Oct 11:20:20 - [debug] transform {"label":"transformNode catch","shortMessage":"Error(s)","error":"AVRO to JSON this.buf.utf8Slice is not a function","stack":"Error: AVRO to JSON this.buf.utf8Slice is not a function\n at transformNode._inputCallback (C:\Users\TommyHalvorsen\.node-red\node_modules\node-red-contrib-prib-functions\transform\transform.js:328:16)\n at transformNode.Node._emitInput (C:\Users\TommyHalvorsen\AppData\Roaming\npm\node_modules\node-red\node_modules\@node-red\runtime\lib\nodes\Node.js:200:18)\n at Immediate._onImmediate (C:\Users\TommyHalvorsen\AppData\Roaming\npm\node_modules\node-red\node_modules\@node-red\runtime\lib\nodes\Node.js:179:33)\n at processImmediate (internal/timers.js:456:21)"} 30 Oct 11:20:20 - [error] [transform:51202688.e66298] AVRO to JSON this.buf.utf8Slice is not a function

peterprib commented 3 years ago

Found https://github.com/mtth/avsc/issues/59 along with https://github.com/mtth/avsc/issues/22. If I read things correctly it appears to be an issue with messages from confluent https://github.com/uber-archive/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L41 as it adds a MAGIC_BYTE + schema id at start of message. Can you confirm so I can ponder best way forward. May mean special transform for confluent messages or look for MAGIC_BYTE and react. In either case how do I handle schema id as assume required data for consumer

peterprib commented 3 years ago

Have added new transform function for confluence in v13 if this is your issue. I see a similar issue may revolve around MS product that issues messages for avro. Decided to keeping these types of things in transform function as it be the solution for other message handling packages.

TommyHalvorsen commented 3 years ago

Tested now and when deploying node after configuring Confluence -> JSON : invalid setup undefined type name: record The data is from a java application which reads source from MS Excel, but insert is java.

peterprib commented 3 years ago

The confluence form on message is <schema (4 bytes)> and the schema has form {:} and it uses message schema to determine Avro schema to be used. The error message implies it is having trouble with the schema. By the way you are sending it implies it isn't using confluence format. Need to see avro schema, original message (xcel file) and message as received from java app to determine what is going wrong. Have you done the reverse, that is run the xcel file thru arvo to json and compared message to understand differential?

peterprib commented 3 years ago

Have add xlsx transform so you can simulate what the java process is doing. That is xlsx to json to avro and insert payload and/or validate content the same if that helps.

TommyHalvorsen commented 3 years ago

Have add xlsx transform so you can simulate what the java process is doing. That is xlsx to json to avro and insert payload and/or validate content the same if that helps.

Will do some testing.

Also, had some time to look into the original setup again, and looks to me that when using AVRO-JSON now I get the following error : "AVRO to JSON invalid union index: -1".

peterprib commented 3 years ago

There appears to be related issues closed/fixed in avsc which is used to do conversion. They seem to resolve/fix issues fast so suggest posting schema and message in https://github.com/mtth/avsc/issues.