triton-inference-server / fastertransformer_backend

Can end_to_end_test.py with model_name 'ensemble' support decoupled mode? #108

Open jimmyforrest opened 1 year ago

jimmyforrest commented 1 year ago

Description

master

Reproduced Steps

end_to_end_test.py

byshiue commented 1 year ago

Yes.

Lzhang-hub commented 1 year ago

Yes.

@byshiue Hi, I deployed a bloom model with decoupled=True and ran end_to_end_test.py over grpc. I got this error:

[StatusCode.UNIMPLEMENTED] ModelInfer RPC doesn't support models with decoupled transaction policy

But when I run issue_request.py with model_name fastertransformer, everything works.

Tritonclient version: 2.25.0, Triton Server version: 2.24.0, and TRITONBACKEND API version: 1.10.

Are there any other parameters that need to be configured? I would be very grateful for any advice.

byshiue commented 1 year ago

You need to set decoupled = True in the fastertransformer config and use the async API in end_to_end_test.py, as in https://github.com/triton-inference-server/fastertransformer_backend/blob/main/tools/gpt/identity_test.py#L149-L174:

            if (flags.decoupled):
              print("set request")
              user_data = UserData()
              client.start_stream(partial(completion_callback, user_data))

              client.async_stream_infer(
                  model_name,
                  inputs,
                  request_id=str(1))

              print("get request")
              client.stop_stream()

              recv_count = 0
              while not user_data._completed_requests.empty():
                (result, error) = user_data._completed_requests.get()
                if error is not None:
                    print(error)
                    exit()
                recv_count+=1
                if (recv_count == flags.output_len):
                  results.append(result)

              if (recv_count != flags.output_len):
                  raise RuntimeError('Number of responses received doesnt match request_output_len')

You can try identity_test.py with decoupled mode first.
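
Whether a model needs the streaming API can be read from its configuration. The following is a minimal sketch, assuming the config has already been fetched as a dictionary (for example with the gRPC client's get_model_config(model_name, as_json=True); the exact JSON layout shown here is an assumption). The helper names is_decoupled and pick_infer_api are hypothetical and are not part of tritonclient.

```python
def is_decoupled(model_config: dict) -> bool:
    """Return True if the config dict marks the model as decoupled.

    Assumes the layout
    {"config": {"model_transaction_policy": {"decoupled": ...}}};
    a bare config without the outer "config" key is accepted as well.
    """
    config = model_config.get("config", model_config)
    policy = config.get("model_transaction_policy", {})
    return bool(policy.get("decoupled", False))


def pick_infer_api(model_config: dict) -> str:
    """Name the client call that suits the model's transaction policy."""
    return "async_stream_infer" if is_decoupled(model_config) else "infer"


if __name__ == "__main__":
    decoupled_cfg = {
        "config": {
            "name": "ensemble",
            "model_transaction_policy": {"decoupled": True},
        }
    }
    print(pick_infer_api(decoupled_cfg))
    print(pick_infer_api({"config": {"name": "ensemble"}}))
```

A check like this before sending the request would route a decoupled model to the streaming call instead of hitting the UNIMPLEMENTED error from the plain ModelInfer RPC.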

Lzhang-hub commented 1 year ago

Thank you very much for your reply. I tried identity_test.py with decoupled mode and printed the output in completion_callback; the streaming output is fine:

# Callback function used for async_stream_infer()
def completion_callback(user_data, result, error):
    # passing error raise and handling out

    print(result.as_numpy("output_ids"))
    user_data._completed_requests.put((result, error))
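
A callback that reads the result before looking at the error will itself fail whenever the server reports an error. As a minimal sketch of a more defensive pattern, the callback below only enqueues, and a separate drain step checks the error before touching the result. The fake_stream function is a hypothetical stand-in for the server so that the example runs without Triton; drain and the completed attribute are illustrative names, not part of tritonclient.

```python
import queue
import threading
from functools import partial


class UserData:
    def __init__(self):
        self.completed = queue.Queue()


def completion_callback(user_data, result, error):
    # Only enqueue here; the result is inspected after `error` is checked.
    user_data.completed.put((result, error))


def drain(user_data):
    """Collect queued responses, raising on the first reported error."""
    results = []
    while not user_data.completed.empty():
        result, error = user_data.completed.get()
        if error is not None:
            raise RuntimeError(f"stream returned an error: {error}")
        results.append(result)
    return results


def fake_stream(callback, num_responses):
    """Stand-in for a streaming server: calls back once per response."""
    for step in range(num_responses):
        callback(f"token-{step}", None)


if __name__ == "__main__":
    user_data = UserData()
    worker = threading.Thread(
        target=fake_stream,
        args=(partial(completion_callback, user_data), 3))
    worker.start()
    worker.join()  # here join() stands in for waiting on the stream to finish
    print(drain(user_data))
```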

Then I used the async API in end_to_end_test.py like this:

FLAGS = None
START_LEN = 8
OUTPUT_LEN = 24
BATCH_SIZE = 8
start_id = 220
end_id = 50256

if sys.version_info >= (3, 0):
    import queue
else:
    import Queue as queue

class UserData:
    def __init__(self):
        self._completed_requests = queue.Queue()

# Callback function used for async_stream_infer()
def completion_callback(user_data, result, error):
    # passing error raise and handling out

    print(result.as_numpy("output_ids"))
    user_data._completed_requests.put((result, error))

def prepare_tensor(name, input, protocol):
    client_util = httpclient if protocol == "http" else grpcclient
    t = client_util.InferInput(
        name, input.shape, np_to_triton_dtype(input.dtype))
    t.set_data_from_numpy(input)
    return t

def create_inference_server_client(protocol, url, concurrency, verbose):
    client_util = httpclient if protocol == "http" else grpcclient
    if protocol == "http":
        return client_util.InferenceServerClient(url,
                                                concurrency=concurrency,
                                                verbose=verbose)
    elif protocol == "grpc":
        return client_util.InferenceServerClient(url,
                                                verbose=verbose)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-v',
                        '--verbose',
                        action="store_true",
                        required=False,
                        default=False,
                        help='Enable verbose output')
    parser.add_argument('-u',
                        '--url',
                        type=str,
                        required=False,
                        help='Inference server URL.')
    parser.add_argument('-beam',
                        '--beam_width',
                        type=int,
                        default=1,
                        help='beam width.')
    parser.add_argument('-topk',
                        '--topk',
                        type=int,
                        default=1,
                        required=False,
                        help='topk for sampling')
    parser.add_argument('-topp',
                        '--topp',
                        type=float,
                        default=0.0,
                        required=False,
                        help='topp for sampling')
    parser.add_argument('-d',
                        '--decoupled',
                        action="store_true",
                        required=False,
                        default=False,
                        help='Use decoupled mode')
    parser.add_argument(
        '-i',
        '--protocol',
        type=str,
        required=False,
        default='http',
        help='Protocol ("http"/"grpc") used to ' +
        'communicate with inference service. Default is "http".')
    parser.add_argument('--return_log_probs',
                        action="store_true",
                        default=False,
                        required=False,
                        help='return the cumulative log probs and output log probs or not')

    FLAGS = parser.parse_args()
    if (FLAGS.protocol != "http") and (FLAGS.protocol != "grpc"):
        print("unexpected protocol \"{}\", expects \"http\" or \"grpc\"".format(
            FLAGS.protocol))
        exit(1)

    FLAGS.protocol = "grpc" 

    if FLAGS.url is None:
        FLAGS.url = "xxx.xxx.xxx.xxx:8001"
        # FLAGS.url = "localhost:8000" if FLAGS.protocol == "http" else "localhost:8001"

    ######################
    model_name = "ensemble"
    with create_inference_server_client(FLAGS.protocol,
                                        FLAGS.url,
                                        concurrency=1,
                                        verbose=FLAGS.verbose) as client:
        results=[]
        input0 = [
                ["Blackhawks\n The 2015 Hilltoppers"],
                ["Data sources you can use to make a decision:"],
                ["\n if(angle = 0) { if(angle"],
                ["GMs typically get 78% female enrollment, but the "],
                ["Previous Chapter | Index | Next Chapter"],
                ["Michael, an American Jew, called Jews"],
                ["Blackhawks\n The 2015 Hilltoppers"],
                ["Data sources you can use to make a comparison:"]
                ]
        bad_words_list = np.array([
            ["Hawks, Hawks"],
            [""],
            [""],
            [""],
            [""],
            [""],
            [""],
            [""]], dtype=object)
        stop_words_list = np.array([
            [""],
            [""],
            [""],
            [""],
            [""],
            [""],
            [""],
            ["month, month"]], dtype=object)
        input0_data = np.array(input0).astype(object)
        output0_len = np.ones_like(input0).astype(np.uint32) * OUTPUT_LEN
        runtime_top_k = (FLAGS.topk * np.ones([input0_data.shape[0], 1])).astype(np.uint32)
        runtime_top_p = FLAGS.topp * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        beam_search_diversity_rate = 0.0 * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        temperature = 1.0 * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        len_penalty = 1.0 * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        repetition_penalty = 1.0 * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        random_seed = 0 * np.ones([input0_data.shape[0], 1]).astype(np.uint64)
        is_return_log_probs = True * np.ones([input0_data.shape[0], 1]).astype(bool)
        beam_width = (FLAGS.beam_width * np.ones([input0_data.shape[0], 1])).astype(np.uint32)
        start_ids = start_id * np.ones([input0_data.shape[0], 1]).astype(np.uint32)
        end_ids = end_id * np.ones([input0_data.shape[0], 1]).astype(np.uint32)
        inputs = [
            prepare_tensor("INPUT_0", input0_data, FLAGS.protocol),
            prepare_tensor("INPUT_1", output0_len, FLAGS.protocol),
            prepare_tensor("INPUT_2", bad_words_list, FLAGS.protocol),
            prepare_tensor("INPUT_3", stop_words_list, FLAGS.protocol),
            prepare_tensor("runtime_top_k", runtime_top_k, FLAGS.protocol),
            prepare_tensor("runtime_top_p", runtime_top_p, FLAGS.protocol),
            prepare_tensor("beam_search_diversity_rate", beam_search_diversity_rate, FLAGS.protocol),
            prepare_tensor("temperature", temperature, FLAGS.protocol),
            prepare_tensor("len_penalty", len_penalty, FLAGS.protocol),
            prepare_tensor("repetition_penalty", repetition_penalty, FLAGS.protocol),
            prepare_tensor("random_seed", random_seed, FLAGS.protocol),
            prepare_tensor("is_return_log_probs", is_return_log_probs, FLAGS.protocol),
            prepare_tensor("beam_width", beam_width, FLAGS.protocol),
            prepare_tensor("start_id", start_ids, FLAGS.protocol),
            prepare_tensor("end_id", end_ids, FLAGS.protocol),
        ]
        FLAGS.decoupled=True
        if (FLAGS.decoupled):
            print("set request")
            user_data = UserData()
            client.start_stream(partial(completion_callback, user_data))
            # print(inputs[0].shape())
            client.async_stream_infer(
                model_name,
                inputs,
                request_id=str(1))

            print("get request")
            client.stop_stream()

            recv_count = 0
            while not user_data._completed_requests.empty():
                (result, error) = user_data._completed_requests.get()
                # print(result.as_numpy("output_ids"))
                if error is not None:
                    print(error)
                    exit()
                recv_count+=1
                if (recv_count == FLAGS.output_len):
                    results.append(result)

            if (recv_count != FLAGS.output_len):
                raise RuntimeError('Number of responses received doesnt match request_output_len')

        else:
            print("set request")
            result = client.infer(model_name, inputs)
            print("get request")
            results.append(result)

But the output result I get is None.

byshiue commented 1 year ago

Your script is missing many things and I cannot run it successfully. Please try running this script directly:

#!/usr/bin/python

# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#  * Neither the name of NVIDIA CORPORATION nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import argparse
import numpy as np
import logging
import tritonclient.grpc as grpcclient
import tritonclient.http as httpclient
from functools import partial

from tritonclient.utils import np_to_triton_dtype

FLAGS = None

GPT_START_ID = 220
GPT_END_ID = 50256

import sys
if sys.version_info >= (3, 0):
    import queue
else:
    import Queue as queue

class UserData:
    def __init__(self):
        self._completed_requests = queue.Queue()

# Callback function used for async_stream_infer()
def completion_callback(user_data, result, error):
    # passing error raise and handling out
    user_data._completed_requests.put((result, error))

def prepare_tensor(name, input, protocol):
    client_util = httpclient if protocol == "http" else grpcclient
    t = client_util.InferInput(
        name, input.shape, np_to_triton_dtype(input.dtype))
    t.set_data_from_numpy(input)
    return t

def create_inference_server_client(protocol, url, concurrency, verbose):
    client_util = httpclient if protocol == "http" else grpcclient
    if protocol == "http":
        return client_util.InferenceServerClient(url,
                                                 concurrency=concurrency,
                                                 verbose=verbose)
    elif protocol == "grpc":
        return client_util.InferenceServerClient(url, verbose=verbose)

def append_start_and_end_ids(inputs, batch_size, start_id=None, end_id=None):
    if start_id is not None:
        start_ids = start_id * np.ones([batch_size, 1]).astype(np.uint32)
        inputs.append(prepare_tensor("start_id", start_ids, FLAGS.protocol))
    if end_id is not None:
        end_ids = end_id * np.ones([batch_size, 1]).astype(np.uint32)
        inputs.append(prepare_tensor("end_id", end_ids, FLAGS.protocol))

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-v',
                        '--verbose',
                        action="store_true",
                        required=False,
                        default=False,
                        help='Enable verbose output')
    parser.add_argument('-u',
                        '--url',
                        type=str,
                        required=False,
                        help='Inference server URL.')
    parser.add_argument('-beam',
                        '--beam_width',
                        type=int,
                        default=1,
                        help='beam width.')
    parser.add_argument('-topk',
                        '--topk',
                        type=int,
                        default=1,
                        required=False,
                        help='topk for sampling')
    parser.add_argument('-topp',
                        '--topp',
                        type=float,
                        default=0.0,
                        required=False,
                        help='topp for sampling')
    parser.add_argument('-i',
                        '--protocol',
                        type=str,
                        required=False,
                        default='grpc',
                        help='Protocol ("http"/"grpc") used to '
                             'communicate with inference service. Default is "grpc".')
    parser.add_argument('--return_log_probs',
                        action="store_true",
                        default=False,
                        required=False,
                        help='return the cumulative log probs and output log probs or not')
    parser.add_argument('--output-len',
                        type=int,
                        default=24,
                        required=False,
                        help='the number of tokens we hope model generating')
    parser.add_argument('--model-variant',
                        type=str,
                        default='gpt',
                        choices=['gpt', 'bloom'],
                        help='The type of GPT model variants.')

    FLAGS = parser.parse_args()

    LOGGER = logging.getLogger(f"{__file__} {__name__}")
    log_format = "%(asctime)s %(name)s:%(lineno)d [%(levelname)s] %(message)s"
    logging.basicConfig(level=logging.DEBUG if FLAGS.verbose else logging.INFO,
                        format=log_format)

    if (FLAGS.protocol != "http") and (FLAGS.protocol != "grpc"):
        LOGGER.error(f'unexpected protocol "{FLAGS.protocol}", expects "http" or "grpc"')
        exit(1)

    if FLAGS.url is None:
        FLAGS.url = "localhost:8000" if FLAGS.protocol == "http" else "localhost:8001"

    ######################
    model_name = "ensemble"
    with create_inference_server_client(FLAGS.protocol,
                                        FLAGS.url,
                                        concurrency=1,
                                        verbose=FLAGS.verbose) as client:
        input0 = [
            ["Blackhawks\n The 2015 Hilltoppers"],
            ["Data sources you can use to make a decision:"],
            ["\n if(angle = 0) { if(angle"],
            ["GMs typically get 78% female enrollment, but the "],
            ["Previous Chapter | Index | Next Chapter"],
            ["Michael, an American Jew, called Jews"],
            ["Blackhawks\n The 2015 Hilltoppers"],
            ["Data sources you can use to make a comparison:"]
        ]
        bad_words_list = np.array([
            ["Hawks, Hawks"],
            [""],
            [""],
            [""],
            [""],
            [""],
            [""],
            [""]], dtype=object)
        stop_words_list = np.array([
            [""],
            [""],
            [""],
            [""],
            [""],
            [""],
            [""],
            ["month, month"]], dtype=object)
        input0_data = np.array(input0).astype(object)
        output0_len = np.ones_like(input0).astype(np.uint32) * FLAGS.output_len
        runtime_top_k = (FLAGS.topk * np.ones([input0_data.shape[0], 1])).astype(np.uint32)
        runtime_top_p = FLAGS.topp * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        beam_search_diversity_rate = 0.0 * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        temperature = 1.0 * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        len_penalty = 1.0 * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        repetition_penalty = 1.0 * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        random_seed = 0 * np.ones([input0_data.shape[0], 1]).astype(np.uint64)
        is_return_log_probs = True * np.ones([input0_data.shape[0], 1]).astype(bool)
        beam_width = (FLAGS.beam_width * np.ones([input0_data.shape[0], 1])).astype(np.uint32)
        inputs = [
            prepare_tensor("INPUT_0", input0_data, FLAGS.protocol),
            prepare_tensor("INPUT_1", output0_len, FLAGS.protocol),
            prepare_tensor("INPUT_2", bad_words_list, FLAGS.protocol),
            prepare_tensor("INPUT_3", stop_words_list, FLAGS.protocol),
            prepare_tensor("runtime_top_k", runtime_top_k, FLAGS.protocol),
            prepare_tensor("runtime_top_p", runtime_top_p, FLAGS.protocol),
            prepare_tensor("beam_search_diversity_rate", beam_search_diversity_rate, FLAGS.protocol),
            prepare_tensor("temperature", temperature, FLAGS.protocol),
            prepare_tensor("len_penalty", len_penalty, FLAGS.protocol),
            prepare_tensor("repetition_penalty", repetition_penalty, FLAGS.protocol),
            prepare_tensor("random_seed", random_seed, FLAGS.protocol),
            prepare_tensor("is_return_log_probs", is_return_log_probs, FLAGS.protocol),
            prepare_tensor("beam_width", beam_width, FLAGS.protocol),
        ]

        if FLAGS.model_variant == 'gpt':
            append_start_and_end_ids(inputs, input0_data.shape[0], GPT_START_ID, GPT_END_ID)

        # factual-nucleus arguments
        # top_p_decay = 0.9 * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        # top_p_min = 0.5 * np.ones([input0_data.shape[0], 1]).astype(np.float32)
        # top_p_reset_ids = 13 * np.ones([input0_data.shape[0], 1]).astype(np.uint32)
        # inputs.append(prepare_tensor("top_p_decay", top_p_decay, FLAGS.protocol))
        # inputs.append(prepare_tensor("top_p_min", top_p_min, FLAGS.protocol))
        # inputs.append(prepare_tensor("top_p_reset_ids", top_p_reset_ids, FLAGS.protocol))

        try:
            print("set request")
            user_data = UserData()
            client.start_stream(partial(completion_callback, user_data))

            client.async_stream_infer(
                model_name,
                inputs,
                request_id=str(1))

            print("get request")
            client.stop_stream()

            recv_count = 0
            while not user_data._completed_requests.empty():
                (result, error) = user_data._completed_requests.get()
                # Check for an error first: result is None when error is set.
                if error is not None:
                    print(error)
                    exit()
                print(f"result {recv_count}: {result.as_numpy('OUTPUT_0')}")
                print()
                # print(f"result: {result.as_numpy('sequence_length')}")
                # print(f"result: {result.as_numpy('response_input_lengths')}")
                recv_count+=1
                if (recv_count == FLAGS.output_len):
                    print(f"result: {result.as_numpy('OUTPUT_0')}")
                    # print(f"result: {result.as_numpy('sequence_length')}")
                    # print(f"result: {result.as_numpy('response_input_lengths')}")

            if (recv_count != FLAGS.output_len):
                raise RuntimeError('Number of responses received doesnt match request_output_len')

            # result = client.infer(model_name, inputs)
            # output0 = result.as_numpy("OUTPUT_0")
            # LOGGER.info("============After ensemble============")
            # LOGGER.info(f"output: \n {output0}")
            # LOGGER.info(f"sequence_length: \n{result.as_numpy('sequence_length')}")
            # if FLAGS.return_log_probs:
            #     LOGGER.info(f"cum_log_probs: \n{result.as_numpy('cum_log_probs')}")
            #     LOGGER.info(f"output_log_probs: \n{result.as_numpy('output_log_probs')}")
        except Exception as e:
            LOGGER.info(e)

Lzhang-hub commented 1 year ago

Thank you very much, I ran it successfully! I made a stupid mistake: I forgot to change the name of the output variable. I used print(f"result: {result.as_numpy('output_ids')}") with the ensemble model; it should be print(f"result: {result.as_numpy('OUTPUT_0')}")
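
Because a mistyped output name shows up here only as None, a small guard can make the mismatch obvious. This is a minimal sketch: require_output is a hypothetical helper, and FakeResult is a stand-in that imitates the behaviour reported in this thread (an unknown output name yields None) so that the example runs without a server.

```python
def require_output(result, name):
    """Return the named output tensor, failing loudly if it is missing."""
    tensor = result.as_numpy(name)
    if tensor is None:
        raise KeyError(f"response has no output named {name!r}")
    return tensor


class FakeResult:
    """Hypothetical stand-in for an inference result with named outputs."""

    def __init__(self, outputs):
        self._outputs = outputs

    def as_numpy(self, name):
        # Imitates the behaviour seen in this thread: unknown names give None.
        return self._outputs.get(name)


if __name__ == "__main__":
    result = FakeResult({"OUTPUT_0": ["generated text"]})
    print(require_output(result, "OUTPUT_0"))
    try:
        require_output(result, "output_ids")
    except KeyError as exc:
        print(exc)
```

With the ensemble model the tensor is exposed as OUTPUT_0, while the fastertransformer model alone exposes output_ids, so the name passed to the helper has to match the model being queried.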