binarymatt / pyres

a resque clone in python
http://github.com/binarydud/pyres
MIT License
955 stars 130 forks

A worker doesn't wait before processing the next job #77

Closed arthur78 closed 13 years ago

arthur78 commented 13 years ago

Hi! (redis: 2.2.12, redis-py: 2.4.9, pyres: 1.1)

I hope I'm missing something, but enqueued jobs are processed immediately, one after another, regardless of the specified (or default) interval.

If I do this:

r = pyres.ResQ()
for i in range(5):
    r.enqueue(LogCurrTime)
r.close()

worker.Worker.run(queues)

then all datetime strings in the log file differ only in the microseconds part:

2011-08-21 21:16:38.683243
2011-08-21 21:16:38.691961
2011-08-21 21:16:38.700720
2011-08-21 21:16:38.709433
2011-08-21 21:16:38.718149

Here's output from the _pyreswork script:

2011-08-21 21:16:38 INFO Found job on Log
2011-08-21 21:16:38 INFO Forked 57681 at 2011-08-21 21:16:38.678921
2011-08-21 21:16:38 INFO Processing Log since 2011-08-21 21:16:38.679161
2011-08-21 21:16:38 INFO done working
2011-08-21 21:16:38 INFO Found job on Log
2011-08-21 21:16:38 INFO Forked 57682 at 2011-08-21 21:16:38.688113
2011-08-21 21:16:38 INFO Processing Log since 2011-08-21 21:16:38.688141
2011-08-21 21:16:38 INFO done working
2011-08-21 21:16:38 INFO Found job on Log
2011-08-21 21:16:38 INFO Forked 57683 at 2011-08-21 21:16:38.696898
2011-08-21 21:16:38 INFO Processing Log since 2011-08-21 21:16:38.696929
2011-08-21 21:16:38 INFO done working
2011-08-21 21:16:38 INFO Found job on Log
2011-08-21 21:16:38 INFO Forked 57684 at 2011-08-21 21:16:38.705608
2011-08-21 21:16:38 INFO Processing Log since 2011-08-21 21:16:38.705620
2011-08-21 21:16:38 INFO done working
2011-08-21 21:16:38 INFO Found job on Log
2011-08-21 21:16:38 INFO Forked 57685 at 2011-08-21 21:16:38.714284
2011-08-21 21:16:38 INFO Processing Log since 2011-08-21 21:16:38.714304
2011-08-21 21:16:38 INFO done working

I expected the logged times to differ by 5 seconds (the default interval). Or am I missing something?

binarymatt commented 13 years ago

The interval is only used while waiting for a message on the series of queues that the worker is listening to. If there are messages on the queue, the worker will continue to process each job as it becomes available and will not wait between jobs.
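As a rough illustration of that behaviour, here is a toy polling loop over in-memory queues. It is not pyres's actual code: the `drain` function, the injected `sleep` callable and the `max_idle_polls` cut-off are all hypothetical names introduced so the example terminates and can be inspected without real delays.

```python
from collections import deque


def drain(queues, interval, sleep, max_idle_polls=1):
    """Toy worker loop: sleep only when every queue is empty."""
    processed = []
    idle_polls = 0
    while idle_polls < max_idle_polls:
        job = None
        # Check the queues in order and take the first available job.
        for name, q in queues.items():
            if q:
                job = (name, q.popleft())
                break
        if job is None:
            # Nothing to do anywhere: this is the only place we wait.
            sleep(interval)
            idle_polls += 1
        else:
            # A job was found, so it is handled with no pause.
            processed.append(job)
    return processed


# Record the requested sleeps instead of actually sleeping.
sleeps = []
queues = {"Log": deque(range(5))}
processed = drain(queues, interval=5, sleep=sleeps.append)
print(len(processed), sleeps)
```

In this sketch the five jobs are handled back to back, and the interval is requested only once, after the queue has run dry.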

arthur78 commented 13 years ago

Thank you.


    def work(self, interval=5):
        """Invoked by ``run`` method. ``work`` listens on a list of queues and sleeps
        for ``interval`` time.

        ``interval`` -- Number of seconds the worker will wait until processing the next job. Default is "5".

        Whenever a worker finds a job on the queue it first calls ``reserve`` on
        that job to make sure another worker won't run it, then *forks* itself to
        work on that job.

        Finally, the ``process`` method actually processes the job by eventually calling the Job instance's ``perform`` method.

        """
        ....

The sentence "``interval`` -- Number of seconds the worker will wait until processing the next job. Default is "5"." was ambiguous to me.

arthur78 commented 13 years ago

Could you tell me, please, whether this behaviour is correct?


from datetime import datetime

import pyres

class DateTimeLog1(object):
    queue = 'Log1'

    @staticmethod
    def perform():
        with open('log1.log', 'a') as f:
            f.write('%s\n' % datetime.now())

class DateTimeLog2(object):
    queue = 'Log2'

    @staticmethod
    def perform():
        with open('log2.log', 'a') as f:
            f.write('%s\n' % datetime.now())

r = pyres.ResQ()
for i in range(3):
    r.enqueue(DateTimeLog1)
    r.enqueue(DateTimeLog2)
r.close()

$ /usr/local/bin/pyres_worker Log1,Log2

2011-08-21 22:27:47 INFO Found job on Log2
2011-08-21 22:27:47 INFO Forked 58147 at 2011-08-21 22:27:47.078192
2011-08-21 22:27:47 INFO Processing Log2 since 2011-08-21 22:27:47.079527
2011-08-21 22:27:47 INFO done working
2011-08-21 22:27:47 INFO Found job on Log1
2011-08-21 22:27:47 INFO Forked 58148 at 2011-08-21 22:27:47.088282
2011-08-21 22:27:47 INFO Processing Log1 since 2011-08-21 22:27:47.088327
2011-08-21 22:27:47 INFO done working
2011-08-21 22:27:47 INFO Found job on Log1
2011-08-21 22:27:47 INFO Forked 58149 at 2011-08-21 22:27:47.096969
2011-08-21 22:27:47 INFO Processing Log1 since 2011-08-21 22:27:47.096972
2011-08-21 22:27:47 INFO done working
2011-08-21 22:27:47 INFO Found job on Log1
2011-08-21 22:27:47 INFO Forked 58150 at 2011-08-21 22:27:47.105521
2011-08-21 22:27:47 INFO Processing Log1 since 2011-08-21 22:27:47.105550
2011-08-21 22:27:47 INFO done working
2011-08-21 22:27:53 INFO Found job on Log2
2011-08-21 22:27:53 INFO Forked 58151 at 2011-08-21 22:27:53.007391
2011-08-21 22:27:53 INFO Processing Log2 since 2011-08-21 22:27:53.007420
2011-08-21 22:27:53 INFO done working
2011-08-21 22:27:59 INFO Found job on Log2
2011-08-21 22:27:59 INFO Forked 58152 at 2011-08-21 22:27:59.066883
2011-08-21 22:27:59 INFO Processing Log2 since 2011-08-21 22:27:59.066899
2011-08-21 22:27:59 INFO done working

$ less log1.log

2011-08-21 22:27:47.092194
2011-08-21 22:27:47.100847
2011-08-21 22:27:47.109452

$ less log2.log

2011-08-21 22:27:47.083451
2011-08-21 22:27:53.011258
2011-08-21 22:27:59.070731

Jobs from the Log2 queue are spaced by a visible interval, while jobs from the Log1 queue run with no pause between them. Is this correct?

binarymatt commented 13 years ago

It looks like this behavior was introduced when the reserve method on the Job class was changed to block on reserve for the interval. So what's happening is that the Log1 queue is cleared out, but the worker still checks Log1 first each time and waits for the interval before checking Log2.
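The effect described above can be modelled with a simulated clock. This is only a sketch under stated assumptions, not the pyres implementation: `simulate` is a hypothetical helper, jobs are assumed to take no time, and a blocking reserve on an empty queue is assumed to cost exactly `interval` seconds before the next queue is tried. It does not reproduce the exact job ordering seen in the log above.

```python
from collections import deque


def simulate(queues, interval, rounds):
    """Model a worker that blocks on each queue in order.

    Returns, per queue, the simulated times at which its jobs ran.
    """
    clock = 0
    finished = {name: [] for name in queues}
    for _ in range(rounds):
        # Every round starts again from the first queue in the list.
        for name, q in queues.items():
            if q:
                q.popleft()
                finished[name].append(clock)
                break
            # Assumed cost of a blocking reserve timing out on an
            # empty queue before the worker moves on to the next one.
            clock += interval
    return finished


queues = {"Log1": deque(range(3)), "Log2": deque(range(3))}
result = simulate(queues, interval=5, rounds=6)
print(result)
```

Under these assumptions the Log1 jobs all run at time 0, while each Log2 job is delayed by one interval because the worker first waits on the now-empty Log1 queue.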