uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0
1.8k stars 284 forks source link

Guidance on How to Tune BatchedDataLoader #570

Closed andrewredd closed 4 years ago

andrewredd commented 4 years ago

Hi All,

I tried reading the docs, issues, and pull requests and haven't felt like I have a good handle on how to optimize a dataloader in pytorch. I'm using Pytorch 1.5, Cloudera HDFS, Nvidia Tesla V100, have 80 cores and 700 gb of memory.

I'm trying to read a smallish table of 36M rows with ~ 2000 values per row (they are grouped into five array columns). I've made sure the parquet block size is ~256MB

This is what I have for code:

dl = BatchedDataLoader(make_batch_reader(file_name, workers_count=10, schema_fields=col_names, hdfs_driver='libhdfs', reader_pool_type = 'process'), batch_size=batch_size, shuffling_queue_capacity=shuffling_queue_capacity)

Problem: I'm trying to figure out how to keep a single GPU running at full memory capacity from the cluster. The setup above is majorly IO bound. In very basic tuning that it seems like I can get through an epoch faster using a large batch size (500K) and a shuffling_queue_capacity of 10x that. Even with the large batch size I only use up half of the GPU memory 17GB/32GB. This setup seems to process a batch every ~20secs, but then the GPU sits idle for ~30secs as it waits for another batch (I wish I knew how to benchmark all this better) I've experimented with larger and smaller shuffle queues but it seems to burn through the queue super fast and then be bound by IO out of the cluster.

I've tried a much smaller batch size (64) and petastorm cranks through the batches constantly but the GPU memory is minimally used.

Questions: 1) Are large batches the best way to get high throughout out of the cluster? 2) Do I need to add more workers? I've increased the amount to 50 and haven't seen a improvement. 3) I read that the 'process' pool type is superior because I don't use any image c++ -like processing. Is this the right approach? 4) When I added the transform_fn to the batched data loader I'd almost immediately overflowed my gpu. Should I be making my batches smaller if I take this approach?

Would love to contribute a tutorial when we figure this out if that would be helpful.

The next step would be to put this on a much larger multi GPU machine. I need to figure out these throughput questions first.

Thanks for the great work!

andrewredd commented 4 years ago

Seems similar to this issue https://github.com/uber/petastorm/issues/493

@selitvin do you have any guidance in terms of diagnosing the questions above?

selitvin commented 4 years ago

Sorry for the late response. I was away last week and was not monitoring my emails.

Thank you for the detailed description of your case and appreciate you going through existing issues and the documentation!

I'd ask the following questions:

Do we have r_io > r_nn?

Hope this helps...

andrewredd commented 4 years ago

Thank you for the guidance @selitvin! Update on r_io

I've run the following code to benchmark r_io:

from itertools import product

pools = ['thread', 'process']
workers = [i for i in range(20, 81, 20)]
for i in product(pools, workers):
    print(i)

import time

from petastorm.reader import make_batch_reader
from petastorm.pytorch import BatchedDataLoader

for pool_type, workers_count in product(pools, workers):
    with make_batch_reader(file_name, 
                               workers_count=workers_count, 
                               schema_fields=schema_cols, 
                               hdfs_driver='libhdfs',
                               reader_pool_type = pool_type) as reader:

        r_io_sum = 0
        samples = 5
        for sample_idx in range(samples):
            num_rows = 0
            post_first_batch = False
            print(f'SAMPLE: {sample_idx} -- workers_count: {workers_count} and pool_type: {pool_type}')
            batch_idx = 0
            while True:
                batch = next(reader)

                # exclude the first batch from time computation in case
                # reader lags on first batch of reader
                if not post_first_batch:
                    post_first_batch = True
                    start = time.time()

                # print batches to make sure the process isnt hanging
                if batch_idx % 10 == 0:
                    print(batch_idx)

                # sum total number of rows to calculate throughput
                num_rows += batch[0].shape[0]

                if batch_idx > 50:
                    break

                batch_idx += 1

            end = time.time()
            r_io = num_rows/(end - start)
            print(f'\t r_io: {r_io}')
            r_io_sum += r_io
        print(f'FINISHED: average r_io was {r_io_sum / samples}')

Which yielded the following output:

SAMPLING -- workers_count: 20 and pool_type: thread
     SAMPLE 0/9 -- r_io: 47289.3997842017
     SAMPLE 1/9 -- r_io: 48842.93403938571
     SAMPLE 2/9 -- r_io: 48546.25535884053
     SAMPLE 3/9 -- r_io: 47612.4815459365
     SAMPLE 4/9 -- r_io: 40063.17078407439
     SAMPLE 5/9 -- r_io: 49382.76953992936
     SAMPLE 6/9 -- r_io: 46786.332368129006
     SAMPLE 7/9 -- r_io: 48050.99179485772
     SAMPLE 8/9 -- r_io: 48326.967940064846
     SAMPLE 9/9 -- r_io: 42193.66452616447
FINISHED: average r_io was 46709.49676815842

SAMPLING -- workers_count: 50 and pool_type: thread  
     SAMPLE 0/9 -- r_io: 14967.043967968786
     SAMPLE 1/9 -- r_io: 15508.153836147667
     SAMPLE 2/9 -- r_io: 14742.278548151298
     SAMPLE 3/9 -- r_io: 16603.15669550825
     SAMPLE 4/9 -- r_io: 16091.641570850566
     SAMPLE 5/9 -- r_io: 15430.678891548883
     SAMPLE 6/9 -- r_io: 16523.03313438565
     SAMPLE 7/9 -- r_io: 21826.005453270616
     SAMPLE 8/9 -- r_io: 22088.700325769678
     SAMPLE 9/9 -- r_io: 16995.26939655195
