hatchet-dev / hatchet

A distributed, fault-tolerant task queue
https://hatchet.run
MIT License

feat: Deduplicated enqueue #666

Open · colonelpanic8 opened this issue 2 weeks ago

colonelpanic8 commented 2 weeks ago

I'm wondering if Hatchet has any built-in support for some sort of deduplicated enqueue, where a task/step/workflow could be enqueued idempotently, i.e. deduplicated based on its parameters.

I realize that there are some tricky details here, but this would be super nice.

colonelpanic8 commented 2 weeks ago

A lot of the documentation suggests making your steps and workflows idempotent, which makes sense, and ours actually are. But because our workflows are also very expensive and long-running, we'd really prefer to avoid running them more than once.

I realize it's probably possible to use the REST API to query Hatchet and check whether a workflow already exists, but the problem is that this check is racy in a distributed system.

We can do something where we use our own internal SELECT FOR UPDATE against the data rows that we store in our SQL database when the workflow starts, but it does seem like it would be nice if Hatchet could solve this problem in a more general way for us.

colonelpanic8 commented 2 weeks ago

@abelanger5 I might be willing to contribute some code to make this happen, depending on how difficult doing something like this would be.

Would this be something you would be willing to accept? Do you have a sense of how difficult this might be? Any advice about where to look?

abelanger5 commented 2 weeks ago

There's no support for this at the moment but this feels relatively easy to accomplish. We'd be happy to accept a PR here or contribute it ourselves. Thanks for the suggestion!

There are a few ways I could see the API working here:

  1. Certain keys in the additional metadata could be used as "deduplication" fields. This would be my preferred approach, because the metadata is designed to be propagated through the entire API, so events will trigger workflows with the given metadata, and eventually steps, crons, schedules, etc. will all inherit metadata values.
  2. A checksum of the input could be used, though there would probably need to be a way to exclude fields like timestamps (see the sketch below).
  3. Some other custom value could be sent as part of the additional options.

Also happy to research some other APIs that have accomplished this well, if anyone has suggestions.
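
For option 2, here's a rough sketch of how such a key could be derived (illustrative Go only, not existing Hatchet code; the field names and exclusion list are made up):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// dedupKey hashes the workflow input with volatile fields (e.g. timestamps)
// removed. encoding/json sorts map keys, so the serialization is stable for
// equal inputs.
func dedupKey(input map[string]any, exclude ...string) (string, error) {
	filtered := make(map[string]any, len(input))
	for k, v := range input {
		filtered[k] = v
	}
	for _, f := range exclude {
		delete(filtered, f)
	}

	b, err := json.Marshal(filtered)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	key, _ := dedupKey(map[string]any{
		"userId":     42,
		"documentId": "abc",
		"enqueuedAt": "2024-05-01T12:00:00Z", // volatile field, excluded below
	}, "enqueuedAt")
	fmt.Println(key)
}
```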

colonelpanic8 commented 2 weeks ago

My first thought is that ideally the deduplication guarantee would be enforced by postgres. Perhaps a special table could be added that would have some sort of unique constraint on a value that is generated from the additional metadata fields, and then the step/workflow creation code would need to acquire the lock by attempting an insert into this table.

It could also just be a field on whatever represents the task in the database.
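
Very roughly, something like this (just a sketch; the table, columns, and function are hypothetical, not existing Hatchet code):

```go
package dedup

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
)

// Hypothetical table claimed by the workflow-creation transaction:
//
//   CREATE TABLE workflow_run_dedup (
//       tenant_id  UUID NOT NULL,
//       dedup_key  TEXT NOT NULL,
//       created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
//       UNIQUE (tenant_id, dedup_key)
//   );

func enqueueDeduplicated(ctx context.Context, db *pgxpool.Pool, tenantID, dedupKey string) error {
	tx, err := db.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) // no-op if the transaction commits

	// Claim the key. A duplicate violates the unique constraint, which
	// aborts this transaction, so the workflow run below is never created.
	if _, err := tx.Exec(ctx,
		`INSERT INTO workflow_run_dedup (tenant_id, dedup_key) VALUES ($1, $2)`,
		tenantID, dedupKey,
	); err != nil {
		return err
	}

	// ... create the workflow run / step runs here, in the same transaction ...

	return tx.Commit(ctx)
}
```

The insert doubles as the lock: if another transaction has already claimed the key, this one fails and rolls back.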

colonelpanic8 commented 2 weeks ago

Another question: would this be better implemented at the workflow level or the step level?

abelanger5 commented 2 weeks ago

> My first thought is that ideally the deduplication guarantee would be enforced by postgres. Perhaps a special table could be added that would have some sort of unique constraint on a value that is generated from the additional metadata fields, and then the step/workflow creation code would need to acquire the lock by attempting an insert into this table.

Yeah, I think having a separate table with updates done at the serializable transaction isolation level is the way to go, since correctness is required here. This could cause a high write volume if there are many collisions, but in this case there shouldn't be. If we place this in the same table as the workflow runs or step runs, there might be some side effects.

> Another question: would this be better implemented at the workflow level or the step level?

I'd imagine at the workflow level to start with, as it will be more difficult to implement this for steps that have partially executed in a DAG, and users can always invoke a workflow as a child workflow within a step for deduplication purposes.

colonelpanic8 commented 2 weeks ago

> Yeah, I think having a separate table with updates done at the serializable transaction isolation level

Maybe I'm missing something, but if all we're trying to do is prevent duplicates, it seems like a transaction that inserts into a table with a unique constraint alongside the workflow creation is all we should need, right?

In my conception, you would never be updating or even reading this table; it would literally just serve to torpedo the transaction when there is a duplicate, so I don't understand why we would need to talk about transaction isolation levels.
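
Concretely, continuing the sketch from my earlier comment (names still hypothetical), the caller only needs to map the unique violation (SQLSTATE 23505) to an "already enqueued" result under the default READ COMMITTED level:

```go
package dedup

import (
	"errors"

	"github.com/jackc/pgx/v5/pgconn"
)

// ErrAlreadyEnqueued signals that a workflow run with the same dedup key
// already exists.
var ErrAlreadyEnqueued = errors.New("workflow run already enqueued for this dedup key")

// mapDedupError turns the unique violation raised by the dedup insert into a
// typed error the caller can treat as a no-op; any other error passes through.
func mapDedupError(err error) error {
	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) && pgErr.Code == "23505" { // unique_violation
		return ErrAlreadyEnqueued
	}
	return err
}
```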

abelanger5 commented 2 weeks ago

> Maybe I'm missing something, but if all we're trying to do is prevent duplicates, it seems like a transaction that inserts into a table with a unique constraint alongside the workflow creation is all we should need, right?

Ah, I missed in the first message that you were talking about implementing this via a unique constraint. I was considering perhaps being able to set the level of deduplication, like "workflow x can run at most 3 times with these parameters." An example use case would be "a user upload session can only generate 100 documents."

If we had a count parameter on this table, it could be updated at a stricter isolation level. But in the interest of avoiding serialization errors, perhaps a custom trigger would be better: https://dba.stackexchange.com/a/289552.

colonelpanic8 commented 2 weeks ago

That could work, but you also still need the unique constraint on the key to handle the insert case (when the count was previously 0 because there were no rows).

colonelpanic8 commented 2 weeks ago

Also, if you want to allow running up to some fixed number of times, why not just use a unique constraint + SELECT FOR UPDATE?
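
Rough sketch of what I mean (hypothetical schema and names again; not existing Hatchet code):

```go
package dedup

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// Hypothetical counter table, still protected by the unique constraint:
//
//   CREATE TABLE workflow_run_dedup (
//       tenant_id UUID NOT NULL,
//       dedup_key TEXT NOT NULL,
//       run_count INT  NOT NULL DEFAULT 0,
//       UNIQUE (tenant_id, dedup_key)
//   );

func enqueueAtMost(ctx context.Context, db *pgxpool.Pool, tenantID, dedupKey string, maxRuns int) error {
	tx, err := db.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) // no-op if the transaction commits

	// The unique constraint covers the "no rows yet" case: concurrent first
	// inserts collapse into a single counter row.
	if _, err := tx.Exec(ctx, `
		INSERT INTO workflow_run_dedup (tenant_id, dedup_key)
		VALUES ($1, $2)
		ON CONFLICT (tenant_id, dedup_key) DO NOTHING`,
		tenantID, dedupKey,
	); err != nil {
		return err
	}

	// Lock the counter row so concurrent enqueues for the same key serialize
	// here; no serializable isolation or trigger required.
	var count int
	if err := tx.QueryRow(ctx, `
		SELECT run_count FROM workflow_run_dedup
		WHERE tenant_id = $1 AND dedup_key = $2
		FOR UPDATE`,
		tenantID, dedupKey,
	).Scan(&count); err != nil {
		return err
	}

	if count >= maxRuns {
		return fmt.Errorf("dedup key already used %d of %d times", count, maxRuns)
	}

	if _, err := tx.Exec(ctx, `
		UPDATE workflow_run_dedup SET run_count = run_count + 1
		WHERE tenant_id = $1 AND dedup_key = $2`,
		tenantID, dedupKey,
	); err != nil {
		return err
	}

	// ... create the workflow run in the same transaction ...

	return tx.Commit(ctx)
}
```

Concurrent enqueues for the same key block on the row lock, so the count check stays correct under the default isolation level.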