tensorflow / ecosystem

Integration of TensorFlow with other open-source frameworks
Apache License 2.0

Failed to build spark-tensorflow-connector because file already exists #118

Closed. ghost closed this issue 5 years ago

ghost commented 5 years ago

Hello there,

I'm currently running Spark 2.3.2 and attempting to install spark-tensorflow-connector on Dataproc. I have successfully built tensorflow-hadoop.

However, when I enter the spark-tensorflow-connector directory and attempt to build the jar with mvn clean install, I get this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.IllegalStateException: LocalPath /tmp/spark-connector-propagate7442350445858279141 already exists. SaveMode: ErrorIfExists.

EDIT: I believe the issue is related to this part of the test suite at ecosystem/spark/spark-tensorflow-connector/src/test/scala/org/tensorflow/spark/datasources/tfrecords/LocalWriteSuite.scala:

"Propagate" should {
    "write data locally" in {
      // Create a dataframe with 2 partitions
      val rdd = spark.sparkContext.parallelize(testRows, numSlices = 2)
      val df = spark.createDataFrame(rdd, schema)

      // Write the partitions onto the local hard drive. Since it is going to be the
      // local file system, the partitions will be written in the same directory of the
      // same machine.
      // In a distributed setting though, two different machines would each hold a single
      // partition.
      val localPath = Files.createTempDirectory("spark-connector-propagate").toAbsolutePath.toString
      // Delete the directory, the default mode is ErrorIfExists
      Files.delete(Paths.get(localPath))
      df.write.format("tfrecords")
        .option("recordType", "Example")
        .option("writeLocality", "local")
        .save(localPath)

      // Read again this directory, this time using the Hadoop file readers, it should
      // return the same data.
      // This only works in this test and does not hold in general, because the partitions
      // will be written on the workers. Everything runs locally for tests.
      val df2 = spark.read.format("tfrecords").option("recordType", "Example")
        .load(localPath).sort("id").select("id", "IntegerTypeLabel", "LongTypeLabel",
        "FloatTypeLabel", "DoubleTypeLabel", "VectorLabel", "name") // Correct column order.

      assert(df2.collect().toSeq === testRows.toSeq)
    }
  }
}

If I understood correctly, the dataset has two partitions, and it seems that both partitions are attempting to write locally to the same path.
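
In case it helps anyone reproduce this outside of Maven, here is a rough standalone sketch of the same write path. The schema and rows are made up, and it assumes the spark-tensorflow-connector jar is already on the classpath; I'm not certain it fails the same way outside the test harness, but it exercises the same options as the test above:

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[2]").appName("tfrecords-local-write").getOrCreate()

// Two partitions, as in LocalWriteSuite, so two tasks end up writing under the same local path.
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val rows = Seq(Row(1, "one"), Row(2, "two"))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows, numSlices = 2), schema)

// Same pattern as the test: create a temp directory, delete it so only the name is free,
// then write. The connector's default SaveMode is ErrorIfExists.
val localPath = Files.createTempDirectory("spark-connector-propagate").toAbsolutePath.toString
Files.delete(Paths.get(localPath))

df.write.format("tfrecords")
  .option("recordType", "Example")
  .option("writeLocality", "local")
  .save(localPath)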

Has anybody run into this issue, or am I missing a step?

ghost commented 5 years ago

I was able to get it working by installing the latest version here: https://mvnrepository.com/artifact/org.tensorflow/spark-tensorflow-connector_2.11/1.13.0-rc0
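
For anyone else hitting this while building from source, pulling the published artifact is enough. I used the Maven artifact directly; for an sbt build this should be the equivalent coordinate (taken from the link above):

// build.sbt
libraryDependencies += "org.tensorflow" % "spark-tensorflow-connector_2.11" % "1.13.0-rc0"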