Barski-lab / cwl-airflow

Python package to extend Airflow functionality with CWL1.1 support
https://barski-lab.github.io/cwl-airflow
Apache License 2.0

Add queue support for Celery Executor #23

Closed: michael-kotliar closed this issue 5 years ago

michael-kotliar commented 5 years ago

When using CeleryExecutor, all tasks are sent to the default queue by default. To route tasks to different queues, add the following parameter to the [cwl] section of the airflow.cfg file:

queues = path/to/queues.yaml
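The sketch below shows where this setting lives. It reads the `queues` option from the `[cwl]` section using Python's standard `configparser`. The `read_queues_path` helper and the inlined airflow.cfg text are hypothetical. Inside Airflow the value would more likely be read through Airflow's own configuration object, and CWL-Airflow's actual loading code may differ.

```python
import configparser

# Hypothetical airflow.cfg excerpt; only the [cwl] section matters here.
AIRFLOW_CFG = """
[core]
executor = CeleryExecutor

[cwl]
queues = path/to/queues.yaml
"""


def read_queues_path(cfg_text):
    """Return the `queues` option from the [cwl] section, or None if it is absent."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # fallback=None covers both a missing option and a missing [cwl] section,
    # i.e. the case where every task stays on the default queue
    return parser.get("cwl", "queues", fallback=None)


print(read_queues_path(AIRFLOW_CFG))  # path/to/queues.yaml
```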

where queues.yaml may look like this:

advanced:
  cpus: 2
  ram: 2048
default:
  cpus: 1
  ram: 1024

CWL-Airflow will then select a queue for each task based on the ResourceRequirement section of the corresponding workflow step. A step may look like this:

  bam_to_bigwig:
    hints:
      ResourceRequirement:
        coresMin: 1
        ramMin: 1024
    run: ../subworkflows/bam-bedgraph-bigwig.cwl
    in:
      bam_file: samtools_sort_index_after_rmdup/bam_bai_pair
      chrom_length_file: chrom_length
      mapped_reads_number: get_stat/mapped_reads
      fragment_size: macs2_callpeak/macs2_fragments_calculated
    out: [bigwig_file]
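The sketch below shows how the cpus/ram values might be pulled out of such a step. It assumes the step has already been loaded into a Python dict, for example by a YAML parser. `get_step_resources` and `_find_requirement` are hypothetical helpers, not CWL-Airflow's API. Only `coresMin` and `ramMin` are read. A value written as a CWL expression string is treated as incomplete, which is also an assumption.

```python
NUMBER = (int, float)


def _find_requirement(entries, name):
    """Find a CWL requirement in map form ({name: {...}}) or list form ([{"class": name, ...}])."""
    if isinstance(entries, dict):
        return entries.get(name)
    for entry in entries or []:
        if isinstance(entry, dict) and entry.get("class") == name:
            return entry
    return None


def get_step_resources(step):
    """Return (cpus, ram) for a step, or None if ResourceRequirement is missing or incomplete."""
    # `requirements` is checked before `hints`, since CWL requirements take precedence over hints
    for section in ("requirements", "hints"):
        req = _find_requirement(step.get(section), "ResourceRequirement")
        if req is None:
            continue
        cpus, ram = req.get("coresMin"), req.get("ramMin")
        # Missing values, or expressions given as strings, count as "not complete"
        if not isinstance(cpus, NUMBER) or not isinstance(ram, NUMBER):
            return None
        return cpus, ram
    return None


# The bam_to_bigwig step from above, as a parsed dict (inputs omitted for brevity)
bam_to_bigwig = {
    "hints": {"ResourceRequirement": {"coresMin": 1, "ramMin": 1024}},
    "run": "../subworkflows/bam-bedgraph-bigwig.cwl",
    "out": ["bigwig_file"],
}
print(get_step_resources(bam_to_bigwig))  # (1, 1024)
```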

CWL-Airflow sorts the queues by cpus and ram and tries to assign each task to the lowest queue that satisfies the step's requirements. If ResourceRequirement is not specified or is incomplete, the lowest queue is used. JobCleanup and JobDispatcher are also assigned to the lowest queue.
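The selection rule can be sketched as follows. The sketch assumes two inputs:

* The queues.yaml content has been parsed into a dict.
* The step's requirement has been reduced to a `(cpus, ram)` tuple, or `None` when it is missing or incomplete.

`select_queue` is a hypothetical name. When no queue is large enough, the sketch falls back to the largest queue. That behaviour is an assumption, because the issue does not say what happens in that case.

```python
def select_queue(queues, resources):
    """Pick the lowest queue (sorted by cpus, then ram) that can satisfy `resources`.

    queues: mapping of queue name -> {"cpus": ..., "ram": ...}, e.g. the parsed queues.yaml
    resources: (cpus, ram) tuple, or None if ResourceRequirement is missing or incomplete
    """
    if not queues:
        raise ValueError("no queues configured")
    ordered = sorted(queues, key=lambda name: (queues[name]["cpus"], queues[name]["ram"]))
    if resources is None:
        # No usable requirement: use the lowest queue
        return ordered[0]
    cpus, ram = resources
    for name in ordered:
        if queues[name]["cpus"] >= cpus and queues[name]["ram"] >= ram:
            return name
    # Assumption: nothing is large enough, so fall back to the largest queue
    return ordered[-1]


# Same content as the queues.yaml example above
QUEUES = {
    "advanced": {"cpus": 2, "ram": 2048},
    "default": {"cpus": 1, "ram": 1024},
}
print(select_queue(QUEUES, (1, 1024)))  # default
print(select_queue(QUEUES, (2, 1024)))  # advanced
print(select_queue(QUEUES, None))       # default
```

In this sketch, tasks without resource hints, such as JobCleanup and JobDispatcher, are handled by passing `None`. The resulting name would then typically be given as the `queue` argument that Airflow operators accept, which CeleryExecutor uses for routing.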