Open sujit-baniya opened 3 years ago
Need some hint if you can
type AutoConfig struct {
    Context context.Context
    FuncMap FuncMap
    Options []Option
}

type Auto struct {
    ctx         context.Context
    processor   func(nodec <-chan *node, wg *sync.WaitGroup, fnc chan<- FuncResult)
    fm          FuncMap
    opts        *Options
    concurrency int
    ctxErrc     <-chan error
}
func (auto *Auto) Process(in interface{}) (<-chan FuncResult, error) {
    graph, err := funcsGraph(auto.fm, []interface{}{in})
    if err != nil {
        return nil, err
    }
    nodec, err := genSortedNodes(graph)
    if err != nil {
        return nil, err
    }
    numGoroutines := goroutines(auto.opts, len(auto.fm))
    var wg sync.WaitGroup
    wg.Add(numGoroutines)
    fnc := make(chan FuncResult, len(auto.fm))
    for i := 0; i < numGoroutines; i++ {
        go auto.processor(nodec, &wg, fnc)
    }
    go func() {
        wg.Wait()
        select {
        case err := <-auto.ctxErrc:
            fnc <- FuncResult{err: err}
        default:
        }
        close(fnc)
    }()
    return fnc, nil
}
func New(cfg AutoConfig) *Auto {
    auto := &Auto{
        ctx:  cfg.Context,
        opts: newOptions(cfg.Options...),
        fm:   cfg.FuncMap,
    }
    ctxErrc, notifyErrOnce := ctxErr()
    auto.ctxErrc = ctxErrc
    auto.processor = func(nodec <-chan *node, wg *sync.WaitGroup, fnc chan<- FuncResult) {
        defer wg.Done()
        atomic.AddInt32(&goroutinesCount, +1)
        for node := range nodec {
            select {
            case <-auto.ctx.Done():
                notifyErrOnce(auto.ctx)
                return
            default:
            }
            execNodeFunc(auto.ctx, node, fnc, notifyErrOnce)
        }
    }
    return auto
}
The above works fine with this:
const jsonStr = `{"greet": "hello", "name" : "gopher"}`
config := async.AutoConfig{
    Context: context.Background(),
    FuncMap: async.FuncMap{
        "reader":  strings.NewReader,
        "decoder": async.DependsOn("reader").To(json.NewDecoder),
        "parser": async.DependsOn("decoder").To(func(d *json.Decoder) (msg, error) {
            var m msg
            err := d.Decode(&m)
            return m, err
        }),
        "greet": async.DependsOn("parser").To(func(m msg) string {
            return strings.Title(m.Greet)
        }),
        "name": async.DependsOn("parser").To(func(m msg) string {
            return strings.Title(m.Name)
        }),
        "greeter": async.DependsOn("greet", "name").To(func(greet, name string) string {
            return fmt.Sprintf("%s, %s!", greet, name)
        }),
    },
}
auto := async.New(config)
fnc, err := auto.Process(jsonStr)
if err != nil {
    log.Fatal(err)
}
for fn := range fnc {
    res, err := fn.Returned()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(res[0]) // Hello, Gopher!
}
Can you please suggest how I can move these two steps from Process to the NewAuto function?
graph, err := funcsGraph(auto.fm, []interface{}{in})
if err != nil {
    return nil, err
}
nodec, err := genSortedNodes(graph)
if err != nil {
    return nil, err
}
@sujit-baniya sorry for the (almost a month) late response.
I don't know if I got what you mean by moving the graph and its sorting algorithm to NewAuto. Is NewAuto a custom constructor you're creating?
Hi @rdleal, thank god you saw the issue :)
Currently, the tool has to create a new graph for each input, which might cause performance problems. In an ideal scenario, as a user, I would create a graph once and run it through multiple inputs as required.
I've implemented it the way I wanted, but I'm not sure if it's the best way to do it.
https://gist.github.com/sujit-baniya/c8886c1de3fa2ed8f11e1f652ea91a57
It seems you created a sort of cache of the graph in the initialNode attribute of the Auto struct. Did that modification address your performance issue?
Yes. At least I don't need to create a graph for each input :) Maybe you could suggest a better way to do the same?
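For readers following along, the "build once, run many" idea discussed above can be sketched roughly as below. This is not the code from the gist, nor the package's internals: `Pipeline`, `step`, `NewPipeline`, `Run`, and `inputKey` are hypothetical names made up for illustration, and the sketch runs steps sequentially rather than through goroutines and channels. The point it shows is only that the dependency sort can live in the constructor, while each call keeps its own per-input results.

```go
package main

import (
	"fmt"
	"strings"
)

// inputKey is a reserved (hypothetical) dependency name that refers to the
// per-call input instead of another step.
const inputKey = "input"

// step is a hypothetical unit of work: it receives the results of its
// dependencies, in order, and returns its own result.
type step struct {
	deps []string
	fn   func(args []string) string
}

// Pipeline caches the sorted execution order so it is computed only once.
type Pipeline struct {
	steps map[string]step
	order []string // built in NewPipeline, reused by every Run
}

// NewPipeline validates the steps and sorts them once, using a depth-first
// topological sort that also detects cycles and unknown dependencies.
func NewPipeline(steps map[string]step) (*Pipeline, error) {
	const (
		visiting = 1
		done     = 2
	)
	state := make(map[string]int, len(steps))
	order := make([]string, 0, len(steps))

	var visit func(name string) error
	visit = func(name string) error {
		s, ok := steps[name]
		if !ok {
			return fmt.Errorf("unknown step %q", name)
		}
		switch state[name] {
		case done:
			return nil
		case visiting:
			return fmt.Errorf("dependency cycle at %q", name)
		}
		state[name] = visiting
		for _, d := range s.deps {
			if d == inputKey {
				continue // the input is supplied per call, not sorted
			}
			if err := visit(d); err != nil {
				return err
			}
		}
		state[name] = done
		order = append(order, name)
		return nil
	}

	for name := range steps {
		if err := visit(name); err != nil {
			return nil, err
		}
	}
	return &Pipeline{steps: steps, order: order}, nil
}

// Run executes the cached order for one input. Only the results map is
// per-call state, so the same Pipeline can serve many inputs.
func (p *Pipeline) Run(input string) map[string]string {
	results := map[string]string{inputKey: input}
	for _, name := range p.order {
		s := p.steps[name]
		args := make([]string, len(s.deps))
		for i, d := range s.deps {
			args[i] = results[d]
		}
		results[name] = s.fn(args)
	}
	return results
}

func main() {
	p, err := NewPipeline(map[string]step{
		"greet": {deps: []string{inputKey}, fn: func(a []string) string {
			return strings.ToUpper(a[0])
		}},
		"greeter": {deps: []string{"greet"}, fn: func(a []string) string {
			return a[0] + ", gopher!"
		}},
	})
	if err != nil {
		panic(err)
	}
	// The sorted order is reused; nothing is rebuilt per input.
	for _, in := range []string{"hello", "hi"} {
		fmt.Println(p.Run(in)["greeter"])
	}
}
```

In the real package the graph is built from both the FuncMap and the input, so the equivalent change would need the input to be bound at call time, as `inputKey` does here. Whether that fits the package's design is an open question in this thread.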
Nice! First, thanks for using the code as a reference for solving your problem.
You brought up a nice use case for improving the performance of the package: calling Auto multiple times with the same graph. Caching the graph is a good solution!
Do you have some benchmarks about the performance hit?
I'll write some benchmarks myself in order to check the performance hit and figure out a way to address it in the package.
@rdleal not much difference
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkCachedFlow-12 33320 35259 ns/op
BenchmarkNonCachedFlow-12 30145 40004 ns/op
PASS
ok 3.994s
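The numbers above put the cached flow at roughly 12% less time per operation (35259 vs 40004 ns/op). The benchmark code itself is not shown in the thread, so the following is only a sketch of how such a comparison is commonly structured in Go. `buildPlan` and `runPlan` are hypothetical stand-ins for graph construction and execution, not functions from the package. The sketch uses `testing.Benchmark` so it can run as a plain program. In a `_test.go` file the two functions would instead be named `BenchmarkCachedFlow` and `BenchmarkNonCachedFlow` and run with `go test -bench`.

```go
package main

import (
	"fmt"
	"sort"
	"testing"
)

// buildPlan is a hypothetical stand-in for building and sorting the graph.
func buildPlan(n int) []int {
	plan := make([]int, n)
	for i := range plan {
		plan[i] = n - i
	}
	sort.Ints(plan)
	return plan
}

// runPlan is a hypothetical stand-in for executing the sorted steps.
func runPlan(plan []int, input int) int {
	sum := input
	for _, v := range plan {
		sum += v
	}
	return sum
}

// sink keeps the compiler from optimizing the benchmarked calls away.
var sink int

// benchNonCached rebuilds the plan on every iteration, like creating a
// new graph for each input.
func benchNonCached(b *testing.B) {
	for i := 0; i < b.N; i++ {
		sink = runPlan(buildPlan(64), i)
	}
}

// benchCached builds the plan once and excludes that setup from the timing.
func benchCached(b *testing.B) {
	plan := buildPlan(64)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		sink = runPlan(plan, i)
	}
}

func main() {
	cached := testing.Benchmark(benchCached)
	nonCached := testing.Benchmark(benchNonCached)
	fmt.Printf("cached:     %d ns/op\n", cached.NsPerOp())
	fmt.Printf("non-cached: %d ns/op\n", nonCached.NsPerOp())
}
```

The gap measured this way depends heavily on how expensive the setup is relative to the work done per input, which may be why the difference reported above is modest.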
Hi,
Currently, when sending multiple payloads in async.Auto mode, I need to create a new async.Auto for each payload. I'm trying to understand if there's any way we could do something like this.