piscinajs / piscina

A fast, efficient Node.js Worker Thread Pool implementation
Other
4.05k stars 103 forks source link

Fast API #596

Open ronag opened 1 week ago

ronag commented 1 week ago

Part of the bottleneck for Piscina at the moment is that we need to transfer data to the worker, which is not efficient with the current implementation in Node.js. This means that the tasks sent to Piscina need to be large enough that the overhead of sending the task is not too big.

One way around this would be to avoid the postMessage APIs and instead use a ring buffer to write the tasks + data, letting the user provide Buffer serialize/deserialize functions.

Here is a ring buffer implementation we have been using internally for inspiration:

import assert from 'node:assert'

// Int32Array indices (into `sharedState`) of the producer and consumer
// cursors. READ_INDEX is 16 int32 slots (64 bytes) after WRITE_INDEX so
// the two hot counters land in different cache lines and the writer and
// reader threads do not false-share a line while spinning on them.
const WRITE_INDEX = 0
const READ_INDEX = 16

/**
 * Allocate the shared memory backing one ring buffer.
 *
 * @param {number} size - Byte capacity of the data buffer.
 * @returns {{ sharedState: SharedArrayBuffer, sharedBuffer: SharedArrayBuffer }}
 *   `sharedState` is a fixed 128-byte buffer holding the read/write cursors;
 *   `sharedBuffer` holds the packet data itself.
 */
export function alloc(size) {
  const sharedState = new SharedArrayBuffer(128)
  const sharedBuffer = new SharedArrayBuffer(size)
  return { sharedState, sharedBuffer }
}

/**
 * Consumer side of the ring buffer. Packets are laid out as
 * [int32 payload length][payload bytes]; a header of -1 is a wrap marker
 * telling the consumer to continue from offset 0.
 *
 * @param {object} opts
 * @param {SharedArrayBuffer} opts.sharedState - Cursor state (see alloc).
 * @param {SharedArrayBuffer} opts.sharedBuffer - Packet data.
 * @param {number} [opts.hwmBytes] - Max payload bytes drained per read call.
 * @param {number} [opts.hwmItems] - Max packets drained per read call.
 * @returns {{ read: Function }}
 */
export function reader({ sharedState, sharedBuffer, hwmBytes = 128 * 1024, hwmItems = 1024 }) {
  const state = new Int32Array(sharedState)
  const capacity = sharedBuffer.byteLength

  // Scratch object reused for every packet handed to the callback; the
  // callback must copy out anything it wants to keep past the next tick.
  const data = {
    buffer: Buffer.from(sharedBuffer),
    view: new DataView(sharedBuffer),
    offset: 0,
    length: 0,
  }

  let cursor = Atomics.load(state, READ_INDEX) | 0
  let notifyPending = false

  // Publish the consumed position so the writer can reclaim the space.
  function publish() {
    notifyPending = false
    Atomics.store(state, READ_INDEX, cursor)
  }

  /**
   * Drain up to hwmItems packets (or hwmBytes of payload), invoking
   * `next(data, arg1, arg2, arg3)` once per packet. Returns the number of
   * packets consumed.
   */
  function read(next, arg1, arg2, arg3) {
    const writePos = Atomics.load(state, WRITE_INDEX) | 0

    let items = 0
    let bytes = 0

    for (;;) {
      if (items >= hwmItems || bytes >= hwmBytes || cursor === writePos) {
        break
      }

      const payloadLen = data.view.getInt32(cursor, true) | 0

      if (payloadLen === -1) {
        // Wrap marker: the writer could not fit the next packet at the
        // tail, so the stream continues from the start of the buffer.
        cursor = 0
        continue
      }

      assert(payloadLen >= 0)
      const payloadPos = cursor + 4
      assert(payloadPos + payloadLen <= capacity)

      cursor = payloadPos + payloadLen
      items += 1
      bytes += payloadLen

      data.offset = payloadPos
      data.length = payloadLen
      next(data, arg1, arg2, arg3)
    }

    // Defer publishing the read position so that the buffers handed to
    // `next` remain valid for at least one microtick.
    if (items > 0 && !notifyPending) {
      notifyPending = true
      setImmediate(publish)
    }

    return items
  }

  return { read }
}

