OryxProject / oryx

Oryx 2: Lambda architecture on Apache Spark, Apache Kafka for real-time large scale machine learning
http://oryx.io
Apache License 2.0

Kafka 0.11.x support #337

Closed cimox closed 6 years ago

cimox commented 7 years ago

Currently Oryx does not support the Kafka 0.11.x message format.

Details: https://community.cloudera.com/t5/Data-Science-and-Machine/NoClassDefFoundError-in-Oryx-batch-and-speed-layers/m-p/59235#M765

Any suggestions on how this can be fixed or support added, @srowen? I'd be happy to help with this.

srowen commented 7 years ago

Your issue does not suggest something specific to 0.11. This is how other Kafka version incompatibilities look at runtime. Do you know 0.11 isn't compatible with the 0.10.2 client? I hope not because that would mean yet another branch just to support mutually incompatible Kafka versions. I thought they'd gotten better after the 0.10.0 - 0.10.2 error.

cimox commented 7 years ago

I'm not sure whether 0.11 is compatible with the 0.10.2 client. I can try it once the master build is finished (I see it's failing in CI).

srowen commented 7 years ago

You can ignore the build failures -- that's me fixing it to work with recent Travis changes. It actually works fine.

cimox commented 7 years ago

I've tested 2.5.0-SNAPSHOT. It's not working with 0.11. I tried building it with <kafka.version>0.11.0-kafka-3.0.0</kafka.version> and <kafka.version>0.11.0.0</kafka.version>, but the build is failing...

srowen commented 7 years ago

I saw that too when I tried it, but it looked like it was just because some methods are deprecated now and the build sets -Werror to make them fail the build. You could just set it to not fail on warning.
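To make the suggestion concrete: one way to stop deprecation warnings from failing the build is to override the compiler plugin locally. This is only a hypothetical sketch -- the actual plugin configuration in Oryx's parent pom may use different arguments:

```xml
<!-- Hypothetical override: drop -Werror so that deprecation warnings
     from the newer Kafka client don't fail the build. Adjust to match
     the actual <compilerArgs> in the parent pom. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <compilerArgs>
      <arg>-Xlint:all</arg>
      <!-- -Werror removed here -->
    </compilerArgs>
  </configuration>
</plugin>
```

Recent versions of maven-compiler-plugin also expose a `failOnWarning` flag that can be set to `false` for the same effect.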

cimox commented 7 years ago

Still getting the same exception :( My pom.xml looks like this:

...
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.minor.version}</artifactId>
        <version>0.11.0.0</version>
        <scope>provided</scope>
        <exclusions>
          <!-- later Kafka versions bring this in -->
          <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
          </exclusion>
        </exclusions>
      </dependency>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0-cp1</version>
      </dependency>
...

Note: -cp1 is the Confluent Platform kafka-clients version. The build succeeds with -Werror disabled, but I'm still getting the java.lang.NoClassDefFoundError: kafka/admin/RackAwareMode exception.
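Since the failure is a NoClassDefFoundError rather than a behavioral incompatibility, one way to narrow it down is to check at runtime whether the class is visible on the classpath at all. This is just a hypothetical diagnostic sketch (not part of Oryx), but it helps distinguish "jar missing" from "wrong jar version":

```java
// Hypothetical diagnostic: report whether a class can be resolved on the
// current classpath, without running its static initializers.
public class KafkaClassCheck {

    static boolean isLoadable(String className) {
        try {
            // initialize=false: resolve only, don't initialize the class
            Class.forName(className, false, KafkaClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // kafka.admin.RackAwareMode lives in the Scala broker jar
        // (kafka_2.xx), not in kafka-clients -- so having only
        // kafka-clients on the classpath produces exactly this error.
        System.out.println("kafka.admin.RackAwareMode loadable: "
                + isLoadable("kafka.admin.RackAwareMode"));
    }
}
```

If this prints `false` in the serving layer's JVM, the broker jar (kafka_2.11 etc.) is not on the runtime classpath, regardless of what was on the compile classpath.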

cimox commented 7 years ago

Hmmm. Looking into the code, this part is not even executed: https://github.com/OryxProject/oryx/blob/master/framework/kafka-util/src/main/java/com/cloudera/oryx/kafka/util/KafkaUtils.java#L89

Strange. It seems like there is a problem with kafka.admin.AdminUtils.

srowen commented 7 years ago

You shouldn't have a class version incompatibility if you really are building and running the exact same Kafka version. I think that indicates you still have some mismatch somewhere.

However I did manage to run the tests with 0.11 and it doesn't seem to work as-is. Something else in the behavior has changed.

The 2.5 branch is branched now so master could move to 0.11. I don't have the cycles to work on getting it working with 0.11 (not sure Spark will work with it either) but if you do get it working, feel free to propose a patch.

cimox commented 7 years ago

It seems like kafka-clients and its corresponding CP versions are not compatible -- there is no AdminUtils class there at all. Even if I get it running with a workaround that skips the topic-existence check, it fails on the Spark Streaming Kafka integration, which supports only Kafka 0.10. Hmm, I'm not sure what I should do now.

cimox commented 7 years ago

I've opened a discussion on the Spark dev mailing list; hopefully it will get some traction.

srowen commented 7 years ago

My guess is that in fact you are getting Kafka dependencies from Spark dependencies, and yes it doesn't support 0.11 formally. I think that could be a blocker, because most workarounds I can think of would inevitably run into a problem of that form.

I haven't followed Kafka 0.11 / 1.0 (though I work on Spark) so I'm not sure: does anyone have Spark Streaming working with Kafka 0.11? If so, then there's hope.

cimox commented 7 years ago

That's possible. I don't have much experience with Maven; so far I've mostly used Gradle. But kafka-clients:0.11.0.0-cp1 gets downloaded when building the kafka-utils jar.

What would you propose as a solution for this? We would love to start using Oryx ASAP, but this is a blocker right now. Could a downgrade, or running two parallel Kafka clusters, be a way forward?

srowen commented 7 years ago

I think the first question is: does Spark 2.2 even work with Kafka 0.11? If not, then I don't know if there's a workaround here. You can run it on a separate cluster, sure, with Kafka 0.10.2. You need storage like HDFS anyway, and it's intended to run on CDH, so a small CDH installation would provide all the pieces too.

cimox commented 7 years ago

That's what we've got. We have a CDH installation on 5 servers; the only difference is that we manually installed Confluent Platform as the Kafka cluster. I will try to install Kafka 0.10 via parcels and get this working until Spark supports 0.11 or 1.0.

Thanks for the help, Sean.

cimox commented 7 years ago

I've been asking the Kafka community about Spark support for 0.10 and 0.11. They say Spark should work with 0.11 without any problems. This is the response from Aymeric Bouvet:

0.11 introduced some new protocol versions (say a v3), which can't be understood by old clients (which only know v2 and v1). When a client sends a request (fetch, produce, or any other request), it sends it with the protocol version it knows (so v2 in your case), and the broker will answer with the v2 protocol as well, so the client can understand it. So for v1, where there were no timestamps, Kafka would serve data without timestamps to an application that requests v1, and would also serve timestamps to a client that requests v2. If a client is updated to v3, it will add headers, for example. If a client knows v3 but the broker is old (0.10.0, knowing only v2) and the client sends v3, the request will be rejected. But clients can also discover which versions the brokers support (via an ApiVersionsRequest), so a client may first check what the broker supports and use the best protocol available. This is done automatically by the Java clients (which are what Spark's Kafka integration uses).
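The negotiation rule described above boils down to "use the highest protocol version both sides understand." This is only an illustrative model of that rule, not actual Kafka client code (real Java clients discover the broker's supported ranges via an ApiVersionsRequest):

```java
// Illustrative model of Kafka's API version negotiation: client and
// broker each have a maximum protocol version they understand, and the
// connection uses the highest version supported by both.
public class VersionNegotiation {

    static int negotiate(int clientMaxVersion, int brokerMaxVersion) {
        return Math.min(clientMaxVersion, brokerMaxVersion);
    }

    public static void main(String[] args) {
        // 0.10.2 client (knows up to v2) against an 0.11 broker (knows v3)
        System.out.println(negotiate(2, 3)); // -> 2
        // 0.11 client (knows v3) against an older 0.10.0 broker (knows v2)
        System.out.println(negotiate(3, 2)); // -> 2
    }
}
```

This is why the wire protocol stays compatible in both directions, even though the client libraries may still be incompatible at compile time, as noted below.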

srowen commented 7 years ago

Yeah the wire protocol may be compatible, but not necessarily the client libs at compile time. For Spark, I honestly don't know, haven't tried it yet (I'm actually working on starting to dislodge 0.8 support in Spark).

The particular problem here is that Spark Streaming is binding against Kafka libraries and it seems like it doesn't like the 0.11 libraries. It's not clear.

For here, for now, I'd definitely use the CDH libraries as a whole as it's definitely supposed to work with those. I'm releasing 2.5.0 which is intended to be compatible with all the latest CDH components (which is what's on my cluster). You can see the 2.5.x branch now and build it, even.

cimox commented 7 years ago

I'll wait for the released 2.5.0 and use that. Going to downgrade the Kafka cluster to 0.10.

Hopefully everything will work fine.

srowen commented 7 years ago

OK I cut the release: https://github.com/OryxProject/oryx/releases/tag/oryx-2.5.0

cimox commented 7 years ago

This is really strange. I have tried 2.5.0 and it works with Kafka 0.11: the batch layer creates a model successfully, and the speed layer is also working and sending updates.

The problem is only with the serving layer. This is the full config and stack trace when starting the serving layer:

oryx {
    als {
        decay {
            factor=1
            zero-threshold=0
        }
        hyperparams {
            alpha=1
            epsilon=1.0E-5
            features=10
            lambda=0.001
        }
        implicit=true
        iterations=10
        logStrength=false
        no-known-items=false
        rescorer-provider-class=null
        sample-rate=1
    }
    batch {
        storage {
            data-dir="hdfs:///user/mcimerman/als-example/data/"
            key-writable-class="org.apache.hadoop.io.Text"
            max-age-data-hours=-1
            max-age-model-hours=-1
            message-writable-class="org.apache.hadoop.io.Text"
            model-dir="hdfs:///user/mcimerman/als-example/model/"
        }
        streaming {
            config {
                spark {
                    io {
                        compression {
                            codec=lzf
                        }
                    }
                    logConf=true
                    serializer="org.apache.spark.serializer.KryoSerializer"
                    speculation=true
                    ui {
                        showConsoleProgress=false
                    }
                }
            }
            deploy-mode=client
            driver-memory="1g"
            dynamic-allocation=false
            executor-cores=2
            executor-memory="4g"
            generation-interval-sec=300
            master=yarn
            num-executors=1
        }
        ui {
            port=4040
        }
        update-class="com.cloudera.oryx.app.batch.mllib.als.ALSUpdate"
    }
    default-streaming-config {
        spark {
            io {
                compression {
                    codec=lzf
                }
            }
            logConf=true
            serializer="org.apache.spark.serializer.KryoSerializer"
            speculation=true
            ui {
                showConsoleProgress=false
            }
        }
    }
    id=ALSExample
    input-schema {
        categorical-features=null
        feature-names=[]
        id-features=[]
        ignored-features=[]
        num-features=0
        numeric-features=null
        target-feature=null
    }
    input-topic {
        broker="hadoop-1.maas:9092,hadoop-2.maas:9092,hadoop-3.maas:9092,hadoop-4.maas:9092,hadoop-5.maas:9092"
        lock {
            master="hadoop-2.maas:2181,hadoop-3.maas:2181,hadoop-4.maas:2181"
        }
        message {
            key-class="java.lang.String"
            key-decoder-class="org.apache.kafka.common.serialization.StringDeserializer"
            message-class="java.lang.String"
            message-decoder-class="org.apache.kafka.common.serialization.StringDeserializer"
            topic=OryxInput
        }
    }
    kmeans {
        evaluation-strategy=SILHOUETTE
        hyperparams {
            k=10
        }
        initialization-strategy="k-means||"
        iterations=30
        runs=3
    }
    ml {
        eval {
            candidates=1
            hyperparam-search=random
            parallelism=1
            test-fraction=0.1
            threshold=null
        }
    }
    rdf {
        hyperparams {
            impurity=entropy
            max-depth=8
            max-split-candidates=100
            min-info-gain-nats=0.001
            min-node-size=16
        }
        num-trees=20
    }
    serving {
        api {
            context-path="/"
            key-alias=null
            keystore-file=null
            keystore-password=*****
            password=*****
            port=8080
            read-only=false
            secure-port=443
            user-name=null
        }
        application-resources="com.cloudera.oryx.app.serving,com.cloudera.oryx.app.serving.als"
        memory="4000m"
        min-model-load-fraction=0.8
        model-manager-class="com.cloudera.oryx.app.serving.als.model.ALSServingModelManager"
        no-init-topics=false
        yarn {
            cores="4"
            instances=1
        }
    }
    speed {
        min-model-load-fraction=0.8
        model-manager-class="com.cloudera.oryx.app.speed.als.ALSSpeedModelManager"
        streaming {
            config {
                spark {
                    io {
                        compression {
                            codec=lzf
                        }
                    }
                    logConf=true
                    serializer="org.apache.spark.serializer.KryoSerializer"
                    speculation=true
                    ui {
                        showConsoleProgress=false
                    }
                }
            }
            deploy-mode=client
            driver-memory="512m"
            dynamic-allocation=false
            executor-cores=4
            executor-memory="1g"
            generation-interval-sec=10
            master=yarn
            num-executors=2
        }
        ui {
            port=4041
        }
    }
    update-topic {
        broker="hadoop-1.maas:9092,hadoop-2.maas:9092,hadoop-3.maas:9092,hadoop-4.maas:9092,hadoop-5.maas:9092"
        lock {
            master="hadoop-2.maas:2181,hadoop-3.maas:2181,hadoop-4.maas:2181"
        }
        message {
            decoder-class="org.apache.kafka.common.serialization.StringDeserializer"
            encoder-class="org.apache.kafka.common.serialization.StringDeserializer"
            max-size=16777216
            topic=OryxUpdate
        }
    }
}

2017-09-12 11:36:14,451 INFO  ServingLayer:123 Starting Serving Layer ALSExample
Sep 12, 2017 11:36:15 AM org.apache.coyote.http11.AbstractHttp11Protocol configureUpgradeProtocol
INFO: The ["http-nio2-8080"] connector has been configured to support HTTP upgrade to [h2c]
Sep 12, 2017 11:36:15 AM org.apache.coyote.AbstractProtocol init
INFO: Initializing ProtocolHandler ["http-nio2-8080"]
Sep 12, 2017 11:36:15 AM org.apache.catalina.core.StandardService startInternal
INFO: Starting service [Tomcat]
Sep 12, 2017 11:36:15 AM org.apache.catalina.core.StandardEngine startInternal
INFO: Starting Servlet Engine: Apache Tomcat/8.5.20
2017-09-12 11:36:15,407 INFO  ModelManagerListener:103 ModelManagerListener initializing
Sep 12, 2017 11:36:15 AM org.apache.catalina.core.StandardContext listenerStart
SEVERE: Exception sending context initialized event to listener instance of class [com.cloudera.oryx.lambda.serving.ModelManagerListener]
java.lang.NoClassDefFoundError: kafka/admin/RackAwareMode
    at com.cloudera.oryx.lambda.serving.ModelManagerListener.contextInitialized(ModelManagerListener.java:108)
    at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4745)
    at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5207)
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419)
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: kafka.admin.RackAwareMode
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 10 more

Sep 12, 2017 11:36:15 AM org.apache.catalina.core.StandardContext startInternal
SEVERE: One or more listeners failed to start. Full details will be found in the appropriate container log file
Sep 12, 2017 11:36:15 AM org.apache.catalina.core.StandardContext startInternal
SEVERE: Context [Oryx] startup failed due to previous errors
2017-09-12 11:36:15,447 INFO  ModelManagerListener:153 ModelManagerListener destroying
2017-09-12 11:36:15,447 INFO  ModelManagerListener:173 ModelManagerListener closing
Sep 12, 2017 11:36:16 AM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler ["http-nio2-8080"]
srowen commented 7 years ago

The thing about the serving layer is that it needs the classpath set up differently, manually, via compute-classpath.sh, because it's not a Spark app. Check that the classpath makes sense for your env? It should if it's CDH.
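One quick sanity check is to filter the classpath the serving-layer JVM actually sees for Kafka jars. The helper below is hypothetical (the class and method names are made up for illustration), but the idea is just string filtering over the classpath entries:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: pick out Kafka-related entries from a classpath
// string, e.g. the output of compute-classpath.sh joined with ':'.
public class ClasspathScan {

    static List<String> kafkaEntries(String classpath, String separator) {
        return Arrays.stream(classpath.split(separator))
                .filter(p -> p.toLowerCase().contains("kafka"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String cp = System.getProperty("java.class.path");
        // On a correctly configured CDH host this should list the CDH
        // Kafka jars; an empty result would explain a NoClassDefFoundError.
        kafkaEntries(cp, System.getProperty("path.separator"))
                .forEach(System.out::println);
    }
}
```

Notably, the compute-classpath.sh output posted below contains no Kafka jar at all in the lines shown, which is consistent with the RackAwareMode error.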

cimox commented 7 years ago

This is the result of compute-classpath.sh:

 mcimerman@hadoop-1  /opt/oryx  ./compute-classpath.sh 
/opt/cloudera/parcels/SPARK2/lib/spark2/jars/scala-library-2.11.8.jar
/opt/cloudera/parcels/SPARK2/lib/spark2/jars/scala-parser-combinators_2.11-1.0.4.jar
/opt/cloudera/parcels/CDH/jars/zookeeper-3.4.5-cdh5.12.0.jar
/opt/cloudera/parcels/CDH/jars/hadoop-auth-2.6.0-cdh5.12.0.jar
/opt/cloudera/parcels/CDH/jars/hadoop-common-2.6.0-cdh5.12.0.jar
/opt/cloudera/parcels/CDH/jars/hadoop-hdfs-2.6.0-cdh5.12.0.jar
/opt/cloudera/parcels/CDH/jars/commons-cli-1.2.jar
/opt/cloudera/parcels/CDH/jars/commons-collections-3.2.2.jar
/opt/cloudera/parcels/CDH/jars/commons-configuration-1.7.jar
/opt/cloudera/parcels/CDH/jars/protobuf-java-2.5.0.jar
/opt/cloudera/parcels/CDH/jars/snappy-java-1.0.4.1.jar
/opt/cloudera/parcels/CDH/jars/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.12.0.jar

Seems like it makes sense.

However, maybe this could be a problem: /opt/cloudera/parcels/CDH/jars/zookeeper-3.4.5-cdh5.12.0.jar, since we've got Confluent Platform installed as the Kafka distribution but are still using the ZooKeeper installed from CDH parcels.

srowen commented 7 years ago

Hm, for what it's worth, it all works fine for me with the runner script. I'm using 2.5.0 and CDH 5.12.0. Just tried it now.

cimox commented 7 years ago

Okay. Going to try with your build (I've been trying with my own 2.5.0 build before you cut the release).

cimox commented 7 years ago

Still getting the same error 😞. No idea why... could it be related to the global Kafka binaries?

cimox commented 7 years ago

So it seems like everything is working fine with CDH Kafka 0.10.2. Apparently I will have to keep a separate CDH Kafka cluster for Oryx.

cimox commented 7 years ago

I managed to get it working with the 0.10.2.1-cp2 Kafka packages resolved from the Confluent Maven repository. It's working fine with Confluent Platform 3.3.0.1, which uses Kafka 0.11.0.0.

I've opened PR: https://github.com/OryxProject/oryx/pull/338

srowen commented 6 years ago

@cimox I updated master to use Kafka 0.11 directly. Works OK for me after some light testing so far.

https://github.com/OryxProject/oryx/commit/80e16366b85e1cfa1634a53cba6945b7531d549b