awakia / catchup

Issue based repository to catch up technologies and movements
5 stars 0 forks source link

GoでのChannelの使い方についてマスターする #40

Open awakia opened 9 years ago

awakia commented 9 years ago

https://gobyexample.com/channels

をどんどん読み進めれば終わりそう。

https://gobyexample.com/stateful-goroutines

まで。

その他大事なこと

GOMAXPROCSの設定

GOMAXPROCSによって使用するOSレベルのスレッド数を指定できる。(Goルーチンはスレッドではない)これはデフォルトで1に設定されているため、設定しないとパフォーマンスが出ない。なので通常CPU数にしておくと良い。

runtime.GOMAXPROCS(runtime.NumCPU()) 

追記

https://sites.google.com/site/gopatterns/concurrency

もAdvancedで面白い

awakia commented 9 years ago

Channel Baffering

https://gobyexample.com/channel-buffering

make(chan bool, 1)make(chan bool)は違う。 前者はバッファリング1でやるので同じルーチンで扱える。 後者は送信側と受信側の対応が取れるまで通信しない。

awakia commented 9 years ago

Select + Channel

https://gobyexample.com/select

チャネルを引数に取るSelect文を用いると1つのgoルーチンで複数のチャネルを受け取ることが出来る。基本はFor文の中でやる。

    for {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
awakia commented 9 years ago

ChannelのTimeout

https://gobyexample.com/timeouts

チャネルのタイムアウトはSelect文を使うと定義できる

   select {
    case res := <-c1:
        fmt.Println(res)
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
    }
awakia commented 9 years ago

Non Blocking

https://gobyexample.com/non-blocking-channel-operations

チャネルのSelect文にデフォルトをつけると、そこで受信をまったりしなくなる。送信も同様。

   select {
    case msg := <-messages:
        fmt.Println("received message", msg)
    default:
        fmt.Println("no message received")
    }
awakia commented 9 years ago

ChannelのCloseとRange

https://gobyexample.com/closing-channels https://gobyexample.com/range-over-channels

チャネルは送信が終わったタイミングでCloseすることができる。

こうすると以下のように無限ループのコードでも全てのchannelに入れた処理が終わったら終了するプログラムをかけるし、

func main() {
    jobs := make(chan int, 5)
    done := make(chan bool)

    go func() {
        for {
            j, more := <-jobs
            if more {
                fmt.Println("received job", j)
            } else {
                fmt.Println("received all jobs")
                done <- true
                return
            }
        }
    }()

    for j := 1; j <= 3; j++ {
        jobs <- j
        fmt.Println("sent job", j)
    }
    close(jobs)
    fmt.Println("sent all jobs")

    // We await the worker using the
    // [synchronization](channel-synchronization) approach
    // we saw earlier.
    <-done
}

終了があるので、for range構文でも扱うことができるようになる。

    queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue)
    for elem := range queue {
        fmt.Println(elem)
    }
awakia commented 9 years ago

TimerとTicker

https://gobyexample.com/timers https://gobyexample.com/tickers

指定した時間後にChannelを送信してくれるTimerと

    timer1 := time.NewTimer(time.Second * 2)
    <-timer1.C
    fmt.Println("Timer 1 expired")

定期時間おきにチャネルを送信してくれるTickerがある

    ticker := time.NewTicker(time.Millisecond * 500)
    go func() {
        for t := range ticker.C {
            fmt.Println("Tick at", t)
        }
    }()
    time.Sleep(time.Millisecond * 1600)
    ticker.Stop()
awakia commented 9 years ago

Worker Pool

https://gobyexample.com/worker-pools

ここまでの応用でWorker Poolが作れる。 3つのWorkerで9このJobを動かす例

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "processing job", j)
        time.Sleep(time.Second)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)

    for a := 1; a <= 9; a++ {
        <-results
    }
}
awakia commented 9 years ago

Rate Limiting

ChannelとTickerをうまく組み合わせることでRate Limitをすることが出来る。

200ms起きにタスクを実行する例

requests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        requests <- i
    }
    close(requests)
    limiter := time.Tick(time.Millisecond * 200)
    for req := range requests {
        <-limiter
        fmt.Println("request", req, time.Now())
    }

