FSX / momoko

Wraps (asynchronous) Psycopg2 for Tornado.
http://momoko.61924.nl/

Contain future resolution to a method (peewee) #116

Closed: kolanos closed this issue 9 years ago

kolanos commented 9 years ago

I'm trying to override peewee's PostgresqlDatabase psycopg2 driver with momoko. However, I am having trouble getting the future to resolve before being passed on.

import momoko
import peewee
from tornado import gen

class PostgresqlAsyncDatabase(peewee.PostgresqlDatabase):
    def _connect(self, database, encoding=None, **kwargs):
        dsn_params = ['dbname=%s' % database]
        if 'user' in kwargs:
            dsn_params.append('user=%s' % kwargs.pop('user'))
        if 'password' in kwargs:
            if kwargs['password']:
                dsn_params.append('password=%s' % kwargs.pop('password'))
            else:
                kwargs.pop('password')
        if 'host' in kwargs:
            dsn_params.append('host=%s' % kwargs.pop('host'))
        if 'port' in kwargs:
            dsn_params.append('port=%s' % kwargs.pop('port', 5432))
        dsn_params = ' '.join(dsn_params)
        conn = momoko.Pool(
            dsn=dsn_params,
            setsession=('SET TIME ZONE UTC',),
            **kwargs)
        return conn

    @gen.coroutine
    def execute_sql(self, sql, params=None, require_commit=True):
        params = params or ()
        peewee.logger.debug((sql, params))
        conn = self.get_conn()
        if require_commit and self.get_autocommit():
            cursors = yield conn.transaction(((sql, params), ))
            # transaction() resolves to a list of cursors; keep the last one
            cursor = cursors[-1]
        else:
            cursor = yield conn.execute(sql, params)
        raise gen.Return(cursor)

However, when I attempt to use execute_sql() like so:

class TestHandler(RequestHandler):
    def get(self):
        cursor = self.db.execute_sql('SELECT * FROM users')
        self.write('%s' % cursor.fetchone())

I get:

AttributeError: 'Future' object has no attribute 'fetchone'

But this works:

class TestHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        cursor = yield self.db.execute_sql('SELECT * FROM users')
        self.write('%s' % cursor.fetchone())

Is there any way to contain the future resolution to the execute_sql method? Or will any method that calls execute_sql() also need an @gen.coroutine decorator and a yield statement?

haizaar commented 9 years ago

The simple answer is no.

The way it works is that you give Tornado futures and it gives you back results. That is, for something async to happen you need to give control back to the IOLoop. You can obtain the IOLoop instance in execute_sql and add the future to it, but you still somehow need to tell Tornado to wait and resume your get method once the future is ready. And the only way to tell it that is to decorate your get with @gen.coroutine and yield the future.
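The same rule holds in asyncio, which Tornado 5 and later run on under Python 3. A minimal standard-library sketch, where `execute_sql` and `FakeCursor` are hypothetical stand-ins rather than the real peewee/momoko objects: calling the coroutine function hands back an awaitable instead of a cursor, and only a caller that is itself a coroutine can wait for the result.

```python
import asyncio


class FakeCursor:
    """Hypothetical stand-in for a psycopg2 cursor."""

    def __init__(self, rows):
        self._rows = list(rows)

    def fetchone(self):
        return self._rows[0] if self._rows else None


async def execute_sql(sql):
    """Hypothetical async query: pretend to wait on the database socket."""
    await asyncio.sleep(0)
    return FakeCursor([("alice",)])


async def get():
    pending = execute_sql("SELECT * FROM users")
    # Without awaiting, we only hold a coroutine object, not a cursor.
    assert not hasattr(pending, "fetchone")
    # Awaiting gives control back to the event loop until the result is ready.
    cursor = await pending
    return cursor.fetchone()


print(asyncio.run(get()))  # ('alice',)
```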

Hope this helps, Zaar

kolanos commented 9 years ago

OK, I think I understand. So I tried to use ioloop.run_sync() to hand it back to the IOLoop, like so:

def execute_sql(self, sql, params=None, require_commit=True):
    params = params or ()
    peewee.logger.debug((sql, params))

    with self.exception_wrapper():
        try:
            conn = self.get_conn()
            conn.ioloop.stop()
            cursor = conn.ioloop.run_sync(
                lambda: self._execute_sql(sql, params, require_commit))
        except Exception as exc:
            if self.sql_error_handler(exc, sql, params, require_commit):
                raise
    return cursor

@gen.coroutine
def _execute_sql(self, sql, params, require_commit):
    conn = self.get_conn()
    if require_commit and self.get_autocommit():
        cursors = yield conn.transaction(((sql, params), ))
        for cursor in cursors:
            pass
    else:
        cursor = yield conn.execute(sql, params)
    raise gen.Return(cursor)

I get:

Traceback (most recent call last):
  ...
  File "/usr/local/lib/python3.4/site-packages/peewee.py", line 2479, in scalar
    row = self._execute().fetchone()
  File "/usr/local/lib/python3.4/site-packages/peewee.py", line 2470, in _execute
    return self.database.execute_sql(sql, params, self.require_commit)
  File "/libs/database.py", line 38, in execute_sql
    lambda: self._execute_sql(sql, params, require_commit))
  File "/usr/local/lib/python3.4/site-packages/tornado/ioloop.py", line 443, in run_sync
    if not future_cell[0].done():
AttributeError: 'NoneType' object has no attribute 'done'

From what I understand, _execute_sql should be returning a future, but according to the traceback above that doesn't appear to be the case. What am I doing wrong?
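One plausible reading of this traceback: the Tornado 4.x documentation for IOLoop.stop() says that if the loop is not currently running, the next call to start() returns immediately. The conn.ioloop.stop() call just before run_sync() would then make run_sync's internal start() return before the callback that creates the future has run, leaving its future cell as None when it calls .done(). The toy loop below is a hypothetical, heavily simplified model of that behavior (ToyLoop and make_future are made-up names, not Tornado code), using concurrent.futures.Future only as a convenient future type.

```python
from concurrent.futures import Future


class ToyLoop:
    """Hypothetical, heavily simplified model of Tornado 4.x IOLoop start/stop."""

    def __init__(self):
        self._callbacks = []
        self._stopped = False

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def stop(self):
        # Stopping an idle loop makes the *next* start() return immediately.
        self._stopped = True

    def start(self):
        if self._stopped:
            self._stopped = False
            return
        # A real IOLoop blocks on I/O until stop(); here we just drain callbacks.
        while self._callbacks:
            self._callbacks.pop(0)()

    def run_sync(self, func):
        future_cell = [None]

        def run():
            future_cell[0] = func()

        self.add_callback(run)
        self.start()
        # Same check as in the traceback: fails if run() never executed.
        if not future_cell[0].done():
            raise TimeoutError("operation did not complete")
        return future_cell[0].result()


def make_future():
    """Hypothetical stand-in for _execute_sql(): an already-resolved future."""
    future = Future()
    future.set_result("cursor")
    return future


loop = ToyLoop()
print(loop.run_sync(make_future))  # cursor
loop.stop()  # like conn.ioloop.stop() in the snippet above
try:
    loop.run_sync(make_future)
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'done'
```

Inside a running Tornado server the outcome differs: start() on a loop that is already running raises RuntimeError instead.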

haizaar commented 9 years ago

From a glance at your code, I can't really tell why it does not work. Can you isolate it into a self-contained code example and post it here?

kolanos commented 9 years ago

So if I reduce the code above I'm left with this:

@gen.coroutine
def _execute_sql(self, sql, params, require_commit):
    conn = self.get_conn()
    if require_commit and self.get_autocommit():
        cursor = yield conn.transaction(((sql, params),))
    else:
        cursor = yield conn.execute(sql, params)
    raise gen.Return(cursor)

Where get_conn returns a momoko.Pool instance.

Then if I do the following:

result = db._execute_sql('SELECT * FROM users', (), False)
ioloop.add_future(result, lambda f: ioloop.stop())
ioloop.start()

Then result.result() is a psycopg2 cursor. What I don't understand is how to get this to run within the context of a Tornado server. The Tornado server's IOLoop is already running, so if I run the code above I get a RuntimeError saying that the IOLoop is already running.

But I can't seem to get a result of a future without a start/stop of the ioloop. What am I missing?
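The RuntimeError has a direct asyncio counterpart. In this sketch (query() is a hypothetical stand-in for _execute_sql()), asyncio.run() drives a coroutine to completion from plain synchronous code, much like the start()/stop() snippet above, but refuses to run from inside an already-running loop. There, the only option is to await the coroutine, which is what a yield inside a @gen.coroutine handler does.

```python
import asyncio


async def query():
    """Hypothetical stand-in for _execute_sql(): resolves on the next loop tick."""
    await asyncio.sleep(0)
    return "cursor"


async def handler():
    coro = query()
    try:
        # Starting a loop from inside a running loop is refused.
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # never started; close it to avoid a "never awaited" warning
        print("RuntimeError:", exc)
    # Inside the loop, the only way to get the result is to await it.
    return await query()


# From synchronous top-level code, running the loop to completion is fine.
print(asyncio.run(query()))    # cursor
print(asyncio.run(handler()))  # cursor (after printing the RuntimeError)
```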

haizaar commented 9 years ago

Try this:

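@gen.coroutine
def _execute_sql(self, sql, params, require_commit):
    conn = self.get_conn()
    if require_commit and self.get_autocommit():
        cursors = yield conn.transaction(((sql, params),))
        cursor = cursors[-1]  # transaction() resolves to a list of cursors
    else:
        cursor = yield conn.execute(sql, params)
    raise gen.Return(cursor)  # without this, the caller's yield gets None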

Then write your get handler as follows:

@gen.coroutine
def get(self):
    cursor = yield db._execute_sql('SELECT * FROM users', (), False)

But you'd better post a complete working sample; I'm shooting in the dark here.

kolanos commented 9 years ago

@haizaar, thanks for your help so far. Much appreciated.

The code I'm trying to override is here: https://github.com/coleifer/peewee/blob/master/peewee.py

Specifically the '_connect' and 'execute_sql' methods in the PostgresqlDatabase class. If you review this code, you'll see that 'execute_sql' is used extensively; it is not intended to be called directly from a request handler. So what I'm trying to do is keep the amount of Peewee I need to override to a minimum, hopefully by resolving the coroutine before it gets passed beyond the '_execute' methods.

If you have any suggestions, I'd very much appreciate them. If there's a way to hand off a future to the IOLoop without having to run it in a handler, that would probably be a good start. Right now, I've only been able to do this by starting and stopping the IOLoop. I assume I don't need to actually stop the IOLoop to resolve a future, but I have not been able to confirm this in the documentation.
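Handing a future to a running loop without a handler is possible, but the result can then only be delivered to a callback, never returned to the synchronous caller. A minimal asyncio sketch (execute_sql and hand_off are hypothetical names); Tornado's IOLoop.add_future(future, callback), used in the snippet above, plays the same role as add_done_callback here.

```python
import asyncio


async def execute_sql(sql):
    """Hypothetical async query."""
    await asyncio.sleep(0)
    return "cursor for " + sql


def hand_off(sql, on_result):
    """Schedule the query on the running loop and return immediately."""
    task = asyncio.get_running_loop().create_task(execute_sql(sql))
    # The result is pushed to a callback once the loop has resolved it.
    task.add_done_callback(lambda t: on_result(t.result()))
    return task  # still a future: no cursor is available yet


async def main():
    received = []
    task = hand_off("SELECT 1", received.append)
    print(received)  # [] : nothing happens until control returns to the loop
    await task
    await asyncio.sleep(0)  # give any pending done-callbacks a chance to run
    return received


print(asyncio.run(main()))  # ['cursor for SELECT 1']
```

Note that main() still has to await something: someone always has to give control back to the loop.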

haizaar commented 9 years ago

Well, you can't bribe the generators :)

Tornado (and Python 3.4's Tulip, i.e. asyncio) chose an explicit yield-control model, in contrast to gevent, which monkey-patches sockets and does the async work behind the scenes. That is, every time you want to give up (= yield) control to let your program do something other than handling the current request, you need to say so explicitly.

Thus, using run_sync would turn your program into a one-request-at-a-time server: Tornado would do nothing else while waiting for your synchronous get to complete.
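The one-request-at-a-time effect is easy to observe. In this asyncio sketch (handle() and the request names are hypothetical), two simulated requests are served together; time.sleep() stands in for any blocking wait such as a synchronous driver call, while asyncio.sleep() yields control back to the loop the way a yield in a @gen.coroutine does.

```python
import asyncio
import time


async def handle(name, log, blocking):
    log.append(name + " start")
    if blocking:
        time.sleep(0.01)           # blocks the whole event loop
    else:
        await asyncio.sleep(0.01)  # lets the loop serve other requests meanwhile
    log.append(name + " end")


async def serve_two(blocking):
    log = []
    await asyncio.gather(handle("A", log, blocking), handle("B", log, blocking))
    return log


print(asyncio.run(serve_two(blocking=True)))
# ['A start', 'A end', 'B start', 'B end']
print(asyncio.run(serve_two(blocking=False)))
# ['A start', 'B start', 'A end', 'B end']
```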

So, back to Tornado: your get (or post, etc.) methods need to return a future. Tornado waits until the future is resolved and then finalizes the response (while the future is being resolved, your code is supposed to use self.write to send something to the client).

In a nutshell, if I strip all the exception and corner-case handling out of gen.coroutine, here is what it does:

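from tornado.concurrent import Future
from tornado.gen import Return
from tornado.ioloop import IOLoop

def coroutine(func):
    def wrapper(*args, **kwargs):
        future = Future()             # the future handed back to the caller
        gen = func(*args, **kwargs)   # calling a generator function creates the generator
        Runner(future, gen)
        return future
    return wrapper

class Runner:
    def __init__(self, result_future, gen):
        self.ioloop = IOLoop.instance()
        self.gen = gen
        self.result_future = result_future
        future = next(gen)            # run the generator up to its first yield
        self.ioloop.add_future(future, self.run)

    def run(self, future):
        # Called by the IOLoop once the yielded future has resolved.
        value = future.result()
        try:
            future = self.gen.send(value)   # resume the generator with the result
        except (StopIteration, Return) as e:
            self.result_future.set_result(e.value)
            return                          # the generator has finished
        self.ioloop.add_future(future, self.run)  # wait for the next yield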

You may notice that this is conceptually similar to yield from in Python 3.3+; it was just written long before yield from existed.
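For a version that runs without Tornado, here is a hypothetical, self-contained re-creation of the same idea in plain Python: a toy callback loop (ToyLoop), a coroutine decorator that drives a generator by sending each resolved future's result back into it, and a fake query (fake_query) that resolves on a later loop iteration. All names are made up for illustration, error handling is omitted, and concurrent.futures.Future is used only as a convenient future type; it sketches the technique and is not Tornado's gen module.

```python
from collections import deque
from concurrent.futures import Future


class ToyLoop:
    """Hypothetical single-threaded loop: just a queue of ready callbacks."""

    def __init__(self):
        self.ready = deque()

    def add_future(self, future, callback):
        # Once `future` resolves, schedule callback(future) on the loop.
        future.add_done_callback(
            lambda f: self.ready.append(lambda: callback(f)))

    def run(self):
        while self.ready:
            self.ready.popleft()()


LOOP = ToyLoop()


def coroutine(func):
    def wrapper(*args, **kwargs):
        result_future = Future()  # handed back to the caller immediately
        _step(func(*args, **kwargs), result_future, None)
        return result_future
    return wrapper


def _step(gen, result_future, value):
    try:
        yielded = gen.send(value)  # resume the generator up to its next yield
    except StopIteration as stop:
        result_future.set_result(stop.value)  # generator returned: resolve
        return
    # Resume the generator with the yielded future's result once it resolves.
    LOOP.add_future(yielded, lambda f: _step(gen, result_future, f.result()))


def fake_query(sql):
    """Hypothetical non-blocking query that resolves on a later loop tick."""
    future = Future()
    LOOP.ready.append(lambda: future.set_result("rows for " + sql))
    return future


@coroutine
def save():
    rows = yield fake_query("SELECT 1")
    return rows.upper()  # Python 3.3+; Python 2 needed raise gen.Return(...)


@coroutine
def handler():
    result = yield save()  # yielding another coroutine's future just works
    return "handled: " + result


outer = handler()
print(outer.done())    # False: nothing has run on the loop yet
LOOP.run()
print(outer.result())  # handled: ROWS FOR SELECT 1
```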

Bottom line: you'll have to instrument all of the peewee code with yields. Armed with the above knowledge, peewee would be instrumented as in the following (conceptual) example:

class TestHandler(RequestHandler):
    @gen.coroutine
    def post(self, name):
        user = User(name=name)
        yield user.save()

class User(Model):
    name = ...

class Model:
    @gen.coroutine
    def save(self):
        sql = self.compile_sql()
        cursor = yield self.db.execute_sql(sql)
        # update self using cursor data

class DB:
    @gen.coroutine
    def connect(self):
        self.pool = momoko.Pool()
        yield self.pool.connect()

    @gen.coroutine
    def execute_sql(self, sql):
        cursor = yield self.pool.execute(sql)
        raise gen.Return(cursor)

If you are using Python 3.3+, you can save some lines using yield from:

class TestHandler(RequestHandler):
    @gen.coroutine
    def post(self, name):
        user = User(name=name)
        yield from user.save()

class User(Model):
    name = ...

class Model:
    def save(self):
        sql = self.compile_sql()
        cursor = yield from self.db.execute_sql(sql)
        # update self using cursor data

class DB:
    def connect(self):
        self.pool = momoko.Pool()
        yield self.pool.connect()

    def execute_sql(self, sql):
        cursor = yield from self.pool.execute(sql)
        return cursor

This is more generic, i.e. not Tornado-specific, and should work with Python 3.4's asyncio as well once Tornado and asyncio agree on a compatible Future object. The drawback, however, is that a reader has to read a function's body to know whether it is async or sync. That is one of the reasons async def and await were introduced in Python 3.5. In Python 3.5 the Model class from the code above would look like this:

class Model:
    async def save(self):
        sql = self.compile_sql()
        cursor = await self.db.execute_sql(sql)
        # update self using cursor data

Thus a reader knows right away that save is an async method and needs to be awaited.
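Putting the three layers together, here is a runnable sketch with native coroutines and asyncio. FakePool, FakeCursor, DB, User and post() are hypothetical stand-ins; FakePool only mimics the shape of a pool with an awaitable execute() and is not momoko's API. The point is that async propagates upward: because execute_sql awaits, save must be async, and so must the handler that calls it. Recent Tornado versions also accept async def handler methods directly.

```python
import asyncio


class FakeCursor:
    """Hypothetical stand-in for a psycopg2 cursor."""

    def __init__(self, row):
        self._row = row

    def fetchone(self):
        return self._row


class FakePool:
    """Hypothetical pool whose execute() is awaitable."""

    async def execute(self, sql, params=()):
        await asyncio.sleep(0)  # pretend to wait on the database socket
        return FakeCursor((42,))


class DB:
    def __init__(self, pool):
        self.pool = pool

    async def execute_sql(self, sql, params=()):
        return await self.pool.execute(sql, params)


class User:
    def __init__(self, db, name):
        self.db = db
        self.name = name
        self.id = None

    async def save(self):
        cursor = await self.db.execute_sql(
            "INSERT INTO users (name) VALUES (%s) RETURNING id", (self.name,))
        self.id = cursor.fetchone()[0]  # update self using cursor data


async def post(db, name):
    """Plays the role of an async request handler method."""
    user = User(db, name)
    await user.save()
    return user.id


print(asyncio.run(post(DB(FakePool()), "alice")))  # 42
```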

Hope this helps, and good luck. Please keep me posted on your progress. I'm curious whether you'll manage to add async support to peewee without much refactoring.

haizaar commented 9 years ago

Feel free to reopen if you have further questions.

kolanos commented 9 years ago

I've started working on a peewee adapter for momoko here: https://github.com/txtadvice/peewee-momoko