ludocode / mpack

MPack - A C encoder/decoder for the MessagePack serialization format / msgpack.org[C]
MIT License
533 stars 82 forks source link

Make it possible to use a dynamic buffer with mpack writer #92

Closed tarruda closed 2 years ago

tarruda commented 2 years ago

This is how we currently initialize an mpack_writer_t:

        mpack_writer_init(&writer, buf, sizeof(buf));
        mpack_writer_set_flush(&writer, mpack_buffer_flush);

That means mpack writes to a statically sized buffer, then invokes a callback for us to copy that somewhere (usually a file descriptor). But if the target is a dynamic buffer we end up doing a redundant copy. Can we enhance this API so it is possible to use a dynamic buffer directly? For example, if I wanted to use an sds instance as the dynamic buffer, is there a clean way to do it?

ludocode commented 2 years ago

Yes, it's possible. Just a quick warning first: it's much more complicated than what you're doing now for very little benefit. What would be even simpler is to just call mpack_writer_init_growable() like the example on the README and copy it to an sds string afterwards with sdsnewlen(). I realize you want to avoid the copy from the buffer to the string, but writing to a growable contiguous string is going to incur lots of copies anyway because most calls to realloc() will resize by copying the whole thing. If you truly want a zero-copy write into a dynamic growable buffer, it must be non-contiguous, like C++ std::deque<char> or a Pottery pager of chars.

If you really want the writer to write directly into an sds string, well, keep reading.

It's possible to implement a flush function for a custom growable buffer but it's complicated and poorly documented. The flush callback gets a pointer to the writer so you can change its buffer, position and end pointers after growing your string.

The complicated part is figuring out where the data is coming from. If you're flushing to a network socket or whatever, flushing is really simple because it doesn't matter where the data comes from: you just copy the data to be flushed into the socket. But with a growable buffer, it matters.

MPack has an optimization where if you write something big (say, a string bigger than the buffer), it won't bother copying it to the buffer; it will just pass it directly to your flush function (after flushing what's already in the buffer.) So when calling your flush callback, the writer might be giving you a pointer to the buffer, in which case you want to do nothing because it already wrote the data into your string; or it might be giving you data that's coming from outside the buffer, in which case you need to copy it into the string yourself.

You can see how MPack does this for its own growable buffer with mpack_writer_init_growable and mpack_growable_writer_flush. Basically we need to do the same thing for sds.

I've implemented this for sds below. It uses sdsgrowzero() to add space to the string for the writer to write into and it shrinks it to the correct length during teardown using sdsrange(). I've tried to separate out the cases of the flush function as clearly as possible so it's probably longer than it needs to be.

#include "mpack/mpack.h"
#include "sds.h"

static void flush_mpack_writer_sds(mpack_writer_t* writer, const char* data, size_t length) {
    size_t current_length; // number of bytes already in the string
    const char* extra_data; // extra bytes to be copied to the string
    size_t extra_length; // number of extra bytes to be copied to the string

    if (data == writer->buffer) {
        if (mpack_writer_buffer_used(writer) == length) {
            // in the special case that we're flushing during teardown, we
            // don't want to grow; we want to do nothing at all.
            fprintf(stderr, "flushing teardown\n");
            return;
        }

        // the data to be flushed is already in our string. we don't actually
        // want to copy anything; just grow.
        fprintf(stderr, "flushing buffer\n");
        current_length = length;
        extra_data = NULL;
        extra_length = 0;

    } else {
        // the data to be flushed is coming from aside. it's not yet in our
        // string, so we'll need to grow the string, then copy it in.
        fprintf(stderr, "flushing extra data\n");
        current_length = mpack_writer_buffer_used(writer);
        extra_data = data;
        extra_length = length;
    }

    // calculate the new length of the string
    sds* s = (sds*)mpack_writer_context(writer);
    size_t new_length = sdslen(*s) * 2;
    if (new_length < current_length + extra_length)
        new_length = current_length + extra_length;

    // grow the string
    fprintf(stderr, "growing to %zi bytes\n", new_length);
    *s = sdsgrowzero(*s, new_length);

    // reset writer variables
    writer->buffer = *s;
    writer->position = *s + current_length;
    writer->end = *s + new_length;

    // append the extra data, if any
    if (extra_length > 0) {
        memcpy(writer->position, extra_data, extra_length);
        writer->position += extra_length;
    }
}

