combust / mleap

MLeap: Deploy ML Pipelines to Production
https://combust.github.io/mleap-docs/
Apache License 2.0

SQLTransformer #126

Open Mageswaran1989 opened 7 years ago

Mageswaran1989 commented 7 years ago

We have a Spark ML pipeline with SQLTransformer along with UDF.

I checked at https://github.com/combust/mleap/tree/master/mleap-spark/src/main/scala/org/apache/spark/ml/bundle/ops/feature and didn't find any relevant operation.

Could you please give direction on how to include it if feasible?

Error Log:

java.util.NoSuchElementException: key not found: org.apache.spark.ml.feature.SQLTransformer
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:59)
  at scala.collection.MapLike$class.apply(MapLike.scala:141)
  at scala.collection.AbstractMap.apply(Map.scala:59)
  at ml.combust.bundle.BundleRegistry.opForObj(BundleRegistry.scala:100)
  at ml.combust.bundle.serializer.GraphSerializer$$anonfun$writeNode$1.apply(GraphSerializer.scala:31)
  at ml.combust.bundle.serializer.GraphSerializer$$anonfun$writeNode$1.apply(GraphSerializer.scala:30)
  at scala.util.Try$.apply(Try.scala:192)
  at ml.combust.bundle.serializer.GraphSerializer.writeNode(GraphSerializer.scala:30)
  at ml.combust.bundle.serializer.GraphSerializer$$anonfun$write$2.apply(GraphSerializer.scala:21)
  at ml.combust.bundle.serializer.GraphSerializer$$anonfun$write$2.apply(GraphSerializer.scala:21)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at ml.combust.bundle.serializer.GraphSerializer.write(GraphSerializer.scala:20)
  at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:21)
  at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:14)
  at ml.combust.bundle.serializer.ModelSerializer$$anonfun$write$1.apply(ModelSerializer.scala:92)
  at ml.combust.bundle.serializer.ModelSerializer$$anonfun$write$1.apply(ModelSerializer.scala:88)
  at scala.util.Try$.apply(Try.scala:192)
  at ml.combust.bundle.serializer.ModelSerializer.write(ModelSerializer.scala:88)
  at ml.combust.bundle.serializer.NodeSerializer$$anonfun$write$1.apply(NodeSerializer.scala:88)
  at ml.combust.bundle.serializer.NodeSerializer$$anonfun$write$1.apply(NodeSerializer.scala:84)
  at scala.util.Try$.apply(Try.scala:192)
  at ml.combust.bundle.serializer.NodeSerializer.write(NodeSerializer.scala:84)
  at ml.combust.bundle.serializer.GraphSerializer$$anonfun$writeNode$1.apply(GraphSerializer.scala:34)
  at ml.combust.bundle.serializer.GraphSerializer$$anonfun$writeNode$1.apply(GraphSerializer.scala:30)
  at scala.util.Try$.apply(Try.scala:192)
  at ml.combust.bundle.serializer.GraphSerializer.writeNode(GraphSerializer.scala:30)
  at ml.combust.bundle.serializer.GraphSerializer$$anonfun$write$2.apply(GraphSerializer.scala:21)
  at ml.combust.bundle.serializer.GraphSerializer$$anonfun$write$2.apply(GraphSerializer.scala:21)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at ml.combust.bundle.serializer.GraphSerializer.write(GraphSerializer.scala:20)
  at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:21)
  at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:14)
  at ml.combust.bundle.serializer.ModelSerializer$$anonfun$write$1.apply(ModelSerializer.scala:92)
  at ml.combust.bundle.serializer.ModelSerializer$$anonfun$write$1.apply(ModelSerializer.scala:88)
  at scala.util.Try$.apply(Try.scala:192)
  at ml.combust.bundle.serializer.ModelSerializer.write(ModelSerializer.scala:88)
  at ml.combust.bundle.serializer.NodeSerializer$$anonfun$write$1.apply(NodeSerializer.scala:88)
  at ml.combust.bundle.serializer.NodeSerializer$$anonfun$write$1.apply(NodeSerializer.scala:84)
  at scala.util.Try$.apply(Try.scala:192)
  at ml.combust.bundle.serializer.NodeSerializer.write(NodeSerializer.scala:84)
  at ml.combust.bundle.serializer.BundleSerializer$$anonfun$write$1.apply(BundleSerializer.scala:34)
  at ml.combust.bundle.serializer.BundleSerializer$$anonfun$write$1.apply(BundleSerializer.scala:29)
  at scala.util.Try$.apply(Try.scala:192)
  at ml.combust.bundle.serializer.BundleSerializer.write(BundleSerializer.scala:29)
  at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:27)
  at $anonfun$2.apply(:105)
  at $anonfun$2.apply(:104)
  at resource.AbstractManagedResource$$anonfun$5.apply(AbstractManagedResource.scala:88)
  at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
  at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
  at scala.util.control.Exception$Catch.apply(Exception.scala:103)
  at scala.util.control.Exception$Catch.either(Exception.scala:125)
  at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
  at resource.ManagedResourceOperations$class.apply(ManagedResourceOperations.scala:26)
  at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
  at resource.ManagedResourceOperations$class.acquireAndGet(ManagedResourceOperations.scala:25)
  at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
  at resource.ManagedResourceOperations$class.foreach(ManagedResourceOperations.scala:53)
  at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
  ... 62 elided

Mageswaran1989 commented 7 years ago

Would the following help: http://mleap-docs.combust.ml/mleap-runtime/custom-transformer.html? I.e., creating a custom transformer instead of a UDF?

seme0021 commented 7 years ago

@Mageswaran1989 MLeap doesn't support the SQLTransformer for a couple of reasons:

  • mleap-runtime doesn't contain a sql engine to support arbitrary sql operations
  • MLeap only supports row-based transformations, whereas sql can have groupby, join, windowing, and many other operations

What logic do you have in your SQLTransformer? People have asked this question before and we recommend:

  • For non-row operations, move the SQL out of the ML Pipeline that you plan to serialize
  • For row-based operations, use the available ML transformers or write a custom transformer

Mageswaran1989 commented 7 years ago

@seme0021 The custom transformer documentation, along with TokenizerOp, helped me solve my simple filtering operation on words based on their length.

When I tried loading it into a LeapFrame:

  1. I was not able to read a normal JSON file with FrameReader(). My guess is that FrameReader() needs the schema information encoded in the JSON file of interest; correct me if I am wrong.
  2. The above was bypassed by parsing the JSON with spray-json and loading it into a LeapFrame() with a custom MLeap schema and a LocalDataset().
  3. When loaded, the MLeap pipeline works, imitating the Spark pipeline; however, the features vector (nullable = true) in Spark is converted to TensorType(DoubleType(false), false) in MLeap.

hollinwilkins commented 7 years ago

@Mageswaran1989

Hey there,

I'll respond to your questions here:

  1. You are right, the JSON-encoded LeapFrame needs to have the schema information serialized as well. Please see the example in the documentation here: http://mleap-docs.combust.ml/core-concepts/data-frames/ under the section "Example Leap Frame" (a rough sketch also follows after the code below).

  2. Glad to hear you got it working :)

  3. MLeap does not use Spark linalg.DenseVector and SparseVector for storing tensors. We have our own Tensor format so we can support arbitrary n-dimensional data, mostly so we can easily integrate with deep learning libraries like TensorFlow and Torch. There is a very easy way to convert an MLeap tensor to a dense or sparse Spark vector if you wish, though. Please see the code here: https://github.com/combust/mleap/blob/master/mleap-runtime/src/main/scala/ml/combust/mleap/runtime/converter/VectorConverters.scala

Importing this class will give you implicit conversions between MLeap Tensors and Spark Vectors.

import ml.combust.mleap.runtime.converter.VectorConverters._
import ml.combust.mleap.tensor.Tensor
import org.apache.spark.ml.linalg.Vector

// Build a dense MLeap tensor; the implicit conversions imported from
// VectorConverters let it be assigned directly to a Spark Vector
val mleapTensor = Tensor.denseVector(Array[Double](23.4, 55.6, 34.4))
val sparkVector: Vector = mleapTensor
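
On point 1, for reference: a JSON-encoded leap frame is shaped roughly like the sketch below (the field names here are made up and the exact type encoding may differ by version; the "Example Leap Frame" section linked above is the authoritative reference).

{
  "schema": {
    "fields": [
      {"name": "word", "type": "string"},
      {"name": "length", "type": "double"}
    ]
  },
  "rows": [
    ["hello", 5.0],
    ["mleap", 5.0]
  ]
}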

As far as debugging the LeapFrame goes, we have an issue open for adding a nice .show function, as you suggest; follow it here: https://github.com/combust/mleap/issues/121

Cheers, Hollin

Mageswaran1989 commented 7 years ago

@hollinwilkins Thank you!

Seems like I have to bug you for some more time ;)

How do I store/bundle a Spark org.apache.spark.ml.linalg.Matrix with ml.combust.bundle.dsl.Value?

hollinwilkins commented 7 years ago

@Mageswaran1989 You can use the Value.tensor method to store an n-dimensional array such as a matrix.

hollinwilkins commented 7 years ago

Here is an example from our code to store multinomial Logistic Regressions: https://github.com/combust/mleap/blob/master/mleap-spark/src/main/scala/org/apache/spark/ml/bundle/ops/classification/LogisticRegressionOpV21.scala#L29
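
Adapted from that example, a minimal sketch of the idea (the DenseTensor and Value.tensor signatures are assumed from that op's code, and the matrix values here are made up):

import ml.combust.bundle.dsl.Value
import ml.combust.mleap.tensor.DenseTensor
import org.apache.spark.ml.linalg.DenseMatrix

// A 2x3 Spark matrix; Spark stores DenseMatrix values in column-major order
val matrix = new DenseMatrix(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))

// Flatten it into a 2-dimensional MLeap tensor and wrap it as a bundle Value
val matrixValue = Value.tensor[Double](
  DenseTensor(matrix.toArray, Seq(matrix.numRows, matrix.numCols)))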

kidtronnix commented 7 years ago

@hollinwilkins I am trying to create some custom transformers too. I looked at the docs, and from the looks of things I need to make those changes to extend the library and then repackage it so I can upload it to something like Databricks.

hollinwilkins commented 7 years ago

@smaxwellstewart As for custom transformers, you will be creating them in a separate library and packaging them in some way that Databricks can access. An assembly jar would accomplish this quite nicely.
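
For instance, a build along these lines (a sketch only; the project name, versions, and the MLeap modules you depend on are placeholders) would let `sbt assembly` produce a single fat jar to upload:

// project/plugins.sbt -- add the sbt-assembly plugin
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

// build.sbt -- depend on the MLeap modules your transformers use
name := "my-custom-transformers"
scalaVersion := "2.11.11"
libraryDependencies ++= Seq(
  "ml.combust.mleap" %% "mleap-runtime" % "0.9.0",
  "ml.combust.mleap" %% "mleap-spark" % "0.9.0" % "provided"
)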

Sorry, our docs are a bit incomplete here, but going through this process with you will help us create them so that people in the future have it easier. We would be happy to set up a time to chat and walk through the process as well, if you like. Email us at hollin at combust.ml and mikhail at combust.ml.

Best

singh-dhirendra commented 7 years ago

@hollinwilkins I understand the issues with SQLTransformer when doing group-by operations. However, SQLTransformer comes with lots of handy SQL functions, like dateDiff(), log(), and if(), which help in transforming the input data. Most of these functions operate on a single row, so they won't necessarily have the issues of SQL that operates across multiple rows. Are there any recommendations for how we can achieve the same result as the single-row SQL functions?
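
For concreteness, a single-row SQLTransformer of the kind described here might look like this in Spark (the statement and column names are made up):

import org.apache.spark.ml.feature.SQLTransformer

// Every output value depends only on its own input row -- no groupBy or join
val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, log(amount) AS log_amount, " +
  "datediff(end_date, start_date) AS day_diff FROM __THIS__")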

hollinwilkins commented 7 years ago

@singh-dhirendra For binary/unary math operations you can use mleap-extension, which provides UnaryMath and BinaryMath transformers. Eventually we may support SQLTransformer, but truly supporting it would take a bit of effort. We would need to switch from row-based transformation to frame-based transformation (which has a few other benefits as well), then decide on a SQL engine we can use and wire everything up, which will also be quite a task. If we do move in that direction, I think splitting off an optional module for the SQLTransformer, including the SQL engine, would be desirable.

Short answer to your question:

  1. Use custom transformers for the operations you need and contribute them back to MLeap and mleap-spark-extension (see the sketch after this list)
  2. Long-term, we may be switching from row-based transformation to frame-based, in which case SQLTransformer becomes more of a possibility
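
A minimal sketch of the core of such a row-based custom transformer, assuming a recent MLeap API (the class and field names are hypothetical, and the Transformer and serialization Op wiring from the custom-transformer docs is still needed on top of this):

import ml.combust.mleap.core.Model
import ml.combust.mleap.core.types.{BasicType, ListType, StructField, StructType}

// Filters a list of words row by row -- the kind of single-row logic the
// thread recommends implementing as a custom transformer
case class WordLengthFilterModel(minLength: Int = 3) extends Model {
  def apply(words: Seq[String]): Seq[String] = words.filter(_.length >= minLength)

  override def inputSchema: StructType =
    StructType(StructField("words", ListType(BasicType.String))).get

  override def outputSchema: StructType =
    StructType(StructField("filtered_words", ListType(BasicType.String))).get
}
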
hollinwilkins commented 7 years ago

Also, if you are interested in helping with any of these pieces:

  1. Moving to frame-based transformation
  2. Finding a suitable SQL engine and building the SQLTransformer

We would definitely appreciate it!

singh-dhirendra commented 7 years ago

Thanks @hollinwilkins for your suggestion. I am happy to contribute to frame-based transformations or the SQLTransformer; let me know if there is a doc on how that would potentially work.

PowerToThePeople111 commented 6 years ago

I ran into the same issue, but mainly because I want to reduce the size of the dataframe that the mleap-serving service returns as a response when asked for a prediction. The idea was to use the SQLTransformer to select only certain columns of the final frame after the prediction is made and some indexes are changed back to labels.

But this projection could also be done differently, by simply filtering the final JSON objects for the requested properties, which would probably be much easier to implement and potentially of great interest. I think this makes especially good sense when the dataframe containing all the features and base data is really big.

Is there any mechanism to configure the service, so that only certain columns of the frame are returned?

And if not, would you like to have one added? ;-)

Edit: For all in need of some reduced overhead: the place to look at is ml.combust.mleap.serving.MleapResource.

If you just wanna reduce the size of your resulting dataframe, you can do something like:

service.transform(frame)
  .get
  .select("ColumnYouNeed1", "ColumnYouNeed2", "ColumnYouNeed3", ...)
  .get

This still keeps all the overhead of the mleap dataframes with the column descriptions.

hollinwilkins commented 6 years ago

@PowerToThePeople111 Hey, we are about to merge in this branch, which will make the existing mleap-serving obsolete: https://github.com/combust/mleap/tree/feature/mleap-grpc

With each transform request, we introduce a TransformOptions object, which includes the ability to filter down to only the fields you care about in the result. I am writing up documentation for this right now.

femibyte commented 5 years ago

Any progress on this issue of making MLeap support SQLTransformer?

toquoccuong commented 5 years ago

I also have the problem with SQLTransformer with a groupBy operation, so it would be nice if MLeap could support frame-based transformation soon.

femibyte commented 5 years ago

@Mageswaran1989 MLeap doesn't support the SQLTransformer for a couple of reasons:

  • mleap-runtime doesn't contain a sql engine to support arbitrary sql operations
  • MLeap only supports row-based transformations, whereas sql can have groupby, join, windowing, and many other operations

What logic do you have in your SQLTransformer? People have asked this question before and we recommend:

  • For non-row operations, move the SQL out of the ML Pipeline that you plan to serialize
  • For row-based operations, use the available ML transformers or write a custom transformer <- this is where the custom transformer documentation will help.

How does moving the SQL out of the ML Pipeline help if you're using the SQLTransformer to train a model?

@seme0021

drei34 commented 1 year ago

@jsleight Do you know if MLeap 0.22 supports SQLTransformer? If you have a pipeline and a bunch of rows, what is the recommended way to filter rows using a pipeline stage?

jsleight commented 1 year ago

SQLTransformer isn't supported and, tbh, is hard to support. We'd essentially need to replicate Spark's SQL engine without Spark.

I would advise filtering the rows before calling the pipeline.
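
For example, something along these lines (a sketch; df, pipelineModel, and the word_count column are hypothetical):

import org.apache.spark.sql.functions.col

// Filter with plain Spark first, then run the pipeline on the rows that
// remain, instead of embedding the filter as a pipeline stage
val filtered = df.filter(col("word_count") > 3)
val predictions = pipelineModel.transform(filtered)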