pytorch / serve

Serve, optimize and scale PyTorch models in production
https://pytorch.org/serve/
Apache License 2.0

Workflow API doesn't work with Kserve V2 envelope #1529

Open wjiuhe opened 2 years ago

wjiuhe commented 2 years ago

Context

Your Environment

I use the image pytorch/torchserve:0.5.3-cpu as the environment.

I test with two workflows:

mnist_workflow_handlers.py

import json
import base64

def print_data(data, context):
    print("+" * 50)
    print("node: print_data")
    print("data:", data)
    print("context:", context)
    if data is None:
        return data
    output = []
    for row in data:
        d = row.get("data") or row.get("body")
        # if isinstance(d, (bytes, bytearray)):
        #     d = d.decode('utf-8')
        output.append(d)
    return output

def test_data(data, context):
    print("+" * 50)
    print("node: test_data")
    print("data:", data)
    print("context:", context)
    if data is None:
        return data
    output = []
    for row in data:
        d = row.get("data") or row.get("body")
        # if isinstance(d, (bytes, bytearray)):
        #     d = d.decode('utf-8')
        output.append(d)
    return output

def combine_data(data, context):
    if data is None:
        return data
    print("+" * 50)
    print("node: combine_data")
    print("data:", data)
    print("context:", context)
    output = []
    for row in data:
        data_dict = {}
        for k in row:
            d = row.get(k)
            # if isinstance(d, (bytes, bytearray)):
            #     d = d.decode('utf-8')
            data_dict[k] = d
        output.append(data_dict)
    print([d.get("print_data") for d in output])
    return [d.get("print_data") for d in output]
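To make the data flow through these handlers concrete, here is a minimal standalone simulation (sample payloads are hypothetical; real TorchServe rows also carry headers, and the join payload shape assumes the frontend keys each row by parent node name as `combine_data` expects):

```python
def extract(rows):
    # Each TorchServe row holds the request payload under "data" or "body"
    return [row.get("data") or row.get("body") for row in rows]

# print_data and test_data each receive the same raw request rows
rows = [{"data": b"3"}]
print_out = extract(rows)  # [b"3"]
test_out = extract(rows)   # [b"3"]

# combine_data receives one row per request, keyed by parent node name
combined_rows = [{"print_data": print_out[0], "test_data": test_out[0]}]
mnist_input = [row.get("print_data") for row in combined_rows]
print(mnist_input)  # [b'3']
```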

mnist_workflow.yaml

models:
    min-workers: 1
    max-workers: 1
    batch-size: 4
    max-batch-delay: 100
    retry-attempts: 5
    timeout-ms: 300000

    mnist:
      url: mnist.mar

dag:
  print_data: [test_data, combine_data]
  test_data: [combine_data]
  combine_data: [mnist]
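For reference, the dag section above fans print_data out to test_data and combine_data, joins at combine_data, and ends at mnist. A plain topological sort (not TorchServe's actual scheduler) recovers the execution order:

```python
from graphlib import TopologicalSorter

# DAG from mnist_workflow.yaml: node -> downstream nodes
dag = {
    "print_data": ["test_data", "combine_data"],
    "test_data": ["combine_data"],
    "combine_data": ["mnist"],
}

# TopologicalSorter expects node -> predecessors, so invert the edges
preds = {}
for node, children in dag.items():
    preds.setdefault(node, set())
    for child in children:
        preds.setdefault(child, set()).add(node)

order = list(TopologicalSorter(preds).static_order())
print(order)  # ['print_data', 'test_data', 'combine_data', 'mnist']
```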

Expected Behavior

The Workflow API should work out of the box with the KServe v2 envelope.

Current Behavior

The Workflow API doesn't work with the KServe v2 envelope.

Reasons

1. The envelope of the workflow functions is not set correctly.

https://github.com/pytorch/serve/blob/6a191f822f2e759c2aad07b8f9eacbedcb039827/frontend/server/src/main/java/org/pytorch/serve/wlm/ModelManager.java#L122

While the models' envelopes are set to the default envelope specified in config.properties, the workflow functions' envelope is null.

2. The request envelope kservev2.py cannot parse internal workflow requests and nested requests properly.

https://github.com/pytorch/serve/blob/master/ts/torch_handler/request_envelope/kservev2.py

If a node has multiple parent nodes, for example node A with result ["a"] and node B with result ["b"], the output of these node will be combined by the frontend into:

[
    {
        "A": {"outputs": [{"data": ["a"]}]},
        "B": {"outputs": [{"data": ["b"]}]}
    }
]

However, because there is no data or body key in this JSON (only A and B), the envelope class cannot process the data.
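This failure can be reproduced in isolation: an envelope that looks up data/body in each row gets nothing back from the joined payload, because its keys are the parent node names (a sketch of the lookup pattern, not the actual kservev2.py code):

```python
# Payload the frontend sends to a join node: keys are parent node names
joined_row = {
    "A": {"outputs": [{"data": ["a"]}]},
    "B": {"outputs": [{"data": ["b"]}]},
}

# The usual way an envelope locates the request body in a row
payload = joined_row.get("data") or joined_row.get("body")
print(payload)  # None -- neither key exists, so parsing fails
```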

Even if the data were found, the KServe v2 envelope can only handle the inputs key, not outputs.

Also, in order to pass the data to the model properly, as happens with the null envelope, the request data needs to be converted into the form:

[
    {
        "data": {
            "A": ["a"],
            "B": ["b"]
        }
    }
]
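The conversion could be sketched as follows (the helper name flatten_workflow_row is hypothetical; it assumes each parent's result sits under outputs[0]["data"] in the KServe v2 response layout):

```python
def flatten_workflow_row(row):
    """Collapse a joined KServe-v2-style row into {"data": {parent: values}}."""
    data = {}
    for parent, response in row.items():
        # Take the "data" field of the first output tensor for each parent
        data[parent] = response["outputs"][0]["data"]
    return {"data": data}

joined = [{
    "A": {"outputs": [{"data": ["a"]}]},
    "B": {"outputs": [{"data": ["b"]}]},
}]
flat = [flatten_workflow_row(r) for r in joined]
print(flat)  # [{'data': {'A': ['a'], 'B': ['b']}}]
```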

Possible Solution

It is possible to set all the workflow models'/functions' envelopes to null, whatever service_envelope is specified in config.properties. However, this creates an inconsistency between the model API standard and the workflow API standard: all the models in the workflow would use the null envelope while standalone models use another envelope. Therefore, it is not a desirable solution.

In order to solve the issue and maintain the consistency between APIs of workflow and standalone models:

  1. Set the envelope of workflow functions to the default envelope, as is done for regular models (those with .mar files).
  2. Identify internal workflow requests between nodes, and let the KServe v2 envelope parse outputs data. For example, pass a special header from the frontend, telling the backend this is an internal workflow request.
  3. Identify the internal workflow request type, i.e. whether the data is flat or nested. For example, pass a special header from the frontend, telling the backend whether the request is flat or nested.
  4. The KServe v2 envelope handler will then handle the data according to these headers.
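Steps 2-4 could look roughly like this on the backend side (the header names and payload shapes here are entirely hypothetical, to illustrate the dispatch, not the actual patch):

```python
def parse_input(headers, row):
    """Hypothetical envelope entry point that branches on workflow headers."""
    if headers.get("X-Workflow-Internal") == "true":
        if headers.get("X-Workflow-Nested") == "true":
            # Joined payload: keys are parent node names, values are v2 responses
            return {parent: resp["outputs"][0]["data"]
                    for parent, resp in row.items()}
        # Flat internal payload: a single upstream v2 response
        return row["outputs"][0]["data"]
    # External request: standard KServe v2 inference request
    return row["inputs"][0]["data"]

external = parse_input({}, {"inputs": [{"name": "x", "data": [1, 2]}]})
internal = parse_input(
    {"X-Workflow-Internal": "true"},
    {"outputs": [{"name": "y", "data": [3]}]},
)
print(external, internal)  # [1, 2] [3]
```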

I've prepared a pull request and will submit it ASAP for your review.

I have divided the changes into separate commits targeting the different problems, with a description in each commit message. Once all of the commits fulfill the requirements, I will add more unit tests, then rebase and squash the commits.

If possible, please give suggestions on whether I'm on the right track with this problem.

Steps to Reproduce

  1. Pull the image pytorch/torchserve:0.5.3-cpu.
  2. Don't set the envelope in config.properties. Run the container with the mnist workflow I provided above, or with the example workflow dog_breed_classification. The prediction will succeed.
  3. Set service_envelope=kservev2 and run the container with the workflow from step 2. Now the prediction will fail.

Failure Logs [if any]

I added some print statements in ts/model_service_worker.py:

class TorchModelServiceWorker(object):

    @staticmethod
    def load_model(load_model_request):
        try:
            model_dir = load_model_request["modelPath"].decode("utf-8")
            model_name = load_model_request["modelName"].decode("utf-8")
            handler = load_model_request["handler"].decode("utf-8") if load_model_request["handler"] else None
            envelope = load_model_request["envelope"].decode("utf-8") if "envelope" in load_model_request else None
            envelope = envelope if envelope is not None and len(envelope) > 0 else None
            print("=" * 50)
            print("model_name:", model_name)
            print("envelope: ", envelope)

Below are the logs when loading the dog_breed_classification:

+ 2022-03-22T06:39:04,375 [INFO ] W-9024-dog_breed_wf_new__pre_processing_1.0-stdout MODEL_LOG - ==================================================
+ 2022-03-22T06:39:04,375 [INFO ] W-9024-dog_breed_wf_new__pre_processing_1.0-stdout MODEL_LOG - model_name: dog_breed_wf_new__pre_processing
+ 2022-03-22T06:39:04,375 [INFO ] W-9024-dog_breed_wf_new__pre_processing_1.0-stdout MODEL_LOG - envelope: None
2022-03-22T06:39:04,375 [INFO ] W-9024-dog_breed_wf_new__pre_processing_1.0-stdout MODEL_LOG - model_name: dog_breed_wf_new__pre_processing, batchSize: 1
2022-03-22T06:39:04,376 [INFO ] W-9024-dog_breed_wf_new__pre_processing_1.0 org.pytorch.serve.wlm.WorkerThread - Backend response time: 1
2022-03-22T06:39:04,376 [DEBUG] W-9024-dog_breed_wf_new__pre_processing_1.0 org.pytorch.serve.wlm.WorkerThread - W-9024-dog_breed_wf_new__pre_processing_1.0 State change WORKER_STARTED -> WORKER_MODEL_LOADED
2022-03-22T06:39:04,376 [INFO ] W-9024-dog_breed_wf_new__pre_processing_1.0 TS_METRICS - W-9024-dog_breed_wf_new__pre_processing_1.0.ms:959|#Level:Host|#hostname:8b026932f222,timestamp:1647931144
2022-03-22T06:39:04,376 [INFO ] W-9024-dog_breed_wf_new__pre_processing_1.0 TS_METRICS - WorkerThreadTime.ms:1|#Level:Host|#hostname:8b026932f222,timestamp:null
2022-03-22T06:39:04,875 [DEBUG] pool-6-thread-2 org.pytorch.serve.wlm.ModelVersionedRefs - Adding new version 1.0 for model dog_breed_wf_new__dog_breed_classification
2022-03-22T06:39:04,875 [DEBUG] pool-6-thread-2 org.pytorch.serve.wlm.ModelVersionedRefs - Setting default version to 1.0 for model dog_breed_wf_new__dog_breed_classification
2022-03-22T06:39:04,875 [INFO ] pool-6-thread-2 org.pytorch.serve.wlm.ModelManager - Model dog_breed_wf_new__dog_breed_classification loaded.
2022-03-22T06:39:04,875 [DEBUG] pool-6-thread-2 org.pytorch.serve.wlm.ModelManager - updateModel: dog_breed_wf_new__dog_breed_classification, count: 1
2022-03-22T06:39:04,876 [DEBUG] W-9026-dog_breed_wf_new__dog_breed_classification_1.0 org.pytorch.serve.wlm.WorkerLifeCycle - Worker cmdline: [/home/venv/bin/python, /home/venv/lib/python3.8/site-packages/ts/model_service_worker.py, --sock-type, unix, --sock-name, /home/model-server/tmp/.ts.sock.9026]
2022-03-22T06:39:05,040 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0-stdout MODEL_LOG - Listening on port: /home/model-server/tmp/.ts.sock.9025
2022-03-22T06:39:05,041 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0-stdout MODEL_LOG - [PID]14321
2022-03-22T06:39:05,041 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0-stdout MODEL_LOG - Torch worker started.
2022-03-22T06:39:05,041 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0-stdout MODEL_LOG - Python runtime: 3.8.0
2022-03-22T06:39:05,041 [DEBUG] W-9025-dog_breed_wf_new__cat_dog_classification_1.0 org.pytorch.serve.wlm.WorkerThread - W-9025-dog_breed_wf_new__cat_dog_classification_1.0 State change null -> WORKER_STARTED
2022-03-22T06:39:05,041 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0 org.pytorch.serve.wlm.WorkerThread - Connecting to: /home/model-server/tmp/.ts.sock.9025
2022-03-22T06:39:05,042 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0-stdout MODEL_LOG - Connection accepted: /home/model-server/tmp/.ts.sock.9025.
2022-03-22T06:39:05,042 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0 org.pytorch.serve.wlm.WorkerThread - Flushing req. to backend at: 1647931145042
+ 2022-03-22T06:39:05,042 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0-stdout MODEL_LOG - ==================================================
+ 2022-03-22T06:39:05,043 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0-stdout MODEL_LOG - model_name: dog_breed_wf_new__cat_dog_classification
+ 2022-03-22T06:39:05,043 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0-stdout MODEL_LOG - envelope: kservev2
2022-03-22T06:39:05,043 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0-stdout MODEL_LOG - model_name: dog_breed_wf_new__cat_dog_classification, batchSize: 4
2022-03-22T06:39:05,781 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0-stdout MODEL_LOG - Listening on port: /home/model-server/tmp/.ts.sock.9026
2022-03-22T06:39:05,782 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0-stdout MODEL_LOG - [PID]14334
2022-03-22T06:39:05,782 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0-stdout MODEL_LOG - Torch worker started.
2022-03-22T06:39:05,782 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0-stdout MODEL_LOG - Python runtime: 3.8.0
2022-03-22T06:39:05,782 [DEBUG] W-9026-dog_breed_wf_new__dog_breed_classification_1.0 org.pytorch.serve.wlm.WorkerThread - W-9026-dog_breed_wf_new__dog_breed_classification_1.0 State change null -> WORKER_STARTED
2022-03-22T06:39:05,782 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0 org.pytorch.serve.wlm.WorkerThread - Connecting to: /home/model-server/tmp/.ts.sock.9026
2022-03-22T06:39:05,783 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0-stdout MODEL_LOG - Connection accepted: /home/model-server/tmp/.ts.sock.9026.
2022-03-22T06:39:05,783 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0 org.pytorch.serve.wlm.WorkerThread - Flushing req. to backend at: 1647931145783
+ 2022-03-22T06:39:05,783 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0-stdout MODEL_LOG - ==================================================
+ 2022-03-22T06:39:05,783 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0-stdout MODEL_LOG - model_name: dog_breed_wf_new__dog_breed_classification
+ 2022-03-22T06:39:05,783 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0-stdout MODEL_LOG - envelope: kservev2
2022-03-22T06:39:05,783 [INFO ] W-9026-dog_breed_wf_new__dog_breed_classification_1.0-stdout MODEL_LOG - model_name: dog_breed_wf_new__dog_breed_classification, batchSize: 4
2022-03-22T06:39:06,366 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0-stdout MODEL_LOG - Missing the index_to_name.json file. Inference output will not include class name.
2022-03-22T06:39:06,366 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0 org.pytorch.serve.wlm.WorkerThread - Backend response time: 1324
2022-03-22T06:39:06,366 [DEBUG] W-9025-dog_breed_wf_new__cat_dog_classification_1.0 org.pytorch.serve.wlm.WorkerThread - W-9025-dog_breed_wf_new__cat_dog_classification_1.0 State change WORKER_STARTED -> WORKER_MODEL_LOADED
2022-03-22T06:39:06,366 [INFO ] W-9025-dog_breed_wf_new__cat_dog_classification_1.0 TS_METRICS - W-9025-dog_breed_wf_new__cat_dog_classification_1.0.ms:2231|#Level:Host|#hostname:8b026932f222,tim
sinking-point commented 11 months ago

Any plans to fix this?

Rakesh-Raushan commented 1 month ago

Any updates on support of torchserve workflow with Kserve?