terrycojones / txrdq

txRDQ (Resizable Dispatch Queue) provides a Twisted class for a controllable priority queue for running jobs.
Apache License 2.0

Allow for multiple Job implementations #2

Open ldanielburr opened 10 years ago

ldanielburr commented 10 years ago

First off, hi Terry! Hope all is well.

txrdq is excellent, and I am using it to good effect in a simple job dispatcher service. One thing that would be very convenient would be to have a defined interface for jobs, such that a developer could, for example, implement a PersistentJob that can be stored somewhere like Redis, Mongo, Postgres, the filesystem, etc.

Currently, the Job class is imported in rdq.py and instantiated within the ResizableDispatchQueue itself, so there is no way to cleanly pass in an alternative Job implementation for use with the rdq.

Thoughts? If you're in favor of such a change, I could sketch out a pull request.

Thanks,

Daniel

terrycojones commented 10 years ago

Hi Daniel! It's nice to hear from you.

Your suggestion makes sense. In the original version of this code I had support for reading / saving the job queue, but at that point my job info was just a Twitter user name so it was all very simple & specific. I guess I ripped that out - there's certainly no sign of it any more.

One concern I have about the current code is that it's too complicated. Someone was using it a couple of years back and seemed to have found a bug, but I couldn't figure out what was causing it. I think in the re-design that I did to add functionality I let things get too complex.

Anyway, I'd be happy to see what you're thinking.

Hmmmm.... looking at the code a bit, I'm wondering if my thought re persistence was that the user of the rdq should be the one to implement it. If you want to shut down a system, you call stop on the queue, passing cancelUnderway=True and the deferred it returns will fire with a list of Job instances (I think) that weren't executed. In each of those is a jobarg which is the thing you gave to the put method on the RDQ. So you can persist them to disk in any way you like. The priorities are in the Job instance too, so you can persist them as well, if you like. On restart you read your custom persisted stuff and loop, putting the jobs you want to resurrect back into the queue.
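A minimal sketch of that save-and-resubmit loop, using only the standard library so it runs without Twisted. It assumes each unexecuted job exposes `jobarg` and `priority` attributes and that every jobarg is JSON-serializable; the function names `save_pending` and `restore_pending` and the file format are hypothetical, not part of txrdq.

```python
import json


def save_pending(jobs, path):
    """Write the jobarg and priority of each unexecuted job to a JSON file.

    Assumption: every item in `jobs` has `.jobarg` and `.priority`
    attributes, and each jobarg can be encoded as JSON.
    """
    records = [{"jobarg": job.jobarg, "priority": job.priority} for job in jobs]
    with open(path, "w") as f:
        json.dump(records, f)
    return len(records)


def restore_pending(queue, path):
    """Re-submit saved jobs to any object with a put(jobarg, priority=...) method."""
    with open(path) as f:
        records = json.load(f)
    for record in records:
        # Fresh internal job objects are created by the queue on put().
        queue.put(record["jobarg"], priority=record["priority"])
    return len(records)
```

With a real queue, `save_pending` would presumably be attached as a callback to the deferred returned by `stop(cancelUnderway=True)`, and `restore_pending` would be called once at startup before new work is accepted.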

Sorry if I'm just telling you stuff you already know. I'm just getting myself back up to speed with my earlier thinking. I guess that thinking is why I ripped the persistence out of the class originally (if it was there, maybe it was always as I've just described).

Does that make sense?

ldanielburr commented 10 years ago

Hi Terry,

Thanks for explaining your thinking a bit; I understand better now. I agree with your design decision that job persistence should be handled outside of txrdq, but that is exactly why I'd like to have pluggable jobs: rather than having the Job class be part of txrdq, I'd like to be able to pass it in, just as I can pass in the function that actually performs the work.

Considering it further, I may be thinking about this in the wrong way. In my mind, a Job is both data and behavior, whereas in txrdq, a Job is data, and the function that you pass to put() is behavior. I was kind of loosely thinking in terms of twisted's IPlugin interface, where I'd make plugins representing different kinds of jobs, and pass those to txrdq.

I'll play around with just doing all of that outside txrdq and see how far I get. Where I think I'll encounter problems is when I want to have more state associated with a Job than the default Job implementation provides.

terrycojones commented 10 years ago

Hi again

I think the way to think about it is that the job.Job class is internal and only ever exists in memory.

You can pass whatever you like to rdq.put(), and the func you give to rdq.__init__ should be able to handle one of those things as an argument. You might pass an int or a str or an instance of your own job class. If you want persistence, you need to write code to serialize/deserialize your own objects, and you pull these out of the job.Job.jobarg in the list that the deferred returned by rdq.stop fires with. You never have to worry about storing an instance of the job.Job class; those just get recreated internally when you retrieve your serialized jobs and re-submit them. The job.Job class happens to provide you with a couple of maybe-interesting other things, like the time the job hit the queue, etc. If you're interested in that data, you could persist it too (in your own job class).
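As an illustration of "your own job class", here is a rough sketch of a user-owned object that carries its own state and knows how to serialize itself. The class `FetchJob`, its fields, and the `handle` function are all hypothetical; txrdq only ever sees the object as an opaque jobarg.

```python
import json
import time
from dataclasses import asdict, dataclass, field


@dataclass
class FetchJob:
    """Hypothetical user-defined job; the queue never inspects its contents."""
    username: str
    retries: int = 0
    # Extra state the application wants to keep, e.g. its own creation time.
    created: float = field(default_factory=time.time)

    def serialize(self):
        """Encode this job as a JSON string for storage anywhere you like."""
        return json.dumps(asdict(self))

    @classmethod
    def deserialize(cls, text):
        """Rebuild a job from the string produced by serialize()."""
        return cls(**json.loads(text))


def handle(job):
    """Stand-in for the function given to the queue; it receives the jobarg."""
    return "fetching %s (attempt %d)" % (job.username, job.retries + 1)
```

Because the application owns `FetchJob`, adding more state later (the concern raised earlier in the thread) only means adding fields to the class, with no change to the queue.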

If that explanation all makes sense and you think it's good enough, you could add it to the README and make an AUTHORS file with your name in it :-)

ldanielburr commented 10 years ago

Sure, I can see how that would work. I guess what I've been describing is really something of a "recipe", rather than a feature of txrdq proper. I'll put together a working example to play with, and contribute it to the docs.

terrycojones commented 10 years ago

Great, thanks!

ldanielburr commented 10 years ago

Haven't forgotten about this. I'm leaning towards a convention of always ensuring that the jobarg passed to queue.put() is an implementer of some IJobSpec interface, e.g.,

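from txrdq import rdq

# IJobSpec and IJobProcessor are application-defined interfaces (not part of txrdq).

def dispatch(jobSpec):
    # Adapt the job spec to whatever object knows how to process it.
    jobProcessor = IJobProcessor(jobSpec)
    d = jobProcessor.process()
    return d

queue = rdq.ResizableDispatchQueue(dispatch)
jobSpec = IJobSpec(someArbitraryJobRelatedInformation)
queue.put(jobSpec, priority=1)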
...

Then you do the usual zope.interface adapter registry stuff to produce the desired objects from some set of adapters.

Note that I realized that this is probably unnecessary if you allow for multiple rdq instances, one for each kind of work you want to do, but I'm trying to come up with a nice scheme for handling different kinds of jobs via a single rdq instance, which requires that the function/method provided to rdq can distinguish between different kinds of jobs based on the arguments passed to queue.put().
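One way to let a single function distinguish job kinds by the argument passed to queue.put() is sketched below. Note that this swaps in the standard library's `functools.singledispatch` rather than zope.interface adapters, purely so the example runs without extra dependencies; the job classes `EmailJob` and `ResizeJob` and their handlers are hypothetical.

```python
from dataclasses import dataclass
from functools import singledispatch


@dataclass
class EmailJob:
    """Hypothetical job kind: send mail to an address."""
    address: str


@dataclass
class ResizeJob:
    """Hypothetical job kind: resize an image file."""
    path: str
    width: int


@singledispatch
def dispatch(job_spec):
    """Fallback for job types that have no registered handler."""
    raise TypeError("no handler registered for %s" % type(job_spec).__name__)


@dispatch.register(EmailJob)
def _(job_spec):
    return "email to %s" % job_spec.address


@dispatch.register(ResizeJob)
def _(job_spec):
    return "resize %s to %dpx" % (job_spec.path, job_spec.width)
```

The single `dispatch` function would then be the one handed to the queue's constructor, and each handler would presumably return a Deferred rather than a string in a real Twisted application.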