If I register a custom function like this:
cep.registerExtension("str:groupConcat", GroupConcatFunctionExtension.class);
Below code will return an exception like 'groupConcat() is neither a function or aggregate':
DataStream outstream2 = cep.from("inputstream2").cql("from inputstream2#window.timeBatch(1 sec) " + "select str:groupConcat(dip) as related_alerts " + "group by sip " + "insert into outstream2") .returns("outstream2");
But if I change returns() to returnAsMap(), then it will works very well:
DataStream outstream2 = cep.from("inputstream2").cql("from inputstream2#window.timeBatch(1 sec) " + "select str:groupConcat(dip) as related_alerts " + "group by sip " + "insert into outstream2") .returnAsMap("outstream2");
If I register a custom function like this: cep.registerExtension("str:groupConcat", GroupConcatFunctionExtension.class);
Below code will return an exception like 'groupConcat() is neither a function or aggregate': DataStream outstream2 = cep.from("inputstream2").cql("from inputstream2#window.timeBatch(1 sec) " + "select str:groupConcat(dip) as related_alerts " + "group by sip " + "insert into outstream2") .returns("outstream2");
But if I change returns() to returnAsMap(), then it will works very well: DataStream outstream2 = cep.from("inputstream2").cql("from inputstream2#window.timeBatch(1 sec) " + "select str:groupConcat(dip) as related_alerts " + "group by sip " + "insert into outstream2") .returnAsMap("outstream2");