cylc / cylc-flow

Cylc: a workflow engine for cycling systems.
https://cylc.github.io
GNU General Public License v3.0

Support operators or actions as alternative for the job script #3781

Open kinow opened 4 years ago

kinow commented 4 years ago

Describe exactly what you would like to see in an upcoming release

TL;DR: provide something similar to Airflow operators in Cylc: reusable building blocks that can be combined when executing a task, instead of only offering the script option.

We had a meeting this week (@hjoliver , myself, and another co-worker) with a quick intro about Cylc, and also talked about other workflow managers.

One point that I forgot to mention in the meeting, when comparing Cylc with other workflow managers, is that in Cylc your tasks (jobs) execute whatever you put in your script item, as a Bash shell script.

Other workflow managers have ways to let users re-use building blocks in their workflows. These building blocks are normally not part of the workflow manager core, and are installed as plug-ins, although some may ship with the core (Jenkins used to bundle a lot of plug-ins, now I think they are completely separated... Airflow installs some operators by default, but you can install others separately).

One common building block is data transfer to or from somewhere like S3.

Cylc

I have never used Cylc with S3. But I think it would be something like:

[scheduling]
    initial cycle point = 20130808T00
    [[graph]]
        R1 = "upload_data"

# not adding other tasks or any cycles here to simplify it

[runtime]
    [[upload_data]]
        script = "/opt/s3/upload_data.py /tmp/pyramidal-1.tif"

S3 authentication information goes into the user settings under ~/.aws/config. Users can ask IT/DevOps/etc to install and maintain the credentials, either from the command line or with a configuration management tool. Users also have to either ask IT to install the dependencies necessary for AWS, or make them part of their workflow (the workflow's lib/python directory, I think).
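For example, the AWS command line and SDKs read credentials from plain-text files like these (values are placeholders):

# ~/.aws/credentials
[default]
aws_access_key_id = <your access key>
aws_secret_access_key = <your secret key>

# ~/.aws/config
[default]
region = eu-west-1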

Jenkins

With Jenkins, you create a workflow, defining where and when tasks (build steps) are executed. Then you combine plug-ins to execute what you need.

// groovy
stage("publish to s3") {
    step([
        $class: 'S3BucketPublisher',
        entries: [[
            sourceFile: 'mybinaryFile',
            bucket: 'GoBinaries',
            selectedRegion: 'eu-west-1',
            noUploadOnFailure: true,
            managedArtifacts: true,
            flatten: true,
            showDirectlyInBrowser: true,
            keepForever: true,
        ]],
        profileName: 'myprofile',
        dontWaitForConcurrentBuildCompletion: false, 
    ])
}

S3 authentication information goes into Jenkins credentials storage. Users can ask IT/DevOps/etc to install and maintain the credentials, done with the Jenkins UI or configuration management tool.

PS: a user may prefer to use the UI, where you can do the same as in the pipeline script.

Nextflow

tmpfile = Channel.fromPath('mytestfile')
process setExample {
    publishDir 's3://mybucket/test_s3new', mode: 'copy' , overwrite: true
    input:
    file myfile from tmpfile

    output:
    file myfile into tmp

    """
    echo $myfile
    """
}

S3 authentication info goes into nextflow.config. Users can ask IT/DevOps/etc to install and maintain the credentials, done with the command line or configuration management tool.

aws {
  accessKey = '<Your AWS access key>'
  secretKey = '<Your AWS secret key>'
  region = '<AWS region identifier>'
}

Airflow

With Airflow, you would probably use a PythonOperator, as there is no operator for just uploading a file. There is, however, one that takes data from S3, runs a script over it, and puts the output data back into S3:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.s3_file_transform import S3FileTransformOperator

default_args = {'start_date': datetime(2020, 1, 1)}

with DAG('S3_dag_test', default_args=default_args, schedule_interval='@once') as dag:
    upload_to_S3_task = S3FileTransformOperator(
        task_id='upload_file_to_S3',
        source_s3_key='weather-dataset-2000',
        dest_s3_key='gridded-data-weather-forecast-2000',
        transform_script='Rcmd my_r_script.r',  # arg 0 is the source, arg 1 is the dest
    )

The S3 authentication information is part of a connection created in Airflow, and stored securely. Users can ask IT/DevOps/etc to install and maintain the credentials, done with the Airflow UI or configuration management tool.
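Task code can then refer to the stored connection by its ID rather than handling credentials itself; a minimal sketch, assuming a connection named aws_default and the Amazon provider package installed:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Credentials come from the 'aws_default' connection stored in Airflow,
# not from the DAG code itself.
hook = S3Hook(aws_conn_id='aws_default')
hook.load_file(filename='/tmp/pyramidal-1.tif',
               key='pyramidal-1.tif',
               bucket_name='mybucket')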

Other plugins

The same is true for other tasks, such as sending a message to Slack. The eWaterCycle forecast that used Cylc + CWL has a custom script to send Slack messages.
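A minimal sketch of what such a script could look like (hypothetical, not the eWaterCycle one), posting a message to a Slack incoming webhook:

#!/usr/bin/env python3
"""Post a message to a Slack incoming webhook: slack_notify.py <webhook-url> <message>."""
import json
import sys
import urllib.request

webhook_url, message = sys.argv[1], sys.argv[2]
request = urllib.request.Request(
    webhook_url,
    data=json.dumps({"text": message}).encode(),
    headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(request)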

Now if another user wants to send Slack messages, they can copy the script and use an event handler like they did, or an xtrigger, or a task, etc. But some users may end up with different ways to integrate Cylc <-> Slack. Having a plug-in centralizes the functionality, helping users avoid issues other users have already fixed.

There are many tasks that are common across workflows, like data transfer, notifications, triggers, etc., that are normally done with libraries or services. Having these as plug-ins would be convenient for Cylc users IMO (see the sketch below for what one could look like).
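Purely as an illustration (nothing like this exists in Cylc today, and every name here is made up), an operator-style plug-in could be a small Python class, discovered through an entry point and configured per task instead of a script item:

# Hypothetical sketch only: Cylc has no such API.
class S3UploadOperator:
    """Upload a local file to an S3 bucket (all names are illustrative)."""

    def __init__(self, bucket, key, filename):
        self.bucket = bucket
        self.key = key
        self.filename = filename

    def run(self):
        import boto3  # credentials resolved the usual boto3 way (~/.aws, env vars, IAM role)
        boto3.client("s3").upload_file(self.filename, self.bucket, self.key)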

Additional context

Jenkins has over 1500 plug-ins: https://plugins.jenkins.io/. Anything you find in the list of plug-ins can be used as part of your workflow (with limitations if you use Groovy pipelines, or freestyle jobs).

List of Airflow operators: https://airflow.apache.org/docs/stable/_api/airflow/operators/index.html (plus other open source maintained by users and other projects).

StackStorm has a different site, the StackStorm Exchange, that works like the Jenkins update center. Lots of packages ready to be used in workflows: https://exchange.stackstorm.org/.

This is part of what I had in mind in https://github.com/cylc/cylc-flow/issues/2959, but also a way to give even more flexibility to users. If a user has a simple DAG, with no cycles, and wants to use cron-style scheduling, I think Cylc is powerful enough to support that.

Not necessarily by using cron itself: Jenkins, for example, supports cron syntax, but it is Java code running in the JVM that evaluates the cron expressions and runs the jobs when they are due. (I even thought about converting cron expressions to isodatetime as a possible alternative.) Jenkins jobs are executed shortly after the cron schedule fires; not as precise as cron, but good enough for most users. A small sketch of that kind of evaluation is below.
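For illustration only (not something Cylc does), evaluating cron expressions in Python can be done with the third-party croniter library:

from datetime import datetime
from croniter import croniter

# "Every day at 03:00": compute the next few times the expression fires,
# starting from a reference point; a scheduler loop would run jobs at these times.
schedule = croniter("0 3 * * *", datetime(2020, 8, 8))
for _ in range(3):
    print(schedule.get_next(datetime))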

Pull requests welcome!

hjoliver commented 4 years ago

I generally agree this would be great, with the proviso that there's not yet much call for this sort of thing in our core HPC scientific modeling user base. But post Cylc 8, and with increasing support for proper plug-in extensibility, we should do it.

oliver-sanders commented 4 years ago

If we go down the route of re-implementing the job script in Python, this functionality would be a fairly simple extension. It would also open up the potential for mixing languages in the same job, somewhat like GitHub Actions.