Broken pipe on big response tensors #2849

hariom-qure opened 9 months ago

hariom-qure commented 9 months ago

🐛 Describe the bug

We have a model which essentially does image segmentation of sorts.
The output tensor is of this size: [batch, 920, 920], fp32.

I keep getting broken pipe errors when serving it.

From my debugging, it fails right after I return this tensor from the postprocess method of my handler, which extends the base handler.
Is there a limit on response size in TorchServe?
Thanks for the help!
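
For a sense of scale, here is a rough size estimate for the tensor described above. It is only a sketch: the 6553500-byte figure is the default `max_response_size` as I recall it from the TorchServe configuration docs (treat it as an assumption and check your version), and pickle overhead is ignored.

```python
import math

# Assumed default for max_response_size, in bytes (verify against your TorchServe docs).
ASSUMED_DEFAULT_LIMIT = 6553500


def raw_bytes(batch, height=920, width=920, bytes_per_elem=4):
    """Size in bytes of a [batch, height, width] fp32 tensor."""
    return batch * height * width * bytes_per_elem


def base64_len(n_bytes):
    """Length of the Base64 text produced from n_bytes of input (with padding)."""
    return 4 * math.ceil(n_bytes / 3)


for batch in (1, 2, 4):
    raw = raw_bytes(batch)
    encoded = base64_len(raw)
    print(f"batch={batch} raw={raw} base64={encoded} over_limit={encoded > ASSUMED_DEFAULT_LIMIT}")
```

Under these assumptions a single 920x920 fp32 mask fits once Base64-encoded, while a batch of two serialized together does not.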

Error logs

the main container logs:

hariomapp-torchserve-1  | java.lang.InterruptedException: null
hariomapp-torchserve-1  |   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos( ~[?:?]
hariomapp-torchserve-1  |   at java.util.concurrent.LinkedBlockingDeque.pollFirst( ~[?:?]
hariomapp-torchserve-1  |   at java.util.concurrent.LinkedBlockingDeque.poll( ~[?:?]
hariomapp-torchserve-1  |   at org.pytorch.serve.wlm.Model.pollBatch( ~[model-server.jar:?]
hariomapp-torchserve-1  |   at org.pytorch.serve.wlm.BatchAggregator.getRequest( ~[model-server.jar:?]
hariomapp-torchserve-1  |   at [model-server.jar:?]
hariomapp-torchserve-1  |   at java.util.concurrent.ThreadPoolExecutor.runWorker( [?:?]
hariomapp-torchserve-1  |   at java.util.concurrent.ThreadPoolExecutor$ [?:?]
hariomapp-torchserve-1  |   at [?:?]

Model logs

2023-12-12T07:11:26,936 [INFO ] W-9000-msk_fracture_4.0.0-stdout MODEL_LOG - Backend worker process died.
2023-12-12T07:11:26,936 [INFO ] W-9000-msk_fracture_4.0.0-stdout MODEL_LOG - Traceback (most recent call last):
2023-12-12T07:11:26,936 [INFO ] W-9000-msk_fracture_4.0.0-stdout MODEL_LOG -   File "/home/venv/lib/python3.9/site-packages/ts/", line 258, in <module>
2023-12-12T07:11:26,936 [INFO ] W-9000-msk_fracture_4.0.0-stdout MODEL_LOG -     worker.run_server()
2023-12-12T07:11:26,936 [INFO ] W-9000-msk_fracture_4.0.0-stdout MODEL_LOG -   File "/home/venv/lib/python3.9/site-packages/ts/", line 226, in run_server
2023-12-12T07:11:26,936 [INFO ] W-9000-msk_fracture_4.0.0-stdout MODEL_LOG -     self.handle_connection(cl_socket)
2023-12-12T07:11:26,936 [INFO ] W-9000-msk_fracture_4.0.0-stdout MODEL_LOG -   File "/home/venv/lib/python3.9/site-packages/ts/", line 183, in handle_connection
2023-12-12T07:11:26,936 [INFO ] W-9000-msk_fracture_4.0.0-stdout MODEL_LOG -     cl_socket.sendall(resp)
2023-12-12T07:11:26,936 [INFO ] W-9000-msk_fracture_4.0.0-stdout MODEL_LOG - BrokenPipeError: [Errno 32] Broken pipe
2023-12-12T07:11:28,676 [INFO ] W-9000-msk_fracture_4.0.0-stdout MODEL_LOG - s_name_part0=/home/model-server/tmp/.ts.sock, s_name_part1=9000, p

Installation instructions

Using docker, simply ran the stock image in dockerhub

compose file:

version: '3'
services:
  torchserve:
    image: pytorch/torchserve:latest-gpu
    ports:
      - 9080:8080
      - 9081:8081
      - 9082:8082
      - 7070:7070
      - 7071:7071
    volumes:
      - ./modelstore:/home/model-server/model-store
    environment:
      - TS_METRICS_MODE=prometheus
    command: torchserve --model-store /home/model-server/model-store

Model Packaging

I simply take a tensor as input and return raw tensor generated by model in output.
Essentially I get a tuple[dict[str, Tensor], dict[str, Tensor]] from the model, all tensor values would have the same size and have the batch size as first dimension.


from ts.torch_handler.base_handler import BaseHandler
import pickle
import base64
import logging
import torch

logger = logging.getLogger(__name__)

class ModelHandler(BaseHandler):
    def preprocess(self, data):
        all_tensors = [pickle.loads(d["body"]) for d in data]
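        # The call on this line was lost from the post; torch.cat along dim 0 is an assumption.
        result = torch.cat(all_tensors, 0)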
        return result

    def _single_result(self, data, i):
        """
        we get this:
        {
            "90_rot": tensor[1.000, 2.999, etc.],
            ...other keys, same structure
        }

        We take the i'th element out of each value, so it's tensor[1.00] but its size is torch.Size([])
        t[i].tolist() gives a number, the actual number we want to send back
        But remote expects a [number] format, so we send that
        """
        return {
            k: [v[i].tolist()] for k, v in data.items()
        }

    def _get_len_batch(self, data):
        """The final dict is a dict[str, tensor[length]]. The length is the batch size.

        It is guaranteed that for each key, the length of the tensor is the same.
        """

        key = next(iter(data))
        return len(data[key])

    def _single_tuple(self, data, i):
        return {
            "part_wise": self._single_result(data[0], i),
            "other_fracture":  base64.b64encode(pickle.dumps(data[1])).decode('utf8')
        }

    def postprocess(self, data):
        length = self._get_len_batch(data[0])
        print("size of fracture tensor", data[1]["other_fracture"].size())
        result = [self._single_tuple(data, i) for i in range(length)]
        return result
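
One thing worth checking in the handler above: `_single_tuple` pickles `data[1]` without using `i`, so every item in the response carries the whole batch and the total payload grows roughly with the square of the batch size. If that is not intended, a per-item variant could look like the sketch below. The helper names `encode_item` and `decode_item` are hypothetical, and it assumes every value in the dict can be indexed along the batch dimension (tensors, arrays, or lists).

```python
import base64
import pickle


def encode_item(batch_dict, i):
    """Serialize only the i-th entry of every value in batch_dict."""
    item = {k: v[i] for k, v in batch_dict.items()}
    return base64.b64encode(pickle.dumps(item)).decode("utf8")


def decode_item(payload):
    """Inverse of encode_item; only unpickle data from a server you trust."""
    return pickle.loads(base64.b64decode(payload))


# Plain lists stand in for tensors here so the sketch runs without torch.
batch = {"other_fracture": [[0.1, 0.2], [0.3, 0.4]]}
print(decode_item(encode_item(batch, 1)))
```

In the handler this would replace the `pickle.dumps(data[1])` expression with a call on the per-item slice.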

The default configuration of the Docker image pytorch/torchserve:latest-gpu, pulled approximately a week ago.



pip list output (I can't find the script in the Docker container):


torch                   2.1.0+cu121
torch-model-archiver    0.9.0
torch-workflow-archiver 0.2.11
torchaudio              2.1.0+cu121
torchdata               0.7.0
torchserve              0.9.0
torchtext               0.16.0+cpu
torchvision             0.16.0+cu121

Repro instructions

  torch-model-archiver --model-name "${name}" --version 4.0.0 --serialized-file "" --handler ""

Possible Solution

I suspect it's the response size, but I might be wrong. I tried serializing the output to bytes in postprocess, but it cannot complete anything larger than a batch of size 1. I used this for serializing (no specific reason for choosing it, I found it on Google):

return base64.b64encode(pickle.dumps(data)).decode('utf8')

lxning commented 9 months ago

There is a parameter max_response_size. Could you please increase the value to see if it works for you?
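
`max_response_size` goes in config.properties and is given in bytes. As a rough sketch of picking a value, the helper below (hypothetical, not part of TorchServe) turns an estimated payload size into a property line with some headroom. The 1.5x headroom factor is an arbitrary assumption.

```python
def max_response_size_line(payload_bytes, headroom=1.5):
    """Return a config.properties line sized for a payload of payload_bytes."""
    if payload_bytes <= 0:
        raise ValueError("payload_bytes must be positive")
    return f"max_response_size={int(payload_bytes * headroom)}"


# Base64 length of two 920x920 fp32 masks serialized together (pickle overhead ignored).
payload = 4 * -(-(2 * 920 * 920 * 4) // 3)
print(max_response_size_line(payload))  # max_response_size=13542402
```

The server has to be restarted with the updated config.properties for the new limit to take effect.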

agunapal commented 9 months ago

@hariom-qure As @lxning suggested, increasing max_response_size can help in many cases. Sometimes, when I have had huge data to send back, in addition to increasing max_response_size, I have used something like this in postprocess:

import base64
import json
import pickle
import zlib

import numpy as np

def postprocess(self, data):
    # Encode NumPy arrays as tagged JSON strings so json.dumps can handle them
    class NumpyArrayEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, np.ndarray):
                # return obj.tolist()
                return "__nparray__" + json.dumps(obj.tolist())
            return json.JSONEncoder.default(self, obj)

    json_data = json.dumps({"data": data}, cls=NumpyArrayEncoder)

    # Compress the string
    compressed_data = zlib.compress(json_data.encode("utf-8"))

    # Serialize the compressed data using Pickle
    serialized_data = pickle.dumps(compressed_data)

    # Encode the serialized data as Base64
    base64_encoded_data = base64.b64encode(serialized_data).decode("utf-8")

    # Assumed ending (the snippet stops before its return): TorchServe expects
    # a list with one entry per request in the batch
    return [base64_encoded_data]
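
On the client side, the encoding steps above have to be undone in reverse order: Base64, then pickle, then zlib, then JSON. The following is a minimal sketch of that, not code from this thread. The name `decode_response` is hypothetical, and it returns nested lists, which can be passed to `np.asarray` if arrays are needed.

```python
import base64
import json
import pickle
import zlib

PREFIX = "__nparray__"


def _restore(obj):
    """Recursively turn tagged '__nparray__' strings back into nested lists."""
    if isinstance(obj, str) and obj.startswith(PREFIX):
        return json.loads(obj[len(PREFIX):])
    if isinstance(obj, list):
        return [_restore(x) for x in obj]
    if isinstance(obj, dict):
        return {k: _restore(v) for k, v in obj.items()}
    return obj


def decode_response(payload):
    """Undo Base64 -> pickle -> zlib -> JSON; only use on responses from a trusted server."""
    compressed = pickle.loads(base64.b64decode(payload))
    json_data = zlib.decompress(compressed).decode("utf-8")
    return _restore(json.loads(json_data))["data"]
```

Note that zlib output is already bytes, so the pickle layer adds little; it is mirrored here only because the encoder above uses it.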

IonBoleac commented 9 months ago

@hariom-qure I suggest trying this code.

def postprocess(self, data):
    length = self._get_len_batch(data[0])
    print("size of fracture tensor", data[1]["other_fracture"].size())
    result = [self._single_tuple(data, i) for i in range(length)]
    # Wrap the per-item results in a single-element list and return that
    output = []
    output.append({"result": result})
    return output

I had a similar problem and resolved it with this.