valyala / fasthttp

Fast HTTP package for Go. Tuned for high performance. Zero memory allocations in hot paths. Up to 10x faster than net/http
MIT License
21.73k stars 1.75k forks source link

Using atomic instead of mutex and delete scratch slice #1833

Closed NikoMalik closed 1 month ago

newacorn commented 1 month ago

Whether using an array with a lock or a linked list with atomic operations to manage the workerChan resources, the subsequent operations on workerChan are I/O-intensive. Given that the operations are FILO and each element involves significant I/O, I don't think a linked list has any particular advantage.

NikoMalik commented 1 month ago

Whether using an array with a lock or a linked list with atomic operations to manage the workerChan resources, the subsequent operations on workerChan are I/O-intensive. Given that the operations are FILO and each element involves significant I/O, I don't think a linked list has any particular advantage.

I got a speedup of almost one and a half times in the benchmark tests.

erikdubbelboer commented 1 month ago

Can you show which benchmarks and their results here?

NikoMalik commented 1 month ago

// BenchmarkWorkerPoolStartStopSerial runs testWorkerPoolStartStopBENCH b.N
// times, one call after another, on the benchmark goroutine.
func BenchmarkWorkerPoolStartStopSerial(b *testing.B) {
    for remaining := b.N; remaining > 0; remaining-- {
        testWorkerPoolStartStopBENCH()
    }
}

// BenchmarkWorkerPoolStartStopConcurrent launches, for every benchmark
// iteration, a batch of goroutines that each run
// testWorkerPoolStartStopBENCH, then waits for the whole batch to report
// completion before starting the next iteration.
func BenchmarkWorkerPoolStartStopConcurrent(b *testing.B) {
    const concurrency = 10
    // Buffered to the batch size so finished goroutines never block on send.
    done := make(chan struct{}, concurrency)
    b.ResetTimer()
    for iter := 0; iter < b.N; iter++ {
        for started := 0; started < concurrency; started++ {
            go func() {
                testWorkerPoolStartStopBENCH()
                done <- struct{}{}
            }()
        }
        // Collect one completion signal per goroutine; abort the benchmark
        // if any single signal takes longer than a second to arrive.
        for finished := 0; finished < concurrency; finished++ {
            select {
            case <-done:
            case <-time.After(time.Second):
                b.Fatalf("timeout")
            }
        }
    }
}

// BenchmarkWorkerPoolMaxWorkersCountSerial runs
// testWorkerPoolMaxWorkersCountMultiBENCH b.N times, one call after another,
// on the benchmark goroutine.
func BenchmarkWorkerPoolMaxWorkersCountSerial(b *testing.B) {
    for remaining := b.N; remaining > 0; remaining-- {
        testWorkerPoolMaxWorkersCountMultiBENCH(b)
    }
}

// BenchmarkWorkerPoolMaxWorkersCountConcurrent launches, for every benchmark
// iteration, a batch of goroutines that each run
// testWorkerPoolMaxWorkersCountMultiBENCH, then waits for the whole batch to
// report completion before starting the next iteration.
//
// NOTE(review): the helper is invoked from spawned goroutines and receives b,
// so any b.Fatalf it issues runs outside the benchmark goroutine. The testing
// package documents FailNow as callable only from the goroutine running the
// benchmark; in that case this function only notices via the timeout below.
func BenchmarkWorkerPoolMaxWorkersCountConcurrent(b *testing.B) {
    const concurrency = 4
    // Buffered to the batch size so finished goroutines never block on send.
    done := make(chan struct{}, concurrency)
    b.ResetTimer()
    for iter := 0; iter < b.N; iter++ {
        for started := 0; started < concurrency; started++ {
            go func() {
                testWorkerPoolMaxWorkersCountMultiBENCH(b)
                done <- struct{}{}
            }()
        }
        // Collect one completion signal per goroutine; abort the benchmark
        // if any single signal takes longer than two seconds to arrive.
        for finished := 0; finished < concurrency; finished++ {
            select {
            case <-done:
            case <-time.After(2 * time.Second):
                b.Fatalf("timeout")
            }
        }
    }
}

// testWorkerPoolStartStopBENCH builds a workerPool whose WorkerFunc does
// nothing and returns nil, then starts and stops that same pool ten times in
// a row. No connection is ever handed to the pool.
func testWorkerPoolStartStopBENCH() {
    const cycles = 10

    noop := func(conn net.Conn) error { return nil }
    pool := &workerPool{
        WorkerFunc:      noop,
        MaxWorkersCount: 10,
        Logger:          defaultLogger,
    }
    for cycle := 0; cycle < cycles; cycle++ {
        pool.Start()
        pool.Stop()
    }
}

// testWorkerPoolMaxWorkersCountMultiBENCH runs
// testWorkerPoolMaxWorkersCountBENCH five times in sequence, passing b
// through to every run.
func testWorkerPoolMaxWorkersCountMultiBENCH(b *testing.B) {
    const rounds = 5
    for round := 0; round < rounds; round++ {
        testWorkerPoolMaxWorkersCountBENCH(b)
    }
}

