uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0

make_spark_converter RuntimeError: Vector columns are only supported in pyspark>=3.0 #769

Open Alxe1 opened 2 years ago

Alxe1 commented 2 years ago

I converted a PySpark DataFrame to two columns: a feature column holding dense vectors, and a label column. When I transform it to a TensorFlow dataset using make_spark_converter, it raises an error:

/mnt/softwares/hvd_env/lib/python3.7/site-packages/petastorm/spark/spark_dataset_converter.py:28: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  from pyarrow import LocalFileSystem
/mnt/softwares/hvd_env/lib/python3.7/site-packages/petastorm/hdfs/namenode.py:270: FutureWarning: pyarrow.hdfs.connect is deprecated as of 2.0.0, please use pyarrow.fs.HadoopFileSystem instead.
  return pyarrow.hdfs.connect(hostname, url.port or 8020, **kwargs)
Traceback (most recent call last):
  File "/mytest/tf_with_spark.py", line 381, in <module>
    train_test()
  File "/mytest/tf_with_spark.py", line 345, in train_test
    converter = make_spark_converter(train_transformed_sdf)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/petastorm/spark/spark_dataset_converter.py", line 696, in make_spark_converter
    df, parent_cache_dir_url, parquet_row_group_size_bytes, compression_codec, dtype)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/petastorm/spark/spark_dataset_converter.py", line 512, in _cache_df_or_retrieve_cache_data_url
    compression_codec, dtype)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/petastorm/spark/spark_dataset_converter.py", line 436, in create_cached_dataframe_meta
    dtype=dtype)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/petastorm/spark/spark_dataset_converter.py", line 579, in _materialize_df
    df = _convert_vector(df, dtype)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/petastorm/spark/spark_dataset_converter.py", line 558, in _convert_vector
    vector_to_array(df[col_name], dtype))
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/petastorm/spark/spark_dataset_converter.py", line 40, in vector_to_array
    raise RuntimeError("Vector columns are only supported in pyspark>=3.0")
RuntimeError: Vector columns are only supported in pyspark>=3.0

Does it not support pyspark < 3.0? But in the setup.py file I see it only requires 'pyspark>=2.1.0'. How do I solve this problem?

selitvin commented 2 years ago

It would be easier to address if you could post a runnable code snippet. Would you be able to post such a snippet?

Alxe1 commented 2 years ago

> It would be easier to address if you could post a runnable code snippet. Would you be able to post such a snippet?

# Imports assumed from context (not shown in the original snippet):
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from petastorm.spark import SparkDatasetConverter, make_spark_converter

conf = SparkConf().setAppName("test")
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file://')

df = pd.DataFrame({'x': [0, 1, 2, 3], "y": [6, 2, 5, 7], "z": [0, 0, 1, 1]})
sdf = spark.createDataFrame(df)

# Assemble the numeric feature columns into a single DenseVector column
vector_assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features")
sdf = vector_assembler.transform(sdf)
sdf = sdf.select("features", "z")
sdf.show()

# Raises the RuntimeError above on pyspark < 3.0
converter = make_spark_converter(sdf)

selitvin commented 2 years ago

I don't have enough Spark knowledge to give an accurate answer. Perhaps @WeichenXu123 can weigh in?

OscarDPan commented 1 year ago

I think the documentation makes it clear that you need Spark >= 3.0: https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.ml.functions.vector_to_array.html
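
For anyone stuck on pyspark < 3.0, one possible workaround (a minimal, untested sketch, not an official petastorm recipe) is to expand the ml Vector column into a plain array column yourself before calling make_spark_converter, so petastorm never reaches the vector_to_array code path seen in the traceback:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# Sketch of a pyspark 2.x workaround: convert the DenseVector column into
# a plain array<double> column with a UDF. Petastorm then sees an ordinary
# array column and never needs vector_to_array (pyspark>=3.0 only).
to_array = udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))
sdf = sdf.withColumn("features", to_array("features"))

converter = make_spark_converter(sdf)

On pyspark >= 3.0, pyspark.ml.functions.vector_to_array performs this conversion natively, which is why upgrading also resolves the error.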