siddhi-io / siddhi

Stream Processing and Complex Event Processing Engine
http://siddhi.io
Apache License 2.0

with window.externalTime(), not able to fetch events from stream #1750

Closed Connect2naga closed 2 years ago

Connect2naga commented 2 years ago

Description:

Create a configuration like the one below:

@App:name('alarm-count-rule')
@App:description('Receive events via HTTP transport and view the output on the console')
@source(type = 'kafka', bootstrap.servers = "0.0.0.0:9092", topic.list = "event_2", group.id = "newprocessor_1", threading.option = "single.thread", 
    @map(type = 'json'))
define stream FMNotification (dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long);
-- @sink(type = 'kafka', bootstrap.servers = "0.0.0.0:9092", topic = "alarmCountRule", is.binary.message = "False", 
--  @map(type = 'json'))
-- --define stream OutputStream (dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long,alarmCount long);
-- define stream OutputStream (dn string, alarmCount long);
@sink(type = 'kafka', bootstrap.servers = "0.0.0.0:9092", topic = "alarmCountRule", is.binary.message = "False", 
    @map(type = 'json'))
--define stream OutputStream (dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long,alarmCount long);
define stream ProessedEvents(dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long,alarmCount long);

@info(name = 'fmNotification')
from FMNotification#window.externalTime(systemNotificationTime, 5 sec) 
--from FMNotification#window.time(3 sec)
select dn,specificProblem,notificationType,presentationName,moClass,systemNotificationTime,count() as alarmCount
group by dn
insert all events into TempStream;
@info(name = 'alarmCount')
from TempStream [alarmCount > 2]
select dn, specificProblem,notificationType,presentationName,moClass,systemNotificationTime, alarmCount
insert into ProessedEvents;

Expected: once I get 3 events within 3 seconds, we should get all of the participating events. Issue: instead, only the last event is printed to the stream, along with subsequent events that satisfy the condition.

Affected Siddhi Version: 5.1.1

OS, DB, other environment details and versions:

Steps to reproduce:

Related Issues:

Connect2naga commented 2 years ago

Could you please help with this?

senthuran16 commented 2 years ago

Hi @Connect2naga, with your original query, count() with group by dn emits a running count on each arriving event, so the alarmCount > 2 filter only passes the third and later events within the window; the earlier participating events have already been emitted with lower counts and are not re-emitted. Have you tried using the externalTimeBatch window with expired events instead?

from InStream#window.externalTimeBatch(timestamp, 3 sec)
select a, b, c
insert expired events into OutStream;
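
For context: with externalTimeBatch, batching is driven by the event timestamps rather than wall-clock time, so the expired events output emits a batch's events together only after later events advance the external time past the batch boundary.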

vankin09 commented 2 years ago

Hi,

I tried using the above, but it doesn't meet the requirement: I get only the expired events, which doesn't serve the purpose. The use case is as follows (with an example): when there are n events within a particular time window, this should be notified. Example: if and only if 3 or more events arrive within 5 seconds of each other (with one field serving as the reference time), all 3 events should be put into an OutputStream. Say there are these events:

Event 1 with fieldTime 12:00:00
Event 2 with fieldTime 12:00:03
Event 3 with fieldTime 12:00:06
Event 4 with fieldTime 12:00:09
Event 5 with fieldTime 12:00:10

In this case, only events 3, 4, and 5 should be forwarded to OutputStream. To achieve this, I tried a different approach:

@App:name('alarm-count-rule-1')

@app:description('Receive events via kafka and put the correlated events in kafka sink')

@source(type = 'kafka', bootstrap.servers = "0.0.0.0:9092", topic.list = "event_2", group.id = "newprocessor_1", threading.option = "single.thread",
    @map(type = 'json'))
define stream TestNotification (id string, dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long);

define window TestNotificationWindow (id string, dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long) externalTime(systemNotificationTime, 5 sec);

@sink(type = 'kafka', bootstrap.servers = "0.0.0.0:9092", topic = "alarmCountRule", is.binary.message = "False",
    @map(type = 'json'))
define stream corr_1 (corr_r string, corr1 string, corr2 string);

@info(name = "window")
from TestNotification
insert into TestNotificationWindow;

@info(name = "Event table")
partition with (dn of TestNotificationWindow)
begin
    from every( e1=TestNotificationWindow)
         -> e2 = TestNotificationWindow[e1.systemNotificationTime < systemNotificationTime and e1.systemNotificationTime + 5000 > systemNotificationTime and e1.id != id]< 2: >
    select e1.id as corr_r, e2[0].id as corr1, e2[1].id as corr2
    insert into corr_1
end;
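
Here the < 2: > count qualifier (Siddhi's minimum-count pattern qualifier) matches two or more e2 events satisfying the filter, so each complete match covers at least three correlated events: e1 plus the matched e2 events.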

With this, the idea is to get the ids of all the correlated events and push them to the output stream; they could then be used to fetch the event data via an HTTP query. However, the problem is that the select clause is hardcoded (select e1.id as corr_r, e2[0].id as corr1, e2[1].id as corr2); if the matched events could somehow be looped over, that would help.

senthuran16 commented 2 years ago

Hi @vankin09,

We can avoid the hardcoding by merging the ids of e1 and all the e2 events into a list, using functions from siddhi-execution-list. Please see the example below:

@App:name("SiddhiAppSimple")

@App:description("Description of the plan")

@sink(type='log', prefix='>>>Input')
define stream InputStream(id string, timestamp long);

@sink(type='log', prefix=">>>>>>Collected")
define stream MatchedCorrelatedIdsStream(ids object);

@sink(type='log', prefix='CORRELATED ID')
define stream CorrelatedIdsStream(id string);

from every( e1=InputStream)
         -> e2 = InputStream[e1.timestamp < timestamp and e1.timestamp + 5000 > timestamp and e1.timestamp != timestamp]< 2: >
    select list:addAll(list:create(e1.id), list:merge(e2.id)) as ids
    insert into MatchedCorrelatedIdsStream;

from MatchedCorrelatedIdsStream#list:tokenize(ids)
select value
insert into CorrelatedIdsStreamCorrelatedIdsStream;

from CorrelatedIdsStreamCorrelatedIdsStream#window.batch(1)
select convert(value, 'string') as id
insert into CorrelatedIdsStream;
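
Here the pattern query collects e1's id together with the matched e2 ids into a single list, list:tokenize then emits one event per list entry, and the final query converts each tokenized value (an object) back to a string.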

Sample input simulated into InputStream:

event1,60000
event2,63000
event3,66000
event4,69000
event5,70000

event3, event4, and event5 will be published to CorrelatedIdsStream when simulating the above.

Output:

INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636459436385, data=[event1, 60000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636459437385, data=[event2, 63000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636459438385, data=[event3, 66000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636459439385, data=[event4, 69000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636459440385, data=[event3], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636459440385, data=[event4], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636459440385, data=[event5], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>>>>Collected : Event{timestamp=1636459440385, data=[[event3, event4, event5]], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636459440385, data=[event5, 70000], isExpired=false} (Encoded)

However, we can observe that the created lists don't seem to be getting cleared properly. That is, when simulating the following events after simulating event1 - event5 above:

event6,80000
event7,83000
event8,86000
event9,89000
event10,90000

the expected outcome would be just event8, event9, and event10, but what we observe is as follows (event4 and event5 are still present in the list):

INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636460007318, data=[event6, 80000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636460016239, data=[event7, 83000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636460024238, data=[event8, 86000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636460033711, data=[event9, 89000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636460040519, data=[event8], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636460040519, data=[event4], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636460040519, data=[event5], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636460040519, data=[event9], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636460040519, data=[event10], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>>>>Collected : Event{timestamp=1636460040519, data=[[event8, event4, event5, event9, event10]], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636460040519, data=[event10, 90000], isExpired=false} (Encoded)

We will have to look into this further to determine how to address this behaviour.
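
In the meantime, a possible workaround sketch (assuming the stale entries originate in the stateful list:merge aggregate, and that list:create accepts multiple values) is to build the list per match with list:create instead, which keeps all list state local to a single match at the cost of reintroducing the hardcoded e2 indices:

-- Sketch: replace the list:merge aggregate with a per-match list:create.
from every( e1=InputStream)
         -> e2 = InputStream[e1.timestamp < timestamp and e1.timestamp + 5000 > timestamp and e1.timestamp != timestamp]< 2: >
    select list:create(e1.id, e2[0].id, e2[1].id) as ids
    insert into MatchedCorrelatedIdsStream;

The rest of the application (tokenizing and converting the ids) would remain unchanged.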

vankin09 commented 2 years ago

We will also check this from our end, on top of the solution you have provided.

Apart from this, we also had a query related to out-of-order events. The scenario is: I have a window configured with externalTime for 5 seconds, a required match count of 3 events, and the following sequence of events (with each event's external time as listed below).

Window: 5 seconds, number of events: 3

event1, 08000
event2, 09000
event3, 16000
event4, 10000
event5, 25000
event6, 17000
event7, 18000

The expected outcome of this is that 2 different sets are found, as listed below:

Set 1: event1 (08000), event2 (09000), event4 (10000)
Set 2: event3 (16000), event6 (17000), event7 (18000)

Can you let me know if the window used will produce the expected outcome?

senthuran16 commented 2 years ago

Hi @vankin09,

Yes, the above requirement can be achieved using a Siddhi pattern. Please see the following Siddhi application, which demonstrates this behaviour.

@App:name("PatternMatching")

@sink(type='log', prefix='>>>Input')
define stream InputStream(id string, timestamp long);

@sink(type='log', prefix='MATCHED')
define stream LogStream(e1Id string, e2Id string, e3Id string);

from every( e1=InputStream)
         -> e2 = InputStream[e1.timestamp < timestamp and e1.timestamp + 5000 > timestamp and e1.timestamp != timestamp]< 2: >
    select e1.id as e1Id, e2[0].id as e2Id, e2[1].id as e3Id
    insert into LogStream;

Input:

event1,8000
event2,9000
event3,16000
event4,10000
event5,25000
event6,17000
event7,18000

Output:

INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896589549, data=[event1, 8000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896590549, data=[event2, 9000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896591549, data=[event3, 16000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - MATCHED : Event{timestamp=1636896592549, data=[event1, event2, event4], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896592549, data=[event4, 10000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896593549, data=[event5, 25000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896594549, data=[event6, 17000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - MATCHED : Event{timestamp=1636896595549, data=[event3, event6, event7], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896595549, data=[event7, 18000], isExpired=false} (Encoded)
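
As the output shows, matching is driven by the events' external timestamps in the filter condition rather than by their arrival order: event4 (10000) completes the first set with event1 and event2 even though event3 (16000) arrived before it, since event3 falls outside e1.timestamp + 5000.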

AnuGayan commented 2 years ago

Closing this issue since the answer is provided above.