trustmaster / goflow

Flow-based and dataflow programming library for Go (golang)
MIT License

Closing Component channels that are not explicitly marked as ports #72

Closed: notmgsk closed this issue 3 months ago.

notmgsk commented 3 years ago

Somewhere under the hood, goflow seems to call close on the Out channel field of any node in its graph that has one, even if that port is not connected in the graph. For example:

package main

import (
    "log"

    "github.com/trustmaster/goflow"
)

type Receiver struct {
    name string
    In   <-chan []byte
    Out  chan<- []byte
}

func NewReceiver(name string) *Receiver {
    return &Receiver{name: name}
}

func (db *Receiver) Process() {
    for in := range db.In {
        log.Printf("Processing Receiver for node %s", db.name)
        db.Out <- in
    }
}

type DataBuffer struct {
    name string
    In   <-chan []byte
    Out  chan<- []byte // exported, but never connected in the graph below
}

func NewDataBuffer(name string) *DataBuffer {
    return &DataBuffer{name: name}
}

func (db *DataBuffer) Process() {
    for in := range db.In {
        log.Printf("Processing for node %s: %v", db.name, in)
    }
}

func main() {
    g := goflow.NewGraph()

    g.Add("greeter", new(Receiver))
    g.Add("printer", new(DataBuffer))

    g.Connect("greeter", "Out", "printer", "In")

    g.MapInPort("In", "greeter", "In")

    in := make(chan []byte)

    g.SetInPort("In", in)

    wait := goflow.Run(g)

    in <- []byte{1, 0}

    close(in)

    <-wait
}

This outputs:

2009/11/10 23:00:00 Processing Receiver for node 
2009/11/10 23:00:00 Processing for node : [1 0]
panic: close of nil channel

Making the field private (or removing it) fixes the issue. (My first guess, renaming DataBuffer.Out to another exported name such as DataBuffer.LeaveMeAlone, does not.)

I'm new to goflow (looks neat!), so maybe this is intended behaviour, but from a newbie's perspective it is unexpected.

trustmaster commented 3 months ago

This is intended behavior. If you want to hide a channel from GoFlow's graph, make it private.
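The answer relies on Go's exported/unexported distinction. A minimal sketch, assuming a runtime that discovers ports by reflecting over a component's exported channel fields (visiblePorts is a hypothetical helper, not part of GoFlow's API), shows why lowercasing the field name hides it while renaming it to another exported name would not:

```go
package main

import (
	"fmt"
	"reflect"
)

// DataBuffer mirrors the component from the issue: Out is exported,
// so a reflection-based runtime can see it.
type DataBuffer struct {
	name string
	In   <-chan []byte
	Out  chan<- []byte
}

// HiddenBuffer is the same component with the output made private.
type HiddenBuffer struct {
	name string
	In   <-chan []byte
	out  chan<- []byte
}

// visiblePorts (hypothetical) returns the names of exported channel fields.
// Unexported fields are skipped: reflect refuses to set or close them.
func visiblePorts(component interface{}) []string {
	v := reflect.ValueOf(component)
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
	}
	t := v.Type()
	var ports []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.PkgPath != "" { // a non-empty PkgPath marks an unexported field
			continue
		}
		if f.Type.Kind() == reflect.Chan {
			ports = append(ports, f.Name)
		}
	}
	return ports
}

func main() {
	fmt.Println(visiblePorts(&DataBuffer{}))   // [In Out]
	fmt.Println(visiblePorts(&HiddenBuffer{})) // [In]
}
```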

Altenrion commented 2 weeks ago

@trustmaster please correct me if I'm wrong: does the GoFlow executor close all channel ports that have no connections before starting the graph, so that we don't need to close such channels ourselves?

trustmaster commented 2 weeks ago

@Altenrion GoFlow only closes output ports that have channels attached to them, and it does so when the component finishes (not when it starts). If an output is not attached to anything, GoFlow does not close it.
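The rule described here (close attached outputs only after Process returns, skip unattached ones) can be sketched with reflection. This is a hypothetical stand-in, not GoFlow's source: runAndCloseOutputs and its nil check are assumptions. It also shows why the original report panicked: closing a nil channel panics with "close of nil channel", so a runtime has to skip nil fields.

```go
package main

import (
	"fmt"
	"reflect"
)

// Component is the single-method shape the thread's examples implement.
type Component interface {
	Process()
}

// Receiver forwards every input value to Out, as in the issue.
type Receiver struct {
	In  <-chan []byte
	Out chan<- []byte
}

func (r *Receiver) Process() {
	for in := range r.In {
		r.Out <- in
	}
}

// runAndCloseOutputs (hypothetical) runs Process and, only after it returns,
// closes every exported send-only channel field that is non-nil.
// A nil field means "not attached" and is left alone.
func runAndCloseOutputs(c Component) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		c.Process()
		v := reflect.ValueOf(c).Elem()
		for i := 0; i < v.NumField(); i++ {
			f := v.Field(i)
			if !f.CanSet() || f.Kind() != reflect.Chan || f.IsNil() {
				continue // unexported, not a channel, or not attached
			}
			if f.Type().ChanDir() == reflect.SendDir {
				f.Close()
			}
		}
		close(done)
	}()
	return done
}

func main() {
	in := make(chan []byte)
	out := make(chan []byte)
	done := runAndCloseOutputs(&Receiver{In: in, Out: out})
	go func() {
		in <- []byte{1, 0}
		close(in)
	}()
	for v := range out { // ends because Out is closed after Process returns
		fmt.Println(v) // [1 0]
	}
	<-done

	// Unattached output: Out stays nil and is skipped instead of panicking.
	in2 := make(chan []byte)
	close(in2)
	<-runAndCloseOutputs(&Receiver{In: in2})
	fmt.Println("unattached output skipped")
}
```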

If your question is about inputs, e.g. what happens when a component has several of them, like

type Receiver struct {
    name string
    In   <-chan []byte
    Opt  <-chan int
    Out  chan<- []byte
}

If, e.g., Opt is not connected to anything, the GoFlow runtime does nothing about it and the field simply stays nil. What matters is how your Process() handles such optional ports. For example:


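func (db *Receiver) Process() {
    // Default value, used when Opt is not connected
    opt := 0
    // Check if Opt is connected (an unconnected port stays nil)
    if db.Opt != nil {
        // Blocks until a value arrives; keeps the default if Opt is closed empty
        if v, ok := <-db.Opt; ok {
            opt = v
        }
    }
    for in := range db.In {
        // Do something with opt here; for now, just log it
        log.Printf("Processing Receiver for node %s (opt=%d)", db.name, opt)
        db.Out <- in
    }
}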
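The optional-port pattern can be exercised without the GoFlow runtime by wiring the channels by hand. A minimal sketch, assuming buffered channels as stand-ins for graph connections; the Used field and the runWith helper are hypothetical additions for this demo only:

```go
package main

import "fmt"

// Receiver follows the example above; Used is added only for this demo
// so the option value that Process picked can be inspected.
type Receiver struct {
	In   <-chan []byte
	Opt  <-chan int
	Out  chan<- []byte
	Used int
}

func (r *Receiver) Process() {
	opt := 0 // default when Opt is not connected
	if r.Opt != nil {
		// Keep the default if Opt was closed without sending anything.
		if v, ok := <-r.Opt; ok {
			opt = v
		}
	}
	r.Used = opt
	for in := range r.In {
		r.Out <- in
	}
}

// runWith (hypothetical) wires the component by hand and returns the
// option value it used plus the number of messages it forwarded.
func runWith(opt <-chan int, msgs ...[]byte) (int, int) {
	in := make(chan []byte, len(msgs))
	out := make(chan []byte, len(msgs))
	for _, m := range msgs {
		in <- m
	}
	close(in)
	r := &Receiver{In: in, Opt: opt, Out: out}
	r.Process() // returns once In is drained
	return r.Used, len(out)
}

func main() {
	fmt.Println(runWith(nil, []byte{1}, []byte{2})) // 0 2

	opt := make(chan int, 1)
	opt <- 42
	fmt.Println(runWith(opt, []byte{1})) // 42 1
}
```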
Altenrion commented 2 weeks ago

@trustmaster thanks for the examples. I found that GoFlow closes output ports (those connected to a port on another component) when the component's processing ends.

In my case I have "infinite" flows that start and run sequentially for long periods. To handle "optional" channel ports I made a higher-level management wrapper that wraps each component and checks whether, in the given flow scenario, a particular port is meant to stay unconnected; if so, it calls the "portCloser(portname)" logic on the component for every port marked as unattached.

This was a non-trivial endeavour, but what I came up with looks like this:

type PortCloser interface {
    ClosePort(name string)
}

type Receiver struct {
    name string
    In   <-chan []byte
    Opt  <-chan int
    Out  chan<- []byte
}

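// ClosePort replaces an unattached input with an already-closed channel,
// so receiving from it returns the zero value immediately instead of blocking.
func (r *Receiver) ClosePort(name string) {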
    switch name {
    case "Opt":
        ch := make(chan int)
        close(ch)
        r.Opt = ch
    }
}

func (r *Receiver) Process() {
    //.....
}

Right now I'm wondering whether this is the optimal solution, and I will be testing different cases around it inside Process(). I will definitely try your db.Opt != nil recommendation.

trustmaster commented 2 weeks ago

@Altenrion I'm not sure I understand what you are trying to achieve. If you explain what your long-running component does and why you need to close ports while it is running, it will be easier for me to help.

In FBP (flow-based programming), "close all outputs on end" is a typical tactic for graceful shutdown of the graph. It does not work in every case, though, and it is not intended for anything other than graceful shutdown.
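The "close all outputs on end" tactic works as a cascade: closing the first channel lets each downstream component's receive loop end, after which that component closes its own output. A minimal sketch with plain goroutines standing in for components (stage and pipeline are hypothetical names, not GoFlow code):

```go
package main

import "fmt"

// stage is a hypothetical component: it doubles each input and, once its
// input is closed and drained, closes its output so the next stage can end.
func stage(in <-chan int, out chan<- int) {
	defer close(out)
	for v := range in {
		out <- v * 2
	}
}

// pipeline wires source -> stage -> stage -> sink and returns what the sink
// collected. Closing the source is the only shutdown signal needed.
func pipeline(values []int) []int {
	src := make(chan int)
	mid := make(chan int)
	dst := make(chan int)
	go stage(src, mid)
	go stage(mid, dst)
	go func() {
		defer close(src)
		for _, v := range values {
			src <- v
		}
	}()
	var got []int
	for v := range dst { // ends once every upstream stage has closed its output
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(pipeline([]int{1, 2, 3})) // [4 8 12]
}
```

One case where the cascade cannot work, for example, is a feedback loop: a component whose input depends on its own output never sees that input closed, so it never finishes.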