chrisboulton / php-resque

PHP port of resque (Workers and Queueing)
MIT License

named queue jobs #185

Open ryanhungate opened 10 years ago

ryanhungate commented 10 years ago

So, I'm wondering if it would hurt in any way if, while creating a job, we had the ability to pass in a named monitor instead of a bool true/false, so that it's easier to check statuses by a readable name that makes sense to the application. I do this mainly for checking whether jobs that are 'supposed' to be in the pipe are actually there.

In the Resque_Job class

public static function create($queue, $class, $args = null, $monitor = false)
{
    if($args !== null && !is_array($args)) {
        throw new InvalidArgumentException(
            'Supplied $args must be an array.'
        );
    }

    //// start change: accept a caller-supplied string ID (the "named
    //// monitor") instead of always generating a random one
    if(is_string($monitor)) {
        $id = $monitor;
    } else {
        $id = md5(uniqid('', true));
    }
    //// end change

    Resque::push($queue, array(
        'class' => $class,
        'args'  => array($args),
        'id'    => $id,
    ));

    if($monitor) {
        Resque_Job_Status::create($id);
    }

    return $id;
}
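
A quick usage sketch, assuming the patch above is applied (the queue, class, and readable ID here are made up for illustration):

$id = Resque_Job::create('default', 'My_Job', array('order' => 100), 'store.10.order.100.service');
// $id === 'store.10.order.100.service'; since $monitor is truthy, a
// Resque_Job_Status is also created under that readable ID.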
danhunsaker commented 10 years ago

Certainly seems doable... I'd be a bit concerned about uniqueness, which is hard to guarantee with tons of jobs flying around, but with a light load (or only a few jobs using the option), it should work.

I would like to point out, though, that your application does get the ID back from the enqueue operation, and could store that internally for doing such checks. If you're worried about persisting across requests, toss it in Redis. Say, {application}:jobid:{jobname} for the key, and either the most recent ID for the value, or a list/set of the IDs that job has used within a certain time period, depending on your use case scenario. This approach prevents having two jobs running with the same ID, even by accident.
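
A minimal sketch of that suggestion (the key name and job class are made up):

// Store the returned ID under an application-specific, readable key.
$id = Resque::enqueue('default', 'My_Job', array('foo' => 'bar'), true);
\Resque::redis()->set('myapp:jobid:nightly_report', $id);

// Any later request can look the ID back up and check the job's status.
$status = new Resque_Job_Status(\Resque::redis()->get('myapp:jobid:nightly_report'));
echo $status->get(); // one of the Resque_Job_Status::STATUS_* constants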

ryanhungate commented 10 years ago

Ah, I was actually kind of under the impression that if, say, you had a job named (store.10.order.100.service) and you passed in that same ID, it would basically overwrite that same job in the Redis DB. If that's not the case, then I see your point.


ryanhungate commented 10 years ago

Speaking of persisting… are there any issues that have come up using Redis for the storage in the past that caused pain? I currently use a MySQL storage for my queue that I wrote, and I'm trying to see about lightening up the usage on our core DB if at all possible, but for some reason Redis scares me a little bit. I tend to only use it for things that I can lose, like caching stuff.


danhunsaker commented 10 years ago

The status gets overwritten, but not the job itself. That would require finding the job and removing it, which is nontrivial since the ID is stored inside the JSON-encoded value of the list entry, and by the time you found it, there'd be a good chance that the list index has changed because jobs have been started since you started looking. If you need to have the ability to overwrite jobs, you'd want to set up the Redis key I mentioned above, then either in the job's setUp(), or in a beforePerform hook, check that key's value against the current job's ID, and abort the job unless they match. That's pretty much the only really reliable way to do it.

As to Redis as storage, what exactly are you storing? Redis is great at key-value stuff, and atomic operations, and does periodically persist to disk. If, say, the power goes out, you'd lose any changes made after the last persist operation. So there is a small amount of concern that things might be lost. Ultimately, Redis isn't designed for long-term storage, so anything you can't lose shouldn't be stored there.

That said, MySQL doesn't perform well enough (nor atomically enough) to work well as a queue backend - it can work, sure, but not well. Especially when you start scaling to more than one worker, where situations like multiple workers processing the same job become all too common. This is one of the things Redis was selected for when GitHub built the original Ruby version of Resque - when you pop a job off a Redis list, it's off, and no other worker can get it unless it gets put back on. With MySQL there's no "select and delete in one operation" command, and nothing stopping other processes from checking the DB at the same instant, so that guarantee doesn't exist. If that's not a problem for the jobs you run, that's cool, but you still have the issue that SQL is pretty slow, especially under high load.
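
To illustrate the difference (a sketch, not php-resque internals verbatim):

// Redis: popping the next job is a single atomic command. Once one worker
// receives the entry, no other worker can ever see it.
$payload = \Resque::redis()->lpop('queue:default');

// MySQL: claiming a job takes at least two statements, and without explicit
// locking a second worker can run the same SELECT in between them:
//   SELECT id, payload FROM jobs ORDER BY id LIMIT 1;
//   UPDATE jobs SET locked_by = 'worker-1' WHERE id = 42;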

Ultimately it's a tradeoff. Speed and atomicity, versus long term availability and data relationships. I tend to use both. Redis (via Resque) for the fast stuff, like worker queues; and MySQL for stuff like user login and permissions data, and even worker task logging (though only because my use case requires being able to pull up log entries by any of the pieces of information I log; otherwise logs would go to a file).

I'm a firm advocate for using the best tool for any given job, not just whichever tool you happen to already be using, and that's actually one of the things I love about Resque - the PHP version (this one) stores data in Redis in exactly the same way as the Ruby version, so I can enqueue jobs using whichever makes sense, and actually run the jobs in the language best suited to perform each. As I understand it, the Node.js version does the same, which is even better. Ultimately, the point is to select a tool based on what it's good at, and use as many tools as needed for the job to get done right.

ryanhungate commented 10 years ago

Yeah man, good explanation. Our company processes a lot of stuff using my current MySQL queue, and to touch on the delete thing, I just grab the first item, then immediately run an update to lock it, and that seems to mostly solve the duplicate-worker issue - but you're right, I need something that is super fast and built to do this. I spent a lot of time writing this other queue, mainly because of my need for referencing IDs in our job queue… it has to do with monitoring certain things on a long-term basis, and for me it's easier to read a key that makes sense.

I will be porting what we have into this; it seems perfect. So for 'failed' jobs… if I were to hook into the onFail() event, how do you handle that type of logic, say if I wanted to wait 10 minutes before retrying a job?

Ryan


danhunsaker commented 10 years ago

There's a "companion" project (called a plugin upstream, even though it isn't, really) called Resque Scheduler. The Ruby version has Cron-like regularly repeating task support, but the way they implemented it doesn't port to PHP easily (nor is it really the right approach, IMO). That's not what you'd be using, though. The PHP version does have the "run this job at this time" support for run-once-type scheduling, which is what you'll need. It supports both a runAt() and a runIn() approach, so you can schedule jobs for a specific date/time, or a certain time from now, respectively. Your example would use runIn().

As to MySQL and the immediate update, the fact that another worker can still get in and run a select between the time of the first worker's select and its subsequent update means it will still have a chance of running jobs twice. Any form of multithreading makes race conditions a real problem. I'm sure it works decently, but this is definitely better for queueing. :-)

ryanhungate commented 10 years ago

Cool, so basically to net it all out… when a job is run, it's simply not in the queue anymore because it's removed from the queue array right then… and so you process the job, but if it fails, the hook can grab it, do some logic, and re-queue it if it fits your guidelines… right? Does it make sense to do this in the worker itself, or do you handle this in the onFail() event hook instead? I'm all about following a pattern on this stuff, and it's just a little different from how mine worked, where I had built-in support for retries, and even delays on the retries based on the queue names, etc.

Ryan


danhunsaker commented 10 years ago

That's essentially what happens, yeah. Kinda like using $queue[] = $job to enqueue, and $job = array_shift($queue) to start work, if you were working in purely PHP. Which wouldn't work at all, of course, but the idea is the same.

The correct way would be onFail, since that's Resque-wide, and doesn't require changing Resque itself. That's actually why the hooks exist - so you can extend the functionality without having to change the library itself. If you meant to say "in the job" instead of "in the worker", onFail is still correct, because if the job fails, none of the job's code would run anymore anyway.
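
A minimal sketch of that retry-with-delay pattern: php-resque registers the failure hook under the event name 'onFailure', and the listener receives the exception and the failed Resque_Job. The 'retries' counter argument is my own invention for illustration, not part of the library:

Resque_Event::listen('onFailure', function (Exception $e, Resque_Job $job) {
    $args = $job->getArguments();
    $retries = isset($args['retries']) ? $args['retries'] : 0;
    if ($retries < 3) {
        $args['retries'] = $retries + 1;
        // Re-queue the same class on the same queue, 10 minutes from now,
        // via the php-resque-scheduler companion project.
        ResqueScheduler::enqueueIn(10 * 60, $job->queue, $job->payload['class'], $args);
    }
});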

ryanhungate commented 10 years ago

Hey, one more thing… can you expand just a tad on using the named key approach, and on updating the job vs. duplicating it by using setUp()? I feel like I'm a little off on that one - and I feel like if I tell the app to queue up a job with a particular ID and it exists, it should be overwritten. Thanks.

EDIT: I see that there is also a recreate() method in the job class, so my ID idea may be a bad one unless it could somehow use the named key, because recreate() passes in a true/false instead of the original key.

danhunsaker commented 10 years ago

If we patched the original enqueue() to support manually setting the ID, we'd patch the recreate() as well. Though I think an additional argument would be better than changing the monitor flag - some users would want to set the ID without the extra Redis key for the status being created.

That said, here's how I would set this up in my projects if I wanted to deduplicate jobs. In my application code, where I enqueue the job, I'd save the job ID that enqueue() returns into a Redis key with the job name in it. So, for example, I'm creating a job I want to call "highlander" (because "There can be only one!"). In this case, I'd be saving the job ID to myapp:jobid:highlander.

On the worker side, I'd set up the actual deduplication. setUp() would be the best place if only certain classes should be named; beforePerform would be best if you want to extend the feature to every job. In that function, whichever approach you take, it would check the Redis key created earlier (myapp:jobid:highlander) against the current ID to see if they match. If they do, great, the function returns (or moves on to whatever other setup you need to do). If not, though, it aborts the job right there, before it actually starts. That ensures that only the newest job enqueued under that name can run, without the additional overhead of having to search the entire queue and hope you find it before the index changes so you can remove it before it gets popped off the queue.

Hopefully that makes more sense. :-)

ryanhungate commented 10 years ago

Hey Dan,

Sorry, got busy yesterday. So a super quick example is this:

/// pushing the job somewhere in code
$token = Resque::enqueue($queue, 'TestJob', $data, true);
\Resque::redis()->set("myapp:job_ref:job_name", $token);

/// the job
class TestJob
{
    public function setUp()
    {
        // The Resque_Job instance is exposed as $this->job inside the job
        // class; its payload carries the ID assigned at enqueue time.
        $job_token = $this->job->payload['id'];

        if (\Resque::redis()->get("myapp:job_ref:job_name") !== $job_token) {
            // (Throwing Resque_Job_DontPerform instead would skip the job
            // without marking it as failed.)
            throw new \Exception('Only the latest job token would match this redis key.');
        }
    }

    public function perform()
    {
        // ... Run the job
    }

    public function tearDown()
    {
        // ... Remove the environment for this job
    }
}

And now I'm looking over the delayed jobs, and it does not seem to even give an ID in those cases... so does this mean that while using the resque-scheduler version, it's not going to work? EDIT: well, it seems as if the only thing that matters when you set a job for later is that I would have to just do this: $token = md5(uniqid('', true)); \Resque::redis()->set("myapp:job_ref:job_name", $token);

danhunsaker commented 10 years ago

That's the gist of it, yeah. You could also do something like this for added flexibility:

// afterEnqueue fires with the class, the arguments, the queue, and the generated job ID.
function afterEnqueueHandler($class, $data, $queue, $token)
{
    if (isset($data['job_name']))
    {
        \Resque::redis()->set("myapp:job_ref:{$data['job_name']}", $token);
    }
}
Resque_Event::listen('afterEnqueue', 'afterEnqueueHandler');

// beforePerform fires with the Resque_Job instance that is about to run.
function beforePerformHandler($job)
{
    $data = $job->getArguments();
    $job_token = $job->payload['id'];
    if (isset($data['job_name']) && \Resque::redis()->get("myapp:job_ref:{$data['job_name']}") != $job_token)
    {
        // Resque_Job_DontPerform aborts the job without marking it as failed.
        throw new \Resque_Job_DontPerform('Only the latest job token would match this redis key.');
    }
}
Resque_Event::listen('beforePerform', 'beforePerformHandler');

Then you just set job_name as one of the arguments to a job that you only want to run the most recent of, and the hooks handle the rest.

Delayed execution is a bit different, but not impossible. The trick here is to use a value added to the job instead of the job ID. In fact, I would go so far as to say this approach would probably be best, because it updates the Redis key immediately before the job is queued, so you avoid the race condition of a job being pulled and started before you get the ID set in Redis successfully. It would look very similar to the above example:

function afterEnqueueHandler($class, $data, $queue, $token)
{
    if (isset($data['job_name']))
    {
        \Resque::redis()->set("myapp:job_ref:{$data['job_name']}", $data['job_token']);
    }
}
Resque_Event::listen('afterEnqueue', 'afterEnqueueHandler');

// afterSchedule fires from php-resque-scheduler when a delayed job is added.
function afterScheduleHandler($timestamp, $queue, $class, $data)
{
    if (isset($data['delayed_job_name']))
    {
        \Resque::redis()->set("myapp:delayed_job_ref:{$data['delayed_job_name']}", $data['delayed_job_token']);
    }
}
Resque_Event::listen('afterSchedule', 'afterScheduleHandler');

function beforePerformHandler($job)
{
    $data = $job->getArguments();
    if ((isset($data['job_name']) && \Resque::redis()->get("myapp:job_ref:{$data['job_name']}") != $data['job_token'])
        || (isset($data['delayed_job_name']) && \Resque::redis()->get("myapp:delayed_job_ref:{$data['delayed_job_name']}") != $data['delayed_job_token']))
    {
        throw new \Resque_Job_DontPerform('Only the latest job token would match this redis key.');
    }
}
Resque_Event::listen('beforePerform', 'beforePerformHandler');

This approach is much more reliable, and handles both job types. For normal jobs, just set the job_name to whatever makes sense, and the job_token to something unique (md5(uniqid('', true)) probably works just fine), and the hooks do the rest. For scheduled jobs, you'd just use delayed_job_name and delayed_job_token instead. The reason for the difference is that Resque-Scheduler calls the normal enqueue() method, which fires the normal afterEnqueue event, and that would always overwrite your most-recently-scheduled with a most-recently-started.
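
Hypothetical usage of the hooks above (names made up):

// A deduplicated immediate job...
Resque::enqueue('default', 'TestJob', array(
    'job_name'  => 'highlander',
    'job_token' => md5(uniqid('', true)),
));

// ...and a deduplicated delayed one.
ResqueScheduler::enqueueIn(10 * 60, 'default', 'TestJob', array(
    'delayed_job_name'  => 'highlander',
    'delayed_job_token' => md5(uniqid('', true)),
));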

ryanhungate commented 10 years ago

Man, that's solid... thanks. Since I am wrapping this in a Laravel package, I don't even have to do anything crazy every time I queue something up; I can just add the properties automatically. I'll play around with that ASAP. This kind of excites me a bit, seeing it so clean. It just makes sense. (I'm such a dork, I guess.) :)