結果

request 1 2012-10-19 00:38:18.687438 +0000 UTC
request 2 2012-10-19 00:38:18.887471 +0000 UTC
request 3 2012-10-19 00:38:19.087238 +0000 UTC
request 4 2012-10-19 00:38:19.287338 +0000 UTC
request 5 2012-10-19 00:38:19.487331 +0000 UTC

最初は3つ同時、後は200msずつにする例

    burstyLimiter := make(chan time.Time, 3)

    for i := 0; i < 3; i++ {
        burstyLimiter <- time.Now()
    }

    go func() {
        for t := range time.Tick(time.Millisecond * 200) {
            burstyLimiter <- t
        }
    }()

    burstyRequests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        burstyRequests <- i
    }
    close(burstyRequests)
    for req := range burstyRequests {
        <-burstyLimiter
        fmt.Println("request", req, time.Now())
    }

結果

request 1 2012-10-19 00:38:20.487578 +0000 UTC
request 2 2012-10-19 00:38:20.487645 +0000 UTC
request 3 2012-10-19 00:38:20.487676 +0000 UTC
request 4 2012-10-19 00:38:20.687483 +0000 UTC
request 5 2012-10-19 00:38:20.887542 +0000 UTC

追加検証

追加検証の結果このやり方はあんまり良くない事がわかった。 これだとある処理に時間がかかった場合、結局Limitterの方には0.2秒ずつタスク処理できるものが追加されてしまうため、ちゃんと0.2秒ずつになってない気がする。 普通に、処理した後0.2秒ずつ待つという実装のほうが通常使えそう。

request 1 2009-11-10 23:00:02 +0000 UTC
request 2 2009-11-10 23:00:03 +0000 UTC
request 3 2009-11-10 23:00:04 +0000 UTC
request 4 2009-11-10 23:00:05 +0000 UTC
request 5 2009-11-10 23:00:06 +0000 UTC
request 6 2009-11-10 23:00:06 +0000 UTC
request 7 2009-11-10 23:00:06 +0000 UTC
request 8 2009-11-10 23:00:06 +0000 UTC
request 9 2009-11-10 23:00:06 +0000 UTC
request 10 2009-11-10 23:00:06 +0000 UTC
request 11 2009-11-10 23:00:06.2 +0000 UTC
request 12 2009-11-10 23:00:06.4 +0000 UTC
request 13 2009-11-10 23:00:06.6 +0000 UTC
request 14 2009-11-10 23:00:06.8 +0000 UTC
request 15 2009-11-10 23:00:07 +0000 UTC
awakia commented 9 years ago

Goでマルチスレッドプログラミング

この後の3つ

  1. https://gobyexample.com/atomic-counters
  2. https://gobyexample.com/mutexes
  3. https://gobyexample.com/stateful-goroutines

は、Goでマルチスレッド的なプログラミングをする時に使う。

1はAtomic処理、2はMutex処理、3は2をGo的にMutexを使わずにChannelを使って同期をとるとどうなるかというプログラムになっている。 3の方がパフォーマンスは遅くなっているが見通しは良くなる。(このプログラムでは10倍ぐらい遅くなっているがチャネルの方はチャネルを使って毎回結果を戻しているからフェアな比較じゃない気がする)

この時に、runtime.Gosched()というのが出てくるがスレッドではないGoルーチンがスレッドのように、別のGoルーチンに順番にリソースを配分していく仕組みらしい。これが理解できてないとこの3つの理解が難しい http://stackoverflow.com/questions/13107958/what-exactly-does-runtime-gosched-do

awakia commented 9 years ago

Google Searchの例がめっちゃ良かった

http://talks.golang.org/2012/concurrency.slide#42 - 50

var (
    Web = fakeSearch("web")
    Image = fakeSearch("image")
    Video = fakeSearch("video")
)

type Search func(query string) Result

func fakeSearch(kind string) Search {
        return func(query string) Result {
              time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
              return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
}
func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    results := Google("golang")
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
}
func First(query string, replicas ...Search) Result {
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}
func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- First(query, Web1, Web2) } ()
    go func() { c <- First(query, Image1, Image2) } ()
    go func() { c <- First(query, Video1, Video2) } ()
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return
}