emqx / MQTTX

A Powerful and All-in-One MQTT 5.0 client toolbox for Desktop, CLI and WebSocket.
https://mqttx.app
Apache License 2.0
3.85k stars 444 forks source link

[Bug]Can not encode the embedded oneof keyword type in sparkplugb proto schema #1628

Open jason-hulkman opened 6 months ago

jason-hulkman commented 6 months ago

I copy the schema into the MQTTX Script's Schema input box, and save it.

I input the target json that need to encode. { "metrics": [ { "name": "testKey", "alias": 1, "timestamp": 1713020729000, "int_value":100 } ], "extensions": null, "timestamp": 1713020729100, "seq": 36 }

And the output decode json lost the "int_value":100

Payload { metrics: { 0: { name: "testKey", alias: 1, timestamp: 1713020729000 } }, timestamp: 1713020729100, seq: 36 }

Expected

the output should be Payload { metrics: [ 0: { name: "testKey", alias: 1, timestamp: 1713020729000, int_value:100 } ], timestamp: 1713020729100, seq: 36 }

Environment

More detail

same bug in many protobufjs project or tools and the online js lib editor and runner https://npm.runkit.com/protobufjs

ysfscream commented 6 months ago

Hi, indeed, the issue you're experiencing is known and has been documented in the protobuf.js GitHub issue tracker: https://github.com/protobufjs/protobuf.js/issues/521

It appears that an official fix still needs to be formally implemented. However, we are considering trying some potential fixes to see if we can temporarily resolve the issue. Thank you for your feedback and patience! Please get in touch with me if you need any more updates or help.

jason-hulkman commented 6 months ago

Your EMQX said it support the Sparkplug, but the MQTTX does not support it. Very stange, are you two different team without any communication?

And another problem is the Sparkplug need set the MQTT connetion's Last Will Payload with protobuf as NDEATH payload, and your MQTTX does not support base64 and Hex, so I force transfer the hex to the ugly string.

image

The msg protocol transfer, I let the AI rewrite some simple functions to encode some type of the that my Sparkplug project will used to test. and run it at the MQTTX's Script'Function.

function encodeVarint(value) { const buffer = []; while (value > 127) { buffer.push((value & 0x7F) | 0x80); value >>>= 7; } buffer.push(value); return Buffer.from(buffer); }

function encodeUint32(fieldNumber, value) { const tag = (fieldNumber << 3) | 0; return Buffer.concat([encodeVarint(tag), encodeVarint(value)]); }

function encodeUint64(fieldNumber, value) { const tag = (fieldNumber << 3) | 0; return Buffer.concat([encodeVarint(tag), encodeVarint(value)]); }

function encodeStringWithFieldNumber(fieldNumber, value) { const tag = (fieldNumber << 3) | 2; const stringBytes = Buffer.from(value, 'utf8'); const length = encodeVarint(stringBytes.length); return Buffer.concat([encodeVarint(tag), length, stringBytes]); }

function encodeFloat(fieldNumber, value) { const tag = (fieldNumber << 3) | 5; const buffer = Buffer.alloc(5); buffer.writeUInt8(tag, 0); buffer.writeFloatLE(value, 1); return buffer; }

function encodeMetric(metric) { const buffers = []; buffers.push(encodeStringWithFieldNumber(1, metric.name)); buffers.push(encodeUint64(3, metric.timestamp)); buffers.push(encodeUint32(4, metric.datatype));

// Dynamic encoding based on the datatype
switch (metric.datatype) {
    case 1: // int8
    case 2: // int16
    case 3: // int32
        buffers.push(encodeUint32(10, metric.int_value)); // Assuming field number 10 for integer values
        //console.log(metric.name, metric.value);
        break;
    case 4: // int64
    case 8: // uint64
        buffers.push(encodeUint64(11, metric.long_value)); // Assuming field number 11 for long values
        break;
    case 9: // float
        buffers.push(encodeFloat(12, metric.float_value)); // Assuming field number 12 for float values
        break;
    case 12: // string
        buffers.push(encodeStringWithFieldNumber(13, metric.string_value)); // Assuming field number 13 for string values
        break;
    default:
        console.error("Unsupported data type");
}
// Calculate the total length of the metric message and encode it as a nested message
const totalBuffer = Buffer.concat(buffers);
const length = encodeVarint(totalBuffer.length);
return Buffer.concat([encodeVarint((2 << 3) | 2), length, totalBuffer]); // Field 2 as a length-delimited field

}

function encodeMetrics(metrics) { const buffers = metrics.map(metric => encodeMetric(metric)); return Buffer.concat(buffers); }

function encodePayload(payload) { const buffers = [encodeUint64(1, payload.timestamp)]; payload.metrics.forEach(metric => { buffers.push(encodeMetric(metric)); }); buffers.push(encodeUint64(3, payload.seq)); return Buffer.concat(buffers); }

const testPayload = { metrics: [ { name: "voltage", timestamp: 0, datatype: 9, float_value: 1000.0 }, { name: "current", timestamp: 0, datatype: 9, float_value: 2000.0 } ], timestamp: 0, seq: 5 };

