jpmml / jpmml-evaluator-spark

PMML evaluator library for the Apache Spark cluster computing system (http://spark.apache.org/)
GNU Affero General Public License v3.0

Invalid lambda deserialization at org.shaded.jpmml.evaluator.OutputFilters.$deserializeLambda$ #26

Closed: Ibrahim2008 closed this issue 5 years ago.

Ibrahim2008 commented 5 years ago

Hi there,

I have been using this library in my project. I am getting the following error when I run the K-means clustering algorithm (or any other clustering algorithm) on our Hadoop data lake.

It works fine on a standalone machine, but fails on the data lake when run in YARN cluster mode. Interestingly, the logistic regression, XGBoost and decision tree classification models work fine both standalone and in YARN cluster mode.

I am pasting the error stack trace, a snippet of my program and my pom.xml content below. Note that it throws java.lang.IllegalArgumentException: Invalid lambda deserialization at org.shaded.jpmml.evaluator.OutputFilters.$deserializeLambda$(OutputFilters.java:21).

Error:

19/01/30 13:06:58 ERROR executor.Executor: Exception in task 0.2 in stage 1.0 (TID 3)
java.io.IOException: unexpected exception type
    at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1973)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1565)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248)
    ... 78 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
    at org.shaded.jpmml.evaluator.OutputFilters.$deserializeLambda$(OutputFilters.java:21)
    ... 88 more

My program is:

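import java.io.IOException;
import java.util.Random;

import javax.xml.bind.JAXBException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.ml.Transformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.jpmml.evaluator.Evaluator;
import org.jpmml.evaluator.EvaluatorBuilder;
import org.jpmml.evaluator.LoadingModelEvaluatorBuilder;
import org.jpmml.evaluator.spark.TransformerBuilder;
import org.jpmml.evaluator.visitors.DefaultVisitorBattery;

public class AppNew {
    public static void main(String[] args) throws IOException, JAXBException, org.xml.sax.SAXException {
        // args: PMML file, input CSV file, output location prefix, Spark master URL
        String fileName = args[0];
        String dataFileName = args[1];
        String writeLocation = args[2];

        SparkSession spark = SparkSession.builder().appName("jpmml").config("spark.master", args[3]).getOrCreate();

        Configuration conf = spark.sparkContext().hadoopConfiguration();
        FileSystem fs = FileSystem.get(conf);

        // Load the PMML model from HDFS and build a verified evaluator on the driver
        EvaluatorBuilder evaluatorBuilder = new LoadingModelEvaluatorBuilder()
                .setLocatable(false)
                .setVisitors(new DefaultVisitorBattery())
                .load(fs.open(new Path(fileName)).getWrappedStream());
        Evaluator evaluator = evaluatorBuilder.build();
        evaluator.verify();

        // Wrap the evaluator in a Spark ML Transformer that appends the model's target and output fields as columns
        TransformerBuilder pmmlTransformerBuilder = new TransformerBuilder(evaluator)
                .withTargetCols()
                .withOutputCols()
                .exploded(true);

        Transformer pmmlTransformer = pmmlTransformerBuilder.build();

        Random r = new Random();
        Dataset<Row> df = spark.read().option("header", "true").csv(dataFileName).toDF();
        Dataset<Row> tdf = pmmlTransformer.transform(df);

        tdf.printSchema();
        // A random suffix gives each run its own output directory
        tdf.write().option("header", "true").csv(String.format("%s_%s", writeLocation, r.nextLong()));
        spark.stop();
    }
}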

My pom.xml file looks like this:

<dependencies>
    <dependency>
        <groupId>org.jpmml</groupId>
        <artifactId>jpmml-evaluator-spark</artifactId>
        <version>1.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
        <version>2.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.jpmml</groupId>
                <artifactId>pmml-model</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>

                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.fxc.rpc.impl.member.MemberProvider</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.dmg.pmml</pattern>
                                    <shadedPattern>org.shaded.dmg.pmml</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.jpmml</pattern>
                                    <shadedPattern>org.shaded.jpmml</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.guava</pattern>
                                    <shadedPattern>com.shaded.google.guava</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>com.shaded.google.common</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <compilerArgument>-XDignore.symbol.file</compilerArgument>
                </configuration>
            </plugin>
        </plugins>
    </build>

Appreciate your help in resolving this issue.

Thanks in advance.

Regards, Ibrahim.

vruusmann commented 5 years ago

Note that it throws java.lang.IllegalArgumentException: Invalid lambda deserialization

This is the code location: https://github.com/jpmml/jpmml-evaluator/blob/master/pmml-evaluator/src/main/java/org/jpmml/evaluator/OutputFilters.java#L21-L38

Maybe this whole OutputFilters interface should be marked as java.io.Serializable too?
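For context on the Serializable question: in Java, a lambda is serializable only when its target type is Serializable, and the usual way to get that is an intersection cast such as `(Predicate<String> & Serializable)`. The minimal sketch below uses `java.util.function.Predicate` as a stand-in for an output filter; the class and field names are illustrative and not taken from jpmml-evaluator, and how `OutputFilters` actually declares its filters is not shown here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Predicate;

public class LambdaSerializability {

    // Target type Predicate is not Serializable, so neither is this lambda
    static final Predicate<String> PLAIN = name -> true;

    // Intersection cast: javac emits a serializable lambda plus a synthetic
    // $deserializeLambda$ method in this class to rebuild it on the reading side
    static final Predicate<String> SERIALIZABLE = (Predicate<String> & Serializable) name -> true;

    // Returns true if Java serialization accepts the object graph
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("plain lambda:        " + canSerialize(PLAIN));
        System.out.println("intersection lambda: " + canSerialize(SERIALIZABLE));
    }
}
```

Note that making the lambda serializable is what brings `$deserializeLambda$` into play at all: the frame where the reported exception originates only exists for serializable lambdas.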

Anyway, it's extremely strange that this deserialization error happens with some model types (e.g. ClusteringModel) but not with others (GeneralRegressionModel and MiningModel elements). That doesn't make sense, and suggests that there may be a deeper configuration issue with your runtime system. Maybe you're using different Java versions on different machines, and they handle lambda serialization differently?
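Some background on where the exception comes from: a serializable lambda travels as a `java.lang.invoke.SerializedLambda`, and on the reading side its `readResolve` calls the capturing class's compiler-generated `$deserializeLambda$`. That method compares the recorded implementation class, method name and signatures against the lambdas compiled into the class actually loaded on the executor, and throws `IllegalArgumentException("Invalid lambda deserialization")` when none match. One way to compare driver and executors is to print those recorded values on both sides. The sketch below reads them through the lambda class's private `writeReplace` method; that is a JDK implementation detail rather than a public API, and the class name and sample lambda are hypothetical.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Predicate;

public class SerializedLambdaInspector {

    // Hypothetical serializable lambda; javac compiles its body into a synthetic lambda$... method
    static final Predicate<String> FILTER =
            (Predicate<String> & Serializable) name -> name.startsWith("cluster");

    // Serializable lambda classes have a private writeReplace() that returns the SerializedLambda
    static SerializedLambda describe(Object lambda) {
        try {
            Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return (SerializedLambda) writeReplace.invoke(lambda);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Not a serializable lambda: " + lambda, e);
        }
    }

    public static void main(String[] args) {
        SerializedLambda info = describe(FILTER);
        // These recorded values are what $deserializeLambda$ checks on the reading side
        System.out.println("java.version             = " + System.getProperty("java.version"));
        System.out.println("capturingClass           = " + info.getCapturingClass());
        System.out.println("implClass                = " + info.getImplClass());
        System.out.println("implMethodName           = " + info.getImplMethodName());
        System.out.println("implMethodSignature      = " + info.getImplMethodSignature());
        System.out.println("functionalInterfaceClass = " + info.getFunctionalInterfaceClass());
    }
}
```

Class names in `SerializedLambda` are recorded in slash-separated form (for example `java/util/function/Predicate`), so any difference in how the same class was built or packaged for the driver and the executors would show up in these strings.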

Ibrahim2008 commented 5 years ago

@vruusmann I appreciate your swift response. We will investigate and post our findings.

Ibrahim2008 commented 5 years ago

@vruusmann Thanks for fixing this issue, I appreciate your help. Is it possible to release these fixes soon? We are very keen to test and deploy these packages to our production environment. Thanks in advance.

vruusmann commented 5 years ago

@Ibrahim2008 The current version uses these lambda-based output filters by default; if they break your application, simply replace them with custom, manually created ones:

EvaluatorBuilder evaluatorBuilder = new ModelEvaluatorBuilder(pmml)
    .setOutputFilter(new OutputFilter(){
        @Override
        public boolean test(org.dmg.pmml.OutputField outputField){
            // Do whatever is appropriate; returning true keeps the output field
            return true;
        }
    });
Evaluator evaluator = evaluatorBuilder.build();
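If such a manually created filter has to be shipped to Spark executors together with the evaluator, a named class is deserialized by its class name and never goes through `$deserializeLambda$`. The sketch below assumes that pattern: `FieldFilter` is a hypothetical stand-in for `org.jpmml.evaluator.OutputFilter` (so the example compiles without jpmml on the classpath), and declaring `Serializable` explicitly is a defensive assumption, since whether `OutputFilter` itself extends `Serializable` in this version is not stated in the thread. The round trip approximates what Spark's Java serializer does when it sends a task to an executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NamedFilterCheck {

    // Hypothetical stand-in for org.jpmml.evaluator.OutputFilter (single boolean test method)
    interface FieldFilter {
        boolean test(String outputFieldName);
    }

    // Static nested class: no captured enclosing instance, restored by class name
    static final class KeepAllFilter implements FieldFilter, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean test(String outputFieldName) {
            return true; // keep every output field
        }
    }

    // Java-serializes and deserializes a value, roughly like shipping it to an executor
    static <T> T roundTrip(T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FieldFilter copy = roundTrip(new KeepAllFilter());
        System.out.println(copy.getClass().getSimpleName() + " " + copy.test("cluster"));
    }
}
```

A static nested (or top-level) class is preferable to an anonymous class here: an anonymous class declared inside an instance method captures the enclosing object, which then has to be serializable too.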