// testWorkerPoolMaxWorkersCountBENCH exercises a workerPool at its
// MaxWorkersCount limit over an in-memory listener:
//
//  1. MaxWorkersCount clients dial, send "foobar" and expect "baz" back.
//  2. Each accepted connection is handed to wp.Serve, which must accept it.
//     The workers then block on the ready channel, keeping the pool occupied.
//  3. One extra connection is offered to wp.Serve five times and must be
//     refused every time.
//  4. ready is closed to release the workers, every client is awaited, and
//     the listener and pool are shut down.
//
// Failures inside goroutines are reported with b.Errorf. Failures on the
// calling goroutine use b.Fatalf.
//
// NOTE(review): the testing package requires Fatalf/FailNow to be called from
// the goroutine running the benchmark. Callers that invoke this helper from a
// spawned goroutine do not satisfy that requirement.
func testWorkerPoolMaxWorkersCountBENCH(b *testing.B) {
    // ready is closed once the "pool is full" checks are done; until then
    // every worker stays parked inside WorkerFunc.
    ready := make(chan struct{})
    wp := &workerPool{
        WorkerFunc: func(conn net.Conn) error {
            buf := make([]byte, 100)
            n, err := conn.Read(buf)
            if err != nil {
                b.Errorf("unexpected error: %v", err)
            }
            buf = buf[:n]
            if string(buf) != "foobar" {
                b.Errorf("unexpected data read: %q. Expecting %q", buf, "foobar")
            }
            if _, err = conn.Write([]byte("baz")); err != nil {
                b.Errorf("unexpected error: %v", err)
            }

            // Keep this worker busy so the pool stays saturated.
            <-ready

            return nil
        },
        MaxWorkersCount: 10,
        Logger:          defaultLogger,
        connState:       func(net.Conn, ConnState) {},
    }
    wp.Start()

    ln := fasthttputil.NewInmemoryListener()

    // Buffered to the number of clients so a finishing client never blocks.
    clientCh := make(chan struct{}, wp.MaxWorkersCount)
    for i := 0; i < wp.MaxWorkersCount; i++ {
        go func() {
            conn, err := ln.Dial()
            if err != nil {
                b.Errorf("unexpected error: %v", err)
                // conn is not usable after a failed Dial; continuing would
                // dereference it. Report completion and stop here so the
                // failure surfaces as an error rather than a panic.
                clientCh <- struct{}{}
                return
            }
            if _, err = conn.Write([]byte("foobar")); err != nil {
                b.Errorf("unexpected error: %v", err)
            }
            data, err := io.ReadAll(conn)
            if err != nil {
                b.Errorf("unexpected error: %v", err)
            }
            if string(data) != "baz" {
                b.Errorf("unexpected value read: %q. Expecting %q", data, "baz")
            }
            if err = conn.Close(); err != nil {
                b.Errorf("unexpected error: %v", err)
            }
            clientCh <- struct{}{}
        }()
    }

    // Hand one accepted connection to the pool per client.
    for i := 0; i < wp.MaxWorkersCount; i++ {
        conn, err := ln.Accept()
        if err != nil {
            b.Fatalf("unexpected error: %v", err)
        }
        if !wp.Serve(conn) {
            b.Fatalf("worker pool must have enough workers to serve the conn")
        }
    }

    // One connection beyond the limit: dial it in the background so Accept
    // below has something to return.
    go func() {
        if _, err := ln.Dial(); err != nil {
            b.Errorf("unexpected error: %v", err)
        }
    }()
    conn, err := ln.Accept()
    if err != nil {
        b.Fatalf("unexpected error: %v", err)
    }
    // All workers are still parked on ready, so Serve must refuse every time.
    for i := 0; i < 5; i++ {
        if wp.Serve(conn) {
            b.Fatalf("worker pool must be full")
        }
    }
    if err = conn.Close(); err != nil {
        b.Fatalf("unexpected error: %v", err)
    }

    // Release the parked workers.
    close(ready)

    // Wait for every client goroutine, allowing one second per signal.
    for i := 0; i < wp.MaxWorkersCount; i++ {
        select {
        case <-clientCh:
        case <-time.After(time.Second):
            b.Fatalf("timeout")
        }
    }

    if err := ln.Close(); err != nil {
        b.Fatalf("unexpected error: %v", err)
    }
    wp.Stop()
}

I used these benchmarks, which are based on the existing tests.

resultsUsingAtomic:

goarch: amd64
pkg: github.com/valyala/fasthttp
cpu: 11th Gen Intel(R) Core(TM) i5-11400F @ 2.60GHz
=== RUN   BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial-12
  216567              5454 ns/op            1494 B/op         21 allocs/op
=== RUN   BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent-12
   35728             31353 ns/op           17913 B/op        250 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial-12
    2488            523983 ns/op          253846 B/op       1143 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent-12
    1339            811728 ns/op         1019013 B/op       4624 allocs/op
PASS
ok      github.com/valyala/fasthttp     5.554s

resultsUsingMutexAndSlice:

goarch: amd64
pkg: github.com/valyala/fasthttp
cpu: 11th Gen Intel(R) Core(TM) i5-11400F @ 2.60GHz
=== RUN   BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial-12
  211056              5594 ns/op            1508 B/op         21 allocs/op
=== RUN   BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent-12
   35869             32778 ns/op           18003 B/op        250 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial-12
    2433            537527 ns/op          256182 B/op       1093 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent-12
    1458            876898 ns/op         1027666 B/op       4420 allocs/op
PASS
ok      github.com/valyala/fasthttp     5.788s
erikdubbelboer commented 1 month ago

Any idea what is causing the extra allocations?

NikoMalik commented 1 month ago

Any idea what is causing the extra allocations?

I fixed it:

cpu: 11th Gen Intel(R) Core(TM) i5-11400F @ 2.60GHz
=== RUN   BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial-12
  205198              5466 ns/op            1494 B/op         21 allocs/op
=== RUN   BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent-12
   34980             30404 ns/op           17959 B/op        250 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial-12
    2520            509416 ns/op          251338 B/op       1050 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent-12
    1652            782699 ns/op         1008180 B/op       4218 allocs/op
PASS
ok      github.com/valyala/fasthttp     5.588s 

The problem was in getch().

NikoMalik commented 1 month ago

Seems like the code isn't completely thread safe, 3 tests failed with the race detector.

I may have ruled out the last possible data races

erikdubbelboer commented 1 month ago

Thanks!