emscripten-core / emscripten

Emscripten: An LLVM-to-WebAssembly Compiler
Other
25.76k stars 3.3k forks source link

How to access Instanced Wasm context from custom worker? #17491

Closed Night12138 closed 2 years ago

Night12138 commented 2 years ago

This question may seem a bit odd, but it is something I am actively working on. We need to send audio to an AudioWorklet, which does not support multi-threading, so we use a Worker for audio processing. Following the worker.js generated by Emscripten, we found that we can instantiate the Wasm module inside the AudioWorklet by sending it wasmModule and wasmMemory. However, we now have a problem: when the AudioWorklet tries to modify data in wasmMemory, the main Worker throws an exception: `null function or function signature mismatch`. When we run the same function with a setInterval in the main Worker itself, no exception is thrown. The problem is not limited to the AudioWorklet: if we inspect a Worker created by Emscripten and run this function there, it also causes an exception on the main Worker. So my question is: how can I call Wasm functions from a derived Worker? Here is some reproducible code:

C++: a MessageDispatcher, which runs functions submitted from a sub worker on the main worker

/// Queues callbacks submitted from any thread and runs them from an
/// emscripten_set_interval timer (every 50 ms) registered by the constructor,
/// i.e. on the thread that constructed the dispatcher.
class MessageDispatcher {
    std::vector<std::function<void()>> msgs;  // pending callbacks, guarded by m
    std::mutex m;
    long interval = 0;                        // id returned by emscripten_set_interval

public:
    /// Runs and removes every callback queued so far.
    /// The queue is swapped out under the lock and the callbacks run after the
    /// lock is released: a callback may therefore call submit() without
    /// deadlocking on the non-recursive std::mutex, and submitters on other
    /// threads are not blocked while callbacks execute. Callbacks submitted
    /// while this runs are picked up on the next tick.
    void check() {
        std::vector<std::function<void()>> pending;
        {
            std::lock_guard<std::mutex> lg(m);
            pending.swap(msgs);
        }
        for (auto& func : pending) {
            func();
        }
    }

    /// Queues `func` to be run by a later check().
    /// Taken by value so rvalues are moved into the queue (the previous
    /// `const&&` parameter could not be moved from and always copied).
    void submit(std::function<void()> func) {
        std::lock_guard<std::mutex> lg(m);
        msgs.push_back(std::move(func));
    }

    MessageDispatcher() {
        // A capture-less lambda converts to the plain function pointer the C API expects.
        auto checker = [](void* data) -> void { static_cast<MessageDispatcher*>(data)->check(); };
        interval = emscripten_set_interval(checker, 50, this);
    }
    ~MessageDispatcher() {
        emscripten_clear_interval(interval);
    }

    // The timer holds `this`, so a copied/moved object would not be the one
    // being polled. (Already implied by the std::mutex member; spelled out here.)
    MessageDispatcher(const MessageDispatcher&) = delete;
    MessageDispatcher& operator=(const MessageDispatcher&) = delete;
};

MessageDispatcher dispatcher;

C++: initialize a buffer

/// Owns a planar float buffer (`channels` blocks of `len` samples) whose
/// address is returned to JS, plus one pointer per channel into that buffer.
class DAWPlaybackHelper {
private:
    std::vector<float*> buffers{};  // buffers[i] -> first sample of channel i inside `buffer`
    std::vector<float> buffer{};    // contiguous storage: channel i occupies [i*len, (i+1)*len)

    /// Clamps a caller-supplied count to [0, available]. Requests larger than
    /// what init() allocated (or negative ones) therefore cannot index outside
    /// `buffer`.
    static int clampCount(int requested, std::size_t available) {
        if (requested <= 0) {
            return 0;
        }
        // If requested >= available, available fits in an int because requested does.
        return static_cast<std::size_t>(requested) < available ? requested : static_cast<int>(available);
    }

    /// Samples per channel allocated by the last init(); 0 before init().
    std::size_t framesPerChannel() const {
        return buffers.empty() ? 0 : buffer.size() / buffers.size();
    }

public:
    /// Multiplies the first `len` samples of the first `channels` channels by 1.5
    /// in place. Both counts are clamped to the dimensions given to init().
    void pullBuffer(int len, int channels) {
        const int frames = clampCount(len, framesPerChannel());
        const int chans = clampCount(channels, buffers.size());
        for (int i = 0; i < chans; i++) {
            for (int j = 0; j < frames; j++) {
                buffers[i][j] *= 1.5f;
            }
        }
    }

    /// Overwrites sample j of each of the first `channels` channels with -j,
    /// for j < len. Both counts are clamped to the dimensions given to init().
    void pushBuffer(int len, int channels) {
        const int frames = clampCount(len, framesPerChannel());
        const int chans = clampCount(channels, buffers.size());
        for (int i = 0; i < chans; i++) {
            for (int j = 0; j < frames; j++) {
                buffers[i][j] = float(-j);
            }
        }
    }

