databendlabs / openraft

rust raft with improvements
Apache License 2.0

Using a channel to asynchronously receive a reply from StateMachine::apply(). #208

Open drmingdrmer opened 2 years ago

drmingdrmer commented 2 years ago

The reply should be delivered asynchronously. Make this method synchronous and, instead of using its return value, pass in a channel on which to post the completion.

The Raft core can then collect completions from this channel and send the result to the client once all preceding operations have been applied to the state machine. This gives us operation pipelining without having to wait inline for each operation to complete.

https://github.com/datafuselabs/openraft/blob/46644c8409c7dff627c991cc65849507ad5265b3/openraft/src/storage.rs#L324-L329

schreter commented 2 years ago

Just yesterday, my colleagues and I discussed what this means in detail (the same applies to snapshots, which are similar).

In fact, we need a two-stage process. First, the state machine update or snapshot is triggered on the &mut self to prepare the operation. This may even complete the operation fully, but it may also (sometimes) need to synchronize, so it still needs to be async. As a result, we can return a 3-state value: an immediate result, an immediate error, or a boxed dyn Future + 'static (e.g., representing a task to be executed in the background). Then, for every such future, we can start a background task that reports back to the main loop of the Raft core. The started tasks (and also the immediately completed ones, if no future was returned) can be kept in a List, Deque or similar structure. Since they are produced in order, the structure grows to the right.

When a task completes, it sends the completion message, as mentioned. If the task is at the head of the queue, we can report its completion to the client and then repeatedly check the new head. For example, if we have tasks 1,2,3,4,5 that complete out of order as 2,3,5,1,4, then the completions of 2, 3 and 5 do nothing. The completion of 1 (the head of the outstanding requests) sends the completion of 1 to the client and then checks the remainder of the queue, so it also sends completions for the already-completed requests 2 and 3. Request 4 is incomplete, so processing stops there. Following the same logic, when 4 completes, completions for 4 and 5 are sent to the client.
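
The head-of-queue logic described above could be sketched roughly as follows. This is only an illustration, not openraft code: `CompletionQueue`, `push_pending`, `complete` and `replay_example` are hypothetical names, and the sketch assumes consecutive log indexes and that each index completes exactly once.

```rust
use std::collections::VecDeque;

/// Hypothetical helper: tracks apply requests in submission order and
/// releases results strictly in that order, even if they finish out of order.
struct CompletionQueue<R> {
    /// Index of the request stored at the head of `slots`.
    head_index: u64,
    /// One slot per outstanding request; `None` means "still running".
    slots: VecDeque<Option<R>>,
}

impl<R> CompletionQueue<R> {
    fn new(first_index: u64) -> Self {
        Self { head_index: first_index, slots: VecDeque::new() }
    }

    /// Registers a newly started request (the queue grows to the right)
    /// and returns the index assigned to it.
    fn push_pending(&mut self) -> u64 {
        self.slots.push_back(None);
        self.head_index + self.slots.len() as u64 - 1
    }

    /// Records the completion of `index` and returns every result that can
    /// now be reported to the client, in order. Assumes `index` is outstanding.
    fn complete(&mut self, index: u64, result: R) -> Vec<(u64, R)> {
        let offset = (index - self.head_index) as usize;
        self.slots[offset] = Some(result);

        let mut ready = Vec::new();
        // Drain the completed prefix; stop at the first unfinished request.
        while matches!(self.slots.front(), Some(Some(_))) {
            if let Some(r) = self.slots.pop_front().flatten() {
                ready.push((self.head_index, r));
                self.head_index += 1;
            }
        }
        ready
    }
}

/// Replays the example from the comment: tasks 1..=5 finish as 2,3,5,1,4.
/// Each inner vector lists the indexes reported to the client at that step.
fn replay_example() -> Vec<Vec<u64>> {
    let mut queue = CompletionQueue::new(1);
    for _ in 0..5 {
        queue.push_pending();
    }
    let mut reported = Vec::new();
    for index in [2u64, 3, 5, 1, 4] {
        let ready = queue.complete(index, ());
        reported.push(ready.into_iter().map(|(i, _)| i).collect());
    }
    reported
}
```

Storing an `Option` per slot keeps the bookkeeping to a single queue: a completion only writes into its slot, and only a completion at the head triggers any reporting.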

drmingdrmer commented 2 years ago

> As a result, we can return a 3-state value with immediate result, immediate error or a boxed dyn Future + 'static (e.g., representing a task to be executed in the background).

So this is what you expected the API to be, right?

// where R: AppResponse
type ApplyResult = Result<R, Error>;

enum Either<T, U>{
    Left(T),
    Right(U),
}

fn apply_to_state_machine() -> Either<ApplyResult, Box<dyn Future<Output = ApplyResult>>>;

If it is, what about making the return value just a Future?

With GAT, will it be as efficient as returning an immediate result?

schreter commented 2 years ago

Yes, something like that. A simple Future won't work, since a Future that is deferred to a background task must be 'static. Further, the implementation may need to await something. Concretely, we have exactly this situation in our code: preparing the modification needs write access (via &mut self), but it also needs to await the allocation of resources (which is typically immediate, but not always). Then, after the resources are acquired, and based on other factors, it can either execute the request immediately or build the 'static Future to be awaited in a task.
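
A rough sketch of this two-stage shape, under stated assumptions: `Prepared`, `StateMachine`, `prepare`, the `defer` flag and the `String` payloads are all hypothetical and only stand in for real types. The tiny `block_on` executor is there purely so the example runs without an async runtime; it is not how a Raft core would drive these futures.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type ApplyResult = Result<String, String>;

/// Outcome of the first (preparation) stage.
enum Prepared {
    /// The operation finished during preparation (result or error).
    Done(ApplyResult),
    /// Remaining work; `'static` so it can be moved into a background task.
    Deferred(Pin<Box<dyn Future<Output = ApplyResult> + Send + 'static>>),
}

struct StateMachine {
    applied: u64,
}

impl StateMachine {
    /// Stage one: needs `&mut self` and is async because it may have to wait
    /// (e.g. for resource allocation). `defer` stands in for "other factors".
    async fn prepare(&mut self, payload: String, defer: bool) -> Prepared {
        self.applied += 1;
        let index = self.applied;
        if defer {
            // The future owns all its data, so it does not borrow `self`.
            Prepared::Deferred(Box::pin(async move { Ok(format!("{index}:{payload}")) }))
        } else {
            Prepared::Done(Ok(format!("{index}:{payload}")))
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only here to keep the sketch dependency-free.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Drives both stages for a single request.
fn run(defer: bool) -> ApplyResult {
    let mut sm = StateMachine { applied: 0 };
    match block_on(sm.prepare("set x".to_string(), defer)) {
        Prepared::Done(result) => result,
        // A real runtime would spawn this on a task instead of blocking here.
        Prepared::Deferred(fut) => block_on(fut),
    }
}
```

The point of the split is that the borrow of `&mut self` ends when `prepare` returns, while the deferred future can outlive it because it captures only owned data.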

drmingdrmer commented 2 years ago

> Then, after the resources are acquired, and based on other factors, it can either execute the request immediately or build the 'static Future to be awaited in a task.

Is it all right to return a ready Future if it can execute the request immediately? This would make the API simpler, unless it hurts performance.

schreter commented 2 years ago

> Is it all right to return a ready Future if it can execute the request immediately? This would make the API simpler, unless it hurts performance.

That's true, but not sufficient.

The problem is as follows:

Instead of returning a double future, it's also possible to call the API with a result receiver (which needs to be 'static). Then, instead of delivering the result directly, the returned Future would only prepare the state machine (or it may already have computed the result), and the actual result would be sent later via the result receiver passed to the call.

This way, the API would be fairly simple. "Simple" implementations could just set the result on the receiver before returning, while more complex ones would do it in a background task.

The receiver would effectively schedule a callback to the main loop via the mpsc channel (or whatever other primitive is used, I'm not sure).

I.e., something like:

async fn apply_to_state_machine(&mut self, entries: &[&Entry<D>], result_receiver: Runtime::ResultReceiver<R>);

where ResultReceiver would have something like:

fn send_result(&self, index: u64, result: Result<R, StorageError>);

A simple implementation would then look like this:

async fn apply_to_state_machine(&mut self, entries: &[&Entry<D>], result_receiver: Runtime::ResultReceiver<R>)
{
    for entry in entries {
        // Apply inline and report each result before returning.
        result_receiver.send_result(entry.log_id.index, self.internal_state.apply(entry).await);
    }
}

I hope it is clearer now.
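
For illustration, the receiver idea might look roughly like this using only the standard library. Everything here is an assumption made for the sketch: `ResultReceiver` is modelled as a thin wrapper around a `std::sync::mpsc::Sender`, threads stand in for background tasks, the function is synchronous rather than `async`, and the even/odd split is just an arbitrary way to show both the inline and the deferred path.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

type ApplyResult = Result<String, String>;

/// Hypothetical stand-in for `Runtime::ResultReceiver<R>`: a cloneable,
/// `'static` handle that posts results back to the core's main loop.
#[derive(Clone)]
struct ResultReceiver {
    tx: Sender<(u64, ApplyResult)>,
}

impl ResultReceiver {
    fn send_result(&self, index: u64, result: ApplyResult) {
        // Ignore the send error if the main loop has already shut down.
        let _ = self.tx.send((index, result));
    }
}

/// Even indexes are answered inline before returning; odd indexes are
/// answered from a background thread (standing in for a spawned task).
fn apply_to_state_machine(entries: &[(u64, String)], result_receiver: &ResultReceiver) {
    for (index, payload) in entries {
        let (index, payload) = (*index, payload.clone());
        if index % 2 == 0 {
            result_receiver.send_result(index, Ok(format!("applied {payload}")));
        } else {
            let receiver = result_receiver.clone();
            thread::spawn(move || {
                receiver.send_result(index, Ok(format!("applied {payload}")));
            });
        }
    }
}

/// Plays the role of the main loop: collects every posted result.
fn collect_results() -> Vec<(u64, ApplyResult)> {
    let (tx, rx) = channel();
    let receiver = ResultReceiver { tx };
    let entries: Vec<(u64, String)> = (1..=4).map(|i| (i, format!("op{i}"))).collect();

    apply_to_state_machine(&entries, &receiver);
    // Drop our handle so the channel closes once all workers have finished.
    drop(receiver);

    // Arrival order is not deterministic, so sort by index for inspection.
    let mut results: Vec<(u64, ApplyResult)> = rx.iter().collect();
    results.sort_by_key(|(index, _)| *index);
    results
}
```

Because results can arrive in any order, a real main loop would still need something like the head-of-queue bookkeeping discussed earlier in the thread before replying to clients.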