haoch / flink-siddhi

A CEP library to run Siddhi within Apache Flink™ Streaming Application (Not maintained)
Apache License 2.0
243 stars 96 forks source link

If input is mixed with delayed data, CEP cannot output correct results #34

Open tammypi opened 5 years ago

tammypi commented 5 years ago

Below is the code:

package org.apache.flink.streaming.siddhi.testdelaydata;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.siddhi.SiddhiCEP;
import javax.annotation.Nullable;
import java.io.Serializable;
/**
 * Periodic watermark assigner that allows up to {@code maxOutOfOrderness} ms of out-of-order
 * events. When no newer event has arrived since the previous tick, it advances the watermark in
 * {@link #IDLE_ADVANCE_MS} steps toward wall-clock time, so idle periods do not stall event time.
 *
 * <p>{@code MyData#getTimestamp()} is treated as epoch seconds and converted to milliseconds.
 */
class MyTimestampExtractor implements AssignerWithPeriodicWatermarks<MyData>, Serializable {
    /** Step, in ms, by which the max timestamp is pushed forward when the watermark has stalled. */
    private static final long IDLE_ADVANCE_MS = 1000L;

    /** Largest event timestamp (ms) seen so far, possibly advanced by the idle logic. */
    private long currentMaxTimestamp;
    /** Allowed lateness, in ms, between the max seen timestamp and the emitted watermark. */
    private final long maxOutOfOrderness = 10000L;
    /** Watermark computed on the previous call; 0 until the first call. */
    private long preWaterMark = 0L;

    /**
     * Returns the current watermark, {@code currentMaxTimestamp - maxOutOfOrderness}.
     *
     * <p>If the watermark did not move since the last call (and a positive one was already
     * emitted), the max timestamp is advanced by {@link #IDLE_ADVANCE_MS}, capped at the current
     * wall-clock time. The max timestamp is never lowered by this step.
     *
     * @return the watermark to emit; never {@code null} in this implementation
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        long currentWaterMark = currentMaxTimestamp - maxOutOfOrderness;
        if (preWaterMark > 0L && currentWaterMark == preWaterMark) {
            // Stalled: nudge forward, but not past "now". The outer max() guards against moving
            // backwards when a future-dated event has already pushed the max beyond wall-clock time.
            long advanced = Math.min(System.currentTimeMillis(), currentMaxTimestamp + IDLE_ADVANCE_MS);
            currentMaxTimestamp = Math.max(currentMaxTimestamp, advanced);
            currentWaterMark = currentMaxTimestamp - maxOutOfOrderness;
        }
        preWaterMark = currentWaterMark;
        return new Watermark(currentWaterMark);
    }

    /**
     * Converts the element's timestamp (seconds) to milliseconds and tracks the maximum seen.
     *
     * @param element the incoming event
     * @param previousElementTimestamp timestamp previously attached to the element (unused)
     * @return the event time of {@code element} in milliseconds
     */
    @Override
    public long extractTimestamp(MyData element, long previousElementTimestamp) {
        long timestamp = element.getTimestamp() * 1000L;
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }
}
/**
 * Reproduction job for delayed-data handling. It mixes a few "real-time" pattern events with large
 * bursts of delayed events (fixed, old timestamp) and runs a Siddhi sequence pattern over the
 * combined keyed stream.
 */
public class TestDelayData {

    /** Number of real-time pattern rounds emitted by the source. */
    private static final int ROUNDS = 10;

    /** Number of delayed events emitted between the two pattern events of each round. */
    private static final int DELAYED_EVENTS_PER_ROUND = 10000;

    /** Fixed event time, in epoch seconds, used for every delayed event. */
    private static final int DELAYED_EVENT_SECONDS = 1552890430;

    /**
     * Builds and executes the job.
     *
     * @param args unused
     * @throws IllegalStateException if the Flink job fails; the original exception is the cause
     */
    public static void main(String[] args){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        ExecutionConfig executionConfig = env.getConfig();
        executionConfig.setAutoWatermarkInterval(1000L);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<MyData> inputstream = env.addSource(new DelayedDataSource());
        DataStream<?> outstream = SiddhiCEP.define("inputstream", inputstream.keyBy("sip", "dip").assignTimestampsAndWatermarks(new MyTimestampExtractor()),
                "timestamp", "sip", "dip", "logtype")
                .cql("from every s1=inputstream[logtype==1] -> s2=inputstream[logtype==3 and sip==s1.sip and dip==s1.dip] within 10 second " +
                        "select s1.timestamp as timestamp,s1.sip as sip,s1.dip as dip,s1.logtype as logtype,'mycep' as name insert into outstream")
                .returnAsRow("outstream");
        outstream.print();
        try {
            env.execute();
        } catch (Exception e) {
            // Do not swallow job failures: surface them so the process exits abnormally.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }

    /**
     * Test source. Each round emits one logtype-1 event stamped "now", sleeps one second, emits
     * {@link #DELAYED_EVENTS_PER_ROUND} logtype-2 events with an old timestamp, and then emits a
     * logtype-3 event stamped three seconds after the first. After all rounds it idles until
     * cancelled, keeping the job alive.
     */
    private static final class DelayedDataSource implements SourceFunction<MyData> {
        /** Cleared by {@link #cancel()} so that {@link #run} returns, as SourceFunction requires. */
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<MyData> ctx) throws Exception {
            for (int round = 0; round < ROUNDS && running; round++) {
                int time = Math.toIntExact(System.currentTimeMillis() / 1000L);
                ctx.collect(new MyData(time, "1.1.1.1", "1.1.1.2", 1));
                Thread.sleep(1000L);
                for (int i = 0; i < DELAYED_EVENTS_PER_ROUND; i++) {
                    ctx.collect(new MyData(DELAYED_EVENT_SECONDS, "1.1.1.1", "1.1.1.2", 2));
                }
                ctx.collect(new MyData(time + 3, "1.1.1.1", "1.1.1.2", 3));
            }
            while (running) {
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
tammypi commented 5 years ago

I'm very confused and anxious. I look forward to your reply, thanks! (PS: Our project must process real-time data and delayed data at the same time.)

tammypi commented 5 years ago

Issue link: https://github.com/siddhi-io/siddhi/issues/1144

haoch commented 5 years ago

@tammypi Sorry for delayed reply. I saw you created a siddhi issue as well, have you found the root cause yet?

tammypi commented 5 years ago

@haoch Yes, I have found it; the details are in this issue: https://github.com/siddhi-io/siddhi/issues/1144

haoch commented 5 years ago

Good, thanks! @tammypi