woven-planet / l5kit

L5Kit - https://woven.toyota
https://woven-planet.github.io/l5kit

Error with num_workers>0 in DistributedDataParallel mode #178

Open RocketFlash opened 4 years ago

RocketFlash commented 4 years ago

I get an error when training a model with DistributedDataParallel if I set num_workers > 0 in the DataLoader. With num_workers=0 everything works fine. My code:

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

from l5kit.data import ChunkedDataset, LocalDataManager
from l5kit.dataset import AgentDataset
from l5kit.rasterization import build_rasterizer

# data_dir, lyft_config, istrain, rank, world_size, batch_size, pin_memory,
# drop_last and num_workers are defined elsewhere in the training script.
dm = LocalDataManager(data_dir)
cfg = lyft_config['train_data_loader'] if istrain else lyft_config['val_data_loader']
rasterizer = build_rasterizer(lyft_config, dm)
train_zarr = ChunkedDataset(dm.require(cfg["key"])).open()
dataset = AgentDataset(lyft_config, train_zarr, rasterizer)

# Each DDP process samples its own shard of the dataset indices.
sampler = DistributedSampler(dataset, rank=rank, num_replicas=world_size, shuffle=True)

data_loader = DataLoader(
    dataset=dataset,
    batch_size=batch_size,
    sampler=sampler,
    pin_memory=pin_memory,
    drop_last=drop_last,
    num_workers=num_workers,
)

The error I get:


    for batch_index, data in enumerate(tqdm_train):
  File "/root/workdir/anaconda3/envs/lyft/lib/python3.8/site-packages/tqdm/std.py", line 1165, in __iter__
    for obj in iterable:
  File "/root/workdir/anaconda3/envs/lyft/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 291, in __iter__
    return _MultiProcessingDataLoaderIter(self)
  File "/root/workdir/anaconda3/envs/lyft/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 737, in __init__
    w.start()
  File "/root/workdir/anaconda3/envs/lyft/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/root/workdir/anaconda3/envs/lyft/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/root/workdir/anaconda3/envs/lyft/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/root/workdir/anaconda3/envs/lyft/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/root/workdir/anaconda3/envs/lyft/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/root/workdir/anaconda3/envs/lyft/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/root/workdir/anaconda3/envs/lyft/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'google.protobuf.pyext._message.RepeatedCompositeContainer' object

lucabergamini commented 4 years ago

Hi @RocketFlash

DistributedDataParallel has never been tested with L5Kit, so I'm not surprised that it's not working. I'll try to take a look at it!

louis925 commented 4 years ago

I got the same error when running on Windows. I didn't use DistributedDataParallel; I simply followed the example notebook with num_workers > 0.

lucabergamini commented 4 years ago

> I got the same error when running on Windows

Same version of python (3.8)?

louis925 commented 4 years ago

> > I got the same error when running on Windows
>
> Same version of python (3.8)?

Oh, I am using python 3.7. Is that why?
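
The Python version is probably not the deciding factor. Both tracebacks in this thread pass through a `popen_spawn_*` module, which suggests the workers are started with the `spawn` start method. `spawn` pickles the dataset to send it to each worker, whereas `fork` lets the child inherit it without pickling. A minimal sketch for checking which start method an environment uses (the helper name `describe_start_methods` is made up for illustration):

```python
import multiprocessing as mp


def describe_start_methods():
    """Report the start method in effect and the ones this platform supports."""
    return {
        # Note: get_start_method() fixes the platform default for this
        # process if no start method has been chosen yet.
        "current": mp.get_start_method(),
        "available": mp.get_all_start_methods(),
    }


if __name__ == "__main__":
    print(describe_start_methods())
```

On Windows only `spawn` is available, so any DataLoader with num_workers > 0 has to pickle its dataset there.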

louis925 commented 4 years ago

FYI, this is the error I got. I am not using DistributedDataParallel.

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<timed exec> in <module>
c:\users\louis\appdata\local\programs\python\python37\lib\site-packages\torch\utils\data\dataloader.py in __iter__(self)
    289             return _SingleProcessDataLoaderIter(self)
    290         else:
--> 291             return _MultiProcessingDataLoaderIter(self)
    292 
    293     @property
c:\users\louis\appdata\local\programs\python\python37\lib\site-packages\torch\utils\data\dataloader.py in __init__(self, loader)
    735             #     before it starts, and __del__ tries to join but will get:
    736             #     AssertionError: can only join a started process.
--> 737             w.start()
    738             self._index_queues.append(index_queue)
    739             self._workers.append(w)
c:\users\louis\appdata\local\programs\python\python37\lib\multiprocessing\process.py in start(self)
    110                'daemonic processes are not allowed to have children'
    111         _cleanup()
--> 112         self._popen = self._Popen(self)
    113         self._sentinel = self._popen.sentinel
    114         # Avoid a refcycle if the target function holds an indirect
c:\users\louis\appdata\local\programs\python\python37\lib\multiprocessing\context.py in _Popen(process_obj)
    221     @staticmethod
    222     def _Popen(process_obj):
--> 223         return _default_context.get_context().Process._Popen(process_obj)
    224 
    225 class DefaultContext(BaseContext):
c:\users\louis\appdata\local\programs\python\python37\lib\multiprocessing\context.py in _Popen(process_obj)
    320         def _Popen(process_obj):
    321             from .popen_spawn_win32 import Popen
--> 322             return Popen(process_obj)
    323 
    324     class SpawnContext(BaseContext):
c:\users\louis\appdata\local\programs\python\python37\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     87             try:
     88                 reduction.dump(prep_data, to_child)
---> 89                 reduction.dump(process_obj, to_child)
     90             finally:
     91                 set_spawning_popen(None)
c:\users\louis\appdata\local\programs\python\python37\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61 
     62 #
TypeError: can't pickle google.protobuf.pyext._message.RepeatedCompositeContainer objects

Do we know where google.protobuf is used? Is it specific to l5kit or zarr, or is it used by PyTorch?

lucabergamini commented 4 years ago

> Do we know where google.protobuf is used? Is it specific to l5kit or zarr, or is it used by PyTorch?

It's specific to l5kit; we use it for the semantic map.
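
Presumably the semantic map keeps protobuf containers as attributes, so anything that holds the rasterizer fails when `multiprocessing` pickles it for a spawned worker. A rough way to locate the offending attribute on any object is to try pickling each attribute separately. In the sketch below, `FakeSemanticMap`, `FakeDataset` and `find_unpicklable_attrs` are hypothetical names, not l5kit classes; a `threading.Lock` plays the role of the unpicklable protobuf container.

```python
import pickle
import threading


class FakeSemanticMap:
    """Hypothetical stand-in for an object that holds an unpicklable handle."""

    def __init__(self):
        # A lock is used only because, like the protobuf container in the
        # traceback, it cannot be pickled.
        self.handle = threading.Lock()


class FakeDataset:
    """Hypothetical stand-in for a dataset that owns a semantic map."""

    def __init__(self):
        self.cfg = {"batch_size": 16}
        self.semantic_map = FakeSemanticMap()


def find_unpicklable_attrs(obj):
    """Return {attribute name: error text} for attributes that fail to pickle."""
    failures = {}
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # pickle raises several exception types
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


report = find_unpicklable_attrs(FakeDataset())
print(report)
```

Running the same helper on a real dataset object, and then on whichever attribute it reports, narrows down what has to be kept out of the main process.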

louis925 commented 4 years ago

FYI, I have a workaround: wrap the dataset in another class that constructs the dataset and the rasterizer only after the object has been loaded into the workers. My code for that, in a mydataset.py script:

from torch.utils.data import get_worker_info


class MyTrainDataset:
    """Wrapper that defers building the rasterizer and AgentDataset to the worker."""

    def __init__(self, cfg, dm):
        # Store only what is needed to build the real dataset later.
        self.cfg = cfg
        self.dm = dm

    def initialize(self, worker_id):
        # Runs inside each worker process, after the wrapper has been pickled.
        print('initialize called with worker_id', worker_id)
        from l5kit.data import ChunkedDataset
        from l5kit.dataset import AgentDataset  # , EgoDataset
        from l5kit.rasterization import build_rasterizer
        rasterizer = build_rasterizer(self.cfg, self.dm)
        train_cfg = self.cfg["train_data_loader"]
        train_zarr = ChunkedDataset(self.dm.require(train_cfg["key"])).open(cached=False)  # try to turn off cache
        self.dataset = AgentDataset(self.cfg, train_zarr, rasterizer)

    def __len__(self):
        # NOTE: The length must be worked out beforehand. Once the rasterizer
        # and/or AgentDataset has been constructed, the object can no longer be
        # pickled, so the size cannot be computed from the real dataset here.
        # DataLoader still requires __len__ to set up the sampling.
        return 22496709  # hard-coded length

    def __getitem__(self, index):
        return self.dataset[index]


def my_dataset_worker_init_func(worker_id):
    # get_worker_info().dataset is this worker's own copy of the wrapper.
    worker_info = get_worker_info()
    dataset = worker_info.dataset
    dataset.initialize(worker_id)

Then you can load it in the training Jupyter notebook as follows:

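from torch.utils.data import DataLoader

from mydataset import MyTrainDataset, my_dataset_worker_init_func

# cfg and dm are the config dict and LocalDataManager created earlier in the notebook.
train_dataset = MyTrainDataset(cfg, dm)
train_dataloader = DataLoader(
    train_dataset,
    shuffle=True,
    batch_size=16,
    num_workers=2,
    persistent_workers=True,
    worker_init_fn=my_dataset_worker_init_func,  # builds the real dataset in each worker
)
tr_it = iter(train_dataloader)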

The downside is that each worker process loads its own copy of the entire PyTorch library, which somehow takes about 4 GB of commit memory. During training, each worker can take up to 7 GB of commit memory, so you need to turn on virtual memory to provide that buffer.
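
A possible variation on this workaround, sketched with stand-in classes rather than the real l5kit objects, is to build the dataset lazily on the first `__getitem__` call and to drop it again in `__getstate__`, so the wrapper stays picklable even after it has been used. The names `LazyDataset`, `FakeHeavyDataset`, `build_fake_dataset` and `make_lazy_dataset` are hypothetical. Measuring the length by building the dataset once in the parent process is an assumption that may be too slow or memory-hungry for the full training set.

```python
import pickle
import threading


class LazyDataset:
    """Build the wrapped dataset on first access and never pickle it."""

    def __init__(self, build_fn, length):
        self.build_fn = build_fn  # must be picklable, e.g. a module-level function
        self.length = length      # known up front, so no build is needed for len()
        self._dataset = None

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        if self._dataset is None:
            # First access in this process: construct the real dataset here.
            self._dataset = self.build_fn()
        return self._dataset[index]

    def __getstate__(self):
        # Drop the built dataset so pickling never touches unpicklable parts.
        state = self.__dict__.copy()
        state["_dataset"] = None
        return state


class FakeHeavyDataset:
    """Hypothetical stand-in for a dataset that cannot be pickled."""

    def __init__(self):
        self.handle = threading.Lock()  # unpicklable, like the protobuf container
        self.items = list(range(10))

    def __len__(self):
        return len(self.items)

    def __getitem__(self, index):
        return self.items[index] * 2


def build_fake_dataset():
    return FakeHeavyDataset()


def make_lazy_dataset():
    # Build once only to measure the length, then let the instance go.
    length = len(build_fake_dataset())
    return LazyDataset(build_fake_dataset, length)


if __name__ == "__main__":
    lazy = make_lazy_dataset()
    print(lazy[3])  # forces a build in this process
    clone = pickle.loads(pickle.dumps(lazy))  # the built dataset is not pickled
    print(clone._dataset is None, clone[3])   # the clone rebuilds on demand
```

Since the wrapper builds itself on demand, a DataLoader using it would not need a `worker_init_fn`, and a DistributedSampler would only need `__len__`. This has not been tried against l5kit in this thread.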

lucabergamini commented 4 years ago

> The downside is that each worker process loads its own copy of the entire PyTorch library, which somehow takes about 4 GB of commit memory. During training, each worker can take up to 7 GB of commit memory, so you need to turn on virtual memory to provide that buffer.

I see. I guess there is no "one solution for them all" in this case :)