FINISHED: average r_io was 17077.596182015335

SAMPLING -- workers_count: 80 and pool_type: thread  
     SAMPLE 0/9 -- r_io: 6308.844691935815
     SAMPLE 1/9 -- r_io: 5982.248499435853
     SAMPLE 2/9 -- r_io: 5627.0357041572715
     SAMPLE 3/9 -- r_io: 5725.003200946084
     SAMPLE 4/9 -- r_io: 6344.455604555025
     SAMPLE 5/9 -- r_io: 6337.626240406156
     SAMPLE 6/9 -- r_io: 5954.369076002141
     SAMPLE 7/9 -- r_io: 5789.844728023085
     SAMPLE 8/9 -- r_io: 6183.318914167719
     SAMPLE 9/9 -- r_io: 6120.812918811555
FINISHED: average r_io was 6037.355957844071

SAMPLING -- workers_count: 20 and pool_type: process 
     SAMPLE 0/9 -- r_io: 25138.236878261065
     SAMPLE 1/9 -- r_io: 23611.909485802513
     SAMPLE 2/9 -- r_io: 25261.459363242815
     SAMPLE 3/9 -- r_io: 25907.096433257884
     SAMPLE 4/9 -- r_io: 23173.003867129046
     SAMPLE 5/9 -- r_io: 26955.650560117658
     SAMPLE 6/9 -- r_io: 28025.459349069155
     SAMPLE 7/9 -- r_io: 24698.44730601666
     SAMPLE 8/9 -- r_io: 22864.565694072713
     SAMPLE 9/9 -- r_io: 24143.33538095894
FINISHED: average r_io was 24977.916431792844

SAMPLING -- workers_count: 50 and pool_type: process 
     SAMPLE 0/9 -- r_io: 26813.751892380726
     SAMPLE 1/9 -- r_io: 25272.543999493835
     SAMPLE 2/9 -- r_io: 26577.61466215444
     SAMPLE 3/9 -- r_io: 27982.130119654983
     SAMPLE 4/9 -- r_io: 24728.14891720711
     SAMPLE 5/9 -- r_io: 25970.6300840258
     SAMPLE 6/9 -- r_io: 28529.835520128672
     SAMPLE 7/9 -- r_io: 25037.357654011445
     SAMPLE 8/9 -- r_io: 26641.47463352383
     SAMPLE 9/9 -- r_io: 27916.105393833364
FINISHED: average r_io was 26546.959287641417

SAMPLING -- workers_count: 80 and pool_type: process
     SAMPLE 0/9 -- r_io: 20892.490186938205
     SAMPLE 1/9 -- r_io: 24667.224219708984
     SAMPLE 2/9 -- r_io: 24918.51854776131
     SAMPLE 3/9 -- r_io: 23008.608614715413
     SAMPLE 4/9 -- r_io: 23604.119538046947
     SAMPLE 5/9 -- r_io: 25003.16456680183
     SAMPLE 6/9 -- r_io: 28074.600471135745
     SAMPLE 7/9 -- r_io: 26454.70882168297
     SAMPLE 8/9 -- r_io: 25425.623173821612
     SAMPLE 9/9 -- r_io: 25714.620004098764
FINISHED: average r_io was 24776.367814471178

The code takes five samples of each run because 50 batches returned a different throughput (depending I imagine on the availability of the cluster). Interestingly the grid search over performance indicated that the thread approach with 20 workers outperfromed a process approach with more workers or more workers on the thread approach. Does this indicate a bug in the code? I'm showing consistent results across runs that reach nearly double the throughput that I currently have with 80 workers and process pooling.

Going to move on to r_nn next.

selitvin commented 4 years ago

As for the benchmarking code, I'd suggest having several "warmup" cycles - there are some queues that are being filled up in the beginning, and you might want to measure the steady-state of the system - after all queues are full.

You might want to setup num_epochs=20 (or whatever number make sense) and not recreated reader in-between.

For this benchmark it make sense that thread-pool is faster as the inter-process-communication with a process pool is implemented in pyzmq. Although fast, it can not beat in-process thread-pool. It's great that thread-pool is faster because it means we don't do much of Python GIL intensive operations.

Things might start looking worse (hopefully not by much) when we will go through BatchedDataLoader. That's another interesting data point.

andrewredd commented 4 years ago

Is there a way to alter the process pool or the number of workers without recreating the reader? I think the reader creation slows down in later iterations because the work pool isn’t getting cleared because I’m in Jupyter.

On Tue, Jul 21, 2020 at 1:24 PM Yevgeni Litvin notifications@github.com wrote:

As for the benchmarking code, I'd suggest having several "warmup" cycles - there are some queues that are being filled up in the beginning, and you might want to measure the steady-state of the system - after all queues are full.

You might want to setup num_epochs=20 (or whatever number make sense) and not recreated reader in-between.

For this benchmark it make sense that thread-pool is faster as the inter-process-communication with a process pool is implemented in pyzmq. Although fast, it can not beat in-process thread-pool. It's great that thread-pool is faster because it means we don't do much of Python GIL intensive operations.

Things might start looking worse (hopefully not by much) when we will go through BatchedDataLoader. That's another interesting data point.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/uber/petastorm/issues/570#issuecomment-661996728, or unsubscribe https://github.com/notifications/unsubscribe-auth/ALQO4BTOCNGUMZUVJMD4HFLR4XFMNANCNFSM4O25IIEQ .

selitvin commented 4 years ago

Oops, I misread your code. A reader recreation is critical as you change the pool-type/workers count. I think your code is correct and the benchmark properly estimate the r_io.