fsspec / s3fs

S3 Filesystem
http://s3fs.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

ProcessPoolExecutor hangs #889

Closed: riklopfer closed this issue 3 months ago

riklopfer commented 3 months ago

The example below hangs when using s3fs with ProcessPoolExecutor, but not when using ThreadPoolExecutor. Both work with the local file system.

"""
example.py
""""

import argparse
import concurrent.futures
from concurrent.futures import Executor
from os import PathLike
from typing import List, Type, Iterator, Union, Iterable

import fsspec

EXEC_MAP = {
    "thread": concurrent.futures.ThreadPoolExecutor,
    "process": concurrent.futures.ProcessPoolExecutor,
}

def find_files(path: Union[str, PathLike]) -> Iterator[str]:
    """
    Find files contained in path.
    """
    path_str = str(path)
    # Create a filesystem object using fsspec
    fs = fsspec.open(path_str).fs

    # Get the protocol (e.g., "s3", "file", etc.)
    protocol = fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0]

    # Check if the path is a directory
    if fs.isdir(path_str):
        # If it's a directory, iterate over the files in the directory
        for file in fs.find(path_str):
            if not fs.isdir(file):  # Ensure it's a file, not a subdirectory
                yield f"{protocol}://{file}"
    else:
        # If it's a file, yield it with the protocol. Strip any existing
        # protocol prefix first so it is not doubled (e.g. "s3://s3://...")
        yield f"{protocol}://{fs._strip_protocol(path_str)}"

def find_all_files(paths: Iterable[Union[str, PathLike]]) -> Iterator[str]:
    """Find all files contained in paths."""
    for path in paths:
        yield from find_files(path)

def get_content_length(uri):
    with fsspec.open(uri) as fp:
        return len(fp.read())

def process_many(exec_type: Type[Executor], filez: List[str]):
    with exec_type() as executor:
        futures = list()
        for file in list(find_all_files(filez)):
            print(file)
            futures.append(executor.submit(get_content_length, file))

        print("futures submitted. waiting for results")
        for ftr in concurrent.futures.as_completed(futures):
            print(ftr.result())

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("filez", help="file paths", type=str, nargs="+")
    parser.add_argument(
        "--exec-type",
        type=str,
        choices=EXEC_MAP.keys(),
        required=True,
    )
    args = parser.parse_args()

    process_many(EXEC_MAP[args.exec_type], args.filez)

if __name__ == "__main__":
    main()

Works

python example.py path/to/dir --exec-type process
python example.py path/to/dir --exec-type thread
python example.py s3://path/to/dir --exec-type thread

Hangs

python example.py s3://path/to/dir --exec-type process

Version info

fsspec==2024.6.1
s3fs==2024.6.1
Python 3.10.14

martindurant commented 3 months ago

async ioloops and threads are not fork-safe. You may be able to get things running by using the spawn or forkserver start methods for your processes. fsspec tries to detect when it finds itself in a forked process, but this cannot be guaranteed.
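A minimal sketch of the spawn suggestion, assuming the reproduction above is saved as example.py; mp_context is the standard ProcessPoolExecutor parameter (Python 3.7+) for choosing a start method, and process_many_spawn is a hypothetical variant of the repro's process_many:

import concurrent.futures
import multiprocessing

from example import get_content_length

def process_many_spawn(filez):
    # "spawn" starts each worker from a fresh interpreter, so no forked
    # event loop or locks are inherited from the parent process.
    ctx = multiprocessing.get_context("spawn")
    with concurrent.futures.ProcessPoolExecutor(mp_context=ctx) as executor:
        futures = [executor.submit(get_content_length, f) for f in filez]
        for ftr in concurrent.futures.as_completed(futures):
            print(ftr.result())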

Other possible workarounds:

riklopfer commented 3 months ago

Thanks @martindurant. So as long as we use ThreadPoolExecutor, we can query the filesystem object in the main thread and don't need to worry about deadlocks?

martindurant commented 3 months ago

I don't expect any problems with threads, but note that s3fs is async and runs its ioloop in a dedicated thread, so you probably don't get any parallelism on the IO by doing this. You might still get parallelism on whatever you then do with what you fetched.
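One hedged sketch of getting IO concurrency without an executor at all: fsspec's batch operations, such as cat on a list of paths, let an async filesystem like s3fs gather the fetches on its own event loop. Assuming fsspec.url_to_fs and the dict that cat returns for list input:

import fsspec

def get_content_lengths(uris):
    # s3fs is async under the hood: handing cat() a list of paths lets it
    # run all the fetches concurrently on its dedicated event loop.
    fs, _ = fsspec.url_to_fs(uris[0])
    contents = fs.cat(uris)  # dict mapping each path to its bytes
    return {path: len(data) for path, data in contents.items()}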

riklopfer commented 3 months ago

Multiprocessing documentation that I evidently missed.
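For reference, the pattern that documentation describes, as a sketch: set the start method once at program startup, e.g. by replacing the __main__ guard in example.py, so no worker is ever forked.

import multiprocessing

if __name__ == "__main__":
    # Must run once, before any executors or processes are created.
    multiprocessing.set_start_method("spawn")
    main()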