scylladb / seastar

High performance server-side application framework
http://seastar.io
Apache License 2.0

In `seastar::http::reply::write_body`, using `output_stream::write(temporary_buffer)` causes crash #1701

Open meilofveeningen-rl opened 1 year ago

meilofveeningen-rl commented 1 year ago

When using seastar::httpd::http_server and replying to a request in streaming mode via reply::write_body, the following causes a crash:

void fn(seastar::http::reply& rep) {
    rep.write_body("bin", [](seastar::output_stream<char>&& stream_) -> seastar::future<> {
        auto stream = std::move(stream_);
        // Writing a temporary_buffer takes the net::packet path and crashes on flush.
        co_await stream.write(seastar::temporary_buffer<char>{3});
        co_await stream.close();
    });
}

The equivalent code using write(buf.begin(), buf.size()) works.

This seems to be because write(temporary_buffer) is implemented in terms of net::packet. In this example, close triggers a flush, which calls seastar::http::internal::http_chunked_data_sink_impl::put(seastar::net::packet), which is not implemented.
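The dispatch described above can be illustrated with a small standalone model. This is only a sketch: `buffer`, `packet`, `sink`, `buffer_only_sink` and `stream` are hypothetical stand-ins for `temporary_buffer<char>`, `net::packet`, `data_sink_impl`, the chunked sink and `output_stream<char>`. It is synchronous and throws where the real code crashes.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins, not Seastar types.
using buffer = std::string;               // plays the role of temporary_buffer<char>
using packet = std::vector<std::string>;  // plays the role of net::packet

struct sink {
    virtual ~sink() = default;
    virtual void put(packet p) = 0;
    virtual void put(buffer b) = 0;
};

// Models a sink that only handles the single-buffer overload.
struct buffer_only_sink : sink {
    std::string wire;
    void put(packet) override {
        // Stand-in for the unimplemented overload reported in this issue.
        throw std::logic_error("put(packet) not implemented");
    }
    void put(buffer b) override { wire += b; }
};

// Models an output stream with two write paths.
class stream {
    sink& _out;
    buffer _copied;     // filled by write(const char*, size_t)
    packet _zero_copy;  // filled by write(buffer)
public:
    explicit stream(sink& out) : _out(out) {}
    void write(const char* data, std::size_t n) { _copied.append(data, n); }
    void write(buffer b) { _zero_copy.push_back(std::move(b)); }
    void flush() {
        if (!_copied.empty()) {
            _out.put(std::exchange(_copied, buffer{}));      // buffer overload
        }
        if (!_zero_copy.empty()) {
            _out.put(std::exchange(_zero_copy, packet{}));   // packet overload
        }
    }
};
```

In this model, `write("abc", 3)` followed by `flush()` reaches `put(buffer)` and succeeds, while `write(buffer("abc"))` followed by `flush()` reaches `put(packet)` and fails. That matches the observed difference between the two calls.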

There may be a good reason for this, but at least it would be nice if this were documented.

meilofveeningen-rl commented 1 year ago

As also reported here: https://groups.google.com/g/seastar-dev/c/WHYB3oG6Hz0/m/9Gats5wvAQAJ

niekbouman commented 1 month ago

I can reproduce the problem (see the reproducer at the end of this comment), and it seems that I can fix it by giving put(net::packet) (in src/http/common.cc) an implementation that extracts the temporary_buffer from the packet again and then calls the other put overload. But doing it this way seems a bit silly...

@xemul @nyh Could you perhaps help with this? The http_chunked_data_sink_impl has been introduced in Pavel's PR 1360, which Nadav reviewed: https://github.com/scylladb/seastar/pull/1360#discussion_r1055201113 Although that PR focused on http-requests, it gives this unintended effect on the reply side.

class http_chunked_data_sink_impl : public data_sink_impl {
    output_stream<char>& _out;

   //...
public:
    http_chunked_data_sink_impl(output_stream<char>& out) : _out(out) {
    }

    virtual future<> put(net::packet data) override { 
        auto v = data.release();
        assert(ssize(v) == 1);
        return this->put(std::move(v[0])); // call the put function defined below
    }

    using data_sink_impl::put;
    virtual future<> put(temporary_buffer<char> buf) override {
        if (buf.size() == 0) {
            // size 0 buffer should be ignored, some server
            // may consider it an end of message
            return make_ready_future<>();
        }
        auto size = buf.size();
        return write_size(size).then([this, buf = std::move(buf)] () mutable {
            return _out.write(buf.get(), buf.size());
        }).then([this] () mutable {
            return _out.write("\r\n", 2);
        });
    }

   //...
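For context, the framing that put(temporary_buffer<char>) produces is the HTTP/1.1 chunked transfer coding: the chunk size in hexadecimal, CRLF, the data, CRLF. A zero-size chunk terminates the body, which is why empty buffers are skipped above. A minimal standalone sketch of that framing follows. `encode_chunk` and `last_chunk` are hypothetical helper names, not Seastar functions, and the terminator assumes no trailer fields.

```cpp
#include <cstddef>
#include <cstdio>
#include <string>
#include <string_view>

// Frame one payload as an HTTP/1.1 chunk: hex size, CRLF, data, CRLF.
// An empty payload yields an empty string, because a zero-size chunk
// marks the end of the body and must not appear mid-stream.
std::string encode_chunk(std::string_view payload) {
    if (payload.empty()) {
        return {};
    }
    char size_line[32];
    int n = std::snprintf(size_line, sizeof(size_line), "%zx\r\n", payload.size());
    std::string out(size_line, static_cast<std::size_t>(n));
    out.append(payload);
    out.append("\r\n");
    return out;
}

// Terminating chunk, sent once when the body is complete (no trailers).
std::string last_chunk() {
    return "0\r\n\r\n";
}
```

For example, `encode_chunk("abc")` yields `3\r\nabc\r\n`, and a 26-byte payload gets the size line `1a`.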

This is my reproducer:

#include "stop_signal.hh"
#include <cstdint>
#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/http/file_handler.hh>
#include <seastar/http/function_handlers.hh>
#include <seastar/http/handlers.hh>
#include <seastar/http/httpd.hh>
#include <seastar/http/reply.hh>
#include <seastar/json/json_elements.hh>
#include <seastar/net/socket_defs.hh>
#include <seastar/util/log.hh>

seastar::logger logger("main");

class handl : public seastar::httpd::handler_base {
  public:
    virtual seastar::future<std::unique_ptr<seastar::http::reply>> handle(const seastar::sstring& path,
                                                                          std::unique_ptr<seastar::http::request> req,
                                                                          std::unique_ptr<seastar::http::reply> rep)
    {
        rep->_content = "hello";
        rep->done("html");

        rep->write_body("bin", [](seastar::output_stream<char>&& stream_) -> seastar::future<> {
            auto stream = std::move(stream_);
            seastar::temporary_buffer<char> p{3};
            co_await stream.write(std::move(p));
            co_await stream.close();
        });
        return seastar::make_ready_future<std::unique_ptr<seastar::http::reply>>(std::move(rep));
    }
};
int main(int argc, char** argv)
{
    using namespace seastar;
    seastar::app_template app;

    app.run(argc, argv, [] {
        return seastar::async([] {
            seastar_apps_lib::stop_signal s;
            seastar::httpd::http_server server("test");
            server.set_content_streaming(true);

            server._routes.add(seastar::httpd::operation_type::GET, httpd::url("/test"), new handl{});

            server.listen(static_cast<uint16_t>(8000)).get();
            s.wait().get();
            server.stop().get();
        });
    });
}

xemul commented 1 month ago

The same is true for http_content_length_data_sink_impl

xemul commented 1 month ago

And http_content_length_data_sink_impl would just drop the packet :( However, I don't really know under what circumstances this one is used.

xemul commented 1 month ago

I think a very good example of how to handle a packet in a data sink sits in pipe_data_sink_impl:

    future<> put(net::packet data) override {
        return do_with(data.release(), [this] (std::vector<temporary_buffer<char>>& bufs) {
            return do_for_each(bufs, [this] (temporary_buffer<char>& buf) {
                return put(buf.share());
            });
        });
    }

and currently I think that the way to go is to provide a default implementation of data_sink_impl::put(net::packet) like the one above, so that inheriting classes can forget about net::packet.

On the other hand I remember some ancient discussion with @avikivity about net::packet vs temporary_buffer usage in iostreams but I forgot what the outcome was :(

niekbouman commented 1 month ago

ah I see, so that goes along the same lines as my temp fix, but then properly passing on all temp_bufs in the packet (instead of only the first)

niekbouman commented 1 month ago

> On the other hand I remember some ancient discussion with @avikivity about net::packet vs temporary_buffer usage in iostreams but I forgot what the outcome was :(

Do you still remember that, @avikivity, or shall I make a PR that applies Pavel's solution to data_sink_impl::put(net::packet)?

xemul commented 1 month ago

> On the other hand I remember some ancient discussion with @avikivity about net::packet vs temporary_buffer usage in iostreams but I forgot what the outcome was :(

@avikivity, can you please remind us what the idea/plan was?

avikivity commented 2 weeks ago

It was a mistake to let net::packet become part of iostream. It's a low-level concept of a scatter/gather packet with headspace for headers, which works well for tcp and network devices, but doesn't fit streams.

However, removing net::packet from iostream is a complicated exercise (involving a year of deprecation) and shouldn't stand in the way of a simple fix.

avikivity commented 2 weeks ago

>     future<> put(net::packet data) override {
>         return do_with(data.release(), [this] (std::vector<temporary_buffer<char>>& bufs) {
>             return do_for_each(bufs, [this] (temporary_buffer<char>& buf) {
>                 return put(buf.share());
>             });
>         });
>     }

Coroutines exist.

avikivity commented 2 weeks ago

(closed by mistake)

niekbouman commented 2 weeks ago

> I think that the way to go is to provide a default implementation of data_sink_impl::put(net::packet) like the one above, so that inheriting classes can forget about net::packet.

Hmm, I now see that data_sink_impl also implements the temporary_buffer overload of put using the packet-overload of put:

    virtual future<> put(temporary_buffer<char> buf) {
        return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
    }

so if we implement the put overload for packet as you suggest, then we will get infinite recursion between the packet and temporary_buffer overloads of put, no?

xemul commented 1 week ago

> However, removing net::packet from iostream is a complicated exercise (involving a year of deprecation)

Why not just bump the API level? It should make this exercise much shorter.

> and shouldn't stand in the way of a simple fix.

Absolutely yes

xemul commented 1 week ago

> so if we implement the put overload for packet as you suggest, then we will get infinite recursion between the packet and temporary_buffer overloads of put, no?

If the inheriting class doesn't implement either of those, yes :( Then maybe

class data_sink_impl {
    // ...
protected:
    // Splits the packet into its buffers and feeds each one to put(temporary_buffer<char>).
    future<> fallback_put(net::packet data) {
        return do_with(data.release(), [this] (std::vector<temporary_buffer<char>>& bufs) {
            return do_for_each(bufs, [this] (temporary_buffer<char>& buf) {
                return put(buf.share());
            });
        });
    }
};
// ...
class http_chunked_data_sink_impl : public data_sink_impl {
    // ...
public:
    virtual future<> put(net::packet data) override { return data_sink_impl::fallback_put(std::move(data)); }
};

?
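To show why an opt-in helper avoids the recursion, here is a synchronous standalone model of the proposal. It is a sketch under stated assumptions: `buffer`, `packet`, `sink_base` and `chunk_sink` are hypothetical stand-ins rather than Seastar types, futures are omitted, and the packet overload is assumed to be pure virtual in the base.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins, not Seastar types.
using buffer = std::string;          // plays the role of temporary_buffer<char>
using packet = std::vector<buffer>;  // plays the role of net::packet

class sink_base {
public:
    virtual ~sink_base() = default;
    virtual void put(packet p) = 0;
    // Default single-buffer overload wraps the buffer into a packet,
    // mirroring the data_sink_impl default quoted earlier in the thread.
    virtual void put(buffer b) {
        packet p;
        p.push_back(std::move(b));
        put(std::move(p));
    }
protected:
    // Opt-in helper: feed each fragment to put(buffer).
    // Only safe when the derived class overrides put(buffer);
    // with the default above it would bounce back into put(packet).
    void fallback_put(packet p) {
        for (auto& frag : p) {
            put(std::move(frag));
        }
    }
};

class chunk_sink : public sink_base {
public:
    std::vector<std::string> seen;  // records what reached put(buffer)
    void put(packet p) override { fallback_put(std::move(p)); }
    void put(buffer b) override {
        if (!b.empty()) {           // empty buffers are ignored, as in the chunked sink
            seen.push_back(std::move(b));
        }
    }
};
```

Because the helper is not the default for put(packet), a class that overrides neither overload still fails to compile in this model instead of recursing at run time. In Seastar that only holds if put(net::packet) stays pure virtual in data_sink_impl.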

niekbouman commented 3 days ago

>     future<> put(net::packet data) override {
>         return do_with(data.release(), [this] (std::vector<temporary_buffer<char>>& bufs) {
>             return do_for_each(bufs, [this] (temporary_buffer<char>& buf) {
>                 return put(buf.share());
>             });
>         });
>     }
>
> Coroutines exist.

See https://github.com/scylladb/seastar/pull/2436 (in coroutine style, as Avi Kivity seems to prefer that style)