dcollinsf5 closed this issue 9 years ago
Just a note for anyone else following in my footsteps later.
Because error packets can be emitted many times but a Promise can only be rejected once, the fusion of the two is not a good match. If you are going to use promises in your worker task, you'll need something like this as a final catch() on your promise chain to make sure that the Gearman server is notified that there was an unrecoverable error in your code.
```javascript
.catch(function (err) {
    log(name, task, "ERROR: " + err.stack);
    // .error() merely emits an error-type packet to the server
    task.error(err);
    // .end() actually tells the server that the task is no longer running on the worker
    task.end(err);
});
```
If you just throw from your promise-returning task function, ghost tasks will pile up on the Gearman server for every job whose promise rejected. IMO, when you use Abraxas with a promise-returning function, Abraxas should call task.end for you in the rejection handler: a rejected promise can never reject again or resolve to a different value, so a rejected promise returned from a task is a de facto task.end().
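A generic version of that pattern can be sketched as a wrapper around the task function. The `makeSafeTask` helper and the mock task object below are illustrative, not part of Abraxas; only the `task.error()`/`task.end()` calls mirror the API discussed in this thread:

```javascript
// Sketch: wrap a promise-returning task function so that a rejection
// always notifies the server. `makeSafeTask` and the mock task are
// illustrative; only task.error()/task.end() mirror the Abraxas API.
function makeSafeTask(taskFn) {
    return function (task) {
        return Promise.resolve()
            .then(function () { return taskFn(task); })
            .catch(function (err) {
                task.error(err); // emit the error packet
                task.end();      // tell the server the task is done
            });
    };
}

// Minimal mock task to exercise the wrapper:
var calls = [];
var mockTask = {
    error: function (err) { calls.push("error:" + err.message); },
    end:   function ()    { calls.push("end"); }
};

var safe = makeSafeTask(function () {
    return Promise.reject(new Error("boom"));
});

safe(mockTask).then(function () {
    console.log(calls.join(","));  // error:boom,end
});
```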
oh hi, paying attention here again =D
Ok, so there are three error-ish type packets:

- WORK_WARNING – non-fatal; can be issued multiple times
- WORK_EXCEPTION – fatal; comes with a payload as the message
- WORK_FAIL – fatal; does NOT come with a payload as the message

A client may emit any number of warning events, but it should only reject on WORK_EXCEPTION or WORK_FAIL, and only ONE of those per job is actually valid. So there shouldn't be any doubling up; if there is, there's a bug elsewhere.
I agree with your analysis: if your worker is returning a promise (or throwing in its handler), this should be handled cleanly, basically by adding the catch that you have documented there. task.error (which sends WORK_FAIL or WORK_EXCEPTION) is what actually terminates the task on the gearman server, so task.end should be unnecessary.
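One way that library-side fix could look is sketched below; the `runHandler` name and the mock task are illustrative, not the actual Abraxas internals. When the registered handler returns a thenable, the worker attaches a rejection handler that routes the error through `task.error()`, so user code never needs its own catch:

```javascript
// Sketch of the library-side fix: if a registered handler returns a
// thenable, the worker attaches a rejection handler that routes the
// error through task.error(). Names are illustrative, not Abraxas code.
function runHandler(handler, task) {
    var result;
    try {
        result = handler(task);
    } catch (err) {
        task.error(err); // synchronous throw: fail the job immediately
        return;
    }
    if (result && typeof result.then === "function") {
        result.then(
            function (value) { task.end(value); }, // resolution completes the job
            function (err)   { task.error(err); }  // rejection fails it, exactly once
        );
    }
}

// Exercise with a mock task:
var events = [];
var task = {
    end:   function (v) { events.push("end:" + v); },
    error: function (e) { events.push("error:" + e.message); }
};

runHandler(function () { return Promise.reject(new Error("nope")); }, task);
```

Because a promise settles at most once, the rejection branch can fire at most once per job, which matches the "only ONE fatal packet per job" rule above.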
Thanks for working this– the fix is in abraxas@2.1.1 on the registry now.
Hello! :) I think that this didn't resolve the issue; I just tested 2.1.1 and got the same results when my task throws an error. https://github.com/iarna/abraxas/commit/c9e8cdacfcb14feb95096b34aab9c0e74fe4df43
I'm moving the comment on c9e8cda over here, so that it's easier to discuss:
Hello, while your code is now calling task.error on lines 136-138, this doesn't resolve the original problem: your code was actually already calling task.error in the case I described in #15, as a result of the logic on lines 242-257. The problem is that the task.error call on line 252 (and now, the task.error call on line 137) does not result in the server thinking the job is finished; the job remains listed as queued & running indefinitely.
Per this post #15 (comment), the only way I can get both (a) a JobException error on the server AND (b) have the server think the task is no longer running is to call both task.error() (so the JobException is sent) and then call task.end() (so the server no longer thinks the task is running).
Ah yes, you're right, I missed https://github.com/iarna/abraxas/blob/master/worker.js#L249 which is actually what allows error to be called.
So let's work through this.
task.error is defined in https://github.com/iarna/abraxas/blob/master/task-worker.js#L55-L59. It calls work.socket.workException, which sends either WORK_WARNING & WORK_FAIL packets OR a WORK_EXCEPTION packet to the server. It then calls this.client.endWork(this.jobid).
By contrast, end queues up the chunk it was given and then calls WritableStream.end, which just closes the task stream. THAT in turn is going to trigger https://github.com/iarna/abraxas/blob/master/worker.js#L216-L221:
```javascript
if (socket.connected) {
    socket.workComplete(jobid, task.lastChunk);
}
self.endWork(jobid);
```
So they both call endWork, which is just bookkeeping and requesting more jobs from the job queue:
```javascript
Worker.endWork = function (jobid) {
    delete this._activeJobs[jobid];
    --this._activeJobsCount;
    this.askForWork();
};
```
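Put together, the two termination paths can be sketched like this. The names follow the Abraxas source quoted above, but the wiring is a simplified mock, not the real worker:

```javascript
// Sketch of the two termination paths described above. Both converge
// on endWork(); only the success path sends WORK_COMPLETE.
var packets = [];
var socket = {
    connected: true,
    workComplete:  function (jobid, data) { packets.push("WORK_COMPLETE"); },
    workException: function (jobid, err)  { packets.push("WORK_EXCEPTION"); }
};

var worker = {
    _activeJobs: { "H:1": true, "H:2": true },
    _activeJobsCount: 2,
    askForWork: function () {},
    endWork: function (jobid) {          // shared bookkeeping
        delete this._activeJobs[jobid];
        --this._activeJobsCount;
        this.askForWork();
    },
    // task.end() path: stream close -> workComplete + endWork
    finish: function (jobid, lastChunk) {
        if (socket.connected) socket.workComplete(jobid, lastChunk);
        this.endWork(jobid);
    },
    // task.error() path: workException + endWork, no WORK_COMPLETE
    fail: function (jobid, err) {
        socket.workException(jobid, err);
        this.endWork(jobid);
    }
};

worker.finish("H:1", "result");
worker.fail("H:2", new Error("boom"));
console.log(packets.join(","), worker._activeJobsCount);  // WORK_COMPLETE,WORK_EXCEPTION 0
```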
So the only actual difference is workComplete, which sends the WORK_COMPLETE packet. This is how WORK_COMPLETE, WORK_FAIL and WORK_EXCEPTION are defined according to the spec (http://gearman.org/protocol/):
```
WORK_COMPLETE

    This is to notify the server (and any listening clients) that
    the job completed successfully.

    Arguments:
    - NULL byte terminated job handle.
    - Opaque data that is returned to the client as a response.

WORK_FAIL

    This is to notify the server (and any listening clients) that
    the job failed.

    Arguments:
    - Job handle.

WORK_EXCEPTION

    This is to notify the server (and any listening clients) that
    the job failed with the given exception.

    Arguments:
    - NULL byte terminated job handle.
```
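For concreteness, the wire framing those definitions sit on top of can be sketched as below. The numeric type codes (13 for WORK_COMPLETE, 14 for WORK_FAIL, 25 for WORK_EXCEPTION) are my reading of the protocol document, so verify them against your server before relying on this:

```javascript
// Sketch: encode a Gearman response packet per the framing in the
// protocol document: 4-byte magic "\0RES", 4-byte big-endian type,
// 4-byte big-endian payload length, then NULL-separated arguments.
// Type codes below are from my reading of the spec; double-check them.
var TYPES = { WORK_COMPLETE: 13, WORK_FAIL: 14, WORK_EXCEPTION: 25 };

function encodePacket(typeName, args) {
    var payload = Buffer.from(args.join("\0"), "utf8");
    var header = Buffer.alloc(12);
    header.write("\0RES", 0, 4, "latin1");
    header.writeUInt32BE(TYPES[typeName], 4);
    header.writeUInt32BE(payload.length, 8);
    return Buffer.concat([header, payload]);
}

// WORK_FAIL carries only the job handle; WORK_COMPLETE adds the result.
var fail = encodePacket("WORK_FAIL", ["H:host:1"]);
var done = encodePacket("WORK_COMPLETE", ["H:host:1", "result"]);

console.log(fail.length, done.length);  // 20 27
```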
So I think the current behavior is correct insofar as the spec is concerned, and this sounds like a bug in the gearmand server that you're using. I'll check back here with notes from the source of a few different implementations in a bit.
In the meantime: Which gearmand server are you using?
Ok, so checking other implementations, first the original Perl implementation…
The exception code is here:
https://metacpan.org/source/DORMANDO/Gearman-Server-1.11/lib/Gearman/Server/Client.pm#L284
Versus the fail code here:
https://metacpan.org/source/DORMANDO/Gearman-Server-1.11/lib/Gearman/Server/Client.pm#L266
So it seems that it DOES NOT treat exceptions as fatal and DOES still require the WORK_FAIL packet, which suggests the protocol spec is just wrong.
So I double checked with the C++ version (1.1.12) and good thing I did. It seems to follow the protocol document; this isn't surprising, as they're the ones who published it. You can see below that WORK_COMPLETE and WORK_EXCEPTION both clean up in the same ways.
```c
case GEARMAN_COMMAND_WORK_COMPLETE:
  {
    gearman_server_job_st *server_job= gearman_server_job_get(Server,
        (char *)(packet->arg[0]), (size_t)strlen(packet->arg[0]),
        server_con);
    if (server_job == NULL)
    {
      return _server_error_packet(GEARMAN_DEFAULT_LOG_PARAM, server_con, GEARMAN_JOB_NOT_FOUND, gearman_literal_param("Job given in work result not found"));
    }

    /* Queue the complete packet for all clients. */
    ret= _server_queue_work_data(server_job, packet,
                                 GEARMAN_COMMAND_WORK_COMPLETE);
    if (gearmand_failed(ret))
    {
      return gearmand_gerror("_server_queue_work_data", ret);
    }

    /* Remove from persistent queue if one exists. */
    if (server_job->job_queued)
    {
      ret= gearman_queue_done(Server,
                              server_job->unique,
                              server_job->unique_length,
                              server_job->function->function_name,
                              server_job->function->function_name_size);
      if (gearmand_failed(ret))
      {
        return gearmand_gerror("Remove from persistent queue", ret);
      }
    }

    /* Job is done, remove it. */
    gearman_server_job_free(server_job);
  }
  break;

case GEARMAN_COMMAND_WORK_EXCEPTION:
  {
    gearman_server_job_st *server_job= gearman_server_job_get(Server,
        (char *)(packet->arg[0]), (size_t)strlen(packet->arg[0]),
        server_con);
    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
                       "Exception being sent from: %.*s(%lu)",
                       server_job->function->function_name_size, server_job->function->function_name, server_job->function->function_name_size);
    if (server_job == NULL)
    {
      return _server_error_packet(GEARMAN_DEFAULT_LOG_PARAM, server_con, GEARMAN_JOB_NOT_FOUND,
                                  gearman_literal_param("An exception was received for a job that does not exist"));
    }

    /* Queue the exception packet for all clients. */
    ret= _server_queue_work_data(server_job, packet, GEARMAN_COMMAND_WORK_EXCEPTION);
    if (gearmand_failed(ret))
    {
      return gearmand_gerror("_server_queue_work_data", ret);
    }

    /* Remove from persistent queue if one exists. */
    if (server_job->job_queued)
    {
      ret= gearman_queue_done(Server,
                              server_job->unique,
                              server_job->unique_length,
                              server_job->function->function_name,
                              server_job->function->function_name_size);
      if (gearmand_failed(ret))
      {
        return gearmand_gerror("Remove from persistent queue", ret);
      }
    }

    /* Job is done, remove it. */
    gearman_server_job_free(server_job);
  }
  break;
```
Ok, so for completeness, I checked the java server, and it gets better!!
They implement this like Perl does, without marking the job as failed. They also note:
```java
// Note: The protocol states this packet notifies the server that the specified job
// has failed. However, the C server does not fail the Job. It is effectively a
// WORK_WARNING packet that is only sent to clients that have specified they want
// exceptions forwarded to them. This server will do the same as long as the C
// server does so.
```
Annoyingly the C version is in bzr and I don't feel like setting that up to find out when they finally fixed their code to match their docs. Regardless, being able to work with older implementations is definitely something I want to support.
Wow, that is interesting. FYI, we are running gearmand 1.12 from https://launchpad.net/gearmand/ - we installed it from the Ubuntu package.
Welp, I'm going to assume there's some subtlety to the 1.1.12 source that I missed and that it really doesn't handle them as fails. Either way, I'm gonna make workException send a WORK_FAIL packet to keep everyone happy.
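The compatibility behavior described there could look something like this sketch; the `workException` wiring and socket object are illustrative, not the actual Abraxas internals:

```javascript
// Sketch: to satisfy servers that treat WORK_EXCEPTION as advisory
// (Perl, Java) as well as ones that treat it as fatal (the spec),
// send WORK_EXCEPTION followed by WORK_FAIL. The socket object and
// method names here are illustrative, not Abraxas internals.
function workException(socket, jobid, message) {
    socket.send("WORK_EXCEPTION", [jobid, message]);
    socket.send("WORK_FAIL", [jobid]); // ensures every server marks the job failed
}

// Mock socket to show the resulting packet sequence:
var sent = [];
var socket = { send: function (type, args) { sent.push(type); } };

workException(socket, "H:host:1", "boom");
console.log(sent.join(","));  // WORK_EXCEPTION,WORK_FAIL
```

Servers that already fail the job on WORK_EXCEPTION should tolerate the trailing WORK_FAIL (or at worst log a job-not-found error), while the Perl- and Java-style servers need it to mark the job failed at all.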
Ok, I've pushed 2.1.3 with this fix.
That seems to have done the trick :) Thanks for your time and attention; we like Abraxas :)
Glad to hear it, on both counts! =)
I am trying to understand some unexpected behavior, and am not sure if it is normal behavior that I simply don't understand, or if I am encountering a bug.
Here's the scenario: A Worker begins running a new task; the task is a promise-returning function. The Worker throws an error during processing; it is thrown all the way up and becomes a Rejection. I see it emitted across the link to the Abraxas Client as a JobException type error. I have ensured that the client is receiving the JobException error in the submitJob() callback.
The strange thing is that even after the error has been received by the Client, the Gearman server still lists the task as 'Running' (and 'Queued', strangely?) indefinitely, and no other worker attempts to re-try the task (obviously).
If I actually kill the worker's node process manually from the shell, another worker immediately attempts to retry the task. Although my current code uses a (bluebird) promise-returning function to run the task on the Worker, I have also tried calling task.error() manually in my promise chain's callback, rather than throwing an error in the promise chain, but with the same results. The only thing that properly cleans up the running process is calling task.end() in my worker-side catch(), but this is not the behavior I want, as Gearman then thinks the task finished successfully and no other worker picks it up.
So I guess my questions are:
This seems like such a common use case that I can't help but think I am missing something basic about how to manage tasks.