// Producer side of the ring buffer. Each packet is laid out as
// [int32 payload length][payload bytes]; a header of -1 is a wrap marker
// telling the reader to continue from offset 0.
// `_yield` (optional) is invoked while blocked on a full buffer so the
// caller can drain other work; `logger` gets one warning the first time a
// write has to spin.
export function writer({ sharedState, sharedBuffer }, { yield: _yield, logger } = {}) {
  const state = new Int32Array(sharedState)
  const size = sharedBuffer.byteLength
  const buffer = Buffer.from(sharedBuffer)
  const view = new DataView(sharedBuffer)
  // Scratch object reused across calls and handed to the fill callback.
  const data = { buffer, view, offset: 0, length: 0 }

  // Cached cursors: readPos is re-read from shared state only when space
  // runs out; writePos is published to the reader lazily in _notify.
  let readPos = Atomics.load(state, READ_INDEX) | 0
  let writePos = Atomics.load(state, WRITE_INDEX) | 0
  let notifying = false
  let yielding = false

  // Publish the current write position so the reader can consume up to it.
  function _notify() {
    notifying = false
    Atomics.store(state, WRITE_INDEX, writePos)
  }

  // Try to reserve `len` contiguous payload bytes (plus headers) at the
  // current write position, emitting a wrap marker and restarting at
  // offset 0 when the tail is too small. Returns false when the reader has
  // not yet freed enough space. NOTE(review): `update` is accepted but
  // unused — confirm whether it can be dropped.
  function _acquire(len, update) {
    // len + {current packet header} + {next packet header}
    const required = len + 4 + 4
    assert(required >= 0)
    assert(required <= size)

    if (writePos >= readPos) {
      // 0----RxxxxxxW---S
      if (size - writePos >= required) {
        return true
      }

      // Cannot wrap while the reader is parked at 0: after wrapping,
      // writePos === readPos would be indistinguishable from "empty".
      if (readPos === 0) {
        return false
      }

      // Wrap marker: the reader will jump back to offset 0.
      view.setInt32(writePos, -1, true)

      writePos = 0

      assert(writePos + 4 <= size) // must have room for next header also
      assert(writePos !== readPos)

      // Publish immediately so the reader can pass the marker and free space.
      Atomics.store(state, WRITE_INDEX, writePos)
    }

    // 0xxxxW------RxxxS
    return readPos - writePos >= required
  }

  // Write one packet. `fn(data, ...)` fills data.buffer starting at
  // data.offset and returns the absolute end offset of the bytes it wrote.
  function _write(len, fn, arg1, arg2, arg3) {
    const dataPos = writePos + 4

    data.offset = dataPos
    data.length = len
    const dataLen = fn(data, arg1, arg2, arg3) - dataPos

    // The callback may use slightly more than `len` (the slack reserved
    // for the next header), but must never run past the buffer end.
    assert(dataLen <= len + 4)
    assert(dataLen >= 0)
    assert(dataPos + dataLen <= size)

    // The length header is written only after the payload is in place, so
    // the reader never observes a packet before its bytes are valid.
    view.setInt32(dataPos - 4, dataLen, true)

    writePos += 4 + dataLen

    assert(writePos + 4 <= size) // must have room for next header also
    assert(writePos !== readPos)

    // Batch the cross-thread publish: at most one store per microtask.
    if (!notifying) {
      notifying = true
      queueMicrotask(_notify)
    }

    return true
  }

  // TODO (fix): _sleep/_wait is a bit hacky and should at least have some
  // observability in tracing or logging.
  //
  // Blocking write: spins (with bounded Atomics.wait sleeps) until the
  // reader frees enough space, then writes one packet. Asserts after 1000
  // failed attempts rather than deadlocking silently.
  function write(len, fn, arg1, arg2, arg3) {
    // len + {current packet header} + {next packet header} + {alignment}
    const required = len + 4 + 4 + 8 + 128 // TODO (fix): Remove extra + 128
    assert(required >= 0)
    assert(required <= size)
    // Re-entrant writes from inside the _yield callback would corrupt state.
    assert(!yielding, 'yielding')

    for (let n = 0; !_acquire(required); n++) {
      assert(n < 1000, 'deadlock')

      if (n > 0) {
        if (n === 1) {
          logger?.warn('yielding', { readPos, writePos })
        }

        // Give the embedder a chance to drain the reader before sleeping.
        if (_yield) {
          yielding = true
          try {
            _yield?.()
          } finally {
            yielding = false
          }
        }

        // Sleep up to n ms unless READ_INDEX already changed.
        // NOTE(review): the reader never calls Atomics.notify, so this
        // effectively relies on the timeout — confirm that is intended.
        Atomics.wait(state, READ_INDEX, readPos, n)
      } else {
        // First miss: flush our write position so the reader can catch up.
        Atomics.store(state, WRITE_INDEX, writePos)
      }

      readPos = Atomics.load(state, READ_INDEX) | 0
    }

    _write(len, fn, arg1, arg2, arg3)

    assert(writePos !== readPos)
  }

  return { write }
}
metcoder95 commented 1 week ago

This can be pretty interesting, we can gain some bit in performance.

Won't the serialization/deserialization step become the bottleneck, though? And how do you propose to handle the scenario of sending JS objects?

Here it would mean maintaining two shared buffers — one for state and one for data synchronization. Do you see any risk of going out of bounds?