trustmaster / goflow

Flow-based and dataflow programming library for Go (golang)
MIT License

Not getting all output, and output order not preserved #4

Closed samuell closed 11 years ago

samuell commented 11 years ago

Hello Vladimir!

I don't know if this is merely a support request or a possible bug ...

I'm getting some problems with the following code: https://gist.github.com/samuell/6164115 ... in combination with a few simple components from https://github.com/samuell/blow

... and using this file as input: https://gist.github.com/samuell/6164135 (just has to lie in the same folder), which is here of a very simple format, to make it easy to spot errors: One line "AAAA...", and the other line "CCC..." and so on, for 99 lines.

The program is supposed to simply convert 'A' -> 'T', 'T' -> 'A', 'C' -> 'G' and 'G' -> 'C' (to get the "complementary" DNA base sequence), back and forth 4 times, which should finally give back the same result as the input file.

I get very strange results.

Problem nr 1: I sometimes don't get all the output back. When I run the compiled program and pipe it directly to "wc", I DO get 99 lines, which is the same as the input file:

[samuel basecompl]$ ./basecompl_blow|wc -l
99

But if I redirect the output to a separate file and then count the lines, I get a varying number of lines of output ... like 63, 73, 88, etc. It seems like the output printing does not have time to finish before the output is closed?

[samuel basecompl]$ ./basecompl_blow > out.txt 
[samuel basecompl]$ wc -l out.txt 
68 out.txt

Problem nr 2 is that the output is not in the same order as the input. While the input looks like:

AAA...
CCC...
AAA...
CCC...
... and so on ..

.. the output has changed the order of the lines, in an irregular way. See here for an example: https://gist.github.com/samuell/6164186

samuell commented 11 years ago

Will try to do some closer debugging myself as well, but have tested a few things that don't work out (Setting number of threads to 1 solves it, but of course isn't a solution :) ), so wanted to report, in case you have some advice off hand.

trustmaster commented 11 years ago

Hi Samuel,

Problem nr 1 really does look like the output being closed before the network has finished its job. The Greeter example just uses a finish channel to solve this problem. In your case it might need a bit more debugging, and there is a chance that the network shutdown code is buggy. It is currently implemented this way: each component closes its output ports and shuts down after all of its input ports have been closed, and the network terminates after all of its contained processes have terminated.
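
The shutdown rule described above can be sketched with plain Go channels. This is only an illustration, not GoFlow's implementation: `runPipeline` and its two stages are hypothetical names, and a `sync.WaitGroup` stands in for the finish channel.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runPipeline (hypothetical, not part of GoFlow) feeds lines through one
// upper-casing stage and a collecting "printer" stage. It returns only after
// both stages have terminated.
func runPipeline(lines []string) []string {
	in := make(chan string)
	out := make(chan string)
	var printed []string

	var wg sync.WaitGroup
	wg.Add(2)

	// Stage: closes its output port once its input port has been closed.
	go func() {
		defer wg.Done()
		defer close(out)
		for s := range in {
			out <- strings.ToUpper(s)
		}
	}()

	// Printer: terminates once its input port has been closed.
	go func() {
		defer wg.Done()
		for s := range out {
			printed = append(printed, s)
		}
	}()

	for _, l := range lines {
		in <- l
	}
	close(in)

	// Without this wait the caller could return (and the program could exit)
	// while the printer still has lines left to handle.
	wg.Wait()
	return printed
}

func main() {
	for _, l := range runPipeline([]string{"aaaa", "cccc"}) {
		fmt.Println(l)
	}
}
```

If the final `wg.Wait()` is dropped, the function may return before every line has been handled, which is the kind of truncated output reported in this issue.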

The problem nr 2 is much easier to understand. GoFlow acts in a non-deterministic asynchronous way. Each component is not a single-threaded state machine, it is a multi-threaded stateless machine which processes each input value in a separate goroutine. I know it may look strange at first because when you pass "ABC" sequence to a component, you expect to receive "F(A),F(B),F(C)" on the output, while with GoFlow it is not necessarily so. But on the other hand, if you make a bit more complex network with several parallel branches, with each component running in its own thread, the order is going to be non-deterministic anyway.

The fact that components are asynchronous forces you to stop thinking in a synchronous way from the very beginning, which helps to avoid synchronization problems when the system gets a bit more complicated. The question is: how do you match inputs with outputs then? Use the power of FBP: packets. Send the input identifier along with the output and match inputs with outputs after processing is done. It's like MapReduce: if you need the results in a specific order, you sort them in the reduce step.
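
A minimal sketch of the packet idea, using plain goroutines rather than GoFlow components. The `packet` type, its `seq` field, and `processUnordered` are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// packet (hypothetical) carries a value together with the position it had
// in the input.
type packet struct {
	seq  int
	data string
}

// complement returns the complementary DNA base sequence.
func complement(s string) string {
	m := map[byte]byte{'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'}
	b := []byte(s)
	for i, c := range b {
		if r, ok := m[c]; ok {
			b[i] = r
		}
	}
	return string(b)
}

// processUnordered handles every line in its own goroutine, so results
// arrive in arbitrary order. Sorting by seq restores the input order.
func processUnordered(lines []string) []string {
	results := make(chan packet, len(lines))
	var wg sync.WaitGroup
	for i, l := range lines {
		wg.Add(1)
		go func(p packet) {
			defer wg.Done()
			results <- packet{seq: p.seq, data: complement(p.data)}
		}(packet{seq: i, data: l})
	}
	wg.Wait()
	close(results)

	var ps []packet
	for p := range results {
		ps = append(ps, p)
	}
	// The "reduce" step: put the packets back into input order.
	sort.Slice(ps, func(a, b int) bool { return ps[a].seq < ps[b].seq })

	out := make([]string, len(ps))
	for i, p := range ps {
		out[i] = p.data
	}
	return out
}

func main() {
	for _, l := range processUnordered([]string{"AAAA", "CCCC", "TTGG"}) {
		fmt.Println(l)
	}
}
```

The sequence number travels with the data, so the order can be recovered at the end regardless of how the goroutines were scheduled.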

samuell commented 11 years ago

@trustmaster Many thanks for the feedback!

Yea, will have to debug problem 1 more.

With problem 2: OK, good to know! I was just naively assuming that communication was set up on a first-in-first-out principle, but I should of course adjust my approach to how the library is designed.

Anyway, many thanks for sharing this great library, and will continue to see what use I can make of it! :)

samuell commented 11 years ago

What I'm up to so far, regarding the problem of incomplete output, is to try to add this statement:

diff --git a/component.go b/component.go
index dc37e92..79ef934 100644
--- a/component.go
+++ b/component.go
@@ -95,7 +95,9 @@ func RunProc(c interface{}) bool {
                                if hasLock {
                                    locker.Unlock()
                                }
-                           }
+                           } else {
+                               println("Channel not ok, but not closed!")
+                           }
                            inputsClose.Done()
                            return
                        }

... and when I run it, I get:

[samuel basecompl_blow]$ ./basecompl_blow > temp.out
Channel not ok, but not closed!
Channel not ok, but not closed!
Channel not ok, but not closed!
Channel not ok, but not closed!
Channel not ok, but not closed!
Channel not ok, but not closed!

... is this expected?

samuell commented 11 years ago

@trustmaster, with this change, I get all the output, and also almost the correct order of the output! ... so almost both of the issues reported were fixed! (only in one place, it seems something is shifted out of order):

diff --git a/component.go b/component.go
index dc37e92..75d6eec 100644
--- a/component.go
+++ b/component.go
@@ -102,17 +102,15 @@ func RunProc(c interface{}) bool {
                        if hasRecv {
                            // Call the receival handler for this channel
                            handlersDone.Add(1)
-                           go func() {
-                               if hasLock {
-                                   locker.Lock()
-                               }
-                               valArr := [1]reflect.Value{val}
-                               onRecv.Call(valArr[:])
-                               if hasLock {
-                                   locker.Unlock()
-                               }
-                               handlersDone.Done()
-                           }()
+                           if hasLock {
+                               locker.Lock()
+                           }
+                           valArr := [1]reflect.Value{val}
+                           onRecv.Call(valArr[:])
+                           if hasLock {
+                               locker.Unlock()
+                           }
+                           handlersDone.Done()
                        }
                    }
                }()

... so now, when I run, it looks like:

[samuel basecompl_blow]$ go build basecompl_blow.go;./basecompl_blow|wc -l
99

... and the content of out.tmp looks like this: https://gist.github.com/samuell/6198013 (See the switch of order in the top ... this tends to happen at different places though, sometimes in the middle of the file, etc).

samuell commented 11 years ago

BTW, correct order is actually, AFAIS, in the "specification" of FBP. On [1], J.P.M mentions: " Processes are connected by means of FIFO (first-in, first-out) queues or connections which can hold up to some maximum number of IPs (called a queue's "capacity")"

... so for defining a serial (or "linear", or "1 dimensional") "network" (if you can call that a network), with the structure of:

input->basecomplementer1->basecomplementer2->basecomplementer3->basecomplementer4->printer

... as in my case, then, AFAIS, the FIFO principle should guarantee that the correct order is kept. Shouldn't it?

[1] http://www.jpaulmorrison.com/fbp/concepts.shtml
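
To illustrate the FIFO argument: when each stage is a single goroutine reading from one Go channel and writing to the next, a linear pipeline keeps the input order. The sketch below uses plain channels, not GoFlow. `stage` and `runLinear` are hypothetical helpers, and the buffer size of 4 is an arbitrary stand-in for a connection's "capacity".

```go
package main

import "fmt"

// complement returns the complementary DNA base sequence.
func complement(s string) string {
	b := []byte(s)
	for i, c := range b {
		switch c {
		case 'A':
			b[i] = 'T'
		case 'T':
			b[i] = 'A'
		case 'C':
			b[i] = 'G'
		case 'G':
			b[i] = 'C'
		}
	}
	return string(b)
}

// stage starts a single goroutine that applies f to each value from in and
// sends the results, in arrival order, on the returned bounded channel.
func stage(in <-chan string, f func(string) string) <-chan string {
	out := make(chan string, 4) // arbitrary capacity, chosen for the example
	go func() {
		defer close(out)
		for s := range in {
			out <- f(s)
		}
	}()
	return out
}

// runLinear chains n complement stages and collects the final output.
func runLinear(lines []string, n int) []string {
	src := make(chan string)
	go func() {
		defer close(src)
		for _, l := range lines {
			src <- l
		}
	}()

	var ch <-chan string = src
	for i := 0; i < n; i++ {
		ch = stage(ch, complement)
	}

	var out []string
	for s := range ch {
		out = append(out, s)
	}
	return out
}

func main() {
	for _, l := range runLinear([]string{"AAAA", "CCCC", "AAAA", "CCCC"}, 4) {
		fmt.Println(l)
	}
}
```

Order is preserved here only because every stage handles one value at a time. Handling each value in its own goroutine gives up that guarantee.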

trustmaster commented 11 years ago

I've debugged basecompl_blow example a little and got some rather interesting results. E.g. if I put a simple counter into the Printer and make it log onto the console (right after fmt.Println(string(line))), I see that it prints all the 99 lines. However, the out.txt file contains e.g. just 88. So it has something to do with how standard output to a file works. At first I thought that we have just a mere data race here, but it probably isn't.

I'll try to debug it more to see where exactly it gets wrong.

That patch you made just makes all components synchronous. Yes, pure FBP means that. GoFlow is not pure FBP, it is a mixture of FBP and Actor model built on top of Go's CSP. FBP components are synchronous, GoFlow components are asynchronous. FBP can't deal with Unbounded nondeterminism while GoFlow can. It's my scientific experiment :)

samuell commented 11 years ago

On 08/10/2013 12:29 PM, Vladimir Sibirov wrote:

> I've debugged basecompl_blow example a little and got some rather interesting results. E.g. if I put a simple counter into the Printer and make it log onto the console (right after fmt.Println(string(line))), I see that it prints all the 99 lines. However, the out.txt file contains e.g. just 88.

Hmm, that's interesting! Will try it out as well ... many thanks for testing!

> So it has something to do with how standard output to a file works. At first I thought that we have just a mere data race here, but it probably isn't.
>
> I'll try to debug it more to see where exactly it gets wrong.
>
> That patch you made just makes all components synchronous. Yes, pure FBP means that. GoFlow is not pure FBP, it is a mixture of FBP and Actor model built on top of Go's CSP. FBP components are synchronous, GoFlow components are asynchronous. FBP can't deal with Unbounded nondeterminism (http://en.wikipedia.org/wiki/Unbounded_nondeterminism) while GoFlow can. It's my scientific experiment :)

Ok! (Will check the link) :)

samuell commented 11 years ago

But would it not be enough to just run every process in a separate goroutine? ... that is, to execute the RunNet() and RunProc() calls on lines 309 and 311 in network.go with the go keyword? ... like this:

diff --git a/network.go b/network.go
index e777ae2..0cb1151 100644
--- a/network.go
+++ b/network.go
@@ -306,9 +306,9 @@ func (n *Graph) run() {
        // Check if it is a net or proc
        r := reflect.ValueOf(v).Elem()
        if r.FieldByName("Graph").IsValid() {
-           RunNet(v)
+           go RunNet(v)
        } else {
-           RunProc(v)
+           go RunProc(v)
        }
    }
    // Wait for all processes to terminate

I get same results as before, when I add this change.

With the go-routine in the event loop in component.go (which I removed), I get the impression that it is not guaranteed to finish, before the inputs are closed down etc? ... that is why I removed that goroutine call ... (or am I missing something there?).

I was trying to just add a finish channel, and do a finish <- true inside it, but never got it to work, so I just removed it.
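
For reference, a per-message goroutine can be made safe to shut down by waiting for all handlers before closing the output. The sketch below is not GoFlow's event loop. `runAsync` and `collectSquares` are hypothetical, and the name `handlersDone` is borrowed from the diff above only for readability.

```go
package main

import (
	"fmt"
	"sync"
)

// runAsync handles every received value in its own goroutine and closes the
// output only after all handlers have returned.
func runAsync(in <-chan int, handle func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		var handlersDone sync.WaitGroup
		for v := range in {
			handlersDone.Add(1)
			go func(v int) {
				defer handlersDone.Done()
				out <- handle(v)
			}(v)
		}
		// The input is closed at this point, but handlers may still be
		// running. Closing out before this Wait could make a late handler
		// panic on send.
		handlersDone.Wait()
		close(out)
	}()
	return out
}

// collectSquares sends 0..n-1 through runAsync and reports how many results
// came back and what they add up to. The arrival order is not deterministic.
func collectSquares(n int) (count, sum int) {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()
	for v := range runAsync(in, func(v int) int { return v * v }) {
		count++
		sum += v
	}
	return count, sum
}

func main() {
	count, sum := collectSquares(10)
	fmt.Println(count, sum)
}
```

All results arrive, but in no particular order, which matches the asynchronous behaviour described earlier in the thread.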

trustmaster commented 11 years ago

I have found the problem. It is a race for the output stream in the Printer. GoFlow has a thread-safety facility for such cases. I've opened a pull request with a patch to the Printer. In basecompl_blow.go you need to replace

network.Add(new(blow.Printer), "printer")

with

network.Add(blow.NewPrinter(), "printer")
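
One common way to avoid a race on a shared output stream is to guard each write with a mutex. The sketch below shows that general technique only. `lockedPrinter` and `printConcurrently` are hypothetical and do not reproduce blow's NewPrinter or GoFlow's thread-safety facility.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// lockedPrinter (hypothetical) serializes writes to a shared writer.
type lockedPrinter struct {
	mu sync.Mutex
	w  io.Writer
}

// Print writes one line while holding the lock, so concurrent callers
// cannot interleave or corrupt each other's output.
func (p *lockedPrinter) Print(line string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	fmt.Fprintln(p.w, line)
}

// printConcurrently writes n lines from n goroutines into one buffer and
// returns the number of complete lines that ended up in it.
func printConcurrently(n int) int {
	var buf bytes.Buffer
	p := &lockedPrinter{w: &buf}

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			p.Print(fmt.Sprintf("line %d", i))
		}(i)
	}
	wg.Wait()

	return bytes.Count(buf.Bytes(), []byte("\n"))
}

func main() {
	fmt.Println(printConcurrently(99))
}
```

A `bytes.Buffer` is not safe for concurrent use on its own, so the mutex is what keeps every line intact in this example.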

samuell commented 11 years ago

@trustmaster Ah, super, thanks a bunch ... will try out ...

samuell commented 11 years ago

@trustmaster yes, works perfectly! :+1: :) :) :)