tensorflow / ecosystem

Integration of TensorFlow with other open-source frameworks
Apache License 2.0

Cannot use with DataFrame.write.partitionBy; always outputs one partition. #119

Open legatoo opened 5 years ago

legatoo commented 5 years ago

I want to split my data evenly, so I added an index column to my dataframe, and I am pretty sure this column is added correctly. I printed some rows:

(screenshot: sample rows showing the added index column)

I first add the index using the code below:

val partitionCnt: Int = 12
val dataWithIndex = context.sc.createDataFrame(
    // append a bucket index in [0, partitionCnt) to each row
    originData.rdd.zipWithIndex.map { case (r, i) => Row.fromSeq(r.toSeq :+ i % partitionCnt) },
    StructType(originData.schema.fields :+ StructField("index_id", LongType, nullable = false))
)
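Taking the row number modulo partitionCnt does spread sequential indices evenly across buckets; a quick pure-Scala sanity check (no Spark needed, illustrative only):

```scala
val partitionCnt = 12

// Simulate the zipWithIndex indices for 120 rows and bucket them
// the same way the DataFrame code does: index % partitionCnt.
val buckets = (0L until 120L).map(_ % partitionCnt)

// Count how many rows land in each bucket.
val counts = buckets.groupBy(identity).map { case (k, v) => k -> v.size }

println(counts.size)              // 12 distinct buckets
println(counts.values.toSet)      // every bucket holds exactly 10 rows
```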

Then I want to partition by index_id:

dataWithIndex.write.partitionBy("index_id")
    .format("tfrecords")
    .mode(SaveMode.Overwrite)
    .option("recordType", "Example")
    .save(hdfsPath)

But this code outputs only one partition every time. I thought it could be something wrong with the dataframe, but when I write to another format it works as expected, e.g. CSV:

dataWithIndex.write.partitionBy("index_id")
    .format("com.databricks.spark.csv")
    .mode(SaveMode.Overwrite)
    .save(params("hdfsPath"))

(screenshot: the CSV output is correctly split into multiple partition directories)

ghost commented 5 years ago

Hi there,

Not sure why you need .mode(SaveMode.Overwrite) when writing the dataframe to HDFS. Have you tried it without .mode(SaveMode.Overwrite)?

legatoo commented 5 years ago

@MajesticKhan I didn't think it would help, but I gave it a try: same problem. According to the docs, mode "specifies the behavior when data or table already exists". When I use mode without partitionBy to produce TFRecords, it works.

ghost commented 5 years ago

The only other idea I have is to first partition the data and then persist or cache the dataframe. From there, try writing it to the stated path.
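That suggestion might look roughly like this (a sketch reusing dataWithIndex, partitionCnt, and hdfsPath from the original post; note that later comments report the writer still produces one partition even after this):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.storage.StorageLevel

// Repartition on the bucket column, then cache so the data is
// materialized in that layout before the tfrecords writer runs.
val partitioned = dataWithIndex
  .repartition(partitionCnt, dataWithIndex("index_id"))
  .persist(StorageLevel.MEMORY_AND_DISK)
partitioned.count() // action to force the cache

partitioned.write.partitionBy("index_id")
  .format("tfrecords")
  .mode(SaveMode.Overwrite)
  .option("recordType", "Example")
  .save(hdfsPath)
```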

nicholas-leonard commented 4 years ago

At the very least, partitionBy should fail. Currently it's a silent bug.

jerome-eyespage commented 4 years ago

Encountering the same problem.

junshi15 commented 4 years ago

Any update on this from the developers? I see the same issue in 1.15.0. Does anyone have a workaround? I suppose I could partition the data and then save the partitions sequentially, but I'm wondering if there are better approaches.

boarder7395 commented 4 years ago

I've run into this issue as well. Using partitionBy with all the suggested workarounds still results in one partition. Is anyone working on this issue?

priyabratapatnaik commented 4 years ago

+1 @boarder7395

I suppose the TensorFlow data source takes the whole dataframe as input and doesn't respect the partitionBy clause.

Is there a workaround other than writing each partition's data separately in a loop?
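For reference, the loop approach could look like this (a sketch reusing dataWithIndex, partitionCnt, and hdfsPath from the original post; the index_id=$i subdirectory naming mimics Hive-style partition layout and is an assumption, not something the connector produces):

```scala
import org.apache.spark.sql.SaveMode

// One write per bucket; each bucket lands in its own subdirectory.
(0 until partitionCnt).foreach { i =>
  dataWithIndex
    .filter(dataWithIndex("index_id") === i)
    .write
    .format("tfrecords")
    .mode(SaveMode.Overwrite)
    .option("recordType", "Example")
    .save(s"$hdfsPath/index_id=$i")
}
```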

junshi15 commented 4 years ago

We open-sourced a similar package to address this issue. You can try it out here: https://github.com/linkedin/spark-tfrecord https://engineering.linkedin.com/blog/2020/spark-tfrecord
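With spark-tfrecord the format name is "tfrecord" (singular), and the linked blog post states that partitionBy is supported. A sketch, reusing dataWithIndex and hdfsPath from the original post:

```scala
import org.apache.spark.sql.SaveMode

dataWithIndex.write
  .partitionBy("index_id")
  .format("tfrecord") // note: "tfrecord", not "tfrecords"
  .option("recordType", "Example")
  .mode(SaveMode.Overwrite)
  .save(hdfsPath)
```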