tensorflow / serving

A flexible, high-performance serving system for machine learning models
https://www.tensorflow.org/serving
Apache License 2.0

Tensorflow serving using batch_enabled flag #882

Closed gr8Adakron closed 6 years ago

gr8Adakron commented 6 years ago

I have successfully installed tensorflow-serving, and I am able to get predictions from the pre-built Inception model with it.

Command to start the server:

tensorflow_model_server --port=9000 --model_name=inception --model_base_path=/home/afzal/serving/inception_model/

I get the prediction using this script, called inception_client.py:

from __future__ import print_function
# This is a placeholder for a Google-internal import.
from grpc.beta import implementations
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2
tf.app.flags.DEFINE_string('server', 'localhost:9000',
                           'PredictionService host:port')
tf.app.flags.DEFINE_string('image', '', 'path to image in JPEG format')
FLAGS = tf.app.flags.FLAGS

def main(_):
  host, port = FLAGS.server.split(':')
  channel = implementations.insecure_channel(host, int(port))
  stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)
  # Send request
  with open(FLAGS.image, 'rb') as f:
    # See prediction_service.proto for gRPC request/response details.
    data = f.read()
    request = predict_pb2.PredictRequest()
    request.model_spec.name = 'inception'
    request.model_spec.signature_name = 'predict_images'
    request.inputs['images'].CopyFrom(
     tf.contrib.util.make_tensor_proto(data, shape=[1]))  #..> For Original Inception uncomment this
    result = stub.Predict(request, 10.0)  # 10 secs timeout
    print(result)

if __name__ == '__main__':
  tf.app.run()

python inception_client.py --image=temp.jpg

The prediction works. Awesome! A single image takes around 2.5 seconds. Not bad.

Now I want to get predictions for 1000 images all together, using the --enable_batching flag.

I started the server with this flag added; the full command is:

tensorflow_model_server --enable_batching --port=9000 --model_name=inception --model_base_path=/home/afzal/serving/inception_model/

But I don't know what the client script should look like or how to send it a batch for prediction. I tried searching but could not find anything.

The same client script doesn't work. Any help is appreciated! I want predictions for a batch of images.

chrisolston commented 6 years ago

--enable_batching turns on server-side batching, which means that the server will group together multiple inference requests it receives from clients and process them in a batch. It should be completely transparent to the client (other than performance). If by "script" you mean something that's sending requests as a client, then the same script should work.

(If you want to do client-side batching, that is also possible -- just have your client batch together multiple requests into a single RPC to the server.)
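
A minimal sketch of the client-side batching mentioned above, assuming the images are already preprocessed into arrays of identical shape. The helper name `make_batches` and the 224x224x3 shape are hypothetical; each stacked array would then go into a single Predict request in place of a single image.

```python
import numpy as np

def make_batches(images, batch_size):
    """Yield arrays of shape (n, H, W, C) with n <= batch_size, stacked on axis 0."""
    for start in range(0, len(images), batch_size):
        yield np.stack(images[start:start + batch_size], axis=0)

# Hypothetical data: 5 preprocessed images, each filled with its own index.
images = [np.full((224, 224, 3), i, dtype=np.float32) for i in range(5)]

batches = list(make_batches(images, batch_size=2))
batch_shapes = [b.shape for b in batches]
print(batch_shapes)
```

The last batch is simply smaller when the image count is not a multiple of the batch size, so no padding is needed on the client.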

gr8Adakron commented 6 years ago

Thank you for the reply! But if I use the same script after starting the server with the --enable_batching flag, it returns this error:

Traceback (most recent call last):
  File "inception_client.py", line 59, in <module>
    tf.app.run()
  File "/home/afzal/.virtualenvs/tensorflow_python36/lib/python3.6/site-packages/tensorflow/python/platform/app.py", line 126, in run
    _sys.exit(main(argv))
  File "inception_client.py", line 54, in main
    result = stub.Predict(request, 10.0)  # 10 secs timeout
  File "/home/afzal/.virtualenvs/tensorflow_python36/lib/python3.6/site-packages/grpc/beta/_client_adaptations.py", line 309, in __call__
    self._request_serializer, self._response_deserializer)
  File "/home/afzal/.virtualenvs/tensorflow_python36/lib/python3.6/site-packages/grpc/beta/_client_adaptations.py", line 195, in _blocking_unary_unary
    raise _abortion_error(rpc_error_call)
grpc.framework.interfaces.face.face.AbortionError: AbortionError(code=StatusCode.FAILED_PRECONDITION, details="Batched output tensor's 0th dimension does not equal the sum of the 0th dimension sizes of the input tensors")

chrisolston commented 6 years ago

Ah, it looks like your tensorflow graph is not structured in a way to be compatible with the tf-serving batcher. The error you are getting is that the batcher is complaining that it cannot match up the outputs to the batched inputs it sent into the tf::Session::Run() call. The batcher assumes that the 0th dimension is the batch dimension, and it concatenates input tensors along that dimension (e.g. if three input tensors have batch sizes 2, 5 and 7 and the batcher decides to group them into a batch, it will concatenate them into a single tensor of batch size 2+5+7=14). Then it expects the result of running the graph on the batched tensor to have the same batch dim size (e.g. the output should have batch size 14), after which it will split the output into individual (unbatched) tensors attributed to the original unbatched inputs (e.g. 2, 5 and 7). OTOH if it runs the graph with an input with batch dim size 14 and gets back an output with batch dim size 27, it has no idea how to split the 27 up to correspond to the inputs.
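
The concatenate, run, and split behaviour described above can be imitated in a few lines of NumPy. This is only an illustration of the idea, not the tf-serving batcher itself; `run_batched` and the two lambda "graphs" are hypothetical stand-ins for the real Session::Run() call.

```python
import numpy as np

def run_batched(graph_fn, inputs):
    """Concatenate inputs on axis 0, run once, and split the output per request."""
    sizes = [x.shape[0] for x in inputs]
    merged = np.concatenate(inputs, axis=0)
    output = graph_fn(merged)
    if output.shape[0] != sum(sizes):
        raise ValueError(
            "output batch dim %d != sum of input batch dims %d"
            % (output.shape[0], sum(sizes)))
    # Split points are the running totals of the per-request batch sizes.
    return np.split(output, np.cumsum(sizes)[:-1], axis=0)

# Three requests with batch sizes 2, 5 and 7, as in the example above.
requests = [np.ones((n, 4), dtype=np.float32) for n in (2, 5, 7)]

# Batch-compatible "graph": one output row per input row.
good = run_batched(lambda x: x.sum(axis=1, keepdims=True), requests)
good_shapes = [o.shape for o in good]

# Incompatible "graph": collapses the batch dimension, so the split is impossible.
try:
    run_batched(lambda x: x.sum(axis=0, keepdims=True), requests)
    failed = False
except ValueError as err:
    failed = True
    print(err)
```

A graph whose output does not keep one row per input row fails the size check, which is the same kind of mismatch the FAILED_PRECONDITION error reports.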

offbye commented 6 years ago

Does this mean that I can use batching simply by setting enable_batching=true, without any other code changes?

chrisolston commented 6 years ago

--enable_batching=true will enable batching, yes.

If all you do is set that flag it will use ~sensible default values of tuning parameters, including the max batch size (which affects throughput) and the batch timeout (which affects tail latency). See https://github.com/tensorflow/serving/blob/master/tensorflow_serving/batching/README.md for ideas on how to set those parameters if you want to do more tuning. You use --batching_parameters_file to pass them to the model server binary.
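
A sketch of what such a parameters file might contain, generated from Python to stay in the thread's language. The four field names are the ones I recall from the batching guide linked above, so please check them against it; the values are illustrative assumptions rather than recommendations, and `render_batching_parameters` is a hypothetical helper.

```python
def render_batching_parameters(params):
    """Render a dict as text-format proto lines of the form 'name { value: N }'."""
    return "".join("%s { value: %d }\n" % (name, value)
                   for name, value in params.items())

# Illustrative values only; tune them for your own model and hardware.
params = {
    "max_batch_size": 16,
    "batch_timeout_micros": 5000,
    "max_enqueued_batches": 100,
    "num_batch_threads": 4,
}

config_text = render_batching_parameters(params)
print(config_text)
```

The printed text would be saved to a file, and the path to that file passed with --batching_parameters_file alongside --enable_batching.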

ymodak commented 6 years ago

@gr8Adakron Just wanted to follow up. Were you able to resolve this issue?

gr8Adakron commented 6 years ago

This is a hybrid script, but it should answer your question with a little effort on your part.

from grpc.beta import implementations
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2
from keras.preprocessing import image as keras_image
from keras.applications.xception import preprocess_input as xpreprocess_input

import cv2
import re
import os
import time
import numpy as np
import tensorflow as tf
import numpy as np
import pandas as pd
import mxnet as mx

tf.app.flags.DEFINE_string('host', 'localhost', 'Service host')
tf.app.flags.DEFINE_string('port', '8006', 'Service port')
tf.app.flags.DEFINE_string('picture', '', 'path to picture')
FLAGS = tf.app.flags.FLAGS
output_list = []

def predictResponse_into_nparray(response, output_tensor_name):
    dims = response.outputs[output_tensor_name].tensor_shape.dim
    shape = tuple(d.size for d in dims)
    return np.reshape(response.outputs[output_tensor_name].float_val, shape)

def pretty(d, indent=0):
    for key, value in d.items():
        print('\t' * indent + str(key))
        if isinstance(value, dict):
            pretty(value, indent+1)
        else:
            print('\t' * (indent+1) + str(value))

def return_prediction(file_name,offset_output,labels_file):
    with open(labels_file, 'r') as f:
        labels   = [l.rstrip() for l in f]

    #..> NP-squeeze-argmax
    sq_output    = np.squeeze(offset_output)
    argmax_index = np.argmax(np.squeeze(offset_output), axis=0)
    probindex    = np.argsort(sq_output)[::-1]

    #..> Stackoverflow
    # scores       = offset_output[0]
    # rank         = np.array(offset_output[0]).argsort()[-5:][::-1]

    prediction   = labels[argmax_index]
    probability  = format(sq_output[argmax_index],'.3f')

    #print(f"Filename {file_name}, Prediction {prediction}, probability {probability}.")
    return prediction,probability

def resize_image(absolute_image_path):  
    base_path        = absolute_image_path[:-4]
    resized_path     = f"{base_path}_resized.jpg"
    original_image   = cv2.imread(absolute_image_path)
    original_rgb     = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)
    dim              = (200,200)
    resized          = cv2.resize(original_rgb, dim, interpolation = cv2.INTER_AREA)
    resized_color    = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    cv2.imwrite(resized_path,resized_color)
    return resized_path

def preprocessing_mxnet(filename):
    img = cv2.imread(filename)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (224,224,))
    img = np.swapaxes(img, 0, 2)
    img = np.swapaxes(img, 1, 2)
    img = img.astype('float')
    img[0,:,:] -= 123.68
    img[1,:,:] -= 116.779
    img[2,:,:] -= 103.939
    #img = img[np.newaxis, :]
    swap_img_1 = np.swapaxes(img, 0, 1)
    #print(swap_img_1.shape)
    swap_img_2 = np.swapaxes(swap_img_1, 1, 2)
    #print(swap_img_2.shape)
    swap_img_3 = np.swapaxes(swap_img_2, 0, 1)
    return swap_img_2

def decode_image(jpeg_file):
    with tf.device('/gpu:1'):
        decoder_graph = tf.Graph()
        with decoder_graph.as_default():
            decoded_image = tf.image.decode_png(jpeg_file, channels=3)
            decoded_image = tf.image.central_crop(decoded_image, 1)
            decoded_image = tf.image.resize_images(decoded_image, (224,224))
            normalized_image = tf.divide(decoded_image, 255)
            # reshaped_image = tf.reshape(normalized_image, [-1, 331, 331, 3])
        with tf.Session(graph = decoder_graph) as image_session:
        # image_session = tf.Session(graph = decoder_graph)
            input_0 = image_session.run(normalized_image)
    return input_0

def preprocess_image_tensorflow(image_path):
    with tf.gfile.FastGFile(image_path, 'rb') as jpeg_file_raw:
        jpeg_file = jpeg_file_raw.read()
        input_0 = decode_image(jpeg_file)
    return input_0

def serving_model_prediction(stub,batch_of_img,model_name,input_tensor,output_tensor,signature):
    request = predict_pb2.PredictRequest()
    request.model_spec.name = model_name
    request.model_spec.signature_name = signature #tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY
    request.inputs[input_tensor].CopyFrom(
        tf.contrib.util.make_tensor_proto(batch_of_img, dtype='float32'))

    result = stub.Predict(request, 30)

    #print(result)
    output          = predictResponse_into_nparray(result,output_tensor)
    return output

def batch_preprocessing_mxnet(image_list,image_dir):
    preprocessed_input  = []
    all_img_names       = []
    for single_image in image_list:
        image_path  = f"{image_dir}{single_image}"
        try:
            # Preprocess first so a failure skips both lists and keeps them aligned.
            preprocessed_input.append(preprocessing_mxnet(image_path))
            all_img_names.append(image_path)
        except Exception:
            continue
    return preprocessed_input,all_img_names

def batch_prediction_mxnet(stub,
                     labels_file,
                     batch_input,
                     test_image_dir):
    batch_output = []
    batch_preprocessed,batch_img_names  = batch_preprocessing_mxnet(batch_input,test_image_dir)
    if(len(batch_preprocessed)==0): return []
    batch_np     = np.array(batch_preprocessed)
    batch_return = serving_model_prediction(stub=stub,
                        batch_of_img = batch_np,
                        model_name   = "mxnet_converted_pt_152",
                        input_tensor = "input",
                        output_tensor= "output",
                        signature    = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY)

    for index,single_prediction in enumerate(batch_return):
        prediction_array = batch_return[index]
        image_name       = batch_img_names[index]
        batch_output.append({image_name:return_prediction(image_name,prediction_array,labels_file)})

    return batch_output

def batch_preprocessing_tensorflow(image_list,image_dir):
    preprocessed_input  = []
    all_img_names       = []
    for single_image in image_list:
        image_path  = f"{image_dir}{single_image}"
        try:
            # Preprocess first so a failure skips both lists and keeps them aligned.
            preprocessed_input.append(preprocess_image_tensorflow(image_path))
            all_img_names.append(image_path)
        except Exception:
            continue
    return preprocessed_input,all_img_names

def batch_prediction_tensorflow(stub,
                     labels_file,
                     batch_input,
                     test_image_dir):
    batch_output = []
    batch_preprocessed,batch_img_names  = batch_preprocessing_tensorflow(batch_input,test_image_dir)
    if(len(batch_preprocessed)==0): return []
    batch_np     = np.array(batch_preprocessed)
    batch_return = serving_model_prediction(stub=stub,
                        batch_of_img = batch_np,
                        model_name   = "product_type_tf_pnasnet",
                        input_tensor = "in",
                        output_tensor= "out",
                        signature    = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY)

    for index,single_prediction in enumerate(batch_return):
        prediction_array = batch_return[index]
        image_name       = batch_img_names[index]
        batch_output.append({image_name:return_prediction(image_name,prediction_array,labels_file)})

    return batch_output

def keras_preprocessing_func(image_path):
    with tf.device('/cpu:0'):
        image   = keras_image.load_img(image_path,target_size=(224,224))
        image   = keras_image.img_to_array(image) #.astype('float32')/255
        # image   = np.expand_dims(image, axis=0)
        image   = xpreprocess_input(image)
    return image

def batch_preprocessing_keras(image_list,image_dir):
    preprocessed_input  = []
    all_img_names       = []
    for single_image in image_list:
        image_path  = f"{image_dir}{single_image}"
        try:
            # Preprocess first so a failure skips both lists and keeps them aligned.
            preprocessed_input.append(keras_preprocessing_func(image_path))
            all_img_names.append(image_path)
        except Exception:
            continue
    return preprocessed_input,all_img_names

def batch_prediction_keras(stub,
                     labels_file,
                     batch_input,
                     test_image_dir):
    batch_output = []
    batch_preprocessed,batch_img_names  = batch_preprocessing_keras(batch_input,test_image_dir)
    if(len(batch_preprocessed)==0): return []
    batch_np     = np.array(batch_preprocessed)
    batch_return = serving_model_prediction(stub=stub,
                        batch_of_img = batch_np,
                        model_name   = "product_type_keras",
                        input_tensor = "input",
                        output_tensor= "output",
                        signature    = "datax_pt")

    for index,single_prediction in enumerate(batch_return):
        prediction_array = batch_return[index]
        image_name       = batch_img_names[index]
        batch_output.append({image_name:return_prediction(image_name,prediction_array,labels_file)})

    return batch_output

def batch_father(stub,
                 test_image_dir,
                 labels_file,
                 batch_size):
    # test_image_dir and labels_file are supplied by the caller (see main).
    # Each slice of batch_size file names below becomes one Predict request.
    image_list      = os.listdir(test_image_dir)
    total_image     = len(image_list)
    all_input       = []
    all_img_names   = []
    all_output      = []
    no_of_iteration = int(total_image/batch_size)+2

    start_time      = time.time()

    for current_batch in range(1,no_of_iteration):
        batch_begin  = int(batch_size*(current_batch-1))
        batch_end    = int(batch_size*(current_batch))

        if(batch_end>total_image): batch_end=total_image
        if(batch_begin==batch_end): break;
        batch_input  = image_list[batch_begin:batch_end]

        print(f"> Batch start: {batch_begin}, Batch ends: {batch_end}") 
        #..> batch_prediction_mxnet | batch_prediction_keras | batch_prediction_tensorflow
        batch_output = batch_prediction_tensorflow(stub=stub,
                             labels_file    = labels_file,
                             batch_input    = batch_input,
                             test_image_dir = test_image_dir)

        all_output   = all_output+batch_output 
    return all_output

def main(_):
    host            = FLAGS.host
    port            = FLAGS.port
    channel         = implementations.insecure_channel(host, int(port))
    stub            = prediction_service_pb2.beta_create_PredictionService_stub(channel)
    start_time      = time.time()
    final_output    = []
    # Send request

    test_image_dir  = "/home/adakron/tf-serving-arch-beta/test_data/100-images-test/"
    labels_file     = "/home/adakron/tf-serving-arch-beta/labels_files/pt-converted.txt"
    batch_size      = 16

    batch_output = batch_father(stub,
                     test_image_dir=test_image_dir,
                     labels_file   =labels_file,
                     batch_size    = batch_size)

    print(batch_output)

    # Report the elapsed time and the number of images that were predicted.
    timetaken   = str(format(float((time.time() - start_time)),'.3f'))
    print(f"\n{timetaken} secs > {len(batch_output)}")

if __name__ == '__main__':
    tf.app.run()

I had various kinds of models: Keras, MXNet, and TensorFlow, so I made a separate function for each of them. I guess you only need the TensorFlow one, so see around line 255 of the script, where batch_prediction_tensorflow is called. I hope this helps.

Sorry for the late response. If you have any doubts, feel free to ask.

sathyarr commented 5 years ago

> --enable_batching turns on server-side batching, which means that the server will group together multiple inference requests it receives from clients and process them in a batch. It should be completely transparent to the client (other than performance). If by "script" you mean something that's sending requests as a client, then the same script should work.
>
> (If you want to do client-side batching, that is also possible -- just have your client batch together multiple requests into a single RPC to the server.)

In Keras, we pass batch_size at prediction time. How is that different from these batching settings in TensorFlow Serving?

I have a model trained in Keras and saved as an .hdf5 file. In Python, loading the .hdf5 file and running a prediction takes 300 ms. INPUT:

[[   0.0,    1.0,    2.0,    3.0,    4.0],
 [   6.0,    7.0,    8.0,    9.0,    1876.0]]

After converting the .hdf5 file to the SavedModel format (.pb), I tried loading the .pb in Java and running a prediction, but it took 600 ms (twice the time). INPUT:

{
    "inputs": {
        "source_ids": [[   0.0,    1.0,    2.0,    3.0,    4.0],
                       [   6.0,    7.0,    8.0,    9.0, 1876.0]]
    }
}

Usually, TFServing should be faster!

aginpatrick commented 3 years ago

@chrisolston If I'm doing client-side batching, for example when the raw data in my curl request looks something like this:

instances = [{"b64_image":"_9j_4AA..."},{"b64_image":"_9j_4AA..."},...]
{"signature_name": "serving_default", "instances": instances}

is it mandatory to set --enable_batching on the server side, or will the TF server do the batching automatically?
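
For reference, a request body of the shape shown above could be assembled roughly as follows. This is a sketch under assumptions: the `b64_image` key is taken from the post, URL-safe base64 is inferred from the `_9j_4AA...` prefix, and `build_predict_body` plus the fake image bytes are hypothetical. All instances travel in one request, which is the client-side batching described earlier in the thread, as opposed to the server grouping separate requests.

```python
import base64
import json

def build_predict_body(image_bytes_list, signature_name="serving_default"):
    """Build a JSON body with one instance per image, base64-encoded (URL-safe)."""
    instances = [
        {"b64_image": base64.urlsafe_b64encode(data).decode("ascii")}
        for data in image_bytes_list
    ]
    return json.dumps({"signature_name": signature_name, "instances": instances})

# Hypothetical stand-ins for JPEG file contents: a JPEG-style prefix plus filler.
fake_images = [b"\xff\xd8\xff\xe0\x00\x10" + bytes([i]) * 3 for i in range(3)]

body = build_predict_body(fake_images)
decoded = json.loads(body)
print(len(decoded["instances"]), decoded["instances"][0]["b64_image"][:8])
```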

Rytyka commented 8 months ago

Hi @aginpatrick and @sathyarr - wondering if you got answers to your questions, I'm curious about the same things for my use-case.

sathyarr commented 8 months ago

@Rytyka It's been years, and I don't remember exactly! But ultimately I think tf-serving and Java were relatively faster, at least without batching.

Rytyka commented 8 months ago

Thanks for getting back to me :)