triton-inference-server / tensorrtllm_backend

The Triton TensorRT-LLM Backend
Apache License 2.0
588 stars 81 forks source link

Streaming mode doesn't work #367

Open dongteng opened 3 months ago

dongteng commented 3 months ago

System Info

V100*2 nvcr.io/nvidia/tritonserver:24.01-trtllm-python-py3 tensorrt-llm 0.7.0

Who can help?

No response

Information

Tasks

Reproduction

I want to deploy Qwen-14B in Triton, following https://github.com/NVIDIA/TensorRT-LLM/tree/a8018c14e6a9868b507a0517550b2cc6e41bd86e/examples/qwen 1. Build the engine: python3 build.py --hf_model_dir /root/model_repo \ --dtype float16 \ --remove_input_padding \ --use_gpt_attention_plugin float16 \ --enable_context_fmha \ --use_gemm_plugin float16 \ --output_dir /root/Qwen/14B/trt_engines/fp16/2-gpu \ --world_size 2 \ --tp_size 2

  1. tritonserver cd /root/Qwen/14B/trt_engines/fp16/2-gpu cp -r ./* /tensorrtllm_backend/triton_model_repo/tensorrt_llm/1/

cd /root/ cp -r model_repo /tensorrtllm_backend/triton_model_repo/tensorrt_llm/ rm /tensorrtllm_backend/triton_model_repo/tensorrt_llm/model_repo/*.safetensors

cd /tensorrtllm_backend python3 scripts/launch_triton_server.py --world_size=2 --model_repo=/tensorrtllm_backend/triton_model_repo

When I curl -X POST 10.110.31.16:8001/v2/models/tensorrt_llm_bls/generate_stream \ -d '{"text_input": "<|im_start|>system\n you are a writer .<|im_end|>\n<|im_start|>user\nwho are you ?<|im_end|>\n<|im_start|>assistant\n", "max_tokens": 54, "bad_words": "\n", "stop_words": "", "end_id": [151643], "pad_id": [151643],"stream": true }'

It returned the whole output at once instead of streaming it, as shown in the image.

The model.py for tensorrt_llm_bls is:
`import json
import traceback

import numpy as np import triton_python_backend_utils as pb_utils

class TritonPythonModel:

def initialize(self, args):
    """Read the model configuration and build the tensor-name routing tables.

    Args:
        args: Dictionary handed to the model on load. Only
            ``args['model_config']`` is read here; it must be a JSON string
            containing a ``parameters`` object.

    Raises:
        KeyError: If ``model_config`` or its ``parameters`` entry is missing.
    """
    # Parse model configs
    model_config = json.loads(args['model_config'])
    params = model_config['parameters']

    # ``accumulate_tokens`` is optional; when it is absent the empty string
    # falls through to False below.
    if 'accumulate_tokens' in params:
        accumulate_tokens_str = params['accumulate_tokens']['string_value']
    else:
        accumulate_tokens_str = ''

    self.accumulate_tokens = accumulate_tokens_str.lower() in (
        'true', 'yes', '1', 't')

    self.decoupled = pb_utils.using_decoupled_model_transaction_policy(
        model_config)

    print(f"=============decouple:{self.decoupled}==========")
    self.logger = pb_utils.Logger

    # Every input tensor name that is looked up on an incoming BLS request.
    self.bls_input_tensor_names = [
        "text_input", "max_tokens", "bad_words", "stop_words", "end_id",
        "pad_id", "top_k", "top_p", "temperature", "length_penalty",
        "repetition_penalty", "min_length", "presence_penalty",
        "random_seed", "return_log_probs", "beam_width", "stream",
        "prompt_embedding_table", "prompt_vocab_size",
        "embedding_bias_words", "embedding_bias_weights"
    ]

    # preprocessing input name -> BLS input name
    self.preproc_input_to_bls_input_map = {
        "QUERY": "text_input",
        "REQUEST_OUTPUT_LEN": "max_tokens",
        "BAD_WORDS_DICT": "bad_words",
        "STOP_WORDS_DICT": "stop_words",
        "EMBEDDING_BIAS_WORDS": "embedding_bias_words",
        "EMBEDDING_BIAS_WEIGHTS": "embedding_bias_weights"
    }

    # preprocessing output name -> tensorrt_llm input name
    self.preproc_output_to_trtllm_input_map = {
        "INPUT_ID": "input_ids",
        "REQUEST_INPUT_LEN": "input_lengths",
        "REQUEST_OUTPUT_LEN": "request_output_len",
        "BAD_WORDS_IDS": "bad_words_list",
        "STOP_WORDS_IDS": "stop_words_list",
        "EMBEDDING_BIAS": "embedding_bias",
    }

    # tensorrt_llm input name -> BLS input name (forwarded without
    # going through the preprocessor)
    self.trtllm_input_to_bls_input_map = {
        "end_id": "end_id",
        "pad_id": "pad_id",
        "beam_width": "beam_width",
        "runtime_top_k": "top_k",
        "runtime_top_p": "top_p",
        # "temperature" is accepted in bls_input_tensor_names above; without
        # this entry it was read from the request and then silently dropped.
        # NOTE(review): assumes the tensorrt_llm model exposes an input
        # named "temperature" - confirm against its config.pbtxt.
        "temperature": "temperature",
        "len_penalty": "length_penalty",
        "repetition_penalty": "repetition_penalty",
        "min_length": "min_length",
        "presence_penalty": "presence_penalty",
        "random_seed": "random_seed",
        "return_log_probs": "return_log_probs",
        "streaming": "stream",
        "prompt_embedding_table": "prompt_embedding_table",
        "prompt_vocab_size": "prompt_vocab_size",
    }

    # tensorrt_llm output name -> postprocessing input name
    self.trtllm_output_to_postproc_input_map = {
        "output_ids": "TOKENS_BATCH",
        "sequence_length": "SEQUENCE_LENGTH",
        "cum_log_probs": "CUM_LOG_PROBS",
        "output_log_probs": "OUTPUT_LOG_PROBS",
    }

    # postprocessing output name -> BLS output name
    self.postproc_output_to_bls_output_map = {
        "OUTPUT": "text_output",
        "OUT_CUM_LOG_PROBS": "cum_log_probs",
        "OUT_OUTPUT_LOG_PROBS": "output_log_probs",
    }

def _get_bls_input_tensors_map(self, request):
    """Collect the input tensors that are present on a BLS request.

    Args:
        request: The incoming request; each name in
            ``self.bls_input_tensor_names`` is looked up on it.

    Returns:
        A dict mapping input tensor name to the tensor, containing only the
        names for which the lookup returned something other than ``None``.
    """
    bls_input_tensors_map = {}
    for input_tensor_name in self.bls_input_tensor_names:
        tensor = pb_utils.get_input_tensor_by_name(request,
                                                   input_tensor_name)
        # Identity check: `!= None` would dispatch to the tensor type's
        # comparison operator, which is not what "was it provided" means.
        if tensor is not None:
            bls_input_tensors_map[input_tensor_name] = tensor

    return bls_input_tensors_map

def _get_preproc_input_tensors(self, bls_input_tensors_map):
    """Build the preprocessing model's inputs from the BLS inputs.

    Args:
        bls_input_tensors_map: Dict of BLS input name -> tensor.

    Returns:
        A list of new tensors carrying the same data as the BLS tensors but
        renamed to the preprocessing input names. BLS inputs that are not
        present in the map are skipped.
    """
    name_pairs = self.preproc_input_to_bls_input_map.items()
    # Re-wrap each available BLS tensor under the name the preprocessor
    # expects.
    return [
        pb_utils.Tensor(target_name,
                        bls_input_tensors_map[source_name].as_numpy())
        for target_name, source_name in name_pairs
        if source_name in bls_input_tensors_map
    ]

def _get_trtllm_input_tensors(self, bls_input_tensors_map,
                              preproc_output_tensors):
    """Build the tensorrt_llm model's inputs.

    Args:
        bls_input_tensors_map: Dict of BLS input name -> tensor.
        preproc_output_tensors: Output tensors of the preprocessing model.

    Returns:
        A list of tensors: first every preprocessor output renamed to its
        tensorrt_llm input name, then every available BLS input that is
        forwarded directly, renamed likewise.

    Raises:
        KeyError: If a preprocessor output has no entry in
            ``self.preproc_output_to_trtllm_input_map``.
    """
    # Inputs derived from the preprocessor outputs
    trtllm_input_tensors = [
        pb_utils.Tensor(
            self.preproc_output_to_trtllm_input_map[produced.name()],
            produced.as_numpy())
        for produced in preproc_output_tensors
    ]

    # Inputs passed straight through from the BLS request, renamed to what
    # tensorrt_llm expects
    trtllm_input_tensors.extend(
        pb_utils.Tensor(target_name,
                        bls_input_tensors_map[source_name].as_numpy())
        for target_name, source_name in
        self.trtllm_input_to_bls_input_map.items()
        if source_name in bls_input_tensors_map)

    return trtllm_input_tensors

def _get_postproc_input_tensors(self, tokens, trtllm_output_tensors):
    """Build the postprocessing model's inputs from tensorrt_llm outputs.

    When both ``self.accumulate_tokens`` and ``self.decoupled`` are set, the
    ``output_ids`` of this response are appended (along axis 2) to the
    tokens accumulated so far, and the accumulated array plus a matching
    sequence length are sent to the postprocessor instead of the raw
    ``output_ids`` / ``sequence_length`` tensors.

    Args:
        tokens: Tokens accumulated from earlier responses, or ``None``.
        trtllm_output_tensors: Output tensors of one tensorrt_llm response.

    Returns:
        A ``(tokens, postproc_input_tensors)`` tuple: the possibly updated
        accumulated tokens and the list of postprocessing input tensors.

    Raises:
        pb_utils.TritonModelException: In accumulation mode, if
            ``output_ids`` is not 3-dimensional, or its first or second
            dimension is not 1.
    """
    postproc_input_tensors = []

    # Pass 1: accumulation mode only. Some tokenizers may need the full
    # token history to decode correctly; in that case the client has to
    # overwrite the text it received earlier.
    for produced in trtllm_output_tensors:
        accumulate_here = (self.accumulate_tokens and self.decoupled
                           and produced.name() == "output_ids")
        if not accumulate_here:
            continue

        chunk = produced.as_numpy()
        if chunk.ndim != 3:
            raise pb_utils.TritonModelException(
                "Expected output_ids tensor to have 3 dims.")
        if chunk.shape[0] != 1:
            raise pb_utils.TritonModelException(
                "Expected output_ids tensor to have batch size of 1")
        if chunk.shape[1] != 1:
            raise pb_utils.TritonModelException(
                "Accumulation of tokens is only implemented for beam width = 1"
            )

        if tokens is None:
            tokens = chunk
        else:
            tokens = np.concatenate((tokens, chunk), axis=2)

        # Accumulated output ids
        ids_name = self.trtllm_output_to_postproc_input_map["output_ids"]
        postproc_input_tensors.append(pb_utils.Tensor(ids_name, tokens))

        # Sequence length recomputed from the accumulated ids
        accumulated_len = np.array([[tokens.shape[2]]], dtype=np.int32)
        len_name = self.trtllm_output_to_postproc_input_map[
            "sequence_length"]
        postproc_input_tensors.append(
            pb_utils.Tensor(len_name, accumulated_len))

    # Pass 2: forward the remaining tensorrt_llm outputs under their
    # postprocessing names.
    for produced in trtllm_output_tensors:
        produced_name = produced.name()

        # output_ids and sequence_length were already emitted in pass 1
        if (self.accumulate_tokens and self.decoupled
                and produced_name in ("output_ids", "sequence_length")):
            continue

        postproc_input_tensors.append(
            pb_utils.Tensor(
                self.trtllm_output_to_postproc_input_map[produced_name],
                produced.as_numpy()))

    return tokens, postproc_input_tensors

def _get_bls_output_tensors(self, postproc_output_tensors):
    """Rename the postprocessing outputs to the BLS output names.

    Args:
        postproc_output_tensors: Output tensors of the postprocessing model.

    Returns:
        A list of new tensors with the same data, named according to
        ``self.postproc_output_to_bls_output_map``.

    Raises:
        KeyError: If a postprocessing output has no entry in that map.
    """
    # One BLS output per postprocessor output, in the same order
    return [
        pb_utils.Tensor(
            self.postproc_output_to_bls_output_map[produced.name()],
            produced.as_numpy())
        for produced in postproc_output_tensors
    ]

def execute(self, requests):
    """Run preprocessing -> tensorrt_llm -> postprocessing for each request.

    For every request the BLS inputs are collected, checked for a batch
    dimension of 1, and sent through the three models. Each tensorrt_llm
    response is postprocessed and turned into one BLS response.

    In decoupled mode the responses are pushed through the request's
    response sender, followed by the final flag, and nothing is returned.
    Otherwise the responses are collected and returned.

    Any exception raised while handling a request is logged and converted
    into an error response for that request.

    Args:
        requests: The requests to process.

    Returns:
        ``None`` in decoupled mode, else a list with one response per
        request.
    """
    responses = []
    bls_response_sender = None

    for request in requests:

        #Get the response sender for the BLS
        if self.decoupled:
            bls_response_sender = request.get_response_sender()

        try:
            # Get the bls input tensors
            bls_input_tensors_map = self._get_bls_input_tensors_map(
                request)

            #Check the batch dimension
            for name, tensor in bls_input_tensors_map.items():
                tensor_shape = tensor.as_numpy().shape
                batch_dim = tensor_shape[0]
                # The original passed the values as extra print() arguments,
                # so the "{}" placeholders were printed literally.
                print(f"Debug name {name}, shape: {tensor_shape}")
                if batch_dim != 1:

                    err_str = "Inflight batching backend expects requests with batch size of 1."
                    self.logger.log_error(err_str)
                    raise pb_utils.TritonModelException(err_str)

            # Create the preprocessor input tensors
            preproc_input_tensors = self._get_preproc_input_tensors(
                bls_input_tensors_map)
            print(f"bls_input_tensors_map==={bls_input_tensors_map}====")
            print(f"preproc_input_tensors:{preproc_input_tensors}=======")
            preproc_request = pb_utils.InferenceRequest(
                model_name="preprocessing",
                inputs=preproc_input_tensors,
                requested_output_names=list(
                    self.preproc_output_to_trtllm_input_map.keys()))

            #Execute preprocessor
            preproc_response = preproc_request.exec()

            if preproc_response.has_error():
                raise pb_utils.TritonModelException(
                    preproc_response.error().message())

            # Create the trtllm input tensors
            trtllm_input_tensors = self._get_trtllm_input_tensors(
                bls_input_tensors_map, preproc_response.output_tensors())

            trtllm_request = pb_utils.InferenceRequest(
                model_name="tensorrt_llm",
                inputs=trtllm_input_tensors,
                requested_output_names=list(
                    self.trtllm_output_to_postproc_input_map.keys()))

            #Execute trtllm
            trtllm_responses = trtllm_request.exec(
                decoupled=self.decoupled)
            print(f"excute trtllm -get:{trtllm_responses}")
            # Wrap the single non-decoupled response so the loop below
            # handles both modes the same way.
            if not self.decoupled:
                trtllm_responses = [trtllm_responses]
            print(f"=============2=decoupled{self.decoupled}=========")
            # Accumulated tokens across responses (only used when token
            # accumulation is enabled in decoupled mode)
            tokens = None

            #Loop over the trtllm responses
            for trtllm_response in trtllm_responses:
                print(f"excute trtllm -every get:{trtllm_response}")

                if trtllm_response.has_error():
                    raise pb_utils.TritonModelException(
                        trtllm_response.error().message())

                trtllm_output_tensors = trtllm_response.output_tensors()

                tokens, postproc_input_tensors = self._get_postproc_input_tensors(
                    tokens, trtllm_output_tensors)
                print(f"tokens:==={tokens}")
                postproc_request = pb_utils.InferenceRequest(
                    model_name="postprocessing",
                    inputs=postproc_input_tensors,
                    requested_output_names=list(
                        self.postproc_output_to_bls_output_map.keys()))

                #Execute postprocessor
                postproc_response = postproc_request.exec()

                if postproc_response.has_error():
                    raise pb_utils.TritonModelException(
                        postproc_response.error().message())

                # Create the BLS response
                bls_output_tensors = self._get_bls_output_tensors(
                    postproc_response.output_tensors())

                bls_response = pb_utils.InferenceResponse(
                    output_tensors=bls_output_tensors)

                if self.decoupled:
                    print(f"==============3==decoupled{self.decoupled}==========")
                    bls_response_sender.send(bls_response)
                else:
                    responses.append(bls_response)

            # All responses have been sent, set final flag
            if self.decoupled:
                print(f"==============4==decoupled:{self.decoupled}==========")
                print(f"==============4==:{pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL}==========")
                bls_response_sender.send(
                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)

        except Exception:
            # Per-request boundary: format the traceback once and use it for
            # both the log entry and the error response.
            error_message = traceback.format_exc()
            self.logger.log_error(error_message)
            # If encountering an error, send a response with err msg
            error_response = pb_utils.InferenceResponse(
                output_tensors=[],
                error=pb_utils.TritonError(error_message))

            if self.decoupled:
                print(f"==============5==decoupled:{self.decoupled}==========")
                bls_response_sender.send(error_response)
                bls_response_sender.send(
                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
            else:
                responses.append(error_response)

    if self.decoupled:
        return None
    else:
        assert len(responses) == len(requests)
        return responses

def finalize(self):
    """Hook invoked a single time while the model is being unloaded.

    Providing `finalize` is optional; it gives the model a chance to do
    whatever clean-up it needs before exiting. Here it only prints a
    message.
    """
    farewell = 'Cleaning up...'
    print(farewell)

tensorrt-llm-bls config.pbtxt: name: "tensorrt_llm_bls" backend: "python" max_batch_size: 4

model_transaction_policy { decoupled: true }

input [ { name: "text_input" data_type: TYPE_STRING dims: [ -1 ] }, { name: "max_tokens" data_type: TYPE_INT32 dims: [ -1 ] }, { name: "bad_words" data_type: TYPE_STRING dims: [ -1 ] optional: true }, { name: "stop_words" data_type: TYPE_STRING dims: [ -1 ] optional: true }, { name: "end_id" data_type: TYPE_INT32 dims: [ 1 ] optional: true }, { name: "pad_id" data_type: TYPE_INT32 dims: [ 1 ] optional: true }, { name: "top_k" data_type: TYPE_INT32 dims: [ 1 ] optional: true }, { name: "top_p" data_type: TYPE_FP32 dims: [ 1 ] optional: true }, { name: "temperature" data_type: TYPE_FP32 dims: [ 1 ] optional: true }, { name: "length_penalty" data_type: TYPE_FP32 dims: [ 1 ] optional: true }, { name: "repetition_penalty" data_type: TYPE_FP32 dims: [ 1 ] optional: true }, { name: "min_length" data_type: TYPE_INT32 dims: [ 1 ] optional: true }, { name: "presence_penalty" data_type: TYPE_FP32 dims: [ 1 ] optional: true }, { name: "random_seed" data_type: TYPE_UINT64 dims: [ 1 ] optional: true }, { name: "return_log_probs" data_type: TYPE_BOOL dims: [ 1 ] optional: true }, { name: "beam_width" data_type: TYPE_INT32 dims: [ 1 ] optional: true }, { name: "stream" data_type: TYPE_BOOL dims: [ 1 ] optional: true }, { name: "prompt_embedding_table" data_type: TYPE_FP16 dims: [ -1, -1 ] optional: true }, { name: "prompt_vocab_size" data_type: TYPE_INT32 dims: [ 1 ] optional: true }, { name: "embedding_bias_words" data_type: TYPE_STRING dims: [ -1 ] optional: true }, { name: "embedding_bias_weights" data_type: TYPE_FP32 dims: [ -1 ] optional: true } ] output [ { name: "text_output" data_type: TYPE_STRING dims: [ -1 ] }, { name: "cum_log_probs" data_type: TYPE_FP32 dims: [ -1 ] }, { name: "output_log_probs" data_type: TYPE_FP32 dims: [ -1, -1 ] } ]

parameters: { key: "accumulate_tokens" value: { string_value: "true" } }

instance_group [ { count: 2 kind : KIND_CPU } ] `

4.

Expected behavior

I expected streaming output but did not get it. curl -X POST 10.110.31.16:8001/v2/models/tensorrt_llm_bls/generate_stream \ -d '{"text_input": "<|im_start|>system\n you are a writer .<|im_end|>\n<|im_start|>user\nwho are you<|im_end|>\n<|im_start|>assistant\n", "max_tokens": 54, "bad_words": "\n", "stop_words": "", "end_id": [151643], "pad_id": [151643],"stream": true }'

actual behavior

image

additional notes

When I print some debug output (image, image), I think trtllm_responses is a single complete output rather than an iterable of partial responses.

schetlur-nv commented 3 months ago

Hey @dongteng - can you try using https://gitlab-master.nvidia.com/ftp/tekit_backend/-/blob/main/inflight_batcher_llm/client/end_to_end_grpc_client.py with the same bls backend and see if it works? It will help isolate the problem.

nuxlear commented 1 month ago

@dongteng - can you try using --paged_kv_cache enable for in-flight batching and setting batching_strategy:inflight_fused_batching in tensorrt_llm model in triton server setting (config.pbtxt)?

https://github.com/triton-inference-server/tensorrtllm_backend/issues/348#issuecomment-2114744044 As I mentioned in the link above, you should enable the in-flight batching strategy to use TRT-LLM + Triton server in streaming mode.