nevalang / neva

🌊 Flow-based programming language with static types and implicit parallelism. Compiles to native code and Go
https://nevalang.org
MIT License

Race-condition with fan-in pattern #644

Open emil14 opened 1 month ago

emil14 commented 1 month ago

Every time you have N>1 senders for one receiver, you can get a race condition.

The reason is the way the runtime interprets the program.

For each connection there is a separate goroutine. Goroutines are concurrent, which means that if we have 2 goroutines, they compete for resources.

So, 2 goroutines each wait for a signal from their own sender (the senders are different) to deliver a message to the receiver (which is the same). Even though the first sender sent first, there's no guarantee that the Go scheduler will activate the corresponding connection goroutine first. Sometimes it does, sometimes it doesn't.
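To make the scheduling issue concrete, here is a minimal Go sketch of the goroutine-per-connection setup. It is not the actual Neva runtime; `fanIn` and `run` are hypothetical names, and each connection forwards exactly one message for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn starts one goroutine per connection. Each goroutine forwards a single
// message from its own sender channel to the shared receiver channel.
func fanIn(senders []chan string, receiver chan<- string) *sync.WaitGroup {
	var wg sync.WaitGroup
	for _, s := range senders {
		wg.Add(1)
		go func(s <-chan string) {
			defer wg.Done()
			receiver <- <-s // forward exactly one message
		}(s)
	}
	return &wg
}

// run sends "true" and then "false" on two separate connections and returns
// the order in which the shared receiver observed them.
func run() []string {
	s1, s2 := make(chan string, 1), make(chan string, 1)
	receiver := make(chan string, 2)
	wg := fanIn([]chan string{s1, s2}, receiver)
	s1 <- "true"  // sent first
	s2 <- "false" // sent second
	wg.Wait()
	return []string{<-receiver, <-receiver}
}

func main() {
	// The order depends on the scheduler, so it may differ between runs.
	fmt.Println(run())
}
```

Both messages always arrive, but nothing in this code ties the delivery order to the sending order.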

Here's an example in Neva:

import { lists }

const lst list<bool> = [true, false]

component Main(start) (stop) {
    nodes { lists.For{Printer} }
    :start -> ($lst -> for -> :stop)
}

component Printer(data bool) (sig any) {
    nodes { If, Println }
    :data -> if
    if:then -> ('true' -> println)
    if:else -> ('false' -> println)
    println -> :sig
}

It's clear that true must be printed first. Yet that's not always the case.

This behaviour is considered a bug caused by an incorrect implementation. Once there is a language spec, we will describe the correct behaviour and each implementor will follow it.

We need to serialize message passing for connections that follow the fan-in pattern.

This is the root cause of #575 and #635

emil14 commented 1 month ago

Shared Queue As a Serialization Mechanism

Basically, every solution we can come up with will be some form of serialization for connections with shared receivers. To do so, we need to get rid of concurrency for them.

The simplest way to do this is:

  1. Get rid of goroutine-per-connection
  2. Implement a queue and share it across all senders
  3. When a sender sends to the queue, a single goroutine reads from it and does the broadcasting

connect() {
    for msg := range q {
        broadcast(msg)
    }
}
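A runnable Go sketch of the shared-queue idea, under the assumption that each queue item carries its own list of receiver channels. The `item` type and the `done` channel are hypothetical and only exist to make the example self-contained.

```go
package main

import "fmt"

// item is a hypothetical queue element: a message plus the receivers
// it must be delivered to.
type item struct {
	msg       string
	receivers []chan<- string
}

// connect is the body of the single goroutine that drains the shared queue.
// Because only one goroutine delivers, messages reach receivers in queue order.
func connect(q <-chan item, done chan<- struct{}) {
	for it := range q {
		for _, r := range it.receivers {
			r <- it.msg
		}
	}
	close(done)
}

// run pushes two messages through the shared queue and returns them
// in the order the receiver got them.
func run() []string {
	q := make(chan item) // shared by all senders
	receiver := make(chan string, 2)
	done := make(chan struct{})
	go connect(q, done)

	rs := []chan<- string{receiver}
	q <- item{"true", rs}  // first sender
	q <- item{"false", rs} // second sender
	close(q)
	<-done
	return []string{<-receiver, <-receiver}
}

func main() {
	fmt.Println(run()) // [true false]
}
```

Here the order in which items enter the queue is the order in which they are delivered, which is the serialization the fan-in case needs.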

However, there are a couple of problems with this solution...

Problems

  1. It's a bottleneck: every sender sends to the same queue. A slow receiver will slow down fast senders and other receivers.
  2. A much bigger problem (of correctness, not performance): how do we handle intermediate connections?

Performance Problem (Making the bottleneck wider)

There are basically 3 ways to improve the performance of this solution

  1. Add a buffer to the queue
  2. Add buffers to the receiver channels
  3. Use several queues instead of one

The first two are easy to do but the last one is a bit tricky. The idea is to have a queue for each fan-in pattern. I.e. each time we have N senders sharing the same receiver, they need a shared queue. Their messages need to be serialized.

Intermediate Connections Handling (Fixing Correctness)

As soon as we got rid of "goroutine per connection" we created a problem. Let's look at the broadcast() function

broadcast(msg) {
    for r := range receivers {
        r <- msg
    }
}

The problem is with this line, r <- msg: who is going to receive it?

It's not a problem if we have some runtime function that reads from this port, but what if it is an intermediate connection?

By "intermediate" connection I mean a connection that exists only because there was a user-defined component.

Let's take a look at these 2 versions:

component Main(start) (stop) {
    :start -> printer -> :stop
}

component Printer(data) (sig) {
    nodes { Println }
    :data -> println -> :sig
}

// VS

component Main(start) (stop) {
    nodes { Println }
    :start -> println -> :stop
}

They are functionally equal, but the first one is going to have more ports and connections (channels and goroutines), which means more message passing (more synchronization). Which is bad, BTW.

This could be solved by an optimization step (whether that's a source-code level optimizer or an IR optimizer doesn't matter). However, we don't have an optimizer now, and it's not clear whether any program could be optimized up to the point where it doesn't have "intermediate connections".

So, the thing is: after the rewrite, the runtime will handle the latter but not the former, because we no longer have a goroutine that could transfer our message from printer:data to println:data. So nobody reads from printer/in:data[0] anymore. We are going to block, and no buffer will help us.
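The blocking can be reproduced in isolation with a small Go sketch. It assumes a port is just a channel that nobody reads from; `sendAll` is a hypothetical helper, and the timeout only exists so the demonstration terminates instead of hanging.

```go
package main

import (
	"fmt"
	"time"
)

// sendAll tries to send n messages to a port channel with the given buffer
// size that nobody reads from. It returns how many sends completed before
// one of them blocked for longer than the timeout.
func sendAll(buf, n int) int {
	const timeout = 10 * time.Millisecond
	port := make(chan string, buf) // stands in for printer/in:data[0]
	for i := 0; i < n; i++ {
		select {
		case port <- "msg":
		case <-time.After(timeout):
			return i // without the timeout this send would block forever
		}
	}
	return n
}

func main() {
	fmt.Println(sendAll(0, 3)) // 0: blocks on the very first send
	fmt.Println(sendAll(2, 3)) // 2: the buffer only delays the block
}
```

A buffer of size k lets the first k messages through, but the port still has no reader, so send k+1 blocks just the same.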

Solution

There are 2 ways to solve it

  1. implement an optimizer that would get rid of all intermediate connections
  2. support intermediate connections at the level of the runtime

The answer is... We need both! And we need to start with the second one.

The reason is that optimisation should stay optimisation. We should (probably) not think of slow programs as incorrect ones (even though I personally would love to).

Second, as has been said, it's not clear whether we can optimize every program this way. Can any program be 100% inlined into one giant Main? I'm not sure (even though I personally would love that!).

So we need to support this. How?

connect(prog) {
    for item := range q {
        broadcast(item, prog)
    }
}

broadcast(item, prog) {
    finalReceivers := getFinalReceivers(item, prog)
    for finalReceiver := range finalReceivers {
        finalReceiver <- item.Msg
    }
}

I didn't describe getFinalReceivers but you get the idea. We need to traverse the graph until we reach a leaf, which is a runtime function.

As an optimisation, we could make the runtime remember the address of the leaf once it has found it, so we don't have to do the traversal on each message.
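One possible shape for getFinalReceivers with the "remember the leaf" optimisation, as a hedged Go sketch. The `Program` layout (a map from sender port to its receivers) and the string-based `PortAddr` are assumptions made for illustration, not the real IR types.

```go
package main

import "fmt"

// PortAddr is a simplified, hypothetical port address such as "printer:data".
type PortAddr string

// Program maps every sender port to the ports it is connected to.
// A port with no outgoing connections is treated as a leaf (final receiver).
type Program struct {
	Net   map[PortAddr][]PortAddr
	cache map[PortAddr][]PortAddr // remembered leaves per port
}

// getFinalReceivers walks the graph from the given receiver until it reaches
// leaves and remembers the answer, so later messages skip the traversal.
// It assumes there are no cycles through intermediate ports.
func (p *Program) getFinalReceivers(r PortAddr) []PortAddr {
	if leaves, ok := p.cache[r]; ok {
		return leaves
	}
	next, isIntermediate := p.Net[r]
	if !isIntermediate {
		return []PortAddr{r} // nothing forwards from here: it is a leaf
	}
	var leaves []PortAddr
	for _, n := range next {
		leaves = append(leaves, p.getFinalReceivers(n)...)
	}
	if p.cache == nil {
		p.cache = map[PortAddr][]PortAddr{}
	}
	p.cache[r] = leaves
	return leaves
}

func main() {
	prog := &Program{Net: map[PortAddr][]PortAddr{
		"printer:data": {"println:data"}, // intermediate connection inside Printer
	}}
	fmt.Println(prog.getFinalReceivers("printer:data")) // [println:data]
}
```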

That... feels like the optimization we talked about before, right? I know it's weird. But I feel like the runtime should be able to work with this. We should not push it fully onto the compiler's shoulders. Why? Debugging.

A note on -debug (and tracing)

I think we should log events each time we "unwrap" an intermediate step. Of course it's weird to log all 3 levels sequentially, but what can we do? Yes, it's going to be sent, pending, received even though nothing actually happened.

This is also related to #347 and #94

The other option, of course, is not to log them and to accept that you won't see the whole picture in the logs. This feels bad, especially for message traces, which are going to be implemented someday.

Other Option

Another option would be to append the message back to the queue (to the tail) but with a different receiver. That could be either the final one (leaf/runtimeFunc) or just the next level. However, there's not much sense in that. We would do >=1 extra sends (queue -> chan), and that's less performant.

The log/trace thing feels like it makes more sense if we send to the queue chan, but why? I don't know.

emil14 commented 1 month ago

IRGen generates intermediate-step-less programs

This is an improvement of https://github.com/nevalang/neva/issues/644#issuecomment-2118988464

  1. We want to do as little as possible at runtime
  2. We are ready to do as much as possible in the compiler
  3. We need to be able to easily debug our programs

How can we have all three? Ok, look

Do not find leaves at runtime

Find them at compile time. Probably that would be irgen but I'm not sure. The IR Connection is going to be something like

type Connection struct {
    Sender    PortAddr
    Receivers []ReceiverSide // one entry per final receiver
}

type ReceiverSide struct {
    PortAddr PortAddr // final
    Transit []PortAddr // intermediate (just to log)
}

This way we don't have to traverse the graph at runtime (not even on the first send, to remember it for later) but we still have the metadata we need to make the event listener calls.
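A sketch of how a runtime could use the Transit metadata to emit events for the skipped steps. The `deliver` function, the `log` callback and the event strings are hypothetical; the field name `Receivers` and the string-based `PortAddr` are assumptions as well.

```go
package main

import "fmt"

type PortAddr string

// ReceiverSide mirrors the struct proposed above: the final port plus the
// intermediate ports the message would have passed through.
type ReceiverSide struct {
	PortAddr PortAddr   // final
	Transit  []PortAddr // intermediate (just to log)
}

type Connection struct {
	Sender    PortAddr
	Receivers []ReceiverSide
}

// deliver reports a "sent"/"received" event pair for every transit port and
// for the final port, without traversing the graph at runtime.
func deliver(c Connection, msg string, log func(event string)) {
	for _, r := range c.Receivers {
		hops := append(append([]PortAddr{}, r.Transit...), r.PortAddr)
		from := c.Sender
		for _, hop := range hops {
			log(fmt.Sprintf("sent %s -> %s: %s", from, hop, msg))
			log(fmt.Sprintf("received %s -> %s: %s", from, hop, msg))
			from = hop
		}
		// A real runtime would now write msg to the channel of r.PortAddr.
	}
}

func main() {
	c := Connection{
		Sender: "main:start",
		Receivers: []ReceiverSide{
			{PortAddr: "println:data", Transit: []PortAddr{"printer:data"}},
		},
	}
	deliver(c, "hi", func(e string) { fmt.Println(e) })
}
```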

emil14 commented 1 month ago

Graph Reduction Algorithm In IR

This is the actual solution for https://github.com/nevalang/neva/issues/644#issuecomment-2118998148

The idea is to add an extra step to irgen that does graph reduction in such a way that all intermediate connections (and related ports) are removed from the graph.

It's not the most efficient (it's probably possible to do it in one go) but it's good enough

reduce(ports, net) {
    intermediatePorts = {}
    resultNet = []

    for conn in net {
        // possible in theory to `if intermediatePorts.has(conn.sender) { continue }`

        receivers = []

        for receiver in conn.receivers {
            if !isIntermediate(receiver) {
                receivers = [...receivers, receiver]
                continue
            }

            intermediatePorts.add(receiver)
            finalReceivers = getFinalReceivers(receiver) // one intermediate receiver could lead to multiple final receivers
            receivers = [...receivers, ...finalReceivers]
        }

        resultNet = [...resultNet, { conn.sender, receivers }]
    }

    // it's possible to leave ports as is but we can do better
    // by removing intermediate ports because there are no connections to them
    resultPorts = {}
    finalNet = []

    // resultNet only contains connections with final receivers
    // but some of them have senders that are intermediate ports;
    // we need to drop these connections and the ports of those nodes
    for conn in resultNet {
        if intermediatePorts.has(conn.sender) { // an intermediate port is always a receiver for the parent and a sender for the child
            continue
        }

        finalNet = [...finalNet, conn]

        // basically just add ports for this connection
        resultPorts = {
            ...resultPorts,
            conn.sender
        }

        for receiver in conn.receivers {
            resultPorts = {
                ...resultPorts,
                receiver
            }
        }
    }

    return (resultPorts, finalNet)
}

func isIntermediate(port) {
    // here we check whether the given port is a receiver in one connection
    // and, at the same time, a sender in another one (connections should be a map for that)
    // (we should not depend on whether the receiver is a runtime function
    // because there's :stop in Main that is neither intermediate nor a runtime function
    // and there might (not sure) be other cases of that)
}

This way we'll have a graph where basically a bunch of runtime functions do message passing via a queue (a serialized pub-sub, basically) and we will have no race conditions. You can see the highlighted ports in this picture; they will be removed.

[image: 2024-05-30 01 16 59]

And this is how this

component Main(start) (stop) {
    :start -> printer -> :stop
}

component Printer(data) (sig) {
    nodes { Println }
    :data -> println -> :sig
}

Becomes this

component Main(start) (stop) {
    nodes { Println }
    :start -> println -> :stop
}

We don't need a goroutine to do the intermediate step anymore. By not having several goroutines sending to the same receiver, we avoid the race condition.
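The reduction described in this comment can be sketched as runnable Go. This is one possible reading, not the irgen code: ports are plain strings, `Conn` and `reduce` are hypothetical names, and a port counts as intermediate when it is a receiver in one connection and a sender in another. Pass-through chains are assumed to be acyclic.

```go
package main

import "fmt"

type Conn struct {
	Sender    string
	Receivers []string
}

// reduce rewires every connection to its final receivers and drops the
// connections whose sender is an intermediate port.
func reduce(net []Conn) []Conn {
	bySender := map[string][]string{}
	isReceiver := map[string]bool{}
	for _, c := range net {
		bySender[c.Sender] = append(bySender[c.Sender], c.Receivers...)
		for _, r := range c.Receivers {
			isReceiver[r] = true
		}
	}
	// Intermediate: receives from the parent and sends to the child.
	isIntermediate := func(p string) bool {
		return isReceiver[p] && len(bySender[p]) > 0
	}

	// final follows intermediate ports until it reaches final receivers.
	var final func(p string) []string
	final = func(p string) []string {
		if !isIntermediate(p) {
			return []string{p}
		}
		var out []string
		for _, next := range bySender[p] {
			out = append(out, final(next)...)
		}
		return out
	}

	var result []Conn
	for _, c := range net {
		if isIntermediate(c.Sender) {
			continue // the original sender now delivers directly
		}
		var rs []string
		for _, r := range c.Receivers {
			rs = append(rs, final(r)...)
		}
		result = append(result, Conn{c.Sender, rs})
	}
	return result
}

func main() {
	net := []Conn{
		{"main:start", []string{"printer:data"}},
		{"printer:data", []string{"println:data"}},
		{"println:sig", []string{"printer:sig"}},
		{"printer:sig", []string{"main:stop"}},
	}
	fmt.Println(reduce(net)) // [{main:start [println:data]} {println:sig [main:stop]}]
}
```

The input is the Main/Printer program from the example, and the output has the same two connections as the hand-inlined Main.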

emil14 commented 4 weeks ago

let fanInMap = {
  r1: ["s1", "s2"],
  r2: ["s1", "s3"]
};

let fanOutMap = {
  s1: ["r1", "r2"],
  s2: ["r1"],
  s3: ["r2"],
};
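The comment does not say how these maps are meant to be used. One plausible reading is that both are derived from the connection list, and that every receiver with more than one sender in the fan-in map needs a shared queue. Under that assumption, a Go sketch with hypothetical names `Conn` and `buildMaps`:

```go
package main

import "fmt"

type Conn struct {
	Sender    string
	Receivers []string
}

// buildMaps derives both lookup tables from the connection list:
// fanIn maps each receiver to its senders, fanOut maps each sender
// to its receivers.
func buildMaps(net []Conn) (fanIn, fanOut map[string][]string) {
	fanIn, fanOut = map[string][]string{}, map[string][]string{}
	for _, c := range net {
		for _, r := range c.Receivers {
			fanOut[c.Sender] = append(fanOut[c.Sender], r)
			fanIn[r] = append(fanIn[r], c.Sender)
		}
	}
	return fanIn, fanOut
}

func main() {
	net := []Conn{
		{"s1", []string{"r1", "r2"}},
		{"s2", []string{"r1"}},
		{"s3", []string{"r2"}},
	}
	fanIn, fanOut := buildMaps(net)
	fmt.Println(fanIn)  // map[r1:[s1 s2] r2:[s1 s3]]
	fmt.Println(fanOut) // map[s1:[r1 r2] s2:[r1] s3:[r2]]
}
```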

emil14 commented 1 week ago

Related to #677