nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

file(x).getName() mapped on multiple channels has unexpected result #4226

Closed feiloo closed 1 year ago

feiloo commented 1 year ago

Bug report

Expected behavior and actual behavior

I expect that file(read).getName() returns the value of read, but it sometimes returns the next value in the channel.

It returns for example "s1r2" instead of "s1r1" in the workflow below, in about 1 of 4 runs.

Steps to reproduce the problem

commands:

mkdir temp
rm -rf temp/* .nextflow*
nextflow clean -f
export NXF_DEBUG=3
nextflow run -w temp bug.nf

contents of bug.nf:

def fun1 = { sid, read ->
  readfile = file(read)
  assert read == readfile.getName()
  return read
}

workflow {
  samples = Channel.of("sampleid,read1,read2\ns1,s1r1,s1r2\ns2,s2r1,s2r2").splitCsv(header:true)
  samples.multiMap{ row ->
        sampleid: "${row.sampleid}"
        reads1: ["${row.sampleid}", "${row.read1}"]
        reads2: ["${row.sampleid}", "${row.read2}"]
        }.set{samplechannels}

  r1 = samplechannels.reads1.map(fun1)
  r2 = samplechannels.reads2.map(fun1)
  }

Program output

command output (spacing edited):

ERROR ~ assert read == readfile.getName()
               |       |        |
               s1r1    |        's1r2'
                       /data/ngs_pipeline/workflow/nextflow/workflows/clc_and_variantinterpretation/s1r2

 -- Check script 'bug2.nf' at line: 3 or see '.nextflow.log' file for more details

nextflow.log:

Aug-25 14:34:36.045 [main] DEBUG nextflow.cli.Launcher - $> nextflow run -w temp bug2.nf
Aug-25 14:34:36.138 [main] INFO  nextflow.cli.CmdRun - N E X T F L O W  ~  version 23.04.0
Aug-25 14:34:36.180 [main] DEBUG nextflow.plugin.PluginsFacade - Setting up plugin manager > mode=prod; embedded=false; plugins-dir=/home/fhoelsch/.nextflow/plugins; core-plugins: nf-amazon@1.16.1,nf-azure@1.0.0,nf-codecommit@0.1.3,nf-console@1.0.5,nf-ga4gh@1.0.4,nf-google@1.7.2,nf-tower@1.5.11,nf-wave@0.8.1
Aug-25 14:34:36.189 [main] INFO  org.pf4j.DefaultPluginStatusProvider - Enabled plugins: []
Aug-25 14:34:36.190 [main] INFO  org.pf4j.DefaultPluginStatusProvider - Disabled plugins: []
Aug-25 14:34:36.194 [main] INFO  org.pf4j.DefaultPluginManager - PF4J version 3.4.1 in 'deployment' mode
Aug-25 14:34:36.204 [main] INFO  org.pf4j.AbstractPluginManager - No plugins
Aug-25 14:34:36.221 [main] DEBUG nextflow.config.ConfigBuilder - Found config local: /data/ngs_pipeline/workflow/nextflow/workflows/clc_and_variantinterpretation/nextflow.config
Aug-25 14:34:36.221 [main] DEBUG nextflow.config.ConfigBuilder - Parsing config file: /data/ngs_pipeline/workflow/nextflow/workflows/clc_and_variantinterpretation/nextflow.config
Aug-25 14:34:36.241 [main] DEBUG nextflow.config.ConfigBuilder - Applying config profile: `standard`
Aug-25 14:34:36.781 [main] DEBUG nextflow.cli.CmdRun - Applied DSL=2 by global default
Aug-25 14:34:36.816 [main] INFO  nextflow.cli.CmdRun - Launching `bug2.nf` [ecstatic_curie] DSL2 - revision: 3dcc747b0a
Aug-25 14:34:36.817 [main] DEBUG nextflow.plugin.PluginsFacade - Plugins default=[]
Aug-25 14:34:36.818 [main] DEBUG nextflow.plugin.PluginsFacade - Plugins resolved requirement=[]
Aug-25 14:34:36.827 [main] DEBUG nextflow.secret.LocalSecretsProvider - Secrets store: /home/fhoelsch/.nextflow/secrets/store.json
Aug-25 14:34:36.830 [main] DEBUG nextflow.secret.SecretsLoader - Discovered secrets providers: [nextflow.secret.LocalSecretsProvider@5e65afb6] - activable => nextflow.secret.LocalSecretsProvider@5e65afb6
Aug-25 14:34:36.933 [main] DEBUG nextflow.Session - Session UUID: 90c79312-f247-454c-893a-f14e51197f26
Aug-25 14:34:36.933 [main] DEBUG nextflow.Session - Run name: ecstatic_curie
Aug-25 14:34:36.934 [main] DEBUG nextflow.Session - Executor pool size: 12
Aug-25 14:34:36.946 [main] DEBUG nextflow.util.ThreadPoolBuilder - Creating thread pool 'FileTransfer' minSize=10; maxSize=36; workQueue=LinkedBlockingQueue[10000]; allowCoreThreadTimeout=false
Aug-25 14:34:36.968 [main] DEBUG nextflow.cli.CmdRun - 
  Version: 23.04.0 build 5857
  Created: 01-04-2023 21:09 UTC (23:09 CEST)
  System: Linux 5.14.21-150400.24.81-default
  Runtime: Groovy 3.0.16 on OpenJDK 64-Bit Server VM 17.0.8+0-suse-150400.3.27.1-x8664
  Encoding: UTF-8 (UTF-8)
  Process: 108300@ukb2580 [10.14.25.80]
  CPUs: 12 - Mem: 62.7 GB (558.8 MB) - Swap: 2 GB (1.6 GB)
Aug-25 14:34:36.998 [main] DEBUG nextflow.Session - Work-dir: /data/ngs_pipeline/workflow/nextflow/workflows/clc_and_variantinterpretation/temp [xfs]
Aug-25 14:34:36.999 [main] DEBUG nextflow.Session - Script base path does not exist or is not a directory: /data/ngs_pipeline/workflow/nextflow/workflows/clc_and_variantinterpretation/bin
Aug-25 14:34:37.010 [main] DEBUG nextflow.executor.ExecutorFactory - Extension executors providers=[]
Aug-25 14:34:37.021 [main] DEBUG nextflow.Session - Observer factory: DefaultObserverFactory
Aug-25 14:34:37.040 [main] DEBUG nextflow.cache.CacheFactory - Using Nextflow cache factory: nextflow.cache.DefaultCacheFactory
Aug-25 14:34:37.050 [main] DEBUG nextflow.util.CustomThreadPool - Creating default thread pool > poolSize: 13; maxThreads: 1000
Aug-25 14:34:37.123 [main] DEBUG nextflow.Session - Session start
Aug-25 14:34:37.457 [main] DEBUG nextflow.script.ScriptRunner - > Launching execution
Aug-25 14:34:37.565 [main] DEBUG nextflow.Session - Workflow process names [dsl2]: 
Aug-25 14:34:37.566 [main] DEBUG nextflow.Session - Igniting dataflow network (1)
Aug-25 14:34:37.566 [main] DEBUG nextflow.script.ScriptRunner - > Awaiting termination 
Aug-25 14:34:37.566 [main] DEBUG nextflow.Session - Session await
Aug-25 14:34:37.566 [main] DEBUG nextflow.Session - Session await > all processes finished
Aug-25 14:34:37.566 [main] DEBUG nextflow.Session - Session await > all barriers passed
Aug-25 14:34:37.607 [Actor Thread 7] ERROR nextflow.extension.OperatorImpl - @unknown
org.codehaus.groovy.runtime.powerassert.PowerAssertionError: assert read == readfile.getName()
       |       |        |
       s1r1    |        's1r2'
               /data/ngs_pipeline/workflow/nextflow/workflows/clc_and_variantinterpretation/s1r2
    at org.codehaus.groovy.runtime.InvokerHelper.assertFailed(InvokerHelper.java:432)
    at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.assertFailed(ScriptBytecodeAdapter.java:670)
    at Script_89def771$_runScript_closure1.doCall(Script_89def771:3)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:107)
    at org.codehaus.groovy.runtime.metaclass.TransformMetaMethod.invoke(TransformMetaMethod.java:55)
    at groovy.lang.MetaClassImpl$2.invoke(MetaClassImpl.java:1298)
    at org.codehaus.groovy.runtime.metaclass.TransformMetaMethod.doMethodInvoke(TransformMetaMethod.java:62)
    at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:274)
    at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1035)
    at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.call(PogoMetaClassSite.java:38)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:139)
    at nextflow.extension.MapOp$_apply_closure1.doCall(MapOp.groovy:56)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:107)
    at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:323)
    at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:274)
    at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1035)
    at groovy.lang.Closure.call(Closure.java:412)
    at groovyx.gpars.dataflow.operator.DataflowOperatorActor.startTask(DataflowOperatorActor.java:120)
    at groovyx.gpars.dataflow.operator.DataflowOperatorActor.onMessage(DataflowOperatorActor.java:108)
    at groovyx.gpars.actor.impl.SDAClosure$1.call(SDAClosure.java:43)
    at groovyx.gpars.actor.AbstractLoopingActor.runEnhancedWithoutRepliesOnMessages(AbstractLoopingActor.java:293)
    at groovyx.gpars.actor.AbstractLoopingActor.access$400(AbstractLoopingActor.java:30)
    at groovyx.gpars.actor.AbstractLoopingActor$1.handleMessage(AbstractLoopingActor.java:93)
    at groovyx.gpars.util.AsyncMessagingCore.run(AsyncMessagingCore.java:132)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Aug-25 14:34:37.619 [Actor Thread 7] DEBUG nextflow.Session - Session aborted -- Cause: assert read == readfile.getName()
       |       |        |
       s1r1    |        's1r2'
               /data/ngs_pipeline/workflow/nextflow/workflows/clc_and_variantinterpretation/s1r2
Aug-25 14:34:37.661 [Actor Thread 7] DEBUG nextflow.Session - The following nodes are still active:
  [operator] map

Aug-25 14:34:37.670 [main] DEBUG nextflow.trace.WorkflowStatsObserver - Workflow completed > WorkflowStats[succeededCount=0; failedCount=0; ignoredCount=0; cachedCount=0; pendingCount=0; submittedCount=0; runningCount=0; retriesCount=0; abortedCount=0; succeedDuration=0ms; failedDuration=0ms; cachedDuration=0ms;loadCpus=0; loadMemory=0; peakRunning=0; peakCpus=0; peakMemory=0; ]
Aug-25 14:34:37.780 [main] DEBUG nextflow.cache.CacheDB - Closing CacheDB done
Aug-25 14:34:37.801 [main] DEBUG nextflow.script.ScriptRunner - > Execution complete -- Goodbye

bentsherman commented 1 year ago

This is a hidden gotcha in Nextflow scripts. Since you declared the readfile variable without def, it was created as a global (i.e. script-level) variable rather than a variable local to the closure. This causes a race condition: each invocation of the map operator sets this variable and then reads it back in the assert statement, so another invocation running concurrently can overwrite it in between.
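To make the scoping rule concrete, here is a minimal sketch in plain Groovy (no Nextflow involved). The closure and variable names (withoutDef, withDef, shared, local) are made up for illustration and do not appear in the original script. It only shows where each variable ends up, not the race itself:

```groovy
// Plain Groovy script; all names here are hypothetical, for illustration only.

// No `def`: the assignment goes to the script binding, so every caller shares it.
def withoutDef = { x ->
    shared = x
    return shared
}

// With `def`: the variable exists only inside this one closure call.
def withDef = { x ->
    def local = x
    return local
}

withoutDef('s1r1')
assert binding.hasVariable('shared')
assert binding.getVariable('shared') == 's1r1'

// A second call overwrites the same script-level variable.
withoutDef('s1r2')
assert binding.getVariable('shared') == 's1r2'

// The `def` variable never reaches the binding.
withDef('s1r1')
assert !binding.hasVariable('local')
```

When two operators call such a closure from different threads, the shared binding variable is the piece of state they can overwrite for each other.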

Basically, temp variables in a closure like this should always be declared with def:

def readfile = file(read)

I will add a note to the documentation about it.
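For reference, a sketch of the reproduction script with the suggested fix applied. The only change from bug.nf above is the added def on readfile. It has not been re-run here, so treat it as an illustration of the fix rather than verified output:

```groovy
// bug.nf with `readfile` declared local to each closure invocation
def fun1 = { sid, read ->
  def readfile = file(read)   // `def` keeps this out of the shared script scope
  assert read == readfile.getName()
  return read
}

workflow {
  samples = Channel.of("sampleid,read1,read2\ns1,s1r1,s1r2\ns2,s2r1,s2r2").splitCsv(header:true)
  samples.multiMap { row ->
      sampleid: "${row.sampleid}"
      reads1: ["${row.sampleid}", "${row.read1}"]
      reads2: ["${row.sampleid}", "${row.read2}"]
  }.set { samplechannels }

  // Both map operators can now run concurrently without sharing state.
  r1 = samplechannels.reads1.map(fun1)
  r2 = samplechannels.reads2.map(fun1)
}
```

Because the original failure showed up in only about 1 of 4 runs, it is worth running the fixed script several times to confirm the assertion no longer fails.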

feiloo commented 1 year ago

Indeed, thank you 👍.