haoch / flink-siddhi

A CEP library to run Siddhi within Apache Flink™ Streaming Application (Not maintained)
Apache License 2.0
243 stars, 96 forks

[Question] Why do we need to take a Siddhi snapshot when processing each element? #13

Status: open. JasonChoi27 opened this issue 6 years ago.

JasonChoi27 commented 6 years ago

Hi haoch,

`AbstractSiddhiOperator` already saves its state in `snapshotState` whenever Flink takes a checkpoint. Why do we also need to call `checkpointSiddhiRuntimeState` or `checkpointRecordQueueState` when processing every element?

The relevant code is:

```java
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    if (isControlStream(element.getValue())) {
        this.onEventReceived(getControlEvent(element.getValue()));
        return;
    }
    String streamId = getStreamId(element.getValue());
    StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);

    if (isProcessingTime) {
        processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
        // called for every element: this is what the question is about
        this.checkpointSiddhiRuntimeState();
    } else {
        PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
        // event time processing
        // we have to buffer the elements until we receive the proper watermark
        if (getExecutionConfig().isObjectReuseEnabled()) {
            // copy the StreamRecord so that it cannot be changed
            priorityQueue.offer(new StreamRecord<>(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
        } else {
            priorityQueue.offer(element);
        }
        // also called for every element
        this.checkpointRecordQueueState();
    }
}
```

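To make the trade-off in the question concrete, here is a minimal, self-contained sketch in plain Java. `FakeRuntime`, `Operator`, and `run` are hypothetical stand-ins, not flink-siddhi or Flink APIs, and the sketch assumes, as the question states, that the operator's `snapshotState` hook already serializes the runtime when a checkpoint is taken. It compares snapshotting after every element with snapshotting only at checkpoint time.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SnapshotDemo {
    /** Hypothetical stand-in for a CEP runtime: a running sum that can be serialized. */
    static final class FakeRuntime {
        private long sum;
        int snapshotCalls; // how many times the state was serialized

        void send(long value) {
            sum += value;
        }

        byte[] snapshot() {
            snapshotCalls++;
            return ByteBuffer.allocate(Long.BYTES).putLong(sum).array();
        }
    }

    /** Hypothetical operator; 'stored' plays the role of Flink operator state. */
    static final class Operator {
        final FakeRuntime runtime = new FakeRuntime();
        final boolean snapshotPerElement;
        byte[] stored = new byte[0];

        Operator(boolean snapshotPerElement) {
            this.snapshotPerElement = snapshotPerElement;
        }

        void processElement(long value) {
            runtime.send(value);
            if (snapshotPerElement) {
                stored = runtime.snapshot(); // serialization on every element
            }
        }

        /** Called only when a checkpoint is taken, like Flink's snapshotState hook. */
        void snapshotState() {
            stored = runtime.snapshot();
        }
    }

    /** Feeds 1..elements into a fresh operator, then takes one checkpoint. */
    static Operator run(boolean snapshotPerElement, int elements) {
        Operator op = new Operator(snapshotPerElement);
        for (int i = 1; i <= elements; i++) {
            op.processElement(i);
        }
        op.snapshotState();
        return op;
    }

    public static void main(String[] args) {
        Operator eager = run(true, 10_000);
        Operator lazy = run(false, 10_000);
        System.out.println("same checkpointed bytes: " + Arrays.equals(eager.stored, lazy.stored));
        System.out.println("eager snapshots: " + eager.runtime.snapshotCalls); // 10001
        System.out.println("lazy snapshots:  " + lazy.runtime.snapshotCalls);  // 1
    }
}
```

Both variants hold the same bytes at the checkpoint, but the eager one serializes the runtime 10,001 times instead of once. Under these assumptions, per-element snapshots only add work on the hot path unless something reads the stored state between checkpoints.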
madhurijain97 commented 2 years ago

We observed that checkpointing still works if `checkpointSiddhiRuntimeState` is called only for control events instead of for every element:

```java
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    if (isControlStream(element.getValue())) {
        this.onEventReceived(getControlEvent(element.getValue()));
        this.checkpointSiddhiRuntimeState(); // added: snapshot after each control event
        return;
    }
    String streamId = getStreamId(element.getValue());
    StreamSchema<IN> schema = siddhiPlan.getInputStreamSchema(streamId);

    if (isProcessingTime) {
        // checkpointSiddhiRuntimeState() is no longer called here
        processEvent(streamId, element.getValue(), System.currentTimeMillis());
    } else {
        PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
        // event time processing
        // we have to buffer the elements until we receive the proper watermark
        if (getExecutionConfig().isObjectReuseEnabled()) {
            // copy the StreamRecord so that it cannot be changed
            priorityQueue.offer(new StreamRecord<>(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
        } else {
            priorityQueue.offer(element);
        }
        this.checkpointRecordQueueState();
    }
}
```

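I believe this reduces latency, because the Siddhi runtime state is no longer checkpointed for every data element. Since our use case requires processing time, we only changed where `checkpointSiddhiRuntimeState` is called; the event-time branch still calls `checkpointRecordQueueState` for every element.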
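The change above can be sketched the same way. Again, `FakeRuntime`, `Operator`, `encode`, and `decode` are hypothetical, and the string `"CONTROL"` stands in for a control event. The sketch shows why this can still checkpoint correctly: the state written on a control event goes stale as soon as more data arrives, so a correct checkpoint depends on `snapshotState` serializing the runtime again (an assumption about `AbstractSiddhiOperator`, taken from the original question).

```java
import java.nio.ByteBuffer;

public class ControlSnapshotDemo {
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    /** Hypothetical runtime: a running sum plus a serialization counter. */
    static final class FakeRuntime {
        long sum;
        int snapshotCalls;

        byte[] snapshot() {
            snapshotCalls++;
            return encode(sum);
        }
    }

    /** Hypothetical operator: snapshots on control events and at checkpoints only. */
    static final class Operator {
        final FakeRuntime runtime = new FakeRuntime();
        byte[] stored = encode(0); // stand-in for the operator's list state

        void processElement(String event) {
            if (event.equals("CONTROL")) {
                stored = runtime.snapshot(); // snapshot after each control event
                return;
            }
            runtime.sum += Long.parseLong(event); // data event: no serialization
        }

        /** Flink-style checkpoint hook: re-serializes the current runtime state. */
        void snapshotState() {
            stored = runtime.snapshot();
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator();
        for (String event : new String[] {"5", "CONTROL", "1", "2", "3"}) {
            op.processElement(event);
        }
        System.out.println("stored before checkpoint: " + decode(op.stored)); // 5 (stale)
        op.snapshotState();
        System.out.println("stored after checkpoint:  " + decode(op.stored)); // 11
        System.out.println("snapshots taken: " + op.runtime.snapshotCalls);   // 2
    }
}
```

Serialization now happens once per control event plus once per checkpoint, instead of once per data element, which is where a latency saving would come from.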