PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Unable to use RemoteFileSystem to interact with WebHDFS #6957

Open hateyouinfinity opened 1 year ago

hateyouinfinity commented 1 year ago

Bug summary

Currently it does not seem possible to use RemoteFileSystem with WebHDFS as the underlying implementation. There are 2 problems afaict. Assume you define your filesystem block as follows: myfs = RemoteFileSystem(basepath="webhdfs://home/user/project", settings={"host": "example.com"}).


1. Calling write_path fails due to an improperly formatted url.

myfs.write_path("filename", b"content") calls myfs.filesystem.makedirs("webhdfs://home/user/project"), but the underlying implementation doesn't do any preprocessing and basically appends the path to the base url, producing something like https://example.com/webhdfs/v1webhdfs%3A//home/user/project?op=MKDIRS.
Calling that url fails. Had the url been generated properly, it would look like this: https://example.com/webhdfs/v1/home/user/project?op=MKDIRS. That is, the path param that gets passed to WebHDFS._call should begin with a slash and contain no scheme.
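
For illustration, here's roughly how the bad url falls out, assuming _call does little more than concatenate the base url and the percent-quoted path (which is what the behavior above suggests; the base url below is made up):

```python
from urllib.parse import quote

base_url = "https://example.com/webhdfs/v1"  # hypothetical base url for the block above
path = "webhdfs://home/user/project"         # what RemoteFileSystem hands to makedirs

# quote() escapes the colon, and nothing strips the scheme or prepends a slash
print(f"{base_url}{quote(path)}?op=MKDIRS")
# https://example.com/webhdfs/v1webhdfs%3A//home/user/project?op=MKDIRS
```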

This doesn't seem to be a problem for other RemoteFileSystem methods, since all of them call (directly or indirectly) fs.filesystem.open, which (in the case of WebHDFS) calls fsspec.utils.infer_storage_options, stripping the scheme. However, infer_storage_options causes another problem.


2. The first segment of fs.basepath gets stripped, leading to incorrect remote paths being accessed.

fs.filesystem.open calls fs.filesystem._strip_protocol (link). Filesystem implementations commonly override _strip_protocol, and the WebHDFS implementation's override calls fsspec.utils.infer_storage_options. As far as I can infer, infer_storage_options expects its input either to have no scheme (in which case the whole path is returned) or to have a netloc following the scheme (in which case the netloc is stripped away along with the scheme). As a result, /user/project gets accessed instead of /home/user/project.
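
This is easy to check in isolation; something like the following (exact dict contents may vary across fsspec versions):

```python
from fsspec.utils import infer_storage_options

# "home" is parsed as the netloc and dropped from the path
print(infer_storage_options("webhdfs://home/user/project"))
# {'protocol': 'webhdfs', 'path': '/user/project', 'host': 'home'}
```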

One can work around this by prepending an extra segment to basepath (e.g. basepath="webhdfs://fakehost/home/user/project"), but that requires knowing how a particular implementation behaves (and is ugly to boot). Of note here is that infer_storage_options treats s3/gcs schemes as special cases (it doesn't strip the first segment), so the above workaround can't be applied blindly. I'd also like to mention that the current docs have usage examples only for cloud storage providers, which are seemingly immune to this issue.
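
To make the special-casing concrete (again, modulo fsspec version):

```python
from fsspec.utils import infer_storage_options

# The workaround: sacrifice a fake first segment as the "netloc"...
print(infer_storage_options("webhdfs://fakehost/home/user/project")["path"])
# /home/user/project

# ...but for s3/gcs the first segment is kept (it's the bucket), so the
# same trick would corrupt the path there
print(infer_storage_options("s3://fakehost/home/user/project")["path"])
# fakehost/home/user/project
```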

As an aside, it's not clear why an implementation that takes hostname/port as parameters expects path to contain a netloc at all.


The previous section got me thinking: is WebHDFS unique, or are there other implementations with the same problem? So I picked some implementations and wrote a script to compare what _strip_protocol outputs for the same input path.

Script:

```python
#!/usr/bin/env python3
# pip install adlfs gcsfs ocifs paramiko pyarrow s3fs smbprotocol webdav4
from typing import Dict, List

from fsspec import get_filesystem_class

DEFAULT_SCHEMES = [
    "arrow_hdfs",
    "az",
    "dbfs",
    "file",
    "ftp",
    "github",
    "gs",
    "hdfs",
    "http",
    "https",
    "oci",
    "s3",
    "sftp",
    "smb",
    "webdav",
    "webhdfs",
]


def get_resolved_path(scheme: str, path: str) -> str:
    return get_filesystem_class(scheme)._strip_protocol(path)


def main(
    schemes_to_check: List[str] = DEFAULT_SCHEMES, no_scheme_path="/home/user/file"
) -> Dict[str, str]:
    res = {}
    for scheme in schemes_to_check:
        try:
            res[scheme] = get_resolved_path(
                scheme=scheme, path=f"{scheme}://{no_scheme_path.lstrip('/')}"
            )
        except Exception as e:
            print(e)
    return res


if __name__ == "__main__":
    for scheme, resolved_path in main().items():
        print(f"{scheme: <20}{resolved_path}")
```

Here's what I get if I run it:

```
arrow_hdfs          home/user/file
az                  home/user/file
dbfs                home/user/file
file                /workspaces/prefect/home/user/file
ftp                 /user/file
github              home/user/file
gs                  home/user/file
hdfs                /user/file
http                http://home/user/file
https               https://home/user/file
oci                 home/user/file
s3                  home/user/file
sftp                /user/file
smb                 /user/file
webdav              home/user/file
webhdfs             /user/file
```

Going by this, a few other filesystems might have something similar going on. SFTP seems reasonably easy to test.

```shell
docker run --name sftp -p 2222:22 -d atmoz/sftp foo:pass:::upload
```

```python
from prefect.filesystems import RemoteFileSystem

sftp_settings = {"host": "localhost", "port": 2222, "username": "foo", "password": "pass"}

unprefixed_sftp = RemoteFileSystem(basepath="sftp://upload/", settings=sftp_settings)
prefixed_sftp = RemoteFileSystem(basepath="sftp://fakehost/upload/", settings=sftp_settings)

# We can't use write_path since SFTPFileSystem.makedirs errors out for the
# same reason WebHDFS.makedirs does, so let's write a file manually and
# read it back.

# It doesn't matter whether we use prefixed_sftp or unprefixed_sftp for this
with prefixed_sftp.filesystem.open("upload/filename", "wb") as file:
    file.write(b'Hi!')

for fs in [unprefixed_sftp, prefixed_sftp]:
    try:
        print(fs.basepath, end="\n\t")
        print(fs.read_path("filename"))
    except Exception as e:
        print(e)
# sftp://upload/
#         [Errno 2] No such file
# sftp://fakehost/upload/
#         b'Hi!'

print(unprefixed_sftp.filesystem.ls('.'))
# ['./upload']
print(unprefixed_sftp.filesystem.ls('./upload'))
# ['./upload/filename']
```

```shell
docker exec -it sftp ls /home/foo/upload
# filename
```

WebHDFS doesn't seem to be the only problematic implementation.

Reproduction

See above

Error

No response

Versions

```
Version:             2.4.2
API version:         0.8.0
Python version:      3.10.4
Git commit:          65807e84
Built:               Fri, Sep 23, 2022 10:43 AM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.37.2
```

Additional context

No response

hateyouinfinity commented 1 year ago

Looking at the docstring of fsspec.implementations.smb.SMBFileSystem (link), I noticed it talks about using the class via fsspec.core.open(URI), in which case URI must contain a netloc. fsspec.core.open calls fsspec.core.open_files, which calls fsspec.core.get_fs_token_paths.

get_fs_token_paths does roughly the following (link):
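
```python
# Rough reconstruction of the linked code (single path, no globbing; not verbatim)
from fsspec.core import split_protocol
from fsspec.registry import get_filesystem_class
from fsspec.utils import update_storage_options


def get_fs_token_paths_sketch(urlpath, storage_options=None):
    protocol, _ = split_protocol(urlpath)         # "sftp"
    cls = get_filesystem_class(protocol)
    options = cls._get_kwargs_from_urls(urlpath)  # host/port/user/password pulled from the netloc
    update_storage_options(options, storage_options)
    fs = cls(**options)
    paths = [fs._strip_protocol(urlpath)]         # netloc stripped along with the scheme
    return fs, fs._fs_token, paths
```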


Contrast the example below with the OP.

```shell
docker run --name sftp -p 2222:22 -d atmoz/sftp foo:pass:::upload
```

```python
from fsspec.core import get_fs_token_paths
from fsspec.core import open as fsopen

fs, token, filepath = get_fs_token_paths("sftp://foo:pass@localhost:2222/upload/filename")
print(fs.host, fs.ssh_kwargs, filepath)
# localhost {'port': 2222, 'username': 'foo', 'password': 'pass'} ['/upload/filename']

# Open via the full URI so fsspec parses the netloc into connection kwargs
# and strips it from the path
with fsopen("sftp://foo:pass@localhost:2222/upload/filename", "wb") as f:
    f.write(b"Bye!")

print(fs.ls('.'))
# ['./upload']

print(fs.ls('./upload'))
# ['./upload/filename']

print(fs.cat("./upload/filename"))
# b'Bye!'
```

I reckon fsspec implementations fall into two groups:

  1. Implementations in the first group do their own (seemingly idempotent) preprocessing in makedirs and treat the first non-scheme segment of _strip_protocol's input as part of the filepath.
  2. Implementations in the second group expect external preprocessing to be done before makedirs is called and treat the first non-scheme segment of _strip_protocol's input as a netloc.

prefect.filesystems.RemoteFileSystem assumes that all implementations fall into the first group, which leads to the problems described above.
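
For reference, here's roughly what RemoteFileSystem.write_path does in 2.4.x (paraphrased from memory of prefect/filesystems.py, not verbatim), which is where the group-1 assumption shows up:

```python
# Sketch of RemoteFileSystem.write_path -- paraphrased, not the actual source.
# _resolve_path joins basepath and the relative path, keeping the scheme.
resolved = self._resolve_path(path)                # "webhdfs://home/user/project/filename"
dirpath = resolved[: resolved.rindex("/")]
self.filesystem.makedirs(dirpath, exist_ok=True)   # group-2 makedirs gets a scheme-ful path and chokes
with self.filesystem.open(resolved, "wb") as f:    # group-2 _strip_protocol eats "home" here
    f.write(content)
```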