nextflow-io / patterns

A curated collection of Nextflow implementation patterns
http://nextflow-io.github.io/patterns/
MIT License
332 stars 71 forks source link

manipulating variable outside of scripts #18

Closed daudn closed 1 year ago

daudn commented 5 years ago

My title may be slightly misleading, however, bare with me.

I have a process iterate_list. Process iterate_list takes a list and does something on each item in the list. When running the script, it takes two inputs. The list and the item it needs to process (which it gets as a consumer from a rabbitmq queue)

Currently, I give a python script the entire list, and it iterates over each one does the processing (as one big chunk) and returns after completion. This is fine, however, if the system restarts, it starts all over again.

I was wondering, how can I make it so that every time my python script processes a single item, it returns the item, I remove it from the list, and then pass in the new list to the process. So in case of a system restart/crash, nextflow knows where it left off and can continue from there.

 import groovy.json.JsonSlurper

    def jsonSlurper = new JsonSlurper()
    def cfg_file = new File('/config.json')
    def analysis_config = jsonSlurper.parse(cfg_file)
    def cfg_json = cfg_file.getText()
    def list_of_items_to_process = [] 

    items = Channel.from(analysis_config.items.keySet())

    for (String item : items) {
        list_of_items_to_process << item
        } 

    process iterate_list{
        echo true

        input:
        list_of_items_to_process

        output:
        val 1 into typing_cur

        script:
        """
        python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
        """ 
    }

    process signal_completion{

        echo true

        input:
        val typing_cur

        script:
        """
        echo "all done!"
        """
    }

Basically, the process "iterate_list" takes one "item" from a queue in the message broker. Process iterate_list should look something like:

    process iterate_list{
        echo true

        input:
        list_of_items_to_process

        output:
        val 1 into typing_cur

        script:
        """
        python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
        list_of_items_to_process.remove(<output from python script>)
        """
    }

And so for each one, it shd run, remove the item it jus processed, and restart with a new list.

    initial_list = [1,2,3,4]
    after_first_process_completes = [2,3,4]
    and_eventually = [] <- This is when it should move on to the next process.

Excuse the indents, SO wasn't letting me post the code without indents.

bentsherman commented 1 year ago

Closing since this is a general Nextflow question rather than a new pattern suggestion. Feel free to ask your question on the Nextflow Discussions page.