pytorch / serve

Serve, optimize and scale PyTorch models in production
https://pytorch.org/serve/
Apache License 2.0

relative import error with `custom handler` #689

Closed pushpendrapratap closed 3 years ago

pushpendrapratap commented 3 years ago

Context

Your Environment

Expected Behavior

Current Behavior

Possible Solution

Steps to Reproduce

  1. traced model is in directory models/
  2. custom model handler and extra files are in src/
  3. torch-model-archiver --model-name r2plus1d --version 1.0 --serialized-file ./models/r2plus1d_8_kinetics_100_epochs.pt --extra-files ./src/utils.py --handler ./src/model_handler.py --export-path ./models/ -f
  4. torchserve --start --ncs --model-store ./models/ --models r2plus1d=r2plus1d.mar

relevant code snippets and files

  1. model_handler.py
    
    import os
    import json
    import logging
    from time import time
    from typing import Dict, List, Tuple

import torch
import numpy as np
from PIL import Image
import torch.nn as nn
from torchvision import transforms as torchTF
from torchvision.transforms import Compose
from decord import VideoReader, cpu
from smart_open import open as sm_open
from ts.torch_handler.base_handler import BaseHandler

from .utils import set_seed

set_seed(0)
logger = logging.getLogger(__name__)

DEFAULT_MEAN = (0.43216, 0.394666, 0.37645)
DEFAULT_STD = (0.22803, 0.22145, 0.216989)

class ModelHandler:

def __init__(self):
    self.model = None
    self.context = None
    self.manifest = None
    self.initialized = False
    self.device, self.map_location = None, "cpu"
    self.batch_size, self.sample_length, self.width, self.height = 8, 50, 132, 132
    self.transform = Compose(
        [
            torchTF.Lambda(lambda fms: [torchTF.Resize(128)(fm) for fm in fms]),
            torchTF.Lambda(lambda fms: [torchTF.CenterCrop(112)(fm) for fm in fms]),
            # [T, H, W, C] -> [T, C, H, W]
            torchTF.Lambda(
                lambda fms: torch.stack([torchTF.ToTensor()(fm) for fm in fms])
            ),
            # [T, C, H, W] -> [T, C, H, W]
            torchTF.Lambda(
                lambda fms: torch.stack(
                    [torchTF.Normalize(DEFAULT_MEAN, DEFAULT_STD)(fm) for fm in fms]
                )
            ),
            # [T, C, H, W] -> [C, T, H, W]
            torchTF.Lambda(lambda ts: ts.permute(1, 0, 2, 3)),
        ]
    )

def initialize(self, context):
    """
    Initialize model. This will be called during model loading time
    :param context: Initial context contains model server system properties.
    :return:
    """
    self.context = context
    self.device = torch.device("cpu")
    self.manifest = self.context.manifest
    self._batch_size = self.context.system_properties["batch_size"]
    properties = self.context.system_properties
    model_dir = properties.get("model_dir")
    serialized_file = self.manifest["model"]["serializedFile"]
    model_pt_path = os.path.join(model_dir, serialized_file)
    logger.info("properties: {}".format(properties))
    logger.info("self.manifest: {}".format(self.manifest))
    logger.info("self._batch_size: {}".format(self._batch_size))
    if not os.path.isfile(model_pt_path):
        raise RuntimeError("Missing the model.pt file")
    self.model = torch.jit.load(model_pt_path, map_location=self.map_location)
    self.model.to(self.device)
    self.model.eval()
    logger.info("Model file: {}, loaded successfully".format(model_pt_path))
    self.initialized = True

def _predict(self, frames, transform) -> np.ndarray:
    """Runs prediction on frames applying transforms before predictions."""
    clip = frames
    # Transform frames and append batch dim
    sample = torch.unsqueeze(transform(clip), 0)
    marshalled_data = sample.to(self.device)
    with torch.no_grad():
        output = self.model(marshalled_data)
    scores = nn.functional.softmax(output, dim=1).data.cpu().numpy()[0]
    return np.array(scores)

def _filter_labels(
    self,
    id_score_dict: dict,
    labels: List[str],
    threshold: float = 0.0,
    target_labels: List[str] = None,
    filter_labels: List[str] = None,
) -> Dict[str, int]:
    """Given the predictions, filter out the noise based on threshold,
    target labels and filter labels.

    Arg:
        id_score_dict: dictionary of predictions
        labels: all labels
        threshold: the min threshold to keep prediction
        target_labels: exclude any labels not in target labels
        filter_labels: exclude any labels in filter labels

    Returns
        A dictionary of labels and scores
    """

    # Show only interested actions (target_labels) with a confidence score >= threshold
    result = {}
    for i, s in id_score_dict.items():
        label = labels[i]
        if (
            (s < threshold)
            or (target_labels is not None and label not in target_labels)
            or (filter_labels is not None and label in filter_labels)
        ):
            continue
        if label in result:
            result[label] += s.item()
        else:
            result[label] = s.item()
    return result

def predict_frames(
    self,
    window: List,
    averaging_size: int,
    score_threshold: float,
    labels: List[str],
    target_labels: List[str],
    transforms: Compose,
) -> Dict[str, float]:
    """ Predicts frames """

    t = time()
    scores = self._predict(window, transforms)
    dur = time() - t
    if len(labels) >= averaging_size:
        num_labels = averaging_size
    else:
        num_labels = len(labels) - 1
    top5_id_score_dict = {
        i: scores[i] for i in (-scores).argpartition(num_labels - 1)[:num_labels]
    }
    top5_label_score_dict = self._filter_labels(
        top5_id_score_dict,
        labels,
        threshold=score_threshold,
        target_labels=target_labels,
    )
    topn = sorted(top5_label_score_dict.items(), key=lambda kv: -kv[1])
    logger.info("{} fps, topn: {}".format(len(window) // dur, topn))
    return topn

def predict_video(
    self,
    video_fpath: str,
    labels: List[str] = None,
    averaging_size: int = 5,
    score_threshold: float = 0.025,
    target_labels: List[str] = None,
    transforms: Compose = None,
) -> Dict[str, List[Tuple[str, float]]]:
    """Load video and show frames and inference results while displaying the results"""

    result, nclip = {}, 0
    try:
        with sm_open(video_fpath, "rb") as f:
            # video_reader = VideoReader(f, ctx=cpu(0), width=self.width, height=self.height)
            video_reader = VideoReader(f, ctx=cpu(0))
            logger.info("Total frames = {}".format(len(video_reader)))
    except Exception as err:
        logger.error("error_type: {}, error: {}".format(type(err), err))
    window = []
    if not labels:
        labels = [
            "other 2-pointer success",
            "other 2-pointer failure",
            "layup success",
            "free-throw success",
            "3-pointer success",
            "3-pointer failure",
            "free-throw failure",
            "layup failure",
        ]
    else:
        raise ("No labels found, add labels argument.")
    if not transforms:
        transforms = self.transform
    while True:
        try:
            frame = video_reader.next().asnumpy()
            if len(frame.shape) != 3:
                break
            pil_frame = torchTF.ToPILImage()(frame)
            window.append(pil_frame)
            if (
                len(window) == len(video_reader)
                and len(window) < self.sample_length
            ):
                while len(window) < self.sample_length:
                    window.append(window[-1].copy())
            if len(window) == self.sample_length:
                idxs = np.round(
                    np.linspace(
                        nclip * self.sample_length,
                        (nclip + 1) * self.sample_length - 1,
                        self.batch_size,
                    )
                ).astype(int)
                sampled_window = [
                    window[i - (nclip * self.sample_length)] for i in idxs
                ]
                ans = self.predict_frames(
                    sampled_window,
                    averaging_size,
                    score_threshold,
                    labels,
                    target_labels,
                    transforms,
                )
                result["clip_" + str(nclip)] = ans
                nclip += 1
                window = []
        except StopIteration:
            break
        except Exception as err:
            logger.error("error_type: {}, error: {}".format(type(err), err))
            break
    return result

def preprocess(self, data) -> str:
    # Take the input data/batch and pre-process it, make it inference ready
    req_body = data[0].get("data")
    if req_body is None:
        req_body = data[0].get("body")
    video_uri = req_body.get("video_url")
    logger.info("video_uri: {}".format(video_uri))
    return video_uri

def inference(self, model_input: str) -> Dict[str, List[Tuple[str, float]]]:
    # Do some inference call to engine here and return output
    result = self.predict_video(model_input)
    return result

def postprocess(self, inference_output) -> List:
    # Take output from network and post-process to desired format
    processed_output = json.dumps(inference_output)
    return [processed_output] * self._batch_size

def handle(self, data, context):
    model_input = self.preprocess(data)
    model_out = self.inference(model_input)
    response = self.postprocess(model_out)
    return response

_service = ModelHandler()

def handle(data, context):
    try:
        if not _service.initialized:
            _service.initialize(context)
        if data is None:
            return None
        return _service.handle(data, context)
    except Exception as err:
        logger.error("error_type: {}, error: {}".format(type(err), err))
        raise err


2. utils.py

import os
import random

import torch
import numpy as np


def set_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False



**NOTE:** I've used `utils.py` just to give an example of the `relative import error` with `--extra-files` in the case of custom handlers. In the actual codebase, there are a lot more dependency files on which `model_handler` depends.

3. [r2plus1d_8_kinetics_100_epochs.pt](https://drive.google.com/file/d/1VBB6vGCtq-5XB8lKc07O5qGfm_xaG0ph/view?usp=sharing) - size is around 243 MB

HamidShojanazeri commented 3 years ago

@pushpendrapratap would you please try setting the util file path in the `initialize()` method as `utils = os.path.join(model_dir, "utils.py")`, and then importing `utils` where it's needed. Please let me know if that helps.
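
One way to read that suggestion (the `load_extra_module` helper below is illustrative, not a TorchServe API) is to resolve the extra file's path from `model_dir` and load it explicitly with `importlib`:

```python
import importlib.util
import os


def load_extra_module(model_dir, filename="utils.py", module_name="utils"):
    """Load a script shipped via --extra-files from the extracted archive directory."""
    spec = importlib.util.spec_from_file_location(
        module_name, os.path.join(model_dir, filename)
    )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


# Inside ModelHandler.initialize(self, context):
#     model_dir = context.system_properties.get("model_dir")
#     utils = load_extra_module(model_dir)
#     utils.set_seed(0)
```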

harshbafna commented 3 years ago

@pushpendrapratap:

The model archive is extracted into a temporary directory (`model_dir`) and added to the `PYTHONPATH`, but that directory is not a package, so the relative import statement fails.

Use the following import statement

from utils import set_seed

instead of

from .utils import set_seed
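
For reference, this is roughly the layout the worker sees after the archive is extracted (file names taken from the archiver command above; the exact temporary path varies):

```python
# <temporary model_dir>/          <- added to the Python path by the worker
#   MAR-INF/MANIFEST.json
#   r2plus1d_8_kinetics_100_epochs.pt
#   model_handler.py
#   utils.py
#
# model_dir is a plain directory, not a package, so the extra file is only
# importable as a top-level module:
from utils import set_seed  # resolves to <model_dir>/utils.py
```
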
pushpendrapratap commented 3 years ago

@HamidShojanazeri thanks for the suggestion, but in my case it will not help. Here is the reason: let's say I have a script `utils.py` which in turn imports other Python scripts (e.g., `video_transforms.py`), and all of those imports are relative rather than absolute. In this scenario, your suggested approach will not work.
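
To make the scenario concrete, here is a hypothetical sketch of the nested relative imports (the `some_transform` name is made up):

```python
# src/utils.py -- shipped flat into model_dir via --extra-files
from .video_transforms import some_transform  # relative import inside a dependency file

# Loading utils.py by path, or importing it as a top-level module, still fails
# on this line with an "attempted relative import" error, because the extracted
# model_dir is not a package for "." to refer to.
```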

pushpendrapratap commented 3 years ago

@harshbafna absolute imports work. My bad, I don't think I made it clear that I was actually looking for a better way to achieve the same thing. The reason is that, just to deploy my model with TorchServe, I have to create lots of duplicate scripts (`--extra-files`) and change relative imports to absolute imports in all of them.

Can you suggest any resources for torchserve best practices? Thanks

harshbafna commented 3 years ago

@pushpendrapratap, there can be a couple of approaches to supplying multiple Python dependency files while creating the mar file:

If it is a Python project with a `setup.py`, you can create a binary, supply it with `--extra-files`, and add a `requirements.txt` file with an entry for your project binary.

For more details on model-specific `requirements.txt`, refer to the documentation.
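
A rough sketch of that workflow (package name, version, and file names below are placeholders; it assumes TorchServe is started with `install_py_dep_per_model=true` in `config.properties` so that model-specific requirements are pip-installed):

```bash
# 1. Build a wheel from the project that contains utils.py, video_transforms.py, etc.
python setup.py bdist_wheel            # e.g. dist/my_project-0.1.0-py3-none-any.whl

# 2. List the wheel in a model-specific requirements.txt:
#      my_project-0.1.0-py3-none-any.whl

# 3. Ship both with the model archive:
torch-model-archiver --model-name r2plus1d --version 1.0 \
  --serialized-file ./models/r2plus1d_8_kinetics_100_epochs.pt \
  --handler ./src/model_handler.py \
  --extra-files ./dist/my_project-0.1.0-py3-none-any.whl \
  --requirements-file ./requirements.txt \
  --export-path ./models/ -f

# The handler can then use absolute imports from the installed package,
# e.g. "from my_project.utils import set_seed".
```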

pushpendrapratap commented 3 years ago

@harshbafna Thanks for your response. Yes, I think for the time being I'll have to go with the above approach. But I really wish the same codebase could be used to serve inference requests (as with Flask or Starlette, where all I have to do is add an `app.py` file and I'm good to go).

pushpendrapratap commented 3 years ago

Closing this issue as absolute import fixes the above issue.

csaroff commented 2 years ago

In case you came here looking for a way to define your handler in nbdev (which uses relative imports), this is what worked for me.

...
import sys  # needed for the sys.path tweak below (may already be among the elided imports)
from fastai.vision.all import * # Just including this here for the Path import

## Standalone boilerplate before relative imports
## Allows the nbdev relative imports to work with torchserve
if not __package__ and '__file__' in locals():
    DIR = Path(__file__).resolve().parent
    sys.path.insert(0, str(DIR.parent))
    __package__ = DIR.name

from my_nbdev_package.my_module import *
...