NVIDIA / TensorRT-LLM

TensorRT-LLM provides users with an easy-to-use Python API to define Large Language Models (LLMs) and build TensorRT engines that contain state-of-the-art optimizations to perform inference efficiently on NVIDIA GPUs. TensorRT-LLM also contains components to create Python and C++ runtimes that execute those TensorRT engines.
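
For orientation, a minimal sketch of the high-level LLM Python API that more recent TensorRT-LLM releases expose is shown below (the model name and sampling values are illustrative; the v0.6.1 workflow discussed in this issue instead uses the per-model build.py and run.py example scripts):

from tensorrt_llm import LLM, SamplingParams

# Illustrative checkpoint; any supported Hugging Face model directory works the same way.
llm = LLM(model="meta-llama/Llama-2-7b-hf")
sampling_params = SamplingParams(temperature=0.8, top_p=0.95)

outputs = llm.generate(["Born in north-east France, Soyer trained as a"], sampling_params)
for output in outputs:
    print(output.outputs[0].text)
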
https://nvidia.github.io/TensorRT-LLM
Apache License 2.0

Parallelization issues #882

Open · isRambler opened this issue 6 months ago

isRambler commented 6 months ago

Hello, when I run run.py with the following command (the build.py command used to build the engine is shown below it),

mpirun -n 2 --allow-run-as-root \
    python3 run.py --max_output_len=1024 \
                   --tokenizer_dir /root/autodl-tmp/llama-2-7b \
                   --engine_dir=/root/autodl-tmp/tmp/llama/7B/trt_engines/fp16/1-gpu/ \
                   --input_file=/root/autodl-tmp/scrambled_sampled_dataset.json

python build.py --model_dir /root/autodl-tmp/llama-2-7b \
                --dtype float16 \
                --remove_input_padding \
                --use_gpt_attention_plugin float16 \
                --enable_context_fmha \
                --use_gemm_plugin float16 \
                --world_size 2 \
                --tp_size 2 \
                --output_dir /root/autodl-tmp/tmp/llama/7B/trt_engines/fp16/1-gpu/ \
                --use_rmsnorm_plugin float16 \
                --use_inflight_batching

why do I get the following error:

--------------------------------------------------------------------------
Primary job  terminated normally, but 1 process returned
a non-zero exit code. Per user-direction, the job has been aborted.
--------------------------------------------------------------------------
--------------------------------------------------------------------------
mpirun detected that one or more processes exited with non-zero status, thus causing
the job to be terminated. The first process to do so was:

  Process name: [[39394,1],1]
  Exit code:    1
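
For anyone hitting a similar multi-GPU failure, a quick sanity check is to confirm that the engine's world size matches the number of MPI ranks. A minimal sketch, reusing the config keys that run.py itself reads (the config path below is the engine directory from the command above):

import json

config_path = "/root/autodl-tmp/tmp/llama/7B/trt_engines/fp16/1-gpu/config.json"
with open(config_path) as f:
    config = json.load(f)

tp_size = config['builder_config']['tensor_parallel']
pp_size = config['builder_config']['pipeline_parallel']
# The product is the engine's world size; it must equal the -n value passed to mpirun (2 here).
print(f"engine world size = {tp_size * pp_size}")
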
isRambler commented 6 months ago

run.py

# SPDX-FileCopyrightText: Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import csv
import json
import time
from pathlib import Path
from typing import Union

import numpy as np
import torch
from transformers import LlamaTokenizerFast

import tensorrt_llm
from tensorrt_llm.quantization import QuantMode
from tensorrt_llm.runtime import ModelConfig, SamplingConfig
from tensorrt_llm.logger import Logger

from build import get_engine_name  # isort:skip

EOS_TOKEN = 2
PAD_TOKEN = 2

def throttle_generator(generator, stream_interval):
    for i, out in enumerate(generator):
        if not i % stream_interval:
            yield out

    if i % stream_interval:
        yield out

def read_config(config_path: Path):
    with open(config_path, 'r') as f:
        config = json.load(f)
    use_gpt_attention_plugin = config['plugin_config']['gpt_attention_plugin']
    remove_input_padding = config['plugin_config']['remove_input_padding']
    dtype = config['builder_config']['precision']
    gather_all_token_logits = config['builder_config'][
        'gather_all_token_logits']
    tp_size = config['builder_config']['tensor_parallel']
    pp_size = config['builder_config']['pipeline_parallel']
    world_size = tp_size * pp_size
    assert world_size == tensorrt_llm.mpi_world_size(), \
        f'Engine world size ({world_size}) != Runtime world size ({tensorrt_llm.mpi_world_size()})'
    num_heads = config['builder_config']['num_heads']
    hidden_size = config['builder_config']['hidden_size']
    vocab_size = config['builder_config']['vocab_size']
    num_layers = config['builder_config']['num_layers']
    num_kv_heads = config['builder_config'].get('num_kv_heads', num_heads)
    paged_kv_cache = config['plugin_config']['paged_kv_cache']
    tokens_per_block = config['plugin_config']['tokens_per_block']
    quant_mode = QuantMode(config['builder_config']['quant_mode'])
    if config['builder_config'].get('multi_query_mode', False):
        tensorrt_llm.logger.warning(
            "`multi_query_mode` config is deprecated. Please rebuild the engine."
        )
        num_kv_heads = 1
    num_kv_heads = (num_kv_heads + tp_size - 1) // tp_size
    assert (num_heads % tp_size) == 0
    num_heads = num_heads // tp_size
    hidden_size = hidden_size // tp_size
    use_custom_all_reduce = config['plugin_config'].get('use_custom_all_reduce',
                                                        False)
    max_prompt_embedding_table_size = config['builder_config'].get(
        'max_prompt_embedding_table_size', 0)

    model_config = ModelConfig(
        num_heads=num_heads,
        num_kv_heads=num_kv_heads,
        hidden_size=hidden_size,
        vocab_size=vocab_size,
        num_layers=num_layers,
        gpt_attention_plugin=use_gpt_attention_plugin,
        paged_kv_cache=paged_kv_cache,
        tokens_per_block=tokens_per_block,
        remove_input_padding=remove_input_padding,
        dtype=dtype,
        quant_mode=quant_mode,
        gather_all_token_logits=gather_all_token_logits,
        use_custom_all_reduce=use_custom_all_reduce,
        max_prompt_embedding_table_size=max_prompt_embedding_table_size)

    return model_config, tp_size, pp_size, dtype

def parse_input(input_text: str, input_file: str, tokenizer, end_id: int, batch_request_begin: int, batch_request_end: int,
                remove_input_padding: bool, input_tokens_limit: Union[int,
                                                                      None]):
    input_tokens = []
    num_request = 0
    if input_file is None:
        input_tokens.append(
            tokenizer.encode(input_text, add_special_tokens=False))
    else:
        if input_file.endswith('.csv'):
            with open(input_file, 'r') as csv_file:
                csv_reader = csv.reader(csv_file, delimiter=',')
                for line in csv_reader:
                    input_tokens.append(np.array(line, dtype='int32'))
        elif input_file.endswith('.npy'):
            inputs = np.load(input_file)
            for row in inputs:
                row = row[row != end_id]
                input_tokens.append(row)
        elif input_file.endswith('.txt'):
            with open(input_file, 'r', encoding='utf-8',
                      errors='replace') as txt_file:
                input_text = txt_file.read()
                input_tokens.append(
                    tokenizer.encode(input_text, add_special_tokens=False))
        elif input_file.endswith('.json'):
            with open(input_file) as f:
                requests = json.load(f)
            requests = requests[batch_request_begin: batch_request_end]
            for input_text, _, _ in requests:
                input_tokens.append(tokenizer.encode(input_text, add_special_tokens=False))
        else:
            print('Input file format not supported.')
            raise SystemExit

    # Cap max input tokens
    if input_tokens_limit is not None:
        print(
            f"Maximum input number of tokens found as {max([len(x) for x in input_tokens])};"
            f" will be capped to {input_tokens_limit}")
        input_tokens = [x[-input_tokens_limit:] for x in input_tokens]

    input_ids = None
    input_lengths = torch.tensor([len(x) for x in input_tokens],
                                 dtype=torch.int32,
                                 device='cuda')
    if remove_input_padding:
        input_ids = np.concatenate(input_tokens)
        input_ids = torch.tensor(input_ids, dtype=torch.int32,
                                 device='cuda').unsqueeze(0)
    else:
        input_ids = torch.nested.to_padded_tensor(
            torch.nested.nested_tensor(input_tokens, dtype=torch.int32),
            end_id).cuda()
    return input_ids, input_lengths

def ptuning_setup(prompt_table, dtype, hidden_size, tasks, input_ids,
                  input_lengths, remove_input_padding):
    if prompt_table is not None:
        prompt_table = torch.from_numpy(np.load(prompt_table))
        task_vocab_size = torch.tensor([prompt_table.shape[1]],
                                       dtype=torch.int32,
                                       device="cuda")
        prompt_table = prompt_table.view(
            (prompt_table.shape[0] * prompt_table.shape[1],
             prompt_table.shape[2]))
        prompt_table = prompt_table.cuda().to(
            dtype=tensorrt_llm._utils.str_dtype_to_torch(dtype))
    else:
        prompt_table = torch.empty([1, hidden_size]).cuda()
        task_vocab_size = torch.zeros([1]).cuda()

    num_sequences = input_lengths.size(
        0) if remove_input_padding else input_ids.size(0)

    if tasks is not None:
        tasks = torch.tensor([int(t) for t in tasks.split(',')],
                             dtype=torch.int32,
                             device="cuda")
        assert tasks.shape[
            0] == num_sequences, "Number of supplied tasks must match input batch size"
    else:
        tasks = torch.zeros([num_sequences]).cuda()

    return [prompt_table, tasks, task_vocab_size]

def print_output(output_ids, input_lengths, tokenizer,
                 output_csv, output_npy, sequence_lengths):
    output_lengths = []
    input_lengths = torch.tensor(input_lengths,device='cuda')
    if output_csv is None and output_npy is None:
        for b in range(len(input_lengths)):
            output_begin = input_lengths[b]
            output_length = sequence_lengths[b][0] - input_lengths[b]
            output_end = input_lengths[b] + output_length
            outputs = output_ids[b][0][output_begin:output_end].tolist()
            output_text = tokenizer.decode(outputs)
            print(f"output_len:{len(outputs)}  \n")
            output_lengths.append(len(outputs))
    prompt_num_tokens = sum(prompt_len for prompt_len in input_lengths)
    total_num_tokens = sum(output_len for output_len in output_lengths)
    print(f"Prompt_num_tokens:{prompt_num_tokens.item():.2f} tokens \n"
          f"Total_num_tokens:{total_num_tokens:.2f} tokens \n")
    return total_num_tokens

def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument('--max_output_len', type=int, required=True)
    parser.add_argument('--max_kv_cache_len',
                        type=int,
                        default=None,
                        help='The max kv cache length. \
              If the final sequence length exceeds the kv cache length, we will enable cyclic kv cache. \
              If it is set to None, we will use the max sequence length.')
    parser.add_argument('--log_level', type=str, default='error')
    parser.add_argument('--engine_dir', type=str, default='llama_outputs')
    parser.add_argument('--tokenizer_dir',
                        type=str,
                        default=".",
                        help="Directory containing the tokenizer.model.")
    parser.add_argument('--input_text',
                        type=str,
                        default='Born in north-east France, Soyer trained as a')
    parser.add_argument(
        '--input_tokens',
        dest='input_file',
        type=str,
        help=
        'CSV or Numpy file containing tokenized input. Alternative to text input.',
        default=None)
    parser.add_argument(
        '--input_file',
        type=str,
        help=
        'CSV or Numpy file',
        default=None)
    parser.add_argument(
        '--input_tokens_limit',
        type=int,
        help='Truncate input tokens if number exceeds the set limit value',
        default=None)
    parser.add_argument('--output_csv',
                        type=str,
                        help='CSV file where the tokenized output is stored.',
                        default=None)
    parser.add_argument('--output_npy',
                        type=str,
                        help='Numpy file where the tokenized output is stored.',
                        default=None)
    parser.add_argument('--num_beams',
                        type=int,
                        help="Use beam search if num_beams >1",
                        default=1)
    parser.add_argument('--streaming', default=False, action='store_true')
    parser.add_argument('--streaming_interval',
                        type=int,
                        help="How often to return tokens when streaming.",
                        default=5)
    parser.add_argument(
        '--prompt_table',
        type=Path,
        help="Path to .npy file, exported by nemo_prompt_convert.py")
    parser.add_argument(
        '--tasks',
        help="Comma-separated list of tasks for prompt tuning: ex 0,3,1,0")
    return parser.parse_args()

def generate(
    max_output_len: int,
    log_level: str = 'info',
    engine_dir: str = 'llama_outputs',
    input_text: str = 'Born in north-east France, Soyer trained as a',
    input_file: str = None,
    output_csv: str = None,
    output_npy: str = None,
    tokenizer_dir: str = None,
    max_kv_cache_len: int = None,
    num_beams: int = 1,
    streaming: bool = False,
    streaming_interval: int = 5,
    prompt_table: Path = None,
    tasks: str = None,
    input_tokens_limit: Union[None, int] = None,
):
    logger = Logger()
    logger.set_level(log_level)
    logger.set_log_file('./inference.log')
    logger.info(f"beam : {num_beams}")
    logger.info(f"engine_dir : {engine_dir}")

    engine_dir = Path(engine_dir)
    config_path = engine_dir / 'config.json'
    model_config, tp_size, pp_size, dtype = read_config(config_path)
    logger.info(f"dtype : {dtype}")
    world_size = tp_size * pp_size

    runtime_rank = tensorrt_llm.mpi_rank()
    runtime_mapping = tensorrt_llm.Mapping(world_size,
                                           runtime_rank,
                                           tp_size=tp_size,
                                           pp_size=pp_size)
    torch.cuda.set_device(runtime_rank % runtime_mapping.gpus_per_node)

    tokenizer = LlamaTokenizerFast.from_pretrained(tokenizer_dir, legacy=False)

    sampling_config = SamplingConfig(end_id=EOS_TOKEN,
                                     pad_id=PAD_TOKEN,
                                     num_beams=num_beams)

    engine_name = get_engine_name('llama', dtype, tp_size, pp_size,
                                  runtime_rank)
    serialize_path = engine_dir / engine_name
    with open(serialize_path, 'rb') as f:
        engine_buffer = f.read()
    decoder = tensorrt_llm.runtime.GenerationSession(model_config,
                                                     engine_buffer,
                                                     runtime_mapping,
                                                     debug_mode=False,
                                                     debug_tensors_to_save=None)
    if runtime_rank == 0:
        print(f"Running the {dtype} engine ...")

    output_ids = []
    sequence_lengths = []
    input_lengths_sum = []
    start = time.perf_counter()
    num_request = 100
    for i in range(0, num_request, 8):
        if i+8 >= num_request:
            batch_request_begin = i
            batch_request_end = num_request
            batch = batch_request_end - batch_request_begin
        else:
            batch_request_begin = i
            batch_request_end = i+8
            batch = 8
        input_ids, input_lengths = parse_input(input_text,
                                               input_file,
                                               tokenizer,
                                               EOS_TOKEN,
                                               batch_request_begin,
                                               batch_request_end,
                                               model_config.remove_input_padding,
                                               input_tokens_limit=input_tokens_limit)
        max_input_length = torch.max(input_lengths).item()
        decoder.setup(batch,
                      max_input_length,
                      max_output_len,
                      num_beams,
                      max_kv_cache_length=max_kv_cache_len)
        ptuning_args = [] if model_config.max_prompt_embedding_table_size == 0 else ptuning_setup(
            prompt_table, dtype, model_config.hidden_size, tasks, input_ids,
            input_lengths, model_config.remove_input_padding)
        batch_outputs = decoder.decode(input_ids,
                                       input_lengths,
                                       sampling_config,
                                       *ptuning_args,
                                       streaming=streaming,
                                       output_sequence_lengths=True,
                                       return_dict=True)
        torch.cuda.synchronize()
        output_ids.append(batch_outputs['output_ids'])
        sequence_lengths.append(batch_outputs['sequence_lengths'])
        input_lengths_sum.append(input_lengths)

    output_ids = [item for sublist in output_ids for item in sublist]
    sequence_lengths = [item for sublist in sequence_lengths for item in sublist]
    outputs = {'output_ids': output_ids, 'sequence_lengths': sequence_lengths}
    input_lengths_sum = [item for sublist in input_lengths_sum for item in sublist]
    if streaming:
        for outputs_dict in throttle_generator(outputs, streaming_interval):
            if runtime_rank == 0:
                output_ids = outputs_dict['output_ids']
                sequence_lengths = outputs_dict['sequence_lengths']
                total_num_tokens = print_output(output_ids, input_lengths_sum, tokenizer, output_csv,
                                                                   output_npy, sequence_lengths)
    else:
        if runtime_rank == 0:
            output_ids = outputs['output_ids']
            sequence_lengths = outputs['sequence_lengths']
            total_num_tokens = print_output(output_ids, input_lengths_sum, tokenizer, output_csv,
                                                               output_npy, sequence_lengths)

        if model_config.gather_all_token_logits:
            if runtime_mapping.is_last_pp_rank():
                print(
                    f"context_logits.shape: {outputs['context_logits'].shape}")
                print(
                    f"generation_logits.shape: {len(outputs['generation_logits']), outputs['generation_logits'][0].shape}"
                )
                print(outputs['context_logits'])
                print(outputs['generation_logits'])
    end = time.perf_counter()
    elapsed_time = end - start
    print(f"Throughput: {num_request/elapsed_time:.2f} requests/s \n"
          f"Tokens/s: {total_num_tokens /elapsed_time:.2f} tokens/s \n")

if __name__ == '__main__':
    args = parse_arguments()
    generate(**vars(args))
byshiue commented 6 months ago

Please share the full log, the branch/commit/tag you use, and how you built the Docker image.

isRambler commented 6 months ago

> Please share the full log, the branch/commit/tag you use, and how you built the Docker image.

The log is as follows:

Running the float16 engine ...
/root/miniconda3/lib/python3.10/site-packages/tensorrt_llm/runtime/generation.py:709: UserWarning: The PyTorch API of nested tensors is in prototype stage and will change in the near future. (Triggered internally at ../aten/src/ATen/NestedTensorImpl.cpp:178.)
  torch.nested.nested_tensor(split_ids_list,
/root/miniconda3/lib/python3.10/site-packages/tensorrt_llm/runtime/generation.py:709: UserWarning: The PyTorch API of nested tensors is in prototype stage and will change in the near future. (Triggered internally at ../aten/src/ATen/NestedTensorImpl.cpp:178.)
  torch.nested.nested_tensor(split_ids_list,
Traceback (most recent call last):
  File "/root/TensorRT-LLM/examples/llama/run.py", line 402, in <module>
    generate(**vars(args))
  File "/root/TensorRT-LLM/examples/llama/run.py", line 397, in generate
    f"Tokens/s: {total_num_tokens /elapsed_time:.2f} tokens/s \n")
UnboundLocalError: local variable 'total_num_tokens' referenced before assignment
--------------------------------------------------------------------------
Primary job  terminated normally, but 1 process returned
a non-zero exit code. Per user-direction, the job has been aborted.
--------------------------------------------------------------------------
--------------------------------------------------------------------------
mpirun detected that one or more processes exited with non-zero status, thus causing
the job to be terminated. The first process to do so was:

  Process name: [[40265,1],1]
  Exit code:    1
--------------------------------------------------------------------------

Docker build environment:

Container OS    Ubuntu 22.04
CUDA            NVIDIA CUDA 12.2.2
cuDNN           8.9.5
TensorRT        9.2.0
Python          3.10.12
PyTorch         2.1.0
TensorRT-LLM    0.6.1
GPU             A40 (48GB) * 2
CPU             30 vCPU AMD EPYC 7543 32-Core Processor
byshiue commented 6 months ago

Are you really using v0.6.1? After checking, examples/llama/run.py in v0.6.1 only has 375 lines of code, while your error happens on line 397.

You could also try the v0.7.1 examples/run.py.
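
For readers who hit the same UnboundLocalError in a modified run.py: in the script above, total_num_tokens is only assigned inside the if runtime_rank == 0: branches, so every other MPI rank reaches the final throughput print with the variable undefined. A minimal sketch of one way to guard the tail of generate() (a hypothetical local patch, not an official fix):

    end = time.perf_counter()
    elapsed_time = end - start
    # Only rank 0 ever computes total_num_tokens, so only rank 0 reports throughput.
    if runtime_rank == 0:
        print(f"Throughput: {num_request / elapsed_time:.2f} requests/s \n"
              f"Tokens/s: {total_num_tokens / elapsed_time:.2f} tokens/s \n")
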