Close pool while invoking Connection.execute, the future returned by Connection.execute will never resolve.
Anyone can reproduce this issue by doing this:
conn = yield self.db.getconn(False)
yield conn.execute("BEGIN;")
cursor = yield conn.execute(" \
DECLARE serv_cur CURSOR FOR \
SELECT * FROM some_table;")
cursor_future = conn.execute("FETCH 2000 FROM serv_cur;")
#at some where of our program,
#we close the connection through Pool.close() or just conn.close() like this:
conn.close()
#then our program will stuck at following line forever
cursor = yield cursor_future
result = cursor.fetchall()
yield conn.execute("CLOSE serv_cur;")
yield conn.execute("COMMIT;")
The possible solution
In Connection.execute(),Connection.callproc() and Connection.connect()
def execute(self,
operation,
parameters=(),
cursor_factory=None):
#......other code
#before return future, save the last one
assert self.current_future is None, "can not concurrently execute on same connection"
self.current_future = future
return future
In Connection._io_callback()
def _io_callback(self, future, result, fd=None, events=None):
try:
state = self.connection.poll()
except (psycopg2.Warning, psycopg2.Error) as error:
self.ioloop.remove_handler(self.fileno)
future.set_exc_info(sys.exc_info())
#clear last future
self.current_future = None
else:
try:
if state == POLL_OK:
self.ioloop.remove_handler(self.fileno)
future.set_result(result)
#clear last future
self.current_future = None
elif state == POLL_READ:
self.ioloop.update_handler(self.fileno, IOLoop.READ)
elif state == POLL_WRITE:
self.ioloop.update_handler(self.fileno, IOLoop.WRITE)
else:
future.set_exception(psycopg2.OperationalError("poll() returned %s" % state))
#clear last future
self.current_future = None
except IOError:
# Can happen when there are quite a lof of outstanding
# requests. See https://github.com/FSX/momoko/issues/127
self.ioloop.remove_handler(self.fileno)
future.set_exception(psycopg2.OperationalError("IOError on socker"))
#clear last future
self.current_future = None
In Connection.close()
def close(self):
if not self.closed:
self.connection.close()
if self.current_future:
self.current_future.set_exception(psycopg2.OperationalError("database connection disconnected"))
self.current_future = None
Close pool while invoking Connection.execute, the future returned by Connection.execute will never resolve.
Anyone can reproduce this issue by doing this:
The possible solution
In
Connection.execute()
,Connection.callproc()
andConnection.connect()
In
Connection._io_callback()
In Connection.close()