grantcarthew / node-rethinkdb-job-queue

A persistent job or task queue backed by RethinkDB.
https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki
MIT License

Advice on application architecture for render farm #64

Closed viridia closed 7 years ago

viridia commented 7 years ago

I realize that this sort of thing belongs on stackoverflow, but I've had a lot of bad luck there recently; I seem to have a talent for asking questions that no one is able to answer.

Anyway, I work for a company that makes web-based tools for computer animated films, and I want to use rethinkdb-job-queue as the basis of our new render farm. The essential idea is that there will be a Kubernetes cluster where individual rendering tasks will be controlled by a scheduler daemon using the queue.

As I see it, there would be two queues - one for Jobs, and one for Tasks within those Jobs. A Job might consist of a rendering step, a compositing step, and then running ffmpeg to create the final movie. Each of the steps is highly parallel - if your movie has 96 frames there's no reason not to run 96 machines, given that for a professional film project it may take several hours to render a single frame.

OK, so Queue.process() gets called when a Job is first submitted. The callback would then look at the recipe file for that Job and create a bunch of task objects. The tasks are organized in a dependency graph, so some tasks will be ready to run while others need to wait.

OK so what happens to the Job while all this is going on? We want to put the Job into some sort of quiescent state, it's not done but it's waiting for the tasks to complete. Whenever anything 'interesting' happens (like a task finishing) we want to process the job again and see if there's more work that needs to be done.

The tasks are in a similar boat, except that what they are waiting for are external processes - essentially worker tasks running in docker containers which signal when they are done running (most likely via a mutation to RethinkDB of some kind). So again, the tasks need to go to sleep and only get woken up when their task completes.

One thing that's very important is the ability for the scheduler to go down without interrupting rendering jobs. Given how long it takes to render a frame, it would be bad if taking down the scheduler for maintenance meant abandoning many hours (and dollars) worth of work.

What that means is that when the scheduler comes up again, it needs to be able to revisit any jobs or tasks that are in the queue, in order to see if they have completed while the scheduler was offline.

So the key points are (a) no persistent in-memory state, and (b) jobs and tasks are neither I/O bound nor compute bound; they are mostly just waiting for stuff to happen on other machines. In such a case, I wouldn't have a task wait on a promise, but rather use a state-machine based approach, since I want the state to be durable.
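A minimal sketch of the state-machine idea, assuming the only durable state is a plain record that could be written to the database. The state names, event names, and the `step` function are hypothetical illustrations; none of them come from rethinkdb-job-queue.

```typescript
// Hypothetical states and events, for illustration only.
type TaskState = "waiting" | "running" | "done" | "failed";

type TaskEvent =
  | { kind: "started" }
  | { kind: "workerExited"; exitCode: number };

// The whole durable state is a plain record that can be stored in the database.
interface TaskRecord {
  id: string;
  state: TaskState;
}

// Pure transition function: no timers, promises, or in-memory bookkeeping.
// A restarted scheduler can reload the record and continue from record.state.
function step(record: TaskRecord, event: TaskEvent): TaskRecord {
  switch (record.state) {
    case "waiting":
      return event.kind === "started" ? { ...record, state: "running" } : record;
    case "running":
      if (event.kind === "workerExited") {
        return { ...record, state: event.exitCode === 0 ? "done" : "failed" };
      }
      return record;
    default:
      // "done" and "failed" are terminal; late events are ignored.
      return record;
  }
}
```

Because `step` only maps a stored record and an event to a new record, nothing is lost if the scheduler process goes down between events.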

I think rethinkdb-job-queue can do all this, but I'm not entirely sure what the right approach is. For example, to put jobs to sleep I can set their delay values to an infinite time in the future, and then restore them later.

For example, in my initial experiments I noticed that once Queue.process() has been called for a job, it doesn't get called again, even if I never call next(). So I think my concept of the execution model must be wrong.

grantcarthew commented 7 years ago

I'm not 100% sure what the ideal solution would be @viridia; however, we can work through this.

Points of interest:

grantcarthew commented 7 years ago

Just thinking on this now. You could approach this using the following method:

For ease of communication let's call the queues "Render" and "Tasks".

  1. New job gets added to the "Render" queue.
  2. The "Render" queue job is retrieved and its handler adds N jobs to the "Tasks" queue.
  3. The "Render" queue job property "Rendering" is set to true to indicate that the tasks have been added to the "Tasks" queue.
  4. The "Render" queue job dateEnable property gets set to a future date for job review. Five minutes or five hours?
  5. The "Render" queue job is saved back to the queue using next(null, job) with the above new properties.
  6. The "Tasks" queue jobs are retrieved and start processing.
  7. The "Render" queue job is retrieved due to time passing.
  8. The "Render" queue job is seen to have been started due to the "Rendering" property set to true.
  9. The "Tasks" queue jobs are checked by the "Render" queue job, still processing.
  10. The "Render" job is placed back into the queue with a new dateEnable property value.
  11. Steps 7 to 10 repeat until the tasks are completed.
  12. Once all "Tasks" queue jobs are complete the "Render" queue job is completed.

One option above would be to use the last "Tasks" queue job to retrieve the "Render" queue job rather than wait till the dateEnable time.
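The numbered steps above can be sketched with in-memory stand-ins for the two queues. Everything here is hypothetical (`RenderJob`, `TaskJob`, `processRender`, the five-minute interval); it models only the control flow, not the rethinkdb-job-queue API.

```typescript
// In-memory stand-ins for the two queues; all names here are hypothetical.
type QueueStatus = "waiting" | "active" | "completed";

interface TaskJob {
  renderId: string;
  frame: number;
  status: QueueStatus;
}

interface RenderJob {
  id: string;
  frames: number;
  rendering: boolean; // step 3: true once the tasks have been added
  dateEnable: number; // ms since epoch; the job is not retrieved before this time
  status: QueueStatus;
}

const REVIEW_INTERVAL_MS = 5 * 60 * 1000; // "five minutes or five hours?"

// Handler for one retrieval of a "Render" job (steps 2-5 and 7-12).
function processRender(job: RenderJob, tasks: TaskJob[], now: number): void {
  if (!job.rendering) {
    // Steps 2-3: fan out one task per frame and remember that this was done.
    for (let frame = 0; frame < job.frames; frame++) {
      tasks.push({ renderId: job.id, frame, status: "waiting" });
    }
    job.rendering = true;
  } else if (
    tasks.filter(t => t.renderId === job.id).every(t => t.status === "completed")
  ) {
    // Step 12: every task has finished, so the render job is complete.
    job.status = "completed";
    return;
  }
  // Steps 4-5 and 10: put the job back to sleep until the next review.
  job.dateEnable = now + REVIEW_INTERVAL_MS;
}
```

Since the `rendering` flag and `dateEnable` live on the job record itself, a handler written this way carries no state between retrievals.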

viridia commented 7 years ago

Yeah, that seems like it will work.

Another option: when a task completes, update the dateEnable time on the Render job to be very short (1 second) - this way, if a bunch of tasks complete all at once, the render job only gets processed one time.
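A rough sketch of that wake-up idea, using a hypothetical in-memory job rather than the real queue. It assumes the earliest requested wake-up wins (via `Math.min`), so a burst of completions does not keep pushing the review time back; that detail is a design choice of this sketch, not something stated in the thread.

```typescript
// Hypothetical in-memory model of a "Render" job that is asleep until dateEnable.
interface SleepingJob {
  dateEnable: number; // ms since epoch
  timesProcessed: number;
}

const WAKE_DELAY_MS = 1000;

// Called whenever a task completes: pull the wake-up time in to one second from now.
function onTaskCompleted(job: SleepingJob, now: number): void {
  job.dateEnable = Math.min(job.dateEnable, now + WAKE_DELAY_MS);
}

// Called by a poll loop: process the job only once it is due,
// then put it back to sleep for a long review interval.
function poll(job: SleepingJob, now: number, reviewIntervalMs: number): boolean {
  if (now < job.dateEnable) return false;
  job.timesProcessed += 1;
  job.dateEnable = now + reviewIntervalMs;
  return true;
}
```

With three completions arriving 200 ms apart, the job becomes due once, one second after the first completion, and is processed a single time.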

grantcarthew commented 7 years ago

Could the high level "Render" queue be avoided using the name feature?

If you add all the render tasks to a single queue and give them the same name you could then poll the queue using Queue.findJobByName.
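As an illustration of what a poller might do with the jobs it finds by name, here is a small helper over an in-memory array. The `NamedJob` shape and `summarize` function are assumptions made for this sketch, and the status list is only a subset chosen for the example.

```typescript
// Hypothetical shape of a stored job; only the fields used here are modeled.
interface NamedJob {
  name: string;
  status: "waiting" | "active" | "completed" | "failed";
}

interface GroupSummary {
  total: number;
  completed: number;
  allDone: boolean;
}

// Summarize every job sharing a name, as a poller might after looking them up.
function summarize(jobs: NamedJob[], name: string): GroupSummary {
  const group = jobs.filter(j => j.name === name);
  const completed = group.filter(j => j.status === "completed").length;
  return {
    total: group.length,
    completed,
    // An empty group is treated as "not done" so a typo in the name is not a success.
    allDone: group.length > 0 && completed === group.length,
  };
}
```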

grantcarthew commented 7 years ago

By the way, there is a rethinkdb-job-queue channel on the RethinkDB Slack site. Happy to discuss it here though. Just letting you know.

viridia commented 7 years ago

OK, things are progressing, but there are a number of things that I currently don't understand.

1) What determines the delay between the time I modify a Job, and the next time the process callback is called?

Specifically, there are times when I'd like a job to be immediately reprocessed. Right now it seems to process items once when they are first submitted, and then once every few minutes until they are finished. For example, if I cancel a job, the callback that I pass to onCancel() doesn't get called until several minutes later.

I've tried playing around with the master interval, which had no effect. Is there any way to tell it to immediately reprocess a job?

(My use case is that I have a real-time Job status display in the browser. One of the things my queue processor does is send pub/sub messages to the browser [I'm using deepstream.io] whenever a job has a significant change. However, I'm hoping to get the latency down so that the UI feels responsive.)

2) I noticed that when my onCancel() hook is called, the job status is 'waiting'. I would have thought it would be 'cancelled'.

grantcarthew commented 7 years ago

Not sure about a number of things here. Can you share some code please?

viridia commented 7 years ago

Well, the code for cancelling is here:

https://github.com/viridia/factory/blob/master/director/src/JobRoutes.ts#L70

And my Queue.process call is here:

https://github.com/viridia/factory/blob/master/scheduler/src/Scheduler.ts#L18

Note that these are separate processes, the program that posts to the queue is not the same as the program that does the processing.

BTW this is all bringing back memories: in 1987 I wrote a MIDI sequencer for the Amiga called Music-X, and the problems of scheduling musical events on different tracks overlap in interesting ways with this :)

grantcarthew commented 7 years ago

First note, this if statement is not required: https://github.com/viridia/factory/blob/master/director/src/JobRoutes.ts#L60

A job will never have a status of created coming from the Queue.getJob method.

I can't see anything wrong. Have you enabled the change feed option?

viridia commented 7 years ago

Ah, OK I think that if statement dated from earlier when I was confused about the state record - getJob will in fact return the state record and it will have a status of created. I thought it was an erroneous job and kept trying to remove it.

The change feed option defaults to true, correct? I didn't override the setting. And given that the scheduler is able to pick up brand new jobs right away, I would suspect the issue lies elsewhere. However, I can add some dummy event handlers in the scheduler and see if they respond to changes from the director.

Basically, what I am seeing is that once I have processed a job and pass it to next(null, job) to indicate that I want to process it again some time in the future, it's behaving as if I had set dateEnable to some time in the future, except that I'm not actually setting it. (My plan is to eventually set it, but I haven't gotten to that point).

grantcarthew commented 7 years ago

OK, if you can do some more digging that would help. It may be an issue you have found @viridia

viridia commented 7 years ago

Well it looks like it's revisiting the jobs every 5 minutes exactly:

2017-05-08T06:18:46.899Z - info: processing Job: a0d9c233-3181-422b-87e8-7f5e6ae561e7 1
2017-05-08T06:23:47.068Z - info: processing Job: a0d9c233-3181-422b-87e8-7f5e6ae561e7 1
2017-05-08T06:28:47.406Z - info: processing Job: a0d9c233-3181-422b-87e8-7f5e6ae561e7 1

Maybe it's something to do with my execution environment, I've got the database running inside a docker container in a kubernetes cluster running on my laptop.

viridia commented 7 years ago

OK if I manually set the dateEnable, it rapidly re-processes the items. This effectively solves the problem for me.

grantcarthew commented 7 years ago

How are you manually setting dateEnable? The Job.setDateEnable call only changes the property value on the object in memory. It does not push the changes to the database.

viridia commented 7 years ago

No, I'm calling update() afterward. Things are working much better - I just needed to always setDateEnable if I want to put the job back in the queue for reprocessing. I'm making lots of progress.

Right now my biggest annoyance is the fact that my job lifecycle doesn't match yours, which means that I have to keep a separate runState property and manually update it before calling next(). My set of lifecycle states looks something like this:

So for example, a job in the CANCELLING state is still 'running' from the perspective of rethinkdb-job-queue, because it still needs to periodically check whether its children have terminated.

What makes this complex is that whenever there's a state change, I have to make sure that both my status and the queue status are changed in sync. That's kind of why I want a way to cancel and mutate in a single operation. (There's also finish and mutate, but that's only in one place, so I don't mind manually calling update().)

BTW I'm assuming that update() followed by next(error) or next(null, job) don't interfere with each other, that is neither will stomp over the properties changed by the other one. If this is not safe, if I need to wait until the update is finished before calling next(), let me know.

grantcarthew commented 7 years ago

This is a very complex solution @viridia

I imagine you tried to come up with other options though.

The Promise returned from Job.update will resolve once the data has been committed. As long as you are calling next() after the Promise has resolved then there will be no data collisions.
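The ordering described here can be sketched as follows. The `UpdatableJob` interface and `Next` type are hypothetical stand-ins that only mimic the shapes mentioned in this thread (an `update()` that returns a Promise, and a `next` callback); they are not the library's type definitions.

```typescript
// Stand-ins for a job and the queue's next() callback; both are hypothetical
// and only mimic the shape described in this thread.
interface UpdatableJob {
  update(): Promise<void>; // resolves once the change has been committed
}

type Next = (err: Error | null, job?: UpdatableJob) => void;

// Commit the job's changes first, and hand the job back to the queue only
// after the update promise has resolved (or report the failure instead).
async function saveThenNext(job: UpdatableJob, next: Next): Promise<void> {
  try {
    await job.update();
  } catch (err) {
    next(err instanceof Error ? err : new Error(String(err)));
    return;
  }
  next(null, job);
}
```

Calling `next` inside the same function, after the `await`, makes the "update first, then next" order explicit instead of relying on timing.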

viridia commented 7 years ago

Two additional issues have cropped up:

1) My strategy for waking jobs up is incorrect.

When a job's sub-tasks are running, I temporarily put the job to sleep by setting its dateEnable to some minutes into the future. Then when a sub-task completes, I want to process the job again to check to see if any new sub-tasks have become unblocked as a result of this. (I prefer to do this in the job process loop so that you don't have lots of subtasks trying to update the job all at once).

Since this happens outside of the process loop I'm using this:

job.setDateEnable(new Date(Date.now() + 100));
job.update();

However, it seems like this is not actually waking up the job. Setting dateEnable before calling next() works, but I can only do that from inside the processing loop.

2) This isn't directly related to rethinkdb-job-queue, but I noticed that the rethinkdbdash driver has some odd behaviors when I run it on my MacBook. When I run it on my Ubuntu laptop, everything works. But I can't get my app to run on OS X (this is important since all of my co-engineers at work use Macs for their local development machines). If this issue can't be solved then I won't be able to use rethinkdb-job-queue, which would make me very sad.

For example, when I call r.db('dbname').table('tablename').replace(stuff) it works on one machine but silently fails on the other.

I saw that there was an existing (closed) bug on this, and I added some notes to it.

viridia commented 7 years ago

Actually, number (1) is my mistake, I was calling findJob instead of getJob() :)

BTW, in case I forget, thank you for writing and maintaining a very useful package and answering my questions above and beyond the call of duty :)

grantcarthew commented 7 years ago

You're welcome.

I don't have any Apple devices so can't test the driver. Michel (@neumino), who is the owner of the dash driver, has been very helpful to me in the past; however, I believe his focus has moved a little. If you can raise a new issue on the driver repository I'm sure someone will help.

viridia commented 7 years ago

I discovered today that rethinkdbdash mysteriously fails (many operations silently fail) when running inside the xhyve virtual machine, but runs OK under virtualbox. That's good enough for my purposes I think.

grantcarthew commented 7 years ago

Ah good find. So it is not related to Apple?

viridia commented 7 years ago

Only in that xhyve is a Mac-specific VM. But virtualbox runs on Mac too, and I didn't see the same problems.

grantcarthew commented 7 years ago

So what's the status @viridia ?

You still using this queue or did you have to come up with a custom solution?

viridia commented 7 years ago

I decided to write my own:

https://github.com/viridia/factory/tree/master/queue

It's intended to be part of a larger system, not a stand-alone package. It lacks many of the features of rethinkdb-job-queue, but has a few important features I need. It doesn't have nearly as many tests.

I didn't copy your code, so I am a little concerned that I may not be doing things in the best way. But it seems to work so far (famous last words!) I did, however, learn a lot from the overall design architecture of rethinkdb-job-queue, it taught me a lot about what is possible.

One trick that I used (which I learned from my time working at Google) was to abstract the system clock (setTimeout and friends) behind an interface, so that in unit tests you could replace it with a fake clock that allowed you to advance time as fast as needed.
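A small sketch of the clock-abstraction trick, under the assumption that the queue code only needs `now()` and a one-shot timer. The `Clock` interface, `FakeClock` class, and `scheduleReview` consumer are illustrative names, not code from either project.

```typescript
// A hypothetical clock interface; production code would wrap Date.now and setTimeout.
interface Clock {
  now(): number;
  setTimeout(callback: () => void, delayMs: number): void;
}

// Test double: time only moves when advance() is called.
class FakeClock implements Clock {
  private current = 0;
  private timers: { due: number; callback: () => void }[] = [];

  now(): number {
    return this.current;
  }

  setTimeout(callback: () => void, delayMs: number): void {
    this.timers.push({ due: this.current + delayMs, callback });
  }

  // Move time forward, firing every timer that falls due, earliest first.
  // Timers scheduled by a callback are picked up if they also fall due.
  advance(ms: number): void {
    const target = this.current + ms;
    for (;;) {
      const next = this.timers
        .filter(t => t.due <= target)
        .sort((a, b) => a.due - b.due)[0];
      if (next === undefined) break;
      this.timers.splice(this.timers.indexOf(next), 1);
      this.current = Math.max(this.current, next.due);
      next.callback();
    }
    this.current = target;
  }
}

// Example consumer: schedules a review and records the time at which it ran.
function scheduleReview(clock: Clock, delayMs: number, log: number[]): void {
  clock.setTimeout(() => log.push(clock.now()), delayMs);
}
```

In a unit test, a five-minute review delay can then be exercised with `clock.advance(300000)` instead of waiting for real time to pass.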

Most of the methods for manipulating a Job are done via a JobControl object, which represents, in essence, a transaction. Any changes made to a job (setting state, time, properties, etc) are accumulated and then committed to the database in a single operation. There are two ways to get such an object: the Queue.process callback gets one automatically, or you can get one explicitly from the Queue (there's a factory function).
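One way to picture the accumulate-then-commit idea is the sketch below. It is not the JobControl implementation from the linked repository: the class body, the `InMemoryStore`, the field names, and the state strings are all assumptions made so the example can run without a database.

```typescript
// Hypothetical job record; a real store would be a database table.
interface JobRecord {
  id: string;
  state: string;
  wakeAt: number;
}

class InMemoryStore {
  writes = 0;
  private rows: Record<string, JobRecord> = {};

  insert(row: JobRecord): void {
    this.rows[row.id] = { ...row };
  }

  get(id: string): JobRecord | undefined {
    const row = this.rows[id];
    return row ? { ...row } : undefined;
  }

  // One call stands in for one database write.
  update(id: string, patch: Partial<JobRecord>): void {
    const row = this.rows[id];
    if (!row) throw new Error(`no such job: ${id}`);
    this.rows[id] = { ...row, ...patch };
    this.writes += 1;
  }
}

// Accumulates changes in memory and writes them in a single update.
class JobControl {
  private patch: Partial<JobRecord> = {};
  private store: InMemoryStore;
  private id: string;

  constructor(store: InMemoryStore, id: string) {
    this.store = store;
    this.id = id;
  }

  setState(state: string): this {
    this.patch.state = state;
    return this;
  }

  setWakeAt(wakeAt: number): this {
    this.patch.wakeAt = wakeAt;
    return this;
  }

  commit(): void {
    if (Object.keys(this.patch).length === 0) return;
    this.store.update(this.id, this.patch);
    this.patch = {};
  }
}
```

For example, `new JobControl(store, "j1").setState("CANCELLING").setWakeAt(5000).commit()` changes two fields with one write, which is the "cancel and mutate in a single operation" behavior asked for earlier in the thread.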

The Queue itself only has a few methods for modifying jobs, mainly ones which you would want to do in bulk like clearing the queue.

Another difference: since my logs are intended for end users (professional 3D artists running rendering jobs), they are much simplified.

There are no event callbacks or onCancel methods. Since there are multiple instances of the queue consumer process, any database-triggered event would be handled by multiple workers, which I don't want. Instead, everything goes through process(), which makes sure only one worker gets it.
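A common way to make sure only one worker handles a job is a compare-and-set claim. The sketch below shows that idea on an in-memory object; the `ClaimableJob` shape and `tryClaim` function are hypothetical, and the thread does not say how the custom queue actually does it.

```typescript
// Hypothetical job row with an owner field used as a claim marker.
interface ClaimableJob {
  id: string;
  claimedBy: string | null;
}

// Compare-and-set claim: succeeds only if nobody holds the job yet.
// Against a real database, the check and the write would need to be
// a single atomic operation for this to be safe across processes.
function tryClaim(job: ClaimableJob, workerId: string): boolean {
  if (job.claimedBy !== null) return false;
  job.claimedBy = workerId;
  return true;
}
```

If several workers call `tryClaim` on the same job, only the first call returns `true`; the others see the claim and move on.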

Also, I decided not to use rethinkdbdash because (a) I wanted to share the same db connection with other parts of the system and (b) there's no mocking library for rethinkdbdash, but there is one for rethinkdb. (Right now the unit tests run against a live test database, but I'd like the option to use mocks at some point).

Also, because it's all written in TypeScript, I can avoid writing a lot of tests that merely check that I'm passing the correct types. :)

grantcarthew commented 7 years ago

Good stuff. Glad you got it working. I like the idea of abstracting the clock. Thanks again for the TypeScript additions here.

viridia commented 7 years ago

Thanks for all your help!

Oh, and here's a plug for one of my other packages that you may find interesting: https://www.npmjs.com/package/certainty