Handling 1 Million Requests per Minute with Go -- the code #22

Open shownb opened 6 years ago

shownb commented 6 years ago
var (
    MaxWorker = os.Getenv("MAX_WORKERS") // note: env values are strings; parse with strconv.Atoi before using them as counts
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {

        // let's create a job with the payload
        work := Job{Payload: payload}

        // Push the work onto the queue.
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}
type Dispatcher struct {
    // A pool of workers channels that are registered with the dispatcher
    WorkerPool chan chan Job
    maxWorkers int
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}

func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.WorkerPool)
        worker.Start()
    }

    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // a job request has been received
            go func(job Job) {
                // try to obtain a worker job channel that is available.
                // this will block until a worker is idle
                jobChannel := <-d.WorkerPool

                // dispatch the job to the worker job channel
                jobChannel <- job
            }(job)
        }
    }
}
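
For context, a minimal sketch of how these pieces could be wired together in main; the worker count, queue size, and listen address here are assumptions, not values from the article:

func main() {
    maxWorkers := 100               // assumed; the article reads MAX_WORKERS from the environment
    JobQueue = make(chan Job, 1000) // assumed buffer size standing in for MAX_QUEUE

    dispatcher := NewDispatcher(maxWorkers)
    dispatcher.Run()

    http.HandleFunc("/payload", payloadHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}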

A newer demo

package main

import "fmt"
import "time"

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)
    for a := 1; a <= 5; a++ {
        <-results
    }
}

Thoughts after reading the code: what are the bugs in "Handling 1 Million Requests per Minute with Go"? The article's first attempt is plain naive: it spawns goroutines without any limit, so the process blows up under load. The second attempt adds a buffered channel but then stops using goroutines altogether, which is just as bad; before long latency shoots up, because every request ends up waiting on Queue <- payload.
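
For reference, the first attempt boils down to a handler that fires one goroutine per payload with no cap at all, roughly like this sketch (not the article's exact code):

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    var content = &PayloadCollection{}
    if err := json.NewDecoder(r.Body).Decode(content); err != nil {
        w.WriteHeader(http.StatusBadRequest)
        return
    }
    for _, payload := range content.Payloads {
        // one goroutine per payload, completely unbounded
        go payload.UploadToS3()
    }
    w.WriteHeader(http.StatusOK)
}

The second attempt, quoted below, then dropped the goroutines entirely and processed jobs one at a time off a buffered channel: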

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- STILL NOT GOOD
        }
    }
}

Taken together, the question is how many goroutines should be doing the work, in other words how to control the number of goroutines. To do that you first have to know where the bottleneck is; it is the barrel analogy: however tall the other staves are, the shortest one decides how much water the barrel holds. Once you know the bottleneck, you can look for the balance point and set a policy. In this article there are two short staves to watch:
1. On the web side, how many requests a single machine can handle.
2. How many payload.UploadToS3 calls can run at the same time.
The bottleneck is that you cannot run too many payload.UploadToS3 goroutines, so a single machine has to cap its goroutine count; and since the inbound request rate is fixed (on the order of a million per minute), the rest has to be handled by a cluster. A sketch of capping concurrent uploads with a semaphore follows below.
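
As a concrete illustration of capping the goroutine count, here is a minimal, self-contained sketch that uses a buffered channel as a counting semaphore; maxUploads and uploadToS3 are stand-ins for the example, not code from the article:

package main

import (
    "fmt"
    "sync"
    "time"
)

// uploadToS3 stands in for the real payload.UploadToS3 call.
func uploadToS3(id int) {
    time.Sleep(100 * time.Millisecond)
    fmt.Println("uploaded payload", id)
}

func main() {
    const maxUploads = 10                  // assumed cap on concurrent uploads
    sem := make(chan struct{}, maxUploads) // counting semaphore
    var wg sync.WaitGroup

    for i := 1; i <= 100; i++ {
        sem <- struct{}{} // blocks once maxUploads uploads are already in flight
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            defer func() { <-sem }() // free the slot when this upload finishes
            uploadToS3(id)
        }(i)
    }
    wg.Wait()
}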

The review below, from someone else, makes a lot of sense. Haha. https://gocn.vip/article/5

Since we also had to implement something similar recently, I studied this article carefully:
1. The Dispatcher has no reason to exist; channels already give you fan-out.
2. Where a for range would do, the code uses a select instead.
3. Hitting one million is entirely down to S3's capacity.
4. The API only became a bottleneck because the original implementation was so clumsy.
5. The title oversells it.
Summary: a Ruby programmer used Go without really understanding it, fumbled through a few approaches, happened to land on one that worked, and never showed the most efficient way to write it. (A sketch of the simpler fan-out that points 1 and 2 describe follows below.)
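
For illustration, here is a minimal sketch of the simpler shape those two points describe: one shared buffered channel plus n workers ranging over it, reusing the Job and Payload types from the article's code; the queue size is an assumption:

var JobQueue = make(chan Job, 1000) // one shared, buffered queue

func startWorkers(n int) {
    for i := 0; i < n; i++ {
        go func() {
            // range blocks while the queue is empty and exits once JobQueue is closed,
            // so no select and no Dispatcher are needed: the channel itself fans the work out.
            for job := range JobQueue {
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Printf("Error uploading to S3: %s", err)
                }
            }
        }()
    }
}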

References: http://www.cnblogs.com/artong0416/ and https://gobyexample.com/worker-pools (Go by Example).

shownb commented 6 years ago
Your dispatcher is building up an unbounded number of goroutines, which doesn't seem like a good idea, but obviously your servers are performing just fine according to your monitoring. Rather than the goroutines being the problem, I believe it is the S3 upload that is causing havoc when it is unbounded. So I would think the following would work:

In the original "Try Again" just add a simple boolean channel. Spawn go routines for the s3 upload and pass the channel as a parameter having it drain itself after s3 upload is complete.

limiter := make(chan bool, MAX_FANOUT) (defined somewhere, globally, etc)

case job := <-Queue:
limiter <- true
go job.payload.UploadToS3(limiter)

Internal to UploadToS3:
<-limiter

Less is more. All you are trying to do is make a 2D processing "space" that you manage (MAX_FANOUT x MAX_QUEUE). I get that what you wrote is a lot sexier than what I am suggesting, but seeing more code than is necessary tends to bother me.

What he means is that under case job := <-JobQueue:, a new goroutine is spawned for every job, which produces a great many goroutines. The servers happened to be strong enough to absorb that; the more serious problem, compared with the goroutine count, is the S3 upload itself whenever it is launched without any bound.
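
Putting the commenter's suggestion into concrete form, here is a minimal sketch built around the article's "Try again" loop; the MAX_FANOUT value of 25 is an assumption, and the slot is released in the wrapping goroutine rather than inside UploadToS3, which keeps the upload function's signature unchanged but is otherwise the same idea:

var limiter = make(chan bool, 25) // MAX_FANOUT; assumed value, defined globally as the commenter suggests

func StartProcessor() {
    for job := range Queue {
        limiter <- true // blocks once MAX_FANOUT uploads are already in flight
        go func(p Payload) {
            defer func() { <-limiter }() // drain the slot when the upload completes
            if err := p.UploadToS3(); err != nil {
                log.Printf("Error uploading to S3: %s", err)
            }
        }(job.payload)
    }
}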