muayyad-alsadi / python-PooledProcessMixIn

Fast Concurrent Pool of preforked-processes and threads MixIn for python's socket server

Memory leak #7

Open · teocns opened this issue 3 years ago

teocns commented 3 years ago

After around 24 hours of running and serving hundreds of thousands of requests, the script never frees memory; usage builds up to as much as 32 GB.


import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Callable

from PooledProcessMixIn import PooledProcessMixIn  # import path as used in this repo's README

# HTTP_REQUEST_TIMEOUT and HTTP_SERVICE_AUTHENTICATION_B64 are config
# constants defined elsewhere in the service

class Handler(BaseHTTPRequestHandler):
    handle_request: Callable

    def __init__(self, *args, **kwargs):
        # handle_request is injected by the caller (e.g. via functools.partial)
        self.handle_request = kwargs.pop('handle_request')
        super().__init__(*args, **kwargs)

    def setup(self) -> None:
        self.timeout = HTTP_REQUEST_TIMEOUT
        BaseHTTPRequestHandler.setup(self)
        self.request.settimeout(HTTP_REQUEST_TIMEOUT)

    def do_POST(self):
        try:
            length = int(self.headers.get('content-length'))
            b64_auth = self.headers.get('authorization')

            if b64_auth != HTTP_SERVICE_AUTHENTICATION_B64:
                self.send_response(401)
                self.end_headers()
                return

            js = json.loads(self.rfile.read(length))
            response = self.handle_request(js)
            self.send_response(200)
            self.end_headers()
            self.wfile.write(json.dumps(response).encode('utf-8'))
        except Exception as ex:
            self.send_response(503, json.dumps(ex.__dict__))
            self.end_headers()

    def do_GET(self):
        self.send_response(200)
        self.end_headers()

class ThreadingSimpleServer(PooledProcessMixIn, HTTPServer):
    def __init__(self, bind_addr, handler, processes, threads) -> None:
        self._process_n = processes  # if not set, defaults to the number of CPU cores
        self._thread_n = threads  # if not set, a sensible default is used
        HTTPServer.__init__(self, bind_addr, handler)
        # self._init_pool()  # optional, called automatically
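
For reference, the server is constructed roughly like this (a sketch, not part of the report; the bind address, pool sizes, and the my_handler callback are assumptions):

import functools

server = ThreadingSimpleServer(
    ('0.0.0.0', 8080),                                      # assumed bind address
    functools.partial(Handler, handle_request=my_handler),  # my_handler is the app callback
    processes=4,   # example pool sizes
    threads=16,
)
server.serve_forever()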
muayyad-alsadi commented 3 years ago

My suggestion is to use the WSGI standard and serve the app with uWSGI. I'll look into this to see where the leak comes from.

teocns commented 3 years ago

Thanks for the suggestion. WSGI seems an unnecessarily complex solution in my case, whereas a Docker container with a memory (max RSS) limit could easily work around the issue.

Either way, I'm looking forward to identifying the memory leak.

muayyad-alsadi commented 3 years ago

WSGI is just a callable that accepts two arguments, an env dict and a start_response function (used to report the response code and headers), and returns the response body as an iterable, like this:

# web.py
def application(env, start_response):
    start_response('200 OK', [('Content-Type','text/html')])
    return [b"Hello World"]

The two-line function above is a complete WSGI application.
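
For a quick local test with no extra dependencies, the standard library's wsgiref can serve the same callable (a minimal sketch; the port is arbitrary):

# test_web.py -- serve the application above with the stdlib reference server
from wsgiref.simple_server import make_server

from web import application

with make_server('', 8080, application) as httpd:
    httpd.serve_forever()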

You can serve it in a million ways; my favorites are CherryPy and uWSGI. For example, you can run it with the command below (different distros may use different package names and Python plugin names/versions):

uwsgi --plugin python3 -w web:application --http-socket=0.0.0.0:8080
muayyad-alsadi commented 3 years ago

I took a look at your code and I don't see any obvious leak. Are you sure that self.handle_request(js) does not have a leak?

def __init__(self, *args, **kwargs):
    self.handle_request = kwargs['handle_request']
# ...
response = self.handle_request(js)
muayyad-alsadi commented 3 years ago

BTW, uWSGI has the following options; even if your handle_request(js) has a leak, it can be configured to restart workers fresh after a number of requests or after a period of time:

    -R|--max-requests                     reload workers after the specified amount of managed requests
    --min-worker-lifetime                 number of seconds worker must run before being reloaded (default is 60)
    --max-worker-lifetime                 reload workers after the specified amount of seconds (default is disabled)
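
For example, combined with the command above (the thresholds here are arbitrary placeholders):

uwsgi --plugin python3 -w web:application --http-socket=0.0.0.0:8080 --max-requests 1000 --max-worker-lifetime 3600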
teocns commented 3 years ago

Will definitely give CherryPy a try.

response = self.handle_request(js)

just makes a further HTTP request:

def handle_request(**kwargs):
    return requests.post(**kwargs)

It's essentially a scraper service; I've defined handle_request very broadly.
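
One thing worth double-checking here: if handle_request returns the requests.Response object itself, json.dumps(response) in do_POST would raise (Response is not JSON-serializable) and every request would take the except path. A sketch that returns only the parsed body (assuming the upstream replies with JSON) and releases the connection explicitly:

import requests

def handle_request(**kwargs):
    resp = requests.post(**kwargs)
    try:
        # return only the parsed JSON body so do_POST can json.dumps() it;
        # the Response object and its pooled connection are released here
        return resp.json()
    finally:
        resp.close()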