BruceChen7 / gitblog

My blog
6 stars 1 forks source link

goroutine使用 #43

Open BruceChen7 opened 1 year ago

BruceChen7 commented 1 year ago

资料

并发和并行

通过内存共享进行通讯与通过通讯来共享内存。

同步

初始化

Goroutine creation

Locks

Once

如何利用 select 和 channel 来表达优先级

for {
    select {
        case v := <-messageCh:
            fmt.Println(v)
        case <-disconnectCh:
            fmt.Println("disconnection, return")
        return
    }
}

for i := 0; i < 10; i++ {
    messageCh <- i
}
disconnectCh <- struct{}{}

// 执行的时候,可能出现如下的结果
0
1
2
3
4
disconnection, return

实现优先级

for {
    select {
        case v := <-messageCh:
            fmt.Println(v)
        case <-disconnectCh:
            for {
                select {
                    case v := <-messageCh:
                        fmt.Println(v)
                    default:
                        fmt.Println("disconnection, return")
                    return
                }
            }
        }
    }
}

如何合并两个 read channel,返回一个 channel

func merge(ch1, ch2 <-chan int) <-chan int {
    ch := make(chan int, 1)
    go func() {
    for {
        select {
            case v := <-ch1:
                ch <- v
            case v := <-ch2:
                ch <- v
        }
    }
    close(ch)
}()
    return ch
}

closed channel nevel blocks

package main

import "fmt"

func main() {
    ch := make(chan bool, 2)
    ch <- true
    ch <- true
    close(ch)

    for i := 0; i < cap(ch) +1 ; i++ {
    v, ok := <- ch
    fmt.Println(v, ok)
    }
}

// 返回
// true true
// true true
// false false  第一个 false 是指在 channel 中的 value 类型的零值,也就是 false,第二个 false 是指 channel 的状态,

利用 close(chan) 来关闭

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    finish := make(chan struct{})
    var done sync.WaitGroup
    done.Add(1)
    go func() {
        select {
        case <-time.After(1 * time.Hour):
        case <-finish:
        }
        done.Done()
    }()
    t0 := time.Now()
    close(finish)
    done.Wait()
    fmt.Printf("Waited %v for goroutine to stop\n", time.Since(t0))
}

A nil channel always block

优雅退出的实现

// not gracefully
package main

import (
    "fmt"
    "time"
)

type Task struct {
    ticker *time.Ticker
}

func (t *Task) Run() {
    for {
        select {
        case <-t.ticker.C:
            handle()
        }
    }
}

func handle() {
    for i := 0; i < 5; i++ {
        fmt.Print("#")
        time.Sleep(time.Millisecond * 200)
    }
    fmt.Println()
}

func main() {
    task := &Task{
        ticker: time.NewTicker(time.Second * 2),
    }
    task.Run()
}
//
// $ go run main.go
// #####
// ###^Csignal: interrupt

// Method 1: use goroutine to exit gracefully
func main() {
    task := &Task{
        ticker: time.NewTicker(time.Second * 2),
    }

    c := make(chan os.Signal)
    signal.Notify(c, os.Interrupt)

    go func() {
        select {
        case sig := <-c:
            fmt.Printf("Got %s signal. Aborting...\n", sig)
            os.Exit(1)
        }
    }()

    task.Run()
}

// Another way but more graceful
// Add a channel to Task
// It works becacuse the task is running in the main goroutine
// So what if the task is running in an ordinary goroutine???
//
type Task struct {
    closed chan struct{}
    ticker *time.Ticker
}

func (t *Task) Run() {
    for {
        select {
        // If we receive a signal, then we exits
        case <-t.closed:
            return
        case <-t.ticker.C:
            handle()
        }
    }
}

// A wrapper to notify
func (t *Task) Stop() {
    close(t.closed)
}

func main() {
    task := &Task{
        ticker: time.NewTicker(time.Second * 2),
    }

    c := make(chan os.Signal)
    // Capcture the Ctrl + C signal
    signal.Notify(c, os.Interrupt)
    // a go
    go func() {
        select {
        case sig := <-c:
            fmt.Printf("Got %s signal. Aborting...\n", sig)
            // notify another go routine
            task.Stop()
        }
    }()

    task.Run()

}

// main goroutine waits a goroutine to finish
// use sync.WaitGroup
// a new definition to tasks
type Task struct {
    closed chan struct{}
    wg     sync.WaitGroup  //
    ticker *time.Ticker
}

func main() {
    // previous code...
    task.wg.Add(1)
    // in a goroutine got do tasks
    go func() { defer task.wg.Done(); task.Run() }()

    // other code...
}

// the whole program
package main

import (
    "fmt"
    "os"
    "os/signal"
    "sync"
    "time"
)

type Task struct {
    closed chan struct{}
    wg     sync.WaitGroup
    ticker *time.Ticker
}

func (t *Task) Run() {
    for {
        select {
        case <-t.closed:
            return
        case <-t.ticker.C:
            handle()
        }
    }
}

func (t *Task) Stop() {
    close(t.closed)
    t.wg.Wait()
}

func handle() {
    for i := 0; i < 5; i++ {
        fmt.Print("#")
        time.Sleep(time.Millisecond * 200)
    }
    fmt.Println()
}

