Closed tavallaie closed 8 hours ago
I don't know why my tests will fails, logger don't show anything particular. @v0idpwn @ChuckHend can you help me to solve it?
I can take a look only tomorrow.
(2) Is this completely thread-safe? can I use it with async python and have 2 transactions open at the same time? For example: start T1, read 10 messages, start some async work, start T2, read 10 messages, start some async work, finish T2, finish T1?
psycopg
connection pool is thread safe but if you you want to support async operations we need use asyncpg
instead of psycopg_pool
through pip
and some modification to code.
(1) Could you please add a test/example where the transaction is used by queries outside of pgmq? I don't exactly follow how this would be done. Assume that, for example, I wanted to: open a transaction, read 10 messages, insert 10 entries in a misc table, delete the 10 messages I've read, commit the transaction.
I think I fixed the bug, if it is still necessary I will add it.
@tavallaie , can you put forth an example (or better yet, a test) using this implementation with an existing transaction? This is roughly how I would imagine someone might want to use it:
with psycopg.connect() as conn:
query_results = conn.execute("select users from notifications")
queue.tx = conn: # or something like this
for user in query_results:
queue.send('myq', user)
conn.execute("delete from notifications where ....")
# a select, multiple pgmq.send(), and a delete all executed in single transaction
Maybe this is possible with what is currently in this PR? I am not sure though, so I think a test like this would be great. I can help with that if needed.
Maybe this is possible with what is currently in this PR? I am not sure though, so I think a test like this would be great. I can help with that if needed.
I define a decorator to do everything within single transaction but I have to write test for that decorator, I am working on it.
@tavallaie, are you still working on this? If not, maybe we can find someone to finish it or add transaction support in with https://github.com/tembo-io/pgmq/issues/290
let me give it another shot, but I will appreciate any help I can get.
let me give it another shot. but I will appreciate any help I can get.
I also disagree with #290, in this way we have problem using it in Django, celery or other ORM like peewee and etc. it is better too have different library for that.
@ChuckHend I think it is ready for review
I also disagree with #290, in this way we have problem using it in Django, celery or other ORM like peewee and etc. it is better too have different library for that.
adding some ORM natively means adding unnecessary dependency for other projects that not using those dependency at all. so for example If I want to make a worker without need orm or using django that already have it owns database backend we have extra dependency. so I suggest making another library or extra package for that like async.
@tavallaie can you add an example that demonstrates something like the SQL below, but using the python implementation in this PR?
begin;
select pgmq.create('somequeue');
insert into some_table (name) values ('brian');
select pgmq.send('somequeue', '{"hello": "world"}');
commit;
I'll add it now
I added an example directory and perform send and read all together
@tavallaie, I think this is just about good to go, thank you for the hard work. I've approved, but please take another look at the formatting in the readme/doc (it seems like every like got indented by 1 space.
As a side note, I think many will find the decorator useful but using the psycopg connection objects directly is fairly simple too.
from tembo_pgmq_python import PGMQueue
from psycopg_pool import ConnectionPool
url = "postgresql://postgres:postgres@localhost:5432/postgres"
pool = ConnectionPool(url, open=True)
queue = PGMQueue()
with pool.connection() as conn:
with conn.transaction():
queue.create_queue("cat2", conn=conn)
queue.send("cat2", {"hello": "world"}, conn=conn)
rows = conn.execute("select queue_name from pgmq.metrics_all()").fetchall()
print(rows)
conn.execute("create table test (id serial primary key, name text)")
conn.execute("insert into test (name) values ('test')")
rows = conn.execute("select * from test").fetchall()
print(rows)
raise Exception("This is an exception")
due to #257, I try to add transaction to python library.