malthe / pq

A PostgreSQL job queueing system

Is interacting with existing transactions a feature? #19

Open jimfulton opened 7 years ago

jimfulton commented 7 years ago

I was disappointed to see that by default, put commits.

Reading the docs, I expected pq to only manage transactions when the queue was used as a context manager.

With a little digging in the code, I found that if I passed in a cursor:

pq = PQ(conn, cursor=cursor)
queue = pq['email']
queue.put(dict(to='jim@jumfulton.info'))

The put worked in a savepoint.

Is this a feature or an accident of implementation?

Being able to use pq as part of larger transactions would be an extremely valuable feature. Triggering it by passing in a cursor seems like a bad way to express it. :)

malthe commented 7 years ago

I think the documentation here is lacking.

You're supposed to use the queue instance as a context manager and thereby obtain a cursor:

with some_queue as cursor:
    do_stuff(queue)
    do_more_stuff(cursor)

When you exit this block the transaction is committed.

Alternatively you can instantiate the Queue with an active cursor by passing the cursor keyword argument:

queue = Queue("test", cursor=cursor)

I'll try and fix up the documentation about this. Thanks for reporting.

jimfulton commented 7 years ago

So you're saying the wildly valuable feature I need is not provided?

Can you see the value of using pq with other applications?

Honestly, the main point is to integrate with existing Postgres transactions. Otherwise, why not just use celery?

jimfulton commented 7 years ago

Imagine Plone using RelStorage and adding an item to a queue as part of a larger transaction.

jimfulton commented 7 years ago

OK, so it sounds like maybe it is a feature. :)

jimfulton commented 7 years ago

So, may I suggest a simpler approach: just let put work without committing when it is called outside a queue-provided context manager? That would be consistent with the current documentation, but would be backward incompatible.

Alternatively, add a more explicit flag, like no_commit or an alternative method for adding to a queue using an existing transaction.

jimfulton commented 7 years ago

FWIW, I see pq being used in a much more lightweight way than maybe you imagine.

At least in web apps, connections are managed in pools and so application code is given them via some application mechanism. So, in that context, apps aren't going to keep queues around for any length of time, because the connections are going to be reused for other things.

Perhaps there should just be a module-level function:

def enqueue(connection, queue_name, **data):
    ...

This would:

I'd be happy to implement this on top of the existing machinery and update the README in a PR if you like.
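The module-level function proposed above could look roughly like the following. This is only a minimal sketch under stated assumptions: it uses the standard library's sqlite3 as a stand-in for Postgres so that it runs anywhere, and the table layout, the savepoint name, and the helpers `make_demo_connection` and `pending` are hypothetical, not pq's actual schema or API. The point it illustrates is that the insert happens inside a savepoint and never commits, so the caller's transaction decides whether the item becomes visible.

```python
import json
import sqlite3


def enqueue(connection, queue_name, **data):
    """Add one item to a queue without committing the caller's transaction."""
    cursor = connection.cursor()
    try:
        # The savepoint lets a failed insert be undone without
        # rolling back the caller's other work.
        cursor.execute("SAVEPOINT pq_enqueue")
        try:
            cursor.execute(
                "INSERT INTO queue (q_name, data) VALUES (?, ?)",
                (queue_name, json.dumps(data)),
            )
        except Exception:
            cursor.execute("ROLLBACK TO SAVEPOINT pq_enqueue")
            cursor.execute("RELEASE SAVEPOINT pq_enqueue")
            raise
        cursor.execute("RELEASE SAVEPOINT pq_enqueue")
    finally:
        cursor.close()


def make_demo_connection():
    # Assumption: a toy schema, not pq's real one. isolation_level=None
    # disables sqlite3's implicit transactions, so BEGIN/COMMIT/ROLLBACK
    # are always explicit in this sketch.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE queue (id INTEGER PRIMARY KEY, q_name TEXT, data TEXT)"
    )
    conn.execute("CREATE TABLE users (email TEXT)")
    return conn


def pending(conn, queue_name):
    """Return the decoded items currently stored for queue_name."""
    rows = conn.execute(
        "SELECT data FROM queue WHERE q_name = ? ORDER BY id", (queue_name,)
    )
    return [json.loads(row[0]) for row in rows]


# Usage: the application change and the queued item share one transaction.
conn = make_demo_connection()
conn.execute("BEGIN")
conn.execute("INSERT INTO users (email) VALUES (?)", ("a@example.com",))
enqueue(conn, "email", to="a@example.com")
conn.execute("ROLLBACK")  # discards the user row and the queued item together
```

With a real Postgres connection the same shape would apply, but the SQL and the queue table would have to match what pq itself uses.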

jimfulton commented 7 years ago

Here's a snippet of code I'm using to queue a task in a Flask app that uses sqlalchemy:

conn = flask.g.session.connection().connection
cursor = conn.cursor()
pq.PQ(conn, cursor=cursor)['email'].put(dict(to=to, **kw))
cursor.close()

This illustrates the fleeting nature of connections and how heavy the current API is for this case.

This would be better:

pq.enqueue(flask.g.session.connection().connection, 'email', to=to, **kw)

:)

jimfulton commented 7 years ago

But I can live with what you suggested, especially if it gets documented.

I suspect that when you document it, you may decide you want something else. That's what documenting things tends to do for me.

stas commented 7 years ago

@jimfulton I'm not sure I follow your concerns.

Having the queue connection aside from your app connections pool is how things should work.

Consider the situation where you need to deploy a connection pooler (which is common for large-scale deployments). Your queue will require session pooling while generally your app will require cheap transaction pooling.

Hope this makes sense :)

malthe commented 7 years ago

I think I may have misunderstood something if you got the feeling that I don't think it's valuable to use PQ as part of a bigger picture.

I think it definitely is valuable to simply use PQ in a subtransaction and you can definitely do this using the cursor argument.

I think module-level functions can be a good thing to make it clear that you don't need a stateful queue object.

malthe commented 7 years ago

I believe this is how you can write it today:

conn = flask.g.session.connection().connection
queue = pq.PQ(conn)['email']
with queue:
    queue.put(dict(to=to, **kw))

But this would be better:

conn = flask.g.session.connection().connection
with pq.PQ(conn)['email'] as queue:
    queue.put(dict(to=to, **kw))

It's awkward that the context manager returns a cursor object. It really should be a QueueContext – meaning that it's a queue with a cursor that has an open savepoint.

jimfulton commented 7 years ago

On Tue, Mar 14, 2017 at 6:00 PM, Stas notifications@github.com wrote:

@jimfulton I'm not sure I follow your concerns.

Having the queue connection aside from your app connections pool is how things should work.

Well, it's how things could work, if you use a two-phase commit protocol. I realized after writing my earlier note that that's probably how pq would have to be integrated with RelStorage (a backend for ZODB that can use Postgres).

Consider the situation where you need to deploy a connection pooler (which is common for large-scale deployments). Your queue will require session pooling while generally your app will require cheap transaction pooling.

Hope this makes sense :)

Nope. IDK what you mean by "session pool" or "transaction pool".

Let's step back and review my goal: I want to add items to a queue in the same transaction I make the changes in an application.

I know of two ways to do this:

jimfulton commented 7 years ago

On Tue, Mar 14, 2017 at 6:20 PM, Malthe Borch notifications@github.com wrote:

I think I may have misunderstood something if you got the feeling that I don't think it's valuable to use PQ as part of a bigger picture.

I think it definitely is valuable to simply use PQ in a subtransaction and you can definitely do this using the cursor argument.

Yup. I just think the cursor argument is an odd way to spell it.

jimfulton commented 7 years ago

On Tue, Mar 14, 2017 at 6:27 PM, Malthe Borch notifications@github.com wrote:

I believe this is how you can write it today:

conn = flask.g.session.connection().connection
queue = pq.PQ(conn)['email']
with queue:
    queue.put(dict(to=to, **kw))

But this doesn't do what I need because it commits the transaction.

But this would be better:

conn = flask.g.session.connection().connection
with pq.PQ(conn)['email'] as queue:
    queue.put(dict(to=to, **kw))

It's awkward that the context manager returns a cursor object. It really should be a QueueContext – meaning that it's a queue with a cursor that has an open savepoint.

The main thing I want is a way to add something to a queue within an existing transaction, without committing it. Actually using a savepoint is a bonus. :)

pq.Queue('email', cursor=mycursor).put(...)

Is OK as long as it's supported. The main thing I care about is that the feature exists.

BTW, I'm planning to mention pq in a webinar next week:

https://blog.jetbrains.com/pycharm/2017/03/interview-with-jim-fulton-for-why-postgres-should-be-your-document-database-webinar/

As part of an explanation of why transactions are so important.

jimfulton commented 7 years ago

On Tue, Mar 14, 2017 at 6:00 PM, Stas notifications@github.com wrote:

@jimfulton I'm not sure I follow your concerns.

Having the queue connection aside from your app connections pool is how things should work.

Note that I'm talking about putting things into the queue.

Workers should certainly be separate.

malthe commented 7 years ago

I think you're right that the current enter/exit behavior is just weird.

I propose this:

  1. If used as a context manager, you'll get SAVEPOINT/ROLLBACK functionality (and reuse of a cursor object).
  2. If used directly, you'll get a new cursor for each operation.

But in both cases, no transaction will happen!

Perhaps unless you ask for it explicitly:

with queue as q:
    q.transaction = True
    q.put({...})

Note that enter/exit would return a new object in any case (unlike today).
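The proposal above can be sketched as follows. This is an illustration under assumptions, not pq's implementation: sqlite3 stands in for Postgres so the example is runnable, the table layout and savepoint name are made up, and `QueueContext` is the hypothetical object mentioned earlier in the thread. Entering the block opens a savepoint and returns a new object bound to one cursor; leaving it releases the savepoint (after rolling back to it on error); nothing is ever committed. The sketch is not reentrant.

```python
import json
import sqlite3


class QueueContext:
    """Hypothetical: a queue bound to a single cursor."""

    def __init__(self, cursor, name):
        self.cursor = cursor
        self.name = name

    def put(self, data):
        self.cursor.execute(
            "INSERT INTO queue (q_name, data) VALUES (?, ?)",
            (self.name, json.dumps(data)),
        )


class Queue:
    def __init__(self, conn, name):
        self.conn = conn
        self.name = name
        self._cursor = None

    def put(self, data):
        # Used directly: a new cursor for each operation, and no commit.
        cursor = self.conn.cursor()
        try:
            QueueContext(cursor, self.name).put(data)
        finally:
            cursor.close()

    def __enter__(self):
        # Used as a context manager: one shared cursor and an open savepoint.
        self._cursor = self.conn.cursor()
        self._cursor.execute("SAVEPOINT pq_block")
        return QueueContext(self._cursor, self.name)

    def __exit__(self, exc_type, exc, tb):
        cursor, self._cursor = self._cursor, None
        try:
            if exc_type is not None:
                # Undo only what happened inside the block.
                cursor.execute("ROLLBACK TO SAVEPOINT pq_block")
            cursor.execute("RELEASE SAVEPOINT pq_block")
        finally:
            cursor.close()
        return False  # never swallow the caller's exception


# Usage: the caller owns the transaction; the queue never commits.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE queue (id INTEGER PRIMARY KEY, q_name TEXT, data TEXT)"
)
conn.execute("BEGIN")
with Queue(conn, "email") as q:
    q.put({"to": "a@example.com"})
conn.execute("COMMIT")  # only now does the item become durable
```

An explicit opt-in such as the `q.transaction = True` flag suggested above would be the place to add a commit on exit; it is left out here.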

jimfulton commented 7 years ago

On Tue, Mar 14, 2017 at 6:50 PM, Malthe Borch notifications@github.com wrote:

I think you're right that the current enter/exit behavior is just weird.

I propose this:

  1. If used as a context manager, you'll get SAVEPOINT/ROLLBACK functionality.
  2. If used directly, you'll get a new cursor for each operation.

But in both cases, no transaction will happen!

Wouldn't that break existing users of pq?

malthe commented 7 years ago

I guess – perhaps call it 2.0 and make a big warning.

Sounds good with the webinar! It would be good to have a release out that fixes these usability concerns.

jimfulton commented 7 years ago

Well, I defer, but my original suggestion seems similar and would be consistent with the docs:

This seems less likely to be backward incompatible. <shrug>

malthe commented 7 years ago

I have been trying to rework some of this logic but got held up by both psycopg2 and psycopg2cffi segfaulting.

jimfulton commented 7 years ago

On Sun, Mar 19, 2017 at 2:36 AM, Malthe Borch notifications@github.com wrote:

I have been trying to rework some of this logic

Thanks.

but got held up by both psycopg2 and psycopg2cffi segfaulting.

Weird. :(

malthe commented 7 years ago

Yeah I got one fix merged: https://github.com/chtd/psycopg2cffi/pull/79.

I obviously have a bug in my code that exposes these library issues, because fixing the above just led to other issues, including a segfault in the "standard" psycopg2 library, which is weirder still.

malthe commented 7 years ago

I made an attempt to rectify this but ran into various issues and am unfortunately out of time for the time being.

jimfulton commented 7 years ago

I think just documenting the use of the cursor argument to use an existing transaction would be a step forward. Your telling me about that unblocked me. For example:

https://github.com/feature-flow/twotieredkanban/blob/pycharm-170320/server/zc/twotieredkanban/email.py#L21