haoch / flink-siddhi

A CEP library to run Siddhi within Apache Flink™ Streaming Application (Not maintained)
Apache License 2.0
243 stars 96 forks source link

测试用例最后一个例子为什么会空指针异常 #64

Open CCweixiao opened 4 years ago

CCweixiao commented 4 years ago

测试用例最后一个例子为什么会空指针异常

   @Test
    public void testDynamicalStreamSimplePatternMatch2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Event> input1 = env.addSource(new RandomEventSource(30).setName("event_stream_1"));

        DataStream<ControlEvent> controlStream = env.addSource(new SourceFunction<ControlEvent>() {
            @Override
            public void run(SourceContext<ControlEvent> sourceContext) throws InterruptedException {
                sourceContext.collect(MetadataControlEvent.builder()
                        .addExecutionPlan("1", "from input select  *  insert into output;")
                        .build());
                sourceContext.collect(OperationControlEvent.enableQuery("1"));

            }

            @Override
            public void cancel() {
            }
        });
        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
        cep.registerStream("input",input1,"id","name");
        cep.from("input")
                .cql(controlStream).returnAsRow("output").print();

        //SiddhiCEP.define("input", input1,"id","name")

        env.execute();
    }