FSX / momoko

Wraps (asynchronous) Psycopg2 for Tornado.
http://momoko.61924.nl/
Other
362 stars 73 forks source link

self.conns.empty() in Pool.close will not work as expected by big chance #141

Open masknu opened 8 years ago

masknu commented 8 years ago

When there are busy connections at the time you invoking Pool.close(), after close_alive() being called, those busy connections will be put back to self.dead of ConnectionContainer at next io_loop iteration if connections are managed by Pool.manage(). Thus, the empty() call next to close_alive() would be meaningless, and it will fire the assert by wrong reason:assert conn in self.busy, "Tried to release non-busy connection"

def close(self):
    if self.closed:
        raise PoolError('connection pool is already closed')

    self.conns.close_alive()
    #following line would be meaningless if there are busy connections out there
    self.conns.empty()
    self.closed = True
haizaar commented 8 years ago

You are right. However it looks like the issue is even deeper - if I call Pool.close() in the middle operation, the retry logic will reestablish the connection, so at the end Pool.close() does not do its jobs well.

I think the retry logic should not act if Pool.closed is true; and yes, Pool.close() should not empty connection sets .

Also I wonder whether public API should assert not self.closed before doing anything.

What do you think?

masknu commented 8 years ago

@haizaar , I agree with your concern. I tried a few changes in momoko to make it work right under any closing time, some changes like:

def connect(self):
    future = Future()
    assert self.conns.total <= self.size, "total should not greater than size"
    size = self.size - self.conns.total

    #add check here
    if 0 == size and self.conns.all_dead:

        def on_getconn(fut):
            try:
                conn = fut.result()
            except Exception as error:
                future.set_exc_info(sys.exc_info())
            else:
                future.set_result(self)
                self.putconn(conn)

        self.ioloop.add_future(self.getconn(), on_getconn)
        return future

    #change pending count here
    pending = [max(size-1, 0)]

    def on_connect(fut):
        if pending[0]:
            pending[0] -= 1
            return
        # all connection attempts are complete
        if self.conns.dead and self.raise_connect_errors:
            ecp = PartiallyConnectedError("%s connection(s) failed to connect" % len(self.conns.dead))
            future.set_exception(ecp)
        else:
            future.set_result(self)
        log.debug("All initial connection requests complete")

    #create connection use altered size
    for i in range(size):
        self.ioloop.add_future(self._new_connection(), on_connect)

    return future

After the changes we made, I prefer to use Pool.getconn at the beginning instead of Pool.conncet, It could make life simple, change connection.L316 in Pool.getconn to:

            if self.conns.total:
                self.ioloop.add_future(self._reanimate(), on_reanimate_done)
            else:
                #new db object, and haven't invoked connect(),start with one connection
                self.ioloop.add_future(self._new_connection(), on_reanimate_done)
haizaar commented 8 years ago

Frankly I'm a bit swamped and hardly will get to work on it until middle next week. If you want to submit PRs with unittests for each of the issues, I'll be happy to review and merge them. Do you want to give it a shot?