siddhi-io / siddhi

Stream Processing and Complex Event Processing Engine
http://siddhi.io
Apache License 2.0
1.52k stars 529 forks source link

after consuming events from Stream 'clockStream', null. Hence, dropping event java.util.NoSuchElementException #1751

Open leobasic opened 3 years ago

leobasic commented 3 years ago

Description:

Affected Siddhi Version: siddhi v5.1.2

Steps to reproduce:

App:

define stream EntryStream (status int,eleName string,labelName string,occurTimestamp long);

define table inoutFlagTable ( status int,eleName string,pass string ); from every EntryStream[labelName=='LAB1' and eleName=='EN1' and status==0] -> EntryStream[labelName=='LAB2' and eleName=='EN1' and status==0] -> EntryStream[labelName=='LAB3' and eleName=='EN1' and status==0] within 5 sec select EntryStream.status,EntryStream.eleName,'LAB1->LAB2->LAB3' as tag insert into bindInOutStream;

from every EntryStream[labelName=='LAB1' and eleName=='EN1' and status==0] -> EntryStream[labelName=='LAB3' and eleName=='EN1' and status==0] -> EntryStream[labelName=='LAB2' and eleName=='EN1' and status==0] within 5 sec select EntryStream.status,EntryStream.eleName,'LAB1->LAB3->LAB2' as tag insert into bindInOutStream;

from every EntryStream[labelName=='LAB2' and eleName=='EN1' and status==0 ] -> EntryStream[labelName=='LAB1' and eleName=='EN1' and status==0] -> EntryStream[labelName=='LAB3' and eleName=='EN1' and status==0] within 5 sec select EntryStream.status,EntryStream.eleName,'LAB2->LAB1->LAB3' as tag insert into bindInOutStream;

from every EntryStream[labelName=='LAB2' and eleName=='EN1' and status==0 ] -> EntryStream[labelName=='LAB3' and eleName=='EN1' and status==0] -> EntryStream[labelName=='LAB1' and eleName=='EN1' and status==0] within 5 sec select EntryStream.status,EntryStream.eleName,'LAB2->LAB3->LAB1' as tag insert into bindInOutStream;

from every EntryStream[labelName=='LAB3' and eleName=='EN1' and status==0] -> EntryStream[labelName=='LAB1' and eleName=='EN1' and status==0] -> EntryStream[labelName=='LAB2' and eleName=='EN1' and status==0] within 5 sec select EntryStream.status,EntryStream.eleName,'LAB3->LAB1->LAB2' as tag insert into bindInOutStream;

from every EntryStream[labelName=='LAB3' and eleName=='EN1' and status==0] -> EntryStream[labelName=='LAB2' and eleName=='EN1' and status==0] -> EntryStream[labelName=='LAB1' and eleName=='EN1' and status==0] within 5 sec select EntryStream.status,EntryStream.eleName,'LAB3->LAB2->LAB1' as tag insert into bindInOutStream;

from every EntryStream[labelName=='LAB11' and eleName=='EN2' and status==0] -> EntryStream[labelName=='LAB22' and eleName=='EN2' and status==0] within 5 sec select EntryStream.status,EntryStream.eleName,'LAB11->LAB22' as tag insert into bindInOutStream;

from every EntryStream[labelName=='LAB22' and eleName=='EN2' and status==0] -> EntryStream[labelName=='LAB11' and eleName=='EN2' and status==0] within 5 sec select EntryStream.status,EntryStream.eleName,'LAB22->LAB11' as tag insert into bindInOutStream;

@info(name='clockStream2') from every EntryStream[eleName=='EN2' and status==0 and (labelName=='LAB22' or labelName=='LAB11')] -> not bindInOutStream[eleName=='EN2' and status==0] for 5 sec select EntryStream.status,EntryStream.eleName,'ok' as clockFlag insert into clockStream; @info(name='clockStream3') from every EntryStream[eleName=='EN3' and status==0 and (labelName=='LAB22' or labelName=='LAB11')] -> not bindInOutStream[eleName=='EN3' and status==0] for 5 sec select EntryStream.status,EntryStream.eleName,'ok' as clockFlag insert into clockStream; @info(name='clockStream4') from every EntryStream[eleName=='EN4' and status==0 and (labelName=='LAB22' or labelName=='LAB11')] -> not bindInOutStream[eleName=='EN4' and status==0] for 5 sec select EntryStream.status,EntryStream.eleName,'ok' as clockFlag insert into clockStream; from EntryStream[eleName=='EN1' and status==0] select list:contains(list:create('LAB1','LAB2','LAB3'),labelName) as permitEntry,eleName,labelName,status insert into EN1permitEntryStream; @info(name='EN1permitEntryStream')from EN1permitEntryStream[not permitEntry] select eleName,status,str:concat('unthori-alert:',eleName,'>',labelName) as flag insert into alertStream; from bindInOutStream select status,eleName,'ok' as pass insert into inoutFlagTable; @info(name='passStream') from bindInOutStream select * insert into passStream; @info(name='clockStream') from every EntryStream[eleName=='EN1' and status==0 and (labelName=='LAB3' or labelName=='LAB1' or labelName=='LAB2')] -> not bindInOutStream[eleName=='EN1' and status==0] for 5 sec select EntryStream.status,EntryStream.eleName,'ok' as clockFlag insert into clockStream; @info(name='alertStream') partition with (eleName of clockStream) begin from clockStream[ not ( (clockFlag==inoutFlagTable.pass and eleName==inoutFlagTable.eleName and status==inoutFlagTable.status) in inoutFlagTable) ] select eleName,status,'unbind-alert' as flag insert into alertStream;from clockStream[ (clockFlag==inoutFlagTable.pass and eleName==inoutFlagTable.eleName and status==inoutFlagTable.status) in inoutFlagTable ] delete inoutFlagTable on eleName==inoutFlagTable.eleName and status==inoutFlagTable.status;end;

