Closed: tillrohrmann closed this issue 1 year ago
It's still unclear to me who should own the segmented queue: the invoker, or should it be `Arc`'d and shared between the invoker and the partition processors? The former would allow the segmented queue to remain single-threaded (as it's implemented today), but would be practically useless for its goal, as the invoker could lag behind and get its input channels filled up before moving the data from the input channel to the queue. The latter solution probably makes more sense, but it gets as complex as building an mpsc channel like the ones in tokio. A third solution entails having a dedicated event loop that takes care of the segmented queue, although this probably opens up further considerations, such as the additional overhead of another event loop.
WDYT? @tillrohrmann @igalshilman
I think that it would be enough for now if the invoker indeed owns this queue and moves any excess work from its input there. While it is true that with this configuration it will not help with an ingest/invoker rate imbalance, it will help with an invoker/concurrency-limit imbalance.
So far I also thought that the invoker would own the segmented queue. Having a spillable unbounded queue (the segmented queue) + making the input channel of the invoker bounded should prevent the partition processor from pushing so many requests into the channel that we would OOM (maybe I missed the point of why you think this model would be practically useless; if so, could you elaborate @slinkydeveloper?).
> maybe I missed the point why you think that this model would make it practically useless, if so could you elaborate @slinkydeveloper?
In the sense that we'll still have an unbounded in memory buffering before going to the segmented queue. But maybe what you said actually solves it: making that channel bounded.
> In the sense that we'll still have an unbounded in memory buffering before going to the segmented queue. But maybe what you said actually solves it: making that channel bounded.
Where would this come from (excluding the in memory buffering of the consensus module for a second)?
> Where would this come from (excluding the in memory buffering of the consensus module for a second)?
If we let the invoker own the segmented queue, we still need an in memory channel from the partition processor(s) to the invoker. If this channel is unbounded and we're overloaded, we might end up in a situation where we OOM before the invoker gets scheduled and spills this queue to disk. Does that make sense?
Ah ok. I think the queue between the partition processor and the invoker should be bounded, then.
@igalshilman I've stumbled on an issue which I believe is related to the queue state machine. You can easily reproduce it by cloning this branch https://github.com/slinkydeveloper/restate/tree/issues/285 and running the verification tests. You can see in the first log lines which tmp_dir is used for the segmented queue.
@slinkydeveloper, indeed I'm able to reproduce it via the verification test, but I can't reproduce it in isolation, like in a unit test. I wonder if we are indeed using the queue in a single-threaded fashion.
Looking at the select loop of the invoker, it seems like we might try to `enqueue` while the future for `dequeue` is not yet fulfilled, which unblocks the loop when it shouldn't.
Indeed, moving the dequeue outside of the `select` solved this issue:
```rust
loop {
    if let Some(invoke_input_command) = segmented_input_queue.dequeue().await {
        state_machine_coordinator
            .must_resolve_partition(invoke_input_command.partition)
            .handle_invoke(
                invoke_input_command.inner,
                StartInvocationTaskArguments::new(
                    &client,
                    &journal_reader,
                    &service_endpoint_registry,
                    &retry_policy,
                    suspension_timeout,
                    response_abort_timeout,
                    message_size_warning,
                    message_size_limit,
                    &mut invocation_tasks,
                    &invocation_tasks_tx,
                    &mut retry_timers,
                ),
            )
            .await;
    }

    tokio::select! {
        // --- Spillable queue loading/offloading
        Some(invoke_input_command) = invoke_stream.next() => {
            segmented_input_queue.enqueue(invoke_input_command).await
        },
        // --- Other commands (they don't go through the spillable queue)
        Some(input_message) = other_input_rx.recv() => {
            ...
```
Don't know if this approach makes sense to you or not, but at least we know what's up.
Slightly off topic, but I saw that the default value for flushing is 1028, which is really, really low; let's have that at 4 million at least.
> Don't know if this approach makes sense to you or not, but at least we know what's up.
Not sure this is logically correct though; in my mind the invoker loop should optimistically try to do one of the following. All these `if`s should be awaited on equally, at the same time, hence the big select statement.
> I wonder if we are using the queue indeed in a single threaded fashion.
It should be, because we invoke its methods within the same event loop, so there definitely shouldn't be any multi-threading involved (it wouldn't compile at all if that were the case). We do concurrently wait on it in the select statement though, meaning your future gets polled many times.
From the way you describe the bug, it seems to me that there might be some hidden concurrency within the queue that breaks linearizability?
Why do you return `None` rather than a pending future when there are no new elements?
> From the way you describe the bug, it seems to me that there might be some hidden concurrency within the queue that breaks linearizability?
The queue is not a concurrent data structure; it is meant to be used sequentially. The methods are `async` for the non-blocking file I/O (reading and writing the segments), but it is not designed to be used concurrently. This means, for example, that it is not possible to issue two concurrent interleaving dequeues. Perhaps there is a simple modification to make it concurrent, but I didn't think about it.
> Not sure this is logically correct though, in my mind the invoker loop should optimistically try to do one of the following:
I think that this is logically correct. Also keep in mind that enqueue and dequeue should rarely actually block the invoker, as most of the time they complete immediately.
> Why do you return `None` rather than a pending future when there are no new elements?
Because it is not a concurrent data structure so far; if you'd like to restructure it in a different way, we can try and plan it.
I think that I was able to make it concurrent with a few small modifications, so that it fits exactly the needs of the invoker. Let's connect offline and I will show you, and we can discuss how to go about this. If this works out, I will reflect the outcome of the discussion here.
We should investigate whether it makes sense to just use a temp dir for this problem.