scylladb / scylladb

NoSQL data store using the seastar framework, compatible with Apache Cassandra
http://scylladb.com
GNU Affero General Public License v3.0

Make observable::operator() return future<> #9672

Open xemul opened 2 years ago

xemul commented 2 years ago

The observable class keeps a list of std::function<void(Args...)> subscribers onboard. When the value is set (i.e., the observable is called), it notifies all observers about the change. Because the callbacks return void, a callback that needs to start an asynchronous operation can do nothing but send the work into the background by discarding the top-most future. It would be nice if observable::operator() returned a future<>. This would require patching the current observables, but all of them already execute in an awaitable context.
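
For context, a minimal sketch of the two shapes under discussion. This is simplified illustration code, not the actual utils::observable implementation; the futurized variant is an assumption about what the change could look like:

#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <functional>
#include <vector>

// Today (simplified): subscribers return void, so a callback that
// starts asynchronous work can only discard the resulting future.
template <typename... Args>
class observable_sketch {
    std::vector<std::function<void(Args...)>> _subscribers;
public:
    void operator()(Args... args) {
        for (auto& f : _subscribers) {
            f(args...);
        }
    }
};

// Proposed (simplified): subscribers return future<>, and operator()
// resolves only once every observer has finished. do_for_each awaits
// each subscriber in turn.
template <typename... Args>
class futurized_observable_sketch {
    std::vector<std::function<seastar::future<>(Args...)>> _subscribers;
public:
    seastar::future<> operator()(Args... args) {
        return seastar::do_for_each(_subscribers, [args...] (auto& f) {
            return f(args...);
        });
    }
};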

denesb commented 2 years ago

observable kicks off notifications when the value of some variable changes. In that context there is no way to wait for the notifications to finish, so all this change would do is move all the background-future waiting into a single place.

xemul commented 2 years ago

"No way" to wait or "no sense"? The database::update_version caller can wait on its future by just returning. The sstable::_on_closed is also assigned in a context where it can wait for the returned future. I didn't check others though.

denesb commented 2 years ago

I mean there is no way while keeping the current convenient operator= semantics. We could switch to a future<> set_value(), but that destroys the seamless drop-in nature of observable<>.
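
To illustrate the drop-in concern, a hypothetical value-holding observable with both interfaces (illustration only, not actual Scylla code):

#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>

// Hypothetical interface showing both update styles.
struct observable_value_sketch {
    // today: drop-in assignment; observers are notified synchronously
    observable_value_sketch& operator=(int v);
    // proposed: awaitable update; observers are notified asynchronously
    seastar::future<> set_value(int v);
};

void update_today(observable_value_sketch& version) {
    version = 42;                   // reads like a plain variable write
}

seastar::future<> update_futurized(observable_value_sketch& version) {
    co_await version.set_value(42); // every call site changes shape
}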

avikivity commented 2 years ago

observable was non-future by design to avoid making it even more complicated. If there's a good reason we can consider it, but it has to be really good.

bhalevy commented 2 years ago

@avikivity currently there are numerous places where we drop the observer future and let the continuation run unchecked in the background. These continuations must be waited on for proper shutdown.

xemul commented 2 years ago

@bhalevy , the migration manager's push (which observes db schema_version changes) is a serialized action that's joined on storage service stop.

avikivity commented 2 years ago

Can you give some examples? If the callers are left to do it themselves, I agree it's better done here.

bhalevy commented 2 years ago

@avikivity here's an example:

void storage_service::install_schema_version_change_listener() {
    _listeners.emplace_back(make_lw_shared(_db.local().observable_schema_version().observe([this] (utils::UUID schema_version) {
        (void)_schema_version_publisher.trigger();
    })));
}

trigger calls:

future<> storage_service::publish_schema_version() {
    return _migration_manager.local().passive_announce(_db.local().get_version());
}

which calls

future<> migration_manager::passive_announce(utils::UUID version) {
    return _gossiper.container().invoke_on(0, [version] (auto&& gossiper) {
        mlogger.debug("Gossiping my schema version {}", version);
        return gossiper.add_local_application_state(gms::application_state::SCHEMA, gms::versioned_value::schema(version));
    });
}

Similarly, gossiper notifications use:

class i_endpoint_state_change_subscriber {
public:
    virtual ~i_endpoint_state_change_subscriber() {}
    /**
     * Use to inform interested parties about the change in the state
     * for specified endpoint
     *
     * @param endpoint endpoint for which the state change occurred.
     * @param epState  state that actually changed for the above endpoint.
     */
    virtual void on_join(inet_address endpoint, endpoint_state ep_state) = 0;

    virtual void before_change(inet_address endpoint, endpoint_state current_state, application_state new_statekey, const versioned_value& newvalue) = 0;

    virtual void on_change(inet_address endpoint, application_state state, const versioned_value& value) = 0;

    virtual void on_alive(inet_address endpoint, endpoint_state state) = 0;

    virtual void on_dead(inet_address endpoint, endpoint_state state) = 0;

    virtual void on_remove(inet_address endpoint) = 0;

    /**
     * Called whenever a node is restarted.
     * Note that there is no guarantee when that happens that the node was
     * previously marked down. It will have been marked down only if
     * {@code state.isAlive() == false}, as {@code state} is from before the
     * restarted node is marked up.
     */
    virtual void on_restart(inet_address endpoint, endpoint_state state) = 0;
};

And even a simple subscriber like the stream manager needs to do the (void)continuation song and dance in order to return void.

void stream_manager::on_remove(inet_address endpoint) {
    if (has_peer(endpoint)) {
        sslog.info("stream_manager: Close all stream_session with peer = {} in on_remove", endpoint);
        //FIXME: discarded future.
        (void)container().invoke_on_all([endpoint] (auto& sm) {
            sm.fail_sessions(endpoint);
        }).handle_exception([endpoint] (auto ep) {
            sslog.warn("stream_manager: Fail to close sessions peer = {} in on_remove", endpoint);
        });
    }
}

void stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state) {
    if (has_peer(endpoint)) {
        sslog.info("stream_manager: Close all stream_session with peer = {} in on_restart", endpoint);
        //FIXME: discarded future.
        (void)container().invoke_on_all([endpoint] (auto& sm) {
            sm.fail_sessions(endpoint);
        }).handle_exception([endpoint] (auto ep) {
            sslog.warn("stream_manager: Fail to close sessions peer = {} in on_restart", endpoint);
        });
    }
}

void stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) {
    if (has_peer(endpoint)) {
        sslog.info("stream_manager: Close all stream_session with peer = {} in on_dead", endpoint);
        //FIXME: discarded future.
        (void)container().invoke_on_all([endpoint] (auto& sm) {
            sm.fail_sessions(endpoint);
        }).handle_exception([endpoint] (auto ep) {
            sslog.warn("stream_manager: Fail to close sessions peer = {} in on_dead", endpoint);
        });
    }
}

bhalevy commented 2 years ago

@xemul as I mentioned above, and similar to the subject of this issue, the i_endpoint_state_change_subscriber methods should be futurized too; otherwise each subscriber needs to track the background tasks it might leave behind.
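
For illustration, here is what one of the handlers above could look like if the interface were futurized. This is a sketch of the direction, not a committed change; it reuses the names from the stream_manager code quoted earlier:

// If on_remove() returned future<>, the handler could simply return
// the invoke_on_all() future instead of discarding it, and the caller
// could wait for it during shutdown.
future<> stream_manager::on_remove(inet_address endpoint) {
    if (!has_peer(endpoint)) {
        return make_ready_future<>();
    }
    sslog.info("stream_manager: Close all stream_session with peer = {} in on_remove", endpoint);
    return container().invoke_on_all([endpoint] (auto& sm) {
        sm.fail_sessions(endpoint);
    });
}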

Instead, the gossiper (or, generally speaking, the notifier) could keep tabs on them using a gate, and if a background task needs the subscriber, or any other resource, to remain valid while it runs, it should hold a shared pointer to it.
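
A minimal sketch of that alternative, assuming Seastar's gate and with_gate(); the notifier and subscriber names are hypothetical:

#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shared_ptr.hh>

// Hypothetical subscriber whose handler starts asynchronous work.
struct subscriber_sketch {
    seastar::future<> on_event() {
        return seastar::make_ready_future<>();
    }
};

// Hypothetical notifier: the gate keeps tabs on every background
// notification, and holding the lw_shared_ptr keeps the subscriber
// valid while its handler runs.
class notifier_sketch {
    seastar::gate _gate;
public:
    void notify(seastar::lw_shared_ptr<subscriber_sketch> sub) {
        (void)seastar::with_gate(_gate, [sub] {
            return sub->on_event();
        });
    }

    // stop() resolves only after all tracked notifications finish.
    seastar::future<> stop() {
        return _gate.close();
    }
};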

avikivity commented 2 years ago

You make a strong case. We can argue that all the clients should clean up after themselves, but that's just making them do more work.

Perhaps we should have two versions of observable; named_value, for example, wouldn't want to be involved with futures.

The future version should also serialize updates to make sure they are applied in order.
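
A sketch of how serialized, in-order updates could work, chaining each notification behind the previous one via a shared_future. This illustrates the ordering requirement only; it is not Scylla's utils::serialized_action nor a proposed implementation:

#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
#include <functional>

// Each trigger runs only after the previously scheduled one has
// completed, so updates are applied strictly in order, and join()
// lets a stop path wait for whatever is still in flight.
class serialized_updates_sketch {
    std::function<seastar::future<>()> _action;
    seastar::shared_future<> _tail = seastar::shared_future<>(seastar::make_ready_future<>());
public:
    explicit serialized_updates_sketch(std::function<seastar::future<>()> a)
        : _action(std::move(a)) {}

    seastar::future<> trigger() {
        _tail = seastar::shared_future<>(_tail.get_future().then([this] {
            return _action();
        }));
        return _tail.get_future();
    }

    seastar::future<> join() {
        return _tail.get_future();
    }
};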

vladzcloudius commented 2 years ago

@avikivity an example where a callback needs to return a future is:

In those cases we need to be able to:

1. Serialize changes.
2. Wait for ongoing updates to finish during shutdown.
3. Report a failed update.

The last point is probably a topic for a separate discussion - it's not clear what a callback should do if an invalid parameter update is requested: log it and ignore it, or make Scylla commit suicide?

The problem with the log-and-ignore option is that the user may miss the message in the log and hit the issue during the next restart. Committing suicide is not great either - what if the user simply made a mistake? Bringing a node with 6TB of data back up can take hours.