BlazingDB / blazingsql

BlazingSQL is a lightweight, GPU-accelerated SQL engine for Python, built on RAPIDS cuDF.
https://blazingsql.com
Apache License 2.0

Unable to load and compute dask_cudf dataframe into blazing table and seeing some memory related errors. (cudaErrorMemoryAllocation out of memory) #1514

Closed chaitanyac3 closed 3 years ago

chaitanyac3 commented 3 years ago

Issue:

Trying to load a file (CSV and Parquet) using dask_cudf and seeing memory-related errors. The dataset can easily fit into memory and the file can be read correctly using BlazingSQL's read_parquet method; however, the dask_cudf.read_parquet() route fails to do the same. The same error appears with both file formats.

Another observation: when a BlazingSQL table is created from a cuDF dataframe, the table gets created but with zero records.

It would be helpful if someone could give some pointers on how to get past this issue.

Dataset info:

No of rows - 126 million
No of columns - 209
File format - Parquet
No of partitions - 8
Parquet file size - 400 MB
CSV file size - 62 GB

System info:

GPU - 6 (Tesla V100)
GPU memory - 16 GB
Cores - 32

Client info:

Scheduler: tcp://127.0.0.1:36617
Dashboard: http://127.0.0.1:8787/status
Cluster workers: 4
Cores: 4
Memory: 239.89 GiB

Code:

from blazingsql import BlazingContext
from dask.distributed import Client,wait
from dask_cuda import LocalCUDACluster
import dask
import dask_cudf
cluster = LocalCUDACluster()
client = Client(cluster)
bc = BlazingContext(dask_client=client)

ddf = dask_cudf.read_parquet('/home/ubuntu/126M_dataset/')
bc.create_table('table', ddf.compute())

Error Message:

super(NumericalColumn, col).fillna(fill_value, method)
    501 
    502     def find_first_value(

~/miniconda3/lib/python3.7/site-packages/cudf/core/column/column.py in fillna(self, value, method, dtype)
    733         """
    734         return libcudf.replace.replace_nulls(
--> 735             input_col=self, replacement=value, method=method, dtype=dtype
    736         )
    737 

cudf/_lib/replace.pyx in cudf._lib.replace.replace_nulls()

cudf/_lib/scalar.pyx in cudf._lib.scalar.as_device_scalar()

~/miniconda3/lib/python3.7/site-packages/cudf/core/scalar.py in device_value(self)
     75         if self._device_value is None:
     76             self._device_value = DeviceScalar(
---> 77                 self._host_value, self._host_dtype
     78             )
     79         return self._device_value

cudf/_lib/scalar.pyx in cudf._lib.scalar.DeviceScalar.__init__()

cudf/_lib/scalar.pyx in cudf._lib.scalar.DeviceScalar._set_value()

cudf/_lib/scalar.pyx in cudf._lib.scalar._set_numeric_from_np_scalar()

MemoryError: std::bad_alloc: CUDA error at: /home/ubuntu/miniconda3/include/rmm/mr/device/cuda_memory_resource.hpp:69: cudaErrorMemoryAllocation out of memory

System Info:

nvidia-smi info:

+-----------------------------------------------------------------------------+
| NVIDIA-SMI 465.19.01    Driver Version: 465.19.01    CUDA Version: 11.3     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|===============================+======================+======================|
|   0  NVIDIA Tesla V1...  On   | 00000000:00:1B.0 Off |                    0 |
| N/A   49C    P0    55W / 300W |  16147MiB / 16160MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  NVIDIA Tesla V1...  On   | 00000000:00:1C.0 Off |                    0 |
| N/A   48C    P0    56W / 300W |  16106MiB / 16160MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   2  NVIDIA Tesla V1...  On   | 00000000:00:1D.0 Off |                    0 |
| N/A   46C    P0    61W / 300W |  16106MiB / 16160MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   3  NVIDIA Tesla V1...  On   | 00000000:00:1E.0 Off |                    0 |
| N/A   48C    P0    60W / 300W |  16106MiB / 16160MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                                  |
|  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
|        ID   ID                                                   Usage      |
|=============================================================================|
|    0   N/A  N/A    113949      C   ...ntu/miniconda3/bin/python      823MiB |
|    0   N/A  N/A    114055      C   ...ntu/miniconda3/bin/python    15319MiB |
|    1   N/A  N/A    114059      C   ...ntu/miniconda3/bin/python    16101MiB |
|    2   N/A  N/A    114062      C   ...ntu/miniconda3/bin/python    16101MiB |
|    3   N/A  N/A    114053      C   ...ntu/miniconda3/bin/python    16101MiB |
+-----------------------------------------------------------------------------+

felipeblazing commented 3 years ago

126M rows × 209 cols × 4 bytes per value ≈ 105 GB of data, which does not fit on a single GPU.
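As a quick sanity check on that estimate (assuming 4 bytes per value, as above):

# Back-of-the-envelope size of the fully materialized dataframe
rows, cols, bytes_per_value = 126_000_000, 209, 4
total_gb = rows * cols * bytes_per_value / 1e9
print(f"{total_gb:.1f} GB")  # ~105.3 GB, versus ~16 GB on one V100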

When you use the code

ddf.compute()

you are materializing the entire dask dataframe onto a single GPU. The most appropriate way to create this table in Blazing is to use the notation

bc.create_table('table', ddf)

That being said, I highly recommend you let Blazing read the files directly for you. The ability to scale and handle larger and larger problems is best left to the C++ side of Blazing. We do not recommend using dask to read files when you can leverage Blazing to do so directly.
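To make the two recommended paths concrete, here is a minimal sketch; the directory path is the one from the report, the table names and the COUNT(*) query are illustrative placeholders, and with a dask client attached the result typically comes back as a dask_cudf dataframe:

from blazingsql import BlazingContext
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask_cudf

cluster = LocalCUDACluster()
client = Client(cluster)
bc = BlazingContext(dask_client=client)

# Option 1 (preferred): let the BlazingSQL C++ engine read the parquet files itself
bc.create_table('table_from_path', '/home/ubuntu/126M_dataset/')

# Option 2: hand BlazingSQL the lazy dask_cudf dataframe -- no .compute(),
# so nothing is materialized onto a single GPU
ddf = dask_cudf.read_parquet('/home/ubuntu/126M_dataset/')
bc.create_table('table_from_ddf', ddf)

# Illustrative query; with a dask client the result is a distributed dataframe
result = bc.sql('SELECT COUNT(*) FROM table_from_path')
print(result.compute())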

chaitanyac3 commented 3 years ago

@Felipe

Thanks for the inputs, these are helpful. We are running some experiments to evaluate BlazingSQL + Dask and are pretty impressed with the throughput we observed with a single GPU. However, when we increased the capacity to use additional GPUs, the throughput didn't scale as expected or as observed with similar distributed systems.

To provide some insight: we have a SQL query being fired in a loop N times (pure compute, no aggregates) operating on the dataset mentioned in the subject. Each iteration of the loop outputs a filtered Parquet dataset.

The execution times that we observed are as follows

1 GPU - 64s
2 GPUs - 48s
4 GPUs - 45s

Are these pure Dask overheads, or are we missing something here?

To explore data partitioning, we tried the .partition() method in BlazingSQL, which didn't improve the overall execution time, but the console did print the message below, which led us to explore the dask_cudf + BlazingSQL combination. However, we then ran into the different set of errors mentioned earlier.

"This function has been Deprecated. It is recommended to use ddf.shuffle(on=[colnames])"

Additionally, OutOfMemory errors were observed on a trimmed-down input as well (100k rows and 20+ columns).

Can you share your thoughts on this?
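For context, a minimal sketch of the shuffle-based repartitioning that the deprecation message points to; the key column name partition_col is a hypothetical placeholder:

import dask_cudf

# Read lazily, then hash-partition on a key column as the deprecation
# message suggests; 'partition_col' is a placeholder column name
ddf = dask_cudf.read_parquet('/home/ubuntu/126M_dataset/')
ddf = ddf.shuffle(on=['partition_col'])

bc.create_table('table', ddf)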

felipeblazing commented 3 years ago

Are you using dask to perform all of the I/O, or are you passing the files to BlazingSQL directly? It is much more efficient to do something like

bc.create_table("my_table","/home/ubuntu/126M_dataset/")

than it is to do something like

ddf = dask_cudf.read_parquet('/home/ubuntu/126M_dataset/')
bc.create_table('table', ddf)

This is because BlazingSQL is a distributed execution engine that handles all of the distribution of data and out-of-core scaling in the C++ layer. If you try to read it all with dask, you have to materialize that data before it gets handed off to the C++ engine. For this reason it is always recommended to pass the file paths directly to the create_table statement, letting the BlazingSQL C++ scheduler handle out-of-core processing and I/O.

felipeblazing commented 3 years ago

If the data is not local, you can have a look at https://blazingdb.github.io/blazingsql/reference/python/storage_plugins.html to see the storage plugins we support.
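As an illustration, a hedged sketch of registering an S3 bucket before creating a table; the bc.s3 call and its parameter names are recalled from those docs and should be verified against them, and the bucket and dataset names are placeholders:

from blazingsql import BlazingContext

bc = BlazingContext()

# Register an S3 bucket under a prefix (parameter names per the storage-plugin
# docs linked above; verify before use)
bc.s3('my_data',
      bucket_name='my-bucket-name',
      access_key_id='<ACCESS_KEY>',
      secret_key='<SECRET_KEY>')

# Tables can then be created straight from the registered location
bc.create_table('remote_table', 's3://my_data/126M_dataset/')
print(bc.sql('SELECT COUNT(*) FROM remote_table'))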