vsivsi / meteor-job-collection

A persistent and reactive job queue for Meteor, supporting distributed workers that can run anywhere.
https://atmospherejs.com/vsivsi/job-collection

Error caught, fail called, but failure reported as worker timeout #225

Closed serkandurusoy closed 7 years ago

serkandurusoy commented 7 years ago

I have an inherited codebase where one piece of code is structured like so:

MyQueue.processJobs(MY_JOB, {
  pollInterval: 5000,
  workTimeout: 15000,
}, async (job, callback) => {
  try {
    const { foo, bar } = job.data;
    const sendPromise = new Promise((resolve, reject) => {
      thirdPartyAsyncLibrary.doSomething({
        foo,
        bar
      }, (res) => {
        resolve(res[0]);
      }, (err) => {
        reject(err);
      });
    });
    const res = await sendPromise;
    job.done(res);
  } catch (err) {
    console.error('MY_JOB error', err);
    job.fail(err.toString());
  } finally {
    callback();
  }
});

Now, here's the interesting part,

On my server logs, I see:

MY_JOB error [Error: Meteor code must always run within a Fiber. Try wrapping callbacks that you pass to non-Meteor libraries with Meteor.bindEnvironment.]

which I can understand and will fix.

But, what puzzles me is, when I look at this job on my database, I see the following:

{ 
    "_id" : "iY4d59NXHdRwZrnXp", 
    "runId" : null, 
    "type" : "MY_JOB", 
    "data" : {
        "foo" : "foo", 
        "bar" : "bar", 
    }, 
    "status" : "failed", 
    "updated" : ISODate("2017-03-27T09:03:36.823+0000"), 
    "created" : ISODate("2017-03-27T08:55:06.634+0000"), 
    "priority" : NumberInt(0), 
    "retries" : NumberInt(0), 
    "repeatRetries" : NumberInt(6), 
    "retryWait" : NumberInt(10000), 
    "retried" : NumberInt(6), 
    "retryBackoff" : "exponential", 
    "retryUntil" : ISODate("275760-09-13T00:00:00.000+0000"), 
    "repeats" : NumberInt(0), 
    "repeatWait" : NumberInt(300000), 
    "repeated" : NumberInt(0), 
    "repeatUntil" : ISODate("275760-09-13T00:00:00.000+0000"), 
    "after" : ISODate("2017-03-27T09:08:56.823+0000"), 
    "progress" : {
        "completed" : NumberInt(0), 
        "total" : NumberInt(1), 
        "percent" : NumberInt(0)
    }, 
    "depends" : [],
    "resolved" : [],
    "log" : [
        {
            "time" : ISODate("2017-03-27T08:55:06.634+0000"), 
            "runId" : null, 
            "level" : "info", 
            "message" : "Constructed"
        }, 
        {
            "time" : ISODate("2017-03-27T08:55:06.634+0000"), 
            "runId" : null, 
            "message" : "Job Submitted", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T08:55:06.639+0000"), 
            "runId" : null, 
            "message" : "Promoted to ready", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T08:55:06.717+0000"), 
            "runId" : "rHEF6KsxJf43TiPyf", 
            "message" : "Job Running", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T08:55:21.747+0000"), 
            "runId" : "rHEF6KsxJf43TiPyf", 
            "message" : "Job Failed with Error: Failed for exceeding worker set workTimeout.", 
            "level" : "warning"
        }, 
        {
            "time" : ISODate("2017-03-27T08:55:36.738+0000"), 
            "runId" : null, 
            "message" : "Promoted to ready", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T08:55:36.815+0000"), 
            "runId" : "5vkdRajSCauNCNtbE", 
            "message" : "Job Running", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T08:56:06.748+0000"), 
            "runId" : "5vkdRajSCauNCNtbE", 
            "message" : "Job Failed with Error: Failed for exceeding worker set workTimeout.", 
            "level" : "warning"
        }, 
        {
            "time" : ISODate("2017-03-27T08:56:36.742+0000"), 
            "runId" : null, 
            "message" : "Promoted to ready", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T08:56:36.813+0000"), 
            "runId" : "DsLhnxg7yE4fjPTgQ", 
            "message" : "Job Running", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T08:57:06.747+0000"), 
            "runId" : "DsLhnxg7yE4fjPTgQ", 
            "message" : "Job Failed with Error: Failed for exceeding worker set workTimeout.", 
            "level" : "warning"
        }, 
        {
            "time" : ISODate("2017-03-27T08:57:51.750+0000"), 
            "runId" : null, 
            "message" : "Promoted to ready", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T08:57:51.768+0000"), 
            "runId" : "voHZ7RzstWZaCuoHc", 
            "message" : "Job Running", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T08:58:21.817+0000"), 
            "runId" : "voHZ7RzstWZaCuoHc", 
            "message" : "Job Failed with Error: Failed for exceeding worker set workTimeout.", 
            "level" : "warning"
        }, 
        {
            "time" : ISODate("2017-03-27T08:59:51.760+0000"), 
            "runId" : null, 
            "message" : "Promoted to ready", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T08:59:51.827+0000"), 
            "runId" : "svcCKF6p2FgN4JWvp", 
            "message" : "Job Running", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T09:00:21.769+0000"), 
            "runId" : "svcCKF6p2FgN4JWvp", 
            "message" : "Job Failed with Error: Failed for exceeding worker set workTimeout.", 
            "level" : "warning"
        }, 
        {
            "time" : ISODate("2017-03-27T09:03:06.817+0000"), 
            "runId" : null, 
            "message" : "Promoted to ready", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T09:03:06.842+0000"), 
            "runId" : "7dJqpiRHYjrD2skpj", 
            "message" : "Job Running", 
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-27T09:03:36.827+0000"), 
            "runId" : "7dJqpiRHYjrD2skpj", 
            "message" : "Job Failed with Fatal Error: Failed for exceeding worker set workTimeout.", 
            "level" : "danger"
        }
    ], 
    "workTimeout" : NumberInt(15000), 
    "expiresAfter" : ISODate("2017-03-27T09:03:21.839+0000"), 
    "failures" : [
        {
            "value" : "Failed for exceeding worker set workTimeout", 
            "runId" : "rHEF6KsxJf43TiPyf"
        }, 
        {
            "value" : "Failed for exceeding worker set workTimeout", 
            "runId" : "5vkdRajSCauNCNtbE"
        }, 
        {
            "value" : "Failed for exceeding worker set workTimeout", 
            "runId" : "DsLhnxg7yE4fjPTgQ"
        }, 
        {
            "value" : "Failed for exceeding worker set workTimeout", 
            "runId" : "voHZ7RzstWZaCuoHc"
        }, 
        {
            "value" : "Failed for exceeding worker set workTimeout", 
            "runId" : "svcCKF6p2FgN4JWvp"
        }, 
        {
            "value" : "Failed for exceeding worker set workTimeout", 
            "runId" : "7dJqpiRHYjrD2skpj"
        }
    ]
}

Here it says that the failure is due to a timeout. But the code looks like it does a proper job of catching the error, reporting it as a failure, and using the finally block to ensure the callback is called.

What am I missing here? (Obviously I can increase the timeout, but that feels like a guessing game.) Or is there a best practice you can suggest for situations like these?

For success cases, I can try to make sure the worker functions are idempotent, but for failures, I don't know how to override or extend the timeout information with the actual error information.

For better context, this specific job is one where we use Mandrill to send an email to a single recipient.

vsivsi commented 7 years ago

Hi, so it looks like your worker is taking longer than 15 seconds to complete. This is not an error that the worker sees or catches; it is a server-side "kill" of the job for taking too long. When you use the option workTimeout: 15000, you are telling the server: "hey, if you don't hear from this job at least every 15 seconds (job.log, job.progress, or job.done), then assume it is dead and fail it so that it can be retried on a new worker." Without this, many users have found that crashed/disconnected workers could lead to "stuck" jobs that appeared to be running, but with a long-dead worker. This way the server can "reap" those dead jobs and reassign them to living/restarted workers.

So there are two ways around this: either increase the timeout to be much longer than any actual job should ever take, such that you can be certain the worker must be dead; or call job.progress or job.log every so often from a delayed worker to let the server know that the job is still alive. Each time you do that, the timeout countdown is reset.
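For illustration, a minimal sketch of the second approach for a job that runs in stages (doFirstStep and doSecondStep are hypothetical placeholders, not part of your code):

MyQueue.processJobs(MY_JOB, {
  pollInterval: 5000,
  workTimeout: 15000,
}, (job, callback) => {
  job.progress(0, 3);      // each report resets the 15-second countdown
  doFirstStep(job.data);   // hypothetical synchronous step
  job.progress(1, 3);
  doSecondStep(job.data);  // hypothetical synchronous step
  job.progress(2, 3);
  job.done();
  callback();
});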

Hope that helps.

serkandurusoy commented 7 years ago

Hm, but there are a few problems here:

1) The original error (Meteor code must always run within a Fiber. Try wrapping callbacks that you pass to non-Meteor libraries with Meteor.bindEnvironment.) is unlikely to cause or be caused by a timeout. It is like any other error.

2) The error is caught but then simply ignored; there's no way I can attach this error back to the job.

3) My job.fail and callback are getting ignored. I would have expected them to be accepted, even after a timeout, in a way that at least helps me debug the problem, perhaps in the form of additional log entries in the job collection.

The current implementation hides the fact that the timeout does not actually kill anything; it simply starts ignoring the worker, right?

It is okay if you feel that this is a good compromise, but I would nonetheless like to hear your opinion on my options for increasing the visibility of these situations.

Thank you.

vsivsi commented 7 years ago

Okay, so two things:

1) Do you know about Meteor.bindEnvironment()? If you wrap your async callback functions with it, they will play nice with fibers. Not doing so is the root cause of your exception and has nothing to do with JobCollection. Google should lead you to some good explainers. TL;DR for the rest: this is not a bug in JobCollection. You should read up on Fibers, their use by the Meteor server side, how they interact with non-Meteor async functions, and how to mitigate that with bindEnvironment and other tools.

2) This error is special because Fibers are special. The entire server side of Meteor depends on Fibers. When you make an async call and the callback is invoked outside of the calling fiber, all hell breaks loose. Catching the exception doesn't fix this situation. Fibers exist outside of the JS engine/node.js. Their use in the Meteor server implementation was/is controversial, but that ship has sailed. So the failure of JobCollection to properly "fail" your job is because JobCollection literally can't operate properly on the server side when not running in a fiber. The reason is that Meteor system API calls themselves won't work, and JobCollection is built on top of Meteor.

None of this is specific to JobCollection. Try making your async call outside of a JobCollection worker on the server, and then try to make a Meteor system call that does an async operation (one with an optional callback) from inside the async callback. It won't work. Wrapping the failing Meteor call in try/catch logic will catch the error, but it won't restore the fiber, and so any other async Meteor calls will still fail inside the catch. There is no way for the catch to recover, because the fiber context for the async caller has been lost.

One of the job-collection sample apps uses an external async library properly within a worker on the Meteor server. The code is CoffeeScript, but you can find the relevant parts here: https://github.com/vsivsi/meteor-file-job-sample-app/blob/master/sample.coffee#L538 https://github.com/vsivsi/meteor-file-job-sample-app/blob/master/sample.coffee#L560 Etc.

Note that every single async callback function provided to a non-Meteor library needs to be wrapped by bindEnvironment!
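For illustration, here is a minimal sketch of the worker from the top of this issue with both callbacks wrapped; this is just one possible way to structure it, reusing your thirdPartyAsyncLibrary call as-is:

MyQueue.processJobs(MY_JOB, {
  pollInterval: 5000,
  workTimeout: 15000,
}, (job, callback) => {
  const { foo, bar } = job.data;
  thirdPartyAsyncLibrary.doSomething({ foo, bar },
    Meteor.bindEnvironment((res) => {   // success callback re-enters a Fiber
      job.done(res[0]);
      callback();
    }),
    Meteor.bindEnvironment((err) => {   // error callback re-enters a Fiber
      job.fail(err.toString());
      callback();
    })
  );
});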

serkandurusoy commented 7 years ago

Thank you, Vaughn, for the explanation, but I must insist that you're perhaps looking at this the wrong way.

You don't need to be defensive here, I'm not raising a bug report on jobcollection.

This is merely a suggestion for a collaborative effort to find a "best practice" for a condition that may be affecting a lot of people without them realizing it, because of how timeouts are handled.

I have no problem with fibers (or any Meteor tooling that needs to play nice with them), and in fact this was a very easy problem to fix: a recently updated npm dependency's peer dependency was overriding Meteor's Promise global (which await relies on), and I solved it by explicitly relying on Meteor's Promise.await, which handles fibers transparently.
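For reference, a minimal sketch of that fix inside the worker, assuming the global Promise is Meteor's fiber-aware one (the server-side promise package):

// Instead of `const res = await sendPromise;`:
const res = Promise.await(sendPromise); // blocks the current fiber while preserving the Meteor environment
job.done(res);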

As for the catch not handling the errors, that's where I think you are misunderstanding me.

The catch successfully handles the error, as it would have caught any other error. (Fibers are simply irrelevant here; it is just a coincidence that the error I reported happened to be this one.)

My problem is that I have no means to tell jobcollection, after a timeout, that an error (or even a success, for that matter) has occurred.

I am not saying that jobcollection has a bug.

I am saying that it would be nice if jobcollection exposed some method (apart from a manual edit of the document itself) that allowed me to update the job document's log/failures array with the new information that happens to arrive after the timeout, at which point jobcollection ignores it.

So let me summarize the problem:

I hope this makes it clearer.

Thank you for your time.

vsivsi commented 7 years ago

I see. There is no restriction on calling job.log() (even after a job has failed for any reason).

You can log to jobs before they are saved, while they are waiting, when they are running, and after they are completed, cancelled, or failed. Anytime, really.

So if all you want to do is log the fact that the operation completed after the timeout, you can log that using job.log(). What you can't do is retroactively 'complete' the job simply by calling job.done() after it has already failed.

Well, you can on the server by updating the job document directly. And I suppose you could write an event handler for jobLog calls to automatically do so when the log message meets some criteria, and then have that trigger an update to revert from 'failed' to 'completed', but it gets complicated if you are using auto-retry and the job is already waiting (or even running!) to retry the work.

But really this approach would all be a bit of a hack. The best way to forestall all of this is simply for the worker to either report progress (even if it hasn't changed, "still 50% done!") or to log the delay (e.g. "async API call taking abnormally long to complete"); and be sure to do one of those things often enough to keep the server-side timeout from killing the job.

It's no big deal to write a worker watchdog timer that checks in with the server occasionally while waiting for a long running async process to complete, and if I finally understand your issue, that seems like the most straightforward answer: don't let the server kill a delayed worker in the first place:

// In your worker function
function watchdog () {
   job.log("Still waiting for process to finish!")
}
var wd = Meteor.setInterval(watchdog, 7500)

// ... In the callback of the async process ...
Meteor.clearInterval(wd)
job.done()

serkandurusoy commented 7 years ago

Hey Vaughn, this is the aha! reply I was waiting for. Awesome discussion, thank you!

First, I agree that a timed-out job should definitely fail and not be hacked back to life.

But for sake of discussion,

The best way to forestall all of this is simply for the worker to either report progress (even if it hasn't changed, "still 50% done!") or to log the delay (e.g. "async API call taking abnormally long to complete"); and be sure to do one of those things often enough to keep the server-side timeout from killing the job.

Would you rather:

  • Report progress
  • Log something (delay, whatever)

But I guess this method (with the snippet you've shared) makes timeouts moot, and now I wonder: does it not have the same effect of simply setting workTimeout: undefined?

Also, in my original example, to be more precise, my problem was not that I wanted to override the timeout, but that I simply wanted to create an additional after-timeout failure log for traceability. So for that:

So if all you want to do is log the fact that the operation completed after the timeout, you can log that using job.log().

This does not override the timeout and simply appends a log entry to the failed job, right?

EDIT: It might be a good idea to edit the README's relevant section from

workTimeout -- When requesting work, tells the server to automatically fail the requested job(s) if more than workTimeout milliseconds elapses between updates (job.progress(), job.log()) from the worker, before processing on the job is completed. This is optional, and allows the server to automatically fail running jobs that may never finish because a worker went down or lost connectivity. Default: undefined

perhaps to

workTimeout -- When requesting work, tells the server to automatically fail the requested job(s) if more than workTimeout milliseconds elapses between updates (job.progress(), job.log()) from the worker, before processing on the job is completed. This is optional, and allows the server to automatically fail running jobs that may never finish because a worker went down or lost connectivity. If job.progress() or job.log() are called before the timeout, they reset the timeout; furthermore, job.log() can be called even after the timeout to create an additional log entry, but it won't recover the job from a timeout-based failure. Default: undefined

vsivsi commented 7 years ago

Would you rather:

  • Report progress
  • Log something (delay, whatever)

Personally, I would only report progress when there is actual progress; doing otherwise is essentially invisible, with no trace left in the job document. Logging a delay leaves a record of the delay, which seems more useful. However, job.progress() does have one useful attribute: it will fail if the job has been cancelled or failed on the server. This is a signal for the worker to stop working on the job. So, depending on the application, it may be best to do both.
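For illustration, a minimal sketch of using the progress call itself as that signal, assuming the usual (error, result) callback signature (stopWork is a hypothetical cleanup helper):

// Inside a long-running worker, at a natural checkpoint:
job.progress(50, 100, (err, ok) => {
  if (err || !ok) {
    // The server has already cancelled or failed this job; stop working on it
    stopWork();
  }
});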

does it not have the same effect of simply setting workTimeout: undefined?

Remember, the purpose of workTimeout is to allow the server to recover a job from a dead/disconnected worker. If your workers are running remotely and some of them lose network connectivity, or power, etc., you want the workTimeout to kill any jobs they were running so that other, still-healthy workers can retry those "running" jobs that would otherwise never complete. A dead worker can't "check in" with the server. That's the difference.

This does not override the timeout and simply appends a log entry to the failed job, right?

If the job has already failed on the server, calling job.log() will simply append a log entry (with the failed runId). The worker will discover that the job has failed when it tries to call job.done(). This assumes you haven't been using job.progress() in the meantime.
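For illustration, a minimal sketch of what the late async callback might do, assuming the usual (error, result) callback signatures for job.log() and job.done():

// In the async callback that finally fires after the server-side timeout:
job.log('Async call completed after workTimeout; result: ' + JSON.stringify(res));
job.done(res, (err, ok) => {
  if (err || !ok) {
    // The job had already failed on the server (e.g. for exceeding workTimeout);
    // the log entry above is the only trace of the late result.
  }
  callback();
});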

Thanks for the suggested text, I'll add that in the next update.

serkandurusoy commented 7 years ago

Thank you, I've implemented job.log(error).