Open pbugnion opened 11 years ago
So I figured out what went wrong with the previous implementation. It turns out I was misunderstanding some of what was going on. At the moment, the monitor thread runs the following, in a loop (within an Mdrunr instance):
job = self.queue.get() # wait until there is at least one job, then pop it off immediately.
job_status = job_status_dict[job] # check the status.
if job_status.status != JobStatus.Deleted:
    # if the job has not been deleted prior to now: wait to see if there are enough cores available.
    while job.nprocs > self.free_cores:
        # I was sending the Delete command to this job when it was within this loop,
        # hence the delete command was never checked!
        # (not on purpose, obviously - it's just that, when you are within the interpreter, you
        # have no real way of knowing where the monitor thread is).
        time.sleep(1)
The problem therefore lies in the strange limbo between when the job gets popped off the queue and when enough cores are free to run it. If the job is sent the delete command while it is in this "limbo", nothing happens: it just runs as normal. The quick and dirty solution is to replace the while loop condition with
while job.nprocs > self.free_cores and job_status.status != JobStatus.Deleted:
A more long-term lesson is to try and eliminate as much of the "limbo" as possible, by leaving the job in the queue as long as possible. We could re-write the function such that it peeks at the next job and only pops it from the queue if it is ready to run:
next_job = self.queue.queue[0] # peek at the next job in the queue
job_status = job_status_dict[next_job] # check its status.
if job_status.status == JobStatus.Deleted:
    # pop the job from the queue, discard it and return.
elif next_job.nprocs <= self.free_cores:
    # pop from queue and run.
# else : job is valid but not enough processing power available ; return and restart the loop.
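Here is a runnable sketch of that peek-then-pop logic (hypothetical names; a namedtuple stands in for Mdjob, and the status dict holds plain enum values rather than job status objects). Note that peeking at `queue.Queue.queue` directly is only safe while holding the queue's internal mutex, and this assumes a single monitor thread is the only consumer:

```python
import queue
from collections import namedtuple
from enum import Enum

class JobStatus(Enum):
    Waiting = 1
    Deleted = 2

Job = namedtuple("Job", "name nprocs")  # hashable stand-in for Mdjob

def try_dispatch(q, job_status_dict, free_cores):
    """Peek at the head of the queue and only pop when we can act.

    Returns ('discarded', job) if the head job was deleted,
    ('run', job) if it fits in the free cores, ('wait', None) otherwise.
    Assumes a single consumer (the monitor thread).
    """
    if q.empty():
        return ("wait", None)
    with q.mutex:  # q.queue is an internal deque; lock before peeking
        next_job = q.queue[0]
    if job_status_dict[next_job] == JobStatus.Deleted:
        q.get()  # pop the deleted job and discard it
        return ("discarded", next_job)
    elif next_job.nprocs <= free_cores:
        q.get()  # pop and run
        return ("run", next_job)
    # valid job but not enough cores free: leave it in the queue
    return ("wait", None)
```

Because the job stays queued until it is either discarded or runnable, the "limbo" window shrinks to the instant between the peek and the pop.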
An even better, but more long-term, option (as in, I don't think we need to implement it right now, but it would be nice to have) is to subclass the Queue object to create our own, based on something that allows random access to its elements, and keep both jobs and job_status objects together in the queue. This would avoid the strange gymnastics we have to do at the moment: get a job from the queue, check its status etc.
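One way this could look (a hypothetical sketch, not the project's actual class): CPython's queue.Queue already stores its items in a deque, which supports random removal, so a subclass only needs to add a delete() method that takes the queue's lock and keeps the task-accounting consistent:

```python
import queue

class DeletableQueue(queue.Queue):
    """queue.Queue plus a delete() method for removing pending jobs.

    Relies on the CPython detail that self.queue is a deque,
    which supports remove() by value.
    """

    def delete(self, job):
        """Remove a pending job; returns True if it was found."""
        with self.mutex:
            try:
                self.queue.remove(job)
            except ValueError:
                return False  # job not in the queue (already popped?)
            # keep join()/task_done() accounting consistent
            self.unfinished_tasks -= 1
            self.all_tasks_done.notify_all()
            return True
```

A deleted job then never reaches the monitor thread at all, so no status check is needed on the consumer side for jobs deleted while still queued.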
Any thoughts?
Hmm, I'm interested to know how Torque deals with job deletion. What it does is kill the process if it's running and then remove the entry from the queue. Surely this should be possible by sending SIGTERM and then, after a timeout, SIGKILL.
Yeah. I haven't really thought about deleting running jobs yet, but that sounds like the way to go.
I think the easiest way would be to change the subprocess.call call to subprocess.Popen, and keep a reference to this object as an attribute of Mdjob: self.p = Popen(args, ...). The parent queue could then call job.p.send_signal(SIGTERM) etc.
Ah right, yeah. We should definitely be keeping process objects instead of just using subprocess.call.
Subclassing Queue to allow modification of queued jobs is done, as of commit 243b244. I now need to implement the deletion.
Need a way to delete jobs from the queue.