kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

Class `spark.SparkDataSet` not found when using 0.17.0 - fsspec (AttributeError: 'EntryPoints' object has no attribute 'get') #1895

Closed · diemto closed this issue 2 years ago

diemto commented 2 years ago

Description

I started getting a "Class `spark.SparkDataSet` not found." error on 10/01/2022. Prior to this, the kedro pipeline ran fine.

Context

The Kedro pipeline stopped working because my datasets can no longer be configured correctly.

Steps to Reproduce

  1. Create a kedro project with a spark.SparkDataSet called core_parquet (a sketch of the dataset config appears after these steps)
  2. Deploy the code to Databricks
  3. Install kedro[spark.SparkDataSet]==0.17.0 on the Databricks cluster
  4. Run the pipeline on Databricks
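
For reference, a minimal sketch of the kind of dataset definition involved, written as the config dict that DataCatalog.from_config receives (the filepath and options below are illustrative, not the real project values):

from kedro.io import DataCatalog

catalog_config = {
    "core_parquet": {
        "type": "spark.SparkDataSet",  # the class that parse_dataset_definition fails to resolve
        "filepath": "dbfs:/example/path/core_parquet",  # hypothetical path
        "file_format": "parquet",
    },
}

catalog = DataCatalog.from_config(catalog_config)  # raises the DataSetError shown below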

Expected Result

The dataset should have been configured correctly and the kedro pipeline should have run fine.

Actual Result

Instead, I see the following error:

DataSetError: An exception occurred when parsing config for DataSet `core_parquet`:
Class `spark.SparkDataSet` not found.

Your Environment

noklam commented 2 years ago

Could you check the full error stack trace and see if it's possible that you missed some dependencies?

diemto commented 2 years ago

Here is the full stack trace:

---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
/databricks/python/lib/python3.7/site-packages/kedro/io/core.py in parse_dataset_definition(config, load_version, save_version)
    410         try:
--> 411             class_obj = next(obj for obj in trials if obj is not None)
    412         except StopIteration as exc:

StopIteration: 

The above exception was the direct cause of the following exception:

DataSetError                              Traceback (most recent call last)
/databricks/python/lib/python3.7/site-packages/kedro/io/core.py in from_config(cls, name, config, load_version, save_version)
    167             class_obj, config = parse_dataset_definition(
--> 168                 config, load_version, save_version
    169             )

/databricks/python/lib/python3.7/site-packages/kedro/io/core.py in parse_dataset_definition(config, load_version, save_version)
    412         except StopIteration as exc:
--> 413             raise DataSetError(f"Class `{class_obj}` not found.") from exc
    414 

DataSetError: Class `spark.SparkDataSet` not found.

The above exception was the direct cause of the following exception:

DataSetError                              Traceback (most recent call last)
<command-4397229959898573> in <module>
      8 
      9 with KedroSession.create(metadata.package_name, project_root) as session:
---> 10     session.run()

/databricks/python/lib/python3.7/site-packages/kedro/framework/session/session.py in run(self, pipeline_name, tags, runner, node_names, from_nodes, to_nodes, from_inputs, load_versions)
    401 
    402         catalog = context._get_catalog(
--> 403             save_version=save_version, load_versions=load_versions
    404         )
    405 

/databricks/python/lib/python3.7/site-packages/kedro/framework/context/context.py in _get_catalog(self, save_version, journal, load_versions)
    352             load_versions=load_versions,
    353             save_version=save_version,
--> 354             journal=journal,
    355         )
    356         if not isinstance(catalog, DataCatalog):

/databricks/python/lib/python3.7/site-packages/pluggy/hooks.py in __call__(self, *args, **kwargs)
    284                     stacklevel=2,
    285                 )
--> 286         return self._hookexec(self, self.get_hookimpls(), kwargs)
    287 
    288     def call_historic(self, result_callback=None, kwargs=None, proc=None):

/databricks/python/lib/python3.7/site-packages/pluggy/manager.py in _hookexec(self, hook, methods, kwargs)
     91         # called from all hookcaller instances.
     92         # enable_tracing will set its own wrapping function at self._inner_hookexec
---> 93         return self._inner_hookexec(hook, methods, kwargs)
     94 
     95     def register(self, plugin, name=None):

/databricks/python/lib/python3.7/site-packages/pluggy/manager.py in <lambda>(hook, methods, kwargs)
     85             methods,
     86             kwargs,
---> 87             firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
     88         )
     89 

/databricks/python/lib/python3.7/site-packages/pluggy/callers.py in _multicall(hook_impls, caller_kwargs, firstresult)
    206                 pass
    207 
--> 208         return outcome.get_result()

/databricks/python/lib/python3.7/site-packages/pluggy/callers.py in get_result(self)
     78             ex = self._excinfo
     79             if _py3:
---> 80                 raise ex[1].with_traceback(ex[2])
     81             _reraise(*ex)  # noqa
     82 

/databricks/python/lib/python3.7/site-packages/pluggy/callers.py in _multicall(hook_impls, caller_kwargs, firstresult)
    185                         _raise_wrapfail(gen, "did not yield")
    186                 else:
--> 187                     res = hook_impl.function(*args)
    188                     if res is not None:
    189                         results.append(res)

/dbfs/pnr_feed/pipeline/src/pnr_feed/hooks.py in register_catalog(self, catalog, credentials, load_versions, save_version, journal)
     92     ) -> DataCatalog:
     93         return DataCatalog.from_config(
---> 94                                 catalog, credentials, load_versions, save_version, journal
     95         )

/databricks/python/lib/python3.7/site-packages/kedro/io/data_catalog.py in from_config(cls, catalog, credentials, load_versions, save_version, journal)
    327             ds_config = _resolve_credentials(ds_config, credentials)
    328             data_sets[ds_name] = AbstractDataSet.from_config(
--> 329                 ds_name, ds_config, load_versions.get(ds_name), save_version
    330             )
    331 

/databricks/python/lib/python3.7/site-packages/kedro/io/core.py in from_config(cls, name, config, load_version, save_version)
    172                 "An exception occurred when parsing config "
    173                 "for DataSet `{}`:\n{}".format(name, str(exc))
--> 174             ) from exc
    175 
    176         try:

DataSetError: An exception occurred when parsing config for DataSet `core_parquet`:
Class `spark.SparkDataSet` not found.

From the stack trace, it doesn't look like I'm missing any dependencies.

noklam commented 2 years ago

This is a bit strange. Can you try this and see whether the import succeeds?

from kedro.extras.datasets.spark import SparkDataSet
diemto commented 2 years ago

This is a bit strange. Can you try this and see whether the import succeeds?

from kedro.extras.datasets.spark import SparkDataSet

I'm getting the following error when running the above command:

AttributeError: 'EntryPoints' object has no attribute 'get'

Here is the stack trace:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<command-1944072326525861> in <module>
----> 1 from kedro.extras.datasets.spark import SparkDataSet

/databricks/python/lib/python3.7/site-packages/kedro/extras/datasets/spark/__init__.py in <module>
     34 
     35 with suppress(ImportError):
---> 36     from .spark_dataset import SparkDataSet  # NOQA
     37 with suppress(ImportError):
     38     from .spark_hive_dataset import SparkHiveDataSet  # NOQA

/databricks/python/lib/python3.7/site-packages/kedro/extras/datasets/spark/spark_dataset.py in <module>
     41 from pyspark.sql import DataFrame, SparkSession  # pylint: disable=import-error
     42 from pyspark.sql.utils import AnalysisException  # pylint: disable=import-error
---> 43 from s3fs import S3FileSystem
     44 
     45 from kedro.io.core import AbstractVersionedDataSet, Version

/databricks/python/lib/python3.7/site-packages/s3fs/__init__.py in <module>
----> 1 from .core import S3FileSystem, S3File
      2 from .mapping import S3Map
      3 
      4 from ._version import get_versions
      5 

/databricks/python/lib/python3.7/site-packages/s3fs/core.py in <module>
      6 from typing import Tuple, Optional
      7 
----> 8 from fsspec import AbstractFileSystem
      9 from fsspec.spec import AbstractBufferedFile
     10 

/databricks/python/lib/python3.7/site-packages/fsspec/__init__.py in <module>
     41 if entry_points is not None:
     42     entry_points = entry_points()
---> 43     for spec in entry_points.get("fsspec.specs", []):
     44         err_msg = f"Unable to load filesystem from {spec}"
     45         register_implementation(spec.name, spec.module, errtxt=err_msg)

AttributeError: 'EntryPoints' object has no attribute 'get'
noklam commented 2 years ago

I think this is the problem: the import is failing because of a dependency. Which fsspec version are you using? The entry points API changed in importlib.metadata, so you may want to check whether fsspec uses importlib or importlib-metadata and try upgrading those libraries.
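
Roughly, the API difference looks like this (a sketch based on importlib-metadata's documented behaviour, not on your exact environment):

from importlib_metadata import entry_points  # backport used on Python 3.7

eps = entry_points()

# importlib-metadata < 5.0: entry_points() returns a dict-like object,
# so fsspec 0.8.x can call eps.get("fsspec.specs", []).
# importlib-metadata >= 5.0: entry_points() returns an EntryPoints object
# with no .get(); the equivalent lookup is:
specs = eps.select(group="fsspec.specs")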

deepyaman commented 2 years ago

See https://stackoverflow.com/a/73932581/1093967. Can you share pip freeze and what your version of importlib-metadata is? Maybe you can pin it <5.0 for now.

Newer versions of Kedro pin importlib-metadata<5.0 (and explicitly require it), but back in 0.17.0 it's not part of Kedro's requirements and therefore could be unbounded.
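
If pinning works for you, something along these lines on the cluster should do it (assuming pip is how packages get installed there):

pip install "importlib-metadata<5.0"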

deepyaman commented 2 years ago

Also, this has been resolved in fsspec>=2021.06.01; see https://github.com/fsspec/filesystem_spec/commit/384b8721a7d9ed71ea9ef9c1d60d610649e16084. However, since you're using Kedro 0.17.0, you don't have the option to upgrade to a newer version of fsspec. :( 0.17.0 isn't even aware that fsspec moved to CalVer.

diemto commented 2 years ago

See https://stackoverflow.com/a/73932581/1093967. Can you share pip freeze and what your version of importlib-metadata is? Maybe you can pin it <5.0 for now.

Newer versions of Kedro pin importlib-metadata<5.0 (and explicitly require it), but back in 0.17.0 it's not part of Kedro's requirements and therefore could be unbounded.

Here is the output of pip freeze:

anyconfig==0.9.11
arrow==1.2.3
asn1crypto==0.24.0
backcall==0.1.0
binaryornot==0.4.4
boto==2.49.0
boto3==1.9.162
botocore==1.12.163
cachetools==4.2.4
certifi==2019.3.9
cffi==1.12.2
chardet==3.0.4
charset-normalizer==2.1.1
click==7.1.2
cookiecutter==1.7.3
cryptography==2.6.1
cycler==0.10.0
Cython==0.29.6
decorator==4.4.0
docopt==0.6.2
docutils==0.14
fsspec==0.8.7
gitdb==4.0.9
GitPython==3.1.27
hdfs==2.7.0
idna==2.8
importlib-metadata==5.0.0
ipython==7.4.0
ipython-genutils==0.2.0
jedi==0.13.3
Jinja2==3.1.2
jinja2-time==0.2.0
jmespath==0.10.0
jupyter-client==6.2.0
jupyter-core==4.11.1
kedro==0.17.0
kiwisolver==1.1.0
MarkupSafe==2.1.1
matplotlib==3.0.3
nest-asyncio==1.5.6
numpy==1.16.2
pandas==0.24.2
parso==0.3.4
patsy==0.5.1
pexpect==4.6.0
pickleshare==0.7.5
pip-tools==5.5.0
pluggy==0.13.1
poyo==0.5.0
prompt-toolkit==2.0.9
psycopg2==2.7.6.1
ptyprocess==0.6.0
py4j==0.10.9.5
pyarrow==0.13.0
pycparser==2.19
pycurl==7.43.0
Pygments==2.3.1
PyGObject==3.20.0
pyOpenSSL==19.0.0
pyparsing==2.4.2
PySocks==1.6.8
pyspark==3.3.0
python-apt==1.1.0b1+ubuntu0.16.4.8
python-dateutil==2.8.0
python-json-logger==0.1.11
python-slugify==6.1.2
pytz==2018.9
PyYAML==5.4.1
pyzmq==24.0.1
requests==2.28.1
s3fs==0.4.2
s3transfer==0.2.1
scikit-learn==0.20.3
scipy==1.2.1
seaborn==0.9.0
six==1.12.0
smmap==5.0.0
ssh-import-id==5.5
statsmodels==0.9.0
text-unidecode==1.3
toml==0.10.2
toposort==1.7
tornado==6.2
traitlets==4.3.2
typing_extensions==4.3.0
unattended-upgrades==0.1
urllib3==1.24.1
virtualenv==16.4.1
wcwidth==0.1.7
zipp==3.8.1

Looks like the version of importlib-metadata that I'm on is 5.0.0.

diemto commented 2 years ago

Pinning the importlib-metadata version to <5.0.0 resolved the error! Thank you @noklam and @deepyaman for all your help!

noklam commented 2 years ago

@diemto Awesome! Closing the issue now, thanks for getting back with your fix.