defval / di

🛠 A full-featured dependency injection container for go programming language.
MIT License

concurrent map read and map write #22

Closed gfyrag closed 4 years ago

gfyrag commented 4 years ago

Calling the "Invoke" function from multiple goroutines can cause a "concurrent map read and map write" error in this part of the code:

// check cycle verified
if !c.verified[param.Key()] {
    err := checkCycles(c, param)
    if err != nil {
        return err
    }
    c.verified[param.Key()] = true
}

defval commented 4 years ago

Hello @gfyrag. Thanks for your interest.

di is not goroutine-safe. If you want to use it concurrently, you should guard access to the container with your own mutex.

Relevant topics in uber-go/dig:

gfyrag commented 4 years ago

OK, sorry, I didn't see that issue. Why not integrate a mutex? Locking only when the value is written should be enough, and it would cost nothing after the first few cycles, wouldn't it?

defval commented 4 years ago

@gfyrag A mutex would affect not only the part of the code responsible for checking cycles. It would affect the entire type resolving process.

di solves only application initialization problems, and I don't recommend using it at runtime. This is the main reason the library is not thread-safe.

If you want to run some functions concurrently, create a single entry point function, or a type with a method like Run(), that manages your goroutines, and invoke it once.

gfyrag commented 4 years ago

OK. My use case is to start runner functions in goroutines like this:

func fn1() {
    // ...
}

func fn2() {
    // ...
}

// Each runner function is invoked through the container in its own goroutine.
for _, fn := range []interface{}{fn1, fn2} {
    go func(fn interface{}) {
        container.Invoke(fn)
    }(fn)
}

The goroutines are started at the very beginning of the process and never stopped, so it seems like a valid use case to me.

Using your solution complicates things, but I understand the reasons.

Thanks!

defval commented 4 years ago

This is how I solve this problem in my code using github.com/oklog/run:

Dispatcher

package run

import (
    "context"

    "github.com/oklog/run"
)

// Worker is a run actor. Each actor in the run group is run concurrently.
type Worker interface {
    // Run runs the actor and must block until the actor is done.
    Run(ctx context.Context) error
    // Stop gives a stop signal for the actor.
    Stop(err error)
}

// Dispatcher is a concurrent worker controller.
type Dispatcher struct {
    group   run.Group
    workers []Worker
}

// NewDispatcher creates new worker dispatcher.
func NewDispatcher(workers []Worker) *Dispatcher {
    return &Dispatcher{
        group:   run.Group{},
        workers: workers,
    }
}

// Run runs all concurrent workers.
func (d *Dispatcher) Run(ctx context.Context) error {
    for _, worker := range d.workers {
        w := worker
        wrappedRun := func() error { return w.Run(ctx) }
        d.group.Add(wrappedRun, worker.Stop)
    }
    return d.group.Run()
}

Stop signal handler

package run

import (
    "context"
    "os"
    "os/signal"
    "syscall"
)

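// Shutdowner is a worker that blocks until the process receives a stop signal.
type Shutdowner struct {
    stop chan os.Signal
}

// NewShutdowner creates a new stop signal handler.
func NewShutdowner() *Shutdowner {
    return &Shutdowner{
        // signal.Notify does not block when sending, so the channel must be buffered.
        stop: make(chan os.Signal, 1),
    }
}

// Run blocks until SIGTERM or SIGINT arrives, or until Stop is called.
func (s *Shutdowner) Run(context.Context) error {
    signal.Notify(s.stop, syscall.SIGTERM, syscall.SIGINT)
    <-s.stop
    return nil
}

// Stop unregisters the channel before closing it, so that a late signal
// cannot be sent on a closed channel.
func (s *Shutdowner) Stop(err error) {
    signal.Stop(s.stop)
    close(s.stop)
}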

And main.go code:


func main() {
    c, err := di.New(
        di.Provide(run.NewDispatcher),
        di.Provide(run.NewShutdowner, di.As(new(run.Worker))),
        di.Provide(mypkg.NewMyConcurrentWorker, di.As(new(run.Worker))),
        // other workers that will be run concurrently
    )

    if err != nil {
        // handle error
    }

    var dispatcher *run.Dispatcher
    if err := c.Resolve(&dispatcher); err != nil { 
        // handle error
    }
    if err := dispatcher.Run(context.TODO()); err != nil { 
        // handle error
    }
}

@gfyrag Can I close the issue? =)

gfyrag commented 4 years ago

Yes. Thanks for the advice, I will probably do something like this.