InputHandler inputHandler = siddhiAppRuntime.getInputHandler("EntryStream"); siddhiAppRuntime.start();

    for (int i = 0; i < 100; i++) {

        new Thread(() -> {

            try {
                inputHandler.send(new Object[]{0, "EN1", "LAB2", System.currentTimeMillis()});
                inputHandler.send(new Object[]{0, "EN1", "LAB1", System.currentTimeMillis()});
                inputHandler.send(new Object[]{0, "EN1", "LAB3", System.currentTimeMillis()});
            } catch (Exception e) {
            }

        }).start();

        new Thread(() -> {

            try {
                inputHandler.send(new Object[]{0, "EN2", "LAB11", System.currentTimeMillis()});
            } catch (Exception e) {
            }

        }).start();

        new Thread(() -> {

            try {
                inputHandler.send(new Object[]{0, "EN3", "LAB111", System.currentTimeMillis()});
                inputHandler.send(new Object[]{0, "EN3", "LAB22", System.currentTimeMillis()});
            } catch (Exception e) {
            }

        }).start();

        new Thread(() -> {

            try {
                inputHandler.send(new Object[]{0, "EN4", "LAB111", System.currentTimeMillis()});
                inputHandler.send(new Object[]{0, "EN4", "LAB22", System.currentTimeMillis()});
            } catch (Exception e) {
            }

        }).start();
    }

Related Issues:

Error in '0c7c3711-34ae-4507-b7e3-ece45d8bab37' after consuming events from Stream 'clockStream', null. Hence, dropping event 'StateEvent{streamEvents=[StreamEvent{ timestamp=1632811671523, beforeWindowData=[LAB22], onAfterWindowData=null, outputData=[0, EN3], type=CURRENT, next=null}, null], timestamp=1632811676523, type=CURRENT, outputData=[0, EN3, ok], next=null}' java.util.NoSuchElementException at io.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue.next(SnapshotableStreamEventQueue.java:166) at io.siddhi.core.util.collection.operator.SnapshotableEventQueueOperator.contains(SnapshotableEventQueueOperator.java:76) at io.siddhi.core.table.InMemoryTable.contains(InMemoryTable.java:147) at io.siddhi.core.table.Table.containsEvent(Table.java:368) at io.siddhi.core.executor.condition.InConditionExpressionExecutor.execute(InConditionExpressionExecutor.java:56) at io.siddhi.core.executor.condition.InConditionExpressionExecutor.execute(InConditionExpressionExecutor.java:31) at io.siddhi.core.executor.condition.NotConditionExpressionExecutor.execute(NotConditionExpressionExecutor.java:44) at io.siddhi.core.executor.condition.NotConditionExpressionExecutor.execute(NotConditionExpressionExecutor.java:29) at io.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:50) at io.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:183) at io.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:90) at io.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:116) at io.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:176) at io.siddhi.core.partition.PartitionStreamReceiver.send(PartitionStreamReceiver.java:267) at io.siddhi.core.partition.PartitionStreamReceiver.receive(PartitionStreamReceiver.java:100) at io.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:176) at io.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:401) at io.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56) at io.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:104) at io.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:44) at io.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97) at io.siddhi.core.query.input.stream.state.AbsentStreamPreStateProcessor.sendEvent(AbsentStreamPreStateProcessor.java:230) at io.siddhi.core.query.input.stream.state.AbsentStreamPreStateProcessor.process(AbsentStreamPreStateProcessor.java:212) at io.siddhi.core.query.input.stream.single.EntryValveProcessor.process(EntryValveProcessor.java:49) at io.siddhi.core.util.Scheduler.sendTimerEvents(Scheduler.java:195) at io.siddhi.core.util.Scheduler.access$300(Scheduler.java:48) at io.siddhi.core.util.Scheduler$EventCaller.run(Scheduler.java:294) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)


In the case of high concurrency, it is related to table operation. What is the reason?

senthuran16 commented 2 years ago

We noticed that an in memory Table has been used here. Have you tried using the same example with a @Store() (eg: RDBMS store)?