trustmaster / goflow

Flow-based and dataflow programming library for Go (golang)
MIT License

TestInternalConnectionIIP test panics randomly: IIP write to a closed channel #65

Closed akokhanovskyi closed 3 years ago

akokhanovskyi commented 3 years ago

TestInternalConnectionIIP panics when the IIP happens to be concurrently sent after the graph is already being shut down and the input channel is already closed.

=== RUN   TestInternalConnectionIIP
--- FAIL: TestInternalConnectionIIP (0.00s)
panic: send on closed channel

goroutine 142 [running]:
reflect.chansend(0xc0001ab140, 0xc000126388, 0x0, 0x569c7b)
    /usr/local/go/src/runtime/chan.go:665 +0x4b
reflect.Value.send(0x52bf00, 0xc0001ab140, 0x12, 0x52cb00, 0xc000126388, 0x82, 0x0, 0x596920)
    /usr/local/go/src/reflect/value.go:1524 +0x118
reflect.Value.Send(0x52bf00, 0xc0001ab140, 0x12, 0x52cb00, 0xc000126388, 0x82)
    /usr/local/go/src/reflect/value.go:1506 +0x90
github.com/trustmaster/goflow.(*Graph).sendIIPs.func1(0x52bf00, 0xc0001ab140, 0x12, 0x52cb00, 0xc000126388, 0x82, 0x52bf00)
    /home/ak/dev/go/src/github.com/trustmaster/goflow/graph_iip.go:78 +0x61
created by github.com/trustmaster/goflow.(*Graph).sendIIPs
    /home/ak/dev/go/src/github.com/trustmaster/goflow/graph_iip.go:77 +0xf3
exit status 2
FAIL    github.com/trustmaster/goflow   0.008s

The root cause of this issue boils down to the fact that goroutine A (spawned by sendIIPs()) attempts sending to a channel concurrently closed by goroutine B (executing echo component 1: e1). The chan close is triggered by closing the graph in port: close(in) in a goroutine spawned by TestInternalConnectionIIP().
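
The failure mode can be reproduced outside goflow. The following is a minimal sketch, not the library's code: `sendPanics` is a hypothetical helper that recovers from the panic so the result can be printed, and the two goroutines are collapsed into sequential steps to make the ordering explicit.

```go
package main

import "fmt"

// sendPanics sends v on ch and reports whether the send panicked.
// Hypothetical helper for illustration; not part of goflow.
func sendPanics(ch chan int, v int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	in := make(chan int, 1)

	// "Goroutine B": the network shuts down and the inport is closed.
	close(in)

	// "Goroutine A": the IIP send arrives late and hits the closed channel.
	fmt.Println("send panicked:", sendPanics(in, 42))
}
```

Buffering does not help here: a send on a closed channel panics whether or not there is free capacity.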

There is no quick and simple fix for this, since it is architecturally unsound to close a channel from one of multiple writing goroutines. There is a lot of material about this on the net; for example, there is a good read here.

Thus, I'd like to open a conversation around this to get an understanding of the requirements. For example:

  1. Other than IIPs, do we envision multiple writers to a single input channel in a graph?
  2. Why do we need IIPs to be sent concurrently rather than synchronously at the graph start?

trustmaster commented 3 years ago

Thanks for the bug report! I think I saw this issue before but couldn't capture it.

To answer your questions:

  1. Other than IIPs, do we envision multiple writers to a single input channel in a graph?

Yes, it is quite typical that multiple processes send packets to the same input. For example, there are producers A and B which produce some packets, and there is a consumer C which does something with them (e.g. just prints them on the screen). Both streams can be sent to the same C.In input, which works as a simple FIFO.

It is not so typical that the same input receives an IIP and normal packets. But in theory it can happen, that's why I added that test case.

  2. Why do we need IIPs to be sent concurrently rather than synchronously at the graph start?

This is a precaution because it is convenient to write processes like this:

// Reading IIPs
config := <-c.Config
setting := <-c.Setting

// Looping over data
for p := range c.In {
    // ... handle p using config and setting
}

The problem with this code is that it reads IIPs in a specific order. If IIPs are sent synchronously and setting is sent before config, this network will deadlock. There are 2 workarounds for this: using buffered connections, or sending IIPs asynchronously. Buffered connections won't work in case the network already has a connection to the same inport (see question 1). That's why I went for concurrent sends.
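
As a rough illustration of why concurrent sends sidestep the ordering problem, here is a minimal sketch with plain channels. The names `process` and `sendAsync` are made up for this example and are not goflow APIs.

```go
package main

import "fmt"

// process mimics a component that reads its IIPs in a fixed order:
// config first, then setting.
func process(config, setting <-chan string) string {
	c := <-config
	s := <-setting
	return c + "/" + s
}

// sendAsync delivers one IIP from its own goroutine, so a blocked send
// stalls only that goroutine until the process is ready to receive.
func sendAsync(port chan<- string, value string) {
	go func() { port <- value }()
}

func main() {
	config := make(chan string)  // unbuffered
	setting := make(chan string) // unbuffered

	// IIPs are issued in the "wrong" order: setting before config.
	sendAsync(setting, "verbose")
	sendAsync(config, "app.yaml")

	fmt.Println(process(config, setting)) // prints "app.yaml/verbose"
}
```

If the two sends were done synchronously from one goroutine, the first send on `setting` would block forever, because the process is waiting on `config`.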

trustmaster commented 3 years ago

shouldClose in https://github.com/trustmaster/goflow/blob/master/graph_iip.go#L69 is supposed to handle this case: if there was no connection on this port before sending the IIP, we assume that this is an IIP-only port and it is safe to be closed by the runtime. Otherwise we assume that the connection is already in use in the graph, and that it's going to be closed elsewhere.

akokhanovskyi commented 3 years ago

Thanks for your explanations!

  1. Can you help me understand the distinction between multiple processes writing to a single input channel, and chan slice/map ports? I had an impression that the latter were introduced to limit chan write to a single processor, but it looks like I got that wrong.

  2. Understood. Unfortunately, a concurrent IIP send causes a race condition: there is nothing there to guarantee that IIP will be the first one on the channel (assuming regular packets on the same inport, as in the panicking test).

shouldClose in https://github.com/trustmaster/goflow/blob/master/graph_iip.go#L69 is supposed to handle this case

Right, but the problem is the other way round. It's not the runtime closing a channel and preventing a regular packet from being sent. It's the network shutdown happening before the IIP is sent asynchronously: so the shouldClose is fine, it just handles a different condition.

Do you think it would be outrageous to require that inports that receive IIPs be buffered? This seems like a cleaner solution to me. All IIPs are guaranteed to be delivered to their inports synchronously before the network starts. The order of reading IIPs can be arbitrary. We can ensure that inport chans are buffered by attempting a non-blocking write and raising an error:

if !channel.TrySend(reflect.ValueOf(ip.data)) {
    return fmt.Errorf("could not send IIP")
}
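
A self-contained sketch of that check, assuming the inport is available as a plain channel value. `trySendIIP` is a hypothetical name, not an existing goflow function. `reflect.Value.TrySend` is the standard library's non-blocking send.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// trySendIIP attempts a non-blocking send of data to port.
// It fails if port is not a channel, or if the send would block
// (unbuffered channel with no ready receiver, or a full buffer).
// Hypothetical helper for illustration; not part of goflow.
func trySendIIP(port interface{}, data interface{}) error {
	channel := reflect.ValueOf(port)
	if channel.Kind() != reflect.Chan {
		return errors.New("port is not a channel")
	}
	if !channel.TrySend(reflect.ValueOf(data)) {
		return errors.New("could not send IIP: inport is unbuffered or full")
	}
	return nil
}

func main() {
	buffered := make(chan string, 1)
	unbuffered := make(chan string)

	fmt.Println(trySendIIP(buffered, "cfg"))   // succeeds, value is queued
	fmt.Println(trySendIIP(unbuffered, "cfg")) // fails, nobody is receiving
}
```

Note that `TrySend` still panics on a closed channel, so this only helps if the IIPs are sent before anything can shut the network down.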

trustmaster commented 3 years ago

Can you help me understand the distinction between multiple processes writing to a single input channel, and chan slice/map ports? I had an impression that the latter were introduced to limit chan write to a single processor, but it looks like I got that wrong.

Multiple processes writing to a single input port

The main use case for this feature is when there are multiple producers which produce similar data of the same type, and one consumer that reads this data in no particular order. In this case they share the same channel which acts as a simple FIFO queue.

A typical example of this: processes s1, s2, and s3 are sensors which send some data, e.g. temperature. Process avg calculates a moving average. The outputs of all sensors can be connected to the same input of avg, so it will calculate the moving average across all 3 sensors.

Array ports

Array ports are used when a receiver needs to know which sender sent a particular packet. This feature is needed for operations like map/reduce and routing.

One typical example is a reducer which listens to N connected workers and waits for all of them to complete before producing a result. This cannot be achieved with a single shared channel, because it doesn't guarantee one-value-per-worker semantics.
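
To make the reducer case concrete, here is a minimal sketch using a slice of channels in place of an array port. The `reduce` function and the worker setup are illustrative only and do not use goflow's port machinery.

```go
package main

import "fmt"

// reduce waits for exactly one result from each worker channel and sums them.
// Because every worker has its own channel, a fast worker cannot be counted
// twice while a slow one is still running.
func reduce(workers []<-chan int) int {
	sum := 0
	for _, w := range workers {
		sum += <-w
	}
	return sum
}

func main() {
	const n = 3
	ports := make([]<-chan int, n)
	for i := 0; i < n; i++ {
		out := make(chan int, 1)
		ports[i] = out
		// Each worker reports a single result on its own channel.
		go func(id int, out chan<- int) { out <- (id + 1) * 10 }(i, out)
	}
	fmt.Println(reduce(ports)) // prints 60
}
```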

A more advanced use, especially for Map ports, is an HTTP application router which routes arbitrary paths to specific handlers/controllers.

Right, but the problem is the other way round. It's not the runtime closing a channel and preventing a regular packet from being sent. It's the network shutdown happening before the IIP is sent asynchronously: so the shouldClose is fine, it just handles a different condition.

Now I see what is going on there, thanks a lot for the explanation! Yeah, then actually using buffered channels like that is the best solution.

akokhanovskyi commented 3 years ago

Thanks again for your helpful explanations.

I have not yet quite wrapped my head around how to allow for multiple concurrent writers to a single channel in a safe way. My best bet right now would be to "decorate" the inport with an "invisible" fan-in to orchestrate the chan close whenever we detect a request to connect multiple processes to a single inport. The fan-in would only close the port chan once all inputs are closed. Do you have any better ideas?
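
For reference, a minimal sketch of such a fan-in using the common WaitGroup-based merge pattern. `fanIn` is a made-up name and the element type is fixed to `int` for brevity. This is not how goflow wires ports today.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn forwards every value from inputs to a single output channel and
// closes the output only after all inputs have been closed and drained.
// Only the fan-in ever closes out, so no writer can hit a closed channel.
func fanIn(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	s1 := make(chan int, 1)
	s2 := make(chan int, 1)
	s1 <- 20
	s2 <- 22
	close(s1)
	close(s2)

	sum := 0
	for v := range fanIn(s1, s2) {
		sum += v
	}
	fmt.Println(sum) // prints 42
}
```

Each upstream writer closes only its own channel, which keeps the "only the sole writer closes" rule intact on every edge.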

Having a fan-in would also kind of fix the original problem, except I'm still inclined to rework the IIPs to be sent synchronously on the network start to maintain the delivery order. That's unfortunately not a one-liner, though.

trustmaster commented 3 years ago

Closing a channel that is used for fan-in is already solved for the situation where the fan-in connections are added with Graph.Connect(). You can see incChanListenersCount and its references. decChanListenersCount is used to decrease the number of references when the network terminates.

I think the issue exists then only when IIPs or external (network) ports are in play. Adding reference counting for them could be a potential solution.
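
The general idea of reference-counted closing can be sketched as follows. This is a simplified illustration with hypothetical names (`closer`, `attach`, `detach`) and a fixed `chan int` type. It is not goflow's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// closer counts the writers attached to each channel and closes a channel
// only when its last writer detaches. Hypothetical type, not goflow's API.
type closer struct {
	mu     sync.Mutex
	counts map[chan int]int
}

func newCloser() *closer {
	return &closer{counts: make(map[chan int]int)}
}

// attach registers one more writer for ch.
func (c *closer) attach(ch chan int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[ch]++
}

// detach unregisters one writer and closes ch when none remain.
// It reports whether this call closed the channel.
func (c *closer) detach(ch chan int) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[ch]--
	if c.counts[ch] > 0 {
		return false
	}
	delete(c.counts, ch)
	close(ch)
	return true
}

func main() {
	in := make(chan int, 2)
	c := newCloser()
	c.attach(in) // regular connection
	c.attach(in) // IIP sender

	in <- 1
	fmt.Println(c.detach(in)) // false: the other writer may still send
	in <- 2
	fmt.Println(c.detach(in)) // true: last writer gone, channel closed
}
```

The sketch assumes every writer calls `attach` before sending and `detach` exactly once after its last send.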

trustmaster commented 3 years ago

Check out the potential fix in #68. I can't test the regression, because the original issue was not happening on my machine with Go 1.15.0.

I'm still not sure whether to expose an API for the reference counting. This test is rather synthetic and normally you wouldn't attach a channel via a graph inport and also set an IIP on it.

akokhanovskyi commented 3 years ago

Thanks for this. I don't see incChanListenersCount() being used on the graph in ports. Do you think it would be fair to assume that a similar problem may happen when you nest a subgraph and attach multiple writers to its in port?

I am experimenting to see if there is a solution that would:

  1. avoid writer synchronization altogether (so you don't have to expose or even use the refcount API)
  2. promote processes closing their outputs when no more data is expected, allowing for a leaner network shutdown.

I think this should be doable, since we are in full control of the graph wire-up, including edges, in&out ports, and IIPs - I'll share as soon as I have something useful.

trustmaster commented 3 years ago

Yes, I already mentioned that a similar problem exists for graph inports. But those are more tricky, because subgraphs may be nested. Consider an example where inport In of G1 is mapped to inport In of its subgraph G2, which is mapped to In of G3, etc. Technically, they all share the same channel, which should be closed once. If we do reference counting for SetInPort() like we do for sendIIPs() and Connect(), this nested subgraph will have count = 3 for the input channel. But closing In of G1 should close the channel which "cuts through" layers of nesting.