haoch / flink-siddhi

A CEP library to run Siddhi within Apache Flink™ Streaming Application (Not maintained)
Apache License 2.0
243 stars 96 forks source link

I am trying to integrate flink-siddhi with a Flink application and am stuck on a ClassNotFoundException #72

Open SMART2016 opened 3 years ago

SMART2016 commented 3 years ago

POM:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>flinksidhi</groupId>
  <artifactId>flinksidhipoc</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>flinksidhipoc</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <!-- Shared build settings and the versions referenced below through ${...} placeholders. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- NOTE(review): source/target are 1.7, while the run shown in this report uses JDK 1.8.0_291;
         confirm whether Java 7 language level is really intended. -->
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <!-- Version used for the io.siddhi artifacts in the dependencies section. -->
    <siddhi.version>5.1.2</siddhi.version>
    <!-- Version used for every org.apache.flink artifact in the dependencies section.
         NOTE(review): the stack trace in this report fails to load
         org.apache.flink.streaming.api.transformations.StreamTransformation even though
         flink-streaming-java_2.11 1.9.0 is on the classpath. The flink-siddhi jar was presumably
         compiled against a Flink release that still contained that class; confirm which Flink
         version flink-siddhi targets and align this property with it. -->
    <flink.version>1.9.0</flink.version>
  </properties>

  <dependencies>
    <!-- Test-only dependency. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- Flink artifacts; all three share ${flink.version}. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${flink.version}</version>

    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- The library whose SiddhiCEP class appears at the top of the stack trace in this report.
         The classpath in this report shows that LATEST resolved to flink-siddhi_2.11-0.2.1.jar.
         NOTE(review): pin an explicit version so the build is reproducible, and check that the
         chosen release is compatible with ${flink.version}. -->
    <dependency>
      <groupId>com.github.haoch</groupId>
      <artifactId>flink-siddhi_2.11</artifactId>
      <version>LATEST</version>
    </dependency>
    <!-- io.siddhi artifacts at ${siddhi.version}. -->
    <dependency>
      <groupId>io.siddhi</groupId>
      <artifactId>siddhi-core</artifactId>
      <version>${siddhi.version}</version>
      <exclusions>
        <exclusion>  <!-- keep apacheds-jdbm1 from being pulled in transitively through this artifact -->
          <groupId>org.apache.directory.jdbm</groupId>
          <artifactId>apacheds-jdbm1</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>io.siddhi</groupId>
      <artifactId>siddhi-query-api</artifactId>
      <version>${siddhi.version}</version>
      <exclusions>
        <exclusion>  <!-- keep apacheds-jdbm1 from being pulled in transitively through this artifact -->
          <groupId>org.apache.directory.jdbm</groupId>
          <artifactId>apacheds-jdbm1</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- NOTE(review): org.wso2.siddhi 3.0.2 artifacts are declared in addition to the io.siddhi
         5.1.2 ones above, and both sets appear on the classpath in this report. Confirm whether
         two Siddhi generations are really needed side by side.
         NOTE(review): the version element below contains a line break; the classpath shows it
         still resolved to 3.0.2, but a single-line value would be cleaner. -->
    <dependency>
      <groupId>org.wso2.siddhi</groupId>
      <artifactId>siddhi-query-api</artifactId>
      <version>3.0.2
      </version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.wso2.siddhi/siddhi-core -->
    <dependency>
      <groupId>org.wso2.siddhi</groupId>
      <artifactId>siddhi-core</artifactId>
      <version>3.0.2</version>
    </dependency>
  </dependencies>

  <repositories>
    <!-- Additional repositories consulted when resolving the dependencies above.
         NOTE(review): both URLs use plain http. Maven 3.8.1 and later block http repositories
         by default, so switching to https is presumably required on current Maven; confirm. -->
    <repository>
      <id>wso2-maven2-repository</id>
      <name>WSO2 Maven2 Repository</name>
      <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
    </repository>
    <repository>
      <id>clojars</id>
      <url>http://clojars.org/repo/</url>
    </repository>
  </repositories>
  <build>
    <!-- Only pluginManagement is declared here: it fixes the versions of the standard lifecycle
         plugins and configures nothing else. -->
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

Main class:

package flinksidhi;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.siddhi.SiddhiCEP;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SidhiApp {
    /** Feeds a random event source through a Siddhi projection query, writes the result as text and runs the job. */
    public static void main(String[] args) {
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStream<Event> events = environment.addSource(new RandomEventSource(5));
        final DataStream<Tuple4<Long, Integer, String, Double>> projected = SiddhiCEP
                .define("inputStream", events, "id", "name", "price", "timestamp")
                .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
                .returns("outputStream");

        // Derived stream holding only the f1 field of each tuple; no sink is attached to it in this method.
        projected.map(new SecondFieldExtractor());

        // Write the text form of every projected tuple to ./output.txt, replacing any existing file.
        projected.writeAsText("./output.txt", FileSystem.WriteMode.OVERWRITE);

        try {
            environment.execute();
        } catch (Exception failure) {
            // A failed execution is only printed; main still returns normally afterwards.
            failure.printStackTrace();
        }

    }

    /** Maps a four-field tuple to its {@code f1} component. */
    private static final class SecondFieldExtractor
            implements MapFunction<Tuple4<Long, Integer, String, Double>, Integer> {
        @Override
        public Integer map(Tuple4<Long, Integer, String, Double> value) throws Exception {
            return value.f1;
        }
    }
}

