AmitKumarDas / fun-with-programming

ABC - Always Be Coding

[go] memory pool for perf #34

Closed AmitKumarDas closed 2 years ago

AmitKumarDas commented 3 years ago
// https://blog.cloudflare.com/recycling-memory-buffers-in-go/
// This mechanism can be used to safely
// build a shared pool. It can even be shared
// across goroutines, since a **channel** is
// safe to use from multiple goroutines.
//
// Does this look like a lock-free, thread-safe
// ring buffer, i.e. can it be shared?
package main

import (
    "container/list"
    "fmt"
    "math/rand"
    "runtime"
    "time"
)

var makes int
var frees int

// makeBuffer simulates an expensive allocation:
// a slice between 5MB and 10MB
func makeBuffer() []byte {
  makes += 1
  return make([]byte, rand.Intn(5000000)+5000000)
}

type queued struct {
  when time.Time
  slice []byte
}

// makeRecycler returns two channels: in hands
// buffers out to callers, out takes them back
// for reuse
func makeRecycler() (in, out chan []byte) {
  in = make(chan []byte)  // unbuffered, hence blocking
  out = make(chan []byte) // unbuffered, hence blocking

  go func() {
    q := new(list.List) // queue of idle buffers; grows when buffers come back faster than they are handed out
    for {
      if q.Len() == 0 { // make only when list is empty
        q.PushFront(queued{when: time.Now(), slice: makeBuffer()})
      }

      e := q.Front()

      timeout := time.NewTimer(time.Minute) // 1 minute timer
      select {
      case b := <-out:
        timeout.Stop()
        q.PushFront(queued{when: time.Now(), slice: b})
      case in <- e.Value.(queued).slice:
        timeout.Stop()
        q.Remove(e)
      case <-timeout.C:
        // evict buffers that have sat idle for over a minute
        e := q.Front()
        for e != nil {
          n := e.Next()
          if time.Since(e.Value.(queued).when) > time.Minute {
            q.Remove(e)
            e.Value = nil
            frees += 1 // count the drop so the stats printed below are meaningful
          }
          e = n
        }
      }
    }
  }()
  return
}

func main() {
  pool := make([][]byte, 20)

  in, out := makeRecycler()

  var m runtime.MemStats
  for {
    b := <-in   // blocking
    i := rand.Intn(len(pool))
    if pool[i] != nil {
      out <- pool[i]
    }

    pool[i] = b

    time.Sleep(time.Second)

    bytes := 0
    for i := 0; i < len(pool); i++ {
      if pool[i] != nil {
        bytes += len(pool[i])
      }
    }

    runtime.ReadMemStats(&m)
    fmt.Printf("%d,%d,%d,%d,%d,%d,%d\n", m.HeapSys, bytes, m.HeapAlloc,
      m.HeapIdle, m.HeapReleased, makes, frees)
  }
}
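
The channel-plus-timer recycler above predates the standard library's sync.Pool (Go 1.3+), which solves the same problem by letting the garbage collector reclaim idle buffers instead of running an explicit timeout loop. A minimal sketch for comparison; the pool name and the sizing (mirroring makeBuffer above) are illustrative assumptions:

package main

import (
  "math/rand"
  "sync"
)

// bufPool hands out large byte slices; idle buffers
// may be dropped by the GC, so no eviction timer is
// needed. sync.Pool is itself safe for use by
// multiple goroutines.
var bufPool = sync.Pool{
  New: func() interface{} {
    // same sizing as makeBuffer above (assumption)
    return make([]byte, rand.Intn(5000000)+5000000)
  },
}

func main() {
  b := bufPool.Get().([]byte) // reuse an idle buffer or allocate a new one
  // ... use b ...
  bufPool.Put(b) // hand it back for reuse
}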
AmitKumarDas commented 3 years ago
// https://blog.questionable.services/article/using-buffer-pools-with-go/
//
// - https://github.com/oxtoacart/bpool
package bpool

import "bytes"

type SizedBufferPool struct {
  c chan *bytes.Buffer // buffered channel holds the pooled buffers
  a int                // initial capacity for each new buffer
}

// NewSizedBufferPool creates a new buffer pool
// bounded to the given size
//
// size defines the number of buffers to be
// retained in the pool
//
// alloc sets the initial capacity of a new 
// buffer to minimize calls to make()
func NewSizedBufferPool(size int, alloc int) (bp *SizedBufferPool) {
    return &SizedBufferPool{
        c: make(chan *bytes.Buffer, size),  // buffered
        a: alloc,
    }
}

// Get gets a Buffer from the pool or creates
// a new one if none are available in the pool
//
// Buffers have a pre-allocated capacity
func (bp *SizedBufferPool) Get() (b *bytes.Buffer) {
    select { // non-blocking thanks to the default case
    case b = <-bp.c:
        // reuse existing buffer
    default:
        // or create new buffer
        // buffer is []byte
        b = bytes.NewBuffer(make([]byte, 0, bp.a))
    }
    return
}

// Put returns the given Buffer to the pool
func (bp *SizedBufferPool) Put(b *bytes.Buffer) {
    b.Reset() // no more content

    // If the buffer has grown beyond our pre-sized
    // capacity, discard it and put a freshly
    // pre-sized buffer back in its place
    if cap(b.Bytes()) > bp.a {
        b = bytes.NewBuffer(make([]byte, 0, bp.a))
    }

    select { // non-blocking
    case bp.c <- b:
    default: // discard the buffer if the pool is full
    }
}
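
A short usage sketch against the API above; the size and alloc values are illustrative assumptions:

package main

import (
    "fmt"

    "github.com/oxtoacart/bpool"
)

func main() {
    // keep at most 32 buffers, each pre-sized to 16KiB
    pool := bpool.NewSizedBufferPool(32, 16*1024)

    buf := pool.Get() // reuse a pooled buffer or allocate one
    buf.WriteString("render a response here")
    fmt.Println(buf.Len(), "bytes buffered")
    pool.Put(buf) // Reset() and return for reuse
}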
// Note: the bytes.Buffer.Cap() method added in Go 1.5
// is different from calling cap(b.Bytes()). The latter
// returns the capacity of the **unread** portion of
// the buffer's underlying slice, which may not be the
// total capacity if you have read from it during its
// lifetime. This doesn't affect our implementation,
// as we call b.Reset() (which resets the **read
// offset**) before we check the capacity, which means
// we get the "correct" (full) capacity of the
// underlying slice.
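
A tiny demonstration of that Cap() distinction (a sketch; any Go 1.5+ toolchain):

package main

import (
    "bytes"
    "fmt"
)

func main() {
    b := bytes.NewBuffer(make([]byte, 0, 64))
    b.WriteString("hello world")
    b.Next(6) // advance the read offset past "hello "

    fmt.Println(b.Cap())        // 64: full capacity of the underlying slice
    fmt.Println(cap(b.Bytes())) // 58: capacity of the unread portion only

    b.Reset() // rewinds the read offset
    fmt.Println(cap(b.Bytes())) // 64 again
}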