rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[BUG] dask_cudf.read_json seems to be freezing when given a path with large number of files #12951

Open VibhuJawa opened 1 year ago

VibhuJawa commented 1 year ago

Describe the bug

dask_cudf.read_json seems to be freezing when given a path with a large number of files. Providing the list of files directly works.

Below seems to be freezing

text_ddf = dask_cudf.read_json(f'{INPUT_PATH}/data/*',engine='cudf',lines=True)

Below works

data_path = f'{INPUT_PATH}/data'  # same directory as the glob above
files = list(map(lambda x: os.path.join(data_path, x), os.listdir(data_path)))

text_ddf = dask_cudf.read_json(files, engine='cudf', lines=True)

Additional context

CC: @ayushdg

wence- commented 1 year ago

How many files? What filesystem? It seems like the globbing that fsspec does is a bit slower than glob.glob or os.listdir, but that doesn't make that much difference for me on a local filesystem with 50000 empty files.

I think this is all happening in the graph construction (since no work is being done yet) so can you run the client-side code under a profiler and post the output (e.g. pyinstrument for a sampling-based callgraph). Or even just interrupt the "hung" code and see where it is.
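
A minimal sketch of that profiling step, assuming pyinstrument is installed and INPUT_PATH is a placeholder for the directory in question:

import dask_cudf
from pyinstrument import Profiler

INPUT_PATH = "/path/to/input"  # placeholder; use the real dataset location

profiler = Profiler()
profiler.start()

# No compute happens here, so any time spent is client-side graph construction
# (glob expansion, file discovery, building the task graph).
text_ddf = dask_cudf.read_json(f"{INPUT_PATH}/data/*", engine="cudf", lines=True)

profiler.stop()
print(profiler.output_text(unicode=True))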

VibhuJawa commented 1 year ago

So the graph creation is fine; the reading is now working, though not as well as directly using cuDF.

Using dask_cudf.read_json

with performance_report(filename="dask-report-read_json.html"):
    text_ddf = dask_cudf.read_json(f'{INPUT_PATH}/data/*',engine='cudf',lines=True)
    subset_ddf = text_ddf.partitions[:5_000]
    st = time.time()
    len_ddf = len(subset_ddf)
    et = time.time()
    print(f"Time taken in reading = {et-st} s")
    print(len_ddf)
Time taken in reading = 175.02658891677856 s
27206847

Using cudf.read_json

def read_json(paths):
    return cudf.read_json(paths,lines=True)

with performance_report(filename="dask-report-read_with_glob.html"):
    paths = [entry.path for entry in os.scandir(f'{INPUT_PATH}/data')]
    text_ddf = dd.from_map(read_json,paths)
    subset_ddf = text_ddf.partitions[:5_000]
    st = time.time()
    len_ddf = len(subset_ddf)
    et = time.time()
    print(f"Time taken in reading = {et-st} s")
    print(len_ddf)
Time taken in reading = 58.15872025489807 s
27458208

See Profiles here: https://gist.github.com/VibhuJawa/93e88e8f326546069f8abcb5f50deda6

wence- commented 1 year ago

So that really does look like fsspec is taking 60 or so seconds to open a bunch of filehandles.
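
As a rough sketch (INPUT_PATH is a placeholder), one way to isolate that overhead is to time fsspec's glob expansion against glob.glob directly:

import glob
import time

import fsspec

INPUT_PATH = "/path/to/input"  # placeholder; use the real dataset location

# fsspec.open_files is roughly what dask's read_json uses to expand the glob
# before any bytes are read.
start = time.time()
files = fsspec.open_files(f"{INPUT_PATH}/data/*", mode="rb")
print(f"fsspec.open_files: {time.time() - start:.2f} s for {len(files)} files")

start = time.time()
paths = glob.glob(f"{INPUT_PATH}/data/*")
print(f"glob.glob: {time.time() - start:.2f} s for {len(paths)} files")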

Can you compare the time for:

text_ddf = dask_cudf.read_json(f'{INPUT_PATH}/data/*',engine='cudf',lines=True)

and

paths = [entry.path for entry in os.scandir(f'{INPUT_PATH}/data')]
text_ddf = dd.from_map(read_json,paths)

?

Would again be useful to know how many files you have.
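
Concretely, a sketch of that comparison (graph construction only, no compute; INPUT_PATH is a placeholder):

import os
import time

import cudf
import dask.dataframe as dd
import dask_cudf

INPUT_PATH = "/path/to/input"  # placeholder; use the real dataset location

def read_json(paths):
    return cudf.read_json(paths, lines=True)

start = time.time()
text_ddf = dask_cudf.read_json(f"{INPUT_PATH}/data/*", engine="cudf", lines=True)
print(f"dask_cudf.read_json graph construction: {time.time() - start:.2f} s")

start = time.time()
paths = [entry.path for entry in os.scandir(f"{INPUT_PATH}/data")]
text_ddf = dd.from_map(read_json, paths)
print(f"dd.from_map graph construction: {time.time() - start:.2f} s")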

VibhuJawa commented 1 year ago

Comparison:

text_ddf = dask_cudf.read_json(f'{INPUT_PATH}/data/*',engine='cudf',lines=True)
CPU times: user 7.23 s, sys: 2.67 s, total: 9.9 s
Wall time: 12.1 s
def read_json(paths):
    return cudf.read_json(paths,lines=True)

paths = [entry.path for entry in os.scandir(f'{INPUT_PATH}/data')]
text_ddf = dd.from_map(read_json,paths)
CPU times: user 393 ms, sys: 213 ms, total: 606 ms
Wall time: 593 ms

Would again be useful to know how many files you have.

print(len(paths))
60000

GregoryKimball commented 1 year ago

Seems like this could be related to https://github.com/rapidsai/cudf/issues/13246. Some suggestions from there were:

vyasr commented 4 months ago

I still see a meaningful performance difference here, but not as drastic a one. Some of this behavior could be largely filesystem-dependent, so my numbers may not be directly comparable to @VibhuJawa's above, especially if the overhead is mostly coming from a filesystem library like fsspec. Running this script

import cudf
import dask_cudf
import json
import os
from tqdm import tqdm
import time

# Write 60000 copies of the same JSON file to a subdirectory
subdir = "subdir"
os.makedirs(subdir, exist_ok=True)
for i in tqdm(range(60000)):
    with open(os.path.join(subdir, f"file_{i}.json"), "w") as f:
        json.dump({"key": "value"}, f)

start = time.time()
text_ddf = dask_cudf.read_json(os.path.join(subdir, "*"), engine='cudf', lines=True)
print("Time to read 60000 files with dask_cudf:", time.time() - start)

start = time.time()
text_df = cudf.read_json(os.path.join(subdir, "*"), lines=True)
print("Time to read 60000 files with cudf:", time.time() - start)

produces

(rapids) coder _ ~/cudf $ python test.py
100%|██████████| 60000/60000 [00:05<00:00, 11487.66it/s]
Time to read 60000 files with dask_cudf: 20.821030139923096
Time to read 60000 files with cudf: 7.378933429718018

@VibhuJawa @wence- do you think there's some action we should be taking here based on these numbers?

VibhuJawa commented 4 months ago

I still see a meaningful performance difference here

@vyasr, I think we forgot a .compute here; dask is lazy by default, so the example is not really computing anything here.

I changed 60_000 to 6_000 (10x fewer files); the delta on 6_000 files is still very significant, and I think at 10x the files it's only going to increase.


import cudf
import dask_cudf
import json
import os
from tqdm import tqdm
import time

# Directory to write the JSON files
subdir = "/raid/vjawa/test_dir"
os.makedirs(subdir, exist_ok=True)

# Write 6000 copies of the same JSON file to the subdirectory
for i in tqdm(range(6000)):
    with open(os.path.join(subdir, f"file_{i}.json"), "w") as f:
        json.dump({"key": "value"}, f)

# Measure time to read files with dask_cudf
start = time.time()
text_ddf = dask_cudf.read_json(os.path.join(subdir, "*"), engine='cudf', lines=True).compute()
print("Time to read 6000 files with dask_cudf:", time.time() - start)

# Measure time to read files with cudf
start = time.time()
text_df = cudf.read_json(os.path.join(subdir, "*"), lines=True)
print("Time to read 6000 files with cudf:", time.time() - start)

This produces:

Time to read 6000 files with dask_cudf: 44.730552434921265
Time to read 6000 files with cudf: 0.32662487030029297

So we are still suffering from the problem.

The workaround we have of using dd.from_map(read_json, paths) for Nemo-Curator can potentially cause problems on S3-based systems; cc: @ayushdg to add more context here.
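
As a rough sketch (the bucket and prefix are made up), the same workaround on S3 would look something like this, with the directory listing going through fsspec/s3fs instead of os.scandir, and assuming cudf.read_json can resolve s3:// URLs via fsspec:

import cudf
import dask.dataframe as dd
import fsspec


def read_json(path):
    return cudf.read_json(path, lines=True)


# Listing the prefix replaces os.scandir; on S3 this listing itself can be slow
# when the prefix holds many objects. Requires s3fs to be installed.
fs = fsspec.filesystem("s3")
paths = ["s3://" + p for p in fs.ls("my-bucket/data/")]
text_ddf = dd.from_map(read_json, paths)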

vyasr commented 4 months ago

I thought I had computed, but that must have been on a different issue... I lost track of things a bit during all the triaging I was doing last week. Thanks for the catch. I can now reproduce the hang by adding the compute call.