common-workflow-language / cwltool

Common Workflow Language reference implementation
https://cwltool.readthedocs.io/
Apache License 2.0

cwltool running from Celery #1080

Closed r78v10a07 closed 5 years ago

r78v10a07 commented 5 years ago

Hi,

Expected Behavior

Use cwltool in a Celery task

Actual Behavior

Exception while running job:

AttributeError: 'LoggingProxy' object has no attribute 'fileno'

Workflow Code

import cwltool.factory
from celery import shared_task
from cwltool.context import RuntimeContext
from django.conf import settings  # assumption: `settings` is the Django settings object

@shared_task
def dngsstats_analysis_cwl(data):
    runtimeContext = RuntimeContext()
    runtimeContext.use_container = settings.CWLTOOL_USE_DOCKER
    runtimeContext.tmp_outdir_prefix = settings.MEDIA_ROOT
    runtimeContext.tmpdir_prefix = settings.MEDIA_ROOT

    fac = cwltool.factory.Factory(runtime_context=runtimeContext)
    corr = fac.make(settings.CWLTOOL_BASE_DIR + '/tools/R/correlation_json.cwl')

    # CWL File inputs are passed as dicts with "class" and "location"
    matrix = {
        "class": "File",
        "location": data['matrixfile']
    }
    factor = {
        "class": "File",
        "location": data['samplefile']
    }

    results = corr(
        matrix=matrix,
        factor=factor,
        gene_id_column='Gene_Chr_Start',
        factor_sample_column='sample',
        min_count=10,
        min_number_samples=10,
        json='correlation.json')

Full Traceback

[2019-03-08 16:55:11,264: INFO/MainProcess] Received task: project.dngsstats.tasks.dngsstats_analysis_cwl[66146f54-d54e-41b3-85fd-4afbc773ec13]
Resolved '/Users/veraalva/Work/Developer/Python/cwl-workflow/tools/R/correlation_json.cwl' to 'file:///Users/veraalva/Work/Developer/Python/cwl-workflow/tools/R/correlation_json.cwl'
[2019-03-08 16:55:11,283: INFO/ForkPoolWorker-2] Resolved '/Users/veraalva/Work/Developer/Python/cwl-workflow/tools/R/correlation_json.cwl' to 'file:///Users/veraalva/Work/Developer/Python/cwl-workflow/tools/R/correlation_json.cwl'
[2019-03-08 16:55:12,731: WARNING/ForkPoolWorker-2] http://schema.org/docs/!DOCTYPE html does not look like a valid URI, trying to serialize this will break.
[2019-03-08 16:55:12,732: WARNING/ForkPoolWorker-2] http://schema.org/docs/html lang="en" does not look like a valid URI, trying to serialize this will break.
[2019-03-08 16:55:13,158: INFO/ForkPoolWorker-2] Current options:
        preserve space                         : True
        output processor graph                 : True
        output default graph                   : True
        host language                          : RDFa Core
        accept embedded RDF                    : False
        check rdfa lite                        : False
        cache vocabulary graphs                : False

[job correlation_json.cwl] /private/var/folders/q7/cf50cxlx56z0gz82063p9zb8000chm/T/kv_55fr5$ Rscript \
    correlation.R \
    /private/var/folders/q7/cf50cxlx56z0gz82063p9zb8000chm/T/tmpbajm0gcy/stg455bd409-3778-400b-aff3-991dff732091/L70HRH9I5V32UPR_matrix.tsv \
    /private/var/folders/q7/cf50cxlx56z0gz82063p9zb8000chm/T/tmpbajm0gcy/stgd9848a6b-2ebd-419a-bd46-0d46a684c367/L70HRH9I5V32UPR_samples.tsv \
    Gene_Chr_Start \
    sample \
    10 \
    10 \
    correlation.json
[2019-03-08 16:55:16,210: INFO/ForkPoolWorker-2] [job correlation_json.cwl] /private/var/folders/q7/cf50cxlx56z0gz82063p9zb8000chm/T/kv_55fr5$ Rscript \
    correlation.R \
    /private/var/folders/q7/cf50cxlx56z0gz82063p9zb8000chm/T/tmpbajm0gcy/stg455bd409-3778-400b-aff3-991dff732091/L70HRH9I5V32UPR_matrix.tsv \
    /private/var/folders/q7/cf50cxlx56z0gz82063p9zb8000chm/T/tmpbajm0gcy/stgd9848a6b-2ebd-419a-bd46-0d46a684c367/L70HRH9I5V32UPR_samples.tsv \
    Gene_Chr_Start \
    sample \
    10 \
    10 \
    correlation.json
Exception while running job
Traceback (most recent call last):
  File "/Users/veraalva/Work/Developer/Python/Django/ngsproject/venv/lib/python3.6/site-packages/cwltool/job.py", line 308, in _execute
    monitor_function=monitor_function
  File "/Users/veraalva/Work/Developer/Python/Django/ngsproject/venv/lib/python3.6/site-packages/cwltool/job.py", line 748, in _job_popen
    cwd=cwd)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/subprocess.py", line 667, in __init__
    errread, errwrite) = self._get_handles(stdin, stdout, stderr)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/subprocess.py", line 1184, in _get_handles
    c2pwrite = stdout.fileno()
AttributeError: 'LoggingProxy' object has no attribute 'fileno'

Your Environment

cwltool 1.0.20181217162649 celery 4.2.1 (windowlicker)

mr-c commented 5 years ago

Hello @r78v10a07, thank you for your issue.

LoggingProxy isn't part of the cwltool codebase. Maybe it comes from Celery, or you need to configure the logging differently? Take a look at cwltool/loghandler.py
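
For readers hitting the same traceback, the failure can be reproduced without Celery or cwltool. The sketch below assumes only what the traceback shows: subprocess.Popen calls fileno() on whatever object it receives as stdout. ProxyWithoutFileno and try_popen are hypothetical names made up for this illustration; the class stands in for a logging proxy that can be written to but has no file descriptor.

```python
import subprocess
import sys


class ProxyWithoutFileno:
    """Hypothetical stand-in for a logging proxy: writable, but no fileno()."""

    def write(self, data):
        return len(data)

    def flush(self):
        pass


def try_popen(stream):
    """Return the AttributeError message raised by Popen, or None on success."""
    try:
        # Popen needs a real file descriptor for stdout, so it calls stream.fileno()
        proc = subprocess.Popen([sys.executable, "-c", "pass"], stdout=stream)
    except AttributeError as exc:
        return str(exc)
    proc.wait()
    return None


message = try_popen(ProxyWithoutFileno())
print(message)
```

Any object that replaces sys.stdout or sys.stderr without providing fileno() will trigger the same error as soon as it is handed to a subprocess.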

r78v10a07 commented 5 years ago

Hi, I managed to get cwltool running from Celery by adding this handler to my tasks module:

@signals.setup_logging.connect
def setup_celery_logging(**kwargs):
    pass

However, although I don't see any error and the cwltool output dict is printed to stdout, the files are not in the folder they should be.

Any idea?

mr-c commented 5 years ago

@r78v10a07 Glad to hear about the progress!

the files are not in the folder they should be.

Which folder do you want them in? Did you set runtimeContext.outdir to match?

r78v10a07 commented 5 years ago

Hi, runtimeContext.outdir fixed my problem. Everything is working fine now. Thanks a lot. Best, Roberto

dshepelev15 commented 5 years ago

Hi @r78v10a07. Can you explain how you set up the runtimeContext.outdir variable? I tried to change it, but it does not work.

Error - AttributeError: 'LoggingProxy' object has no attribute 'fileno'

r78v10a07 commented 5 years ago

Hi,

This is my config:

import os
from cwltool.context import RuntimeContext

def ngsworkflow_runtimeContext():
    runtimeContext = RuntimeContext()
    cwltool_base_dir = os.getenv('CWLTOOL_BASE_DIR',  '/Users/veraalva/Work/Developer/Python/cwl-workflow/')
    # os.getenv returns a string when the variable is set, so parse it explicitly;
    # otherwise the string 'False' would be truthy.
    runtimeContext.use_container = os.getenv('CWLTOOL_USE_DOCKER', 'false').lower() in ('1', 'true', 'yes')
    runtimeContext.tmp_outdir_prefix = os.getenv('TMP_OUTDIR_PREFIX', '/tmp/')
    runtimeContext.tmpdir_prefix = os.getenv('TMPDIR_PREFIX', '/tmp/')
    # outdir is the folder where the final output files end up
    runtimeContext.outdir = os.getenv('OUTDIR', '/Users/veraalva/')

    return runtimeContext

This is the workflow function:

import os
import json
import pandas
import cwltool
import cwltool.factory

from ngsworkflow.ngsworkflow_runtimeContext import ngsworkflow_runtimeContext

def r_pca_corr(data, cwltool_base_dir):
    runtimeContext = ngsworkflow_runtimeContext()

    fac = cwltool.factory.Factory(runtime_context=runtimeContext)

    workflow = fac.make(cwltool_base_dir + '/tools/R/r_pca_corr.cwl')

    results = workflow(
        matrix={
            "class": "File",
            "location": data['matrix']
        },
        factor={
            "class": "File",
            "location": data['sample']
        },
        gene_column=data['colgene'],
        sample_column=data['colsample'],
        min_reads=data['min'],
        condition1=data['cond1'],
        condition2=data['cond2'],
        out=data['out'])
    return results

Finally the Celery task:

from __future__ import absolute_import, unicode_literals

from celery import shared_task
from celery import signals

from ngsworkflow.r_pca_corr import r_pca_corr

@signals.setup_logging.connect
def setup_celery_logging(**kwargs):
    pass

@shared_task
def dngsstats_analysis_cwl(data, cwltool_base_dir):
    result = r_pca_corr(data, cwltool_base_dir)
    return result

The piece you're missing is this handler, which also appears in the previous code snippet:

@signals.setup_logging.connect
def setup_celery_logging(**kwargs):
    pass
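
If disabling Celery's logging setup is not an option, one alternative is to swap the real streams back in only while the workflow runs. This is a minimal sketch, not something confirmed in this thread: it assumes the proxy was installed on sys.stdout/sys.stderr and that cwltool looks those up at call time. real_std_streams is a hypothetical helper name.

```python
import contextlib
import sys


@contextlib.contextmanager
def real_std_streams():
    """Temporarily point sys.stdout/sys.stderr at the interpreter's original streams."""
    saved_out, saved_err = sys.stdout, sys.stderr
    # sys.__stdout__ / sys.__stderr__ hold the streams the interpreter started with
    sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__
    try:
        yield
    finally:
        # Always restore whatever was installed before, even if the body raises
        sys.stdout, sys.stderr = saved_out, saved_err
```

Inside the task, the call would then be wrapped as `with real_std_streams(): results = workflow(...)`. Note that sys.__stdout__ and sys.__stderr__ can be None when the process has no console attached, so this should be checked in the target environment.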