oracle / graaljs

An ECMAScript 2023 compliant JavaScript implementation built on GraalVM, with polyglot language interoperability support. Runs Node.js applications!
Universal Permissive License v1.0

Class/Context factory support for ScriptEngine #734

Open vepo opened 1 year ago

vepo commented 1 year ago

Description

I'm replacing Nashorn with graaljs as the hook-point engine in a Kafka Streams framework. It works fine for some of our extension points, but we also have an extension point for configuring Kafka Streams processing steps.

This is the code we are using to test our implementation:

function showEngine() {
    print("====================================")
    if (typeof Graal != 'undefined') {
        print("[ENGINE] Using GraalVM.js")
        print(Graal.versionJS);
        print(Graal.versionGraalVM);
        print(Graal.isGraalRuntime());
    } else {
        print("[ENGINE] Using Nashorn")
    }
}

function onCustom(_stream, _properties) {
    return _stream.peek(function (_key, _value) {
        showEngine();
        print("====   PEEK BEFORE    ====")
        print("[PEEK BEFORE ] KEY   = " + _key)
        print("[PEEK BEFORE ] VALUE = " + _value)
    })
        .filter(function (_key, _value) {
            showEngine();
            print("====   FILTER          ====")
            print("[FILTER      ] KEY   = " + _key)
            print("[FILTER      ] VALUE = " + _value)
            print("[FILTER      ] FILTER= " + (_value.get('type').toString() == 'A'))
            return _value.get('type').toString() == 'A';
        })
        .peek(function (_key, _value) {
            showEngine();
            print("====    PEEK AFTER    ====")
            print("[PEEK AFTER  ] KEY   = " + _key)
            print("[PEEK AFTER  ] VALUE = " + _value)
        });
}

The issue occurs because, after our script has run, Kafka Streams creates its own threads and calls the objects created inside the JavaScript code from them. Since more than one thread then accesses the same context simultaneously, we get the exception below:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_2, processor=KSTREAM-SOURCE-0000000000, topic=streams-input, partition=2, offset=24, stacktrace=java.lang.IllegalStateException: Multi threaded access requested by thread         StreamsThread threadId: ngm_streamer_APP-69bb8cc1-3444-4b6f-ba2f-2fdb8666a8b2-StreamThread-3
TaskManager
        MetadataState:
        Tasks:
                0_2 RUNNING StreamTask(active) but is not allowed for language(s) js.
        at com.oracle.truffle.polyglot.PolyglotEngineException.illegalState(PolyglotEngineException.java:129)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.throwDeniedThreadAccess(PolyglotContextImpl.java:941)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.checkAllThreadAccesses(PolyglotContextImpl.java:800)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.enterThreadChanged(PolyglotContextImpl.java:630)
        at com.oracle.truffle.polyglot.PolyglotEngineImpl.enterCached(PolyglotEngineImpl.java:1885)
        at com.oracle.truffle.polyglot.HostToGuestRootNode.execute(HostToGuestRootNode.java:112)
        at com.oracle.truffle.api.impl.DefaultCallTarget.callDirectOrIndirect(DefaultCallTarget.java:85)
        at com.oracle.truffle.api.impl.DefaultCallTarget.call(DefaultCallTarget.java:102)
        at com.oracle.truffle.polyglot.PolyglotFunctionProxyHandler.invoke(PolyglotFunctionProxyHandler.java:154)
        at com.sun.proxy.$Proxy60.apply(Unknown Source)
        at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: Attached Guest Language Frames (1)

        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: java.lang.IllegalStateException: Multi threaded access requested by thread   StreamsThread threadId: ngm_streamer_APP-69bb8cc1-3444-4b6f-ba2f-2fdb8666a8b2-StreamThread-3
TaskManager
        MetadataState:
        Tasks:
                0_2 RUNNING StreamTask(active) but is not allowed for language(s) js.
        at com.oracle.truffle.polyglot.PolyglotEngineException.illegalState(PolyglotEngineException.java:129)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.throwDeniedThreadAccess(PolyglotContextImpl.java:941)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.checkAllThreadAccesses(PolyglotContextImpl.java:800)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.enterThreadChanged(PolyglotContextImpl.java:630)
        at com.oracle.truffle.polyglot.PolyglotEngineImpl.enterCached(PolyglotEngineImpl.java:1885)
        at com.oracle.truffle.polyglot.HostToGuestRootNode.execute(HostToGuestRootNode.java:112)
        at com.oracle.truffle.api.impl.DefaultCallTarget.callDirectOrIndirect(DefaultCallTarget.java:85)
        at com.oracle.truffle.api.impl.DefaultCallTarget.call(DefaultCallTarget.java:102)
        at com.oracle.truffle.polyglot.PolyglotFunctionProxyHandler.invoke(PolyglotFunctionProxyHandler.java:154)
        at com.sun.proxy.$Proxy60.apply(Unknown Source)
        at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
        ... 6 common frames omitted
Caused by: com.oracle.truffle.api.TruffleStackTrace$LazyStackTrace: null

This is how we instantiate our script engine:

ScriptEngine engine = GraalJSScriptEngine.create(null, Context.newBuilder("js")
                                                              .allowHostAccess(HostAccess.ALL)
                                                              .allowHostClassLookup(s -> true)
                                                              .allowExperimentalOptions(true)
                                                              .out(System.out)
                                                              .err(System.err)
                                                              .option("js.nashorn-compat", "true"));

Is there a way to provide Class (or Context) factory to avoid using the same context in different threads?

Environment

GraalVM JS version: 21.3.6
Java version: OpenJDK 8

oubidar-Abderrahim commented 1 year ago

Hi, thank you for reporting this. Please take a look at this: Multithreading. Let us know if that answers your question.

vepo commented 1 year ago

Thanks @oubidar-Abderrahim, but this wiki page does not answer my question. My issue is that we create some objects in our JavaScript code that are then called by another framework (Kafka Streams). We could not find a way to synchronize these threads or to create one context per thread.

oubidar-Abderrahim commented 1 year ago

Hi @iamstolis can you take a look into this please?

iamstolis commented 1 year ago

Is there a way to provide Class (or Context) factory to avoid using the same context in different threads?

No. You should not use the same (graal-js) ScriptEngine from multiple threads without proper synchronization. I suggest either using one ScriptEngine per thread or synchronizing access to one global ScriptEngine.
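As a minimal sketch of the second option (one shared engine with synchronized access), assuming plain Java with no graal-js dependency so it can run anywhere: `FakeJsEngine` is a hypothetical stand-in that, like a polyglot Context, throws an `IllegalStateException` when a second thread enters while another thread is still inside, and `SynchronizedEngine` is a hypothetical wrapper that serializes every call with a lock. In real code the wrapped object would be the graal-js ScriptEngine from the issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a graal-js engine: it rejects a second thread
// entering while another thread is still executing inside it.
class FakeJsEngine {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private int evalCount = 0;

    int eval(String script) {
        Thread me = Thread.currentThread();
        if (!owner.compareAndSet(null, me)) {
            throw new IllegalStateException(
                "Multi threaded access requested by thread " + me.getName());
        }
        try {
            Thread.yield();      // widen the window in which an unsynchronized caller would collide
            return ++evalCount;  // pretend to run the script
        } finally {
            owner.set(null);
        }
    }
}

// Hypothetical wrapper: one global engine, every call serialized by a lock.
class SynchronizedEngine {
    private final FakeJsEngine delegate = new FakeJsEngine();
    private final Object lock = new Object();

    int eval(String script) {
        synchronized (lock) {
            return delegate.eval(script);
        }
    }
}

class SynchronizedEngineDemo {
    // Calls one shared engine from `threads` threads, `callsPerThread` times each,
    // and returns how many calls completed (an exception in any call is rethrown).
    static int run(int threads, int callsPerThread) {
        SynchronizedEngine engine = new SynchronizedEngine();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Integer>> results = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            results.add(pool.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    engine.eval("1 + 1");
                }
                return callsPerThread;
            }));
        }
        int total = 0;
        try {
            for (Future<Integer> f : results) {
                total += f.get();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return total;
    }
}
```

The trade-off of this option is that JavaScript execution is serialized: only one thread runs script code at a time, no matter how many threads call into the engine.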

We could not find a way to synchronize these threads ...

If I understand correctly, your problem is that some framework is using the ScriptEngine (that you provide) from multiple threads. You could try providing your own implementation of ScriptEngine that delegates to a graal-js ScriptEngine. The methods of your wrapper could either synchronize the access or delegate to per-thread graal-js ScriptEngines (which your wrapper ScriptEngine would create and manage).
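The per-thread delegation variant could look roughly like this. It is only a sketch: `StubJsEngine` is a hypothetical single-threaded stand-in, and `PerThreadEngine` is a hypothetical wrapper that uses `ThreadLocal.withInitial` to create one engine per calling thread from a factory. A real wrapper would implement `javax.script.ScriptEngine` (for example by extending `javax.script.AbstractScriptEngine`) and forward each method the same way, with the factory calling `GraalJSScriptEngine.create(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical single-threaded stand-in for a graal-js engine.
class StubJsEngine {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    Object eval(String script) {
        Thread me = Thread.currentThread();
        if (!owner.compareAndSet(null, me)) {
            throw new IllegalStateException(
                "Multi threaded access requested by thread " + me.getName());
        }
        try {
            Thread.yield();
            return script;   // pretend result
        } finally {
            owner.set(null);
        }
    }
}

// Hypothetical wrapper that delegates to one engine per calling thread.
class PerThreadEngine {
    private final Set<StubJsEngine> created = ConcurrentHashMap.newKeySet();
    private final ThreadLocal<StubJsEngine> engines;

    PerThreadEngine(Supplier<StubJsEngine> factory) {
        engines = ThreadLocal.withInitial(() -> {
            StubJsEngine engine = factory.get();
            created.add(engine);   // tracked so the engines could be closed on shutdown
            return engine;
        });
    }

    Object eval(String script) {
        return engines.get().eval(script);   // each engine is only ever used by its own thread
    }

    int engineCount() {
        return created.size();
    }
}

class PerThreadEngineDemo {
    // Runs `threads` workers against one wrapper and returns how many engines it created.
    static int run(int threads, int callsPerThread) {
        PerThreadEngine wrapper = new PerThreadEngine(StubJsEngine::new);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<?>> results = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            results.add(pool.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    wrapper.eval("1 + 1");
                }
            }));
        }
        try {
            for (Future<?> f : results) {
                f.get();   // rethrows any IllegalStateException raised in a worker
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return wrapper.engineCount();
    }
}
```

Engines created this way are not closed automatically when a thread ends; the `created` set is kept so they could be closed on shutdown (not shown).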

vepo commented 1 year ago

@iamstolis, I'm not sure you understood correctly.

your problem is that some framework is using ScriptEngine (that you provide) from multiple threads.

No. Our framework uses the ScriptEngine but does not create the threads; it provides some objects that will be used by other threads. The threads are created by another framework, in our case Kafka Streams.

In my sample, the onCustom function receives a stream and builds some processing steps on it. The function passed to filter will then be called from many Kafka Streams threads. I could configure Kafka Streams to use just one thread, but that does not scale well.
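Applied to this case, where Kafka Streams rather than the framework owns the threads, one possible approach (a sketch, not tested against Kafka or graal-js) is to hand Kafka a Java adapter instead of the raw JavaScript function, and let each stream thread build its own context and its own copy of the callback on first use. Here `StubContext` is a hypothetical stand-in for a per-thread graal-js context whose `evalFilter()` plays the role of evaluating the script and fetching the filter function, and `PerThreadPredicate` is a hypothetical adapter. `java.util.function.BiPredicate` stands in for Kafka Streams' `org.apache.kafka.streams.kstream.Predicate<K, V>`, which has the same `test(key, value)` shape, and the filter checks the value directly instead of `_value.get('type')` to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

// Hypothetical stand-in for one graal-js context: the function it returns
// may only run while no other thread is executing inside the same context.
class StubContext {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    // Plays the role of "evaluate the user script and fetch its filter function".
    BiPredicate<String, String> evalFilter() {
        return (key, value) -> {
            Thread me = Thread.currentThread();
            if (!owner.compareAndSet(null, me)) {
                throw new IllegalStateException(
                    "Multi threaded access requested by thread " + me.getName());
            }
            try {
                return "A".equals(value);   // simplified version of the script's filter
            } finally {
                owner.set(null);
            }
        };
    }
}

// Hypothetical adapter given to the framework instead of the raw script function:
// each thread lazily builds its own context and its own copy of the callback.
class PerThreadPredicate<K, V> implements BiPredicate<K, V> {
    private final ThreadLocal<BiPredicate<K, V>> local;

    PerThreadPredicate(Supplier<BiPredicate<K, V>> factory) {
        local = ThreadLocal.withInitial(factory);
    }

    @Override
    public boolean test(K key, V value) {
        return local.get().test(key, value);
    }
}

class PerThreadPredicateDemo {
    // Simulates `threads` stream threads filtering the same records `rounds` times;
    // returns the total number of records that passed the filter.
    static int run(int threads, int rounds) {
        BiPredicate<String, String> filter =
            new PerThreadPredicate<String, String>(() -> new StubContext().evalFilter());
        List<String> values = Arrays.asList("A", "B", "A", "C");
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Integer>> results = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            results.add(pool.submit(() -> {
                int passed = 0;
                for (int r = 0; r < rounds; r++) {
                    for (String v : values) {
                        if (filter.test("key", v)) {
                            passed++;
                        }
                    }
                }
                return passed;
            }));
        }
        int total = 0;
        try {
            for (Future<Integer> f : results) {
                total += f.get();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return total;
    }
}
```

This keeps the multi-threaded scaling, at the cost of one context (and one evaluation of the script) per stream thread; state kept in JavaScript globals is then no longer shared between threads.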