deepset-ai / haystack

:mag: LLM orchestration framework to build customizable, production-ready LLM applications. Connect components (models, vector DBs, file converters) to pipelines or agents that can interact with your data. With advanced retrieval methods, it's best suited for building RAG, question answering, semantic search or conversational agent chatbots.
https://haystack.deepset.ai
Apache License 2.0

reader.train() ignores "devices" argument #4348

Closed Serbernari closed 4 months ago

Serbernari commented 1 year ago

Describe the bug I have a multi-GPU server with one faulty video card. Outside of Haystack I can simply avoid that card, but in Haystack I cannot launch reader training on only part of my devices: the devices argument seems to be ignored and all available GPUs get occupied. I initialize the reader on the same devices that I pass into reader.train(devices=[...]).

Also, to double-check, I put a print(devices) into torch.nn.parallel (comm.py, line 189, the last frame in the traceback below), and it shows all of my GPUs.

Error message

```
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
/tmp/ipykernel_22562/1748457569.py in <module>
----> 1 reader.train(data_dir=data_dir,
      2              train_filename="train.json",
      3              test_filename="test.json",
      4              dev_filename="dev.json",
      5              use_gpu=True,

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/haystack/nodes/reader/farm.py in train(self, data_dir, train_filename, dev_filename, test_filename, use_gpu, devices, batch_size, n_epochs, learning_rate, max_seq_len, warmup_proportion, dev_split, evaluate_every, save_dir, num_processes, use_amp, checkpoint_root_dir, checkpoint_every, checkpoints_to_keep, caching, cache_path, grad_acc_steps, early_stopping, max_query_length)
    435         :return: None
    436         """
--> 437         return self._training_procedure(
    438             data_dir=data_dir,
    439             train_filename=train_filename,

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/haystack/nodes/reader/farm.py in _training_procedure(self, data_dir, train_filename, dev_filename, test_filename, use_gpu, devices, batch_size, n_epochs, learning_rate, max_seq_len, warmup_proportion, dev_split, evaluate_every, save_dir, num_processes, use_amp, checkpoint_root_dir, checkpoint_every, checkpoints_to_keep, teacher_model, teacher_batch_size, caching, cache_path, distillation_loss_weight, distillation_loss, temperature, tinybert, processor, grad_acc_steps, early_stopping, distributed, doc_stride, max_query_length)
    355
    356         # 5. Let it grow!
--> 357         self.inferencer.model = trainer.train()
    358         self.save(Path(save_dir))
    359

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/haystack/modeling/training/base.py in train(self)
    215                 # Move batch of samples to device
    216                 batch = {key: batch[key].to(self.device) for key in batch}
--> 217                 loss = self.compute_loss(batch, step)
    218
    219                 # Perform evaluation

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/haystack/modeling/training/base.py in compute_loss(self, batch, step)
    301         with torch.cuda.amp.autocast(enabled=self.use_amp):
    302             if isinstance(module, AdaptiveModel):
--> 303                 logits = self.model.forward(
    304                     input_ids=batch["input_ids"], segment_ids=None, padding_mask=batch["padding_mask"]
    305                 )

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/torch/nn/parallel/data_parallel.py in forward(self, *inputs, **kwargs)
    156                                    "them on device: {}".format(self.src_device_obj, t.device))
    157
--> 158         inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
    159         # for forward function without any inputs, empty list and dict will be created
    160         # so the module can be executed on one device which is the first one in device_ids

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/torch/nn/parallel/data_parallel.py in scatter(self, inputs, kwargs, device_ids)
    173
    174     def scatter(self, inputs, kwargs, device_ids):
--> 175         return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
    176
    177     def parallel_apply(self, replicas, inputs, kwargs):

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/torch/nn/parallel/scatter_gather.py in scatter_kwargs(inputs, kwargs, target_gpus, dim)
     43     r"""Scatter with support for kwargs dictionary"""
     44     inputs = scatter(inputs, target_gpus, dim) if inputs else []
---> 45     kwargs = scatter(kwargs, target_gpus, dim) if kwargs else []
     46     if len(inputs) < len(kwargs):
     47         inputs.extend(() for _ in range(len(kwargs) - len(inputs)))

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/torch/nn/parallel/scatter_gather.py in scatter(inputs, target_gpus, dim)
     34     # None, clearing the cell
     35     try:
---> 36         res = scatter_map(inputs)
     37     finally:
     38         scatter_map = None

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/torch/nn/parallel/scatter_gather.py in scatter_map(obj)
     25         return [list(i) for i in zip(*map(scatter_map, obj))]
     26     if isinstance(obj, dict) and len(obj) > 0:
---> 27         return [type(obj)(i) for i in zip(*map(scatter_map, obj.items()))]
     28     return [obj for targets in target_gpus]
     29

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/torch/nn/parallel/scatter_gather.py in scatter_map(obj)
     21         return [type(obj)(*args) for args in zip(*map(scatter_map, obj))]
     22     if isinstance(obj, tuple) and len(obj) > 0:
---> 23         return list(zip(*map(scatter_map, obj)))
     24     if isinstance(obj, list) and len(obj) > 0:
     25         return [list(i) for i in zip(*map(scatter_map, obj))]

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/torch/nn/parallel/scatter_gather.py in scatter_map(obj)
     17     def scatter_map(obj):
     18         if isinstance(obj, torch.Tensor):
---> 19             return Scatter.apply(target_gpus, None, dim, obj)
     20         if is_namedtuple(obj):
     21             return [type(obj)(*args) for args in zip(*map(scatter_map, obj))]

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/torch/nn/parallel/_functions.py in forward(ctx, target_gpus, chunk_sizes, dim, input)
     94             # Perform CPU to GPU copies in a background stream
     95             streams = [_get_stream(device) for device in target_gpus]
---> 96         outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
     97         # Synchronize with the copy stream
     98         if streams is not None:

~/anaconda3/envs/venv_gpu/lib/python3.9/site-packages/torch/nn/parallel/comm.py in scatter(tensor, devices, chunk_sizes, dim, streams, out)
    187     if out is None:
    188         devices = [_get_device_index(d) for d in devices]
--> 189         return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
    190     else:
    191         if devices is not None:

RuntimeError: CUDA error: uncorrectable ECC error encountered
CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
```

Expected behavior The trainer should use only the selected devices.

To Reproduce Launch reader.train() on only a subset of the available GPUs and check nvidia-smi; see the sketch below.
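
A minimal reproduction sketch; the model name, data_dir and device list are placeholders, and the call shape follows the traceback above:

```python
# Reproduction sketch (placeholders for model name, data paths and devices).
from haystack.nodes import FARMReader

reader = FARMReader(
    model_name_or_path="deepset/roberta-base-squad2",  # placeholder model
    use_gpu=True,
    devices=["cuda:0", "cuda:1"],  # only a subset of the installed GPUs
)

reader.train(
    data_dir="data",               # placeholder path
    train_filename="train.json",
    dev_filename="dev.json",
    test_filename="test.json",
    use_gpu=True,
    devices=["cuda:0", "cuda:1"],  # expected: training stays on these devices
)
# Observed instead: nvidia-smi shows all GPUs (including the faulty one) in use.
```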


Serbernari commented 1 year ago

It seems that this line https://github.com/deepset-ai/haystack/blob/7d5e7c089cb3bd9a0e9b45b74c653e95543a5566/haystack/nodes/reader/farm.py#L208 might be causing the issue: it looks strange to me that if devices is None: is checked twice, on line 207 and on line 219; in the first case devices is assigned an empty list, but in the second it is assigned self.devices.
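
Paraphrasing the pattern described above as a tiny runnable sketch (hypothetical, simplified code, not the verbatim farm.py source): once the first check has replaced None with an empty list, the second check can never fire, so the devices chosen at init time are never picked up here.

```python
# Hypothetical, simplified sketch of the double "if devices is None" pattern
# described above; not the verbatim farm.py source.
class ReaderSketch:
    def __init__(self, devices):
        self.devices = devices  # devices selected when the reader is initialized

    def train(self, devices=None):
        if devices is None:        # first check (cf. farm.py line 207)
            devices = []           # ...falls back to an empty list
        # by now `devices` can no longer be None, so this branch is dead code:
        if devices is None:        # second check (cf. farm.py line 219)
            devices = self.devices
        return devices


print(ReaderSketch(devices=["cuda:0"]).train())  # prints [] rather than ['cuda:0']
```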

Serbernari commented 1 year ago

Monkey-patched it by

```python
model = (
    WrappedDataParallel(model, device_ids=[0, 1])
    if not isinstance(model, DataParallel)
    else WrappedDataParallel(model.module, device_ids=[0, 1])
)
```

in this line
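
For context, the patch works because DataParallel only replicates the module onto the GPUs listed in device_ids. A generic PyTorch sketch (not Haystack code, device indices are illustrative):

```python
# Generic PyTorch sketch: DataParallel replicates the module only onto the
# GPUs listed in device_ids, so a faulty card can be skipped.
import torch
from torch import nn

model = nn.Linear(16, 4)
device = "cpu"

if torch.cuda.device_count() >= 2:
    device = "cuda:0"
    # Restrict replication to GPUs 0 and 1 even if more GPUs are installed.
    model = nn.DataParallel(model.to(device), device_ids=[0, 1])

x = torch.randn(8, 16, device=device)
y = model(x)     # the forward pass is scattered only across device_ids
print(y.shape)   # torch.Size([8, 4])
```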

Serbernari commented 1 year ago

Fixed it by adding a device_ids parameter to initialize_optimizer() and optimize_model().

Now, in farm.py this call will look like this:

```python
model, optimizer, lr_schedule = initialize_optimizer(
    model=self.inferencer.model,
    learning_rate=learning_rate,
    schedule_opts={"name": "LinearWarmup", "warmup_proportion": warmup_proportion},
    n_batches=len(data_silo.loaders["train"]),
    n_epochs=n_epochs,
    device=devices[0],
    device_ids=devices,
    grad_acc_steps=grad_acc_steps,
    distributed=distributed,
)
```

and the following is added to the definitions in optimization.py:

```python
def initialize_optimizer(
    model: AdaptiveModel,
    n_batches: int,
    n_epochs: int,
    device: torch.device,
    learning_rate: float,
    optimizer_opts: Optional[Dict[Any, Any]] = None,
    schedule_opts: Optional[Dict[Any, Any]] = None,
    distributed: bool = False,
    grad_acc_steps: int = 1,
    local_rank: int = -1,
    use_amp: bool = False,
    device_ids: list = [0],
)
```

and

```python
def optimize_model(
    model: "AdaptiveModel",
    device: torch.device,
    local_rank: int,
    optimizer: Optional[torch.optim.Optimizer] = None,
    distributed: bool = False,
    use_amp: bool = False,
    device_ids: list = [0],
)
```

The subsequent calls now look like this:

```python
model, optimizer = optimize_model(model, device, local_rank, optimizer, distributed, device_ids=device_ids)
```

and

```python
model = (
    WrappedDataParallel(model, device_ids=device_ids)
    if not isinstance(model, DataParallel)
    else WrappedDataParallel(model.module, device_ids=device_ids)
)  # type: ignore [assignment]
```

I am not closing this issue and not submitting this as a pull request because the same thing needs to be fixed in the calls of the other readers, and I did not check whether it works correctly with other Trainers, such as DistillationTrainer.

mayankjobanputra commented 1 year ago

@Serbernari I would be more than happy to help you merge your code if you could just open the PR.

Let's first fix the FARMReader trainer with your PR; I will ask the team to take care of the other trainer modules. :)