tensorflow / ecosystem

Integration of TensorFlow with other open-source frameworks
Apache License 2.0

Cannot convert field to unsupported data type StructType([StructField("user_flush_num", ArrayType(IntegerType(), True)),StructField("field2", ArrayType(IntegerType(), True))]) #186

Closed jia66wei closed 3 years ago

jia66wei commented 3 years ago

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)

ss = SparkSession.builder.appName("123").getOrCreate()
sc = ss.sparkContext

struct = StructType([StructField("user_flush_num", ArrayType(IntegerType(), True)),
                     StructField("field2", ArrayType(IntegerType(), True))])
struct1 = StructType([StructField("f1", StringType(), True),
                      StructField("list", ArrayType(struct))])

data1 = sc.parallelize(["a", "b", "c"]).map(
    lambda x: [x, [[[1, 2, 3], [1, 2, 3]], [(1, 2, 3), (1, 2, 3)]]])
df = ss.createDataFrame(data1, struct1)
df.createOrReplaceTempView("table")
res = ss.sql("select * from table")
print(res.take(10))
```

The output is as expected:

```
[Row(f1='a', list=[Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3]), Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3])]), Row(f1='b', list=[Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3]), Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3])]), Row(f1='c', list=[Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3]), Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3])])]
```

But converting the DataFrame to TFRecord fails:

```python
df.write.format("tfrecords") \
    .option("recordType", "Example") \
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
    .save(output_root + "/train.tfrecord")
```

Error:

```
Caused by: java.lang.RuntimeException: Cannot convert field to unsupported data type StructType(StructField(item_newsid,IntegerType,true))
    at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$.org$tensorflow$spark$datasources$tfrecords$serde$DefaultTfRecordRowEncoder$$encodeFeature(DefaultTfRecordRowEncoder.scala:144)
    at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$$anonfun$encodeExample$1.apply(DefaultTfRecordRowEncoder.scala:64)
    at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$$anonfun$encodeExample$1.apply(DefaultTfRecordRowEncoder.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$.encodeExample(DefaultTfRecordRowEncoder.scala:61)
    at org.tensorflow.spark.datasources.tfrecords.DefaultSource$$anonfun$3.apply(DefaultSource.scala:61)
    at org.tensorflow.spark.datasources.tfrecords.DefaultSource$$anonfun$3.apply(DefaultSource.scala:58)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:148)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1374)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.org$apache$spark$internal$io$SparkHadoopMapReduceWriter$$executeTask(SparkHadoopMapReduceWriter.scala:163)
    ... 8 more
```
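The encoder fails because a `StructType` nested inside an `ArrayType` has no direct `Example` representation. A common workaround is to flatten each struct field into its own top-level column of primitives before writing. The sketch below is plain Python (not Spark) and only illustrates the shape change on one row; `flatten_row` is a hypothetical helper, not part of the connector:

```python
def flatten_row(row):
    """Flatten an array-of-struct column into one flat int-list column per
    struct field, concatenating values across the array elements.
    Flat int lists are encodable as an Example Int64List feature."""
    flat = {"f1": row["f1"]}
    for struct in row["list"]:
        for field, values in struct.items():
            flat.setdefault(field, []).extend(values)
    return flat

# One row shaped like the DataFrame above, with structs as plain dicts.
row = {"f1": "a",
       "list": [{"user_flush_num": [1, 2, 3], "field2": [1, 2, 3]},
                {"user_flush_num": [1, 2, 3], "field2": [1, 2, 3]}]}
print(flatten_row(row))
# {'f1': 'a', 'user_flush_num': [1, 2, 3, 1, 2, 3], 'field2': [1, 2, 3, 1, 2, 3]}
```

In Spark the equivalent would be a `select()` that projects each nested field into its own array column; the resulting schema contains only primitives and arrays of primitives, which the writer accepts.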

jia66wei commented 3 years ago

need help, thank you!

jia66wei commented 3 years ago

https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/example/example.proto may help. The TFRecord `Example` format is limited: each `Feature` holds only one of `BytesList`, `FloatList`, or `Int64List`, so a `StructType` nested inside an `ArrayType` cannot be encoded.