I am getting the runtime error below:

/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/bin/java -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=58868:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachin
es/jdk1.8.0_291.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/lib/tools.jar:/Users/dipanjan/work/stackidentity/flinksidhipoc/target/classes:/Users/dipanjan/.m2/repository/org/apache/flink/flink-streaming-java_2.11/1.9.0/flink-streaming-java_2.11-1.9.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-core/1.9.0/flink-core-1.9.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-annotations/1.9.0/flink-annotations-1.9.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-metrics-core/1.9.0/flink-metrics-core-1.9.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-shaded-asm-6/6.2.1-7.0/flink-shaded-asm-6-6.2.1-7.0.jar:/Users/dipanjan/.m2/repository/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar:/Users/dipanjan/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/Users/dipanjan/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/dipanjan/.m2/repository/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/Users/dipanjan/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/Users/dipanjan/.m2/repository/org/apache/commons/commons-compress/1.18/commons-compress-1.18.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-runtime_2.11/1.9.0/flink-runtime_2.11-1.9.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-queryable-state-client-java/1.9.0/flink-queryable-state-client-java-1.9.0.jar:/Users/dipanjan/.m2/reposit
ory/org/apache/flink/flink-hadoop-fs/1.9.0/flink-hadoop-fs-1.9.0.jar:/Users/dipanjan/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-shaded-netty/4.1.32.Final-7.0/flink-shaded-netty-4.1.32.Final-7.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-shaded-jackson/2.9.8-7.0/flink-shaded-jackson-2.9.8-7.0.jar:/Users/dipanjan/.m2/repository/org/javassist/javassist/3.19.0-GA/javassist-3.19.0-GA.jar:/Users/dipanjan/.m2/repository/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar:/Users/dipanjan/.m2/repository/com/typesafe/akka/akka-actor_2.11/2.5.21/akka-actor_2.11-2.5.21.jar:/Users/dipanjan/.m2/repository/com/typesafe/config/1.3.3/config-1.3.3.jar:/Users/dipanjan/.m2/repository/org/scala-lang/modules/scala-java8-compat_2.11/0.7.0/scala-java8-compat_2.11-0.7.0.jar:/Users/dipanjan/.m2/repository/com/typesafe/akka/akka-stream_2.11/2.5.21/akka-stream_2.11-2.5.21.jar:/Users/dipanjan/.m2/repository/org/reactivestreams/reactive-streams/1.0.2/reactive-streams-1.0.2.jar:/Users/dipanjan/.m2/repository/com/typesafe/ssl-config-core_2.11/0.3.7/ssl-config-core_2.11-0.3.7.jar:/Users/dipanjan/.m2/repository/org/scala-lang/modules/scala-parser-combinators_2.11/1.1.1/scala-parser-combinators_2.11-1.1.1.jar:/Users/dipanjan/.m2/repository/com/typesafe/akka/akka-protobuf_2.11/2.5.21/akka-protobuf_2.11-2.5.21.jar:/Users/dipanjan/.m2/repository/com/typesafe/akka/akka-slf4j_2.11/2.5.21/akka-slf4j_2.11-2.5.21.jar:/Users/dipanjan/.m2/repository/org/clapper/grizzled-slf4j_2.11/1.3.2/grizzled-slf4j_2.11-1.3.2.jar:/Users/dipanjan/.m2/repository/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar:/Users/dipanjan/.m2/repository/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar:/Users/dipanjan/.m2/repository/com/twitter/chill_2.11/0.7.6/chill_2.11-0.7.6.jar:/Users/dipanjan/.m2/repository/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-shaded-guava/18
.0-7.0/flink-shaded-guava-18.0-7.0.jar:/Users/dipanjan/.m2/repository/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/Users/dipanjan/.m2/repository/org/slf4j/slf4j-api/1.7.15/slf4j-api-1.7.15.jar:/Users/dipanjan/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/Users/dipanjan/.m2/repository/org/apache/flink/force-shading/1.9.0/force-shading-1.9.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-connector-kafka_2.11/1.9.0/flink-connector-kafka_2.11-1.9.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-connector-kafka-base_2.11/1.9.0/flink-connector-kafka-base_2.11-1.9.0.jar:/Users/dipanjan/.m2/repository/org/apache/kafka/kafka-clients/2.2.0/kafka-clients-2.2.0.jar:/Users/dipanjan/.m2/repository/com/github/luben/zstd-jni/1.3.8-1/zstd-jni-1.3.8-1.jar:/Users/dipanjan/.m2/repository/org/lz4/lz4-java/1.5.0/lz4-java-1.5.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-clients_2.11/1.9.0/flink-clients_2.11-1.9.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-optimizer_2.11/1.9.0/flink-optimizer_2.11-1.9.0.jar:/Users/dipanjan/.m2/repository/org/apache/flink/flink-java/1.9.0/flink-java-1.9.0.jar:/Users/dipanjan/.m2/repository/commons-cli/commons-cli/1.3.1/commons-cli-1.3.1.jar:/Users/dipanjan/.m2/repository/com/github/haoch/flink-siddhi_2.11/0.2.1/flink-siddhi_2.11-0.2.1.jar:/Users/dipanjan/.m2/repository/io/siddhi/siddhi-core/5.1.2/siddhi-core-5.1.2.jar:/Users/dipanjan/.m2/repository/io/siddhi/siddhi-query-compiler/5.1.2/siddhi-query-compiler-5.1.2.jar:/Users/dipanjan/.m2/repository/org/mvel/mvel2/2.4.4.Final/mvel2-2.4.4.Final.jar:/Users/dipanjan/.m2/repository/org/antlr/antlr4-runtime/4.7.2/antlr4-runtime-4.7.2.jar:/Users/dipanjan/.m2/repository/io/siddhi/siddhi-annotations/5.1.2/siddhi-annotations-5.1.2.jar:/Users/dipanjan/.m2/repository/org/atteo/classindex/classindex/3.8/classindex-3.8.jar:/Users/dipanjan/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/dipanjan/.m2/repository/org/slf4j/sl
f4j-simple/1.7.26/slf4j-simple-1.7.26.jar:/Users/dipanjan/.m2/repository/org/wso2/orbit/com/lmax/disruptor/3.4.2.wso2v1/disruptor-3.4.2.wso2v1.jar:/Users/dipanjan/.m2/repository/com/google/guava/guava/23.0/guava-23.0.jar:/Users/dipanjan/.m2/repository/com/google/errorprone/error_prone_annotations/2.0.18/error_prone_annotations-2.0.18.jar:/Users/dipanjan/.m2/repository/com/google/j2objc/j2objc-annotations/1.1/j2objc-annotations-1.1.jar:/Users/dipanjan/.m2/repository/org/codehaus/mojo/animal-sniffer-annotations/1.14/animal-sniffer-annotations-1.14.jar:/Users/dipanjan/.m2/repository/org/quartz-scheduler/wso2/quartz/2.1.1.wso2v1/quartz-2.1.1.wso2v1.jar:/Users/dipanjan/.m2/repository/org/quartz-scheduler/quartz/2.1.1/quartz-2.1.1.jar:/Users/dipanjan/.m2/repository/c3p0/c3p0/0.9.1.1/c3p0-0.9.1.1.jar:/Users/dipanjan/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar:/Users/dipanjan/.m2/repository/com/google/code/gson/gson/2.8.5/gson-2.8.5.jar:/Users/dipanjan/.m2/repository/com/jayway/jsonpath/json-path/2.4.0/json-path-2.4.0.jar:/Users/dipanjan/.m2/repository/net/minidev/json-smart/2.3/json-smart-2.3.jar:/Users/dipanjan/.m2/repository/net/minidev/accessors-smart/1.2/accessors-smart-1.2.jar:/Users/dipanjan/.m2/repository/org/ow2/asm/asm/5.0.4/asm-5.0.4.jar:/Users/dipanjan/.m2/repository/org/eclipse/osgi/org.eclipse.osgi.services/3.3.100.v20130513-1956/org.eclipse.osgi.services-3.3.100.v20130513-1956.jar:/Users/dipanjan/.m2/repository/org/osgi/org.osgi.core/6.0.0/org.osgi.core-6.0.0.jar:/Users/dipanjan/.m2/repository/io/siddhi/siddhi-query-api/5.1.2/siddhi-query-api-5.1.2.jar:/Users/dipanjan/.m2/repository/org/wso2/siddhi/siddhi-query-api/3.0.2/siddhi-query-api-3.0.2.jar:/Users/dipanjan/.m2/repository/org/apache/log4j/wso2/log4j/1.2.17.wso2v1/log4j-1.2.17.wso2v1.jar:/Users/dipanjan/.m2/repository/org/wso2/siddhi/siddhi-core/3.0.2/siddhi-core-3.0.2.jar:/Users/dipanjan/.m2/repository/org/wso2/siddhi/siddhi-query-compiler/3.0.2/siddhi-query-compiler-3
.0.2.jar flinksidhi.SidhiApp
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/transformations/StreamTransformation
    at org.apache.flink.streaming.siddhi.SiddhiCEP.from(SiddhiCEP.java:139)
    at org.apache.flink.streaming.siddhi.SiddhiCEP.define(SiddhiCEP.java:124)
    at flinksidhi.SidhiApp.main(SidhiApp.java:16)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.transformations.StreamTransformation
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 3 more

I am stuck in this loop and not sure how to fix it. Please help.