const encodedPayload = encodePayload(testPayload); console.log("Encoded Payload:", encodedPayload.toString('hex'));

const nodeDeviceData = [ { device_id: "sub_device_001", metrics: [ { name: "temperature", datatype: 9, alias: 1, } ] }, { device_id: "sub_device_002", metrics: [ { name: "voltage", datatype: 9, alias: 1, }, { name: "current", datatype: 9, alias: 2 } ] } ]; function generateMetricValue(valueTypeId) { const valueNames = { 1: 'int_value', // Int8 2: 'int_value', // Int16 3: 'int_value', // Int32 4: 'long_value', // Int64 5: 'int_value', // UInt8 6: 'int_value', // UInt16 7: 'int_value', // UInt32 8: 'long_value', // UInt64 9: 'float_value', // Float 10: 'double_value', // Double 11: 'boolean_value', // Boolean 12: 'string_value', // String // Add other cases as necessary }; const valueType = valueNames[valueTypeId] || 'unknown_value'; const valueFunctions = { int_value: () => ({ int_value: randomInt() }), long_value: () => ({ long_value: randomInt(0, 1000000) }), float_value: () => ({ float_value: randomFloat() }), double_value: () => ({ double_value: randomFloat(0, 100, true) }), boolean_value: () => ({ boolean_value: randomBoolean() }), string_value: () => ({ string_value: randomString() }), unknown_value: () => ({ unknown_value: null }) }; return valueFunctions[valueType](); }

function randomInt(min = 0, max = 255) { return Math.floor(Math.random() * (max - min + 1)) + min; }

function randomFloat(min = 0, max = 100, isDouble = false) { let multiplier = isDouble ? 1000000 : 100; return (Math.random() (max - min) + min).toFixed(2) multiplier / multiplier; }

function randomBoolean() { return Math.random() < 0.5; }

function randomString(length = 10) { let result = ''; const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'; for (let i = 0; i < length; i++) { result += characters.charAt(Math.floor(Math.random() * characters.length)); } return result; }

// Function to simulate metric value generation based on datatype function generateMetricValue(valueTypeId) { switch (valueTypeId) { case 1: // Int8 case 2: // Int16 case 3: // Int32 return { int_value: randomInt() }; case 4: // Int64 case 8: // UInt64 return { long_value: randomInt(0, 1000000) }; // Larger range for 64-bit case 9: // Float return { float_value: randomFloat() }; case 10: // Double return { double_value: randomFloat(0, 100, true) }; case 11: // Boolean return { boolean_value: randomBoolean() }; case 12: // String return { string_value: randomString() }; default: return { unknown_value: null }; } }

function randomFloat(min = 0, max = 100) { return (Math.random() * (max - min) + min).toFixed(2); }

// Function to create payload with manually encoded metrics function createPayload(device, isNode, timestamp) { let metrics = device.metrics.map(metric => ({ name: metric.name, timestamp: timestamp, datatype: metric.datatype, ...generateMetricValue(metric.datatype) }));

if (isNode) {
    metrics.push({
        name: "bdSeq",
        timestamp: timestamp,
        datatype: 3, 
        int_value: getBdSeqNum() 
    });
}
const payload = {
    timestamp: timestamp,
    metrics: metrics,
    seq: isNode ? getBdSeqNum() : getSeqNum()
};
return encodePayload(payload);

}

let seq = 0; // Global sequence number let bdSeq = 0; function getSeqNum() { if (seq == 256) { seq = 0; } return seq++; }

function getBdSeqNum() { if (bdSeq == 256) { bdSeq = 0; } return bdSeq++; } function bufferToString(buffer) { let string = ''; for (let i = 0; i < buffer.length; i++) { const charCode = buffer[i]; // Check if the character needs to be escaped if (charCode < 32 || charCode > 126 || charCode === 92) { // Escape special characters with backslash string += \\${charCode.toString().padStart(3, '0')}; } else { // Otherwise, append the character as is string += String.fromCharCode(charCode); } } return string; }

// Example usage: const bufferData = Buffer.from('Hello, world!\n', 'utf8'); const string = bufferToString(bufferData); console.log(string);

ysfscream commented 6 months ago
  1. Support for SparkplugB: YES. EMQX supports SparkplugB, while MQTTX currently does not fully support it; it only supports Protobuf message transmission. EMQX and MQTTX are not part of a unified team. MQTTX, with its limited resources, has already planned to include SparkplugB support in our roadmap, and we expect to implement it soon.

  2. Will Message Format: Currently, the Will message supports only JSON format and plaintext, both configured as strings. We have not endorsed other formats yet. The demand for Will messages in formats other than these has been minimal, but we will explore how we might accommodate additional formats.

  3. Script Complexity: The script you've mentioned seems quite complex; are there any specific issues you are encountering while running it? We're here to help troubleshoot and ensure it works effectively for your needs.

MorelSerge commented 1 month ago

I'm experiencing issues with oneof as well, see screenshot below. For some reason, the id_string does not get added to the protobuf encoded message.

image