rom1504 / img2dataset

Easily turn large sets of image urls to an image dataset. Can download, resize and package 100M urls in 20h on one machine.
MIT License

Any examples on how to pass in a url_list stored on OSS (s3 like) #329

Open zwsjink opened 1 year ago

zwsjink commented 1 year ago

Currently, I'm trying to use this piece of code to download images:

    from img2dataset import download

    # `spark` is an existing SparkSession configured with the Aliyun OSS Hadoop connector
    url_list = "oss://mybucket/part-xxxxxxx.parquet"
    df = spark.read.parquet(url_list)
    print("count: " + str(df.count()))

    output_dir = "oss://mybucket/output"  # placeholder destination for the webdataset shards

    download(
        processes_count=2,
        # processes_count is not used with spark; instead one task per core is started
        # (nb executors * nb cores per executor)
        thread_count=256,
        retries=2,
        url_list=url_list,
        image_size=384,
        resize_only_if_bigger=False,
        resize_mode="no",
        skip_reencode=True,
        output_folder=output_dir,
        output_format="webdataset",
        input_format="parquet",
        url_col="URL",
        caption_col="TEXT",
        enable_wandb=False,
        number_sample_per_shard=10000,
        distributor="pyspark",
        save_additional_columns=["top_caption", "all_captions", "all_similarities", "WIDTH", "HEIGHT"],
    )

The print of the count works fine (meaning reading a file from OSS is OK and my Spark conf is valid), but when it comes to the download part, it throws an error like this:

    OSS endpoint is not set, OSSFS could not work properly without a endpoint, please set it manually with ossfs.set_endpoint later
    Traceback (most recent call last):
      File "/opt/spark/work-dir/download.py", line 48, in <module>
        download(
      File "/usr/local/lib/python3.10/dist-packages/img2dataset/main.py", line 175, in download
        reader = Reader(
      File "/usr/local/lib/python3.10/dist-packages/img2dataset/reader.py", line 55, in __init__
        if fs.isdir(url_path):
      File "/usr/local/lib/python3.10/dist-packages/fsspec/spec.py", line 674, in isdir
        return self.info(path)["type"] == "directory"
      File "/usr/local/lib/python3.10/dist-packages/ossfs/utils.py", line 68, in wrapper
        result = func(ossfs, path, *args, **kwargs)
      File "/usr/local/lib/python3.10/dist-packages/ossfs/core.py", line 559, in info
        result = super().info(path, **kwargs)
      File "/usr/local/lib/python3.10/dist-packages/fsspec/spec.py", line 634, in info
        out = self.ls(self._parent(path), detail=True, **kwargs)
      File "/usr/local/lib/python3.10/dist-packages/ossfs/utils.py", line 68, in wrapper
        result = func(ossfs, path, *args, **kwargs)
      File "/usr/local/lib/python3.10/dist-packages/ossfs/core.py", line 193, in ls
        files = self._ls_dir(path, connect_timeout=connect_timeout)
      File "/usr/local/lib/python3.10/dist-packages/ossfs/core.py", line 180, in _ls_dir
        self.dircache[norm_path] = self._get_object_info_list(
      File "/usr/local/lib/python3.10/dist-packages/ossfs/core.py", line 144, in _get_object_info_list
        for obj in self._call_oss(
      File "/usr/local/lib/python3.10/dist-packages/ossfs/core.py", line 82, in _call_oss
        service = self._get_bucket(bucket, timeout)
      File "/usr/local/lib/python3.10/dist-packages/ossfs/core.py", line 59, in _get_bucket
        raise ValueError("endpoint is required")
    ValueError: endpoint is required

    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/usr/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
        exitcode = _main(fd, parent_sentinel)
      File "/usr/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
        self = reduction.pickle.load(from_parent)
      File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 110, in __setstate__
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
    FileNotFoundError: [Errno 2] No such file or directory

I've already set the following Spark conf:

    "spark.hadoop.fs.oss.impl": "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"
    "spark.hadoop.fs.oss.endpoint": "xxxxxxx.aliyuncs.com"
    "spark.hadoop.fs.oss.accessKeyId": "xxxxxx"
    "spark.hadoop.fs.oss.accessKeySecret": "xxxxx"

It looks like the img2dataset reader cannot pick up that configuration: the spark.hadoop.* settings only configure the Hadoop OSS connector used by Spark itself, while the tracebacks show the reader going through fsspec/ossfs, which never sees them.
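
One possible way to hand the endpoint and credentials to ossfs is fsspec's environment-based configuration: fsspec turns `FSSPEC_<PROTOCOL>_<KWARG>` variables into constructor kwargs for the matching filesystem. A minimal sketch, assuming your ossfs version accepts `endpoint`, `key` and `secret` constructor arguments (the values below are placeholders):

    import os

    # Must be set before fsspec is imported (and therefore before img2dataset),
    # because fsspec reads the environment into its config at import time.
    os.environ["FSSPEC_OSS_ENDPOINT"] = "https://oss-cn-hangzhou.aliyuncs.com"  # hypothetical region endpoint
    os.environ["FSSPEC_OSS_KEY"] = "<access key id>"
    os.environ["FSSPEC_OSS_SECRET"] = "<access key secret>"

    import fsspec  # imported only after the env vars are in place

    # fsspec should now pass endpoint/key/secret to ossfs whenever an
    # oss:// URL is resolved, including inside img2dataset's reader.
    fs = fsspec.filesystem("oss")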

zwsjink commented 1 year ago

I tried setting the endpoint via os.environ, and the error becomes different:

    23/07/15 05:03:41 INFO DAGScheduler: Job 2 finished: count at <unknown>:0, took 0.282508 s
    count: 5177571
    Starting the downloading of this file
    Sharding file number 1 of 1 called /mybucket/part-00000-2256f782-126f-4dc6-b9c6-e6757637749d-c000.snappy.parquet
    Traceback (most recent call last):
      File "/opt/spark/work-dir/download.py", line 49, in <module>
        download(
      File "/usr/local/lib/python3.10/dist-packages/img2dataset/main.py", line 250, in download
        distributor_fn(
      File "/usr/local/lib/python3.10/dist-packages/img2dataset/distributor.py", line 64, in pyspark_distributor
        failed_shards = run(reader)
      File "/usr/local/lib/python3.10/dist-packages/img2dataset/distributor.py", line 57, in run
        for batch in batcher(gen, subjob_size):
      File "/usr/local/lib/python3.10/dist-packages/img2dataset/distributor.py", line 52, in batcher
        for first in iterator:
      File "/usr/local/lib/python3.10/dist-packages/img2dataset/reader.py", line 183, in __iter__
        shards, number_shards = self._save_to_arrow(input_file, start_shard_id)
      File "/usr/local/lib/python3.10/dist-packages/img2dataset/reader.py", line 95, in _save_to_arrow
        with self.fs.open(input_file, mode="rb") as file:
      File "/usr/local/lib/python3.10/dist-packages/fsspec/spec.py", line 1199, in open
        f = self._open(
      File "/usr/local/lib/python3.10/dist-packages/ossfs/base.py", line 257, in _open
        return OSSFile(
      File "/usr/local/lib/python3.10/dist-packages/fsspec/spec.py", line 1555, in __init__
        self.size = self.details["size"]
      File "/usr/local/lib/python3.10/dist-packages/fsspec/spec.py", line 1568, in details
        self._details = self.fs.info(self.path)
      File "/usr/local/lib/python3.10/dist-packages/ossfs/utils.py", line 68, in wrapper
        result = func(ossfs, path, *args, **kwargs)
      File "/usr/local/lib/python3.10/dist-packages/ossfs/core.py", line 559, in info
        result = super().info(path, **kwargs)
      File "/usr/local/lib/python3.10/dist-packages/fsspec/spec.py", line 648, in info
        raise FileNotFoundError(path)
    FileNotFoundError: /mybucket/part-00000-2256f782-126f-4dc6-b9c6-e6757637749d-c000.snappy.parquet

    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/usr/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
        exitcode = _main(fd, parent_sentinel)
      File "/usr/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
        self = reduction.pickle.load(from_parent)
      File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 110, in __setstate__
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
    FileNotFoundError: [Errno 2] No such file or directory

It automatically removed 'oss:/' from the path; maybe something is wrong with the ossfs logic.
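
This can be probed directly: img2dataset resolves the input through fsspec, and `url_to_fs` strips the protocol before handing the path to the filesystem, so the reader ends up opening `/mybucket/...` rather than the full URL. A small check, assuming ossfs is installed and the endpoint is configured (bucket and file name are placeholders):

    import fsspec

    # url_to_fs returns the filesystem plus the path with the protocol stripped;
    # whatever prints here is the exact path img2dataset's reader will open.
    fs, path = fsspec.core.url_to_fs("oss://mybucket/part-xxxxxxx.parquet")
    print(type(fs).__name__, path)   # e.g. "OSSFileSystem /mybucket/part-xxxxxxx.parquet"
    print(fs.exists(path))           # False would reproduce the FileNotFoundError above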

rom1504 commented 1 year ago

Did you fix this? Seems like an issue with fsspec.

zwsjink commented 1 year ago

> Did you fix this? Seems like an issue with fsspec.

Nope. Later on I worked around it by downloading the data to NAS first and starting from there.
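
For anyone landing here, a sketch of that workaround: copy the parquet url list out of OSS to a local/NAS path first, so no oss:// URL ever reaches the img2dataset reader. The endpoint, credentials and paths below are placeholders to adapt:

    import fsspec

    # Build the ossfs filesystem explicitly, with the endpoint set up front.
    fs = fsspec.filesystem(
        "oss",
        endpoint="https://oss-cn-hangzhou.aliyuncs.com",  # hypothetical region endpoint
        key="<access key id>",
        secret="<access key secret>",
    )

    # Copy the url list from OSS to NAS, then point download() at the copy.
    local_list = "/mnt/nas/url_list/part-xxxxxxx.parquet"
    fs.get("oss://mybucket/part-xxxxxxx.parquet", local_list)

    # download(..., url_list=local_list, output_folder="/mnt/nas/output", ...)
    # then reads and writes purely local paths.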