uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0
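For context on the library described above, reading a plain Parquet dataset from pure Python looks roughly like this (a minimal sketch using petastorm's make_batch_reader; the dataset URL is a placeholder, not taken from this issue):

from petastorm import make_batch_reader

# Iterate over a Parquet dataset in batches; each item is a named tuple
# whose fields are column arrays for that batch.
with make_batch_reader('file:///tmp/example_dataset') as reader:
    for batch in reader:
        print(batch)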

Error when using make_spark_converter #725

Closed: jiwidi closed this issue 3 years ago

jiwidi commented 3 years ago

Hi! I was going over your tutorial and I couldn't replicate it.

I tried to read a Spark table and turn it into a Torch dataset, but it fails when calling the make_spark_converter method.

My code:

from petastorm.spark import SparkDatasetConverter, make_spark_converter

df_train = spark.read.parquet(spark_path)

# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'dbfs://pathtobucket/')

# df_train, df_test = ... # `df_train` and `df_test` are spark dataframes

# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter(df_train)
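For reference, the tutorial then consumes the converter along these lines (a minimal sketch of the converter API; the batch size and loop body are placeholders, not part of the failing code):

# consume the materialized dataset as a PyTorch dataloader
with converter_train.make_torch_dataloader(batch_size=32) as dataloader:
    for batch in dataloader:
        # each batch is a dict of tensors keyed by column name
        pass

The failure happens before this point, on the make_spark_converter call itself.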

The error:

 10 # create a converter_train from `df_train`
     11 # it will materialize `df_train` to cache dir. (the same for df_test)
---> 12 converter_train = make_spark_converter(df_train)
     13 
     14 

/databricks/python/lib/python3.7/site-packages/petastorm/spark/spark_dataset_converter.py in make_spark_converter(df, parquet_row_group_size_bytes, compression_codec, dtype)
    682     """
    683 
--> 684     parent_cache_dir_url = _get_parent_cache_dir_url()
    685 
    686     # TODO: Improve default behavior to be automatically choosing the best way.

/databricks/python/lib/python3.7/site-packages/petastorm/spark/spark_dataset_converter.py in _get_parent_cache_dir_url()
     72 
     73     conf_url = normalize_dir_url(conf_url)
---> 74     _check_parent_cache_dir_url(conf_url)
     75     _parent_cache_dir_url = conf_url
     76     logger.info(

/databricks/python/lib/python3.7/site-packages/petastorm/spark/spark_dataset_converter.py in _check_parent_cache_dir_url(dir_url)
    458     """Check dir url whether is suitable to be used as parent cache directory."""
    459     _check_url(dir_url)
--> 460     fs, dir_path = get_filesystem_and_path_or_paths(dir_url)
    461     if 'DATABRICKS_RUNTIME_VERSION' in os.environ and not _is_spark_local_mode():
    462         if isinstance(fs, LocalFileSystem):

/databricks/python/lib/python3.7/site-packages/petastorm/fs_utils.py in get_filesystem_and_path_or_paths(url_or_urls, hdfs_driver, storage_options, filesystem)
    199 
    200     fs = filesystem or FilesystemResolver(
--> 201         url_list[0], hdfs_driver=hdfs_driver, storage_options=storage_options).filesystem()
    202     path_list = [get_dataset_path(parsed_url) for parsed_url in parsed_url_list]
    203 

/databricks/python/lib/python3.7/site-packages/petastorm/fs_utils.py in __init__(self, dataset_url, hadoop_configuration, connector, hdfs_driver, user, storage_options)
    140             options = cls._get_kwargs_from_urls(self._dataset_url)
    141             update_storage_options(options, storage_options)
--> 142             self._filesystem = cls(**options)
    143             self._filesystem_factory = lambda: cls(**options)  # pylint: disable=unnecessary-lambda
    144 

/databricks/python/lib/python3.7/site-packages/fsspec/spec.py in __call__(cls, *args, **kwargs)
     74             return cls._cache[token]
     75         else:
---> 76             obj = super().__call__(*args, **kwargs)
     77             # Setting _fs_token here causes some static linters to complain.
     78             obj._fs_token_ = token

TypeError: __init__() missing 2 required positional arguments: 'instance' and 'token'

I'm using Spark 3.0.1, if that helps.
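A note on what the traceback suggests (my reading, not a confirmed fix): the dbfs:// cache URL is resolved through fsspec, whose Databricks filesystem requires instance and token arguments, which is where the TypeError comes from. The petastorm Databricks examples point the cache dir at the DBFS FUSE mount instead, along these lines (the path is a placeholder carried over from the report):

# Assumption: using the file:///dbfs/ FUSE path avoids the dbfs:// fsspec resolution.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///dbfs/pathtobucket/')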