kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

[Investigate] ParallelRunner does not work with S3-linked data catalog #2162

Open ethan-isaacson-perch opened 1 year ago

ethan-isaacson-perch commented 1 year ago

Description

The ParallelRunner fails if data catalog entries point to Amazon S3.

Context

We use the ParallelRunner to run a large and highly parallelized pipeline. When our data catalog is connected to the local disk filesystem, everything works. When we attempt to switch the file locations to a functionally identical S3 bucket (using the out-of-the-box location specifications as documented here), we see errors. Further details below, but I believe this is caused by some tricky imports and a pickling failure.

Steps to Reproduce

The code is a bit too involved to reproduce in full here, but in general I believe any session that couples ParallelRunner with S3 catalog objects will throw this error.
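For illustration, here is a hedged minimal sketch of the kind of setup that triggers it (not taken from our pipeline; the bucket name and pass-through node are hypothetical, and it assumes s3fs is installed with valid AWS credentials):

from kedro.io import DataCatalog
from kedro.pipeline import node, pipeline
from kedro.runner import ParallelRunner
from kedro_datasets.pandas import CSVDataset  # kedro.extras.datasets.pandas.CSVDataSet on 0.18.x


def passthrough(df):
    return df


# Hypothetical catalog: the same pipeline completes when these are local paths.
catalog = DataCatalog(
    {
        "input_table": CSVDataset(filepath="s3://my-bucket/input.csv"),
        "output_table": CSVDataset(filepath="s3://my-bucket/output.csv"),
    }
)

pipe = pipeline([node(passthrough, inputs="input_table", outputs="output_table")])

if __name__ == "__main__":
    # With s3:// paths this raises the AttributeError about data sets that
    # cannot be used with multiprocessing.
    ParallelRunner().run(pipe, catalog)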

Expected Result

The pipeline should run to completion.

Actual Result

We see errors related to the serializability of the catalog objects. Namely:

AttributeError: The following data sets cannot be used with multiprocessing: [<all of our catalog entries connected to s3>]

This error is accompanied by the following message:

In order to utilize multiprocessing you need to make sure all data sets are serialisable, i.e. data sets should not make use of lambda functions, nested functions, closures etc.
If you are using custom decorators ensure they are correctly decorated using functools.wraps().

Further up the traceback we see that the error was tripped here, in kedro/runner/parallel_runner.py:

│   221 │   │   if unserialisable:                                                                 │
│ ❱ 222 │   │   │   raise AttributeError(                                                          │
│   223 │   │   │   │   f"The following data sets cannot be used with multiprocessing: "           │
│   224 │   │   │   │   f"{sorted(unserialisable)}\nIn order to utilize multiprocessing you "      │
│   225 │   │   │   │   f"need to make sure all data sets are serialisable, i.e. data sets "    
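For reference, here is a rough sketch of the kind of check being made there; it is paraphrased from the traceback rather than copied from the Kedro source:

# Rough sketch: before spawning workers, ParallelRunner tries to pickle each
# catalog entry the way multiprocessing would and collects the ones that fail.
from multiprocessing.reduction import ForkingPickler
from pickle import PicklingError


def find_unserialisable(datasets: dict) -> list:
    unserialisable = []
    for name, dataset in datasets.items():
        try:
            ForkingPickler.dumps(dataset)
        except (AttributeError, PicklingError, TypeError):
            unserialisable.append(name)
    return sorted(unserialisable)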

Your Environment

Kedro version: 0.18.2
Python version: 3.9.1
Operating system: Windows 10 Pro 21H2 (also replicated on a Linux instance, although I don't have the distro/version details at the moment)

Temporary fix

I have found a way to fix this problem (i.e. allow ParallelRunner to work with S3 datasets) by modifying the Kedro source code locally. I am not sure this fix is the correct approach, but I'm sharing it in case it's a helpful head start.

I found that the S3FS-enabled catalog objects could not be serialized by ForkingPickler. The specific problem seems to be in the creation of glob_func, which uses s3fs.core.S3FileSystem._glob in the case of S3 files, but (I think because of the sequence of imports, somehow) the function object that ends up stored does not match what the pickler expects s3fs.core.S3FileSystem._glob to be. In general, my solution involves re-instantiating that glob_func in various places so that the two match and serialization is possible. (I think. I don't fully understand the underlying dynamics, but the following is what worked for me.)
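A tiny, hedged way to check whether this is the failing piece on a given setup (it assumes s3fs is installed; behaviour depends on the s3fs/fsspec versions involved):

# Hedged check: try to pickle the glob callable an S3 filesystem exposes, the
# same attribute the dataset stores as its glob_function. On affected
# s3fs/fsspec versions this may raise the "it's not the same object as
# s3fs.core.S3FileSystem._glob" PicklingError; on others it can pickle fine.
import pickle

import fsspec

fs = fsspec.filesystem("s3", anon=True)  # an s3fs.S3FileSystem instance
try:
    pickle.dumps(fs.glob)
    print("glob callable pickled fine on this s3fs version")
except Exception as err:
    print(f"glob callable is not picklable here: {err}")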

Changes to individual datasets

First, I modified the individual datasets as follows. I did this for each dataset type that I used (e.g. CSVDataSet, ParquetDataSet, etc.).

In __init__(), I added:

...
 self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)

+ if self._protocol == "s3":
+     # this seems to prevent pickling errors
+     # it SHOULD be the same thing -- S3FileSystem._glob is what it should be
+     # pointing to anyway -- but the pickler thinks it isn't
+     glob_func = s3fs.core.S3FileSystem._glob
+ else:
+     glob_func = self._fs.glob

 super().__init__(
...

Theoretically, I could have just defined my own runners without submitting an issue if the above were sufficient. But I found I also needed to make a small modification to core to get things to run:

Changes to kedro/io/core.py

In _fetch_latest_load_version(self), I changed the following line:

version_paths = sorted(self._glob_function(pattern), reverse=True)

to:

try:
    version_paths = sorted(self._glob_function(pattern), reverse=True)
except TypeError:
    # for some reason _glob_function gets into trouble with the pickler again,
    # whereas self._fs.glob (which should be the same function) does not
    version_paths = sorted(self._fs.glob(pattern), reverse=True)

Again, I have no conviction that the above changes were the right way to do this, but it did get multiprocessing working with S3.
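A possible explanation for that TypeError, purely an assumption on my part: after the change above, for S3 the stored glob function is the unbound s3fs.core.S3FileSystem._glob, so calling self._glob_function(pattern) passes the pattern where the method expects self and leaves its real path argument missing. A tiny, self-contained illustration of that calling mistake (not Kedro or s3fs code):

# Hypothetical illustration: calling an unbound method with only the glob
# pattern binds the pattern to `self` and leaves the real argument missing.
class Fake:
    def _glob(self, path):
        return [path]


try:
    Fake._glob("data/*.csv")  # unbound call: the pattern lands in `self`
except TypeError as err:
    print(err)  # _glob() missing 1 required positional argument: 'path'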

jmholzer commented 1 year ago

Hey, thanks so much for this detailed report. This is an important problem to investigate; I'll push for it to be added to our next sprint.

astrojuanlu commented 1 year ago

We'll try to reproduce this error; it should appear with any dataset and an S3 bucket when using the parallel runner.

merelcht commented 6 months ago

Closing this issue as it hasn't had any recent activity.

BielStela commented 5 months ago

Hi! I had the same problem. I've been using ParallelRunner happily with two datasets. Then I added a dataset stored in S3, and exactly the same issue the OP described happened to me.

The Pickle error is:

PicklingError: Can't pickle <function S3FileSystem._glob at 0x7fb02c74f4c0>: it's not the same object as s3fs.core.S3FileSystem._glob

astrojuanlu commented 5 months ago

Thanks @BielStela, reopening. Can you confirm you were using the latest Kedro version? Also, let us know your Python version and operating system.

BielStela commented 5 months ago

Sure, I'm using

BielStela commented 5 months ago

For more context, the fix that worked is what the OP did to the dataset. I'm using a custom dataset, and adding just this to the __init__ of the class fixes the issue:

if self._protocol == "s3":
    glob_func = s3fs.core.S3FileSystem._glob
else:
    glob_func = self._fs.glob

super().__init__(
    glob_function=glob_func,
    ...
)

atetich3211 commented 1 month ago

Hey, this is not an S3-only problem: I see the same failure with ParallelRunner for any non-local filesystem, e.g. abfs. I took a generic approach, and the following solution worked for me with any fsspec filesystem.

# Generic workaround: never store the glob function on the instance; always
# resolve it from the fsspec filesystem when needed.
from kedro_datasets.pandas import CSVDataset


class NoFsspecProblemCSVDataset(CSVDataset):
    def __init__(self, *args, **kwargs) -> None:
        # pass-through constructor
        super().__init__(*args, **kwargs)

    @property
    def _glob_function(self):
        return self._fs.glob

    @_glob_function.setter
    def _glob_function(self, value):
        # ignore the glob function the base class tries to store
        pass

This solution prevents the serialisation problem: no glob function is stored on the instance, and the property resolves self._fs.glob fresh whenever it is needed.
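As a quick sanity check (the bucket path is hypothetical, and s3fs plus credentials are assumed to be configured), the workaround class should survive the same pickling that ParallelRunner relies on:

# Hypothetical check that the workaround dataset is now multiprocessing-safe.
from multiprocessing.reduction import ForkingPickler

dataset = NoFsspecProblemCSVDataset(filepath="s3://my-bucket/data/table.csv")
ForkingPickler.dumps(dataset)  # should no longer raise on affected setups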