framed-data / overseer

Overseer is a library for building and running data pipelines in Clojure.
Eclipse Public License 1.0
97 stars 10 forks source link

Add job heartbeats + heartbeat monitor #54

Closed andrewberls closed 8 years ago

andrewberls commented 8 years ago

This is an experimental pass at adding "heartbeats", which are an attempt to prevent very long jobs from monopolizing the cluster.

The heartbeat process is a concurrently-running process like the ready job detector or the (now-removed) job completion supervisor. Every 10 seconds, the process saves a heartbeat attribute to the database (the current Unix timestamp, in milliseconds). Every worker is also configured to work as a monitor, which periodically finds jobs that failing more than a certain number of heartbeats and resets their status to unstarted. Since heartbeats are timestamps relative to each worker's relative notion of 'now', it's important to note that this means we are vulnerable to a certain amount of clock drift between nodes. For now,it is the hope that the tolerance figure will make up for this.

Note: A significant change is that started jobs are no longer considered eligible to run! Instead,the heartbeat mechanism is now the primary means by which interrupted jobs or failed workers are recovered.

Consider the following log sample:

; NODE X
15-Nov-02 06:52:34 vagrant-ubuntu-trusty-64 INFO [overseer.executor] - Found 5 handleable jobs.
15-Nov-02 06:52:34 vagrant-ubuntu-trusty-64 INFO [overseer.executor] - Reserving job 56370580-bbb8-4abb-866b-0421a74e6531 (:start)
15-Nov-02 06:52:34 vagrant-ubuntu-trusty-64 INFO [overseer.executor] - Reserved job 56370580-bbb8-4abb-866b-0421a74e6531
START JOB: start 45000 56370580-bbb8-4abb-866b-0421a74e6531
; <wait a bit>
15-Nov-02 06:52:43 vagrant-ubuntu-trusty-64 INFO [overseer.heartbeat] - Heartbeat 56370580-bbb8-4abb-866b-0421a74e6531: 1446447163
; <wait a bit, kill node>

Here we can see that node X has started job 56370580-bbb8-4abb-866b-0421a74e6531 and performed a heartbeat, before being forcibly shut down. The job is left dangling.

user=> (into {} (d/entity (d/db (d/connect uri)) [:job/id "56370580-bbb8-4abb-866b-0421a74e6531"]))
{:job/heartbeat 1446447163, :job/status :started, :job/type :start, :job/id "56370580-bbb8-4abb-866b-0421a74e6531"}

Some time later, the monitor running on node Y, working on a separate job of its own notices that X's job has failed more than the acceptable number of heartbeats and resets it:

; NODE Y
START JOB: start 30000 56370817-a095-43bd-b399-90d31a1e63ab
15-Nov-02 06:53:23 vagrant-ubuntu-trusty-64 INFO [overseer.heartbeat] - Heartbeat 56370817-a095-43bd-b399-90d31a1e63ab: 1446447203
15-Nov-02 06:53:26 vagrant-ubuntu-trusty-64 WARN [overseer.heartbeat] - Found 1 jobs with failed heartbeats
15-Nov-02 06:53:26 vagrant-ubuntu-trusty-64 WARN [overseer.heartbeat] - Resetting: 56370580-bbb8-4abb-866b-0421a74e6531
15-Nov-02 06:53:33 vagrant-ubuntu-trusty-64 INFO [overseer.heartbeat] - Heartbeat 56370817-a095-43bd-b399-90d31a1e63ab: 1446447213
user=> (into {} (d/entity (d/db (d/connect uri)) [:job/id "56370580-bbb8-4abb-866b-0421a74e6531"]))
{:job/status :unstarted, :job/type :start, :job/id "56370580-bbb8-4abb-866b-0421a74e6531"}
elliot42 commented 8 years ago

Merge conflicts?

elliot42 commented 8 years ago

:+1: thanks for sticking through this. Have you had a chance to empirically run a few jobs through this? Looks good, seems like it should have the right behaviors, looking forward to stuff like the vector clock removing some of the distributed dependencies.

elliot42 commented 8 years ago

Two random thoughts:

  1. schema will need to be transacted to prod db
  2. If the monitor transaction throws (e.g. CAS violation) then that will need to be caught otherwise it'll bounce the whole process
andrewberls commented 8 years ago

Yes I've run jobs through this and verified the heartbeat behavior, plan to do so again prior to release. You're correct RE schema - we'll need to be careful perhaps since we still have old schema in intake-mixpanel, and excellent point on CAS violation. I think a process restart is okay in that scenario, since that does seem like a basic invariant violation

andrewberls commented 8 years ago

@elliot42 Final change - made reserve-job also include an initial heartbeat value, to avoid a dangling window where a job could die before its first heartbeat, now everyone just starts with one