vsivsi / meteor-job-collection

A persistent and reactive job queue for Meteor, supporting distributed workers that can run anywhere.
https://atmospherejs.com/vsivsi/job-collection

Waiting job not readied without force although dependency has completed #221

Closed serkandurusoy closed 7 years ago

serkandurusoy commented 7 years ago

Hey @vsivsi

I have a job in the waiting state, which has a single id in its depends array.

Checking that document, I see that its status is completed.

Yet, job.ready() returns false, unless I do job.ready({ force: true }), which does indeed run the job.

So my question is, based on the docs:

Any job that is 'waiting' may be readied. Jobs with unsatisfied dependencies will not be changed to 'ready' unless the force option is used.

What exactly do you mean by "unsatisfied dependencies"?

Thanks!

vsivsi commented 7 years ago

What exactly do you mean by "unsatisfied dependencies"?

The depends array should be empty. Running job.ready() only checks that array (which should always reflect the current state of the antecedent jobs). It sounds like your JC got into an inconsistent state. There are a few ways that could happen...

One way would be if the antecedent job was scheduled and completed before the dependent job was even scheduled. If memory serves, there is no explicit check for this case when a job with dependencies is saved. Such a check could be added, but my recollection is that I didn't add one because there would always be a potential race condition with it (i.e. the job completes between the check and the save of the dependent job). So if this is what is happening, some defensive programming is in order. The antecedent jobs should be scheduled with a sufficient delay to ensure that they can't complete before the full chain of dependent jobs is in the DB. Alternatively, all of the ultimate antecedent jobs (those with no dependencies of their own) should initially be saved in the 'paused' state, and then resumed only once all dependent jobs are in the DB.

So something like this (assuming server-side Fibers to handle the async calls):

// Create a sendEmail job:
var sendJob = new Job(myJobs, 'sendEmail', {
  address: 'bozo@clowns.com',
  subject: 'Critical rainbow hair shortage',
  message: 'LOL; JK, KThxBye.'
}).pause().save();

// *** Without the .pause()/.resume() there is a race condition on .depends() below if the above  
// *** job runs and completes before the one below is saved.

// Create a job to flag that the email has been sent
var sendStatusJob = new Job(myJobs, 'markSent', {
   message: sendJob.data
}).depends([sendJob]).save();

sendJob.resume();   // Now can safely run
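
Alternatively, the delay-based approach mentioned above might look roughly like this (a sketch only; the 60-second head start is just an assumption about how long saving the dependent jobs could take):

// Give the antecedent enough of a head start that the dependent job is
// guaranteed to be saved before the antecedent can possibly run:
var sendJob = new Job(myJobs, 'sendEmail', {
  address: 'bozo@clowns.com',
  subject: 'Critical rainbow hair shortage',
  message: 'LOL; JK, KThxBye.'
}).delay(60 * 1000).save();

var sendStatusJob = new Job(myJobs, 'markSent', {
  message: sendJob.data
}).depends([sendJob]).save();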

serkandurusoy commented 7 years ago

The depends array should be empty.

Hm, does this mean that when an antecedent job completes, it should automatically be removed from the depends array of any jobs waiting on it?

If that's the case, then yes, I do have quite a number of jobs which are in the waiting state with one completed dependency still sitting in their depends array. And I guess I can apply your suggestion.

Edit: furthermore, I guess this also implies that any job that has completed should also have an empty depends field (which is the case for thousands of jobs on my database except for a few)

Edit2: Judging from https://github.com/vsivsi/meteor-job-collection/blob/master/src/shared.coffee#L1085 I think that's the case, right?

vsivsi commented 7 years ago

Yup. The mechanism is that when a job is marked as 'completed' on the server, then all other jobs that have the completed job in their depends array are updated to remove it. And for each of those, if that is the final dependency (and they are otherwise ready to run, no additional delays, etc.) then they are marked as 'ready' in that same update.
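
A rough illustration of that mechanism, reduced to plain MongoDB updates (a simplified sketch, not the package's actual internal code; myJobs and completedId are assumptions here):

// 1) Pull the completed job out of every dependent job's depends array,
//    recording it in resolved:
myJobs.update(
  { status: 'waiting', depends: completedId },
  { $pull: { depends: completedId }, $push: { resolved: completedId } },
  { multi: true }
);

// 2) Promote to 'ready' any waiting job whose depends array is now empty
//    (and whose run-after time has passed):
myJobs.update(
  { status: 'waiting', depends: { $size: 0 }, after: { $lte: new Date() } },
  { $set: { status: 'ready', updated: new Date() } },
  { multi: true }
);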

The reason it doesn't work the other way (the waiting job checking for completion of its dependencies) is that it would require frequent polling of the DB for all such jobs. Instead it is done just once per antecedent job. Besides being inefficient, the polling solution wouldn't work anyway, because there could be no guarantee that the antecedent job hadn't completed and already been removed from the job collection within one poll interval. You could detect that as well, but in that case it would be indeterminate whether the job had even successfully completed.

You could always just look for this situation and "fix" it yourself, but that would be a bit of a hack.

I should add that I can think of at least one other way this could happen, but it should be very rare. If the server went down between a job being marked 'completed' and its dependents being updated, then those dependents would be left in this state in the DB, but that should be a truly exceptional case and not something you would see all of the time like you describe.

serkandurusoy commented 7 years ago

@vsivsi thank you very much for the explanation. I do agree that your method is more efficient than the opposite approach, and there obviously are some defensive mechanisms to mitigate when such an issue surfaces.

In our case, it may actually be more likely that our servers have been stopped mid-operation than that the dependency completed before the succeeding job was created. Although I will most definitely look into that, because the codebase is a mix of async/await, promises (all with try/catch blocks) and callbacks (as required by job-collection). I just inherited the codebase and may well be missing something in there.

In any case, thank you for all the insight!

Kudos for this package :)

vsivsi commented 7 years ago

Thinking on this a bit more, I suppose that after a job with dependencies is saved to the DB, it could check just that one time to see if a race had occurred (if it has any depends that are now 'completed' jobs) and if so, remove them. The worst that could happen is that two processes would each attempt to remove the same entry from the depends array, which should be safe.

I'll look into that. Ideally you shouldn't need to have to think about this. Dependencies are a lightly used feature (in terms of fraction of projects that use them), so there are some rough edges in the implementation.

I'm going to reopen this as an enhancement and try to look at it for the next release.

serkandurusoy commented 7 years ago

I suppose that after a job with dependencies is saved to the DB, it could check just that one time to see if a race had occurred (if it has any depends that are now 'completed' jobs) and if so, remove them.

I was actually planning on doing this, but only within a cleanup job of its own :)

Of course it would be much leaner if this were explicit under the hood.

Just for context, this app implements a queue-based transaction where each step calls a host of functions, from multiple REST APIs to async internal methods that return promises. And they need to run in a certain order.

The dependency feature is indeed at the heart of the orchestration.

vsivsi commented 7 years ago

In our case, it may actually be more likely that our servers have been stopped mid-operation than that the dependency completed before the succeeding job was created.

If that is the case, then you should write some code that runs on server start to check for these cases and fix them up. The solution would be to find all 'completed' jobs, and for each one, run the same code that runs when a job is marked as completed:

https://github.com/vsivsi/meteor-job-collection/blob/master/src/shared.coffee#L1070-L1112
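
A hedged sketch of such a startup fix-up (assuming server-side Fibers; myJobs is assumed, and this is not the package's internal code linked above):

Meteor.startup(function () {
  // Re-resolve dependencies for every completed job still in the collection:
  myJobs.find({ status: 'completed' }, { fields: { _id: 1 } }).forEach(function (done) {
    myJobs.update(
      { status: 'waiting', depends: done._id },
      { $pull: { depends: done._id }, $push: { resolved: done._id } },
      { multi: true }
    );
  });
  // Waiting jobs whose depends array is now empty can then be readied:
  myJobs.find({ status: 'waiting', depends: { $size: 0 } }, { fields: { _id: 1 } })
    .forEach(function (doc) {
      var job = myJobs.getJob(doc._id);
      if (job) job.ready();
    });
});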

vsivsi commented 7 years ago

I was actually planning on doing this, but only within a cleanup job of its own :)

I would happily accept a PR that does this! :-)

serkandurusoy commented 7 years ago

If you're not in a hurry, I'd be happy to do that. I just need to improve my ability to read coffeescript and understand how the code works in general. These have only been my first few hours :)

vsivsi commented 7 years ago

My approach would be to break the highlighted code in the link above out into its own internal method, call that from job.done() with the _id of the completing job, and then for job.save() call it with the _id of every dependency that matches a 'completed' job.

You could then also call that same function in your "startup check" with the _id values of all completed jobs in the DB.

Maxhodges commented 7 years ago

Hi @vsivsi, I'm working with @serkandurusoy and was wondering if I could compensate you with a donation to make this fix a priority?

serkandurusoy commented 7 years ago

hey @vsivsi I was considering:

a) a cron task that checks and cleans up these tasks
b) an after.update collection-hook

but either case would have been external to the package, and I think the package would benefit from having this internally.

As I am completely coffeescript illiterate, @Maxhodges and I would love for you to fix this for us.

Thanks!

vsivsi commented 7 years ago

Should be able to do these two things this coming weekend. No donation needed.

So:

1) Jobs with dependencies will check for completion of those dependencies after any job.save() completes. This should actually cover two cases:

2) At startup, check for any 'waiting' jobs with dependencies that are not 'waiting', 'ready', 'running' or 'paused' (that is, can eventually be resolved). There are two cases to deal with here as well:

Have I covered all of the cases you guys can think of?

serkandurusoy commented 7 years ago

Amazing! Perhaps I should add some notes:

vsivsi commented 7 years ago

Crap, I edited your comment rather than replying, sorry! [Fixed now...]

  • When you mention removing dependencies, you are also implying that those removed ones get pushed into the resolved array, right? Otherwise we would lose the dependency tree.

Yes.

  • For waiting jobs whose dependencies have been cancelled: these sound like they should be cancelled as well, and perhaps this would walk down the tree, following each dependent job in the chain to mark it as cancelled.

Yes, that is what happens by default now if you cancel a job that is an antecedent to other dependent jobs, and so on... But that process can be interrupted by a server/db crash or connectivity loss, and this would fix that up (using the same code, presumably)

  • For waiting jobs whose dependencies have failed, I think this is something that should be handled by the application where options would be to... [snip].

What happens by default is that when an antecedent fails (fatally, with no more retries left), all dependent jobs also fail. If the failure conditions allow the antecedent to be successfully re-run (using job.restart()), then the app has control over whether dependent jobs are also restarted. See the documentation for the options on job.restart().
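
For illustration, restarting a fatally failed antecedent together with the jobs that failed along with it might look roughly like this (a sketch only; failedJobId is assumed, and dependents is the job.restart() option referenced above for controlling whether dependent jobs are restarted):

var failedAntecedent = jc.getJob(failedJobId);   // server-side, assuming Fibers
if (failedAntecedent) {
  // Restart this job and also restart the dependent jobs that failed with it
  failedAntecedent.restart({ dependents: true });
}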

The changes needed here are simply to fix-up any inconsistent state that may have resulted from a server failure while a wave of 'cancelled' or 'failed' changes were propagating through a dependency tree. Of course all of this BS is necessary because MongoDB doesn't have transactions...

serkandurusoy commented 7 years ago

If job.restart() allows me to retry a chain of jobs starting from a failed one and continuing down the chain, then it does fulfill my request to have control.

As for "this BS is necessary because MongoDB doesn't have transactions"; it is exactly what I've been trying explain to our team. I'm glad you mentioned this without my influence :)

In any case, I think the "BS" would solve (or recover from) most of the potential transactional problems.

As for ones that might still fall through the cracks (until they are recovered during the next startup), it might help if you either exposed that method so that we can call it externally, or better yet, provided an option to make it a recurring cleanup task instead of a fire-once-on-startup thing.

Does this make sense?

vsivsi commented 7 years ago

Yes, can expose it as some kind of server side "dependency check". But to be safe, I think it needs to be run only on a "shutdown" system, otherwise there could be all kinds of new intractable race conditions introduced (if jobs are running and resolving dependencies live while the check/fixup is being performed...). So:

jc.shutdownJobServer()
jc.dependencyCheck()   // If run without shutdown, maybe it will log problems but not fix them?
jc.startupJobServer()

serkandurusoy commented 7 years ago

Hm yes, I do agree that it is a good idea to freeze the running state, cleanup, and let it run back again.

Although, jc.shutdownJobServer() should be smart enough to know when to gracefully stop; is that doable with the existing methods?

EDIT: I also like the idea about running without shutdown as a dry run, though, I'd prefer a return value instead of console.log so that I can maybe decide what to do about it!

vsivsi commented 7 years ago

Here's what happens when you run jc.shutdownJobServer(). You can specify a "timeout" value (defaults to 60 seconds). After that has expired, any still running jobs are automatically failed.

Between the call to jc.shutdownJobServer() and the expiration of the timeout, two other things are happening:

1) Any requests from workers for new jobs will not return any jobs to work on.

2) Any calls to job.progress() will return null to indicate that the worker should stop working on that job (because the server is going down).

So if you can bound the time it takes to complete all "in flight" running jobs (or for them to all report progress), then you can perform a completely orderly shutdown. If you have jobs set to "retry" after failure, then the jobs "failed" as a result of the shutdown should automatically be retried when the server is started back up.
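
For example, an orderly shutdown with a longer window might look like this (a sketch; the timeout value is in ms, and the 5-minute figure is just an assumption):

// The call itself returns almost immediately; still-running jobs are only
// auto-failed once the timeout expires.
jc.shutdownJobServer({ timeout: 5 * 60 * 1000 });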

Agreed that dependencyCheck() should return an object with the issues found (and this object should drive the fixup logic itself).

serkandurusoy commented 7 years ago

Hm, so say we have a job whose worker makes an api call which takes some time to return, and the shutdown timeout expires (automatically failing the job) while that call is still in flight.

Apart from trying our best to make that job's runner an idempotent function (which may sometimes be quite hard to do), would you agree that we might then end up with a problem?

I do agree that this could also have happened with a server shutdown, but at least in that case we might have had a clue, since the job would have been left in a 'running' state.

I guess what I'm trying to say is, automatically failing after a timeout is an assumption that might prove to be wrong in some (rare) cases, no?

Agreed that dependencyCheck() should return an object with the issues found (and this object should drive the fixup logic itself).

What about one with its own instance methods? This way, we could potentially cache the instance and directly call a fixErrors method on it later as part of some other logic, instead of rerunning the check after the shutdown (which we still can, of course).

vsivsi commented 7 years ago

would you agree that we might then end up with a problem?

Perhaps yes. I think it all hinges on the "api call which takes some time to return". How long are we talking here? Minutes? Hours?

If it's minutes, then just be very conservative with the timeout. If it's much longer, then... I mean, what happens if you make that API call (assuming HTTP) and the TCP socket drops while you are waiting for the response? Not really that different of a situation, right? HTTP is stateless, you can't exactly just reconnect and take up where you left off. You now have no idea what happened without writing logic to handle that specific case and query the server to try to reconstruct the outcome of the call's side-effects before deciding how to proceed.

So yes, in extreme cases your job might fail, for any number of reasons (impending shutdown being just one...) and non-idempotent operations may be left in indeterminate states. You will need mechanisms for retrieving the required state to decide how to recover. I don't think job-collection makes this requirement any worse other than providing some more rare opportunities for it to happen. All network operations can fail, any non-idempotent networked request needs to have a defined mechanism to recover in the face of errors that occur between the request and response.

I think it's important to point out that the worker doesn't need to die just because the server is going down. It can hang around and wait for the API call to complete, and then stash that state someplace where it can be used by the next attempt to decide what to do. But really, this type of thing can't replace having a sane error recovery mechanism defined for all non-idempotent operations.

I guess what I'm trying to say is, automatically failing after a timeout is an assumption that might prove to be wrong in some (rare) cases, no?

Perhaps. But job-collection is built around the assumption that there is no such thing as a "running" job when the server itself is not running. So there's no real alternative.

What about one with its own instance methods?

I need to think about this. I don't want to overcomplicate things for just one user's requirements. I'm a big believer in keeping things simple.

serkandurusoy commented 7 years ago

Those are very good points, I must confess that I'm plainly trying to stretch my luck here as far as I can in order to minimize the amount of refactoring surface on a series of large job-collection based codebases I am inheriting ;)

This is also the case for my instance methods suggestion. Although I'd consider it still simple enough and naturally in line with Meteor's usual paradigms of returning similar enhanced objects from collection-aware methods.

In any case, items 1 and 2 from your earlier comment coupled with jc.dependencyCheck() are already enormous improvements in terms of sanity-ensuring tooling and I would like to thank you for taking the time to discuss this with me.

Also contextually relevant to this topic, but outside the package's codebase: I'm considering a very basic checksum mechanism where I'll keep a separate dependency-tracking collection, asynchronously populated after job-collection updates, only to be used as a cross-reference / lookup table for triaging job regressions. I'd certainly like to hear your comments on this. I could perhaps even forge this into an add-on package... What do you think?

vsivsi commented 7 years ago

I'm considering a very basic checksum mechanism where I'll keep a separate dependency-tracking collection, asynchronously populated after job-collection updates, only to be used as a cross-reference / lookup table for triaging job regressions. I'd certainly like to hear your comments on this.

Sounds like another opportunity for out-of-sync states and race conditions. If you just want to use such a mechanism to help with fault detection and debugging, then that's probably fine, but the job-collection had better be the "source of truth" about your jobs and dependencies. If it isn't, then it sounds like your implementation is seriously broken and/or job-collection isn't the right tool for your application. I'm usually pretty skeptical of band-aids like this.

serkandurusoy commented 7 years ago

If you just want to use such a mechanism to help with fault detection and debugging

Yes, that's exactly what I have in mind.

BTW:

We're having this exact problem very consistently. Even today, almost 10 different jobs have gotten stuck in the waiting state with a job id in their depends array even though that job had already completed successfully.

We're trying to understand if there's anything we can do on our implementation to prevent this.

FYI, we suspect MongoDB Atlas, because the problem started happening 3-4 weeks ago, when we migrated from mLab to Atlas.

vsivsi commented 7 years ago

You should be able to look at the timestamps on the logged events in each job document to reconstruct the order in which things are happening. It also seems like you should really turn on logging in the server (or write custom event hooks) to be able to get a fuller picture of what is happening. This sounds like you have a timing bug in your job scheduling, or an unhandled error on some DB operation. The data point about this only cropping up after the migration is a major clue. It could be errors accessing the database, or simply different latency and timings that are revealing races in your implementation. I'm not ruling out a bug in job-collection, but I'd need to see a clear minimal reproduction in order to pursue that.
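
For reference, the server-side diagnostic logging mentioned above can be attached to any writable stream; a minimal example (the choice of stream is yours):

jc.setLogStream(process.stdout);   // write the job-collection diagnostic log to stdout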

serkandurusoy commented 7 years ago

We have tens of jobs, each with 5-second poll intervals, working on multiple app instances, where job steps often involve external API calls followed by MongoDB writes.

I did try enabling logging but it simply flooded the output.

But in any case, does this kind of volume sound like a stretch or do you think it should be well within the range of what we can expect to be handled?

What still puzzles me is the fact that, there's no actual failure. It is just that there's job completion, but the completion is not propagated to the dependent job. If it had been an error, would it not surface in the form of a failure? When I examine the log entries in the job collection, I can see that the "Dependency resolved" entries are missing for those cases.

vsivsi commented 7 years ago

Here's a suggestion, write a server-side event handler for jobDone operations.

You can see an example near the bottom of this section: https://github.com/vsivsi/meteor-job-collection#jcevents---server-only

In the event handler, you can write a query to find any just-completed job that still has dependent jobs waiting on it, using code like:

jc.events.on('jobDone', function (msg) {
  if (!msg.error) {
    var depends = jc.find({ depends: { $all: [ msg.params[0] ] } }).fetch()
    if (depends.length > 0) {
      // This job didn't properly resolve its dependencies!
    }
  } else {
    // Log the error information
  }
});

This should help you zero in on precisely when this is happening and give you a "hook" to produce better diagnostics at the precise moment of failure.

vsivsi commented 7 years ago

And just to note, the code above will only work if the dependent job actually exists when the antecedent finishes.

To detect the opposite case, you could write a similar event handler for jobSave to check that any dependent jobs actually exist and are in one of the states in jc.jobStatusCancellable (that is, capable of eventually calling job.done() to resolve the dependency).
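
A hedged sketch of that opposite-direction check (not code from the package; it assumes msg.params[0] is the job document passed to the jobSave method):

jc.events.on('jobSave', function (msg) {
  if (!msg.error) {
    var doc = msg.params[0];
    (doc.depends || []).forEach(function (depId) {
      var antecedent = jc.findOne({ _id: depId }, { fields: { status: 1 } });
      if (!antecedent || jc.jobStatusCancellable.indexOf(antecedent.status) === -1) {
        // This dependency is missing or can never resolve -- log it for diagnosis
      }
    });
  }
});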

serkandurusoy commented 7 years ago

Hey Vaughn, thank you for the pointer. In our case, dependent jobs already exist so I'll try the jobDone event.

In the meantime, I've spotted https://github.com/vsivsi/meteor-job-collection/issues/211 which might also be helpful in our case, do you plan on releasing that?

And as for the enhancements we've discussed above, do you still think you'd be able to release them soon?

Thank you very much.

EDIT: Do you think https://github.com/vsivsi/meteor-job-collection/issues/188 would be relevant to our case? Because all our apps are connecting to a replicaset.

vsivsi commented 7 years ago

Hi, I'll try to make progress on all of this over the weekend. I won't have released anything by Monday, but I will try hard to have a branch for you to test off of. I will merge the branch from #211 into that.

As for the question in #188, uh yes. If you are running MongoDB with replica sets, you need to think deeply about how that is all configured, and the advice I give in the answers there remains the same today. Achieving consistency with MongoDB in the face of hardware/network failures is non-trivial, and as it is a general issue not specific to job-collection, it's really out of scope to even try to help with that.

Packages like job-collection are written assuming you know what you are doing on the MongoDB deployment end of things, and if you don't, there is nothing they can do to save you...

vsivsi commented 7 years ago

Okay, I have some changes on the dependency-checks branch. I've added functionality to job.save() to check for completed, failed, cancelled or missing antecedent jobs. Completed antecedents are moved from depends to resolved; the other three cases lead to the saved job being immediately cancelled. That is consistent with what happens when an antecedent job fails fatally: all dependent jobs (the whole tree) are cancelled. When this happens, job.save() returns null.

I've exposed the single job dependency checker as an internal method on the JobCollection object called jc._checkDeps(jobDoc, [dryRun]).

If run with dryRun=false then it will check the current dependencies and resolve/cancel exactly as described above for a freshly saved job. It returns false if the job was cancelled, true otherwise. If run with dryRun=true (the default), then it makes no changes to the jobCollection, and instead returns an object like:

{
   jobId: '8jsHbvi5TNokf6KrJ',
   resolved: [ '6ym3c7uB9sQGBReHF' ],
   failed: [ 'xqrP5BDg2y8Q7XEfP' ],
   cancelled: [ 'GodEuWahbqGJM8Boe' ],
   removed: [ 'L6mgpwhrNsY8Eqqbu' ]
}

If the job has no dependencies, or if they are all still awaiting resolution by valid antecedent jobs, then the dry run will return false.

So with this, you can write a global dependency checker/fixer like:

jc.find({ status: 'waiting' }, { fields: { _id: 1, depends: 1 } })
  .forEach((doc) => jc._checkDeps(doc, false))

As we discussed, this should only be done on a shutdown JobCollection.

If you want to check a collection for problems, an easy way would be:

// Problems detected if true
probs = jc.find({ status: 'waiting' }, { fields: { _id: 1, depends: 1 } })
  .map(jc._checkDeps)
  .some((val) => val)

If you actually want information about all of the jobs with problem dependencies:

jc.find({ status: 'waiting' }, { fields: { _id: 1, depends: 1 } })
  .map(jc._checkDeps)
  .filter((doc) => doc)

Etcetera.

Please try out this branch and let me know how it goes.

serkandurusoy commented 7 years ago

Progress update: it seems I may need to change a few things about how the jobs were originally structured, so I'm doing that in preparation before implementing this. I will comment once I get that done and this one implemented for my problem queue. Thank you.

serkandurusoy commented 7 years ago

I see that you reset "recoverable" jobs to status waiting here

Do you think this is a safe assumption? I guess it is, but just wanted to double check.

vsivsi commented 7 years ago

That code doesn't reset them to waiting, it requires that they are still waiting at the time of the update. If the job is paused or cancelled in the meantime, then we don't want to touch it. And it shouldn't be running, completed, failed etc. if it still has unresolved dependencies.

serkandurusoy commented 7 years ago

Hey @vsivsi I've implemented the branch and it is now being tested.

I've not yet implemented the manual check, though, due to:

As we discussed, this should only be done on a shutdown JobCollection.

So how do you suggest I should implement the whole shutdown => do maintenance => startup workflow?

Do you think I can use a dedicated cron-style job to carry out the maintenance or do you think this would be a problem since the job itself will have to shut down the job server?

If that's a problem, would you recommend something like synced-cron?

Even if that's not a problem, would you still recommend using some other job manager for jc maintenance just because it would be a better/stricter separation?

vsivsi commented 7 years ago

I haven't tried it, but you should be able to shutdown and restart the job collection using a job.

The only wrinkle is that you need to execute job.done() before you run jc.shutdownJobServer() within the worker function (otherwise that job will be marked as failed by the server when it shuts down). But nothing stops you from doing that, and then not executing the callback until after you have shutdown, cleaned, and restarted the server. It just means that you'll need to devise a separate way of logging any status/errors once the jc server is down. You can of course queue up anything you want to log in the worker context and log it to the completed job once the server comes back up.
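
For what it's worth, a maintenance job along those lines might look roughly like this (a sketch only: the 'depMaintenance' job type is hypothetical, and it assumes server-side Fibers, the default 60-second shutdown timeout, and the jc._checkDeps() method from the dependency-checks branch described above):

var maintQueue = jc.processJobs('depMaintenance', { pollInterval: 60 * 60 * 1000 }, function (job, cb) {
  job.done();                       // Mark done first, so the shutdown doesn't fail this job
  jc.shutdownJobServer();           // Stop handing out new work
  Meteor.setTimeout(function () {   // Wait out the shutdown timeout before touching anything
    jc.find({ status: 'waiting' }, { fields: { _id: 1, depends: 1 } })
      .forEach(function (doc) { jc._checkDeps(doc, false); });
    jc.startupJobServer();
    cb();                           // Only now release the worker for the next job
  }, 61 * 1000);
});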

serkandurusoy commented 7 years ago

Hm, yes, this sounds a little tricky in the sense that it sounds more susceptible to failure, but thanks for the walkthrough. It would have taken me time to figure out the correct order of operations myself!

I'll now close this because I think this issue has served its purpose. Thank you.

serkandurusoy commented 7 years ago

Ah sorry, closed by mistake. But I did implement the unchained version and I do confirm that this works as expected.

If you don't mind I'd like to keep this issue open for a couple of days while I test this code in production and confirm that I no longer have stale jobs with dependency issues.

vsivsi commented 7 years ago

One more thing. In any case, you will need to wait until the timeout value in the shutdown has passed (default 1 minute). The shutdown call will return nearly immediately, but jobs will still be in motion until after that timeout expires, so don't do any cleaning until then.

Fine with keeping it open...

serkandurusoy commented 7 years ago

Thank you for that pointer.

I guess when I'm done with this, I owe you a paragraph summarizing this so that you can add it to the readme :)

serkandurusoy commented 7 years ago

@vsivsi I can confirm that this is working fine in production. Do you plan on releasing this on atmosphere soon?

vsivsi commented 7 years ago

Busy week, but I should be able to do it sometime in the next few days.

serkandurusoy commented 7 years ago

That would be awesome, thank you!

vsivsi commented 7 years ago

All of this was just released in version 1.5.0