haoch / flink-siddhi

A CEP library to run Siddhi within Apache Flink™ Streaming Application (Not maintained)
Apache License 2.0
243 stars 96 forks source link

KryoSerializer Exception #14

Open zhuan77241 opened 6 years ago

zhuan77241 commented 6 years ago

StreamOutputHandler.receive exception when executing multiple Query Is Output<StreamRecord> output thread safe?

java.lang.ArrayIndexOutOfBoundsException: -1
    at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:70)
haoch commented 5 years ago

@zhuan77241 could you pls. share your use case, as I am unable re-produce the problem.

zhuan77241 commented 5 years ago

@haoch Sorry for delayed reply. My case is to execute multiple sql on the same operate. So I so I modified the method ExecutionSiddhiStream.cql(String executionPlan) to ExecutionSiddhiStream.cqls(Map<String, String> executionMap). The exception occurs when I execute multiple sql with grop by (sql1: select .. from xx group by srcAddress,sql2:select ... from xx group by xx and many more). When I synchronized StreamOutputHandler.receive exception disappears.

Tag v0.1.3

haoch commented 5 years ago

@zhuan77241 thanks, I would have a look and update you back once any findings.

tammypi commented 5 years ago

I have encountered the same issue, and I modify code like below to solve it: image

tag:0.2.0