Jaymon / morp

Python message passing wrapper around Amazon SQS
MIT License
1 stars 1 forks source link

Postgres Queue #18

Closed Jaymon closed 9 months ago

Jaymon commented 1 year ago
Jaymon commented 9 months ago

https://github.com/Jaymon/morp/issues/12

Jaymon commented 9 months ago

Tabs and searches I had open while working on this:

This is the post that got me started looking into this, it included this snippet of code which was a major inspiration on getting me to actually implement this:

import psycopg2
import psycopg2.extras
import random

db_params = {
    'database': 'jobs',
    'user': 'jobsuser',
    'password': 'superSecret',
    'host': '127.0.0.1',
    'port': '5432',
}

conn = psycopg2.connect(**db_params)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

def do_some_work(job_data):
    if random.choice([True, False]):
        print('do_some_work FAILED')
        raise Exception
    else:
        print('do_some_work SUCCESS')

def process_job():

    sql = """DELETE FROM message_queue 
WHERE id = (
    SELECT id
    FROM message_queue
    WHERE status = 'new'
    ORDER BY created ASC 
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING *;
"""
    cur.execute(sql)
    queue_item = cur.fetchone()
    print('message_queue says to process job id: ', queue_item['target_id'])
    sql = """SELECT * FROM jobs WHERE id =%s AND status='new_waiting' AND attempts <= 3 FOR UPDATE;"""
    cur.execute(sql, (queue_item['target_id'],))
    job_data = cur.fetchone()
    if job_data:
        try:
            do_some_work(job_data)
            sql = """UPDATE jobs SET status = 'complete' WHERE id =%s;"""
            cur.execute(sql, (queue_item['target_id'],))
        except Exception as e:
            sql = """UPDATE jobs SET status = 'failed', attempts = attempts + 1 WHERE id =%s;"""
            # if we want the job to run again, insert a new item to the message queue with this job id
            cur.execute(sql, (queue_item['target_id'],))
    else:
        print('no job found, did not get job id: ', queue_item['target_id'])
    conn.commit()

process_job()
cur.close()
conn.close()