haoch / flink-siddhi

A CEP library to run Siddhi within Apache Flink™ Streaming Application (Not maintained)
Apache License 2.0
243 stars 96 forks source link

custom function cannot use in retturn() #21

Open tammypi opened 5 years ago

tammypi commented 5 years ago

If I register a custom function like this: cep.registerExtension("str:groupConcat", GroupConcatFunctionExtension.class);

Below code will return an exception like 'groupConcat() is neither a function or aggregate': DataStream outstream2 = cep.from("inputstream2").cql("from inputstream2#window.timeBatch(1 sec) " + "select str:groupConcat(dip) as related_alerts " + "group by sip " + "insert into outstream2") .returns("outstream2");

But if I change returns() to returnAsMap(), then it will works very well: DataStream outstream2 = cep.from("inputstream2").cql("from inputstream2#window.timeBatch(1 sec) " + "select str:groupConcat(dip) as related_alerts " + "group by sip " + "insert into outstream2") .returnAsMap("outstream2");

tammypi commented 5 years ago

1

tammypi commented 5 years ago

The reason of custom function cannot use in returns():

1.returns() has one line: TypeInformation typeInformation = SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getAllEnrichedExecutionPlan(), outStreamId);

2.SiddhiTypeFactory.getTupleTypeInformation() calls the getStreamDefinition() of SiddiTypeFactory

3. public static AbstractDefinition getStreamDefinition(String executionPlan, String streamId) { SiddhiManager siddhiManager = null; SiddhiAppRuntime runtime = null; try { siddhiManager = new SiddhiManager(); siddhiManager.setExtension("str:groupConcat", GroupConcatFunctionExtension.class); runtime = siddhiManager.createSiddhiAppRuntime(executionPlan); Map<String, StreamDefinition> definitionMap = runtime.getStreamDefinitionMap(); if (definitionMap.containsKey(streamId)) { return definitionMap.get(streamId); } else { throw new IllegalArgumentException("Unknown stream id" + streamId); } } finally { if (runtime != null) { runtime.shutdown(); } if (siddhiManager != null) { siddhiManager.shutdown(); } } } initialize the siddhiManager, but do not set the custom function which registered in SiddhiCEP into the siddhiManager.