Closed tehmoon closed 4 years ago
This is a PoC
```go
package main

import (
	"log"
)

// Module is the only interface a pipeline stage has to implement.
type Module interface {
	Init() error
}

const (
	MessageTypeReset MessageType = iota
	MessageTypeData
	MessageTypeClose
)

type MessageType int

type Message struct {
	Type    MessageType
	Payload []byte
}

// GenerateText emits a fixed list of words every time it receives a Reset.
type GenerateText struct {
	in  chan *Message
	out chan *Message
}

func (m *GenerateText) Init() error {
	go func(module *GenerateText) {
		words := []string{"Hello", "golang", "boston", "meetup"}

		for message := range module.in {
			switch message.Type {
			case MessageTypeReset:
				module.out <- &Message{Type: MessageTypeReset}
				for _, word := range words {
					module.out <- &Message{Type: MessageTypeData, Payload: []byte(word)}
				}
				module.out <- &Message{Type: MessageTypeClose}
			default:
				// Forward everything else untouched.
				module.out <- message
			}
		}

		// The input channel is closed: propagate the close downstream.
		close(module.out)
	}(m)

	return nil
}

func NewGenerateText(in, out chan *Message) (module *GenerateText) {
	return &GenerateText{
		in:  in,
		out: out,
	}
}

// Stdout logs Data payloads and forwards every other message.
type Stdout struct {
	in  chan *Message
	out chan *Message
}

func (m *Stdout) Init() error {
	go func(module *Stdout) {
		for message := range module.in {
			switch message.Type {
			case MessageTypeData:
				log.Println(string(message.Payload))
			default:
				module.out <- message
			}
		}

		close(module.out)
	}(m)

	return nil
}

func NewStdout(in, out chan *Message) (module *Stdout) {
	return &Stdout{
		in:  in,
		out: out,
	}
}

func main() {
	in := make(chan *Message, 5)
	out := make(chan *Message, 5)
	modc := make(chan *Message, 5)

	mod1 := NewGenerateText(in, modc)
	err := mod1.Init()
	if err != nil {
		panic(err)
	}

	mod2 := NewStdout(modc, out)
	err = mod2.Init()
	if err != nil {
		panic(err)
	}

	// Start the pipeline.
	in <- &Message{Type: MessageTypeReset}

	isclosed := false

LOOP:
	for message := range out {
		switch message.Type {
		case MessageTypeReset:
		case MessageTypeClose:
			// Second Close: the message made a full round trip, shut down.
			if isclosed {
				close(in)
				break LOOP
			}

			// First Close: send it through the pipeline once more.
			isclosed = true
			in <- message
		default:
			in <- message
		}
	}

	if !isclosed {
		close(in)
	}
}
```
All the channels are directly patched together and there is only the Init() function in the interface.
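A minimal sketch of what "patched together" could look like if the wiring were pulled into a helper. `patch`, `Constructor`, `Map` and `NewMap` are hypothetical names that are not part of the PoC; only `Message`, `MessageType` and the `Init()`-only `Module` interface come from it. The buffer size of 5 is copied from the PoC.

```go
package main

import (
	"bytes"
	"fmt"
)

type MessageType int

const (
	MessageTypeReset MessageType = iota
	MessageTypeData
	MessageTypeClose
)

type Message struct {
	Type    MessageType
	Payload []byte
}

// Module mirrors the single-method interface from the PoC.
type Module interface {
	Init() error
}

// Constructor is a hypothetical signature shared by all module constructors.
type Constructor func(in, out chan *Message) Module

// Map is a hypothetical module: it applies fn to Data payloads and
// forwards every other message unchanged.
type Map struct {
	in, out chan *Message
	fn      func([]byte) []byte
}

func (m *Map) Init() error {
	go func() {
		for msg := range m.in {
			if msg.Type == MessageTypeData {
				msg = &Message{Type: MessageTypeData, Payload: m.fn(msg.Payload)}
			}
			m.out <- msg
		}
		close(m.out)
	}()
	return nil
}

func NewMap(fn func([]byte) []byte) Constructor {
	return func(in, out chan *Message) Module {
		return &Map{in: in, out: out, fn: fn}
	}
}

// patch creates one channel per hop, hands each constructor its in/out pair
// and calls Init on the resulting module. It returns the last output channel.
func patch(in chan *Message, ctors ...Constructor) (chan *Message, error) {
	for _, ctor := range ctors {
		out := make(chan *Message, 5)
		if err := ctor(in, out).Init(); err != nil {
			return nil, err
		}
		in = out
	}
	return in, nil
}

func main() {
	in := make(chan *Message, 5)
	out, err := patch(in,
		NewMap(bytes.ToUpper),
		NewMap(func(b []byte) []byte { return append(b, '!') }),
	)
	if err != nil {
		panic(err)
	}
	in <- &Message{Type: MessageTypeData, Payload: []byte("hello")}
	close(in)
	for msg := range out {
		fmt.Println(string(msg.Payload)) // prints "HELLO!"
	}
}
```

Closing `in` is enough to tear the whole chain down here, because each module closes its own output once its input is drained.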
Waiting is done after receiving a Close message type.
The last loop controls the pipeline: it waits for a second Close message before it starts closing the channel.
Closing the channel is the LAST operation, after that it is not possible to go back anymore.
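The two-step shutdown can be sketched in isolation. `echo` and `control` are hypothetical names, not part of the PoC; `echo` stands in for any module that forwards what it receives, and the first Close is sent by hand because there is no generator in this sketch.

```go
package main

import "fmt"

type MessageType int

const (
	MessageTypeReset MessageType = iota
	MessageTypeData
	MessageTypeClose
)

type Message struct {
	Type    MessageType
	Payload []byte
}

// echo is a stand-in module: it forwards every message unchanged and
// closes its output once its input is closed.
func echo(in, out chan *Message) {
	go func() {
		for msg := range in {
			out <- msg
		}
		close(out)
	}()
}

// control feeds messages coming out of the tail of the pipeline back into
// its head. It closes in only after the second Close and returns the number
// of Close messages it saw.
func control(in, out chan *Message) int {
	closes := 0
	for msg := range out {
		switch msg.Type {
		case MessageTypeReset:
			// Reset made a full round trip; nothing to forward.
		case MessageTypeClose:
			closes++
			if closes == 2 {
				// Point of no return: closing in tears the pipeline down.
				close(in)
				// Wait for the modules to close out in turn.
				for range out {
				}
				return closes
			}
			in <- msg
		default:
			in <- msg
		}
	}
	return closes
}

func main() {
	in := make(chan *Message, 5)
	out := make(chan *Message, 5)
	echo(in, out)

	in <- &Message{Type: MessageTypeReset}
	in <- &Message{Type: MessageTypeClose}

	fmt.Println("closes seen:", control(in, out)) // prints "closes seen: 2"
}
```

Sending the first Close around once more gives every module a chance to see it before any channel is actually closed.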
PR https://github.com/tehmoon/cryptocli/pull/30 opened, still WIP.
Done!
The downside of the current design is that it is oriented around a single stream. There is no mechanism for restarting the pipeline, and no signal other than closing the output channel.
So as part of the new design:
Message structure like:
Wait() or Init(), because the pipeline will implement the state machine on both ends
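To illustrate the restart problem mentioned above: with Reset and Close carried as messages, the same channels can serve several runs and are only closed at the very end. This is a rough sketch under that assumption, not the final design; `generate` and `runOnce` are hypothetical names, and the message types are the ones from the PoC.

```go
package main

import "fmt"

type MessageType int

const (
	MessageTypeReset MessageType = iota
	MessageTypeData
	MessageTypeClose
)

type Message struct {
	Type    MessageType
	Payload []byte
}

// generate emits Reset, the words, then Close for every Reset it receives.
// Nothing is closed until in itself is closed.
func generate(in, out chan *Message, words []string) {
	go func() {
		for msg := range in {
			if msg.Type != MessageTypeReset {
				out <- msg
				continue
			}
			out <- &Message{Type: MessageTypeReset}
			for _, w := range words {
				out <- &Message{Type: MessageTypeData, Payload: []byte(w)}
			}
			out <- &Message{Type: MessageTypeClose}
		}
		close(out)
	}()
}

// runOnce sends one Reset and collects Data payloads until the matching Close.
func runOnce(in, out chan *Message) []string {
	in <- &Message{Type: MessageTypeReset}
	var got []string
	for msg := range out {
		switch msg.Type {
		case MessageTypeData:
			got = append(got, string(msg.Payload))
		case MessageTypeClose:
			return got
		}
	}
	return got
}

func main() {
	in := make(chan *Message, 5)
	out := make(chan *Message, 5)
	generate(in, out, []string{"Hello", "golang"})

	fmt.Println(runOnce(in, out)) // prints "[Hello golang]"
	fmt.Println(runOnce(in, out)) // same channels, second run: "[Hello golang]"

	close(in) // the only real close, done once at the end
}
```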