scylladb / seastar

High performance server-side application framework
http://seastar.io
Apache License 2.0

RPC framework: sending `sink` inside structures (specifically, `optional`) #871

Open kbr- opened 3 years ago

kbr- commented 3 years ago

I modified tests/unit/rpc_test.cc and implemented a simple std::optional (de)serializer:

template <typename T, typename Output>
inline void write(serializer s, Output& out, const std::optional<T>& v) {
    write(s, out, bool(v));
    if (v) {
        write(s, out, *v);
    }
}

template <typename T, typename Input>
inline std::optional<T> read(serializer s, Input& in, rpc::type<std::optional<T>>) {
    std::optional<T> v;
    auto b = read(s, in, rpc::type<bool>());
    if (b) {
        v = read(s, in, rpc::type<T>());
    }
    return v;
}

The following test demonstrates its use:

SEASTAR_TEST_CASE(test_std_opt) {
    rpc::server_options so;
    so.streaming_domain = rpc::streaming_domain_type(1);
    rpc_test_config cfg;
    cfg.server_options = so;
    return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) -> future<> {
        test_rpc_proto::client c(env.proto(), {}, env.make_socket(), ipv4_addr());

        co_await env.register_handler(1, [&] () {
            return make_ready_future<std::optional<int32_t>>(std::optional{42});
        });

        co_await env.register_handler(2, [&] () {
            return make_ready_future<std::optional<int32_t>>(std::optional<int32_t>{});
        });

        auto handler1 = env.proto().make_client<std::optional<int32_t>()>(1);
        auto o = co_await handler1(c);
        BOOST_REQUIRE(o && *o == 42);

        auto handler2 = env.proto().make_client<std::optional<int32_t>()>(2);
        BOOST_REQUIRE(!(co_await handler2(c)));

        co_await c.stop();
    });
}

However, I'd like to do something more complex: I'd like to send optional<rpc::sink> / receive optional<rpc::source> through an RPC call. Unfortunately, the code doesn't compile. For example:

        co_await env.register_handler(1, [] {
            using opt_t = std::optional<rpc::sink<>>;
            return make_ready_future<opt_t>(opt_t{});
        });

gives:

error: no matching function for call to ‘read(serializer&, seastar::memory_input_stream<__gnu_cxx::__normal_iterator<seastar::temporary_buffer<char>*, std::vector<seastar::temporary_buffer<char> > > >&, seastar::rpc::type<seastar::rpc::sink<> >)’

and

error: no matching function for call to ‘write(serializer&, seastar::memory_output_stream<__gnu_cxx::__normal_iterator<seastar::temporary_buffer<char>*, std::vector<seastar::temporary_buffer<char> > > >&, const seastar::rpc::sink<>&)’

Indeed. I don't know how to (de)serialize sinks. Only the internals of the framework know how to do it. In include/seastar/rpc/rpc_impl.hh, unmarshal_one::helper<sink<T...>>:

    template<typename... T> struct helper<sink<T...>> {
        static sink<T...> doit(connection& c, Input& in) {
            return sink<T...>(make_shared<sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
        }
    };

and marshall_one::helper<sink<T...>>:

    template <typename... T> struct helper<sink<T...>> {
        static void doit(Serializer& serializer, Output& out, const sink<T...>& arg) {
            put_connection_id(arg.get_id(), out);
        }
    };

I could copy-paste the serializing code and implement write for sinks, but I can't copy-paste the deserializing code because it needs to access the connection in order to obtain the stream using the deserialized connection ID.

Now, these internal sink (de)serializing functions are called only if the return type of the handler is directly a sink, or it's wrapped in an rpc::tuple; if it's wrapped in rpc::tuple, then the compiler will dispatch from marshall_one::helper<tuple<sink<T...>, ...>> to marshall_one::helper<sink<T...>> and so on.

But if the sink is wrapped in anything else, say std::optional or some user-defined struct, there is no way to call back the internal (de)serializing functions after the compiler dispatches into calling user's serializer.

What I need is sending optional<sink>. I tried to use rpc::optional<sink>, but there is no serializer written for rpc::optional inside marshall_one, and for a good reason: it cannot be done correctly. Indeed, the deserializer for rpc::optional does the following (unmarshal_one::helper<optional<T>>):

    template<typename T> struct helper<optional<T>> {
        static optional<T> doit(connection& c, Input& in) {
            if (in.size()) {
                return optional<T>(read(c.serializer<Serializer>(), in, type<typename remove_optional<T>::type>()));
            } else {
                return optional<T>();
            }
        }
    };

I.e. it checks whether it's not nullopt by checking in.size() > 0. That is, it assumes that there is no trailing data after the optional. It would be then incorrect to send e.g. rpc::tuple<rpc::optional<int32_t>, int32_t> where the first element is nullopt, because in that case in.size() > 0 would be true.
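To make the failure concrete, here is a minimal standalone sketch. It does not use Seastar: `toy_input`, `put_i32`, `get_i32`, `get_opt_by_size`, `encode_nullopt_then` and `decode_first` are hypothetical names, and the raw host-endian encoding is an assumption made only for illustration. The decoder copies the "non-empty input means present" rule quoted above and is fed a (nullopt, 7) pair.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <utility>
#include <vector>

// Toy stand-in for an RPC input stream: a byte buffer plus a read cursor.
// Hypothetical; this is not Seastar's memory_input_stream.
struct toy_input {
    std::vector<unsigned char> bytes;
    std::size_t pos = 0;
    std::size_t size() const { return bytes.size() - pos; }
};

inline void put_i32(std::vector<unsigned char>& out, std::int32_t v) {
    unsigned char raw[sizeof v];
    std::memcpy(raw, &v, sizeof v);
    out.insert(out.end(), raw, raw + sizeof v);
}

inline std::int32_t get_i32(toy_input& in) {
    assert(in.size() >= sizeof(std::int32_t));
    std::int32_t v;
    std::memcpy(&v, in.bytes.data() + in.pos, sizeof v);
    in.pos += sizeof v;
    return v;
}

// Same rule as the quoted helper: "present" means "some input is left".
inline std::optional<std::int32_t> get_opt_by_size(toy_input& in) {
    if (in.size()) {
        return get_i32(in);
    }
    return std::nullopt;
}

// Encodes (nullopt, second). Assumption: with a size-based scheme the empty
// optional contributes no bytes, so only `second` is written.
inline std::vector<unsigned char> encode_nullopt_then(std::int32_t second) {
    std::vector<unsigned char> out;
    put_i32(out, second);
    return out;
}

// Decodes the optional of an (optional<int32_t>, int32_t) pair and reports
// how many bytes remain for the second element.
inline std::pair<std::optional<std::int32_t>, std::size_t>
decode_first(std::vector<unsigned char> bytes) {
    toy_input in{std::move(bytes)};
    auto first = get_opt_by_size(in);
    return {first, in.size()};
}
```

Calling `decode_first(encode_nullopt_then(7))` yields an engaged optional holding 7 and zero remaining bytes: the decoder has consumed the second element as if it were the first, and nothing is left for the real second element.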

One potential solution would be to create a new optional type inside the framework which (de)serializes by using a boolean that indicates whether it's a nullopt, as shown at the beginning of the post. Then unmarshal_one::helper<optional_v2<sink>> could dispatch to unmarshal_one::helper<sink>.
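A rough sketch of how such a framework-side dispatch might look, reduced to standalone C++ with no Seastar dependency. Everything here is hypothetical: `toy_connection`, `toy_input`, `toy_sink`, `user_read`, the `helper` template, the one-word presence flag, and the way a stream is derived from the connection are all stand-ins chosen for illustration, not the real `unmarshal_one` code.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <tuple>
#include <vector>

// Hypothetical stand-ins; none of these are Seastar types.
struct toy_connection { int base_stream = 100; };
struct toy_input {
    std::vector<int> words;
    std::size_t pos = 0;
    int next() { return words.at(pos++); }   // throws if the input is exhausted
};
struct toy_sink { int stream; };

// User-level deserializer: it sees only the input, never the connection.
inline int user_read(toy_input& in) { return in.next(); }

// Framework-level dispatch. The primary template forwards to user code
// (in this toy, that only works for int).
template <typename T> struct helper {
    static T doit(toy_connection&, toy_input& in) { return user_read(in); }
};

// Sinks need the connection to look up the stream for the decoded id.
template <> struct helper<toy_sink> {
    static toy_sink doit(toy_connection& c, toy_input& in) {
        return toy_sink{c.base_stream + in.next()};
    }
};

// Tuples are handled inside the framework, so a nested sink still gets
// the connection. Braced initialization evaluates left to right.
template <typename... T> struct helper<std::tuple<T...>> {
    static std::tuple<T...> doit(toy_connection& c, toy_input& in) {
        return std::tuple<T...>{helper<T>::doit(c, in)...};
    }
};

// A flag-prefixed optional handled by the framework can recurse the same way.
template <typename T> struct helper<std::optional<T>> {
    static std::optional<T> doit(toy_connection& c, toy_input& in) {
        if (in.next() == 0) {
            return std::nullopt;
        }
        return helper<T>::doit(c, in);
    }
};
```

With input words `{1, 7}`, `helper<std::optional<toy_sink>>::doit` reads the flag, recurses into the sink helper with the connection in hand, and returns a sink for stream 107; with `{0}` it returns nullopt without touching the connection.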

Another solution, much more general, would be to somehow allow calling back the sink (de)serialization code. This would allow e.g. putting sinks into user-defined structures and sending these. But I don't know if it's possible without some huge refactors.

cc @gleb-cloudius perhaps you have some idea?

gleb-cloudius commented 3 years ago

What is the use case that you have in mind? Can it be solved by having two RPCs? One that passes rpc::source and another that does not. The call that passes rpc::source and returns rpc::sink was intended to allow opening a stream session and associating some data with it.

-- Gleb.

kbr- commented 3 years ago

The optional would be used to implement the following algorithm:

  1. Node A sends node B some metadata
  2. Node B checks some condition based on the received metadata:
    • if the condition is true, B sends a sink to A (which A receives as a source) and streams data to A
    • otherwise B sends nullopt
  3. If A receives a sink, it consumes the streamed data; otherwise it knows that the condition did not pass and we abandon the operation
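The three steps above can be sketched in plain C++ as follows. This is only a model of the intended control flow, not Seastar code: `toy_source`, `metadata`, `node_b_handle` and `node_a_run` are hypothetical names, the version comparison is a placeholder for "some condition", and the stream is modelled as an in-memory queue.

```cpp
#include <cassert>
#include <optional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for rpc::source<std::string>: a queue holding the
// fragments the peer has streamed.
struct toy_source {
    std::queue<std::string> pending;
    std::optional<std::string> next() {
        if (pending.empty()) {
            return std::nullopt;   // end of stream
        }
        std::string s = std::move(pending.front());
        pending.pop();
        return s;
    }
};

struct metadata { int version; };   // hypothetical request metadata

// Node B, step 2: check the condition, then either stream or decline.
inline std::optional<toy_source> node_b_handle(const metadata& m, int local_version) {
    if (m.version != local_version) {
        return std::nullopt;       // condition failed: no stream is offered
    }
    toy_source src;
    src.pending.push("fragment-1");
    src.pending.push("fragment-2");
    return src;
}

// Node A, steps 1 and 3: send metadata, then consume the stream if one came back.
inline std::optional<std::vector<std::string>> node_a_run(int my_version, int b_version) {
    auto maybe_source = node_b_handle(metadata{my_version}, b_version);
    if (!maybe_source) {
        return std::nullopt;       // abandon the operation
    }
    std::vector<std::string> received;
    while (auto frag = maybe_source->next()) {
        received.push_back(std::move(*frag));
    }
    return received;
}
```

The point of the optional in the return type is that node A cannot reach the fragments without first checking whether a stream was offered.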

I do this in https://github.com/scylladb/scylla/pull/8169. I worked around the lack of optional by always sending the sink and additional info so node A can see that the condition is false as well - in that case it simply won't use the source.

It could be solved by two RPCs too, one for sending the metadata and checking the condition, the other for sending the sink.


For sending a sink inside a struct, imagine a use case like this: we want to send some "large" data structure that is made of a sequence of "small" parts, each part has the same type. Additionally, we want to send some "small" metadata that is of different type. We could do it by sending a struct that contains the small metadata and a sink, and then send the large data structure through the sink by sending these small parts. But I don't have anything specific in mind at this point. Can obviously be solved by two RPCs - one for sending the small metadata and one for sending the large data structure - but if the two things are somehow logically bundled why not bundle them "physically" as well. Currently this use case is covered by rpc::tuple at least, but I don't see a reason why structs couldn't work as well (except that implementation could be tricky). Alternatively, we could modify the type of the sink, be it sink<variant<small metadata type, large struct fragment>>. Then the sender would first send the metadata and then large struct fragments. But that's ugly because we know that metadata will only come once as the first thing and then everything else are large struct fragments. We lose type safety etc.

gleb-cloudius commented 3 years ago

On Mon, Mar 01, 2021 at 02:30:07AM -0800, Kamil Braun wrote:

The optional would be used to implement the following algorithm:

  1. Node A sends node B some metadata
  2. Node B checks some condition based on the received metadata:
    • if the condition is true, B sends a sink to A (which A receives as a source) and streams data to A
    • otherwise B sends nullopt

Just send a status back, or send an error through the stream. The stream was already created - you gain nothing by trying to hide the flag in an optional. One of rpc stream features that was not yet implemented was "one way" stream where a receiver of a stream does not send its part of the stream back which indicates that the stream will be one sided and another side will be automatically closed. Trying to play games like you want to do here will just make it harder.

  3. If A receives a sink, it consumes the streamed data; otherwise it knows that the condition did not pass and we abandon the operation

I do this in https://github.com/scylladb/scylla/pull/8169. I worked around the lack of optional by always sending the sink and additional info so node A can see that the condition is false as well - in that case it simply won't use the source.

It could be solved by two RPCs too, one for sending the metadata and checking the condition, the other for sending the sink.


For sending a sink inside a struct, imagine a use case like this: we want to send some "large" data structure that is made of a sequence of "small" parts, each part has the same type. Additionally, we want to send some "small" metadata that is of different type. We could do it by sending a struct that contains the small metadata and a sink, and then send the large data structure through the sink by sending these small parts. But I don't have anything specific in mind at this point.

The rpc stream was literally added for your "large" data structure example.

Can obviously be solved by two RPCs - one for sending the small metadata and one for sending the large data structure - but if the two things are somehow logically bundled why not bundle them "physically" as well. Currently this use case is covered by rpc::tuple at least, but I don't see a reason why structs couldn't work as well (except that implementation could be tricky).

rpc::tuple is making return values of RPC calls extensible while maintaining backwards compatibility. It was needed as a way to move from variadic futures that supported it naturally. It is not to make some return values optional.

Alternatively, we could modify the type of the sink, be it sink<variant<small metadata type, large struct fragment>>. Then the sender would first send the metadata and then large struct fragments. But that's ugly because we know that metadata will only come once as the first thing and then everything else are large struct fragments. We lose type safety etc.

If you want to send completely different things use different rpc calls. You only obfuscate things by trying to bundle the unbundable.

Theoretically it would be nice to be able to put sink/source anywhere deep in a struct hierarchy just for academic purposes, but then you need to expose rpc internals to user provided serializers and any example usage will be as contrived as your example here.

-- Gleb.

kbr- commented 3 years ago

The rpc stream was literally added for your "large" data structure example.

The problem here is the "small" metadata that I want to send together with the "large" data struct. It would be nice to do something like (on the sending end):

struct all_data {
    small_metadata_t small_metadata;
    rpc::sink<large_structure_fragment_t> large_structure_sink;
};

and on the receiving end:

struct all_data {
    small_metadata_t small_metadata;
    rpc::source<large_structure_fragment_t> large_structure_sink;
};

rpc::tuple is making return values of RPC calls extensible while maintaining backwards compatibility. It was needed as a way to move from variadic futures that supported it naturally. It is not to make some return values optional.

I can also use it to solve the problem above, by sending

rpc::tuple<small_metadata_t, rpc::sink<large_structure_fragment_t>>

What I'm asking is whether we could generalize it: there's nothing special about tuples compared to usual structs.

gleb-cloudius commented 3 years ago

On Mon, Mar 01, 2021 at 03:06:31AM -0800, Kamil Braun wrote:

The rpc stream was literally added for your "large" data structure example.

The problem here is the "small" metadata that I want to send together with the "large" data struct. It would be nice to do something like (on the sending end):

struct all_data {
    small_metadata_t small_metadata;
    rpc::sink<large_structure_fragment_t> large_structure_sink;
};

and on the receiving end:

struct all_data {
    small_metadata_t small_metadata;
    rpc::source<large_structure_fragment_t> large_structure_sink;
};

What is the problem in sending metadata and sink as two different rpc parameters? This is exactly how opening an rpc connection was intended to be done. Why do you think putting sink/source into the struct has such large value that rpc internals need to be exposed to user code and special handling for rpc streams added to the idl compiler?

rpc::tuple is making return values of RPC calls extensible while maintaining backwards compatibility. It was needed as a way to move from variadic futures that supported it naturally. It is not to make some return values optional.

I can also use it to solve the problem above, by sending rpc::tuple<small_metadata_t, rpc::sink<large_structure_fragment_t>>. What I'm asking is whether we could generalize it: there's nothing special about tuples compared to usual structs.

Don't. It will break horribly when you will want to add one more member to that tuple.

-- Gleb.

kbr- commented 3 years ago

What is the problem in sending metadata and sink as two different rpc parameters?

The problem is to send metadata and sink to the client. To send it to the server, I can use two parameters as you said. In other words, the problem is to put both the metadata and sink in the return type of the handler.

Don't. It will break horribly when you will want to add one more member to that tuple.

Then I won't add more members. But I don't understand why it would break?

kbr- commented 3 years ago

Example:

env.register_handler(1, [] (rpc::source<> source, client_metadata_t client_metadata) -> future<rpc::tuple<rpc::sink<large_struct_fragment_t>, server_metadata_t>> {
    // use client metadata...
    auto sink = source.make_sink<...>(...);
    server_metadata_t server_metadata {...};
    return make_ready_future<...>(rpc::tuple{std::move(sink), std::move(server_metadata)});
});

as you can see, to return both a sink and the metadata, I had to put them in rpc::tuple.

The problem is to generalize this so I could send e.g. optional<sink> or whatever struct containing a sink.

gleb-cloudius commented 3 years ago

On Mon, Mar 01, 2021 at 03:21:55AM -0800, Kamil Braun wrote:

What is the problem in sending metadata and sink as two different rpc parameters?

The problem is to send metadata and sink to the client. To send it to the server, I can use two parameters as you said. In other words, the problem is to put both the metadata and sink in the return type of the handler.

Ah, I see what you mean now. You want the return value to contain a source + something else.

Don't. It will break horribly when you will want to add one more member to that tuple.

Then I won't add more members. But I don't understand why it would break?

The assumption is that you will :) I probably misunderstood what you want to use rpc::tuple for. If you want to use it to return multiple values there is no problem doing so and this is what it is intended for. If you want to use it to make returning rpc::source optional then you have the problem of adding new members.

-- Gleb.

kbr- commented 3 years ago

Now I'm doing something like:

// server:
env.register_handler(1, [] (rpc::source<> source, client_metadata_t client_metadata) -> future<rpc::tuple<rpc::sink<large_struct_fragment_t>, bool>> {
    // use client metadata...
    auto sink = source.make_sink<...>(...);
    bool is_sink_valid = ...
    return make_ready_future<...>(rpc::tuple{std::move(sink), is_sink_valid});
});

// client:
auto handler = make_client(...)
auto [source, is_source_valid] = co_await handler(...)
if (is_source_valid) {
    // use source...
} else {
    // do something else...
}

so: because I can't return optional<sink>, I return both sink and bool, and the bool indicates if the sink is "really there". It seems like an ugly hack to me: because I can't express the optionality of the sink in the type system, I need to use a separate flag to denote whether the sink is valid or phony.

Also, the server must create the sink unconditionally, even if it sends is_sink_valid = false.

It would be nice not to have to do hacks like this.
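One way to contain the hack on the client side, shown here as a standalone sketch rather than Seastar code, is to fold the (source, flag) pair into an optional immediately after the call returns, so the rest of the client never sees a phony source. `fold_validity` and `toy_source` are hypothetical names. This does not avoid creating the stream on the server, and whether an unused real `rpc::source` can simply be dropped this way is an assumption that would need checking against the framework's stream-closing rules.

```cpp
#include <cassert>
#include <optional>
#include <tuple>
#include <utility>

// Folds a (source, is_valid) reply, as returned by the workaround, into an
// optional so that later code must check before using the source.
template <typename Source>
std::optional<Source> fold_validity(std::tuple<Source, bool> reply) {
    auto& [source, is_valid] = reply;
    if (!is_valid) {
        return std::nullopt;   // the phony source goes out of scope here
    }
    return std::move(source);
}

// Hypothetical stand-in for rpc::source<...>.
struct toy_source { int stream_id; };
```

A caller would wrap the awaited result once, for example `auto maybe_source = fold_validity(std::move(reply));`, and branch on `maybe_source` from then on.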

gleb-cloudius commented 3 years ago

On Mon, Mar 01, 2021 at 03:35:47AM -0800, Kamil Braun wrote:

Now I'm doing something like:

// server:
env.register_handler(1, [] (rpc::source<> source, client_metadata_t client_metadata) -> future<rpc::tuple<rpc::sink<large_struct_fragment_t>, bool>> {
    // use client metadata...
    auto sink = source.make_sink<...>(...);
    bool is_sink_valid = ...
    return make_ready_future<...>(rpc::tuple{std::move(sink), is_sink_valid});
});

// client:
auto handler = make_client(...)
auto [source, is_source_valid] = co_await handler(...)
if (is_source_valid) {
    // use source...
} else {
    // do something else...
}

so: because I can't return optional<sink>, I return both sink and bool, and the bool indicates if the sink is "really there". It seems like an ugly hack to me: because I can't express the optionality of the sink in the type system, I need to use a separate flag to denote whether the sink is valid or phony.

You do not have to do it like that. You can have an RPC that checks the validity and then another one that creates the stream. This also saves you the stream creation in the first place.

Also, the server must create the sink unconditionally, even if it sends is_sink_valid = false.

The stream is already created, creating a sink is just a formality. With support for one way stream it will not have to be.

The way you want to do things is to create the stream (additional tcp connection and all that) before you do the check and then try to save on insignificant operation that creates sink object in memory. If you had open coded it instead of using RPC you would never do it like that.

-- Gleb.

kbr- commented 3 years ago

You do not have to do it like that. You can have an RPC that checks the validity and then another one that creates the stream. This also saves you the stream creation in the first place.

I'd much prefer to have only 1 RPC verb instead of 2. No particular reason, just personal preference. I think it's more elegant to do it in one call. Well, it also saves a round-trip (but that's perhaps not so important here).

The stream is already created, creating a sink is just a formality. With support for one way stream it will not have to be.

The way you want to do things is to create the stream (additional tcp connection and all that) before you do the check and then try to save on insignificant operation that creates sink object in memory. If you had open coded it instead of using RPC you would never do it like that.

Yeah, it would be nice to somehow prevent the stream creation as well. But even if we already created the stream there is value in not sending the sink - type safety. If we don't send the sink, the other side won't use it by accident.

gleb-cloudius commented 3 years ago

On Mon, Mar 01, 2021 at 05:50:18AM -0800, Kamil Braun wrote:

Yeah, it would be nice to somehow prevent the stream creation as well.

It is very easy - do not create one until you know you need it.

-- Gleb.

kbr- commented 3 years ago

Could you provide a snippet of code that sketches the solution? In particular, I'm interested in the type(s) of the handler(s).

gleb-cloudius commented 3 years ago

On Mon, Mar 01, 2021 at 06:11:46AM -0800, Kamil Braun wrote:

Could you provide a snippet of code that sketches the solution? In particular, I'm interested in the type(s) of the handler(s).

Do not create an rpc stream until you verified that you will use it. What kind of sketch do you need for that?

if (need_stream) {
    sink = create one
    source = rpc_send(sink);
}

?

-- Gleb.

kbr- commented 3 years ago

How does the client (the one executing the snippet you provided) know the value of need_stream? In my previous snippet, it was the server who can calculate need_stream (it was called is_sink_valid back then).

Could you include a snippet with the full (high-level) algorithm of the client? Including how he obtains need_stream. Note: we can assume that the server knows how to calculate need_stream.

gleb-cloudius commented 3 years ago

On Mon, Mar 01, 2021 at 06:18:00AM -0800, Kamil Braun wrote:

How does the client (the one executing the snippet you provided) know the value of need_stream?

In whatever way you want it to. You know the logic, I have no idea. The point is it is done before you open a stream, not after.

-- Gleb.

kbr- commented 3 years ago

I already explained the logic: the client doesn't know need_stream. Only the server does. So the server knows whether or not to open a stream. So we have a contradiction:

  1. the client must decide whether to open a stream before contacting the server,
  2. but the client can know how to open a stream only after contacting the server.

The only way I see to resolve this is to have the client contact the server twice:

  1. once to obtain need_stream,
  2. the second time to obtain the source (if need_stream == true).

Unfortunately, there's a problem here: between 1 and 2 need_stream may change (on the server side).
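The race can be modelled with a small standalone sketch. Nothing here is Seastar code: `toy_server`, `toy_stream`, `two_verb_client` and the `between` callback (which stands for whatever happens on the server between the two calls) are hypothetical, and the sketch assumes the single-call variant evaluates the condition and opens the stream without the state changing in between.

```cpp
#include <cassert>
#include <optional>

struct toy_stream { int id; };   // hypothetical stand-in for rpc::source<...>

// Hypothetical server whose condition can change between calls.
struct toy_server {
    bool need_stream = true;

    // Two-verb protocol: a check, then an open that trusts the earlier check.
    bool check() const { return need_stream; }
    toy_stream open() const { return toy_stream{1}; }

    // One-verb protocol: the check and the open happen in one handler call.
    std::optional<toy_stream> check_and_open() const {
        if (!need_stream) {
            return std::nullopt;
        }
        return toy_stream{1};
    }
};

// Client using two verbs. `between` runs after the check and models a
// state change on the server before the second call arrives.
template <typename Between>
std::optional<toy_stream> two_verb_client(toy_server& s, Between between) {
    if (!s.check()) {
        return std::nullopt;
    }
    between(s);
    return s.open();   // may open a stream the server no longer wants
}
```

If `between` flips `need_stream` to false, `two_verb_client` still returns a stream, whereas `check_and_open` on the same server returns nullopt.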

gleb-cloudius commented 3 years ago

On Mon, Mar 01, 2021 at 07:37:50AM -0800, Kamil Braun wrote:

The only way I see to resolve this is to have the client contact the server twice:

  1. once to obtain need_stream,
  2. the second time to obtain the source (if need_stream == true).

Unfortunately, there's a problem here: between 1 and 2 need_stream may change.

Good point. You can make server create the stream instead (call back to the client after it connects), or make version check as part of data that goes through the stream. Anyway you cannot make returning sink optional anyway. It has to be closed explicitly by the stream initiator.

-- Gleb.

kbr- commented 3 years ago

(call back to the client after it connects)

That's interesting, but meh. IMO too complicated, I'd rather create the throw-away stream: the operation is not so common anyway, so the additional wasted connection doesn't make a difference; also the version mismatch is actually an exceptional situation.

or make version check as part of data that goes through the stream

This could be done by having something like

rpc::sink<std::variant<version, large_struct_fragment>>

Then sending first the version, then large struct fragments. But that's ugly :( We know during compile time that version comes only once, then everything else is fragments. But because of the APIs we need to dispatch on the variant on each received element.
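For illustration, the receiving side of such a variant-typed stream might look like the standalone sketch below. The stream is modelled as a `std::vector`, and `version`, `large_struct_fragment`, `element`, `received` and `consume` are hypothetical names. The runtime ordering checks are exactly the part the type system cannot express here.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <variant>
#include <vector>

struct version { int value; };                         // hypothetical
struct large_struct_fragment { std::string bytes; };   // hypothetical
using element = std::variant<version, large_struct_fragment>;

struct received {
    int version_value = -1;
    std::vector<std::string> fragments;
};

// Consumes the stream. Every element is dispatched at run time, even though
// the protocol says: exactly one version first, then only fragments.
inline received consume(const std::vector<element>& stream) {
    received r;
    bool seen_version = false;
    for (const element& e : stream) {
        std::visit([&](const auto& x) {
            using T = std::decay_t<decltype(x)>;
            if constexpr (std::is_same_v<T, version>) {
                if (seen_version) {
                    throw std::runtime_error("version sent twice");
                }
                seen_version = true;
                r.version_value = x.value;
            } else {
                if (!seen_version) {
                    throw std::runtime_error("fragment before version");
                }
                r.fragments.push_back(x.bytes);
            }
        }, e);
    }
    return r;
}
```

A stream that starts with a fragment makes `consume` throw, which shows that the "version comes first, once" rule has moved from compile time to run time.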

Anyway you cannot make returning sink optional anyway.

:(