yahoo / TensorFlowOnSpark

TensorFlowOnSpark brings TensorFlow programs to Apache Spark clusters.
Apache License 2.0

Do we support writing TensorFlow models in Scala & Java with tensorflow-core-api? #588

Open mullerhai opened 2 years ago

mullerhai commented 2 years ago


leewyang commented 2 years ago

Unfortunately, Java API support in TF has been spotty with deprecation warnings and no API stability guarantees. We initially tried to support Java when the API was updated regularly with each TF release, but even then, it was mostly geared towards inference and not training. That said, contributions are always welcome!
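
(For reference, a minimal inference-only sketch against tensorflow-core-api; the model path, tag, and tensor names below are placeholders, not anything this project ships:)

// Minimal inference sketch with tensorflow-core-api. The model path, tag,
// and tensor names are placeholders; adjust them to your exported SavedModel.
import org.tensorflow.{SavedModelBundle, Tensor}
import org.tensorflow.ndarray.StdArrays
import org.tensorflow.types.TFloat32

val bundle = SavedModelBundle.load("/path/to/saved_model", "serve")
// Build a 1x2 float input tensor from a Scala array.
val input: Tensor = TFloat32.tensorOf(StdArrays.ndCopyOf(Array(Array(1.0f, 2.0f))))
// Feed by tensor name, fetch the output; names depend on your model's signature.
val output = bundle.session().runner()
  .feed("serving_default_input:0", input)
  .fetch("StatefulPartitionedCall:0")
  .run()
  .get(0)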

mullerhai commented 2 years ago

> Unfortunately, Java API support in TF has been spotty with deprecation warnings and no API stability guarantees. We initially tried to support Java when the API was updated regularly with each TF release, but even then, it was mostly geared towards inference and not training. That said, contributions are always welcome!

For me, when I read the project source code, I was inspired by this Scala code in TFModel: the model implements the Spark Model interface, converts between Tensors and DataFrames, invokes the TensorFlow session, and processes each distributed partition with mapPartitions, collecting all partition results into one model.

  override def transform(dataset: Dataset[_]): DataFrame = {
    val spark = dataset.sparkSession

    val inputColumns = this.getInputMapping.keys.toSeq
    val inputTensorNames = this.getInputMapping.values
    val outputTensorNames = this.getOutputMapping.keys.toSeq

    val inputDF = dataset.select(inputColumns.head, inputColumns.tail: _*)
    val inputSchema = inputDF.schema
    val outputSchema = transformSchema(inputSchema)

    val outputRDD = inputDF.rdd.mapPartitions { iter: Iterator[Row] =>
      if (TFModel.model == null || TFModel.modelDir != this.getModel) {
        // load model into a per-executor singleton reference, if needed.
        TFModel.modelDir = this.getModel
        TFModel.model = SavedModelBundle.load(this.getModel, this.getTag)
        TFModel.graph = TFModel.model.graph
        TFModel.sess = TFModel.model.session
      }

      iter.grouped(this.getBatchSize).flatMap { batch =>
        // get input batch of Rows and convert to list of input Tensors
        val inputTensors = batch2tensors(batch, inputSchema)

        var runner = TFModel.sess.runner()

        // feed input tensors
        for ((name, tensor) <- inputTensors) {
          runner = runner.feed(this.getInputMapping(name), tensor)
        }
        // fetch output tensors
        for (name <- outputTensorNames) {
          runner = runner.fetch(name)
        }

        // run the graph
        val outputTensors = runner.run()

        assert(outputTensors.map(_.shape).map(s => if (s.isEmpty) 0L else s.apply(0)).distinct.size == 1,
          "Cardinality of output tensors must match")

        // convert the list of output Tensors to a batch of output Rows
        tensors2batch(outputTensors)
      }
    }

    spark.createDataFrame(outputRDD, outputSchema)
  }
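
For completeness, a hypothetical driver sketch of how such a TFModel might be used from a Spark job. The setter names mirror the params read in transform() above (getModel, getTag, getInputMapping, getOutputMapping, getBatchSize), but they and the tensor names are assumptions, not an API this repo ships:

// Hypothetical usage sketch; setter and tensor names are assumptions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("tf-model-inference").getOrCreate()
val features = spark.read.parquet("hdfs:///data/features")

val model = new TFModel()
  .setModel("hdfs:///models/my_saved_model")   // export dir readable by executors
  .setTag("serve")                             // SavedModel tag
  .setInputMapping(Map("features" -> "serving_default_input:0"))
  .setOutputMapping(Map("StatefulPartitionedCall:0" -> "prediction"))
  .setBatchSize(128)

val predictions = model.transform(features)
predictions.show()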
mokundong commented 2 years ago

I got a "Could not find SavedModel .pb" error when submitting on a YARN cluster, at the line `TFModel.model = SavedModelBundle.load(this.getModel, this.getTag)`.
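
(Likely because SavedModelBundle.load needs a local filesystem path on each executor, and on a YARN cluster the model directory isn't there unless you ship it with the job. A hedged sketch of one way to do that; the archive name and the saved_model symlink are assumptions, not something this repo prescribes:)

// Sketch: ship the exported model with the job and load it from the
// container's working directory. Archive name and symlink are assumptions.
//
//   spark-submit --master yarn --deploy-mode cluster \
//     --archives hdfs:///models/my_model.zip#saved_model \
//     ... (rest of the submit command)
//
import org.tensorflow.SavedModelBundle

// Each YARN container sees the unpacked archive under the saved_model symlink.
val bundle = SavedModelBundle.load("saved_model", "serve")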