Barski-lab / cwl-airflow

Python package to extend Airflow functionality with CWL1.1 support
https://barski-lab.github.io/cwl-airflow
Apache License 2.0

Job Cleanup not linked to the rest of the DAG #19

Closed: Ocaenyth closed this issue 5 years ago

Ocaenyth commented 5 years ago

Hello, I've recently been looking to use CWL-Airflow with Rabix-composer-generated CWL files, but I seem to be running into an issue that I do not have when importing your examples. I have been trying to submit a workflow made in Rabix that contains only one task (tool?), which is supposed to run echo 'Hello World'. The workflow is submitted and I can see it in Airflow's webserver, but I noticed something odd in the Graph View:

(screenshot of the Graph View)

Unlike your published examples, the Job Cleanup task is not linked as the last task of the DAG; instead it runs right after Job Dispatcher. I have been trying to understand what causes that by looking at how you create the DAGs, but I couldn't figure it out. I also looked at what might be missing from my CWL file compared to yours, but you do not provide any small and simple examples, so it's hard to know what to look for while going through them.

echo_test.cwl (the task)

class: CommandLineTool
cwlVersion: v1.0
$namespaces:
  sbg: 'https://www.sevenbridges.com/'
id: echo_test
baseCommand:
  - echo "Hello World"
inputs: []
outputs: []
label: echo_test

new_test.cwl (the workflow)

class: Workflow
cwlVersion: v1.0
id: new_test
label: new_test
$namespaces:
  sbg: 'https://www.sevenbridges.com/'
inputs: []
outputs: []
steps:
  - id: echo_test
    in: []
    out: []
    run: ./echo_test.cwl
    label: echo_test
    'sbg:x': -233.796875
    'sbg:y': -162.5
requirements: []
michael-kotliar commented 5 years ago

Thank you for using CWL-Airflow.

The reason JobCleanup is not connected to any other node is your empty outputs list (outputs: []). When the assign_job_cleanup function is called, it iterates over all of the DAG's tasks: for each task it gets the list of that task's outputs and compares it with the workflow outputs. If any of a task's outputs is listed in the workflow outputs, that task is set as an upstream for JobCleanup. Since your workflow declares no outputs, no task qualifies, and JobCleanup is left attached only to JobDispatcher.
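
To make that rule concrete, here is a minimal Python sketch of the idea (this is not the actual cwl-airflow source; the get_outputs accessor and the argument names are made up for illustration, while set_upstream is the real Airflow method for wiring task dependencies):

# Illustrative sketch only: connect JobCleanup downstream of every task
# that produces at least one workflow-level output.
def assign_job_cleanup_sketch(dag_tasks, workflow_outputs, job_cleanup):
    for task in dag_tasks:
        task_outputs = task.get_outputs()  # hypothetical accessor for the task's CWL outputs
        # The task feeds JobCleanup only if one of its outputs is a workflow output
        if any(output in workflow_outputs for output in task_outputs):
            job_cleanup.set_upstream(task)  # make this task run before JobCleanup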

I would recommend you update your echo_test.cwl and new_test.cwl as follows:

echo_test.cwl (output is redirected to echo.txt)

class: CommandLineTool
cwlVersion: v1.0
$namespaces:
  sbg: 'https://www.sevenbridges.com/'
id: echo_test
stdout: "echo.txt"
baseCommand:
  - echo
  - "Hello World"
inputs: []
outputs:
  echo_file:
    type: File
    outputBinding:
      glob: "echo.txt"
label: echo_test

new_test.cwl

class: Workflow
cwlVersion: v1.0
id: new_test
label: new_test
$namespaces:
  sbg: 'https://www.sevenbridges.com/'
inputs: []
outputs:
  echo_file:
    type: File
    outputSource: echo_test/echo_file
steps:
  - id: echo_test
    in: []
    out: [echo_file]
    run: ./echo_test.cwl
    label: echo_test
    'sbg:x': -233.796875
    'sbg:y': -162.5
requirements: []

Additionally, you will have to create an empty job file to be able to submit this workflow to CWL-Airflow with the cwl-airflow submit new_test.cwl empty.json command.

empty.json

{}

Once you run this command you will see a new job file in the ~/airflow/jobs folder (by default). You don't need to do anything with this file. It may look like this:

{
    "workflow": "/Users/kot4or/temp/del/cwl/new_test.cwl",
    "output_folder": "/Users/kot4or/temp/del/cwl",
    "uid": "699f760b-e377-46d9-ad67-a2b1a34d13c8"
}

Once you start airflow webserver and airflow scheduler, the DAG will run and you can see the results:

(screenshot of the completed DAG run)

Also, make sure that the version of cwltool you use is 1.0.20180622214234. Newer versions may have a different syntax for the functions that CWL-Airflow calls internally.
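
If you want to double-check which cwltool version is installed, one quick way (a generic Python check, not something provided by CWL-Airflow) is:

# Print the installed cwltool version; CWL-Airflow expects 1.0.20180622214234 here
import pkg_resources  # part of setuptools
print(pkg_resources.get_distribution("cwltool").version)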