static void teardown_mpack_writer_sds(mpack_writer_t* writer) {
    sds* s = (sds*)mpack_writer_context(writer);

    if (mpack_writer_error(writer) != mpack_ok) {
        // free the string
        sdsfree(*s);
        *s = NULL;
    } else {
        // trim the string to the correct length
        size_t len = mpack_writer_buffer_used(writer);
        sdsrange(*s, 0, len);
        fprintf(stderr, "trimming to %zi bytes\n", len);
    }

    // clear writer variables for safety
    writer->buffer = NULL;
    writer->position = NULL;
    writer->end = NULL;
    writer->context = NULL;

}

static void init_mpack_writer_sds(mpack_writer_t* writer, sds* s) {
    *s = sdsempty();
    *s = sdsgrowzero(*s, MPACK_WRITER_MINIMUM_BUFFER_SIZE);
    mpack_writer_init(writer, *s, sdslen(*s));
    mpack_writer_set_context(writer, s);
    mpack_writer_set_flush(writer, flush_mpack_writer_sds);
    mpack_writer_set_teardown(writer, teardown_mpack_writer_sds);
}

int main(void) {
    sds s;
    mpack_writer_t writer;
    init_mpack_writer_sds(&writer, &s);

    mpack_start_map(&writer, 4);
        // the example on the msgpack homepage
        mpack_write_cstr(&writer, "compact");
        mpack_write_bool(&writer, true);
        mpack_write_cstr(&writer, "schema");
        mpack_write_uint(&writer, 0);
        // a big write to test writing extra data (mpack wants to bypass the buffer)
        mpack_write_cstr(&writer, "lipsum");
        mpack_write_cstr(&writer, "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer nec tempor arcu. Suspendisse potenti. Aliquam sit amet tortor vehicula magna efficitur consequat molestie vitae eros. Proin molestie vel nunc at pharetra. Nulla dapibus nunc at pharetra vestibulum. Etiam elit elit, imperdiet nec neque et, auctor sagittis est. Ut sed tincidunt sapien, non malesuada ligula.");
        // lots more data to test growing again
        mpack_write_cstr(&writer, "array");
        mpack_start_array(&writer, 200);
            for (int i = 0; i < 200; ++i)
                mpack_write_int(&writer, i);
        mpack_finish_array(&writer);
    mpack_finish_map(&writer);

    mpack_error_t error = mpack_writer_destroy(&writer);
    if (error != mpack_ok) {
        fprintf(stderr, "error %i writing data!\n", error);
        // don't free the sds string; the writer freed it due to the error
        return EXIT_FAILURE;
    }

    // write the resulting data to a file, view it with e.g. msgpack2json -di out.mp
    FILE* file = fopen("out.mp", "wb");
    fwrite(s, 1, sdslen(s), file);
    fclose(file);

    sdsfree(s);
    return EXIT_SUCCESS;
}

I left in the fprintfs() so you can see what's going on; just remove them if you want to use it.

I realize this is messy and complicated. It should be much simpler to figure out why it's calling your flush function. There are also some imperfections in how it flushes. For example when writing extra data that bypasses the buffer, it needs to flush the buffer first. Your string may then grow, but not enough to fit the extra data, so it'll have to grow again. MPack's growable writer has the same problem. I'm sure this will never matter for you but it keeps me up at night. A proper API for general-purpose flush and fill callbacks that is easy to use and will do the perfectly optimal thing in all cases is something I've been pondering for many years. I intend to solve this and add stream templates to Pottery someday, and then upgrade MPack to use a similar API (while maintaining backwards compatibility with the current "old" API.)

Also, I didn't do any error handling in the above. The way sds does error handling is actually terrible. It returns NULL if a call fails, but doesn't free the original string. So you can't actually do things like s = sdscat(s, "hello"); like the docs say if you care about errors. You have to store the return value in a temporary, then only replace the string if the call didn't fail. Don't use sds if you care about error handling.

(In fact don't use sds at all. It's very allocation heavy for short strings, even allocating on sdsempty(). Modern implementations of std::string have an internal buffer for storing short strings without allocations. It's easy to do the same in C; Pottery's string does it for example. Of course Pottery's string ignores allocation errors at the moment. I have my own string implementation for my personal projects which does correct error handling. It may be worth it to write your own.)

tarruda commented 2 years ago

@ludocode I will analyze and decide what is the best course of action. Thank you very much for the long and detailed answer.