verbling / kickq

Kick jobs out the door. Quickly. A job queue for node
MIT License

Prevent losing jobs by using the Reliable Queue Pattern #4

Open thanpolas opened 11 years ago

thanpolas commented 11 years ago

Kickq uses the Blocking Pop (BLPOP) command to fetch jobs that are ready to be processed. If a client fails or disconnects after popping a job, that job perishes forever, as it has already been removed from the queue.
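For illustration, a minimal sketch of this pattern, using the ioredis client and hypothetical key and handler names (not kickq's actual code): the job id leaves Redis the moment BLPOP returns, so a crash before the handler finishes loses the job.

```js
// Sketch only -- not kickq's implementation. Assumes the ioredis client.
const Redis = require('ioredis');
const redis = new Redis();

async function workerLoop() {
  while (true) {
    // BLPOP atomically removes the job id from the queue.
    const [, jobId] = await redis.blpop('kickq:queue', 0); // hypothetical key

    // If the process crashes or disconnects here, the job id no longer
    // exists anywhere in Redis -- the job is lost forever.
    await processJob(jobId); // hypothetical job handler
  }
}
```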

This behavior causes some tests to fail randomly: a previous test may have time to re-issue a BLPOP command before it is disposed, so the next test fails because its job gets popped by the previous, now disposed, worker.

An interim solution would be to find a way to send an abort command to the blocked client, but it appears that this cannot be done explicitly.

A test case has been written that exposes this weakness and awaits a fix that makes it pass.


TL;DR

The best long term solution is to enforce the Reliable Queue Pattern by using the BRPOPLPUSH command.

This requires the following:

That's a pickle.
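For reference, a minimal sketch of the Reliable Queue Pattern with BRPOPLPUSH (ioredis; the key names and the job handler are hypothetical): the job id is atomically copied into a processing list, so a crashed worker leaves a trail that a guard can later recover.

```js
// Sketch only -- Reliable Queue Pattern, not kickq's implementation.
const Redis = require('ioredis');
const redis = new Redis();

const QUEUE = 'kickq:queue';            // hypothetical key names
const PROCESSING = 'kickq:processing';

async function reliableWorkerLoop() {
  while (true) {
    // Atomically pop from the queue AND push onto the processing list.
    const jobId = await redis.brpoplpush(QUEUE, PROCESSING, 0);
    try {
      await processJob(jobId); // hypothetical job handler
      // Remove from the processing list only once the job is done.
      await redis.lrem(PROCESSING, 1, jobId);
    } catch (err) {
      // On failure, push the job back so it can be retried.
      await redis.multi().lrem(PROCESSING, 1, jobId).lpush(QUEUE, jobId).exec();
    }
  }
}
```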

Draft Solution One, Polling

The Guard will poll the Processing queue and store in an Array all the Job Ids that are being processed, along with the timestamp of when they were first seen.

A configurable timeout (~2s) will trigger the Inspector for Job Ids that have surpassed that limit. The Inspector will fetch the Job Item and examine it. If the Job Item checks out then the Processing Timeout property is read and a timer is created with a +20% margin.

If the Job's Processing Timeout +20% is surpassed and the Job Id is still in the Processing queue then it will be moved back to the To Process queue.
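A rough sketch of what that Guard loop could look like (ioredis; the key names and the fetchJobItem helper are hypothetical, while the ~2s and +20% figures are the ones proposed above):

```js
// Sketch only -- the polling Guard from Draft Solution One.
const Redis = require('ioredis');
const redis = new Redis();

const firstSeen = new Map();  // jobId -> timestamp when first seen
const watched = new Set();    // jobIds already handed to the Inspector
const GUARD_TIMEOUT = 2000;   // ~2s before the Inspector examines a job

async function guardTick() {
  const processing = await redis.lrange('kickq:processing', 0, -1);
  const now = Date.now();

  for (const jobId of processing) {
    if (!firstSeen.has(jobId)) firstSeen.set(jobId, now);
    if (watched.has(jobId) || now - firstSeen.get(jobId) < GUARD_TIMEOUT) continue;

    // Inspector: fetch the Job Item and read its Processing Timeout.
    watched.add(jobId);
    const job = await fetchJobItem(jobId);   // hypothetical helper
    const limit = job.processTimeout * 1.2;  // +20% margin

    setTimeout(async () => {
      // If the job is still in the Processing queue, move it back.
      const removed = await redis.lrem('kickq:processing', 1, jobId);
      if (removed > 0) await redis.lpush('kickq:queue', jobId);
      firstSeen.delete(jobId);
      watched.delete(jobId);
    }, limit);
  }
}
```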

Cons: too much noise, since all workers poll the Processing queue and fetch Job Items that exceed the ~2s timeout.

Help

I am not very happy with that solution.

Please share your thoughts.

cadorn commented 11 years ago

I am new to kickq and redis so I will explain in different terms. IMO:

Thoughts?

thanpolas commented 11 years ago

@cadorn thank you for your remarks!

I understand that what you are suggesting boils down to electing a "master worker". With a master worker things become much simpler and this wouldn't be an issue.

So far I have avoided creating a master worker election system to keep complexity low. There is one more task that would benefit from a master worker setup, the Scheduler. However, the Scheduler's cost is almost negligible, as it polls redis every ~1s with a very low-cost query.

The processing queue Guard however will cost significantly more and will create redundant requests if no Master worker exists.

thanpolas commented 11 years ago

A side note: I am not ruling out a Master Worker election system, I am just pointing out the facts and challenges.

cadorn commented 11 years ago

@thanpolas I am not suggesting a master worker, and I think I would prefer not to have one. All workers are equal. They all atomically try to move some jobs from the pending queue to the processing collection. Redis can enforce that only one worker can do this successfully for any one job, using BRPOPLPUSH I think.

The only time a worker gets elevated to do master-like work (i.e. scheduling, guarding, etc.) is as I outlined: the elevation is temporary, and as soon as the work is done the worker drops back down to the level of the others. We may need a couple more flags to indicate that the work is still in progress so another worker does not start it up, but that should suffice.

Any way you slice it, you need someone to do this extra work, and the best processes to do it are the worker processes, which should run stably, in numbers, and not overload the VMs they run on in a normal setup. You don't want to do this on the job creation side, as the load on app servers can vary much more. If you rely too heavily on DB logic to distribute this work, I think you tie yourself too closely to the DB, which may corner you as soon as you need more flexibility, which is what seems to be happening now.

If you run kickq and don't want your workers to do the extra master-like work, you could configure a pool of workers to do just that. But if the system is designed properly, the work these workers will need to do will be minimal, so it should be no problem to spread the responsibility across the normal worker pool and have one temporarily elected.

You want a system you can scale horizontally without needing to reconfigure or think about consequences. I think this is a good way to achieve that.

thanpolas commented 11 years ago

Yes, I totally agree on all points, and that's the exact philosophy Kickq has been built on so far.

I'll try to break down your temporary election so I can better understand the concept:

All workers at intervals check a DB property called monitorTime and, if it is older than the monitor interval, insert their unique ID along with the current time into currentMonitor, then update monitorTime. The worker(s) that updated these properties then query them again to determine which worker won the race to run the monitor for this interval. The winner then does the appropriate monitor (I presume guard) work to clean up out-of-typical-operation states.

A config parameter monitorInterval will be created, representing the polling interval. Default value will be 1 second.

  1. Workers poll monitorTime, which contains a single JS timestamp value.
  2. If the monitorTime value is older than monitorInterval, go to step 3; otherwise go back to step 1.
  3. Insert the Worker's Unique Id along with the Current Timestamp into currentMonitor and update monitorTime with the Current Timestamp, using MULTI so the sequence of stores is guaranteed.
  4. Re-query both monitorTime and currentMonitor and match the stored Current Timestamp to determine if the race was won.

If I understand correctly, the race will be won by the last worker to update the db properties, right?

cadorn commented 11 years ago

Right. This election is not expensive and guarantees that exactly one worker will pick up the job. The worker who picked up the monitor job will also need to update currentMonitor with a progress timestamp every monitorInterval / 2.1 to indicate that the monitor job is still running. I suppose other workers will also need to query currentMonitor to check whether one is still running, based on the progress timestamp. If one is, then nobody should start a new monitor job. If the timestamp is older than twice the currentMonitor progress interval, it is assumed that the monitor died without clearing currentMonitor. In that case, election happens as usual.

I would set monitorInterval to something like 15 seconds. I think it is completely acceptable to have a resolution of 15 seconds (based on monitorInterval) for scheduling and for atypical timeouts (jobs that died because their worker died; all other states should be handled immediately by the worker). I presume scheduled jobs will be queued in a scheduled queue and moved into a high-priority processing queue by the monitor (the high-priority queue gets picked before the normal processing queue). This means scheduled jobs may still sit a bit after being triggered, but not as long as if they were in the normal processing queue, where they might wait for minutes or much longer.

I see scheduling in the context of kickq being used for jobs that run every, say, 15 minutes, hours, or days, not for stuff you need executed at very specific times or at short intervals. You should use other tools for that, as this will not scale to thousands of jobs per second.

thanpolas commented 11 years ago

Ok, here's the revised flow:

A config parameter monitorInterval will be created, representing the polling interval. Default value will be 15 seconds.

  1. Workers poll monitorTime, which contains a single JS timestamp value.
  2. If the monitorTime value is older than monitorInterval, go to step 3; otherwise go back to step 1.
  3. Insert the Worker's Unique Id along with the Current Timestamp into currentMonitor and update monitorTime with the Current Timestamp, using MULTI so the sequence of stores is guaranteed.
  4. Re-query both monitorTime and currentMonitor and match the stored Current Timestamp to determine if the race was won.
  5. The winner of the race will update the monitorTime property at intervals equal to ~60% of the value of monitorInterval, to prevent other workers from triggering step 3.
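A minimal sketch of that election flow (ioredis; the key names, the worker id format, and the runGuard placeholder are illustrative assumptions):

```js
// Sketch only -- temporary monitor election as outlined in steps 1-5.
const os = require('os');
const Redis = require('ioredis');
const redis = new Redis();

const MONITOR_INTERVAL = 15000; // ms, the proposed default
const workerId = `${os.hostname()}:${Math.random().toString(36).slice(2)}`;

async function tryBecomeMonitor() {
  // Steps 1-2: is monitorTime older than monitorInterval?
  const monitorTime = Number(await redis.get('kickq:monitorTime')) || 0;
  if (Date.now() - monitorTime < MONITOR_INTERVAL) return false;

  // Step 3: store our candidacy and the new timestamp inside one MULTI.
  const now = Date.now();
  await redis.multi()
    .set('kickq:currentMonitor', `${workerId}:${now}`)
    .set('kickq:monitorTime', now)
    .exec();

  // Step 4: re-query and check whether our write won the race.
  const winner = await redis.get('kickq:currentMonitor');
  if (winner !== `${workerId}:${now}`) return false;

  // Step 5: refresh monitorTime at ~60% of monitorInterval so other
  // workers do not trigger step 3 while the monitor work is running.
  const heartbeat = setInterval(
    () => redis.set('kickq:monitorTime', Date.now()),
    MONITOR_INTERVAL * 0.6
  );
  try {
    await runGuard(); // hypothetical placeholder for the monitor work
  } finally {
    clearInterval(heartbeat);
  }
  return true;
}
```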

Any ideas how we can uniquely identify a worker? Hostname? IP? mac address? random unique num?

Re scheduling, I think we are on the same page. There was an initial requirement for kickq to act as a cron for node and support repetitive jobs. That requirement has been descoped. The only reasons the scheduler is still required are:

cadorn commented 11 years ago

Any ideas how we can uniquely identify a worker? Hostname? IP? mac address? random unique num?

<hostname> : <random hash> should work.
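For example (sketch; just a hostname plus a random hash):

```js
// Sketch: a "<hostname>:<random hash>" worker id.
const os = require('os');
const crypto = require('crypto');

const workerId = `${os.hostname()}:${crypto.randomBytes(8).toString('hex')}`;
```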

What about the different queues/collections and their interactions? Are we on the same page?

The status of the job is implied by the queue/collection it is in and jobs are moved around, never deleted and re-inserted. Can we move a job from a queue to a collection? Or maybe address items in a queue by ID rather than index?

thanpolas commented 11 years ago

This is already implemented since v0.0.12; check out the struct:

kickq:state:[state]

Type: Set
Member: [ job id ]

An index of job items by state. Each state contains a set of job ids.
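For illustration, working with that index from node could look like this (ioredis sketch; the state names are examples):

```js
// Sketch only: the kickq:state:[state] keys are SETs of job ids.
const Redis = require('ioredis');
const redis = new Redis();

// Atomically move a job id from one state set to another.
async function moveState(jobId, fromState, toState) {
  await redis.smove(`kickq:state:${fromState}`, `kickq:state:${toState}`, jobId);
}

// All job ids currently in a given state.
async function jobsInState(state) {
  return redis.smembers(`kickq:state:${state}`);
}
```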

Also, there is no priority concept in Kickq and that is by design. Priorities can be structured based on Job Names and how many workers are assigned per Job Name...

thanpolas commented 11 years ago

There are 2 things to point out with the kickq:state:[state] collections though...

  1. They are not attached / relayed to the BLPOP operation and cannot be, as the types differ (the Queue is a LIST, the states are SETs).
  2. There are no sanity checks being performed on what goes into the states collections. Yet.

cadorn commented 11 years ago

Ok. As long as a job can never be lost or stalled no matter what happens to any part of the system at any time.

cadorn commented 11 years ago

So how does this monitor discussion relate to the topic of this issue?

Will the monitor schedule the jobs? I.e. if we run the monitor once every 15 seconds, does that mean jobs will go from queued to processing only once every 15 seconds?

I was under the assumption that the monitor is used for cleanup only, and that jobs will be picked up immediately by workers, since multiple workers poll the pending queue all the time.

thanpolas commented 11 years ago

There are no plans to mix the Scheduler with the monitor; they will remain independent.

The Scheduler, no matter the interval, will look ahead by a configurable amount of time, which is ~interval + 15%. I plan on changing the schedulerLookAhead type from a number representing milliseconds to a number representing a ratio. Scheduler's pop method should also be scrutinized as it too has its weak points.

cadorn commented 11 years ago

Scheduler's pop method should also be scrutinized as it too has its weak points.

How do multiple workers grab jobs without interfering?

thanpolas commented 11 years ago

How do multiple workers grab jobs without interfering?

All scheduled jobs are stored in a Sorted Set and workers perform an atomic zpop. The problem here is the same as with BLPOP: the worker holds the job id, and if it fails to insert it into the queue, the job will never get executed.
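One common way to implement such an atomic zpop is WATCH plus a MULTI of ZRANGE/ZREM, retrying if another worker raced us (ioredis sketch; not necessarily how kickq does it, and the key name is hypothetical):

```js
// Sketch only: atomic "zpop" of the earliest scheduled job.
const Redis = require('ioredis');
const redis = new Redis();

async function zpopScheduled() {
  while (true) {
    await redis.watch('kickq:scheduled');
    // Peek at the member with the lowest score (earliest schedule time).
    const [jobId] = await redis.zrange('kickq:scheduled', 0, 0);
    if (!jobId) {
      await redis.unwatch();
      return null;
    }
    // Remove it transactionally; EXEC resolves to null if someone else
    // touched the sorted set after our WATCH, in which case we retry.
    const res = await redis.multi().zrem('kickq:scheduled', jobId).exec();
    // Note the weakness described above: once ZREM succeeds, the job id
    // lives only in this worker's memory until it is pushed to a queue.
    if (res !== null) return jobId;
  }
}
```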

cadorn commented 11 years ago

We can do the same kind of mediation here as for the monitor.

  1. Worker looks for next job to process and gets its ID.
  2. Worker inserts job into assignment collection with its worker ID as value using setnx.
  3. Worker queries the assignment collection with the same key as in 2. and ensures its worker ID won. If it did, go to 4.; if it did not, go back to 1.
  4. Move job from pending queue to processing collection by identifying job by ID.

This should ensure that a job can never get dropped.

We should use setnx for setting currentMonitor as well, and the monitor should delete currentMonitor when done.

thanpolas commented 11 years ago

There should also be a step 5 to delete the assignment collection key?

cadorn commented 11 years ago

Yes. Delete key when done.
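Putting the agreed steps together, a minimal sketch of the SETNX-mediated hand-off including the final cleanup (ioredis; the key names and the pending/processing structures are illustrative assumptions):

```js
// Sketch only -- SETNX mediation for claiming a job, steps 1-5.
const Redis = require('ioredis');
const redis = new Redis();

async function claimAndMove(jobId, workerId) {
  const key = `kickq:assign:${jobId}`; // hypothetical assignment key

  // Step 2: only the first SETNX for this job id succeeds.
  await redis.setnx(key, workerId);

  // Step 3: re-query and make sure our worker ID won.
  const owner = await redis.get(key);
  if (owner !== workerId) return false; // lost the race, look for another job

  // Step 4: move the job by ID from the pending queue to the
  // processing collection (LREM addresses the item by value, not index).
  await redis.multi()
    .lrem('kickq:pending', 1, jobId)
    .lpush('kickq:processing', jobId)
    .exec();

  // Step 5: delete the assignment key when done.
  await redis.del(key);
  return true;
}
```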