eclipse / mosquitto

Eclipse Mosquitto - An open source MQTT broker
https://mosquitto.org

API extension: support chunked message transmission for low memory devices #1380

Open xtofl opened 5 years ago

xtofl commented 5 years ago

The current mosquitto API only supports in-memory messages. On some embedded devices, large message payloads don't fit into memory all at once. Nor does MQTT structurally require a message to fit into memory, since all of its bytes are packetized over the TCP connection anyway.
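Some context on the "packetized over TCP" point: in MQTT 3.1.1 every PUBLISH packet starts with a fixed header whose Remaining Length field states the size of the rest of the packet before any payload byte is sent. It is a variable-length integer (7 bits per byte, high bit as continuation flag, at most 4 bytes, so at most 268,435,455). A streamed payload therefore still has to declare its total length up front, which is presumably what the `len` field in the proposal below is for. A minimal sketch of that encoding, written from the 3.1.1 spec; `mqtt_encode_remaining_length` and `mqtt_publish_remaining_length` are illustrative names, not libmosquitto functions.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Largest value the 4-byte Remaining Length field can carry (MQTT 3.1.1).
#define MQTT_MAX_REMAINING_LENGTH 268435455u

// Encode `value` as an MQTT variable-length integer into `out`.
// Returns the number of bytes written (1..4), or -1 if it is too large.
static int mqtt_encode_remaining_length(uint32_t value, uint8_t out[4])
{
    int n = 0;
    if (value > MQTT_MAX_REMAINING_LENGTH)
        return -1;
    do {
        uint8_t byte = (uint8_t)(value % 128);  // low 7 bits
        value /= 128;
        if (value > 0)
            byte |= 0x80;                       // continuation: more bytes follow
        out[n++] = byte;
    } while (value > 0);
    return n;
}

// Remaining Length of an MQTT 3.1.1 PUBLISH: 2-byte topic length prefix,
// the topic itself, a 2-byte packet identifier when qos > 0, then the payload.
// Returns 0 if the packet would be too large to announce at all.
static uint32_t mqtt_publish_remaining_length(const char *topic, int qos,
                                              uint32_t payload_len)
{
    uint64_t total = 2u + (uint64_t)strlen(topic) + (qos > 0 ? 2u : 0u)
                     + (uint64_t)payload_len;
    return total > MQTT_MAX_REMAINING_LENGTH ? 0 : (uint32_t)total;
}
```

Since this header goes out first, a client that streams the payload must know its exact size before reading the first chunk.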

I suggest an API extension to support a streaming interface, along the lines of:

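#include <stdbool.h>    // bool
#include <stddef.h>     // size_t
#include <stdint.h>     // uint32_t
#include <sys/types.h>  // ssize_t (POSIX)

struct mosquitto;       // opaque client handle, as declared in mosquitto.h

struct mosquitto_payload_stream_t {
    uint32_t len;       // total payload size in bytes
    size_t preferred_chunk_size;

    // error reporting to be refined
    ssize_t (*read_next_chunk)(
            struct mosquitto_payload_stream_t *s,
            void *buffer,
            size_t buffer_size);
    void (*abort_stream)(struct mosquitto_payload_stream_t *s);
};

// (Note that a `mosquitto_payload_stream_t` can quite easily be
// implemented by wrapping an open `FILE *`/file descriptor. This makes sense,
// since that also reflects a streaming interface.)

// C has no overloading: next to the existing
// mosquitto_publish(mosq, mid, topic, payloadlen, payload, qos, retain),
// this variant would need a distinct name in practice.
int mosquitto_publish(
        struct mosquitto *mosq,
        int *mid,
        const char *topic,
        struct mosquitto_payload_stream_t *payload,
        int qos, bool retain);
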
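The note about wrapping an open `FILE *` can be made concrete. A minimal sketch, assuming the struct exactly as proposed in this issue (it is not part of libmosquitto); `file_payload_stream`, `file_payload_stream_init` and `drain_stream` are hypothetical names. `drain_stream` only imitates what the library would do with the stream: it writes chunks to a `FILE *` where a real client would write to the socket, and only one 4 KiB buffer of the payload is in memory at any time.

```c
#include <stdint.h>
#include <stdio.h>
#include <sys/types.h>

// The publishing interface as proposed in this issue (not part of libmosquitto).
struct mosquitto_payload_stream_t {
    uint32_t len;
    size_t preferred_chunk_size;
    ssize_t (*read_next_chunk)(struct mosquitto_payload_stream_t *s,
                               void *buffer, size_t buffer_size);
    void (*abort_stream)(struct mosquitto_payload_stream_t *s);
};

// Hypothetical FILE-backed stream. The generic struct is the first member,
// so the callbacks can cast their argument back to the enclosing struct.
struct file_payload_stream {
    struct mosquitto_payload_stream_t base;
    FILE *fp;
};

static ssize_t file_read_next_chunk(struct mosquitto_payload_stream_t *s,
                                    void *buffer, size_t buffer_size)
{
    struct file_payload_stream *fs = (struct file_payload_stream *)s;
    size_t n = fread(buffer, 1, buffer_size, fs->fp);
    if (n == 0 && ferror(fs->fp))
        return -1;              // read error
    return (ssize_t)n;          // 0 means end of file
}

// Aborting releases the source; on success the caller closes the file.
static void file_abort_stream(struct mosquitto_payload_stream_t *s)
{
    struct file_payload_stream *fs = (struct file_payload_stream *)s;
    fclose(fs->fp);
    fs->fp = NULL;
}

// Wrap an open file; its current size becomes the announced payload length.
static int file_payload_stream_init(struct file_payload_stream *fs, FILE *fp,
                                    size_t chunk_size)
{
    long size;
    if (fseek(fp, 0, SEEK_END) != 0 || (size = ftell(fp)) < 0 ||
        fseek(fp, 0, SEEK_SET) != 0)
        return -1;
    if ((unsigned long)size > UINT32_MAX)
        return -1;
    fs->base.len = (uint32_t)size;
    fs->base.preferred_chunk_size = chunk_size;
    fs->base.read_next_chunk = file_read_next_chunk;
    fs->base.abort_stream = file_abort_stream;
    fs->fp = fp;
    return 0;
}

// Stand-in for the library: pull the payload through one fixed buffer,
// writing each chunk to `out` (in a real client, the socket).
static int drain_stream(struct mosquitto_payload_stream_t *s, FILE *out)
{
    char buf[4096];
    size_t want = s->preferred_chunk_size;
    uint32_t sent = 0;
    if (want == 0 || want > sizeof buf)
        want = sizeof buf;
    while (sent < s->len) {
        // Never ask for more than is still owed: the header already announced len.
        size_t remaining = (size_t)(s->len - sent);
        ssize_t n = s->read_next_chunk(s, buf, remaining < want ? remaining : want);
        if (n <= 0 || fwrite(buf, 1, (size_t)n, out) != (size_t)n) {
            s->abort_stream(s);   // the packet can no longer be completed
            return -1;
        }
        sent += (uint32_t)n;
    }
    return 0;
}
```

Capping each read at the bytes still owed is deliberate: once the fixed header has announced `len`, exactly that many bytes must follow, so a file that shrinks mid-publish can only end in `abort_stream` (and, in a real client, a dropped connection).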
karlp commented 5 years ago

how do you see this working for the subscribe side of things?

xtofl commented 5 years ago

Good remark; strange I didn't think of spelling that out.

Likewise, one could subscribe to a 'streamed' topic by supplying a complementary interface:

struct mosquitto_sub_payload_stream_t {
    size_t preferred_chunk_size;

    // error reporting to be refined
    ssize_t (*next_chunk_arrived)(
           struct mosquitto_sub_payload_stream_t *s,
           void *buffer,
           size_t buffer_size);
    void (*stream_aborted)(struct mosquitto_sub_payload_stream_t *s);
    void (*stream_end)(struct mosquitto_sub_payload_stream_t *s);
};

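On the subscribing side, a sketch of the log-file use case, again assuming the proposed struct verbatim (not libmosquitto API); `file_sub_stream`, `file_sub_stream_init` and `deliver_payload` are hypothetical. `deliver_payload` plays the library's role: it reads the announced number of bytes from a `FILE *` standing in for the socket and hands each piece to the callbacks as soon as it arrives.

```c
#include <stdio.h>
#include <sys/types.h>

// The subscribing interface as proposed in this issue (not part of libmosquitto).
struct mosquitto_sub_payload_stream_t {
    size_t preferred_chunk_size;
    ssize_t (*next_chunk_arrived)(struct mosquitto_sub_payload_stream_t *s,
                                  void *buffer, size_t buffer_size);
    void (*stream_aborted)(struct mosquitto_sub_payload_stream_t *s);
    void (*stream_end)(struct mosquitto_sub_payload_stream_t *s);
};

// Hypothetical sink that appends the payload to a file as it arrives.
struct file_sub_stream {
    struct mosquitto_sub_payload_stream_t base;
    FILE *fp;
    int complete;   // 1 once stream_end has been called
};

static ssize_t file_chunk_arrived(struct mosquitto_sub_payload_stream_t *s,
                                  void *buffer, size_t buffer_size)
{
    struct file_sub_stream *fs = (struct file_sub_stream *)s;
    size_t n = fwrite(buffer, 1, buffer_size, fs->fp);
    return n == buffer_size ? (ssize_t)n : -1;   // -1 asks the caller to abort
}

static void file_stream_aborted(struct mosquitto_sub_payload_stream_t *s)
{
    ((struct file_sub_stream *)s)->complete = 0; // file holds a truncated payload
}

static void file_stream_end(struct mosquitto_sub_payload_stream_t *s)
{
    struct file_sub_stream *fs = (struct file_sub_stream *)s;
    fflush(fs->fp);
    fs->complete = 1;
}

static void file_sub_stream_init(struct file_sub_stream *fs, FILE *fp,
                                 size_t chunk_size)
{
    fs->base.preferred_chunk_size = chunk_size;
    fs->base.next_chunk_arrived = file_chunk_arrived;
    fs->base.stream_aborted = file_stream_aborted;
    fs->base.stream_end = file_stream_end;
    fs->fp = fp;
    fs->complete = 0;
}

// Stand-in for the library's receive path: `len` is the payload length
// derived from the PUBLISH header, and `src` plays the socket.
static int deliver_payload(struct mosquitto_sub_payload_stream_t *s,
                           FILE *src, size_t len)
{
    char buf[4096];
    size_t chunk = s->preferred_chunk_size;
    size_t done = 0;
    if (chunk == 0 || chunk > sizeof buf)
        chunk = sizeof buf;
    while (done < len) {
        size_t want = len - done < chunk ? len - done : chunk;
        size_t n = fread(buf, 1, want, src);  // real client: recv() on the socket
        if (n == 0 || s->next_chunk_arrived(s, buf, n) < 0) {
            s->stream_aborted(s);             // connection lost or sink failed
            return -1;
        }
        done += n;
    }
    s->stream_end(s);
    return 0;
}
```

One consequence of this design: after `stream_aborted` the file holds a truncated payload, so the application, not the library, has to decide whether to keep or delete it.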
karlp commented 5 years ago

fwiw, I know of people handling this by simply chunking their data into separate messages instead of trying to twist the client libraries into chunking the messages. It's about the same amount of work in your client, plus a little overhead in metadata or protocol design up front, but a lot less work than getting things right in the generic case.
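For comparison, a sketch of the application-level chunking described here. The framing (an 8-byte header holding a big-endian sequence number and chunk count) is a made-up example, not a standard, and `chunk_frame`, `chunk_accept` and `struct chunk_reassembler` are hypothetical. On the publishing side each framed buffer would go out through the existing call, `mosquitto_publish(mosq, NULL, topic, (int)msg_len, msg, 1, false)`; on the subscribing side `chunk_accept` would be called from the message callback with `msg->payload` and `msg->payloadlen`.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

// Hypothetical framing: 4-byte sequence number + 4-byte chunk count, big-endian.
#define CHUNK_HEADER_LEN 8

static void put_u32be(uint8_t *p, uint32_t v)
{
    p[0] = (uint8_t)(v >> 24); p[1] = (uint8_t)(v >> 16);
    p[2] = (uint8_t)(v >> 8);  p[3] = (uint8_t)v;
}

static uint32_t get_u32be(const uint8_t *p)
{
    return (uint32_t)p[0] << 24 | (uint32_t)p[1] << 16 |
           (uint32_t)p[2] << 8 | (uint32_t)p[3];
}

// Build message `seq` of `count` into `msg`, which must hold
// CHUNK_HEADER_LEN + data_len bytes. Returns the message length.
static size_t chunk_frame(uint8_t *msg, uint32_t seq, uint32_t count,
                          const void *data, size_t data_len)
{
    put_u32be(msg, seq);
    put_u32be(msg + 4, count);
    memcpy(msg + CHUNK_HEADER_LEN, data, data_len);
    return CHUNK_HEADER_LEN + data_len;
}

// Receiver state: appends chunk data to `out` strictly in order.
struct chunk_reassembler {
    FILE *out;
    uint32_t expected;  // next sequence number that can be accepted
    uint32_t count;     // total announced by the sender (0 = not seen yet)
};

// Feed one received message. Returns 1 once the last chunk is written,
// 0 if more are expected, -1 on a malformed message or a missing chunk.
static int chunk_accept(struct chunk_reassembler *r, const uint8_t *msg, size_t len)
{
    uint32_t seq, count;
    size_t data_len;
    if (len < CHUNK_HEADER_LEN)
        return -1;
    seq = get_u32be(msg);
    count = get_u32be(msg + 4);
    if (count == 0 || (r->count != 0 && count != r->count))
        return -1;
    r->count = count;
    if (seq < r->expected)
        return 0;       // duplicate (possible with QoS 1): ignore
    if (seq > r->expected)
        return -1;      // gap: a chunk went missing
    data_len = len - CHUNK_HEADER_LEN;
    if (fwrite(msg + CHUNK_HEADER_LEN, 1, data_len, r->out) != data_len)
        return -1;
    r->expected++;
    return r->expected == r->count ? 1 : 0;
}
```

Treating a gap as an error rather than buffering out-of-order chunks leans on MQTT 3.1.1 servers treating topics as ordered by default; that keeps the receiver's memory bounded to one message, in the spirit of the low-memory use case.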

xtofl commented 5 years ago

Thanks, I'm working for one of those people. This results in a protocol-over-a-protocol.

Let me elaborate on the use case: a pub/sub system for large-ish log files. The publisher can stream the results out of a file, the subscriber will stream the results right into a file. Neither needs (nor has) more than a few kilobytes of the payload at a time.

I regard the absence of large-message support as a leak in the stream abstraction that MQTT offers. True: the leak only shows on low-resource devices.

You seem to regard this as a 'twist'. Is that because of the proposed functionality, or because of the syntax? Surely twists should not happen in coherent libraries.

karlp commented 5 years ago

I'm not entirely against this, don't get me wrong, but I think you're completely wrong about MQTT providing a "stream" abstraction, and this is colouring your expectations. It provides pub/sub of messages; nothing about it has ever implied streams, to me at least. And every use of MQTT is "protocol over a protocol": there's nothing in MQTT at all about what payloads even look like, so that's always another protocol layer.

I also think you're requesting a fairly substantial amount of code to reliably handle all cases of this sort of API, just for the case where you decided not to chunk in your own APIs.

That said, iirc the hbmqtt Python library can do this, so perhaps you could use that one?

xtofl commented 5 years ago

Sorry, you are indeed right: MQTT has no stream concept. Because I know MQTT can run over TCP, I expected it to support the same byte-stream interface. That expectation becomes a lot more awkward when running over UDP, which I hadn't realized.

Either way, this is how they solve it in wolfMQTT: the message is an abstraction that provides a chunked payload. It does imply a substantially more complex API, so it may or may not be in the scope of the mosquitto project, which should "provide a light weight, open-source implementation, of an MQTT broker to allow new" (cf. CONTRIBUTING.md).

There are several solutions to my specific problem: choosing another library is one, extending this library is a second, and using a memory-mapped file may be an easier one. If that works out, would it be a good idea to add it to the examples?
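A minimal sketch of the memory-mapped route with POSIX `mmap()`, assuming a POSIX system; `struct mapped_payload`, `map_payload` and `unmap_payload` are hypothetical helpers. The mapping would then be handed to the existing call as `mosquitto_publish(mosq, NULL, topic, (int)mp.len, mp.data, qos, retain)`. Whether this actually bounds RAM use depends on libmosquitto's internals: if it copies the payload into its own outgoing packet buffer, the whole file ends up on the heap anyway, so that is worth checking before relying on it.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// A read-only view of a whole file, to be passed as the publish payload.
struct mapped_payload {
    void *data;
    size_t len;
};

// Map `path` read-only. Returns 0 on success, -1 on failure.
static int map_payload(const char *path, struct mapped_payload *mp)
{
    struct stat st;
    void *p;
    int fd = open(path, O_RDONLY);
    if (fd < 0)
        return -1;
    // mmap() cannot map an empty file, and 268435455 (the largest MQTT
    // Remaining Length) bounds what one message can carry.
    if (fstat(fd, &st) != 0 || st.st_size <= 0 || st.st_size > 268435455) {
        close(fd);
        return -1;
    }
    p = mmap(NULL, (size_t)st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    close(fd);  // the mapping stays valid after the descriptor is closed
    if (p == MAP_FAILED)
        return -1;
    mp->data = p;
    mp->len = (size_t)st.st_size;
    return 0;
}

static void unmap_payload(struct mapped_payload *mp)
{
    munmap(mp->data, mp->len);
    mp->data = NULL;
    mp->len = 0;
}
```

Pages of a private read-only mapping are loaded on demand and can be dropped again by the kernel, which is why this can be cheaper than reading the file into a malloc'd buffer, provided nothing downstream copies it.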