    /// (Re)allocates `channels` x `len` zeroed samples and returns the buffer
    /// address as an integer. Negative dimensions are treated as 0. If the
    /// buffer is non-empty, sample 0 of channel 0 is set to 1.0f as a marker.
    /// Calling init() again may reallocate, invalidating previously returned addresses.
    uintptr_t init(int len, int channels) {
        // Clamp before multiplying: a negative int would wrap to a huge size_t in the allocation.
        const std::size_t frames = len > 0 ? static_cast<std::size_t>(len) : 0;
        const std::size_t chans = channels > 0 ? static_cast<std::size_t>(channels) : 0;

        // assign() resizes and zero-fills in one step.
        buffer.assign(frames * chans, 0.f);
        float* const base = buffer.data();

        buffers.resize(chans);
        for (std::size_t i = 0; i < chans; i++) {
            buffers[i] = base + i * frames;
        }
        std::cout << "init helper with " << len << ' ' << channels << std::endl;
        // With len or channels == 0 there is no sample to mark.
        if (frames > 0 && chans > 0) {
            buffers[0][0] = 1.f;
        }
        return reinterpret_cast<uintptr_t>(base);
    }
} *pHelper = nullptr;

/// Creates the global playback helper on first use and (re)initialises its
/// buffer with `channels` x `len` samples. Returns the buffer address for JS.
/// An existing helper is reused rather than replaced. Allocating a new one on
/// every call leaked the previous helper and its buffer.
/// Re-initialising may move the buffer, so an address returned earlier must
/// not be used after another call.
/// NOTE(review): unlike the wrappers below, this function is not inside
/// extern "C", so its exported symbol name is C++-mangled. Confirm how JS
/// resolves `InitPlayback`.
EMSCRIPTEN_KEEPALIVE
uintptr_t InitPlayback(int len, int channels) {
    if (pHelper == nullptr) {
        pHelper = new DAWPlaybackHelper();
    }
    return pHelper->init(len, channels);
}
extern "C" {
// C-linkage entry points for JS. They forward to the global helper created by
// InitPlayback(). Before that call pHelper is null, and the forwarding calls
// are no-ops instead of dereferencing a null pointer.

/// Forwards to DAWPlaybackHelper::pullBuffer on the global helper, if any.
void EMSCRIPTEN_KEEPALIVE pullBuffer(int len, int channels) {
    if (pHelper != nullptr) {
        pHelper->pullBuffer(len, channels);
    }
}

/// Forwards to DAWPlaybackHelper::pushBuffer on the global helper, if any.
void EMSCRIPTEN_KEEPALIVE pushBuffer(int len, int channels) {
    if (pHelper != nullptr) {
        pHelper->pushBuffer(len, channels);
    }
}

/// Address of the global helper object (0 before InitPlayback()).
uintptr_t EMSCRIPTEN_KEEPALIVE getHelperPtr() { return reinterpret_cast<uintptr_t>(pHelper); }
}

main worker

import createMyModule from './wasm.js';
createMyModule()
  .then((instance) => {
    Module = instance;
    // wasmModule is not exported by the stock glue; the generated code was patched to export it.
    const {
      InitPlayback: initPlayback,
      wasmModule: compiledModule,
      wasmMemory: sharedMemory,
    } = Module;

    const ptr = initPlayback(SLICE_LEN, MAX_CHANNEL);
    // Pass on the compiled module, the shared memory and the buffer address so
    // the sub worker can instantiate the same module over the same memory.
    postMessage({
      type: 'SEND_TO_SUB_WORKER',
      data: { ptr, wasmModule: compiledModule, wasmMemory: sharedMemory },
    });
  })
  .catch((err) => console.log(err));

Sub worker: the same as the generated worker.js file, but loaded our own way. Inspect it in DevTools and run, for example, `Module._pullBuffer(128, 2)`.

import createMyModule from './wasm.js';

'use strict';

// Options object handed to createMyModule(); replaced by the resolved module once loading finishes.
var Module = {};

// Becomes true once the per-worker JS state (embind types, deferred proxying
// queues) has been set up; a worker is reused, so this happens only once.
var initializedJS = false;

// Proxying queues whose notifications arrived before the thread started;
// they are drained during startup.
var pendingNotifiedProxyingQueues = [];

// Joins all arguments with single spaces and writes them to console.error.
function threadPrintErr(...parts) {
  console.error(parts.join(' '));
}

// Forwards alert() text to the main thread, tagged with this thread's id.
function threadAlert(...parts) {
  postMessage({
    cmd: 'alert',
    text: parts.join(' '),
    threadId: Module['_pthread_self'](),
  });
}

