Python based pipeline management software for clusters that makes running recursive and dynamically scheduled computations straightforward. So far works with gridEngine, lsf, parasol and on multi-core machines.
Benedict Paten, Dent Earl, Daniel Zerbino, Glenn Hickey, other UCSC people.
Install sonLib. See https://github.com/benedictpaten/sonLib
Place the directory containing the jobTree in the same directory as sonLib. The directory containing both sonLib and jobTree should be on your python path. i.e. PYTHONPATH=${PYTHONPATH}:FOO where FOO/jobTree is the path containing the base directory of jobTree.
Build the code: Type 'make all' in the base directory, this just puts some stuff that is currently all python based in the bin dir. In the future there might be some actual compilation.
Test the code: python allTests.py or 'make test'.
The following walks through running a jobTree script and using the command-line tools jobTreeStatus, jobTreeRun and jobTreeStats, which are used to analyse the status, restart and print performance statistics, respectively, about a run.
Once jobTree is installed, running a jobTree script is performed by executing the script from the command-line, e.g. (using the file sorting toy example in tests/sort/scriptTreeTest_Sort.py):
[]$ scriptTreeTest_Sort.py --fileToSort foo --jobTree bar/jobTree --batchSystem parasol --logLevel INFO --stats
Which in this case uses the parasol batch system, and INFO level logging and where foo is the file to sort and bar/jobTree is the location of a directory (which should not already exist) from which the batch will be managed. Details of the jobTree options are described below; the stats option is used to gather statistics about the jobs in a run.
The script will return a zero exit value if the jobTree system is successfully able to run to completion, else it will create an exception. If the script fails because a job failed then the log file information of the job will be reported to std error. The jobTree directory (here 'bar/jobTree') is not automatically deleted regardless of success or failure, and contains a record of the jobs run, which can be enquired about using the jobTreeStatus command. e.g.
[]$jobTreeStatus bar/jobTree --verbose
There are 0 active jobs, 0 parent jobs with children, 0 totally failed jobs and 0 empty jobs (i.e. finished but not cleaned up) curre
ntly in job tree: jobTree
There are no failed jobs to report
If a job failed, this provides a convenient way to reprint the error. The following are the important options to jobTreeStatus:
--jobTree=JOBTREE Directory containing the job tree. The jobTree location can also be specified as the argument to the script. default=./jobTree
--verbose Print loads of information, particularly all the log
files of jobs that failed. default=False
--failIfNotComplete Return exit value of 1 if job tree jobs not all
completed. default=False
If a job in the script failed or the system goes down, you may wish to retry the job after fixing the error. This can be achieved by restarting the script with the jobTreeRun command which will restart an existing jobTree.
[]$ jobTreeRun --jobTree bar/jobTree --logLevel INFO
It will always attempt to restart the jobs from the previous point of failure.
If the script was run with the --stats option then jobTreeStats can be run on the pipeline do generate information about the performance of the run, in terms of how many jobs were run, how long they executed for and how much CPU time/wait time was involved, e.g.:
[]$jobTreeStats bar/jobTree
Batch System: singleMachine
Default CPU: 1 Default Memory: 2097152K
Job Time: 0.50 Max CPUs: 9.22337e+18 Max Threads: 4
Total Clock: 0.09 Total Runtime: 7.60
Slave
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
365 | 0.01 0.02 0.02 0.06 6.82 | 0.01 0.01 0.01 0.04 4.71 | 0.00 0.00 0.01 0.03 2.11 | 9781248K 13869056K 13799121K 14639104K 5036679168K
Target
Slave Jobs | min med ave max
| 2 2 2 2
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
367 | 0.00 0.00 0.00 0.03 0.68 | 0.00 0.00 0.00 0.01 0.42 | 0.00 0.00 0.00 0.03 0.26 | 9461760K 13869056K 13787694K 14639104K 5060083712K
Cleanup
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
1 | 0.00 0.00 0.00 0.00 0.00 | 0.00 0.00 0.00 0.00 0.00 | 0.00 0.00 0.00 0.00 0.00 | 14639104K 14639104K 14639104K 14639104K 14639104K
Up
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
124 | 0.00 0.00 0.00 0.01 0.15 | 0.00 0.00 0.00 0.01 0.12 | 0.00 0.00 0.00 0.01 0.03 | 13713408K 14090240K 14044985K 14581760K 1741578240K
Setup
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
1 | 0.00 0.00 0.00 0.00 0.00 | 0.00 0.00 0.00 0.00 0.00 | 0.00 0.00 0.00 0.00 0.00 | 9551872K 9551872K 9551872K 9551872K 9551872K
Down
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
241 | 0.00 0.00 0.00 0.03 0.53 | 0.00 0.00 0.00 0.00 0.30 | 0.00 0.00 0.00 0.03 0.23 | 9461760K 13828096K 13669354K 14155776K 3294314496K
The breakdown is given per "slave", which is unit of serial execution, and per "target", which corresponds to a scriptTree target (see below). Despite its simplicity, we've found this can be very useful for tracking down performance issues, particularly when trying out a pipeline on a new system.
The important arguments to jobTreeStats are:
--outputFile=OUTPUTFILE
File in which to write results
--raw output the raw xml data.
--pretty, --human if not raw, prettify the numbers to be human readable.
--categories=CATEGORIES
comma separated list from [time, clock, wait, memory]
--sortCategory=SORTCATEGORY
how to sort Target list. may be from [alpha, time,
clock, wait, memory, count]. default=%(default)s
--sortField=SORTFIELD
how to sort Target list. may be from [min, med, ave,
max, total]. default=%(default)s
--sortReverse, --reverseSort
reverse sort order.
--cache stores a cache to speed up data display.
A jobTree script will have the following command-line options.
Options that control logging.
--logOff Turn off logging. (default is CRITICAL)
--logInfo Turn on logging at INFO level. (default is CRITICAL)
--logDebug Turn on logging at DEBUG level. (default is CRITICAL)
--logLevel=LOGLEVEL
Log at level (may be either OFF/INFO/DEBUG/CRITICAL).
(default is CRITICAL)
--logFile=LOGFILE File to log in
--rotatingLogging Turn on rotating logging, which prevents log files
getting too big.
Options to specify the location of the jobTree and turn on stats collation about the performance of jobs.
--jobTree=JOBTREE Directory in which to place job management files and
the global accessed temporary file directories(this
needs to be globally accessible by all machines
running jobs). If you pass an existing directory it
will check if it's a valid existing job tree, then try
and restart the jobs in it. The default=./jobTree
--stats Records statistics about the job-tree to be used by
jobTreeStats. default=False
Options for specifying the batch system, and arguments to the batch system/big batch system (see below).
--batchSystem=BATCHSYSTEM
The type of batch system to run the job(s) with,
currently can be
'singleMachine'/'parasol'/'acidTest'/'gridEngine'/'lsf'.
default=singleMachine
--maxThreads=MAXTHREADS
The maximum number of threads (technically processes
at this point) to use when running in single machine
mode. Increasing this will allow more jobs to run
concurrently when running on a single machine.
default=4
--parasolCommand=PARASOLCOMMAND
The command to run the parasol program default=parasol
Options to specify default cpu/memory requirements (if not
specified by the jobs themselves), and to limit the total amount of
memory/cpu requested from the batch system.
--defaultMemory=DEFAULTMEMORY
The default amount of memory to request for a job (in
bytes), by default is 2^31 = 2 gigabytes,
default=2147483648
--defaultCpu=DEFAULTCPU
The default the number of cpus to dedicate a job.
default=1
--maxCpus=MAXCPUS The maximum number of cpus to request from the batch
system at any one time. default=9223372036854775807
--maxMemory=MAXMEMORY
The maximum amount of memory to request from the batch
system at any one time. default=9223372036854775807
Options for rescuing/killing/restarting jobs, includes options for jobs that either run too long/fail or get lost (some batch systems have issues!).
--retryCount=RETRYCOUNT
Number of times to retry a failing job before giving
up and labeling job failed. default=0
--maxJobDuration=MAXJOBDURATION
Maximum runtime of a job (in seconds) before we kill
it (this is a lower bound, and the actual time before
killing the job may be longer).
default=9223372036854775807
--rescueJobsFrequency=RESCUEJOBSFREQUENCY
Period of time to wait (in seconds) between checking
for missing/overlong jobs, that is jobs which get lost
by the batch system. Expert parameter. (default is set
by the batch system)
jobTree big batch system options; jobTree can employ a secondary batch system for running large memory/cpu jobs using the following arguments.
--bigBatchSystem=BIGBATCHSYSTEM
The batch system to run for jobs with larger
memory/cpus requests, currently can be
'singleMachine'/'parasol'/'acidTest'/'gridEngine'.
default=none
--bigMemoryThreshold=BIGMEMORYTHRESHOLD
The memory threshold above which to submit to the big
queue. default=9223372036854775807
--bigCpuThreshold=BIGCPUTHRESHOLD
The cpu threshold above which to submit to the big
queue. default=9223372036854775807
--bigMaxCpus=BIGMAXCPUS
The maximum number of big batch system cpus to allow
at one time on the big queue.
default=9223372036854775807
--bigMaxMemory=BIGMAXMEMORY
The maximum amount of memory to request from the big
batch system at any one time.
default=9223372036854775807
Miscellaneous options.
--jobTime=JOBTIME The approximate time (in seconds) that you'd like a
list of child jobs to be run serially before being
parallelized. This parameter allows one to avoid over
parallelizing tiny jobs, and therefore paying
significant scheduling overhead, by running tiny jobs
in series on a single node/core of the cluster.
default=30
--maxLogFileSize=MAXLOGFILESIZE
The maximum size of a job log file to keep (in bytes),
log files larger than this will be truncated to the
last X bytes. Default is 50 kilobytes, default=50120
--command=COMMAND The command to run (which will generate subsequent
jobs). This is deprecated
The following sections are for people creating jobTree scripts and as general information. The presentation docs/jobTreeSlides.pdf is also a quite useful, albeit slightly out of date, guide to using jobTree. -
Most batch systems (such as LSF, Parasol, etc.) do not allow jobs to spawn other jobs in a simple way.
The basic pattern provided by jobTree is as follows:
ScriptTree provides a Python interface to jobTree, and is now the only way to interface with jobTree (previously you could manipulate XML files, but I've removed that functionality as I improved the underlying system).
Aside from being the interface to jobTree, scriptTree was designed to remediate some of the pain of writing wrapper scripts for cluster jobs, via the extension of a simple python wrapper class (called a 'Target' to avoid confusion with the more general use of the word 'job') which does much of the work for you. Using scriptTree, you can describe your script as a series of these classes which link together, with all the arguments and options specified in one place. The script then, using the magic of python pickles, generates all the wrappers dynamically and clean them up when done.
This inherited template pattern has the following advantages:
The best way to learn how to use script tree is to look at an example. The following is taken from (an old version of) jobTree.test.sort.scriptTreeTest_Sort.py
which provides a complete script for performing a parallel merge sort.
Below is the first 'Target' of this script inherited from the base class 'jobTree.scriptTree.Target'. Its job is to setup the merge sort.
class Setup(Target):
"""Sets up the sort.
"""
def __init__(self, inputFile, N):
Target.__init__(self, time=1, memory=1000000, cpu=1)
self.inputFile = inputFile
self.N = N
def run(self):
tempOutputFile = getTempFile(rootDir=self.getGlobalTempDir())
self.addChildTarget(Down(self.inputFile, 0, os.path.getsize(self.inputFile), self.N, tempOutputFile))
self.setFollowOnTarget(Cleanup(tempOutputFile, self.inputFile))
The constructor (init()) assigns some variables to the class. When invoking the constructor of the base class (which should be the first thing the target does), you can optionally pass time (in seconds), memory (in bytes) and cpu parameters. The time parameter is your estimate of how long the target will run - UPDATE: IT IS CURRENTLY UNUSED BY THE SCHEDULAR. The memory and cpu parameters allow you to guarantee resources for a target.
The run method is where the variables assigned by the constructor are used and where in general actual work is done. Aside from doing the specific work of the target (in this case creating a temporary file to hold some intermediate output), the run method is also where children and a follow-on job are created, using addChildTarget() and setFollowOnTarget(). A job may have arbitrary numbers of children, but one or zero follow-on jobs.
Targets are also provided with two temporary file directories called localTempDir and globalTempDir, which can be accessed with the methods getLocalTempDir() and getGlobalTempDir(), respectively. The localTempDir is the path to a temporary directory that is local to the machine on which the target is being executed and that will exist only for the length of the run method. It is useful for storing interim results that are computed during runtime. All files in this directory are guaranteed to be removed once the run method has finished - even if your target crashes.
A job can either be created as a follow-on, or it can be the very first job, or it can be created as a child of another job. Let a job not created as a follow-on be called a 'founder'. Each founder job may have a follow-on job. If it has a follow-on job, this follow-on job may in turn have a follow-on, etc. Thus each founder job defines a chain of follow-ons. Let a founder job and its maximal sequence of follow-ons be called a 'chain'. Let the last follow-on job in a chain be called the chain's 'closer'. For each chain of targets a temporary directory, globalTempDir, is created immediately prior to calling the founder target's run method, this directory and its contents then persist until the completion of closer target's run method. Thus the globalTempDir is a scratch directory in which temporary results can be stored on disk between target jobs in a chain. Furthermore, files created in this directory can be passed to the children of target jobs in the chain, allowing results to be transmitted from a target job to its children.
To avoid the need to create a Target class for every job, I've added the ability to wrap functions, hence the code for the setup function described above becomes:
def setup(target, inputFile, N):
"""Sets up the sort.
"""
tempOutputFile = getTempFile(rootDir=target.getGlobalTempDir())
target.addChildTargetFn(down, (inputFile, 0, os.path.getsize(inputFile), N, tempOutputFile))
target.setFollowOnFn(cleanup, (tempOutputFile, inputFile))
The code to turn this into a target uses the static method Target.makeTargetFnTarget:
Target.makeTargetFnTarget(setup, (fileToSort, N))
Notice that the child and follow-on targets have also been refactored as functions, hence the methods addChildTargetFn and setFollowOnFn, which take functions as opposed to Target objects.
Note, there are two types of functions you can wrap - target functions, whose first argument must be the wrapping target object (the setup function above is an excample of a target function), and plain functions that do not have a reference to the wrapping target.
ScriptTree targets are serialized (written and retrieved from disk) so that they can be executed in parallel on a cluster of different machines. Thankfully, this is mostly transparent to the user, except for the fact that targets must be 'pickled' (see python docs), which creates a few constraints upon what can and can not be passed to and stored by a target. Currently the preferred way to run a pipeline is to create an executable python script. For example, see tests/sorts/scriptTreeTest_Sort.py.
The first line to notice is:
from jobTree.scriptTree.target import Target, Stack
This imports the Target and Stack objects (the stack object is used to run the targets).
Most of the code defines a series of targets (see above). The main() method is where the script is setup and run.
The line:
parser = OptionParser()
Creates an options parser using the python module optparse.
The line:
Stack.addJobTreeOptions(parser)
Adds the jobTree options to the parser. Most importantly it adds the command line options "--jobTree [path to jobTree]", which is the location in which the jobTree will be created, and which must be supplied to the script.
The subsequent lines parse the input arguments, notably the line:
options, args = parser.parse_args()
reads in the input parameters.
The line:
i = Stack(Setup(options.fileToSort, int(options.N))).startJobTree(options)
Is where the first target is created (the Setup target shown above), where a stack object is created, which is passed the first target as its sole construction argument, and finally where the jobTree is executed from, using the stack's startJobTree(options) function. The 'options' argument will contain a dictionary of command line arguments which are used by jobTree. The return value of this function is equal to the number of failed targets. In this case we choose to throw an exception if there are any remaining.
One final important detail, the lines:
if __name__ == '__main__':
from jobTree.test.sort.scriptTreeTest_Sort import *
reload the objects in the module, such that their module names will be absolute (this is necessary for the serialization that is used). Targets in other classes that are imported do not need to be reloaded in this way.
jobTree and scriptTree are designed to be robust, so that individuals jobs (targets) can fail, and be subsequently restarted. It is assumed jobs can fail at any point. Thus until jobTree knows your children have been completed okay you can not assume that your Target has been completed. To ensure that your pipeline can be restarted after a failure ensure that every job (target):
Never cleans up / alters its own input files. Instead, parents and follow on jobs may clean up the files of children or prior jobs.
Can be re-run from just its input files any number of times. A job should only depend on its input, and it should be possible to run the job as many times as desired, essentially until news of its completion is successfully transmitted to the job tree master process.
These two properties are the key to job atomicity. Additionally, you'll find it much easier if a job:
Only creates temp files in the two provided temporary file directories. This ensures we don't soil the cluster's disks.
Logs sensibly, so that error messages can be transmitted back to the master and the pipeline can be successfully debugged.
jobTree replicates the environment in which jobTree or scriptTree is invoked and provides this environment to all the jobs/targets. This ensures uniformity of the environment variables for every job.
How robust is jobTree to failures of nodes and/or the master?
JobTree checkpoints its state on disk, so that it or the job manager can be wiped out and restarted. There is some gnarly test code to show how this works, it will keep crashing everything, at random points, but eventually everything will complete okay. As a user you needn't worry about any of this, but your child jobs must be atomic (as with all batch systems), and must follow the convention regarding input files.
How scaleable?
We have tested having 1000 concurrent jobs running on our cluster. This will depend on the underlying batch system being used.
Can you support the XYZ batch system?
See the abstract base class 'AbstractBatchSystem' in the code to see what functions need to be implemented. It's reasonably straight forward.
Is there an API for the jobTree top level commands?
Not really - at this point please use scriptTree and the few command line utilities present in the bin directory.
Why am I getting the error "ImportError: No module named etree.ElementTree"?
The version of python in your path is less than 2.5. When jobTree spawns a new job it will use the python found in your PATH. Make sure that the first python in your PATH points to a python version greater than or equal to 2.5 but less than 3.0