salesforce / TransmogrifAI

TransmogrifAI (pronounced trăns-mŏgˈrə-fī) is an AutoML library for building modular, reusable, strongly typed machine learning workflows on Apache Spark with minimal hand-tuning
https://transmogrif.ai
BSD 3-Clause "New" or "Revised" License

FeatureBuilder.fromDataFrame should automatically infer advanced types #64

Open tovbinm opened 6 years ago

tovbinm commented 6 years ago

Problem Currently FeatureBuilder.fromDataFrame only infers a set of primitive TransmogrifAI types directly mapped from the dataframe schema, such as Text, Real etc. More advanced types, such as PickList, Email, Phone etc., are not inferred, because inferring them requires knowledge of the actual data in the dataframe.

Solution Estimate the value distribution for each feature and deduce a more appropriate column type. Potentially reuse logic from CSVSchemaUtils.infer, SmartTextVectorizer and RawFeatureFilter.
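The value-distribution idea can be sketched independently of the TransmogrifAI API: sample the non-null string values of a column and deduce a richer type from their shape and cardinality. Everything here (the `AdvancedTypeInference` object, the regexes, the `maxCardinality` threshold) is an illustrative assumption, not existing library code:

```scala
// Hypothetical sketch: deduce an advanced type from sampled column values.
object AdvancedTypeInference {
  sealed trait InferredType
  case object EmailType extends InferredType
  case object PhoneType extends InferredType
  case object PickListType extends InferredType
  case object TextType extends InferredType

  private val EmailRe = "^[^@\\s]+@[^@\\s]+\\.[^@\\s]+$".r
  private val PhoneRe = "^\\+?[0-9 ()-]{7,15}$".r

  /** Deduce a type from a sample of non-null values of one column. */
  def infer(sample: Seq[String], maxCardinality: Int = 50): InferredType = {
    if (sample.isEmpty) TextType
    else if (sample.forall(v => EmailRe.findFirstIn(v).isDefined)) EmailType
    else if (sample.forall(v => PhoneRe.findFirstIn(v).isDefined)) PhoneType
    // A low distinct-to-total ratio suggests a categorical (PickList) column.
    else if (sample.distinct.size <= maxCardinality &&
             sample.distinct.size.toDouble / sample.size < 0.5) PickListType
    else TextType
  }
}
```

A production version would sample via the dataframe rather than a `Seq`, and would need tuned thresholds; this only illustrates the deduction step.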

Alternatives N/A

snabar commented 6 years ago

This need not be automatic. Could we pass in a schema with more advanced types?
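For reference, declaring advanced types by hand is already possible by building individual features instead of calling fromDataFrame. A hedged sketch — the column names are made up, and it assumes the public feature-type constructors that take an Option:

```scala
import org.apache.spark.sql.Row
import com.salesforce.op.features.FeatureBuilder
import com.salesforce.op.features.types._

// Declare advanced types explicitly instead of relying on inference.
// "contactEmail" and "state" are hypothetical column names.
val email = FeatureBuilder.Email[Row]("contactEmail")
  .extract(r => new Email(Option(r.getAs[String]("contactEmail"))))
  .asPredictor

val state = FeatureBuilder.PickList[Row]("state")
  .extract(r => new PickList(Option(r.getAs[String]("state"))))
  .asPredictor
```

Note the extract lambdas here capture no outer variables, which matters for model serialization as discussed further down this thread.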

albertodema commented 5 years ago

Just to say that this feature is very important for lowering the barrier to AutoML usage.

shenzgang commented 5 years ago

@snabar Do you mean dynamically specifying feature types? You can try dynamically converting a column to the type you want using its Row index, but there are problems with saving the model. My test sample code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType
import scala.collection.mutable.ArrayBuffer
import com.salesforce.op._
import com.salesforce.op.evaluators.Evaluators
import com.salesforce.op.features.{Feature, FeatureBuilder}
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification.BinaryClassificationModelSelector
import com.salesforce.op.stages.impl.classification.BinaryClassificationModelsToTry._

def main(args: Array[String]): Unit = {
        if (args == null || args.length != 5){
                throw new Exception("five parameters are required Usage: <database> <table> <label> <id> <index>")
        }
        val Array(database,table,label,id,pickListIndex) = args//the index of the Row column you want to convert to a PickList feature
        println(s"args = ${args.mkString(",")}")

        val conf = new SparkConf()
        conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
        implicit val session = SparkSession.builder().appName(s"${this.getClass.getSimpleName}")
                .config(conf).enableHiveSupport().master("local[*]")
                .getOrCreate()
        val frame = session.sql(s"select * from ${database}.${table}").withColumn(label,col(label).cast(DoubleType))
        frame.show(false)

        val (response,feature) = FeatureBuilder.fromDataFrame[RealNN](frame,label)
        println(s"response = ${response}")

        println("============== OP Features ==============")
        val realOpFeatures = ArrayBuffer[Feature[_ <: FeatureType]]()
        for (i <- 0 until(feature.size)){
                if (feature.apply(i).name != id){
                        if (i == pickListIndex.toInt){
                                val pickListFeature = FeatureBuilder.PickList[Row](feature.apply(i).name).extract(_.get(pickListIndex.toInt).toString.toPickList).asPredictor
                                realOpFeatures.append(pickListFeature)
                        }else{
                                realOpFeatures.append(feature.apply(i))
                        }
                }
        }
        realOpFeatures.foreach(println(_))
        //Automated feature engineering
        val transmogrifyFeature = realOpFeatures.transmogrify()
        //Optionally check the features with a sanity checker
        val checkedFeature = response.sanityCheck(transmogrifyFeature,removeBadFeatures = true)
        //Define the model we want to use
        val prediction = BinaryClassificationModelSelector.withTrainValidationSplit(
            modelTypesToUse = Seq(OpLogisticRegression)
        ).setInput(response, checkedFeature).getOutput()
        //model evaluator
        val evaluator = Evaluators.BinaryClassification().setLabelCol(label).setPredictionCol(prediction)
        val workflow = new OpWorkflow().setInputDataset(frame,(row: Row)=>row.get(0).toString).setResultFeatures(prediction)

        val model = workflow.train()

        println(s"Model Summary:\n ${model.summaryPretty()}")
        val (scores, metrics) = model.scoreAndEvaluate(evaluator = evaluator)
        println(s"Metrics:\n ${metrics}")
        model.save("E:\\models\\model_01",true)
        session.stop()
    }

The following exception is thrown when saving the model:

Exception in thread "main" java.lang.RuntimeException: Failed to write out stage 'FeatureGeneratorStage_00000000000d'
    at com.salesforce.op.stages.OpPipelineStageWriter.writeToJson(OpPipelineStageWriter.scala:81)
    at com.salesforce.op.OpWorkflowModelWriter$$anonfun$3.apply(OpWorkflowModelWriter.scala:131)
    at com.salesforce.op.OpWorkflowModelWriter$$anonfun$3.apply(OpWorkflowModelWriter.scala:131)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at com.salesforce.op.OpWorkflowModelWriter.stagesJArray(OpWorkflowModelWriter.scala:131)
    at com.salesforce.op.OpWorkflowModelWriter.stagesJArray(OpWorkflowModelWriter.scala:108)
    at com.salesforce.op.OpWorkflowModelWriter.toJson(OpWorkflowModelWriter.scala:83)
    at com.salesforce.op.OpWorkflowModelWriter.toJsonString(OpWorkflowModelWriter.scala:68)
    at com.salesforce.op.OpWorkflowModelWriter.saveImpl(OpWorkflowModelWriter.scala:58)
    at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:103)
    at com.salesforce.op.OpWorkflowModelWriter$.save(OpWorkflowModelWriter.scala:193)
    at com.salesforce.op.OpWorkflowModel.save(OpWorkflowModel.scala:221)
    at TestApp$.main(TestApp.scala:71)
    at TestApp.main(TestApp.scala)
Caused by: java.lang.RuntimeException: Argument 'extractFn' [TestApp$$anonfun$main$1$$anonfun$2] cannot be serialized. Make sure TestApp$$anonfun$main$1$$anonfun$2 has either no-args ctor or is an object, and does not have any external dependencies, e.g. use any out of scope variables.
    at com.salesforce.op.stages.OpPipelineStageSerializationFuns$class.serializeArgument(OpPipelineStageReaderWriter.scala:234)
    at com.salesforce.op.stages.DefaultValueReaderWriter.serializeArgument(DefaultValueReaderWriter.scala:48)
    at com.salesforce.op.stages.DefaultValueReaderWriter$$anonfun$write$1.apply(DefaultValueReaderWriter.scala:70)
    at com.salesforce.op.stages.DefaultValueReaderWriter$$anonfun$write$1.apply(DefaultValueReaderWriter.scala:69)
    at scala.util.Try$.apply(Try.scala:192)
    at com.salesforce.op.stages.DefaultValueReaderWriter.write(DefaultValueReaderWriter.scala:69)
    at com.salesforce.op.stages.FeatureGeneratorStageReaderWriter.write(FeatureGeneratorStage.scala:189)
    at com.salesforce.op.stages.FeatureGeneratorStageReaderWriter.write(FeatureGeneratorStage.scala:129)
    at com.salesforce.op.stages.OpPipelineStageWriter.writeToJson(OpPipelineStageWriter.scala:80)
    ... 18 more
Caused by: java.lang.RuntimeException: Failed to create an instance of class 'TestApp$$anonfun$main$1$$anonfun$2'. Class has to either have a no-args ctor or be an object.
    at com.salesforce.op.utils.reflection.ReflectionUtils$.newInstance(ReflectionUtils.scala:106)
    at com.salesforce.op.utils.reflection.ReflectionUtils$.newInstance(ReflectionUtils.scala:87)
    at com.salesforce.op.stages.OpPipelineStageSerializationFuns$class.serializeArgument(OpPipelineStageReaderWriter.scala:231)
    ... 26 more
Caused by: java.lang.NoSuchFieldException: MODULE$
    at java.lang.Class.getField(Class.java:1703)
    at com.salesforce.op.utils.reflection.ReflectionUtils$.newInstance(ReflectionUtils.scala:102)
    ... 28 more

@tovbinm What is wrong with my code, and why is this exception thrown?

tovbinm commented 5 years ago

It happens due to the way we serialize stages with our models. We do not allow serializing lambda expressions such as .extract(_.get(pickListIndex.toInt).toString.toPickList); instead, one has to define the extract function as an explicit concrete class.
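The stack trace above hints at why: the writer records the extract function's class name and must later re-instantiate it by reflection, so the class needs either a static MODULE$ field (a Scala object) or a public no-args constructor — a lambda that captures pickListIndex has neither. A self-contained sketch of that check (`newInstanceByName` is illustrative, not the actual TransmogrifAI helper):

```scala
// Hypothetical sketch of reflective re-instantiation during stage (de)serialization.
object StageSerializationSketch {
  def newInstanceByName(className: String): Any = {
    val clazz = Class.forName(className)
    // A Scala `object` exposes a static MODULE$ field; otherwise we need a
    // public no-args constructor. An anonymous lambda guarantees neither.
    try clazz.getField("MODULE$").get(null)
    catch {
      case _: NoSuchFieldException => clazz.getDeclaredConstructor().newInstance()
    }
  }
}

// A top-level concrete extract function with a no-args ctor survives this round trip:
class NameExtract extends (Map[String, String] => Option[String]) with Serializable {
  def apply(row: Map[String, String]): Option[String] = row.get("name")
}
```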

I recommend to rewrite your code as follows:

val (response, features) = FeatureBuilder.fromDataFrame[RealNN](frame, label)
println(s"response = ${response}")

val realOpFeatures = features.zipWithIndex.map {
    case (feature, _) if feature.name == id => feature
    case (feature, i) if i == pickListIndex.toInt && feature.isSubtypeOf[Text] =>
        new ToPickListTransformer(i).setInput(feature.asInstanceOf[FeatureLike[Text]]).getOutput()
    case (feature, _) => feature
}

While ToPickListTransformer is a top-level concrete class defined as follows:

class ToPickListTransformer(pickListIndex: Int, uid: String = UID[ToPickListTransformer]) extends
  UnaryTransformer[Text, PickList](operationName = "toPickList", uid = uid) {
  def transformFn: Text => PickList = (v: Text) => v.value.toPickList
}
shenzgang commented 5 years ago

@tovbinm There may be some problems here. First, uid: String = UID[PickListTransformer] gives the error: Cannot resolve symbol PickListTransformer. Second:

val realOpFeatures = features.zipWithIndex.map {
    case (feature, i) if i == id => feature
    case (feature, i) => feature.map(new ToPickListTransformer(i)) // compile error
}

fails with: Type mismatch, expected: FeatureType => NotInferedB, actual: ToPickListTransformer

tovbinm commented 5 years ago

See the corrected version above. Thanks for pointing out the error.

shenzgang commented 5 years ago

@tovbinm Thank you for your reply! But there may still be some problems:

In the ToPickListTransformer class:

Error:(15, 9) type arguments [org.apache.spark.sql.Row,com.salesforce.op.features.types.PickList] do not conform to class UnaryTransformer's type parameter bounds [I <: com.salesforce.op.features.types.FeatureType,O <: com.salesforce.op.features.types.FeatureType]
        UnaryTransformer[Row, PickList](operationName = "toPickList", uid = uid) {

And:

case (feature, i) => feature.map(f => new ToPickListTransformer(i).setInput(f).getOutput()) // compile error!
tovbinm commented 5 years ago

I corrected the above snippet. But it would be helpful to know the underlying schema of your dataset. Can you please paste the output of frame.schema.printTreeString()?

SemanticBeeng commented 4 years ago

Trying to think if there is something to reuse from frameless :thinking:

https://typelevel.org/frameless/TypedEncoder.html https://github.com/typelevel/frameless/tree/master/dataset/src/test/scala/frameless