var err = threadPrintErr;
self.alert = threadAlert;

// Hook used by the Emscripten glue in place of its default instantiation.
// It builds the instance synchronously from the WebAssembly.Module that the
// 'load' message stored in Module['wasmModule'] (synchronous instantiation is
// fine inside a worker), passes it to receiveInstance, and returns its exports.
Module['instantiateWasm'] = (info, receiveInstance) => {
  var wasmModule = Module['wasmModule'];
  if (!wasmModule) {
    // Fail with a clear message rather than a TypeError from the WebAssembly.Instance constructor.
    throw new Error('instantiateWasm: Module.wasmModule was not provided by the load message');
  }
  var instance = new WebAssembly.Instance(wasmModule, info);
  receiveInstance(instance);
  // Drop the reference: this worker does not instantiate the module again.
  Module['wasmModule'] = null;
  return instance.exports;
}

// Message dispatcher for this worker, mirroring Emscripten's generated worker.js:
//   'load'                 - instantiate the module on the wasmMemory sent by the main thread
//   'run'                  - initialise this pthread and invoke its entry point
//   'cancel'               - exit the current pthread (PTHREAD_CANCELED)
//   'processProxyingQueue' - run a proxying queue now, or defer it until 'run' initialises JS
// Any exception is logged, reported via __emscripten_thread_crashed when that
// export exists, and rethrown.
self.onmessage = (e) => {
  try {
    if (e.data.cmd === 'load') { // Preload command that is called once per worker to parse and load the Emscripten code.

      // Module and memory were sent from main thread
      Module['wasmModule'] = e.data.wasmModule;

      Module['wasmMemory'] = e.data.wasmMemory;

      Module['buffer'] = Module['wasmMemory'].buffer;

      Module['ENVIRONMENT_IS_PTHREAD'] = true;

      // Original script-loading path, replaced here by the ES-module import of createMyModule.
      //if (typeof e.data.urlOrBlob == 'string') {
      //  importScripts(e.data.urlOrBlob);
      //} else {
      //  var objectUrl = URL.createObjectURL(e.data.urlOrBlob);
      //  importScripts(objectUrl);
      //  URL.revokeObjectURL(objectUrl);
      //}
      // NOTE: nothing in this branch gives the worker its own stack region.
      // Unlike a real pthread ('run' calls establishStackSpace), the instance
      // presumably starts on the module's default stack, which the main worker
      // is also using. Calling exports (as the setInterval below does) then
      // tramples the main worker's stack, which surfaces there as "null function
      // or function signature mismatch". The fix is to reserve a stack for this
      // worker (e.g. malloc'd by the main worker and sent in the message) and
      // install it before calling any export:
      // _emscripten_stack_set_limits(top, max) followed by stackRestore(top).
      createMyModule(Module).then(function (instance) {
        Module = instance;
        setInterval(()=>Module._pullBuffer(128,2), 10); // cause main worker throw exception
      });
    } else if (e.data.cmd === 'run') {
      // This worker was idle, and now should start executing its pthread entry
      // point.
      // performance.now() is specced to return a wallclock time in msecs since
      // that Web Worker/main thread launched. However for pthreads this can
      // cause subtle problems in emscripten_get_now() as this essentially
      // would measure time from pthread_create(), meaning that the clocks
      // between each threads would be wildly out of sync. Therefore sync all
      // pthreads to the clock on the main browser thread, so that different
      // threads see a somewhat coherent clock across each of them
      // (+/- 0.1msecs in testing).
      Module['__performance_now_clock_drift'] = performance.now() - e.data.time;

      // Pass the thread address inside the asm.js scope to store it for fast access that avoids the need for a FFI out.
      Module['__emscripten_thread_init'](e.data.threadInfoStruct, /*isMainBrowserThread=*/0, /*isMainRuntimeThread=*/0, /*canBlock=*/1);

      // Also call inside JS module to set up the stack frame for this pthread in JS module scope
      Module['establishStackSpace']();
      Module['PThread'].receiveObjectTransfer(e.data);
      Module['PThread'].threadInitTLS();

      if (!initializedJS) {
        // Embind must initialize itself on all threads, as it generates support JS.
        // We only do this once per worker since they get reused
        Module['___embind_register_native_and_builtin_types']();

        // Execute any proxied work that came in before the thread was
        // initialized. Only do this once because it is only possible for
        // proxying notifications to arrive before thread initialization on
        // fresh workers.
        pendingNotifiedProxyingQueues.forEach(queue => {
          Module['executeNotifiedProxyingQueue'](queue);
        });
        pendingNotifiedProxyingQueues = [];
        initializedJS = true;
      }

      try {
        Module['invokeEntryPoint'](e.data.start_routine, e.data.arg);
      } catch(ex) {
        // 'unwind' is thrown to leave the entry point without exiting the thread; ignore it.
        if (ex != 'unwind') {
          // ExitStatus not present in MINIMAL_RUNTIME
          if (ex instanceof Module['ExitStatus']) {
            if (Module['keepRuntimeAlive']()) {
              // Runtime is being kept alive: leave the thread running.
            } else {
              Module['__emscripten_thread_exit'](ex.status);
            }
          }
          else
          {
            // The pthread "crashed".  Do not call `_emscripten_thread_exit` (which
            // would make this thread joinable.  Instead, re-throw the exception
            // and let the top level handler propagate it back to the main thread.
            throw ex;
          }
        }
      }
    } else if (e.data.cmd === 'cancel') { // Main thread is asking for a pthread_cancel() on this thread.
      if (Module['_pthread_self']()) {
        Module['__emscripten_thread_exit'](-1/*PTHREAD_CANCELED*/);
      }
    } else if (e.data.target === 'setimmediate') {
      // no-op
    } else if (e.data.cmd === 'processProxyingQueue') {
      if (initializedJS) {
        Module['executeNotifiedProxyingQueue'](e.data.queue);
      } else {
        // Defer executing this queue until the runtime is initialized.
        pendingNotifiedProxyingQueues.push(e.data.queue);
      }
    } else {
      err('worker.js received unknown command ' + e.data.cmd);
      err(e.data);
    }
  } catch(ex) {
    err('worker.js onmessage() captured an uncaught exception: ' + ex);
    if (ex && ex.stack) err(ex.stack);
    if (Module['__emscripten_thread_crashed']) {
      Module['__emscripten_thread_crashed']();
    }
    throw ex;
  }
};
Night12138 commented 2 years ago

