ShannonAI / service-streamer

Boosting your Web Services of Deep Learning Applications.

queue.Empty and ValueError: semaphore or lock released too many times #41

Closed: yongzhuo closed this issue 4 years ago

yongzhuo commented 4 years ago

A.1 Setup: gunicorn + flask + service-streamer (3 models started under one Flask process, one port (e.g. 8804), 3 apps), batch_size=64
A.2 Observations:

  1. With gunicorn + gevent, startup fails (error as in C. below)
  2. With gunicorn + tornado, startup fails (error as in C. below)
  3. With uwsgi there is no error, but batch_size only reaches about 4-8
  4. With 3 ports, one model per port, under gunicorn + gevent, there is no error and batch_size reaches 64

Could you tell me what might be causing this?

C. Error message:

Exception in thread thread_collect_result:
Traceback (most recent call last):
  File "/home/ap/nlp/Anaconda3/lib/python3.6/multiprocessing/queues.py", line 105, in get
    raise Empty
queue.Empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ap/nlp/Anaconda3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ap/nlp/Anaconda3/lib/python3.6/site-packages/gevent/threading.py", line 177, in run
    super(Thread, self).run()
  File "/home/ap/nlp/Anaconda3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ap/nlp/myzhuo/nlp-platform/tools/service_streamer/service_streamer.py", line 103, in _loop_collect_result
    message = self._recv_response(timeout=1)
  File "/home/ap/nlp/myzhuo/nlp-platform/tools/service_streamer/service_streamer.py", line 268, in _recv_response
    message = self._output_queue.get(timeout=1)
  File "/home/ap/nlp/Anaconda3/lib/python3.6/multiprocessing/queues.py", line 111, in get
    self._rlock.release()
ValueError: semaphore or lock released too many times
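For context, the frames in the traceback outline the collector path inside service_streamer: a background thread loops on a timed get() from a multiprocessing.Queue and swallows queue.Empty between polls. A minimal reconstruction of that path (only the frame names come from the traceback; the bodies are simplified assumptions, not the library's actual code):

    # Reconstruction of the code path in the traceback; only the names
    # (_loop_collect_result, _recv_response, _output_queue) come from the
    # traceback, the bodies are assumptions for illustration.
    import multiprocessing
    import queue

    class CollectorSketch:
        def __init__(self):
            self._output_queue = multiprocessing.Queue()

        def _recv_response(self, timeout=1):
            try:
                # On timeout, multiprocessing.Queue.get() releases its internal
                # rlock (queues.py line 111 above); under gevent's patched
                # threading that release is where the ValueError fires.
                return self._output_queue.get(timeout=timeout)
            except queue.Empty:
                return None

        def _loop_collect_result(self):
            while True:
                message = self._recv_response(timeout=1)
                if message is not None:
                    pass  # dispatch the result back to the waiting request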

Meteorix commented 4 years ago

Hi, could you paste your gunicorn config and the streamer invocation code?

yongzhuo commented 4 years ago
  1. gunicorn config:
    # join here is os.path.join; base_logPath is defined elsewhere in the project
    from os.path import join

    loglevel = 'info'
    bind = "0.0.0.0:8804"
    backlog = 2048
    worker_connections = 4096
    pidfile = join(base_logPath, "gunicorn.pid")
    workers = 1
    max_requests = 0
    worker_class = 'gevent'
    x_forwarded_for_header = 'X-FORWARDED-FOR'
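Worth noting: with worker_class = 'gevent', gunicorn's gevent worker monkey-patches the worker process before loading the app, which is why the traceback above goes through gevent/threading.py. A minimal standalone illustration of that effect (a sketch, not from the original post):

    # gunicorn's gevent worker applies gevent.monkey.patch_all(), so
    # standard-library threads (including service_streamer's
    # thread_collect_result) are backed by greenlets, not OS threads.
    from gevent import monkey
    monkey.patch_all()

    import threading

    t = threading.Thread(target=lambda: print("runs as a greenlet"))
    t.start()
    t.join()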

2. streamer invocation code:


import os

from flask import Flask, request, jsonify
from tools.service_streamer.service_streamer import Streamer

# batch_size, worker_num, cuda_devices, model_root_path and
# TextCnnWorkFlowBatch are defined elsewhere in the project
def model_streamer(path_abs, batch_size=batch_size, max_latency=0.1, worker_num=worker_num, cuda_devices=cuda_devices):
    path_hyper_url = os.path.join(model_root_path, path_abs)
    model_pars = {"path_hyper_url": path_hyper_url}
    model = TextCnnWorkFlowBatch(model_pars)
    streamer = Streamer(predict_function_or_model=model,
                        batch_size=batch_size,
                        max_latency=max_latency,
                        worker_num=worker_num,
                        cuda_devices=cuda_devices)
    streamer._wait_for_worker_ready()
    return streamer

def model_predict(streamer_real):
    params = request.form if request.form else request.json
    sentences = params.get('texts', '')
    res00 = streamer_real.predict(sentences)
    res = {'result_code': 0,
           'result_message': '调用成功',  # "call succeeded"
           'prob_dist': res00
           }
    return res

# path_abs_11 / path_abs_14 / path_abs_90 are the three model paths, defined elsewhere
streamer_11 = model_streamer(path_abs_11)
streamer_14 = model_streamer(path_abs_14)
streamer_90 = model_streamer(path_abs_90)

app = Flask(__name__)

@app.route('/text_classification/textcnn_11/predict', methods=["POST"])
def textcnn_11_predict():
    res = model_predict(streamer_11)
    return jsonify(content=res,
                   content_type='application/json;charset = utf-8',
                   status='200',
                   reason='success',
                   charset='utf-8')

@app.route('/text_classification/textcnn_14/predict', methods=["POST"])
def textcnn_14_predict():
    res = model_predict(streamer_14)
    return jsonify(content=res,
                   content_type='application/json;charset = utf-8',
                   status='200',
                   reason='success',
                   charset='utf-8')

@app.route('/text_classification/textcnn_90/predict', methods=["POST"])
def textcnn_90_predict():
    res = model_predict(streamer_90)
    return jsonify(content=res,
                   content_type='application/json;charset = utf-8',
                   status='200',
                   reason='success',
                   charset='utf-8')

if __name__ == '__main__':
    app.run()
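As a side note, if the goal is simply several models behind one port, it might be worth trying service_streamer's ThreadedStreamer (a suggestion based on the project README, not something verified in this thread): it batches requests in a background thread of the same process, so no multiprocessing queues are involved. A hypothetical rewiring of one of the streamers above (the vendored module path, path_hyper_url_11, and the model's batch predict method are assumptions):

    from tools.service_streamer.service_streamer import ThreadedStreamer

    # Assumes TextCnnWorkFlowBatch exposes a batch predict method, as
    # Streamer already requires of the wrapped model.
    model_11 = TextCnnWorkFlowBatch({"path_hyper_url": path_hyper_url_11})
    streamer_11 = ThreadedStreamer(model_11.predict, batch_size=64, max_latency=0.1)

The trade-off is that inference then runs inside the gunicorn worker itself, so a long GPU call can block the gevent event loop.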

3. Model loading:


# tf here is TensorFlow 1.x (tf.GPUOptions / tf.ConfigProto / tf.Session)
try:
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=self.gpu_memory_fraction)
    config = tf.ConfigProto(gpu_options=gpu_options)
    graph_new = tf.Graph()
    self.sess = tf.Session(graph=graph_new, config=config)
    with self.sess.as_default():
        with graph_new.as_default():
            self.model = self.Graph(self.params)
            self.saver = tf.train.Saver()
            path_checkpoint_latest = tf.train.latest_checkpoint(self.model_path)
            self.saver.restore(self.sess, path_checkpoint_latest)
except Exception as e:
    logger.error('loading graph error! error is {}'.format(str(e)))
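For completeness, the batch predict entry point that Streamer wraps would then look roughly like this, continuing the class above (a sketch; preprocess() and self.model.logits are placeholder names, not from the original post):

    # Hypothetical batch inference against the restored session; the feed
    # and fetch names are assumptions for illustration only.
    def predict(self, sentences):
        with self.sess.as_default():
            with self.sess.graph.as_default():
                feed_dict = self.preprocess(sentences)  # batch -> feed dict
                return self.sess.run(self.model.logits, feed_dict=feed_dict)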
yongzhuo commented 4 years ago

It is probably because, in this mode, multiple processes share a single Queue, so only one model is supported.
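Whatever the exact sharing pattern, the failing call in the traceback is a timed get() on a multiprocessing.Queue from a gevent-patched thread, and that interaction can be probed in isolation. A minimal probe sketch (an experiment, not guaranteed to reproduce on every platform):

    # Probe: mimic several collector "threads" (greenlets after patch_all)
    # polling one empty multiprocessing.Queue with a timeout, the exact call
    # that raises the ValueError in the traceback above.
    from gevent import monkey
    monkey.patch_all()

    import multiprocessing
    import gevent

    q = multiprocessing.Queue()

    def timed_get(i):
        try:
            q.get(timeout=1)  # times out and releases the queue's rlock
        except Exception as e:
            print(i, type(e).__name__, e)

    gevent.joinall([gevent.spawn(timed_get, i) for i in range(3)])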