rom1504 / img2dataset

Easily turn large sets of image urls to an image dataset. Can download, resize and package 100M urls in 20h on one machine.
MIT License
3.6k stars 335 forks

Downloader is not producing full set of expected outputs #159

Closed PranshuBansalDev closed 2 years ago

PranshuBansalDev commented 2 years ago

Heya, I was trying to download the LAION400M dataset and noticed that I am not getting the full set of data for some reason.

Any tips on debugging further?

TL;DR - I was expecting ~12M files to be downloaded, but the successes in the *_stats.json files indicate only ~2M files were actually downloaded

For example - I recently tried to download this dataset in a distributed manner on EMR:

https://deploy.laion.ai/8f83b608504d46bb81708ec86e912220/dataset/part-00000-5b54c5d5-bbcf-484d-a2ce-0d6f73df1a36-c000.snappy.parquet

I applied some light NSFW filtering on it to produce a new parquet

# rest of the script is redacted, but there is some code before this to normalize the NSFW row to make filtering more convenient
sampled_df = df[df["NSFW"] == "unlikely"]
sampled_df.reset_index(inplace=True)
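
Roughly, the full step looks like this (file names here are placeholders, and as noted the NSFW column normalization is redacted):

import pandas as pd

# placeholder file name; the real input is the LAION metadata parquet linked above
df = pd.read_parquet("part-00000-metadata.snappy.parquet")

# keep only rows the NSFW classifier marked as "unlikely"
sampled_df = df[df["NSFW"] == "unlikely"]
sampled_df.reset_index(drop=True, inplace=True)

# write the filtered metadata back out for img2dataset to consume
sampled_df.to_parquet("part00000.parquet", index=False)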

Verified its row count is ~12M samples:

import glob
import json
from pyarrow.parquet import ParquetDataset

files = glob.glob("*.parquet")

d = {}

for file in files:
    d[file] = 0
    dataset = ParquetDataset(file)
    for piece in dataset.pieces:
        d[file] += piece.get_metadata().num_rows

print(json.dumps(d, indent=2, sort_keys=True))
{
  "part00000.parquet": 12026281
}
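
(As an aside, ParquetDataset.pieces is deprecated in newer pyarrow; the same count can be read from each file's footer metadata, assuming a reasonably recent pyarrow version:)

import glob
import json
import pyarrow.parquet as pq

# read only the footer metadata of each parquet file to get its row count
counts = {file: pq.ParquetFile(file).metadata.num_rows for file in glob.glob("*.parquet")}
print(json.dumps(counts, indent=2, sort_keys=True))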

Ran the download, and scanned over the output s3 bucket:

aws s3 cp \
    s3://path/to/s3/download/ . \
    --exclude "*" \
    --include "*.json" \
    --recursive

Ran this script to get the total count of images downloaded:

import json
import glob

files = glob.glob("/path/to/json/files/*.json")

count = {}
successes = {}

for file in files:
    with open(file) as f:
        j = json.load(f)
        count[file] = j["count"]
        successes[file] = j["successes"]

rate = 100 * sum(successes.values()) / sum(count.values())
print(f"Success rate: {rate}. From {sum(successes.values())} / {sum(count.values())}")

which gave me the following output:

Success rate: 56.15816066896948. From 1508566 / 2686281

The high error rate here is not of major concern; I was running with a low worker node count for experimentation, so we hit a lot of DNS issues (I'll use a knot resolver later)

unknown url type: '21nicrmo2'                                                      1.0
<urlopen error [errno 22] invalid argument>                                        1.0
encoding with 'idna' codec failed (unicodeerror: label empty or too long)          1.0
http/1.1 401.2 unauthorized\r\n                                                    4.0
<urlopen error no host given>                                                      5.0
<urlopen error unknown url type: "https>                                          11.0
incomplete read                                                                   14.0
<urlopen error [errno 101] network is unreachable>                                38.0
<urlopen error [errno 104] connection reset by peer>                              75.0
[errno 104] connection reset by peer                                              92.0
opencv                                                                           354.0
<urlopen error [errno 113] no route to host>                                     448.0
remote end closed connection without response                                    472.0
<urlopen error [errno 111] connection refused>                                  1144.0
encoding issue                                                                  2341.0
timed out                                                                       2850.0
<urlopen error timed out>                                                       4394.0
the read operation timed out                                                    4617.0
image decoding error                                                            5563.0
ssl                                                                             6174.0
http error                                                                     62670.0
<urlopen error [errno -2] name or service not known>                         1086446.0
success                                                                      1508566.0

I also noticed there were only 270 json files produced, but given that each shard should contain 10,000 images, I expected ~1,200 json files to be produced. Not sure where this discrepancy is coming from

> ls
00000_stats.json  00051_stats.json  01017_stats.json  01066_stats.json  01112_stats.json  01157_stats.json
00001_stats.json  00052_stats.json  01018_stats.json  01067_stats.json  01113_stats.json  01159_stats.json
...
> ls -l | wc -l 
270
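
(The expectation is just the row count divided by img2dataset's default shard size of 10,000 samples - the number_sample_per_shard setting - as a quick sanity check:)

import math

total_rows = 12_026_281      # row count of the filtered parquet, from above
samples_per_shard = 10_000   # img2dataset default number_sample_per_shard
print(math.ceil(total_rows / samples_per_shard))  # ~1203 expected shards / *_stats.json files
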
PranshuBansalDev commented 2 years ago

[screenshot]

I'm even seeing ~1200 jobs being created, but somehow only 270 outputs

rom1504 commented 2 years ago

Do you have enough RAM in the workers?

PranshuBansalDev commented 2 years ago

I believe they are all succeeding

[screenshot]

This is the config I'm using


    --master yarn \
    --deploy-mode client \
    --conf "spark.executor.cores=5" \
    --conf "spark.executor.memory=9g" \
    --conf "spark.executor.memoryOverhead=1g" \
    --conf "spark.driver.memory=9g" \
    --conf "spark.driver.cores=5" \
    --conf "spark.default.parallelism=50" \
    --conf "spark.network.timeout=1000000s" \

This is my worker node

[screenshot]

rom1504 commented 2 years ago

Ok, do you have any way to check the memory usage during the job? It's likely that this is the problem.

Can you also check executor logs to see if you get this log https://github.com/rom1504/img2dataset/blob/main/img2dataset/downloader.py#L93 ?

PranshuBansalDev commented 2 years ago

I can see from CloudWatch that it looks fine:

[screenshot]

Oh interesting, I looked through some of the executors that were running shockingly fast (<1s) and found a bunch of these errors

botocore.exceptions.NoCredentialsError: Unable to locate credentials
shard 381 failed with error Unable to locate credentials

Wonder if that's related, I'll dig into it.

Any reason this is just a print statement and not a thrown exception that the tool could retry on?

rom1504 commented 2 years ago

Yeah, the problem is that Spark doesn't have a "try, then give up at some point" feature, so if there were an exception here instead of a print, your whole job would likely have failed after the tasks had retried a few times

I think something that could be improved here is doing a loop in that piece of code to retry a few times instead of just failing

rom1504 commented 2 years ago

Your credentials error is likely the problem

There are two ways to solve it. One is to find the root cause and fix that credentials problem. The other is to implement the retry I'm mentioning above; assuming this is a temporary problem, the second try should work.

PranshuBansalDev commented 2 years ago

Is there any concept of surfacing these logs without having to go into the individual executors? Or somehow tracking these failures in stats.json?

PranshuBansalDev commented 2 years ago

> Your credentials error is likely the problem
>
> There are two ways to solve it. One is to find the root cause and fix that credentials problem. The other is to implement the retry I'm mentioning above; assuming this is a temporary problem, the second try should work.

Yeah, I noticed the error eventually goes away. I'm wondering if it's some kind of Spark job spin-up delay and whether I should just add a 5-minute wait to the calling script or something

rom1504 commented 2 years ago

There are several options to surface them, but I'm not sure I can think of something clean; feel free to try things. With Spark it's fairly usual to look at the executor logs

PranshuBansalDev commented 2 years ago

Edit - I spoke too soon. I reduced subjob_size way down to 5; let's see if that works

PranshuBansalDev commented 2 years ago

Could this https://github.com/rom1504/img2dataset/issues/137 be related?

rom1504 commented 2 years ago

The retry option is at the sample level; that will not help in your case. What you need is a retry at the shard level, which needs new code at the line I was suggesting above.

rom1504 commented 2 years ago

Reducing subjob_size to 5 will drastically decrease the performance of the tool; at that point it will be slower than running on one node.

rom1504 commented 2 years ago

By retry at the shard level I mean doing a for loop there: https://github.com/rom1504/img2dataset/blob/main/img2dataset/downloader.py#L88

PranshuBansalDev commented 2 years ago

Yeah, that's what I did: I ran retries at the shard level (i.e. I went and edited the code so that it 1. retries on shard failure, and 2. eventually throws after 3 retries; this did not resolve the error). The error I'm getting is not retry-able. Do you know the minimum parallelization you'd recommend before it starts to be more efficient to run on a single node? Reducing parallelism seems to be the only fix

PranshuBansalDev commented 2 years ago

Yeah, I just ran a test over a single parquet; running at low parallelism fixed it (but yes, now the tool isn't performant).

I'll spend today working out whether there are better params I can use.

SSL errors also (expectedly) went way down:

>>> df.sum()
success                                                                      11279122.0
http error                                                                     451597.0
ssl                                                                             48412.0
image decoding error                                                            57479.0
<urlopen error [errno -2] name or service not known>                            82297.0
remote end closed connection without response                                    3438.0
the read operation timed out                                                    27808.0
timed out                                                                       16421.0
<urlopen error [errno 111] connection refused>                                   8824.0
<urlopen error timed out>                                                       32609.0
opencv                                                                           2876.0
<urlopen error [errno 113] no route to host>                                     3014.0
<urlopen error [errno 104] connection reset by peer>                              352.0
encoding issue                                                                  10642.0
[errno 104] connection reset by peer                                              614.0
<urlopen error [errno 101] network is unreachable>                                542.0
incomplete read                                                                    53.0
<urlopen error [errno 22] invalid argument>                                         5.0
http/1.1 401.2 unauthorized\r\n                                                    98.0
<urlopen error unknown url type: "https>                                           35.0
<urlopen error no host given>                                                      33.0
encoding with 'idna' codec failed (unicodeerror: label empty or too long)           3.0
got more than 100 headers                                                           3.0
unknown url type: '21nicrmo2'                                                       4.0
rom1504 commented 2 years ago

> The error I'm getting is not retry-able

What do you mean exactly? Does it fail again if you retry (after some wait time)?

> Do you know the minimum parallelization you'd recommend before it starts to be more efficient to run on a single node? Reducing parallelism seems to be the only fix

I do not advise reducing the parallelism for this permission problem. The whole point of this tool is to be as parallel as possible.

My advice at this point is either to try to implement retrying properly (can you share your implementation?) or to fix the root permission issue.

In my case I chose not to use AWS EMR and instead to use EC2 instances directly with a Spark standalone cluster on them, because AWS EMR was not working well. You can see how in the distributed guide in the doc.

Btw, running this in distributed mode only makes sense if your dataset is larger than a few billion samples

For laion400m, a single node is enough to run in 3 days

PranshuBansalDev commented 2 years ago

I tried something super basic like this:

    def __call__(
        self, row,
    ):
        # retry the whole shard a few times before giving up on it
        for retry in range(self.retries + 1):
            try:
                self.download_shard(row)
                print(f"shard {row[0]} completed successfully!")
                return
            except Exception as err:  # pylint: disable=broad-except
                traceback.print_exc()
                print(f"shard {row[0]} failed with error {err}. Retry {retry}/{self.retries}")
                time.sleep(10)
        # all retries exhausted, report the shard as failed
        return (False, 0, 0, 0, 0, 0, None)

Is there a better way to attempt retries?

For a single node - do you have recommended configs for the LAION400M download? (EC2 instance type, multiprocessing params, etc.)

rom1504 commented 2 years ago

Your loop seems ok. So it prints failure 10 times in a row and never succeeds?

If that's the case I'm afraid the only thing to do is really to fix the S3 auth. Maybe your AWS configs are not quite right?

For a single node you can use these commands: https://github.com/rom1504/img2dataset/blob/main/dataset_examples/laion400m.md. In terms of hardware, 16 cores is enough; if you have more it will be faster (for example a c6i instance in AWS EC2). It's really important to set up knot resolver though.
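
Roughly, via the Python API a single-node run looks like this (the values here are illustrative, not the exact ones from the linked doc):

from img2dataset import download

# illustrative single-node LAION400M-style run; see dataset_examples/laion400m.md for the exact settings
download(
    url_list="laion400m-meta/",       # folder of metadata parquet files
    input_format="parquet",
    url_col="URL",
    caption_col="TEXT",
    output_format="webdataset",
    output_folder="laion400m-data",
    processes_count=16,               # roughly one process per core
    thread_count=128,                 # many threads per process to hide network latency
    image_size=256,
    number_sample_per_shard=10000,
)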

PranshuBansalDev commented 2 years ago

I don't think it's an AWS config thing, because eventually the download does start working, and it also works at very low parallelism.

I'll also give the knot resolver a go. Thank you so so so so so much for all the support you've been providing to me!

PranshuBansalDev commented 2 years ago

Ok, I just decided to opt for a slower download, as it's currently blocking some other work; not sure what the issue is. If you want to keep this open for resolution, I'll update the thread when I do get around to digging into the root cause

PranshuBansalDev commented 2 years ago

Huge thanks for all the ideas and support though!

rom1504 commented 2 years ago

Let's keep it open for now. I'll think what can be done to make this better and clearer when I have time.

PranshuBansalDev commented 2 years ago

So the set of things I'm going to investigate in parallel while the real download goes on in the background is:

  1. set up a knot resolver (this isn't super trivial, as my corporate security may ban me from using the ones you have access to)
  2. try to see if pre-mounting s3 onto my file system (using s3fs or similar) will help resolve the startup thrashing (my hunch is that's what the issue is)

While I definitely believe using EC2 directly may help resolve the issues, I want to try my best to see if we can get EMR working, so that we can leverage auto-scaling and remove the parallel-ssh steps for future use (doing that for 100 machines if there's a 100B dataset doesn't sound fun)

PranshuBansalDev commented 2 years ago

Ok, I figured it out eventually - I don't have a smoking gun with data, but applying the fix described below resolved the issue:

Basically my Spark defaults had spark.dynamicAllocation.enabled set to true; when I disabled dynamicAllocation the issue went away.

Observations: I noticed that the executors would keep dying and coming back up. After a bit of digging, I saw that the executors that were getting shut down were handling ~40-50% of the tasks in less than 50ms.

Digging: I realized that it takes a bit of time between executors spinning up and tasks actually getting allocated to them, and I noticed a bunch of "killing executor due to timeout" messages in the Spark logs.

Solution: when I looked up this message I did some reading about dynamicAllocation, and I had 2 thoughts -

  1. Disable it, see if it works
  2. Go back and increase idleTimeout to something like 10-15minutes

I tried option 1 and it worked; then I ran out of time to go back and try option 2 (sketched below)
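
For reference, option 2 would look roughly like this if you configure the session in PySpark (the property names are standard Spark settings; the 900s value is a guess, not something I tested):

from pyspark.sql import SparkSession

# option 2 sketch (untested): keep dynamic allocation but give idle executors
# much longer before Spark reclaims them
spark = (
    SparkSession.builder
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.executorIdleTimeout", "900s")
    .getOrCreate()
)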

Hopefully this helps, but your tool isn't the issue here

rom1504 commented 2 years ago

Interesting, thanks for the info! I'm still interested in increasing the reliability here; I will be handling it in https://github.com/rom1504/img2dataset/issues/140, trying various ways to implement shard retrying, reducing the likelihood of ending up with a low success rate, and making things more likely to work by default.

Btw, just to give some idea of speeds: I talked with someone who used a bunch of freely provided TPU VMs and was able to download laion2B-multi in 7h at around 100k samples/s (using img2dataset)

rom1504 commented 2 years ago

Did you manage to download what you wanted in a reasonable time?

PranshuBansalDev commented 2 years ago

Yup, we used 10 c4.8xlarge instances and got it done in 30 hours for LAION400M, with a success rate of ~93% and no DNS/knot resolvers.

I used these settings (probably overly conservative, but I followed the guidance available here):

--master yarn \
    --deploy-mode client \
    --conf "spark.dynamicAllocation.enabled=false" \
    --conf "spark.executor.cores=5" \
    --conf "spark.executor.memory=9g" \
    --conf "spark.executor.memoryOverhead=1g" \
    --conf "spark.driver.memory=9g" \
    --conf "spark.driver.cores=5" \
    --conf "spark.executor.instances=59" \
    --conf "spark.default.parallelism=1300"

PranshuBansalDev commented 2 years ago

I also had this weird issue where the Spark job would just show "successfully completed" after the first 5-6 parquets, so I had to wrap it in a loop, and that helped a bunch too:

aws s3 ls s3://path/to/parquets/ > files.txt
input="files.txt"
while IFS= read -r line; do
  spark-submit ... img2dataset ... -i "$line"
done < "$input"

PranshuBansalDev commented 2 years ago

Also - since I resolved this issue on my side and posted the steps to fix it for others going forward, I won't monitor this anymore; please do ping me at my email address if you have any further questions!

I'll let you close this out once you're ready to

rom1504 commented 2 years ago

You may now rerun the job to get the missing shards, see https://github.com/rom1504/img2dataset#incremental-mode

however I will also implement a shard retrying feature in a future PR
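
For example, assuming the Python API, rerunning with the same output folder is enough; incremental_mode is shown explicitly here but should already be the default:

from img2dataset import download

# rerun pointing at the same output folder; already-completed shards are skipped
download(
    url_list="laion400m-meta/",
    input_format="parquet",
    output_folder="laion400m-data",
    incremental_mode="incremental",  # keep existing shards, only download missing ones
)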

rom1504 commented 2 years ago

shard retrying implemented as well