I fixed this issue by initializing a dedicated stack for the custom worker; see the code below. Hope it helps!

main worker

import createMyModule from './wasm.js';
createMyModule().then(( _m) => {
    Module = _m;
    // wasmModule is not exported by default; the generated glue was patched to export it.
    const { InitPlayback, wasmModule, wasmMemory, _malloc } = Module;

    // Bytes reserved for the custom worker's stack. The size is sent along so
    // the worker does not have to hard-code the same number.
    const WORKER_STACK_SIZE = 16384;

    const ptr = InitPlayback(SLICE_LEN, MAX_CHANNEL);
    // Reserve the worker's stack in the shared heap. malloc returns 0 on
    // failure, and a 0 base would place the worker's stack over low memory.
    const stack = _malloc(WORKER_STACK_SIZE);
    if (stack === 0) {
      throw new Error('failed to allocate ' + WORKER_STACK_SIZE + ' bytes for the worker stack');
    }
    postMessage({
      type: 'SEND_TO_SUB_WORKER',
      data: { ptr, wasmModule, wasmMemory, stack, stackSize: WORKER_STACK_SIZE },
    });
}).catch(e => console.log(e));

custom worker

// ... skip
      Module['wasmModule'] = e.data.wasmModule;

      Module['wasmMemory'] = e.data.wasmMemory;

      Module['buffer'] = Module['wasmMemory'].buffer;

      Module['ENVIRONMENT_IS_PTHREAD'] = true;

      //if (typeof e.data.urlOrBlob == 'string') {
      //  importScripts(e.data.urlOrBlob);
      //} else {
      //  var objectUrl = URL.createObjectURL(e.data.urlOrBlob);
      //  importScripts(objectUrl);
      //  URL.revokeObjectURL(objectUrl);
      //}
      createMyModule(Module).then(function (instance) {
        Module = instance;

        // This worker calls exports on the shared memory, so it must not run on
        // the stack the main worker is using. Install the region the main
        // worker allocated for us. The stack grows downward from `top` and must
        // not pass below `max`.
        const max = e.data.stack;
        if (!max) {
          throw new Error('load message did not include a stack for this worker');
        }
        const stackSize = e.data.stackSize || 16384; // senders that omit the size used 16384
        // Round the top down to 16 bytes, the stack alignment Emscripten uses.
        // This is a no-op if malloc already returned 16-byte-aligned memory.
        // Modulo is used instead of `& ~15`, which would break above 2 GiB.
        const top = (max + stackSize) - ((max + stackSize) % 16);
        Module._emscripten_stack_set_limits(top, max); // set stack limit
        Module.stackRestore(top); // restore stack to worker

        setInterval(()=>Module._pullBuffer(128,2), 10); // now main worker can run normally
      });
    } else if (e.data.cmd === 'run') {
// ... skip
AlexMold commented 2 years ago

@Night12138 What are the values of SLICE_LEN, MAX_CHANNEL, and InitPlayback(SLICE_LEN, MAX_CHANNEL)?

Do you know the right value of threadInfoStruct?