func main() {
    task := &Task{
        closed: make(chan struct{}),
        ticker: time.NewTicker(time.Second * 2),
    }

    c := make(chan os.Signal)
    signal.Notify(c, os.Interrupt)

    task.wg.Add(1)
    go func() { defer task.wg.Done(); task.Run() }()

    select {
    case sig := <-c:
        fmt.Printf("Got %s signal. Aborting...\n", sig)
        task.Stop()
    }
}

几种并发控制的形式

使用 WaitGroup

channel 的使用

func (s *Scheduler) Start() {
    go func() {
        s.ticker = time.NewTimer(s.interval)
    for {
        select {
        case <-s.interrupt :
                    // 在此做一些退出前的必要操作
            return
                case <-s.ticker.C:
                    // 在此循环运行任务
    }
    }()
}

func (s *Scheduler) Shutdown() {
    /* 中断调度 */
    s.interrupt <- true
}

使用 context

// Stream generates values with DoSomething and sends them to out
 // until DoSomething returns an error or ctx.Done is closed.
func Stream(ctx context.Context, out chan<- Value) error {
   for {
    v, err := DoSomething(ctx)
    if err != nil {
        return err
    }
    select {
        case <-ctx.Done():
        return ctx.Err()
        case out <- v:
    }
   }
}

一个例子

package main

import (
  "context"
  "fmt"
  "math/rand"
  "time"
)

// Slow function
func sleepRandom(fromFunction string, ch chan int) {
  // defer cleanup
  defer func() { fmt.Println(fromFunction, "sleepRandom complete") }()

  //Perform a slow task
  //For illustration purpose,
  //Sleep here for random ms
  seed := time.Now().UnixNano()
  r := rand.New(rand.NewSource(seed))
  randomNumber := r.Intn(100)
  sleeptime := randomNumber + 100
  fmt.Println(fromFunction, "Starting sleep for", sleeptime, "ms")
  time.Sleep(time.Duration(sleeptime) * time.Millisecond)
  fmt.Println(fromFunction, "Waking up, slept for ", sleeptime, "ms")

  // write on the channel if it was passed in
  if ch != nil {
    ch <- sleeptime
  }
}

//Function that does slow processing with a context
//Note that context is the first argument
func sleepRandomContext(ctx context.Context, ch chan bool) {

  //Cleanup tasks
  //There are no contexts being created here
  //Hence, no canceling needed
  defer func() {
    fmt.Println("sleepRandomContext complete")
    ch <- true
  }()

  //Make a channel
  sleeptimeChan := make(chan int)

  //Start slow processing in a goroutine
  //Send a channel for communication
  go sleepRandom("sleepRandomContext", sleeptimeChan)

  //Use a select statement to exit out if context expires
  select {
  case <-ctx.Done():
    //If context is cancelled, this case is selected
    //This can happen if the timeout doWorkContext expires or
    //doWorkContext calls cancelFunction or main calls cancelFunction
    //Free up resources that may no longer be needed because of aborting the work
    //Signal all the goroutines that should stop work (use channels)
    //Usually, you would send something on channel,
    //wait for goroutines to exit and then return
    //Or, use wait groups instead of channels for synchronization
    fmt.Println("sleepRandomContext: Time to return")
  case sleeptime := <-sleeptimeChan:
    //This case is selected when processing finishes before the context is cancelled
    fmt.Println("Slept for ", sleeptime, "ms")
  }
}

//A helper function, this can, in the real world do various things.
//In this example, it is just calling one function.
//Here, this could have just lived in main
func doWorkContext(ctx context.Context) {

  //Derive a timeout context from context with cancel
  //Timeout in 150 ms
  //All the contexts derived from this will returns in 150 ms
  ctxWithTimeout, cancelFunction := context.WithTimeout(ctx, time.Duration(150)*time.Millisecond)

  //Cancel to release resources once the function is complete
  defer func() {
    fmt.Println("doWorkContext complete")
    cancelFunction()
  }()

  //Make channel and call context function
  //Can use wait groups as well for this particular case
  //As we do not use the return value sent on channel
  ch := make(chan bool)
  go sleepRandomContext(ctxWithTimeout, ch)

  //Use a select statement to exit out if context expires
  select {
  case <-ctx.Done():
    //This case is selected when the passed in context notifies to stop work
    //In this example, it will be notified when main calls cancelFunction
    fmt.Println("doWorkContext: Time to return")
  case <-ch:
    //This case is selected when processing finishes before the context is cancelled
    fmt.Println("sleepRandomContext returned")
  }
}

func main() {
  //Make a background context
  ctx := context.Background()
  //Derive a context with cancel
  ctxWithCancel, cancelFunction := context.WithCancel(ctx)

  //defer canceling so that all the resources are freed up
  //For this and the derived contexts
  defer func() {
    fmt.Println("Main Defer: canceling context")
    cancelFunction()
  }()

  //Cancel context after a random time
  //This cancels the request after a random timeout
  //If this happens, all the contexts derived from this should return
  go func() {
    sleepRandom("Main", nil)
    cancelFunction()
    fmt.Println("Main Sleep complete. canceling context")
  }()
  //Do work
  doWorkContext(ctxWithCancel)
}

type/golang #public