akira / exq

Job processing library for Elixir - compatible with Resque / Sidekiq

Callback when all Jobs are finished #207

Closed jakubrohleder closed 7 years ago

jakubrohleder commented 8 years ago

Is there any way to run a callback function when all of the enqueued jobs have finished?

I have a really simple scenario:

for page_num <- 1..10000 do
    Exq.enqueue(Exq, "default", MyWorker, [page_num])
end

# do some cleanup when all jobs are finished

akira commented 8 years ago

@jakubrohleder Just curious, what kind of cleanup are you doing? Also, are you sure you need Exq in this situation? You can try taking a look at Task or GenStage if you need the processing to be done inline. This feels more like a situation where you can use those, versus Exq, which is meant for background processing.
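For the inline route, here is a minimal sketch using `Task.async_stream/3` from the standard library. The `InlinePages` module and the `fetch_page` function are hypothetical stand-ins for the real per-page work, and the concurrency limit and timeout are arbitrary. Note that this sketch does not retry failed pages, so it is not a drop-in replacement for what Exq provides.

```elixir
defmodule InlinePages do
  # Hypothetical helper: runs fetch_page for every page with bounded
  # concurrency and returns only once all pages have been processed.
  def process_all(pages, fetch_page, max_concurrency \\ 4) do
    pages
    |> Task.async_stream(fetch_page, max_concurrency: max_concurrency, timeout: 30_000)
    # Each successful task yields {:ok, result}; a timeout or crash in a
    # task takes down the caller with the default options.
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Usage with a dummy function standing in for the real download:
results = InlinePages.process_all(1..3, fn page -> page * 10 end)
# results is [10, 20, 30]; any cleanup can simply run on the next line,
# because process_all/3 only returns after every page is done.
```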

If you were to use Exq, you could call the queue_size function in the API to see how many jobs are left in the queue, and run the cleanup once it reaches 0. However, this assumes that there are no other jobs running in the system. Also, remember that if you have multiple worker nodes, the work can be done anywhere in the cluster. I would need to know a bit more about what you're trying to do (inline processing versus some sort of dependency management between tasks, etc.).

jakubrohleder commented 8 years ago

@akira thanks for the detailed answer!

My case is a web scraper that gets product data from a paginated API integration. I'm queuing all the pages for a worker with Exq, and each page is downloaded and processed separately. The server is not very stable, so the ability to limit concurrency and to retry failed jobs is very important.

When it comes to cleanup, I use it for removing products that have been removed from the API feed: at the beginning all products are marked as "need review", and at the end all products that haven't been touched (were not present in the feed) should be removed.

I think a decent solution for me would be to spawn a task, after all jobs are queued, that periodically checks whether the queue is empty and runs the cleanup once it is.
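A rough sketch of that polling task, under a few assumptions: `QueueDrainWatcher` is a hypothetical module (not part of Exq), `pending_fun` is any zero-arity function that returns the number of outstanding jobs, and the commented-out wiring assumes that `Exq.Api.queue_size/2` returns `{:ok, count}` and that a hypothetical `MyApp.Cleanup.run/0` exists.

```elixir
defmodule QueueDrainWatcher do
  # Hypothetical helper: polls pending_fun until it reports 0 outstanding
  # jobs, then calls on_drained and returns its result.
  def await_drained(pending_fun, on_drained, interval_ms \\ 1_000) do
    case pending_fun.() do
      0 ->
        on_drained.()

      _still_pending ->
        Process.sleep(interval_ms)
        await_drained(pending_fun, on_drained, interval_ms)
    end
  end
end

# Possible wiring with Exq (assumption, not verified against a running system):
#
#   pending = fn ->
#     {:ok, queued} = Exq.Api.queue_size(Exq.Api, "default")
#     queued
#   end
#
#   Task.start(fn ->
#     QueueDrainWatcher.await_drained(pending, &MyApp.Cleanup.run/0)
#   end)
```

One caveat: an empty queue only means nothing is waiting. Jobs that are currently being executed, or that failed and are waiting for a retry, are no longer in the queue, so `pending_fun` would probably need to count those as well before the cleanup is safe to run.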

Thanks for pointing me to GenStage, it's an interesting concept I didn't know about!