riverqueue / river

Fast and reliable background jobs in Go
https://riverqueue.com
Mozilla Public License 2.0

Workflow - periodic unique workflow with cancelation #556

Closed — krhubert closed this issue 3 weeks ago

krhubert commented 3 weeks ago

Hey,

I have a workflow that should run periodically every N hours, but a single run can take anywhere from 30 minutes to several hours. I also don't want to run the same workflow more than once concurrently. On top of that, there should be an option to cancel the entire workflow at any given moment.

Let me describe my use case first and ask whether there's an API that can achieve this.

I need to sync customer CRM data (Salesforce, HubSpot). Depending on the company size, the number of objects can range from a few thousand up to several dozen million. This is why the initial sync may take many hours, but subsequent syncs should complete in under an hour. Also, sometimes there's a need to resync the data for various reasons.

As you can see from the example above, I want to run only a single sync workflow for a company at any given moment. If my customer decides to disable the integration or change what should be synced, then I want to be able to cancel any ongoing sync and start all over again. And lastly, there should be an option to run the sync every N hours, or even trigger it programmatically.

From my understanding, workflows are currently just a set of jobs that you can control individually (say, cancel one of them), but I don't see an option to control the workflow as a whole.

Given the API and its limitations, what's the best approach to handle such a use case?

bgentry commented 3 weeks ago

Great questions. I’ve been looking to add APIs for both cancelling and retrying all jobs in a workflow, but was waiting for some real world use cases before doing so. It sounds like in your case you have certain events that get triggered in your system which you’d like to have (a) cancel any active jobs in the sync workflow for that customer, and (b) kick off a brand new sync workflow for that same customer.

I’ve also been thinking about the best way to make it easy to load up all jobs in a workflow. But even without a new dedicated API for this, you can make do with JobListTx combined with a metadata filter to list all jobs with a given workflow_id. And then the jobs can be cancelled one at a time in a transaction with JobCancelTx.
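As a concrete sketch of that workaround (not a confirmed API — `cancelWorkflow` is a hypothetical helper, and rather than assuming a list-params metadata filter it queries River's `river_job` table directly, whose `metadata` column is `jsonb`, before cancelling each job with `JobCancelTx`):

```go
package main

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

// cancelWorkflow cancels every still-cancellable job carrying a given
// workflow_id in its metadata, inside a single transaction. Large workflows
// may warrant batching/pagination; this sketch loads all IDs at once.
func cancelWorkflow(ctx context.Context, client *river.Client[pgx.Tx], tx pgx.Tx, workflowID string) error {
	rows, err := tx.Query(ctx,
		`SELECT id FROM river_job
		 WHERE metadata @> jsonb_build_object('workflow_id', $1::text)
		   AND state NOT IN ('cancelled', 'completed', 'discarded')`,
		workflowID,
	)
	if err != nil {
		return err
	}
	ids, err := pgx.CollectRows(rows, pgx.RowTo[int64])
	if err != nil {
		return err
	}
	for _, id := range ids {
		// JobCancelTx marks each job cancelled (running jobs are flagged
		// for cancellation, which completes asynchronously).
		if _, err := client.JobCancelTx(ctx, tx, id); err != nil {
			return err
		}
	}
	return nil
}
```

Because everything happens in one transaction, the cancellations either all commit or none do.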

All that requires knowing the workflow ID you’re looking for. Although you can customize how workflow IDs are generated, it’s pretty important that you don’t reuse them, so any scheme you come up with to give them a special name would need to account for this. Rather than trying to make the ID predictable, I wonder if you would instead want to store it somewhere or use a query to look it up?

On the subject of making sure to only have one of these workflows active at a given time for a customer, it depends a bit on how your jobs are structured. I was going to recommend unique jobs (by args) but that may not actually fit well here given you may have lots of jobs in the workflow. I think I’m coming back to recommending that you separately store state in your database. And IMO that’s not a pattern you should be afraid of! Although we want to make the jobs table as independently useful and feature rich as we can, it’s also subject to some specific constraints to ensure good performance, so not everything can or should be done with only that one table.

For example you could have a data_syncs table which only allows one active row per customer, and also keeps the corresponding workflow ID in a column. You can ensure the final step in your workflow marks the sync as completed so that new ones can be triggered. And if you transactionally cancel the previous workflow’s jobs along with inserting a new one, you should be able to get some pretty strong safety guarantees. You can even add a step at the beginning of the workflow that makes sure a previous sync has been fully cancelled prior to proceeding with the rest of the workflow (since cancellation of active jobs is an inherently async process).

If this comes across as being too high level I apologize. It can be tough to confidently recommend an exact solution without more internal context. But hopefully this at least helps give you some ideas on techniques that could be applied here.

It sounds like you would at least benefit from the API to cancel an entire workflow by ID in a single query, so if you can confirm that bit I should be able to whip it up pretty quickly.

bgentry commented 3 weeks ago

Oh shoot, I meant to finish up with some context on the periodic workflows concept. I’m not sure how likely this is to generalize into a specific feature (such as “recurring workflows”) but I think what might work for you is to use the existing periodic jobs concept to enqueue a job which figures out the customers whose data needs syncing, and then have it add a new data sync record + workflow for each customer that needs it.

This job could use the unique feature to make sure it’s not duplicated over a given interval, and it should be straightforward to index your customer or syncs table to allow you to incrementally enqueue sync workflows in batches. You can even use this single periodic job to fan out into individual jobs that check on if individual customers need syncs.

krhubert commented 3 weeks ago

@bgentry Thanks for such a detailed explanation.

> If this comes across as being too high level I apologize. It can be tough to confidently recommend an exact solution without more internal context. But hopefully this at least helps give you some ideas on techniques that could be applied here.

Everything you wrote makes perfect sense to me, and it sounds like the general pattern is to store additional workflow state/status outside of River.

> It sounds like you would at least benefit from the API to cancel an entire workflow by ID in a single query, so if you can confirm that bit I should be able to whip it up pretty quickly.

That would be amazing. I can imagine the API being a single added method (with options), and it would be useful to many other engineers.

> Oh shoot, I meant to finish up with some context on the periodic workflows concept. I’m not sure how likely this is to generalize into a specific feature (such as “recurring workflows”) but I think what might work for you is to use the existing periodic jobs concept to enqueue a job which figures out the customers whose data needs syncing, and then have it add a new data sync record + workflow for each customer that needs it.

This is the strategy I use currently. I've thought about adding a final job to the workflow that marks the current workflow as completed and schedules the next one based on some domain-specific criteria. This approach gives me even more control over when the next workflow should run. For example, I can have several conditions:

  1. if the current workflow started more than 4 hours ago, sync again right away
  2. if the current workflow ends outside business hours (9-17) or on a weekend, start the next one at 4 AM on the next business day
  3. and so on...

Writing the above makes me wonder whether there's a way to access workflow metadata (similar to https://pkg.go.dev/github.com/riverqueue/river#Client.JobGet), to get things like startedAt, etc. Again, this is not a blocker because I can store this info in a separate table like you said, but such an API for workflows would be nice to have.