ohsu-comp-bio / cwl-tes

cwl-tes submits your tasks to a TES server. Task submission is parallelized when possible.
Apache License 2.0

Added importable function to use in external python modules #33

Open medcelerate opened 5 years ago

medcelerate commented 5 years ago

Added a helper function so that cwl-tes can be loaded as a library for asynchronous execution.

medcelerate commented 5 years ago

This is now complete and functional.

medcelerate commented 4 years ago

I can definitely move it to another file and make the other change. And you are right about the asynchronous execution issue. My thought is to have a helper function that returns a group of task IDs for running a workflow; when all the steps complete, the workflow is marked as complete. Otherwise it is marked as a failure if the entire set of tasks has the issue. Something more akin to Cromwell but with functionality on a TES server.
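A minimal sketch of the "mark the workflow complete once every step completes" idea described here. The helper name `workflow_state` and the workflow-level labels are hypothetical and not part of cwl-tes; the per-task state names are the ones defined by the GA4GH TES specification. It assumes that any failed or canceled task fails the whole workflow, which is only one possible reading of the comment above.

```python
from typing import Iterable

# Terminal error states as named in the GA4GH TES specification.
FAILED_STATES = {"EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED"}


def workflow_state(task_states: Iterable[str]) -> str:
    """Collapse per-task TES states into one workflow-level state.

    Hypothetical helper: the labels FAILED / COMPLETE / RUNNING are
    illustrative and do not come from cwl-tes or the TES API.
    """
    states = list(task_states)
    # Assumption: a single failed or canceled task fails the workflow.
    if any(state in FAILED_STATES for state in states):
        return "FAILED"
    # Complete only when there is at least one task and all are COMPLETE.
    if states and all(state == "COMPLETE" for state in states):
        return "COMPLETE"
    # Anything else (QUEUED, INITIALIZING, RUNNING, ...) is still in flight.
    return "RUNNING"
```

A caller would poll the TES server for each task ID in the group and pass the resulting states to this function.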

On Mon, Nov 18, 2019 at 1:54 PM Adam Struck notifications@github.com wrote:

@adamstruck requested changes on this pull request.

In cwl_tes/tes.py https://github.com/ohsu-comp-bio/cwl-tes/pull/33#discussion_r347549563:

@@ -389,13 +405,31 @@ def run(self, )
     log.info(pformat(task))
+    if self.lib:

Why not add this conditional check after the logging lines in the original code block rather than duplicating the whole code block?

In cwl_tes/main.py https://github.com/ohsu-comp-bio/cwl-tes/pull/33#discussion_r347550592:

+    job_executor = SingleJobExecutor()
+    job_executor.max_ram = job_executor.max_cores = float("inf")
+    executor = functools.partial(
+        tes_execute, job_executor=job_executor,
+        loading_context=loading_context,
+        remote_storage_url=parsed_args.remote_storage_url,
+        ftp_access=ftp_fs_access)
+    cwltool.main.main(
+        args=parsed_args,
+        executor=executor,
+        loadingContext=loading_context,
+        runtimeContext=runtime_context,
+        versionfunc=versionstring,
+        logger_handler=console
+    )
+    return lib_helper.task_id

Since the steps execute asynchronously, this method would break if you were to provide a multi-step workflow, right?

Could you describe your envisioned use case in a bit more detail?
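To illustrate the concern, here is a small sketch of how a helper that stores a single task ID loses information once a workflow has more than one step. Both classes are hypothetical stand-ins and do not reproduce the `lib_helper` object from the pull request; only the `task_id` attribute name is borrowed from the diff.

```python
class SingleIdHelper:
    """Hypothetical stand-in for a helper that keeps one task ID."""

    def __init__(self):
        self.task_id = None

    def record(self, task_id):
        # Each submitted step overwrites the ID of the previous one.
        self.task_id = task_id


class MultiIdHelper:
    """Hypothetical alternative that keeps every submitted task ID."""

    def __init__(self):
        self.task_ids = []

    def record(self, task_id):
        self.task_ids.append(task_id)


single, multi = SingleIdHelper(), MultiIdHelper()
for step_task_id in ["task-1", "task-2", "task-3"]:
    single.record(step_task_id)
    multi.record(step_task_id)
```

After the loop, `single.task_id` holds only the last submitted ID, while `multi.task_ids` holds all three. With asynchronous submission the order of arrival is also not guaranteed, so even "the last ID" is not a stable notion.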

In cwl_tes/main.py https://github.com/ohsu-comp-bio/cwl-tes/pull/33#discussion_r347551001:

@@ -711,5 +713,72 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
     return parser

+def non_interactive_executor(workflow_buffer,

I'd rather have this live in another file and not in main.py.


adamstruck commented 4 years ago

Cromwell is compatible with TES and runs both WDL and CWL workflows. I am not sure which filesystems they have made available with that backend.

To really get this project working the way you'd like, I'd take a slightly different approach. I'd kick off cwltool.main and store the execution ID. In the TES executor, rather than returning the ID as you are doing, I'd post the ID to an internal API (or something like Kafka) and then associate it with the execution ID. Then internally you could track the workflow's progress and see all of the running task IDs associated with it.
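A rough sketch of that approach, using an in-memory registry as a stand-in for the internal API or message queue. The class `ExecutionRegistry` and its method names are hypothetical; nothing here is part of cwl-tes or cwltool, and a real deployment would replace the dictionary with an HTTP endpoint, a Kafka topic, or a database.

```python
import threading
import uuid


class ExecutionRegistry:
    """In-memory stand-in for an internal API (hypothetical).

    Maps one workflow execution ID to the TES task IDs submitted for it.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._tasks = {}

    def new_execution(self) -> str:
        """Create and return an execution ID before starting the workflow."""
        execution_id = str(uuid.uuid4())
        with self._lock:
            self._tasks[execution_id] = []
        return execution_id

    def post_task(self, execution_id: str, task_id: str) -> None:
        """Called by the executor each time it submits a task."""
        with self._lock:
            self._tasks[execution_id].append(task_id)

    def tasks_for(self, execution_id: str) -> list:
        """Return a copy of the task IDs recorded for one execution."""
        with self._lock:
            return list(self._tasks.get(execution_id, []))


registry = ExecutionRegistry()
execution_id = registry.new_execution()
# In this sketch the executor would call post_task after each submission.
registry.post_task(execution_id, "task-a")
registry.post_task(execution_id, "task-b")
```

The lock matters because steps are submitted from multiple threads. The caller keeps only the execution ID and queries the registry for the associated task IDs whenever it wants to check progress.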

medcelerate commented 4 years ago

We are using Cromwell but are looking to switch to another executor as we are having some issues that may or may not get resolved.