trustmaster / goflow

Flow-based and dataflow programming library for Go (golang)
MIT License
1.61k stars 125 forks source link

enable use of go anycast channels (i.e. use multiple endpoints and let g... #23

Closed btittelbach closed 10 years ago

btittelbach commented 10 years ago

Hi,

Playing around with goflow, I discovered the example program would crash if I would connect two instances of Printer to Greeter. While that might seem nonsensical from a FBP viewpoint, it DOES make perfect sense from the language's point of view. In Go you are supposed to be able to have multiple goroutines receive on the same channel. Go will pseudo-randomly choose a ready goroutine to receive the next message. At the very least it avoids crashes.

Modified example program:

package main

import (
    "fmt"

    "./flow"
)

// A component that generates greetings
type Greeter struct {
    flow.Component               // component "superclass" embedded
    Name           <-chan string // input port
    Res            chan<- string // output port
}

// Reaction to a new name input
func (g *Greeter) OnName(name string) {
    greeting := fmt.Sprintf("Hello, %s!", name)
    // send the greeting to the output port
    g.Res <- greeting
}

func (g *Greeter) Init() {
    fmt.Println("Greeter Init", g)
}

// A component that prints its input on screen
type Printer struct {
    flow.Component
    Line <-chan string // inport
    n    int
}

// Prints a line when it gets it
func (p *Printer) OnLine(line string) {
    fmt.Println(p.n, line)
}

func (p *Printer) Init() {
    fmt.Println("Printer Init", p)
}

// Our greeting network
type GreetingApp struct {
    flow.Graph // graph "superclass" embedded
}

// Graph constructor and structure definition
func NewGreetingApp() *GreetingApp {
    n := new(GreetingApp) // creates the object in heap
    n.InitGraphState()    // allocates memory for the graph
    // Add processes to the network
    n.Add(new(Greeter), "greeter")
    n.Add(&Printer{n: 1}, "printer1")
    n.Add(&Printer{n: 2}, "printer2")
    // Connect them with a channel
    n.Connect("greeter", "Res", "printer1", "Line")
    n.Connect("greeter", "Res", "printer2", "Line")
    // Our net has 1 inport mapped to greeter.Name
    n.MapInPort("In", "greeter", "Name")
    return n
}

func main() {
    // Create the network
    net := NewGreetingApp()
    // We need a channel to talk to it
    in := make(chan string)
    net.SetInPort("In", in)
    // Run the net
    flow.RunNet(net)
    // Now we can send some names and see what happens
    in <- "John"
    in <- "Boris"
    in <- "Hanna"
    // Close the input to shut the network down
    close(in)
    // Wait until the app has done its job
    <-net.Wait()
}
btittelbach commented 10 years ago

Also added code for n-to-1 connections, since they are the even more common case in go and of course: tests.

trustmaster commented 10 years ago

This is a major addition. It has to be properly documented though, so there are no surprises in how multiple connections behave. For example, 1-to-N connections in NoFlo use "copy reference and send it to all receivers" semantics. I like this pseudo-random choice more, because it follows the FBP "single owner of the same packet at any time" rule.

Good job, @btittelbach!