Closed tgrabiec closed 1 year ago
Probably introduced in 1a6bf2e9ca0275eade59ed35b00778a05375207f
I don't see the evidence in this log.
The WARNs observed in this log only show that pings happen after the DIRECT_FD_VERB handler was unregistered, which is fine; I think they are a red herring. messaging_service is stopped later. fd.stop() is called before ms.stop() in cql_test_env.cc. fd.stop() destroys all pinging workers. So, looking at the code, pinging shouldn't happen by the time ms.stop() is called.
Maybe stopping the pinger doesn't wait for all verbs?
I added tracing to messaging_service::get_rpc_client(), and this verb is the only verb issued in the program.
Maybe @gleb-cloudius has some idea.
> I added tracing to messaging_service::get_rpc_client(), and this verb is the only verb issued in the program.
I think that's because the node is pinging itself - maybe there are no other RPCs that a node issues to itself, hence this is the only verb we see in a single-node cluster.
The question is whether this verb is issued after messaging service has been stopped. I don't think it is.
What about the backtrace? It shows some things that are completely unrelated to pinging.
column_mapping::~column_mapping() at schema.cc:?
db::cql_table_large_data_handler::internal_record_large_cells(sstables::sstable const&, sstables::key const&, clustering_key_prefix const*, column_definition const&, unsigned long, unsigned long) const at ./db/large_data_handler.cc:180
operator() at ./db/large_data_handler.cc:123
Usually an RPC is not sent to the node itself, at least in my code. IIRC truncate does it. Maybe there are others.
The problem is that messaging service is stopped while there is an active rpc::client. Stopping the service just removes the clients, which deallocates them. This causes a use-after-free on the rpc client.
The last valid frame is this:
operator() at ./build/release/seastar/./seastar/src/rpc/rpc.cc:788
Which comes from here:
}).then_wrapped([this] (future<> f) {
std::exception_ptr ep;
if (f.failed()) {
ep = f.get_exception();
if (_connected) {
if (is_stream()) {
788: log_exception(*this, log_level::error, "client stream connection dropped", ep);
Which invoked connection::_logger. This field no longer points at a logger but at something else (cql_table_large_data_handler), hence you see the CQL stuff later in the backtrace, which eventually trips over and sigsegvs.
> The problem is that messaging service is stopped while there is an active rpc::client. Stopping the service just removes the clients, which deallocates them. This causes a use-after-free on the rpc client.
So it's an in-flight RPC response arriving after messaging is stopped (and the client destroyed).
Note that the failure detector, when stopping and destroying workers, ensures that all ping calls are finished: it co_awaits the pinging fiber. To make sure that the calls finish promptly, it aborts them before waiting, which underneath uses the Seastar cancellable interface.
Hence I don't think the bug is on the failure detector side - it takes care of waiting for everything to finish. Maybe we don't handle responses to cancelled RPCs properly in the Seastar RPC implementation?
Looking at RPC impl, we see:
void client::wait_for_reply(id_type id, std::unique_ptr<reply_handler_base>&& h, std::optional<rpc_clock_type::time_point> timeout, cancellable* cancel) {
if (timeout) {
h->t.set_callback(std::bind(std::mem_fn(&client::wait_timed_out), this, id));
h->t.arm(timeout.value());
}
if (cancel) {
cancel->cancel_wait = [this, id] {
_outstanding[id]->cancel();
_outstanding.erase(id);
};
h->pcancel = cancel;
cancel->wait_back_pointer = &h->pcancel;
}
_outstanding.emplace(id, std::move(h));
}
So canceling an RPC call will resolve it immediately, by means of _outstanding[id]->cancel(), without waiting for the send loop.
That could explain the use-after-free, right @gleb-cloudius ?
> So canceling an RPC call will resolve it immediately, by means of _outstanding[id]->cancel(), without waiting for the send loop. That could explain the use-after-free, right @gleb-cloudius ?
There is no more send loop. And why should it wait for it anyway? It drops the message from the queue, so nothing will be sent. What happens on cancel is exactly the same as what happens on timeout:
void client::wait_timed_out(id_type id) {
_stats.timeout++;
_outstanding[id]->timeout();
_outstanding.erase(id);
}
By send loop, I mean this continuation:
client::client(const logger& l, void* s, client_options ops, socket socket, const socket_address& addr, const socket_address& local)
: rpc::connection(l, s), _socket(std::move(socket)), _server_addr(addr), _local_addr(local), _options(ops) {
_socket.set_reuseaddr(ops.reuseaddr);
// Run client in the background.
// Communicate result via _stopped.
// The caller has to call client::stop() to synchronize.
(void)_socket.connect(addr, local).then([this, ops = std::move(ops)] (connected_socket fd) {
fd.set_nodelay(ops.tcp_nodelay);
if (ops.keepalive) {
fd.set_keepalive(true);
fd.set_keepalive_parameters(ops.keepalive.value());
}
set_socket(std::move(fd));
feature_map features;
if (_options.compressor_factory) {
features[protocol_features::COMPRESS] = _options.compressor_factory->supported();
}
if (_options.send_timeout_data) {
features[protocol_features::TIMEOUT] = "";
}
if (_options.stream_parent) {
features[protocol_features::STREAM_PARENT] = serialize_connection_id(_options.stream_parent);
}
if (!_options.isolation_cookie.empty()) {
features[protocol_features::ISOLATION] = _options.isolation_cookie;
}
return send_negotiation_frame(std::move(features)).then([this] {
return negotiate_protocol(_read_buf);
}).then([this] () {
_propagate_timeout = !is_stream();
set_negotiated();
return do_until([this] { return _read_buf.eof() || _error; }, [this] () mutable {
^^^ SEND LOOP ^^^^
if (is_stream()) {
return handle_stream_frame();
}
return read_response_frame_compressed(_read_buf).then([this] (std::tuple<int64_t, std::optional<rcv_buf>> msg_id_and_data) {
auto& msg_id = std::get<0>(msg_id_and_data);
auto& data = std::get<1>(msg_id_and_data);
auto it = _outstanding.find(std::abs(msg_id));
if (!data) {
_error = true;
} else if (it != _outstanding.end()) {
auto handler = std::move(it->second);
_outstanding.erase(it);
(*handler)(*this, msg_id, std::move(data.value()));
} else if (msg_id < 0) {
try {
std::rethrow_exception(unmarshal_exception(data.value()));
} catch(const unknown_verb_error& ex) {
// if this is unknown verb exception with unknown id ignore it
// can happen if unknown verb was used by no_wait client
get_logger()(peer_address(), format("unknown verb exception {:d} ignored", ex.type));
} catch(...) {
// We've got error response but handler is no longer waiting, could be timed out.
log_exception(*this, log_level::info, "ignoring error response", std::current_exception());
}
} else {
// we get a reply for a message id not in _outstanding
// this can happened if the message id is timed out already
get_logger()(peer_address(), log_level::debug, "got a reply for an expired message id");
}
});
});
});
}).then_wrapped([this] (future<> f) {
783:
std::exception_ptr ep;
if (f.failed()) {
ep = f.get_exception();
if (_connected) {
if (is_stream()) {
log_exception(*this, log_level::error, "client stream connection dropped", ep);
} else {
log_exception(*this, log_level::error, "client connection dropped", ep);
}
} else {
if (is_stream()) {
log_exception(*this, log_level::debug, "stream fail to connect", ep);
} else {
log_exception(*this, log_level::debug, "fail to connect", ep);
}
}
}
_error = true;
_stream_queue.abort(std::make_exception_ptr(stream_closed()));
return stop_send_loop(ep).then_wrapped([this] (future<> f) {
f.ignore_ready_future();
_outstanding.clear();
if (is_stream()) {
deregister_this_stream();
} else {
abort_all_streams();
}
}).finally([this]{
_stopped.set_value();
});
});
}
What ensures that line 783 does not run after the verb is cancelled?
Oh, it's probably a "receive loop"?
So is this a bug in the messaging_service, which doesn't call client::stop() on messaging_service::stop() ?
> Oh, it's probably a "receive loop"?
Yes, this is the receive loop, and I do not see the problem, though I have no time to look deeper now. If a response is received after cancellation, no entry will be found in _outstanding; and if it is received before, the receive loop removes the entry there itself. Probably at this point we need to disable cancellation of the element.
I can see that this changed in 1c8ea817cd5c76cf6e8efffdad203c411f5a4bca, after which messaging_service::stop() no longer calls rpc::client::stop(). But we must stop the clients before destruction.
@xemul What's your take on this?
> I can see that this changed in 1c8ea81, after which messaging_service::stop() no longer calls rpc::client::stop(). But we must stop the clients before destruction.
> @xemul What's your take on this?
But m.s. still calls rpc::client::stop() on messaging_service::shutdown(), which happens on storage_service::drain/drain_on_shutdown. It's cql_test_env that doesn't shut down m.s. and doesn't drain storage_service.
There are three options:
... a _shutting_down bool there (I remember that some other services behave the same, but cannot remember which ones exactly).
No vulnerable branches, not backporting.
Happens sporadically in release mode in schema_change_test.
Log evidence: