ShannonAI / service-streamer

Boosting your Web Services of Deep Learning Applications.
Apache License 2.0
1.23k stars 186 forks source link

为什么并发量上不去? #32

Closed rebornwwp closed 4 years ago

rebornwwp commented 5 years ago

您好,我最近在用您的开源框架做webservice后端多卡预测的部分,我们做的是目标检测方面的模型,想要增加后端的并发量,但是我用了多卡的时候,发现并不能提高我的并发,我把代码贴出来给您,您来看看是不是哪里有些问题呢。 或者说请问一下在这一块有没有一些改进的方法呢?

希望可以帮忙,谢谢, thanks.

# manager 
import torch
from service_streamer import ManagedModel

from .model import SSDModel

class ManagerSSDModel(ManagedModel):

    def init_model(self):
        self.model = SSDModel()

    def predict(self, batch):
        return self.model.predict(batch)

    @staticmethod
    def set_gpu_id(gpu_id=None):
        if gpu_id:
            torch.cuda.set_device(gpu_id)
# 模型对象
import base64
import io
import logging

import cv2
import numpy as np
import torch
from PIL import Image

from .data import BaseTransform
from .ssd_config import cuda, labelmap, trained_model

if cuda and torch.cuda.is_available():
    torch.set_default_tensor_type('torch.cuda.FloatTensor')
else:
    torch.set_default_tensor_type('torch.FloatTensor')

logging.basicConfig(level=logging.ERROR)

class SSDModel(object):
    def __init__(self):
        from .ssd import build_ssd

        self.model_path = trained_model
        self.net = build_ssd('test', 300, len(labelmap) + 1)
        self.net.load_state_dict(torch.load(self.model_path))
        self.net.cuda()
        self.net.eval()

    def predict(self, image_bytes_batch):
        image_tensors = []
        image_sizes = []
        for image_bytes in image_bytes_batch:
            tensor, size = self.transform_image(image_bytes=image_bytes)
            image_tensors.append(tensor)
            image_sizes.append(size)

        # tensor = torch.cat(image_tensors).cuda()
        # outputs = self.net.forward(tensor)

        result = {}
        for index, (x, size) in enumerate(zip(image_tensors, image_sizes)):
            width, height = size
            scale = torch.Tensor([width, height, width, height])

            positions = []
            x = x.cuda()
            y = self.net(x)
            detections = y.data
            for i in range(detections.size(1)):
                j = 0
                while detections[0, i, j, 0] >= 0.2:
                    score = float(detections[0, i, j, 0])
                    label_name = labelmap[i - 1]
                    position = (detections[0, i, j, 1] * scale).cpu().numpy()
                    d = dict(score=score,
                             label=label_name,
                             xmin=position[0],
                             ymin=position[1],
                             xmax=position[2],
                             ymax=position[3])
                    positions.append(dict(
                        score=float(score),
                        label=label_name,
                        xmin=int(position[0]),
                        ymin=int(position[1]),
                        xmax=int(position[2]),
                        ymax=int(position[3])
                    ))
                    j += 1
            result[index] = positions

        return result

    def transform_image(self, image_bytes):
        my_transforms = BaseTransform(self.net.size, (104, 117, 123))
        image = Image.open(io.BytesIO(image_bytes))
        return torch.from_numpy(my_transforms(image)[0]).permute(2, 0, 1).unsqueeze(0), image.size
# SSD模型
# 这里主要用到了 github上面的一个ssd的开源项目。https://github.com/amdegroot/ssd.pytorch

import os

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

from .layers import *

# SSD300 CONFIGS
voc = {
    'num_classes': 10,
    'lr_steps': (80000, 100000, 120000),
    'max_iter': 120000,
    'feature_maps': [38, 19, 10, 5, 3, 1],
    'min_dim': 300,
    'steps': [8, 16, 32, 64, 100, 300],
    'min_sizes': [30, 60, 111, 162, 213, 264],
    'max_sizes': [60, 111, 162, 213, 264, 315],
    'aspect_ratios': [[2], [2, 3], [2, 3], [2, 3], [2], [2]],
    'variance': [0.1, 0.2],
    'clip': True,
    'name': 'VOC',
}

coco = {
    'num_classes': 201,
    'lr_steps': (280000, 360000, 400000),
    'max_iter': 400000,
    'feature_maps': [38, 19, 10, 5, 3, 1],
    'min_dim': 300,
    'steps': [8, 16, 32, 64, 100, 300],
    'min_sizes': [21, 45, 99, 153, 207, 261],
    'max_sizes': [45, 99, 153, 207, 261, 315],
    'aspect_ratios': [[2], [2, 3], [2, 3], [2, 3], [2], [2]],
    'variance': [0.1, 0.2],
    'clip': True,
    'name': 'COCO',
}

# This function is derived from torchvision VGG make_layers()
# https://github.com/pytorch/vision/blob/master/torchvision/models/vgg.py
def vgg(cfg, i, batch_norm=False):
    layers = []
    in_channels = i
    for v in cfg:
        if v == 'M':
            layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
        elif v == 'C':
            layers += [nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)]
        else:
            conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
            if batch_norm:
                layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)]
            else:
                layers += [conv2d, nn.ReLU(inplace=True)]
            in_channels = v
    pool5 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
    conv6 = nn.Conv2d(512, 1024, kernel_size=3, padding=6, dilation=6)
    conv7 = nn.Conv2d(1024, 1024, kernel_size=1)
    layers += [pool5, conv6,
               nn.ReLU(inplace=True), conv7, nn.ReLU(inplace=True)]
    return layers

def add_extras(cfg, i):
    # Extra layers added to VGG for feature scaling
    layers = []
    in_channels = i
    flag = False
    for k, v in enumerate(cfg):
        if in_channels != 'S':
            if v == 'S':
                layers += [nn.Conv2d(in_channels, cfg[k + 1],
                                     kernel_size=(1, 3)[flag], stride=2, padding=1)]
            else:
                layers += [nn.Conv2d(in_channels, v, kernel_size=(1, 3)[flag])]
            flag = not flag
        in_channels = v
    return layers

def multibox(vgg, extra_layers, cfg, num_classes):
    loc_layers = []
    conf_layers = []
    vgg_source = [21, -2]
    for k, v in enumerate(vgg_source):
        loc_layers += [nn.Conv2d(vgg[v].out_channels,
                                 cfg[k] * 4, kernel_size=3, padding=1)]
        conf_layers += [nn.Conv2d(vgg[v].out_channels,
                                  cfg[k] * num_classes, kernel_size=3, padding=1)]
    for k, v in enumerate(extra_layers[1::2], 2):
        loc_layers += [nn.Conv2d(v.out_channels, cfg[k]
                                 * 4, kernel_size=3, padding=1)]
        conf_layers += [nn.Conv2d(v.out_channels, cfg[k]
                                  * num_classes, kernel_size=3, padding=1)]
    return vgg, extra_layers, (loc_layers, conf_layers)

class SSD(nn.Module):
    """Single Shot Multibox Architecture
    The network is composed of a base VGG network followed by the
    added multibox conv layers.  Each multibox layer branches into
        1) conv2d for class conf scores
        2) conv2d for localization predictions
        3) associated priorbox layer to produce default bounding
           boxes specific to the layer's feature map size.
    See: https://arxiv.org/pdf/1512.02325.pdf for more details.

    Args:
        phase: (string) Can be "test" or "train"
        size: input image size
        base: VGG16 layers for input, size of either 300 or 500
        extras: extra layers that feed to multibox loc and conf layers
        head: "multibox head" consists of loc and conf conv layers
    """

    def __init__(self, phase, size=300, num_classes=21):
        base = {
            '300': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'C', 512, 512, 512, 'M',
                    512, 512, 512],
            '512': [],
        }
        extras = {
            '300': [256, 'S', 512, 128, 'S', 256, 128, 256, 128, 256],
            '512': [],
        }
        mbox = {
            # number of boxes per feature map location
            '300': [4, 6, 6, 6, 4, 4],
            '512': [],
        }
        super(SSD, self).__init__()
        base, extras, head = multibox(vgg(base[str(size)], 3),
                                      add_extras(extras[str(size)], 1024),
                                      mbox[str(size)], num_classes)
        self.phase = phase
        self.num_classes = num_classes
        self.cfg = (coco, voc)[num_classes == 10]
        self.priorbox = PriorBox(self.cfg)
        self.priors = Variable(self.priorbox.forward(), volatile=True)
        self.size = size

        # SSD network
        self.vgg = nn.ModuleList(base)
        # Layer learns to scale the l2 normalized features from conv4_3
        self.L2Norm = L2Norm(512, 20)
        self.extras = nn.ModuleList(extras)

        self.loc = nn.ModuleList(head[0])
        self.conf = nn.ModuleList(head[1])

        if phase == 'test':
            self.softmax = nn.Softmax(dim=-1)
            self.detect = Detect(num_classes, 0, 200, 0.01, 0.45)

    def forward(self, x):
        """Applies network layers and ops on input image(s) x.

        Args:
            x: input image or batch of images. Shape: [batch,3,300,300].

        Return:
            Depending on phase:
            test:
                Variable(tensor) of output class label predictions,
                confidence score, and corresponding location predictions for
                each object detected. Shape: [batch,topk,7]

            train:
                list of concat outputs from:
                    1: confidence layers, Shape: [batch*num_priors,num_classes]
                    2: localization layers, Shape: [batch,num_priors*4]
                    3: priorbox layers, Shape: [2,num_priors*4]
        """
        sources = list()
        loc = list()
        conf = list()

        # apply vgg up to conv4_3 relu
        for k in range(23):
            x = self.vgg[k](x)

        s = self.L2Norm(x)
        sources.append(s)

        # apply vgg up to fc7
        for k in range(23, len(self.vgg)):
            x = self.vgg[k](x)
        sources.append(x)

        # apply extra layers and cache source layer outputs
        for k, v in enumerate(self.extras):
            x = F.relu(v(x), inplace=True)
            if k % 2 == 1:
                sources.append(x)

        # apply multibox head to source layers
        for (x, l, c) in zip(sources, self.loc, self.conf):
            loc.append(l(x).permute(0, 2, 3, 1).contiguous())
            conf.append(c(x).permute(0, 2, 3, 1).contiguous())

        loc = torch.cat([o.view(o.size(0), -1) for o in loc], 1)
        conf = torch.cat([o.view(o.size(0), -1) for o in conf], 1)
        if self.phase == "test":
            output = self.detect(
                loc.view(loc.size(0), -1, 4),  # loc preds
                self.softmax(conf.view(conf.size(0), -1,
                                       self.num_classes)),  # conf preds
                self.priors.type(type(x.data))  # default boxes
            )
        else:
            output = (
                loc.view(loc.size(0), -1, 4),
                conf.view(conf.size(0), -1, self.num_classes),
                self.priors
            )
        return output

    def load_weights(self, base_file):
        other, ext = os.path.splitext(base_file)
        if ext == '.pkl' or '.pth':
            print('Loading weights into state dict...')
            self.load_state_dict(torch.load(base_file,
                                            map_location=lambda storage, loc: storage))
            print('Finished!')
        else:
            print('Sorry only .pth and .pkl files supported.')

def build_ssd(phase, size=300, num_classes=10):
    if phase != "test" and phase != "train":
        print("ERROR: Phase: " + phase + " not recognized")
        return
    if size != 300:
        print("ERROR: You specified size " + repr(size) + ". However, " +
              "currently only SSD300 (size=300) is supported!")
        return
    return SSD(phase, size, num_classes)
# flask server
from gevent import monkey
from gevent.pywsgi import WSGIServer
from service_streamer import Streamer
from ssd_pytorch.model_manager import ManagerSSDModel

monkey.patch_all()
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/predictions', methods=['POST'])
def index():
    if request.method == 'POST':
        file = request.files['file']
        image_bytes = file.read()
        outputs = app.streamer.predict([image_bytes])
        return jsonify(outputs)

if __name__ == '__main__':
    streamer = Streamer(ManagerSSDModel,
                        batch_size=64,
                        max_latency=0.1,
                        worker_num=4,
                        cuda_devices=(0, 1, 2, 3))
    app.streamer = streamer
    WSGIServer(("0.0.0.0", 5005), app).serve_forever()
    # app.run(host='0.0.0.0', port=5005, debug=True)
# 测出来的并发量
$ wrk -c 128 -d 20s --timeout=20s -s file.lua http://127.0.0.1:5005/predictions
Running 20s test @ http://127.0.0.1:5005/predictions
  2 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    17.00s   487.75ms  17.84s    59.57%
    Req/Sec    74.81     81.52   282.00     75.00%
  188 requests in 20.05s, 172.03KB read
Requests/sec:      12.38
Transfer/sec:      8.58KB

$ nvidia-smi
Every 1.0s: nvidia-smi                                                                                                                                       Wed Sep 11 00:12:41 2019

Wed Sep 11 00:12:41 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 430.26       Driver Version: 430.26       CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  GeForce GTX 108...  Off  | 00000000:02:00.0 Off |                  N/A |
| 23%   29C    P8     8W / 250W |   2802MiB / 11178MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  GeForce GTX 108...  Off  | 00000000:03:00.0 Off |                  N/A |
| 23%   28C    P8     9W / 250W |    989MiB / 11178MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   2  GeForce GTX 108...  Off  | 00000000:82:00.0 Off |                  N/A |
| 34%   57C    P2    65W / 250W |   2112MiB / 11178MiB |     20%      Default |
+-------------------------------+----------------------+----------------------+
|   3  GeForce GTX 108...  Off  | 00000000:83:00.0 Off |                  N/A |
| 23%   31C    P8     9W / 250W |    989MiB / 11178MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     29661      C    path/bin/python   953MiB |
|    0     29662      C   pathbin/python  613MiB |
|    0     29663      C   bin/python  613MiB |
|    0     29664      C   bin/python  613MiB |
|    1     29662      C   bin/python   979MiB |
|    2     29663      C   /bin/python  979MiB |
|    3     29664      C   path/bin/python 979MiB |
+-----------------------------------------------------------------------------+
Meteorix commented 4 years ago

你好,首先你需要定位性能瓶颈在哪:

  1. 在大量并发的情况下,分别查看cpu和gpu的利用率
  2. 如果cpu是100%,那么说明瓶颈在cpu预处理transform_image之类的函数,那么你可以使用gunicorn多个web service,readme有写用法
  3. 如果gpu是100%,那么说明瓶颈在gpu,不过我猜并不是
  4. 如果两个都没有到100%,说明瓶颈在传输部分,那个另说

可以先试试再留言~

rebornwwp commented 4 years ago

看了下,确实是cpu的瓶颈,我先试试把程序改改。

谢谢了

zhongbin1 commented 4 years ago

@rebornwwp 你好,请问解决了吗?我也遇到同样的问题。

rebornwwp commented 4 years ago

初步确认是cpu的瓶颈,数据预处理的时候太消耗cpu了,需要对预处理代码做一些操作。我叫别人做的,自己没时间,不知道做的咋样了

zhongbin1 commented 4 years ago

好的,谢啦。我再改改看。

pharrellyhy commented 4 years ago

Hi,

我生产环境的架构是nginx + gunicorn + flask + tf-serving,部署在阿里云ECS上,现在想增加并发量。cpu的使用率很低,gpu能达到60-70%,压测用的是locust。我发现加一块gpu之后rps并不会线性增长。单个gpu rps在120左右,latency在300ms左右,加一个gpu rps只能增加到150左右,感觉可能传输是瓶颈,用service-streamer能起作用吗?Thanks.

Meteorix commented 4 years ago

@pharrellyhy 可以,用多进程模式,一个进程一块gpu,参考example

pharrellyhy commented 4 years ago

@Meteorix Thanks. I‘ll give it a try. BTW, service streamer的稳定性如何,我现在大概一天有100多万的请求量,会不会突然挂掉呢?

Meteorix commented 4 years ago

@Meteorix Thanks. I‘ll give it a try. BTW, service streamer的稳定性如何,我现在大概一天有100多万的请求量,会不会突然挂掉呢?

主要看qps吧,压力测试on your own了,可以另开一个issue讨论

pharrellyhy commented 4 years ago

@Meteorix OK, 我先试试. btw,因为我现在已经用了gunicorn + flask了,用service-streamer的话是不是就不用gunicorn了?

Meteorix commented 4 years ago

还是可以用gunicorn来实现多进程

pharrellyhy commented 4 years ago

@Meteorix Got it, thanks!