haoch / flink-siddhi

A CEP library to run Siddhi within Apache Flink™ Streaming Application (Not maintained)
Apache License 2.0

custom function in an external jar #31

Open tammypi opened 5 years ago

tammypi commented 5 years ago

In my case, users can define custom functions in their own project, package them into a jar file, and upload it to a common local path (~/plugins). The Flink application (we call it the engine project) should load all the jars and register the custom functions from the plugin jars.

The code is as follows:

```java
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

// Look up the registered plugins (class name, function name, jar path).
String pluginSql = "select class_path,function_name,file_path from plugins";
List<Map<String, Object>> pluginRtn = PgUtil.getInstance(host, port, database, username, password).query(pluginSql);

for (int i = 0; i < pluginRtn.size(); i++) {
    // Load the extension class from the plugin jar with a child class loader ...
    URL url = new URL("file:" + pluginRtn.get(i).get("file_path"));
    URLClassLoader classLoader = new URLClassLoader(new URL[] { url },
            Thread.currentThread().getContextClassLoader());
    Class myClass = classLoader.loadClass(String.valueOf(pluginRtn.get(i).get("class_path")));
    // ... and register it as a Siddhi extension under its function name.
    cep.registerExtension(String.valueOf(pluginRtn.get(i).get("function_name")), (Class<FunctionExecutor>) myClass);
}
```

My Flink application runs on YARN, so my command looks like this:

```
./flink run -m yarn-cluster -ynm xxtest -ys 4 -yn 10 -ytm 5120 -p 1 -yt ~/plugins -c com.xxxx.MyMainClass ~/xxxxx.jar
```

The parameter `-yt ~/plugins` tells Flink to ship all the files in ~/plugins to the cluster.
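
For context, my expectation is that the task side can pick up whatever `-yt` shipped by scanning the plugins directory instead of relying on absolute local paths. A rough sketch of that idea; where exactly the shipped jars land (I assume a plugins directory under the container's working directory) is my assumption and may differ between Flink versions:

```java
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative sketch (not part of flink-siddhi): build one class loader over
// every jar found in the shipped plugins directory.
public class ShippedPluginJars {

    public static URLClassLoader loaderFor(String pluginDir) throws Exception {
        // Assumption: the files shipped with -yt are visible under this relative
        // path in the container's working directory.
        File[] jars = new File(pluginDir).listFiles((dir, name) -> name.endsWith(".jar"));
        if (jars == null || jars.length == 0) {
            throw new IllegalStateException("No plugin jars found in " + pluginDir);
        }
        URL[] urls = new URL[jars.length];
        for (int i = 0; i < jars.length; i++) {
            urls[i] = jars[i].toURI().toURL();
        }
        // Child of the current context class loader, so Flink / Siddhi classes stay visible.
        return new URLClassLoader(urls, Thread.currentThread().getContextClassLoader());
    }
}
```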

But I run into a problem: if I upload a plugin (slen()) and use it, the application throws an exception:

```
Error on '25d79ef0-b5e5-40ad-9014-0a70ae1851eb' @ Line: 1. Position: 5613, near 'slen(sip)'. 'slen' is neither a function extension nor an aggregated attribute extension
```

If I copy all the function code from the plugin project into the engine project, everything works well.

So could you please tell me whether there is any way to register and use custom functions from an external jar in flink-siddhi?

Looking forward to your reply, thanks!

tammypi commented 5 years ago

If I switch the URLClassLoader from a local one to the system class loader, as in the following code:

```java
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

String pluginSql = "select class_path,function_name,file_path from plugins";
List<Map<String, Object>> pluginRtn = PgUtil.getInstance(host, port, database, username, password).query(pluginSql);

for (int i = 0; i < pluginRtn.size(); i++) {
    URL url = new URL("file:" + pluginRtn.get(i).get("file_path"));
    // Inject the plugin jar into the system class loader via reflection on addURL.
    URLClassLoader urlClassLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
    Method addURL = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    addURL.setAccessible(true);
    addURL.invoke(urlClassLoader, url);
    Class myClass = urlClassLoader.loadClass(String.valueOf(pluginRtn.get(i).get("class_path")));
    cep.registerExtension(String.valueOf(pluginRtn.get(i).get("function_name")), (Class) myClass);
}
```

then it works well in my local environment, but it throws the following exception on YARN:

```
java.lang.IncompatibleClassChangeError
    at org.wso2.siddhi.query.compiler.internal.SiddhiQLBaseVisitorImpl.populateQueryContext(SiddhiQLBaseVisitorImpl.java:3050)
    at org.wso2.siddhi.query.compiler.internal.SiddhiQLBaseVisitorImpl.visitFilter(SiddhiQLBaseVisitorImpl.java:1491)
    at org.wso2.siddhi.query.compiler.internal.SiddhiQLBaseVisitorImpl.visitFilter(SiddhiQLBaseVisitorImpl.java:101)
    at org.wso2.siddhi.query.compiler.SiddhiQLParser$FilterContext.accept(SiddhiQLParser.java:5211)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visitChildren(AbstractParseTreeVisitor.java:70)
    at org.wso2.siddhi.query.compiler.SiddhiQLBaseVisitor.visitBasic_source_stream_handler(SiddhiQLBaseVisitor.java:984)
```

tammypi commented 5 years ago

OK, I have found a way to work around this issue.

1. Configure flink-conf.yaml with `classloader.resolve-order: parent-first` (Flink's default is child-first; some packages in flink-siddhi/Siddhi may conflict with Flink's).

2. Code:

```java
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

String pluginSql = "select class_path,function_name,file_path from plugins";
List<Map<String, Object>> pluginRtn = PgUtil.getInstance(host, port, database, username, password).query(pluginSql);

for (int i = 0; i < pluginRtn.size(); i++) {
    URL url = new URL("file:" + pluginRtn.get(i).get("file_path"));
    URLClassLoader classLoader = new URLClassLoader(new URL[] { url },
            Thread.currentThread().getContextClassLoader());
    Class myClass = classLoader.loadClass(String.valueOf(pluginRtn.get(i).get("class_path")));
    cep.registerExtension(String.valueOf(pluginRtn.get(i).get("function_name")), (Class) myClass);
}
```

3. Start the Flink application with `-yt`:

```
./flink run -m yarn-cluster -ynm xxtest -ys 4 -yn 10 -ytm 5120 -p 1 -yt ~/plugins -c com.xxxx.MyMainClass ~/xxxxx.jar
```

It runs perfectly on YARN.
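
For reference, combined with the directory-scanning idea above, this is roughly how I would factor the registration step into one helper. The class and method names here are just illustrative (not part of flink-siddhi), and the SiddhiCEP import may need adjusting to your flink-siddhi version:

```java
import java.net.URLClassLoader;
import java.util.List;
import java.util.Map;

// Assumed import; the SiddhiCEP package may differ between flink-siddhi versions.
import org.apache.flink.streaming.siddhi.SiddhiCEP;

public class PluginRegistration {

    // Illustrative helper, not part of flink-siddhi: loads each plugin class through
    // the shared plugin class loader and registers it under its function name.
    // Relies on classloader.resolve-order: parent-first so that the classes Siddhi
    // resolves and the classes Flink ships come from the same hierarchy.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void registerAll(SiddhiCEP cep,
                                   URLClassLoader pluginClassLoader,
                                   List<Map<String, Object>> pluginRows) throws Exception {
        for (Map<String, Object> row : pluginRows) {
            String functionName = String.valueOf(row.get("function_name"));
            String classPath = String.valueOf(row.get("class_path"));
            Class extensionClass = pluginClassLoader.loadClass(classPath);
            cep.registerExtension(functionName, extensionClass);
        }
    }
}
```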