nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

wr as new Nextflow backend #1088

Closed: sb10 closed this issue 4 years ago

sb10 commented 5 years ago

New feature

I develop wr, which is a workflow runner like Nextflow but can also be used as just a backend scheduler. It can currently schedule to LSF and OpenStack.

The benefit to Nextflow users of going via wr instead of using Nextflow’s existing LSF or Kubernetes support is:

  1. wr makes more efficient use of LSF: it can pick an appropriate queue, use job arrays, and “reuse” job slots. In a simple test I did, Nextflow using wr in LSF mode was 2 times faster than Nextflow using its own LSF scheduler.
  2. wr’s OpenStack support is incredibly easy to use and set up (basically a single command to run), and provides auto scaling up and down. Kubernetes, by comparison, is really quite complex to get working on OpenStack, doesn’t auto scale, and wastes resources with multiple nodes needed even while no workflows are being operated on. I was able to get Nextflow to work with wr in OpenStack mode (but the shared disk requirement for Nextflow’s state remains a concern).

Usage scenario

Users with access to LSF or OpenStack clusters who want to run their Nextflow workflows efficiently and easily.

Suggested implementation

Since I don’t know Java well enough to understand how to implement this “correctly”, I wrote a simple bsub emulator in wr, which is what my tests so far have been based on. I submit the Nextflow command as a job to wr, turning on the bsub emulation, and configure Nextflow to use its existing LSF scheduler. While running under the emulation, Nextflow’s bsub calls actually call wr.

Of course the proper way to do this would be to have Nextflow call wr directly (either via the wr command line or its REST API). The possibly tricky thing with regard to having it work in OpenStack mode is having it tell wr about OpenStack-specific things like what image to use, what hardware flavour to use, how to mount S3, etc. (the bsub emulation handles all of this).

Here's what I did for my LSF test...

echo_1000_sleep.nf:

#!/usr/bin/env nextflow

num = Channel.from(1..1000)

process echo_sleep {
  input:
  val x from num

  output:
  stdout result

  "echo $x && sleep 1"
}

result.subscribe { println it }

workflow.onComplete {
    println "Pipeline completed at: $workflow.complete"
    println "Execution status: ${ workflow.success ? 'OK' : 'failed' }"
}

nextflow.config:

process {
  executor='lsf'
  queue='normal'
  memory='100MB'
}

install wr:

wget https://github.com/VertebrateResequencing/wr/releases/download/v0.17.0/wr-linux-x86-64.zip
unzip wr-linux-x86-64.zip
mv wr /to/somewhere/in/my/PATH/wr

run:

wr manager start -s lsf
echo "nextflow run ./echo_1000_sleep.nf" | wr add --bsub -r 0 -i nextflow --cwd_matters --memory 1GB

Here's what I did to get it to work in OpenStack...

nextflow_install.sh:

sudo apt-get update
sudo apt-get install openjdk-8-jre-headless -y
wget -qO- https://get.nextflow.io | bash
sudo mv nextflow /usr/bin/nextflow

put input files in S3:

s3cmd put nextflow.config s3://sb10/nextflow/nextflow.config
s3cmd put echo_1000_sleep.nf s3://sb10/nextflow/echo_1000_sleep.nf

~/.openstack_rc:

[your rc file containing OpenStack environment variables downloaded from Horizon]

run:

source ~/.openstack_rc
wr cloud deploy --os 'Ubuntu Xenial' --username ubuntu
echo "cp echo_1000_sleep.nf /shared/echo_1000_sleep.nf && cp nextflow.config /shared/nextflow.config && cd /shared && nextflow run echo_1000_sleep.nf" | wr add --bsub -r 0 -o 2 -i nextflow --memory 1GB --mounts 'ur:sb10/nextflow' --cloud_script nextflow_install.sh --cloud_shared

The NFS share at /shared created by the --cloud_shared option is slow and limited in size; a better solution would be to set up your own high-performance shared filesystem in OpenStack (e.g. GlusterFS), then add to nextflow_install.sh to mount this share. Or, even better, is there a way to have Nextflow not store state on disk? If it could just query wr for job completion status, that would be preferable.

pditommaso commented 5 years ago

This is a nice initiative; I'll be very happy to review and merge a pull request implementing support for this back-end engine.

sb10 commented 5 years ago

@pditommaso Is there a way to get around needing a shared disk for Nextflow state? If I were going to try to write this myself, despite not knowing Java, what's my best starting point? Base it on Nextflow's LSF scheduler? Or one of the cloud ones? Has anyone written a guide to writing Nextflow schedulers?

pditommaso commented 5 years ago

Is there a way to get around needing a shared disk for Nextflow state

Not easily; the bottom line is that temporary task outputs need to be shared with downstream tasks, therefore some kind of shared storage is needed. For example, with AWS Batch NF uses S3 as shared storage; I guess for OpenStack there's a similar object storage solution (and actually there is).

what's my best starting point?

It depends. If wr provides a command line tool somewhat similar to qsub, bsub, etc., you can just create an executor extending AbstractGridExecutor, along the same lines as the one for Slurm, for example.
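Something along these lines, as a rough sketch only (the hooks below are the ones the Slurm executor appears to override, their exact signatures would need checking, and the wr kill/status sub-commands are just placeholders):

// Rough sketch only: hook names/signatures assumed to mirror the Slurm
// executor; only the wr flags shown elsewhere in this thread are known to exist.
package nextflow.executor

import java.nio.file.Path
import nextflow.processor.TaskRun

class WrExecutor extends AbstractGridExecutor {

    // wr takes resources on its command line, so no directives are written
    // into the wrapper script
    @Override
    protected List<String> getDirectives(TaskRun task, List<String> result) {
        return result
    }

    // command line used to submit the task wrapper script to wr
    @Override
    List<String> getSubmitCommandLine(TaskRun task, Path scriptFile) {
        return ['wr', 'add', '--cwd_matters', '-i', "nf-${task.name}".toString()]
    }

    // extract the job id from the output of the submit command
    @Override
    def parseJobId(String text) {
        return text.trim()
    }

    // placeholders: the real wr commands/flags for cancelling and polling
    // jobs would need checking
    @Override
    protected List<String> getKillCommand() {
        return ['wr', 'kill']
    }

    @Override
    protected List<String> queueStatusCommand(Object queue) {
        return ['wr', 'status']
    }

    @Override
    protected Map<String, QueueStatus> parseQueueStatus(String text) {
        return [:]   // map each job id to a QueueStatus value
    }
}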

If you are planning instead to use a low-level API, you can have a look at extending the Executor base class.

Has anyone written a guide to writing Nextflow schedulers?

Nope, but I can advise you if needed.

sb10 commented 5 years ago

temporary task outputs need to be shared to downstream tasks, therefore some kind of shared storage. For example with AWS Batch NF is using S3 as shared storage

That works for wr's purposes; wr provides easy mounting of S3, which could be used for job inputs/outputs.

The problem I have is with Nextflow running on the "master" node needing to check the output written to S3 by a "worker" node before it, e.g., submits the next job.

wr's S3 mount is not POSIX enough for what Nextflow on the "master" needs to do for this state checking (e.g. it caches S3 "directory" contents, so it is not aware if a different process writes to that "directory").

Can job input/outputs be stored in S3, but job state be queried from the scheduler?

pditommaso commented 5 years ago

Can job input/outputs be stored in S3, but job state be queried from the scheduler?

Yes, you will need to write your own version of TaskHandler
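Roughly along these lines (a sketch only; WrRestApi and its add/state/exitCode/cancel methods are hypothetical stand-ins for whatever client wr exposes, while submit, checkIfRunning, checkIfCompleted and kill are the TaskHandler hooks discussed in this thread):

// Rough sketch only: the scheduler, not the file system, is asked for job state.
package nextflow.executor

import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus

class WrTaskHandler extends TaskHandler {

    private final WrRestApi client   // hypothetical wr REST client
    private String jobId

    WrTaskHandler(TaskRun task, WrRestApi client) {
        super(task)
        this.client = client
    }

    @Override
    void submit() {
        // hand the task's wrapper command to wr and remember its job id
        jobId = client.add(task)
        status = TaskStatus.SUBMITTED
    }

    @Override
    boolean checkIfRunning() {
        if( client.state(jobId) == 'running' ) {
            status = TaskStatus.RUNNING
            return true
        }
        return false
    }

    @Override
    boolean checkIfCompleted() {
        if( client.state(jobId) == 'complete' ) {
            // record the exit status so NF can decide success/failure
            task.exitStatus = client.exitCode(jobId)
            status = TaskStatus.COMPLETED
            return true
        }
        return false
    }

    @Override
    void kill() {
        client.cancel(jobId)   // hypothetical
    }
}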

sb10 commented 5 years ago

I have an initial basic but working implementation here: https://github.com/sb10/nextflow/blob/wr/modules/nextflow/src/main/groovy/nextflow/executor/WrExecutor.groovy

I have # *** comments for areas where I'm stuck. Can you give me some pointers? In particular:

  1. Don't know how to pick up options from nextflow.config.
  2. Not sure as to the best approach for doing the important things BashWrapperBuilder does without creating a file.
  3. How do you get a "pure" task name?
  4. How do you get Groovy to trust a ca cert?
  5. Not sure how passing all possible task options should be done. Just pass around the task object, and will that have all desirable information like memory requirement hints etc?
  6. Under what circumstances does kill() get called?

I've only had a quick glance at an example LSF test, and didn't really get it. How should I go about writing a complete test for this? Are end-to-end tests done, or only mocks?

I'm currently using the TaskPollingMonitor, but the ideal way of implementing this would be for Nextflow to listen on a websocket and for wr to tell it the moment that tasks change state. What would be the best approach to doing that? Implement my own TaskMonitor? Or is it even possible?

Currently, my implementation is only working running locally. I still haven't thought about how to handle input and output files being in S3. With existing Executors, how does a user start a workflow running when their input files are not available locally, but in an S3 bucket? And how do they specify that the output files go to S3 as well?

pditommaso commented 5 years ago

It looks like a good start.

I have # *** comments for areas where I'm stuck.

I would suggest submitting a draft PR so I can reply to your comments.

Don't know how to pick up options from nextflow.config.

The Executor object has a session attribute that holds the config map. Something like session.config.navigate('your.attribute') should work.
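For example (a minimal sketch; 'wr.endpoint' is just an illustrative attribute name and the fallback value is made up):

// reading an executor option from nextflow.config inside the Executor subclass;
// 'wr.endpoint' would correspond to e.g.  wr { endpoint = 'https://...' }
protected String getEndpoint() {
    session.config.navigate('wr.endpoint') ?: 'https://localhost:11302'   // fallback is illustrative
}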

Not sure as to the best approach for doing the important things BashWrapperBuilder does without creating a file.

What do you mean?

How do you get a "pure" task name?

What do you mean for pure?

How do you get Groovy to trust a ca cert?

More than Groovy, you should look at the URLConnection Java API. Have a look at K8sClient.

Not sure how passing all possible task options should be done. Just pass around the task object, and will that have all desirable information like memory requirement hints etc?

Task-specific settings are accessible via the TaskRun#config map. Have a look for example here.

Under what circumstances does kill() get called?

Mostly when the execution is aborted and the cleanup method is invoked.

How should I go about writing a complete test for this? Are end-to-end tests done, or only mocks?

Mocks would be a good start. Have a look at K8sClientTest.

I'm currently using the TaskPollingMonitor, but the ideal way of implementing this would be for Nextflow to listen on a websocket and for wr to tell it the moment that tasks change state.

I would keep this as a future enhancement.

Currently, my implementation is only working running locally. I still haven't thought about how to handle input and output files being in S3. With existing Executors, how does a user start a workflow running when their input files are not available locally, but in an S3 bucket?

This should be managed automatically by BashWrapperBuilder

sb10 commented 5 years ago

I have # *** comments for areas where I'm stuck.

I would suggest submitting a draft PR so I can reply to your comments.

PR created here: https://github.com/nextflow-io/nextflow/pull/1114

Don't know how to pick up options from nextflow.config.

The Executor object has a session attribute that holds the config map. Something like session.config.navigate('your.attribute') should work.

OK, I'll give it a try. But what is session.getConfigAttribute() and how are you supposed to use it?

Not sure as to the best approach for doing the important things BashWrapperBuilder does without creating a file.

What do you mean?

If possible I don't want Nextflow to create any files of its own (only task commands should create workflow output files). If that's not possible, then at least I need the wrapper script it creates to not have any absolute paths in it, because those paths won't exist on the other OpenStack instances that wr might create to run tasks.

How do you get a "pure" task name?

What do you mean for pure?

For a scatter task, task.name looks like name (1), while I just want name.

Under what circumstances does kill() get called?

Mostly when the execution is aborted and the cleanup method is invoked.

Specifically, what state could a task be in when kill() is called? Any state?

Currently my code only considers "started" and "complete" statuses, which actually comprise multiple underlying states, and miss some possibilities. For example, in wr a job can be "lost" if it was running on an instance that got deleted externally. The user confirms the situation and then tells wr the job is really dead, and can then choose to restart it.

I imagine that, in the current situation with my Nextflow implementation, it would initially satisfy checkIfRunning() but never satisfy checkIfCompleted(). Is something going to happen automatically in Nextflow to handle these situations, or should I either a) include "lost" as a "complete" status, or b) should users be checking wr's own status interface to deal with lost jobs?

Currently, my implementation is only working running locally. I still haven't thought about how to handle input and output files being in S3. With existing Executors, how does a user start a workflow running when their input files are not available locally, but in an S3 bucket?

This should be managed automatically by BashWrapperBuilder

It's not clear to me how this is done. What does a user have to do to enable this feature?

The scenario I envisage is:

Nextflow is run on a local machine from which wr has been deployed to OpenStack. This local machine does not have the S3 bucket mounted, and is completely different to the OpenStack servers that will be created to run tasks (in particular, different home directory location).

nextflow.config is configured with the location of an S3 bucket that holds their input files and where output files should go, and their ~/.s3cfg holds their connection details.

This results in WrRestApi.add() (called by submit()) running the TaskBean(task).script in a directory that is mounted on the configured S3 bucket location.

For this to actually work, task.getWorkDirStr() would have to return a unique task path relative to the configured S3 bucket location, and wr would cd there before running the task bean script. Or alternatively, copyStrategy.getStageInputFilesScript needs to symlink from the correct location within a mounted bucket location to the local working directory, and then output files need to be copied over to the mounted bucket location.

Is a scenario like this at all possible? Or what is the "Nextflow" way of doing this kind of thing?

pditommaso commented 5 years ago

OK, I'll give it a try. But what is session.getConfigAttribute() and how are you supposed to use it?

Actually, it should work. The TES executor uses the same method. The best thing to do is to create a unit test and debug it.

If possible I don't want Nextflow to create any files of its own (only tasks commands should create workflow output files). If that's not possible, then at least I need the wrapper script it creates to not have any absolute paths in it, because those paths won't exist on other OpenStack instances that wr might create to run tasks.

You can create a copy strategy that does nothing. Have a look at TesFileCopyStrategy
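For instance, something like this sketch (the constructor and method signatures are assumed to match SimpleFileCopyStrategy and would need checking):

// Sketch of a copy strategy that emits no staging/unstaging commands, so the
// wrapper script contains no absolute local paths.
package nextflow.executor

import java.nio.file.Path
import nextflow.processor.TaskBean

class WrFileCopyStrategy extends SimpleFileCopyStrategy {

    WrFileCopyStrategy(TaskBean bean) {
        super(bean)
    }

    // inputs are not copied/linked by the wrapper: wr mounts them instead
    @Override
    String getStageInputFilesScript(Map<String,Path> inputFiles) {
        return ''
    }

    // outputs are written straight into the (mounted) work dir,
    // so nothing needs to be copied out
    @Override
    String getUnstageOutputFilesScript(List<String> outputFiles, Path targetDir) {
        return ''
    }
}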

For a scatter task, task.name looks like name (1), while I just want name

Check the TaskRun#processor#name

Specifically, what state could a task be in when kill() is called? Any state?

It's used to kill pending or running tasks when the workflow execution is aborted.

It's not clear to me how this is done. What does a user have to do to enable this feature?

It strongly depends on the storage type used as the pipeline work dir. When an input file is stored in a file system different from the one used as the work dir, NF copies that file from the foreign file system to the work dir (for example, an input is on FTP and the work dir is an S3 bucket). This is controlled by the resolveForeignFiles method linked above. Then, if the work dir is NOT a POSIX file system, files are copied to a scratch directory on the local node. This is generally done by the task handler.

sb10 commented 5 years ago

Are there any guides or examples aimed at end-users of using Nextflow with data in S3?

pditommaso commented 5 years ago

There's nothing special, file paths need to start with s3://

sb10 commented 5 years ago

I'm testing with this basic workflow:

#!/usr/bin/env nextflow

Channel.fromPath('test_inputs/*.input').set { inputs_ch }

process capitalize {
  input:
  file x from inputs_ch
  output:
  file 'file.output' into outputs_ch
  script:
  """
  cat $x | tr [a-z] [A-Z] > file.output
  """
}

outputs_ch
      .collectFile()
      .println{ it.text }

workflow.onComplete {
    println "Pipeline completed at: $workflow.complete"
    println "Execution status: ${ workflow.success ? 'OK' : 'failed' }"
}

If I change the input to be specified like:

Channel.fromPath('s3://mybucket/test_inputs/*.input').set { inputs_ch }

Then it fails because it tries to contact Amazon S3. How do I tell Nextflow to use my own S3 service?

I looked at https://www.nextflow.io/docs/latest/config.html#config-aws and created a nextflow.config with an aws section with an endpoint value, but it didn't seem to help. In my ~/.s3cfg file I have to specify:

[default]
access_key = xxx
secret_key = xxx
encrypt = False
host_base = cog.sanger.ac.uk
host_bucket = %(bucket)s.cog.sanger.ac.uk
progress_meter = True
use_https = True

Assuming that can be resolved, what change do I make to my workflow so that the file.output is actually an input-specific output file stored in S3 as well?

I'm hoping that instead of the current method of copying files from S3 to local disk as part of staging, I can make use of wr's S3 mount capabilities, avoiding the need for large local disks when working on large files in S3.

pditommaso commented 5 years ago

Here there's an example (and by the way, it should be by a guy at Sanger).

what change do I make to my workflow so that the file.output is actually an input-specific output file stored in S3 as well

Tasks are implicitly assumed to be executed in a POSIX file system, and therefore output files are created in the task work dir. Then, if the pipeline work dir is remote storage, NF will copy that file to the remote work dir.

I'm hoping that instead of the current method of copying files from S3 to local disk as part of staging, I can make use of wr's S3 mount capabilities, avoiding the need for large local disks when working on large files in S3.

It should be possible by writing your own copy strategy.

sb10 commented 5 years ago

Then if the pipeline work dir is a remote storage

How does a user specify this? Or how does Nextflow know?

Anyway, I'll look in to writing my own copy strategy.

pditommaso commented 5 years ago

How does a user specify this? Or how does Nextflow know?

Using the -w command line option and specifying a supported remote storage path, e.g. s3 or gs:

nextflow run <something> -w s3://bucket/work/dir

Then, this decides whether an input file is on foreign storage, i.e. different from the current work dir.

sb10 commented 5 years ago

OK, I'll give it a try. But what is session.getConfigAttribute() and how are you supposed to use it?

Actually, it should work.

It wasn't working because of my config file, but I'm not sure how to get the TesExecutor's way of doing it to work.

session.getConfigAttribute('executor.cacertpath', [...]) works with:

executor {
  name='wr'
  cacertpath='foo.pem'
}

but session.getConfigAttribute('executor.wr.cacertpath', [...]) does not work with:

process {
  executor='wr'
}

executor {
  $wr {
    cacertpath='foo.pem'
  }
}

or any other combination I could think of. What is the correct way that people would expect to be able to specify executor options in their nextflow.config, and how do I pick those options up in all cases?

sb10 commented 5 years ago

I've now implemented my WrTaskHandler as a BatchHandler, which speeds up getting task status. Is there an equivalent way to batch up the submit requests as well?

pditommaso commented 5 years ago

Do you mean batching submit requests? Nope, there isn't a specific mechanism for this.

sb10 commented 5 years ago

Yes, that's what I mean. I'm thinking I can add a BatchSubmitHandler trait to BatchHandler.groovy, which is identical to the BatchHandler trait but just with a different method name (batchSubmit() instead of batch()). I need this because a single batch() method can't handle the needs of batching both status requests and submit requests.
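Roughly what I have in mind for the first option (a sketch; the generic parameters and BatchContext type are guesses based on the shape of the existing BatchHandler trait):

// Hypothetical BatchSubmitHandler trait, mirroring BatchHandler but with a
// differently named method so a handler can batch both status checks and submits.
trait BatchSubmitHandler<K,V> {

    /**
     * Like BatchHandler#batch(), but collects submit requests so that many
     * task submissions can be sent to wr in a single call.
     */
    abstract void batchSubmit(BatchContext<K,V> context)
}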

Alternatively, I could pass my REST API client to my TaskMonitor and have the monitor class directly submit all the tasks in the pending queue to the client in 1 go.

Or something else?

pditommaso commented 5 years ago

I think both approaches can work. I would suggest keeping it as an optimisation step, or just postponing it, because I need to merge some changes to the TaskPolling and Executor logic into master next week.

sb10 commented 5 years ago

Getting back to S3, I've got my config file working by doing:

aws {
  client {
    endpoint = "https://cog.sanger.ac.uk"
    signerOverride = "S3SignerType"
  }
}

And then, using the normal new BashWrapperBuilder(task).build() and running with nextflow testS3.nf -w s3://sb10/nextflow/work, it finds and scatters over my S3 input files and writes the .command.run and .command.sh files to S3. Great!

But my wrapperFile variable is /sb10/nextflow/work/4f/03a48f82c42eaa2228e931eae3d8f9/.command.run and the uploaded .command.run file contains paths like:

nxf_launch() {
    /bin/bash -ue /sb10/nextflow/work/4f/03a48f82c42eaa2228e931eae3d8f9/.command.sh
}

Is this expected, or am I not quite doing something correctly? Am I supposed to mount my sb10 bucket at /sb10 for this to all work and come together?

sb10 commented 5 years ago

And if that's the case, how can my WrTaskHandler.submit() method know that it should request such a mount when submitting to wr?

pditommaso commented 5 years ago

One question at a time please, otherwise the thread becomes unreadable. You are getting that path because you are using SimpleFileCopyStrategy (I guess), which is designed to work with local paths, and the S3Path object, when converted to a string, omits the protocol prefix, i.e. s3://.

Provided that you will mount the inputs (in a container, I suppose), how are you expecting the script to reference these files?

sb10 commented 5 years ago

The ideal way would be that I submit the command .mnt/sb10/nextflow/work/4f/03a48f82c42eaa2228e931eae3d8f9/.command.sh to wr while telling it to mount s3://sb10 at .mnt/sb10 in the current working directory.

On an OpenStack node wr will create a unique working directory in /tmp, mount the s3 bucket at .mnt/sb10, and execute the .sh file that is now available. Currently that won't work because the .sh would also need to refer to the .run file as being in .mnt/sb10/nextflow/work/4f/03a48f82c42eaa2228e931eae3d8f9/.command.run.

I've been trying to get my head around SimpleFileCopyStrategy and how I could make minimal changes while getting it to give me the relative paths I want, but I haven't understood it yet.

pditommaso commented 5 years ago

Let's put it this way: given a pipeline execution work dir of s3://foo, NF will compute for each task a work dir such as s3://foo/xx/yy. This path is set in the TaskBean#workDir attribute, which is then set in the SimpleFileCopyStrategy#workDir attribute.

Then, for each (S3) input file, the stageInputFile method is invoked, which is passed the source S3 path and the expected relative file name resolved against the task work dir (most of the time just the file name without a path).

Therefore it's the role of your custom file copy strategy to implement the logic to handle your use case. IMO it should be very similar to AwsBatchFileCopyStrategy.groovy, with the difference that you don't need to copy the input files, only the output files, to the targetDir (which is supposed to be the same as workDir unless the process specifies a custom storeDir).
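For example, the staging hook in such a strategy could be overridden along these lines (purely illustrative; the method signature is assumed from the description above, and the .mnt mount root comes from the layout described earlier in this thread):

// rewrite input staging so files are symlinked from the wr-mounted bucket
// path instead of being downloaded
@Override
String stageInputFile(Path path, String targetName) {
    // e.g. an S3Path stringifies as /sb10/nextflow/work/xx/yy/file.input,
    // which is expected to appear under .mnt/ once wr mounts the bucket
    def mounted = '.mnt/' + path.toString().replaceFirst('^/+', '')
    return "ln -s ${mounted} ${targetName}"
}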

Entering in Easter mode 😄👋👋

sb10 commented 5 years ago

Thanks. I now have tasks running successfully in OpenStack with data in S3, but my output file is not "unstaged" to be put in S3, so the workflow fails due to missing output files. This is probably just because I don't know how to write a proper S3-based workflow. Can you tell me how to adjust this so it does the right thing?

#!/usr/bin/env nextflow

Channel.fromPath('s3://sb10/nextflow/test_inputs/*.input').set { inputs_ch }

process capitalize {
  input:
  file x from inputs_ch
  output:
  file 'file.output' into outputs_ch
  script:
  """
  cat $x | tr [a-z] [A-Z] > file.output
  """
}

outputs_ch
      .collectFile()
      .println{ it.text }

workflow.onComplete {
    println "Pipeline completed at: $workflow.complete"
    println "Execution status: ${ workflow.success ? 'OK' : 'failed' }"
}

In particular, I don't know how to specify that I want file.output to get stored back in S3 somewhere, at a location that is unique to the scatter, such that it can be found and acted on by the outputs_ch (or, in a more complex workflow, by a subsequent step).

Or should the workflow remain unchanged, and instead my CopyStrategy be changed to copy all output files to the same place the .command.sh was in (ie. the workDir in S3)?

Edit: my stageOutCommand() is not getting called because in BashWrapperBuilder, copyStrategy.getUnstageOutputFilesScript(outputFiles,targetDir) is only called if( changeDir || workDir != targetDir ). My workDir and targetDir are the same in this case, and changeDir is null. Is this expected? Should I be trying to set changeDir to something by enabling a scratch directory?

pditommaso commented 5 years ago

my output file is not "unstaged" to be put in S3

Who is supposed to do this? Is the job wrapper script or wr expected to copy the output file to S3?

sb10 commented 5 years ago

I was hoping to use BashWrapperBuilder's unstaging ability. If I force the copyStrategy.getUnstageOutputFilesScript(outputFiles,targetDir) call so that my stageOutCommand() method is called, then the output file gets copied to S3 as desired.

Should I extend BashWrapperBuilder and override the method that calls copyStrategy.getUnstageOutputFilesScript so that it is always called? Or am I missing the proper way to do this?

pditommaso commented 5 years ago

Yes, creating a subclass and overriding the makeBinding method as shown below should do the trick:

protected Map<String,String> makeBinding() { 
  def result = super.makeBinding()
  result.unstage_outputs = <your code>
  return result
}

sb10 commented 5 years ago

I'm unclear on what's supposed to happen with stdout and stderr files. The wrapper script creates them in the current working directory, but SimpleFileCopyStrategy does not seem to unstage them. Should I make alterations to my stageOutCommand() to somehow unstage these files along with the output files? Or should they not be copied to S3?

pditommaso commented 5 years ago

They are copied by this snippet, and the wr executor should do the same. Maybe it could make sense to introduce an extra flag, in addition to changeDir, to force the copying of those files, e.g.


if( changeDir || unstageOutputFiles ) {
    //.. etc
}

sb10 commented 5 years ago

Thanks for all your help so far. It's now more or less feature complete and working to my satisfaction. With wr in LSF mode, Nextflow tasks all get put in a job array. And it's very very easy to run a workflow in OpenStack, data in S3.

Main outstanding task for the PR is to write tests. I have started with a file modules/nextflow/src/test/groovy/nextflow/executor/WrExecutorTest.groovy in package nextflow.executor, declared as class WrExecutorTest extends Specification, but I can't figure out how to run just the tests in this file.

make test does run the tests, but takes 5 minutes since it runs everything.

Variations on ./gradlew test --tests *WrExecutorTest* all fail because of "No tests found for given includes". What's the correct way to do it?

sb10 commented 5 years ago

I'm trying to move my code to an nf-wr module. I now have nextflow.wr.executor.WrExecutor.

But I was not able to figure out how to move over my WrMonitor class. It must be in package nextflow.processor in order to call nextflow.processor.TaskProcessor#resumeOrDie and nextflow.processor.TaskProcessor#finalizeTask.

I tried reimplementing it as extending TaskPollingMonitor instead, but could not avoid run-time errors related to trying to access TaskPollingMonitor's private vars and methods.

How can I structure/inherit properly to make it work? Please see the existing modules/nextflow/src/main/groovy/nextflow/processor/WrMonitor.groovy in the PR (which is mostly direct code duplication of TaskPollingMonitor.groovy); how would you move that out of the nextflow.processor package successfully?

pditommaso commented 5 years ago

Push the code to this or a separate branch; I'll try to review it during the weekend.

sb10 commented 5 years ago

Thanks. https://github.com/sb10/nextflow/tree/nf-wr

modules/nf-wr/src/main/nextflow/wr/client/WrRestApi.groovy is the problem file. I have 2 main differences to TaskPollingMonitor:

  1. My submitPendingTasks() submits all the pending tasks in one go to the wr client, and my submit() method calls a custom handler method to update the submitted state.
  2. The currently commented out methods all differ from TaskPollingMonitor in that I don't need or want to check "slot" availability or have a submit rate limit. The PR (the 'wr' branch) has this code uncommented and it all works as desired.

My module compiles as-is and nextflow runs until it hits this runtime error:

Exception in thread "Task submitter" org.codehaus.groovy.runtime.metaclass.MissingPropertyExceptionNoStack: No such property: pendingQueue for class: nextflow.wr.processor.WrMonitor

If I try to add a pendingQueue variable to my own class like this:

class WrMonitor extends TaskPollingMonitor {

    final private WrRestApi client
    private Queue<TaskHandler> pendingQueue

    // [...]
    protected WrMonitor( Map params ) {
        super(params)
        assert params
        assert params.client instanceof WrRestApi
        this.client = params.client as WrRestApi
        this.pendingQueue = new LinkedBlockingQueue()
    }
    // [...]

Then there are no runtime errors, but nothing gets submitted because my submitPendingTasks() is called once with 0 pending tasks.

On the other hand, if I change my submitPendingTasks() to do def itr = super.pendingQueue.iterator(), then I get run-time error:

Exception in thread "Task submitter" groovy.lang.MissingMethodException: No signature of method: nextflow.wr.processor.WrMonitor.getPendingQueue() is applicable for argument types: () values: []
    at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.unwrap(ScriptBytecodeAdapter.java:70)
    at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuperN(ScriptBytecodeAdapter.java:146)
    at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuper0(ScriptBytecodeAdapter.java:164)
    at nextflow.wr.processor.WrMonitor.submitPendingTasks(WrMonitor.groovy:181)

I don't know groovy or java well enough to understand what this means or how to get around it.

Edit: submits are successful if I alter TaskPollingMonitor to make the pendingQueue variable protected instead of private. But that doesn't seem like a good idea?

pditommaso commented 5 years ago

The exception is thrown because the pendingQueue is declared private, therefore it cannot be accessed from your subclass.

To avoid that, instead of reimplementing submitPendingTasks you could override canSubmit to always return true.
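For example (a minimal sketch, assuming canSubmit(TaskHandler) is the hook that gates whether a pending task may be submitted):

class WrMonitor extends TaskPollingMonitor {

    protected WrMonitor(Map params) { super(params) }

    // never throttle: wr does its own queueing and rate limiting
    @Override
    protected boolean canSubmit(TaskHandler handler) {
        return true
    }
}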

For development, give IntelliJ IDEA a try. It would help you detect compilation errors. Look here to configure NF with it.

sb10 commented 5 years ago

But I need my own submitPendingTasks() in order to batch up all the submits. This is what enables the use of LSF job arrays, and is also a more efficient use of wr. Or is there another way I could do that?

pditommaso commented 5 years ago

Ideally, they could be collected in a bounded buffer and fired all together, though it could be tricky and not necessarily faster. What if your workflow submits tasks at a slow rate, say a task every 200-500 ms? How long should it wait to fill up the buffer?

Regarding LSF job arrays, would they allow the use of a different work dir for each task in the array?

sb10 commented 5 years ago

Tasks are unaffected, they still have their own work directories. The job array just means it's more efficient for LSF to schedule each task, and better for the user as well.

pditommaso commented 5 years ago

My understanding is that the API call made on the NF side is the same irrespective of the back-end. Would OpenStack take advantage of job arrays as well?

One possible solution is that you manage this on the wr backend side. Maybe the only change to make on the NF side is to send more than one job submission in a single request.

sb10 commented 5 years ago

Not OpenStack per se, but it's a small benefit to wr for the submissions to be batched.

wr already does have a little buffer to try and batch incoming jobs, but it doesn't manage to batch many Nextflow tasks as they're usually submitted too far apart.

Anyway, I'd really rather not reimplement something when there is already perfectly working code in Nextflow to do this. Can I just add a protected getPendingQueue() method to TaskPollingMonitor to get access to the variable?
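Concretely, something like this one-line addition (a sketch, keeping the field's existing declared type):

// in TaskPollingMonitor: expose the queue to subclasses without making the field non-private
protected Queue<TaskHandler> getPendingQueue() { pendingQueue }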

pditommaso commented 5 years ago

Not sure it will be enough, but you can try this approach.

sb10 commented 5 years ago

Thanks, that worked fine. I closed the previous PR and opened a new one for the nf-wr module version: https://github.com/nextflow-io/nextflow/pull/1148 Still have to write tests...

sb10 commented 5 years ago

I'm completely stuck on a test for my BashBuilder:

https://github.com/nextflow-io/nextflow/pull/1148/files?file-filters%5B%5D=.gradle&file-filters%5B%5D=.groovy&owned-by%5B%5D=#diff-6c25df820422ef48a19701ff1088a497

If you run make test module=nf-wr class=nextflow.wr.executor.WrBashBuilderTest with the 3rd test uncommented, you'll see the problem.

Maybe the solution is really trivial, but I can't see it because I don't know Groovy, Spock or Nextflow well enough...

pditommaso commented 5 years ago

I'll try to have a look at my earliest convenience.

sb10 commented 5 years ago

Ok, https://github.com/nextflow-io/nextflow/pull/1148 is now feature complete, functional and ready for review. There are *** comments that warrant special attention.

The main issue right now is that I don't think I've done the "activation" of the nf-wr module correctly, because when you run the nextflow executable, you get a bunch of warnings like CAPSULE: Warning: duplicate file nextflow-19.06.0-edge.jar in capsule.

Also, if I create the executable on a Mac, then I can't run it on a Linux machine without having to do java -jar nextflow, as otherwise it complains about something to do with the security certificate location, I think (but only when actually running a workflow; getting -h works fine).

Oh, and I don't know why some of the Travis tests aren't working. It was developed with Java 8.

PeteClapham commented 5 years ago

Regarding sb10's earlier comment about state and artifacts being maintained on disk, this seems to be a scale limitation, e.g. how many concurrent jobs can NF manage and recover from should a failure arise?

pditommaso commented 5 years ago

Sorry for being late on this. I need to find some time to review it. It would be interesting to know if you are already using it.

Regarding sb10's earlier comment about state and artifacts being maintained on disk, this seems to be a scale limitation, e.g. how many concurrent jobs can NF manage and recover from should a failure arise?

What do you mean? The task work dirs? There's no implicit limitation on the number of jobs that NF can manage and recover.

sb10 commented 5 years ago

It would be interesting to know if you are already using it.

As I understand it, someone here is using it to do "real" work, yes. It's a workflow using Singularity containers. This is with wr in OpenStack mode and with data in S3.

PeteClapham commented 5 years ago

We do indeed, but we are seeing reports of scale limitations on large workflows, particularly SNV / GATK joint calling. The number of intermediate files and small state files on disk becomes a significant barrier to scaling and to restarting after workflow errors.

Due to the nature of the issue, all platform backends will have similar issues eventually. How soon they have an impact will depend upon the scale and the workflow in play.

Backend databases are generally more efficient at serving and managing this type of data as systems grow. For example, eHive.