apple / ccs-calendarserver

The Calendar and Contacts Server.
https://www.calendarserver.org
Apache License 2.0

SQL-based Work Queue enhancements #393

Open macosforgebot opened 10 years ago

macosforgebot commented 10 years ago

@cyrusdaboo originally submitted this as ticket:839


twext.enterprise.queue implements a database-based distributed work queue. There are several issues with the current implementation that we need to address:

  1. Dequeue locking: when an item of work is dequeued (see twext.enterprise.queue.ultimatelyPerform), first its group id is locked using a NamedLock, then its record (row) is deleted from the work table. Both of those actions will block if some other process is already processing a work item in the same group, or, if there is no group, if the work item itself is being processed. The problem is that this also blocks the “orphaned” work item polling loop, and it is possible for all hosts to end up blocked behind one long-lived item at the “top” of a work table.
  2. There needs to be a way to enqueue a work item without scheduling it via callLater in the current process. For work items known to require significant work, it would be better to have them scheduled on a node/process with low load - there is no guarantee the process creating the work item will have low load at the time the callLater triggers.
  3. Item (2) also suggests we need some way to indicate priority and estimated load for different types of work item. For example, we know push notification work is a light load and needs to occur close to real time (within 3 seconds of being scheduled), whereas other work (scheduling, group cacher, etc.) involves heavier loads and typically executes minutes or hours after being enqueued.
  4. The default “orphan” notBefore limit seems high - waiting 10 minutes for a high-priority item is not good, particularly if we fix item (2), where we would be deliberately “orphaning” work.
  5. Scalability - some types of work may involve processing very large data sets - e.g. scans over all calendar homes, all attachments, etc. Dumping 100K work items into the queue system is not ideal, particularly in the absence of any kind of prioritization mechanism. A better approach would be to create a limited set (pool) of work items that run in parallel and process the overall (large) work set. However, that needs to be done in a way that minimizes lock contention (i.e., addresses item (1) in some fashion).
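Items (3) and (4) taken together suggest that a job's priority should not be static: a job that has passed its notAfter deadline should be promoted so it is picked up quickly. A minimal sketch of that bumping rule, using hypothetical priority constants (the ticket does not define the actual values):

```python
from datetime import datetime, timedelta

# Hypothetical priority levels; the ticket only says push work is light
# and near-real-time while scheduling/group-cacher work is heavier.
PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH = 0, 1, 2

def effective_priority(priority, not_after, now):
    """Priority used for dequeue ordering: a job past its notAfter
    deadline is bumped to the highest priority (items 3 and 4)."""
    if not_after is not None and now > not_after:
        return PRIORITY_HIGH
    return priority

now = datetime(2014, 1, 1, 12, 0, 0)
overdue = effective_priority(PRIORITY_LOW, now - timedelta(minutes=5), now)
on_time = effective_priority(PRIORITY_LOW, now + timedelta(minutes=5), now)
```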
macosforgebot commented 10 years ago

@wsanchez originally submitted this as comment:1:ticket:839

macosforgebot commented 10 years ago

@cyrusdaboo originally submitted this as comment:2:ticket:839


Experimented with the following.

Use “select … for update” and find a way to skip rows already locked and return the first unlocked row.

Postgres:

create or replace function next_job() returns integer as $$
declare
  result integer;
begin
  select ID into result from JOB where pg_try_advisory_xact_lock(ID) limit 1 for update;
  return result;
end
$$ LANGUAGE plpgsql;

To get the next locked ID value:

select * from next_job();
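The function above avoids blocking because pg_try_advisory_xact_lock never waits: a row whose advisory lock is already held by another transaction simply fails the WHERE clause and is skipped. The same try-lock-and-skip pattern can be sketched in plain Python, with per-job threading locks standing in for the advisory locks (an in-memory analogue, not the actual queue code):

```python
import threading

# job ID -> lock; stand-ins for advisory locks keyed on JOB.ID.
_locks = {}
_registry = threading.Lock()

def next_job(job_ids):
    """Return the first job ID whose lock can be taken without
    blocking, or None if every candidate is already claimed."""
    for job_id in job_ids:
        with _registry:
            lock = _locks.setdefault(job_id, threading.Lock())
        if lock.acquire(blocking=False):  # analogue of the try-lock
            return job_id                 # caller now "owns" this job
    return None

first = next_job([1, 2, 3])   # claims job 1
second = next_job([1, 2, 3])  # job 1 is held, so it is skipped
```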

Oracle:

create or replace function next_job return integer is
  cursor c1 is select ID from JOB for update skip locked;
  result integer;
begin
  open c1;
  fetch c1 into result;  -- first row not locked by another session
  close c1;
  -- re-acquire the row lock on the chosen row for this transaction
  select ID into result from JOB where ID = result for update;
  return result;
end;
/

To get the next locked ID value:

var r number;
call next_job() into :r;
select :r from dual;

With each of the above, multiple sessions each get their next available ID without being blocked by existing sessions. For our purposes we need a single JOB table that lists all outstanding jobs (work tables still exist to describe each type of work):

create table JOB (
  ID     integer primary key default nextval('JOB_SEQ') not null, --implicit index
  CLS    varchar(255) not null,
  PRIORITY    integer default 0,
  WEIGHT      integer default 0,
  NOT_BEFORE  timestamp default null,
  NOT_AFTER   timestamp default null
);

When a job is enqueued, an entry is made in the JOB table and in the class/type-specific work queue table (the latter references the ID column in the JOB table - or not, in the case where there is a bunch of work to be done by a smaller pool of “workers”). Each job includes priority/weight and timing details (notBefore holds the job until the specified time; notAfter will cause the job priority to bump up after that time).
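The enqueue step can be sketched against an in-memory SQLite database. PUSH_NOTIFICATION_WORK is a hypothetical work table invented for the example, and SQLite's rowid autoincrement stands in for the JOB_SEQ sequence used in the Postgres DDL above:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
create table JOB (
  ID          integer primary key autoincrement,
  CLS         varchar(255) not null,
  PRIORITY    integer default 0,
  WEIGHT      integer default 0,
  NOT_BEFORE  timestamp default null,
  NOT_AFTER   timestamp default null
);
-- Hypothetical class-specific work table referencing JOB.ID.
create table PUSH_NOTIFICATION_WORK (
  JOB_ID      integer references JOB(ID),
  PUSH_ID     varchar(255)
);
""")

def enqueue(cls, push_id, priority=0, weight=0):
    # One row in JOB, one row in the class-specific work table.
    cur = conn.execute(
        "insert into JOB (CLS, PRIORITY, WEIGHT) values (?, ?, ?)",
        (cls, priority, weight))
    job_id = cur.lastrowid
    conn.execute(
        "insert into PUSH_NOTIFICATION_WORK (JOB_ID, PUSH_ID) values (?, ?)",
        (job_id, push_id))
    return job_id

job_id = enqueue("PushNotificationWork", "/CalDAV/user01/", priority=2)
```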

To dequeue the next job, the next_job() stored procedure is used. That will need to define the logic for sorting the JOB rows so as to pick the next job, taking priority/weight/notBefore/notAfter into account.
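The exact sort logic is left open above; one plausible reading (jobs past notAfter first, then higher priority, then older notBefore, with jobs whose notBefore is still in the future excluded) can be sketched as a query against an in-memory SQLite copy of the JOB table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""create table JOB (
    ID integer primary key, CLS text, PRIORITY integer,
    NOT_BEFORE timestamp, NOT_AFTER timestamp)""")
now = "2014-01-01 12:00:00"
conn.executemany(
    "insert into JOB values (?, ?, ?, ?, ?)",
    [(1, "GroupCacherWork",       0, "2014-01-01 11:00:00", None),
     (2, "PushNotificationWork",  2, "2014-01-01 11:59:00", None),
     (3, "ScheduleWork",          1, "2014-01-01 10:00:00",
                                     "2014-01-01 11:30:00")])  # overdue

# Overdue-notAfter jobs first, then priority, then oldest notBefore.
rows = conn.execute("""
    select ID from JOB
    where NOT_BEFORE is null or NOT_BEFORE <= ?
    order by (NOT_AFTER is not null and NOT_AFTER < ?) desc,
             PRIORITY desc,
             NOT_BEFORE asc
""", (now, now)).fetchall()
order = [r[0] for r in rows]
```

Job 3 sorts first despite its middling priority because its notAfter deadline has passed, which is the bumping behaviour described for notAfter above.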