tembo-io / pgmq

A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
PostgreSQL License

Feature to support fan-in type of workloads #304

Open MMMarcy opened 1 month ago

MMMarcy commented 1 month ago

This FR is a bit abstract since I don't yet have an informed opinion about how to implement it, both in terms of ergonomics and performance.

TL;DR

Currently, it's not possible to use pgmq as a message queue for fan-in [1] types of workloads if the "receiving"/funnel component is potentially distributed (otherwise, it would be easy to keep partial messages in the memory of that worker and re-emit them later in another queue).

Longer Explanation

The above limitation is due to pgmq lacking two primitives (based on my understanding of its inner workings). One on the producer side, one on the consumer side.

On the producer side, the missing function is something that lets you "insert or update a message if there's already a message with this field set". I believe this could be similar to the message keys mentioned in #294.
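To make the producer-side primitive concrete, here is a minimal in-memory sketch of "insert or merge by key" semantics. `KeyedQueue` and `send_or_merge` are hypothetical names used only for illustration; they are not part of pgmq's API, and a real implementation would presumably do the merge inside Postgres.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class KeyedQueue:
    """Toy in-memory stand-in for a queue with message keys (not pgmq's API)."""

    messages: Dict[str, Dict[str, Any]] = field(default_factory=dict)

    def send_or_merge(self, key: str, partial: Dict[str, Any]) -> Dict[str, Any]:
        """Insert a message under `key`, or merge `partial` into the existing one."""
        current = self.messages.setdefault(key, {})
        current.update(partial)
        # Return a copy so callers cannot mutate the stored message by accident.
        return dict(current)


queue = KeyedQueue()
queue.send_or_merge("job-42", {"a": 1})           # first producer creates the message
merged = queue.send_or_merge("job-42", {"b": 2})  # second producer merges into it
print(merged)
```

Both producers end up contributing to a single message for `job-42` instead of enqueuing two separate partial messages.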

On the consumer side, the missing primitive is either "dequeue if the payload has all these fields (an extension of #288)" or, more generically, "dequeue if the provided WHERE clause evaluates to True."
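The consumer-side primitive could behave roughly like the sketch below, where a plain Python list stands in for the queue and a predicate stands in for the WHERE clause. `read_if` and `has_fields` are hypothetical helpers for illustration, not pgmq functions.

```python
from typing import Any, Callable, Dict, List, Optional

Message = Dict[str, Any]


def read_if(queue: List[Message], predicate: Callable[[Message], bool]) -> Optional[Message]:
    """Remove and return the first message satisfying `predicate`, else None."""
    for index, message in enumerate(queue):
        if predicate(message):
            return queue.pop(index)
    return None


def has_fields(*required: str) -> Callable[[Message], bool]:
    """Build a predicate that is true when the payload has all `required` fields."""
    return lambda message: all(name in message for name in required)


pending = [{"id": 1, "a": 1}, {"id": 2, "a": 1, "b": 2, "c": 3}]
ready = read_if(pending, has_fields("a", "b", "c"))
print(ready)
```

Only the complete message is dequeued; the partial one stays in the queue until its remaining fields arrive.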

Alternatives Considered

I believe it would be possible to get this working in two less-than-ideal ways:

1. The first one that came to mind is to use "companion tables." Basically, each fan-in step has an associated normal PostgreSQL table with a 1:1 mapping between columns and "in edges" of the DAG (three in the image I posted). These columns are nullable, and producers signal to the consumer only the row id after writing their piece of data into the associated companion table. The consumer, upon receiving messages containing the companion table row id, does a normal lookup and continues working if and only if all the columns are present.

2. It might be possible to achieve this functionality if each worker went through all the messages in the queue, re-enqueued the non-relevant ones, and appended to the relevant ones before re-enqueuing them. I stopped exploring this idea due to its ugliness and potential brittleness.
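The companion-table alternative can be sketched as follows. This is an assumption-heavy illustration: SQLite stands in for Postgres so that it runs anywhere, a Python list stands in for the pgmq queue, and the table and column names (`fan_in_step`, `edge_a`, `edge_b`, `edge_c`) are hypothetical.

```python
import sqlite3

# SQLite stands in for Postgres; table and column names are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fan_in_step ("
    " id INTEGER PRIMARY KEY,"
    " edge_a TEXT, edge_b TEXT, edge_c TEXT)"  # one nullable column per in edge
)

EDGES = ("edge_a", "edge_b", "edge_c")
queue = []  # stand-in for the message queue; it carries only row ids


def produce(row_id: int, edge: str, value: str) -> None:
    """Write one producer's piece of data, then enqueue just the row id."""
    if edge not in EDGES:  # column names cannot be bound as SQL parameters
        raise ValueError(f"unknown edge: {edge}")
    conn.execute("INSERT OR IGNORE INTO fan_in_step (id) VALUES (?)", (row_id,))
    conn.execute(f"UPDATE fan_in_step SET {edge} = ? WHERE id = ?", (value, row_id))
    conn.commit()
    queue.append(row_id)


def consume(row_id: int):
    """Return the full row if every in edge has arrived, else None."""
    row = conn.execute(
        "SELECT edge_a, edge_b, edge_c FROM fan_in_step WHERE id = ?", (row_id,)
    ).fetchone()
    if row is None or any(value is None for value in row):
        return None
    return row


produce(1, "edge_a", "x")
produce(1, "edge_b", "y")
early = [consume(queue.pop(0)) for _ in range(2)]  # row is still incomplete
produce(1, "edge_c", "z")
final = consume(queue.pop(0))  # all three columns are now present
print(early, final)
```

The first two notifications find an incomplete row and are dropped; only the notification sent after the last column is written lets the consumer continue. In a real deployment the table write and the enqueue would presumably need to happen in the same transaction.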

Option 1 is definitely better and viable. However, if pgmq could handle this instead of it being the users' responsibility, it would be fantastic. Additionally, to the best of my knowledge, no message queue offers this functionality since they don't have an ACID-compliant runtime under the hood.

Finally, this project is really awesome. Thank you!

[1] Image: 1_jybRhFt1MwFlIxQIvUDoPw (diagram of the fan-in DAG, with three in edges)