espertechinc / esper

Esper Complex Event Processing, Streaming SQL and Event Series Analysis
GNU General Public License v2.0
836 stars 260 forks source link

In dataflow threaded execution, variables passed to a UDF in an event filter evaluate to null when the Filter-Optimizable setting is on #224

Open icholy opened 3 years ago

icholy commented 3 years ago

Variables passed to UDFs in event filters evaluate to null if the event originates from a dataflow operator.

// Schema with no properties; used as the dataflow's output type and as the filtered stream.
create schema Empty ();

// Dataflow: BeaconSource emits Empty events on stream "events"; EventBusSink presumably
// forwards them into the runtime so the select statement below can see them.
create dataflow InputOutput
  BeaconSource -> events<Empty> {}
  EventBusSink(events) {};

// String variable initialized to String.valueOf(1), i.e. "1"; it is never assigned null here.
create variable String X = String.valueOf(1);

// Filter invokes the AssertNotNull single-row function with variable X.
// Reported bug: X arrives as null when the event originates from the dataflow operator.
select * from Empty(AssertNotNull(X));
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.common.client.dataflow.core.EPDataFlowInstance;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.EPRuntimeProvider;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;

/**
 * Reproduction for Esper issue #224: a variable passed to a plug-in single-row function inside
 * an event filter evaluates to null when the event is produced by a dataflow operator.
 *
 * <p>Reads an EPL module from {@code query.epl}, compiles and deploys it with
 * {@link #AssertNotNull(Object)} registered as a single-row function, then starts every deployed
 * dataflow and waits for all of them to complete.
 */
public class Main {

    /**
     * Single-row function invoked from the EPL filter; fails loudly when its argument is null.
     *
     * @param v the value to check (the EPL passes variable {@code X})
     * @return always {@code true} for a non-null argument, so the filter lets the event through
     * @throws IllegalStateException if {@code v} is null
     */
    public static boolean AssertNotNull(Object v) {
        if (v == null) {
            // Unchecked standard exception instead of a raw java.lang.Exception.
            throw new IllegalStateException("assertion failure: got null value");
        }
        return true;
    }

    /**
     * Compiles and deploys {@code query.epl}, then runs all of its dataflows to completion.
     *
     * @param args ignored
     * @throws Exception if reading, compiling, deploying, or joining a dataflow fails
     */
    public static void main(String[] args) throws Exception {

        // Read the EPL. UTF-8 decodes any ASCII file identically, and it also accepts
        // non-ASCII content (e.g. in comments or string literals), which US_ASCII rejected.
        var epl = Files.readString(Path.of("query.epl"), StandardCharsets.UTF_8);

        // Register AssertNotNull as a plug-in single-row function usable from EPL.
        var configuration = new Configuration();
        configuration.getCompiler().addPlugInSingleRowFunction(
                "AssertNotNull", Main.class.getCanonicalName(), "AssertNotNull");

        // Compile the module.
        var compiler = EPCompilerProvider.getCompiler();
        var arguments = new CompilerArguments(configuration);
        var module = compiler.parseModule(epl);
        var compiled = compiler.compile(module, arguments);

        var epRuntime = EPRuntimeProvider.getRuntime("", configuration);
        try {
            epRuntime.getDeploymentService().deploy(compiled);

            // Instantiate and start every deployed dataflow.
            var epDataFlow = epRuntime.getDataFlowService();
            var instances = new ArrayList<EPDataFlowInstance>();
            for (var dataflow : epDataFlow.getDataFlows()) {
                var instance = epDataFlow.instantiate(dataflow.getDeploymentId(), dataflow.getName());
                instances.add(instance);
                instance.start();
            }

            // Wait for all dataflows to complete.
            for (var instance : instances) {
                instance.join();
            }
        } finally {
            // Release the runtime's resources even when deployment or a join fails.
            epRuntime.destroy();
        }
    }
}
bernhardttom commented 3 years ago

As a workaround, register the single-row function with ConfigurationCompilerPlugInSingleRowFunction.FilterOptimizable.DISABLED.

For reference:

            
String epl = "@name('schema') @public @buseventtype create schema Empty ();\n" +
        "create dataflow InputOutput\n" +
        "  BeaconSource -> events {}\n" +
        "  EventBusSink(events) {};\n" +
        "create variable String X = String.valueOf(1);\n" +
        "@name('s0') select * from Empty(localAssertNotNull(X));\n";
env.compileDeploy(epl).addListener("s0");

EPDataFlowInstance instance = env.runtime().getDataFlowService().instantiate(env.deploymentId("schema"), "InputOutput");
instance.start();

try {
    // start() returns without waiting for the dataflow, so give it a moment to emit
    // at least one beacon event before asserting.
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        // Preserve the interrupt status instead of swallowing it via printStackTrace().
        Thread.currentThread().interrupt();
    }
    env.assertListenerInvoked("s0");
} finally {
    // Always stop the dataflow and clean up, even when the assertion fails.
    instance.cancel();
    env.undeployAll();
}
bernhardttom commented 3 years ago

However, Esper could be improved to detect the variable in this case, or to throw an exception instead.

icholy commented 3 years ago

If a constant is used instead of a variable, null is not passed.

icholy commented 3 years ago

@bernhardttom btw, I already have ConfigurationCompilerPlugInSingleRowFunction.ValueCache.DISABLED set.