nevalang / neva

🌊 Dataflow programming language with static types and implicit parallelism. Compiles to native code and Go
https://nevalang.org

Runtime Algorithm #677

Closed emil14 closed 2 months ago

emil14 commented 3 months ago

TLDR: we need an algorithm that is race-free and deadlock-free. The idea is to create N queues, one per fan-in occurrence. Each sender that is involved in one or more fan-ins must send its data to those queues instead of sending to receivers directly.

Below I describe the implementation.

Compiler Part (Irgen)

At this point we know which senders must not send to receivers directly (to avoid a race condition) and must instead send to the shared queue.
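A minimal sketch of what this analysis might look like, with hypothetical IR types invented for illustration (the real irgen works on Neva's IR, which is not shown here):

```go
// Hypothetical IR types, for illustration only.
type Sender string
type Connection struct {
	Senders  []Sender
	Receiver string
}

// markFanInSenders returns every sender that participates in at least
// one fan-in (a receiver with two or more senders) and therefore must
// send to the shared queue instead of directly to the receiver.
func markFanInSenders(conns []Connection) map[Sender]bool {
	fanIn := make(map[Sender]bool)
	for _, c := range conns {
		if len(c.Senders) >= 2 { // fan-in occurrence
			for _, s := range c.Senders {
				fanIn[s] = true
			}
		}
	}
	return fanIn
}
```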

Runtime Part

With this algorithm we have a guarantee that if two senders send messages to the same receiver, the receiver will receive them in the exact order they were sent. This is thanks to the shared queue, which works as a serializing broker.
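A minimal sketch of the runtime side, assuming the queue is a Go channel drained by a single goroutine that broadcasts each message to its receivers in order (names are illustrative, not the actual runtime API):

```go
// QueueItem pairs a message with the receiver channels it must reach.
type QueueItem struct {
	Msg       any
	Receivers []chan any
}

// serve drains the shared queue in a single goroutine. Because only
// this goroutine delivers to receivers, two messages to the same
// receiver are always delivered in the order they entered the queue.
func serve(queue <-chan QueueItem) {
	for item := range queue {
		for _, r := range item.Receivers { // sequential broadcast
			r <- item.Msg // may block on a slow receiver
		}
	}
}
```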

Buffer size of the queue

The shared queue could have a buffer size that depends on both the sender count and the receiver count.

The sender count matters because the more senders we have, the higher the chance of blocking: with more senders the queue might fill up faster.

The receiver count matters because messages are processed sequentially, which means the second message waits for the first to be received by all receivers. This is a requirement because we must preserve ordering. It also means that the latency of a broadcast equals the combined latency of all its receivers.

Buffer size of a receiver channel (the "slow receiver" problem)

The latency of a receiver affects not only senders (a fast sender must wait for slow receivers) but also the receivers themselves: a fast receiver must wait for its slow "neighbors" before it can receive the next message. This is a consequence of the broadcasting algorithm (queue-based round-robin). The problem must be solved without spawning more broadcast goroutines, because that could reintroduce a race condition.

A possible solution, though not a perfect one (blocking is still possible), is to add buffers to the receiver port channels. The buffer size can depend on the number of senders, because there is nothing else to depend on. In real life the buffer size should depend on the throughput ratio of sender and receiver, but we don't know that (at least not without making the language more complex). This might be enough, especially in combination with #671.
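As an illustration of this heuristic (the exact sizing is an assumption; nothing in the design fixes it):

```go
// Illustrative only: size each receiver's port channel by the number
// of senders that can write to it, since sender/receiver throughput
// ratios are unknown at compile time.
func newReceiverChan(numSenders int) chan any {
	return make(chan any, numSenders)
}
```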

emil14 commented 3 months ago

The Need For This Design (the problem with the simple single-queue variant)

UPDATE: it turns out the design in #670 is incorrect and we have to move to something more complex (perhaps to something like this).

Because all senders share the same queue, a deadlock might appear: sender S1 sends to receiver R1, but R1 is busy trying to send its own output data to the queue, which is impossible because the queue is busy trying to serve S1.
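A minimal Go repro of this cycle, with illustrative names (the Go runtime detects the wedge and panics with "all goroutines are asleep - deadlock!"):

```go
package main

func main() {
	queue := make(chan int) // single shared queue
	toR1 := make(chan int)  // queue -> R1 delivery channel

	// Queue goroutine: serializes all sends and delivers to R1.
	go func() {
		for msg := range queue {
			toR1 <- msg // blocks once R1 is stuck sending below
		}
	}()

	// R1: receives a message, then sends its own output to the queue.
	go func() {
		for msg := range toR1 {
			queue <- msg + 1 // blocks: queue goroutine is stuck on toR1
		}
	}()

	queue <- 1 // S1's first send goes through
	queue <- 2 // S1's second send wedges the cycle
	select {}  // all goroutines blocked: runtime reports a deadlock
}
```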

I was thinking about simpler approaches, like this: let each sender have its own channel, and let there be one single goroutine with a giant select over all senders (via code generation or reflection). It turns out this design is not race-free: imagine S1 and S2 both blocked because the select goroutine is busy (processing a previous request, buffer full). When the select runs again, it picks between the pending senders at random, so the original order is lost.

I would like to note that increasing a buffer might defer the problem and reduce its probability, but no finite buffer will truly solve it (only an infinite one would).

emil14 commented 3 months ago

Problem with this design

If we have these two conditions at the same time:

  1. Fan-in
  2. Loop

Then we might have a deadlock again

(Screenshot, 2024-06-20: diagram of the fan-in + loop case.)

N2 blocks while sending because nobody can receive: N2 itself is the one that should receive, but it's busy trying to send.


A possible solution could be to have a "proxy" channel to the outport in this case, which, however, means that we might have a "race" (out-of-order delivery) again. A sketch of that idea follows.
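A minimal sketch of the proxy idea, assuming the queue and outport are Go channels with illustrative names:

```go
// proxyOutport decouples N2's outport from the shared queue: N2's send
// completes as soon as the proxy accepts the message, so N2 can go
// back to receiving and the fan-in/loop cycle can't wedge.
func proxyOutport(outport <-chan any, queue chan<- any) {
	go func() {
		for msg := range outport {
			queue <- msg // only the proxy goroutine blocks here
		}
	}()
}
```

The cost is exactly the out-of-order risk mentioned above: the proxy goroutine races against senders that still write to the queue directly.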

emil14 commented 3 months ago

Very Raw Idea

Let's assume that output ports are Go channels (as in the original design), but we don't have separate goroutines for fan-out connections. This means there is no situation where multiple goroutines write to the same receiver and compete.

Instead, for each receiver, there is a separate goroutine with an infinite loop and a select statement.

In the select statement, we read from all sender-output ports, and whoever writes first gets processed first.

A race condition can occur, however, if two or more senders manage to write before the receiver can read.
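A sketch of that per-receiver goroutine (two senders for brevity; all names are illustrative):

```go
// fanIn runs one goroutine per receiver: that goroutine alone writes
// to r1, so writers never compete, but when several senders are ready
// at once, select picks between them at random.
func fanIn(s1Out, s2Out <-chan any, r1 chan<- any) {
	go func() {
		for {
			select {
			case msg := <-s1Out:
				r1 <- msg // blocks while r1 is busy
			case msg := <-s2Out:
				r1 <- msg
			}
		}
	}()
}
```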


Example

  1. s1 writes, we read it and attempt to write to r1, but get blocked because r1 is busy.
  2. We don't proceed to the next loop iteration because we are waiting for r1 to become available.
  3. In the meantime, two events occur: first s2 attempts to write (and gets blocked), then s3 faces the same fate.
  4. Meanwhile, r1 becomes available and receives the message, allowing us to proceed to the next iteration.

This is where the race condition occurs - in the next iteration, the select statement randomly chooses between s2 and s3.


Solution

To solve this, we need to defeat randomness by somehow understanding the order in which the senders became ready. We need a way to prioritize.

Since the problem arises when there are N >= 2 messages, we can read these N messages, sort them, and send them in the correct order.

Of course, we need the queueItem to have some sort of sequence number, which should be incremented by the sender when it sends a message.


Roughly, in Go (a sketch: the nil-channel trick enables the delivery case only when the queue is non-empty, since sending on a nil channel blocks forever):

```go
for {
	var out chan<- Msg
	var next Msg
	if m, ok := q.Peek(); ok {
		out, next = receiver, m // enable delivery of the smallest Seq
	}
	select {
	case msg := <-s1:
		q.Push(msg)
	case msg := <-s2:
		q.Push(msg)
	case out <- next:
		q.Pop()
	}
}

// Msg carries a sequence number, incremented by the sender per send.
type Msg struct {
	Seq  uint64
	Data any
}

// SortableQueue buffers messages sorted by Seq (e.g. a min-heap).
type SortableQueue struct{ items []Msg }

func (q *SortableQueue) Push(m Msg) { /* insert keeping Seq order */ }
func (q *SortableQueue) Peek() (Msg, bool) {
	if len(q.items) == 0 {
		return Msg{}, false
	}
	return q.items[0], true
}
func (q *SortableQueue) Pop() { q.items = q.items[1:] }
```

emil14 commented 2 months ago

After #670 is merged, we are going to close this. We still have problems with out-of-order delivery, but it's better to describe them separately.