cytomining / CytoTable

Transform CellProfiler and DeepProfiler data for processing image-based profiling readouts with Pycytominer and other Cytomining tools.
https://cytomining.github.io/CytoTable/
BSD 3-Clause "New" or "Revised" License

parsl.dataflow.errors.JoinError: Join failure for task 1 with failed join dependencies #111

Open sbamin opened 11 months ago

sbamin commented 11 months ago

I am trying to collate csv files from a phenotypic screen into a single parquet or sqlite file using cytotable or cytominer (based on your suggestion). Some details on our study:

I have CellProfiler output in csv format, with each folder containing a well-level CellProfiler run across multiple 384-well plates. Each well-level folder contains csv data for segmented nuclei, cells, and cytoplasm using brightfield, NucBlue, and NucGreen, with respective file names ending in bf.csv, live.csv, and (dead|dying).csv.


I have collated well-level summary data from Image.csv. While that works to infer aggregate effects per knockout gene, I would like to leverage cell- or segment-level data from the respective NucBlue (live) and NucGreen (dying or dead) stains, especially for nuclei and less so for cells and cytoplasm (named cyto).

While running CellProfiler, I made a mistake with one of the output csv files and named it nuclei_dead.csv instead of nuclei_dying.csv. I believe the collate feature in cytominer and cytotable works by looking for a matching string in the file name (and perhaps in the column names of those files?), and then merging data at the well level and then the plate level (depending on folder structure).

For now, I get the following error regardless of whether I use the original data (with inconsistent filenames) or the data after renaming _hcssyn_seg_nucleidying.csv to _hcssyn_seg_nucleidead.csv (but not replacing dying with dead within column names!). I am not sure if the error is related to not replacing dying with dead within column names or to something else, but it would be a great help if there is a way to overcome this issue.
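The renaming step described above, extended to the column names, can be sketched as follows. This is only an illustration: `rename_token` is a hypothetical helper (not part of CytoTable or CellProfiler), the file content is made up, and the direction of the rename (`dead` to `dying`) is an assumption that may need to be swapped.

```python
import csv
import tempfile
from pathlib import Path


def rename_token(csv_path: Path, old: str, new: str) -> Path:
    """Write a copy of csv_path whose file name and header use `new` instead of `old`."""
    target = csv_path.with_name(csv_path.name.replace(old, new))
    if target == csv_path:
        raise ValueError(f"{old!r} does not occur in {csv_path.name!r}")
    with csv_path.open(newline="") as src, target.open("w", newline="") as dst:
        reader = csv.reader(src)
        writer = csv.writer(dst)
        header = next(reader)
        # Only the header row is rewritten; data rows are copied unchanged.
        writer.writerow([col.replace(old, new) for col in header])
        writer.writerows(reader)
    return target


# Made-up file standing in for one well-level export (column names are assumptions).
workdir = Path(tempfile.mkdtemp())
original = workdir / "hcssyn_seg_nuclei_dead.csv"
original.write_text("ImageNumber,Parent_nuclei_dead\n1,3\n")

renamed = rename_token(original, old="dead", new="dying")
renamed_lines = renamed.read_text().splitlines()
print(renamed.name, renamed_lines[0])
```

A plain substring replacement like this also touches any other column that happens to contain the token, so the resulting header is worth checking before running it over a full plate.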

I have shared toyset data via email.

PS: For shared toyset, source path can be one of toyset/orig or toyset/renamed directory

gist: apptainer definition file to build cytotable_0.0.1p2.sif, using commit: e74a6785ece152008d91e5f4bd59b8f5fc8b9314

foo@r209u11n01.mccleary :~/palmer_scratch/syn/toyset$ apptainer run /gpfs/gibbs/pi/lab/foo/hpcenv/opt/apptainer/sifbin/cytotable_0.0.1p2.sif
Python 3.10.11 (main, May 23 2023, 13:58:30) [GCC 10.2.1 20210110] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from cytotable import convert
>>> convert(
...     source_path="/home/foo/palmer_scratch/syn/toyset/data",
...     source_datatype="csv",
...     dest_path="take1.parquet",
...     dest_datatype="parquet",
...     preset="cellprofiler_csv",
... )
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.10/site-packages/cytotable/convert.py", line 1434, in convert
    ).result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/parsl/dataflow/dflow.py", line 301, in handle_exec_update
    res = self._unwrap_remote_exception_wrapper(future)
  File "/usr/local/lib/python3.10/site-packages/parsl/dataflow/dflow.py", line 573, in _unwrap_remote_exception_wrapper
    result.reraise()
  File "/usr/local/lib/python3.10/site-packages/parsl/app/errors.py", line 122, in reraise
    reraise(t, v, v.__traceback__)
  File "/usr/local/lib/python3.10/site-packages/six.py", line 719, in reraise
    raise value
  File "/usr/local/lib/python3.10/site-packages/parsl/app/errors.py", line 145, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/cytotable/convert.py", line 1094, in _to_parquet
    ).result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
parsl.dataflow.errors.JoinError: Join failure for task 1 with failed join dependencies from tasks [5]
>>> 

I am able to successfully run the tutorial code with the above apptainer image.

from cytotable import convert

# using a local path with cellprofiler csv presets
convert(
    source_path="./tests/data/cellprofiler/ExampleHuman",
    source_datatype="csv",
    dest_path="ExampleHuman.parquet",
    dest_datatype="parquet",
    preset="cellprofiler_csv",
)

d33bs commented 11 months ago

Hi @sbamin, thank you so much for opening this issue and for sharing along the details of your efforts here! I took some time to look into this and wanted to share my findings so far. I'm currently running into challenges which are specific to my limited local hardware resources and chunk sizes which may not apply to your execution environment (you may see different results based on what I share).

I created a gist for some troubleshooting I performed which may be found here: https://gist.github.com/d33bs/5dd2e5384cee6ea5907e4cd94ae51ff1 . I focused only on the "orig" dataset, presuming this might be preferred for you (not needing additional steps to preprocess the data before it reaches CytoTable). Please note: I used Docker for container-based troubleshooting as I'm more familiar with it than Apptainer and to balance providing feedback here sooner.

Answering a question you raised which may provide more context on the above:

I believe the way collate feature works for cytominer and cytotable is to look for matching string in file name...

  • CytoTable gathers and groups input data by filename for CSVs and by filename + tablename for SQLite.
  • Each filename is filtered by the compartments and metadata parameters which are lists of strings provided to CytoTable in order to understand what will be processed for inclusion (cytotable.convert(compartments : List[str] = [...], metadata : List[str] = [...])). Filenames matching those inputs are grouped together for later data concatenation and/or join work.
  • Compartments and image metadata have relationships which are important to the resulting datasets created. See here for more description.
  • Currently, I believe that including "compartment-type" files for processing with CytoTable (i.e. cytoplasm_bf, cytoplasm_dying, cytoplasm_live, etc.) will require that they be treated like "compartment groups" or distinct sets of data. I tried but was unable to perform a SQL UNION on like-files of the same compartment type, I believe because of the column-naming scheme found in each.
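The filename-based grouping described in the list above can be pictured with a minimal sketch. It is not CytoTable's implementation: `group_by_name` is a hypothetical helper, the matching rule (case-insensitive substring of the file stem) is an assumption, and the plate/well folder names are invented.

```python
from collections import defaultdict
from pathlib import PurePath


def group_by_name(paths, compartments, metadata):
    """Group file paths under the first compartment/metadata name found in the file stem."""
    names = [name.lower() for name in list(metadata) + list(compartments)]
    groups = defaultdict(list)
    for path in paths:
        stem = PurePath(path).stem.lower()
        for name in names:
            if name in stem:
                groups[name].append(path)
                break  # stop at the first matching name
        # Paths that match no name are left out of the result.
    return dict(groups)


paths = [
    "plate1/A01/hcssyn_seg_nuclei_dead.csv",
    "plate1/A02/hcssyn_seg_nuclei_dead.csv",
    "plate1/A01/hcssyn_seg_image.csv",
    "plate1/A01/notes.csv",
]
groups = group_by_name(
    paths,
    compartments=["hcssyn_seg_nuclei_dead"],
    metadata=["hcssyn_seg_image"],
)
print(groups)
```

Under this rule a file named with `dying` and a file named with `dead` land in different groups, which is consistent with listing each name separately in `compartments`.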

See below for a snippet of what might work for you in this circumstance. Please note: this is based on many assumptions, and the result may not be what you desire. Don't hesitate to let me know if you have questions or would like to see something different happen within CytoTable.

From cytotable_test.py:

"""
Perform a test using cytotable as part of:
https://github.com/cytomining/CytoTable/issues/111
"""

import cytotable

cytotable.convert(
    source_path="toyset/data/orig",
    source_datatype="csv",
    dest_path="take1.parquet",
    dest_datatype="parquet",
    preset="cellprofiler_csv",
    # note, here we override some of the above preset options
    metadata=["hcssyn_seg_image"],
    compartments=[
        "hcssyn_seg_cyto_bf",
        "hcssyn_seg_cyto_dying",
        "hcssyn_seg_cyto_live",
        "hcssyn_seg_cells_bf",
        "hcssyn_seg_cells_dying",
        "hcssyn_seg_cells_live",
        "hcssyn_seg_nuclei_bf",
        "hcssyn_seg_nuclei_dead",
        "hcssyn_seg_nuclei_live",
    ],
    chunk_columns=["Metadata_Plate"],
    joins="""
        WITH Image_filtered AS (
            SELECT
                Metadata_Hcssyn_seg_image_ImageNumber,
                Metadata_Well,
                Metadata_Plate
            FROM
                read_parquet('hcssyn_seg_image.parquet')
        )
        SELECT
            *
        FROM
            Image_filtered AS Hcssyn_seg_image
        /* compartment type: bf */
        LEFT JOIN read_parquet('hcssyn_seg_cyto_bf.parquet') as Hcssyn_seg_cyto_bf ON
            Hcssyn_seg_cyto_bf.Metadata_Hcssyn_seg_cyto_bf_ImageNumber = Hcssyn_seg_image.Metadata_Hcssyn_seg_image_ImageNumber
        LEFT JOIN read_parquet('hcssyn_seg_cells_bf.parquet') as Hcssyn_seg_cells_bf ON
            Hcssyn_seg_cells_bf.Metadata_Hcssyn_seg_cells_bf_ImageNumber = Hcssyn_seg_cyto_bf.Metadata_Hcssyn_seg_cyto_bf_ImageNumber
            AND Hcssyn_seg_cells_bf.Hcssyn_seg_cells_bf_Number_Object_Number = Hcssyn_seg_cyto_bf.Hcssyn_seg_cyto_bf_Parent_cells_bf
        LEFT JOIN read_parquet('hcssyn_seg_nuclei_bf.parquet') as Hcssyn_seg_nuclei_bf ON
            Hcssyn_seg_nuclei_bf.Metadata_Hcssyn_seg_nuclei_bf_ImageNumber = Hcssyn_seg_cyto_bf.Metadata_Hcssyn_seg_cyto_bf_ImageNumber
            AND Hcssyn_seg_nuclei_bf.Hcssyn_seg_nuclei_bf_Number_Object_Number = Hcssyn_seg_cyto_bf.Hcssyn_seg_cyto_bf_Parent_nuclei_bf

        /* compartment type: dying or dead */
        LEFT JOIN read_parquet('hcssyn_seg_cyto_dying.parquet') as Hcssyn_seg_cyto_dying ON
            Hcssyn_seg_cyto_dying.Metadata_Hcssyn_seg_cyto_dying_ImageNumber = Hcssyn_seg_image.Metadata_Hcssyn_seg_image_ImageNumber
        LEFT JOIN read_parquet('hcssyn_seg_cells_dying.parquet') as Hcssyn_seg_cells_dying ON
            Hcssyn_seg_cells_dying.Metadata_Hcssyn_seg_cells_dying_ImageNumber = Hcssyn_seg_cyto_dying.Metadata_Hcssyn_seg_cyto_dying_ImageNumber
            AND Hcssyn_seg_cells_dying.Hcssyn_seg_cells_dying_Number_Object_Number = Hcssyn_seg_cyto_dying.Hcssyn_seg_cyto_dying_Parent_cells_dying
        LEFT JOIN read_parquet('hcssyn_seg_nuclei_dead.parquet') as Hcssyn_seg_nuclei_dead ON
            Hcssyn_seg_nuclei_dead.Metadata_Hcssyn_seg_nuclei_dead_ImageNumber = Hcssyn_seg_cyto_dying.Metadata_Hcssyn_seg_cyto_dying_ImageNumber
            AND Hcssyn_seg_nuclei_dead.Hcssyn_seg_nuclei_dead_Number_Object_Number = Hcssyn_seg_cyto_dying.Hcssyn_seg_cyto_dying_Parent_nuclei_dead

        /* compartment type: live */
        LEFT JOIN read_parquet('hcssyn_seg_cyto_live.parquet') as Hcssyn_seg_cyto_live ON
            Hcssyn_seg_cyto_live.Metadata_Hcssyn_seg_cyto_live_ImageNumber = Hcssyn_seg_image.Metadata_Hcssyn_seg_image_ImageNumber
        LEFT JOIN read_parquet('hcssyn_seg_cells_live.parquet') as Hcssyn_seg_cells_live ON
            Hcssyn_seg_cells_live.Metadata_Hcssyn_seg_cells_live_ImageNumber = Hcssyn_seg_cyto_live.Metadata_Hcssyn_seg_cyto_live_ImageNumber
            AND Hcssyn_seg_cells_live.Hcssyn_seg_cells_live_Number_Object_Number = Hcssyn_seg_cyto_live.Hcssyn_seg_cyto_live_Parent_cells_live
        LEFT JOIN read_parquet('hcssyn_seg_nuclei_live.parquet') as Hcssyn_seg_nuclei_live ON
            Hcssyn_seg_nuclei_live.Metadata_Hcssyn_seg_nuclei_live_ImageNumber = Hcssyn_seg_cyto_live.Metadata_Hcssyn_seg_cyto_live_ImageNumber
            AND Hcssyn_seg_nuclei_live.Hcssyn_seg_nuclei_live_Number_Object_Number = Hcssyn_seg_cyto_live.Hcssyn_seg_cyto_live_Parent_nuclei_live
    """,
)

In case it's helpful, I recommend also looking at the results of using CytoTable with convert(..., join=False, ...). This will provide a multi-file result that may provide a different perspective on analysis or implementation possibilities.
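To get oriented in such a multi-file result, a small listing helper can be enough. The sketch below assumes the output is laid out as one sub-folder per table holding that table's parquet file(s), as in paths of the form take1.parquet/hcssyn_seg_image/hcssyn_seg_image.parquet; `list_tables` is a hypothetical helper, and the folder it inspects here is an empty stand-in rather than real CytoTable output.

```python
import tempfile
from pathlib import Path


def list_tables(dest_path: Path) -> dict:
    """Map each sub-folder name under dest_path to the parquet file names inside it."""
    return {
        folder.name: sorted(p.name for p in folder.glob("*.parquet"))
        for folder in sorted(dest_path.iterdir())
        if folder.is_dir()
    }


# Build an empty stand-in for the assumed layout so the helper can be exercised.
root = Path(tempfile.mkdtemp()) / "take1.parquet"
for table in ("hcssyn_seg_image", "hcssyn_seg_nuclei_live"):
    (root / table).mkdir(parents=True)
    (root / table / f"{table}.parquet").touch()

tables = list_tables(root)
print(tables)
```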

sbamin commented 11 months ago

@d33bs thanks a lot for such a detailed look into the issue. I am able to run the above code and now understand the join argument.

With your code above, I am able to see a non-empty parquet file per compartment, but I also get this error, which is likely due to the slurm config on our end.

parsl.executors.high_throughput.errors.WorkerLost: Task failure due to loss of worker 23 on host 

I will peek into these parquet files and check for consistency and will update here accordingly.

Screenshot 2023-09-25 at 8 01 27 AM

d33bs commented 11 months ago

Thank you @sbamin! Looking into this further, I believe the error occurs in the joinable chunk operations. Once CytoTable reaches this stage, it attempts to export joined data from the compartment data (the individual parquet files you show in your screenshot) in joined chunks. This has historically been done through the chunk_columns parameter and by using ImageNumber values (which are numeric). It usually relies on the column having the same name across all data and being numeric. In the dataset you're working with, I noticed there aren't many columns which share a name and are also numeric, which makes CytoTable fall short here.
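The idea of joining in chunks keyed on a shared numeric column can be illustrated with a small, self-contained sketch. It uses Python's built-in sqlite3 and two made-up tables rather than CytoTable's own code or parquet files, and the chunk size and the `ObjectNumber` column are assumptions chosen for the example.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE image (ImageNumber INTEGER, Metadata_Well TEXT);
    CREATE TABLE nuclei (ImageNumber INTEGER, ObjectNumber INTEGER);
    INSERT INTO image VALUES (1, 'A01'), (2, 'A02'), (3, 'A03');
    INSERT INTO nuclei VALUES (1, 1), (1, 2), (2, 1), (3, 1);
    """
)


def chunked(values, size):
    """Yield consecutive slices of `values` holding at most `size` items."""
    for start in range(0, len(values), size):
        yield values[start : start + size]


# The shared, numeric key that every table can be filtered on.
keys = [
    row[0]
    for row in con.execute("SELECT DISTINCT ImageNumber FROM image ORDER BY ImageNumber")
]

joined_chunks = []
for chunk in chunked(keys, size=2):
    placeholders = ",".join("?" for _ in chunk)
    rows = con.execute(
        f"""
        SELECT image.ImageNumber, image.Metadata_Well, nuclei.ObjectNumber
        FROM image
        LEFT JOIN nuclei ON nuclei.ImageNumber = image.ImageNumber
        WHERE image.ImageNumber IN ({placeholders})
        ORDER BY image.ImageNumber, nuclei.ObjectNumber
        """,
        chunk,
    ).fetchall()
    joined_chunks.append(rows)  # each chunk could be written out separately

print(joined_chunks)
```

The filter only works because both tables expose the key under one name. When every table carries its own prefixed variant of ImageNumber, as in this dataset, there is no single column to chunk on, which matches the limitation described above.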

I'm going to work on finding a way which simplifies this join process to avoid depending on a specific set of columns. Thank you for your patience in the meantime.

As a workaround, the following would be a "brute force" approach requiring potentially large amounts of memory. You could use the data output from running the existing cytotable.convert() call, adding an additional parameter of join=False (this should run without error). Afterwards, to attempt to join all the data at once (note, replacing "take1" with appropriate labeling):

from pyarrow import parquet
import duckdb

sql_stmt = """
        WITH Image_filtered AS (
            SELECT
                Metadata_Hcssyn_seg_image_ImageNumber,
                Metadata_Well,
                Metadata_Plate
            FROM
                read_parquet('take1.parquet/hcssyn_seg_image/hcssyn_seg_image.parquet')
        )
        SELECT
            *
        FROM
            Image_filtered AS image

        /* compartment type: bf */
        LEFT JOIN read_parquet('take1.parquet/hcssyn_seg_cyto_bf/hcssyn_seg_cyto_bf.parquet') as Hcssyn_seg_cytoplasm_bf ON
            Hcssyn_seg_cytoplasm_bf.Metadata_Hcssyn_seg_cyto_bf_ImageNumber = image.Metadata_Hcssyn_seg_image_ImageNumber
        LEFT JOIN read_parquet('take1.parquet/hcssyn_seg_cells_bf/hcssyn_seg_cells_bf.parquet') as Hcssyn_seg_cells_bf ON
            Hcssyn_seg_cells_bf.Metadata_Hcssyn_seg_cells_bf_ImageNumber = Hcssyn_seg_cytoplasm_bf.Metadata_Hcssyn_seg_cyto_bf_ImageNumber
            AND Hcssyn_seg_cells_bf.Hcssyn_seg_cells_bf_Number_Object_Number = Hcssyn_seg_cytoplasm_bf.Hcssyn_seg_cyto_bf_Parent_cells_bf
        LEFT JOIN read_parquet('take1.parquet/hcssyn_seg_nuclei_bf/hcssyn_seg_nuclei_bf.parquet') as Hcssyn_seg_nuclei_bf ON
            Hcssyn_seg_nuclei_bf.Metadata_Hcssyn_seg_nuclei_bf_ImageNumber = Hcssyn_seg_cytoplasm_bf.Metadata_Hcssyn_seg_cyto_bf_ImageNumber
            AND Hcssyn_seg_nuclei_bf.Hcssyn_seg_nuclei_bf_Number_Object_Number = Hcssyn_seg_cytoplasm_bf.Hcssyn_seg_cyto_bf_Parent_nuclei_bf

        /* compartment type: dying or dead */
        LEFT JOIN read_parquet('take1.parquet/hcssyn_seg_cyto_dying/hcssyn_seg_cyto_dying.parquet') as Hcssyn_seg_cytoplasm_dying ON
            Hcssyn_seg_cytoplasm_dying.Metadata_Hcssyn_seg_cyto_dying_ImageNumber = image.Metadata_Hcssyn_seg_image_ImageNumber
        LEFT JOIN read_parquet('take1.parquet/hcssyn_seg_cells_dying/hcssyn_seg_cells_dying.parquet') as Hcssyn_seg_cells_dying ON
            Hcssyn_seg_cells_dying.Metadata_Hcssyn_seg_cells_dying_ImageNumber = Hcssyn_seg_cytoplasm_dying.Metadata_Hcssyn_seg_cyto_dying_ImageNumber
            AND Hcssyn_seg_cells_dying.Hcssyn_seg_cells_dying_Number_Object_Number = Hcssyn_seg_cytoplasm_dying.Hcssyn_seg_cyto_dying_Parent_cells_dying
        LEFT JOIN read_parquet('take1.parquet/hcssyn_seg_nuclei_dead/hcssyn_seg_nuclei_dead.parquet') as Hcssyn_seg_nuclei_dead ON
            Hcssyn_seg_nuclei_dead.Metadata_Hcssyn_seg_nuclei_dead_ImageNumber = Hcssyn_seg_cytoplasm_dying.Metadata_Hcssyn_seg_cyto_dying_ImageNumber
            AND Hcssyn_seg_nuclei_dead.Hcssyn_seg_nuclei_dead_Number_Object_Number = Hcssyn_seg_cytoplasm_dying.Hcssyn_seg_cyto_dying_Parent_nuclei_dead

        /* compartment type: live */
        LEFT JOIN read_parquet('take1.parquet/hcssyn_seg_cyto_live/hcssyn_seg_cyto_live.parquet') as Hcssyn_seg_cytoplasm_live ON
            Hcssyn_seg_cytoplasm_live.Metadata_Hcssyn_seg_cyto_live_ImageNumber = image.Metadata_Hcssyn_seg_image_ImageNumber
        LEFT JOIN read_parquet('take1.parquet/hcssyn_seg_cells_live/hcssyn_seg_cells_live.parquet') as Hcssyn_seg_cells_live ON
            Hcssyn_seg_cells_live.Metadata_Hcssyn_seg_cells_live_ImageNumber = Hcssyn_seg_cytoplasm_live.Metadata_Hcssyn_seg_cyto_live_ImageNumber
            AND Hcssyn_seg_cells_live.Hcssyn_seg_cells_live_Number_Object_Number = Hcssyn_seg_cytoplasm_live.Hcssyn_seg_cyto_live_Parent_cells_live
        LEFT JOIN read_parquet('take1.parquet/hcssyn_seg_nuclei_live/hcssyn_seg_nuclei_live.parquet') as Hcssyn_seg_nuclei_live ON
            Hcssyn_seg_nuclei_live.Metadata_Hcssyn_seg_nuclei_live_ImageNumber = Hcssyn_seg_cytoplasm_live.Metadata_Hcssyn_seg_cyto_live_ImageNumber
            AND Hcssyn_seg_nuclei_live.Hcssyn_seg_nuclei_live_Number_Object_Number = Hcssyn_seg_cytoplasm_live.Hcssyn_seg_cyto_live_Parent_nuclei_live
    """

# read joined data as an arrow table, then write it to a file
parquet.write_table(
    table=duckdb.connect().execute(sql_stmt).arrow(), where="take1.joined.parquet"
)

sbamin commented 10 months ago

Hi @d33bs, I tried the above approach and one or more spawned processes got killed. I requested 32 GB of memory and 4 CPUs for an interactive job. While the processes got killed, the upstream interactive job was still running, so this has more to do with a spawned process somehow being killed partway through.

I am new to parquet and duckdb but am getting used to them. Over the next few days, I will be working on merging these compartment-specific csvs into a parquet file. I will post an update here once I find a meaningful solution.

Screenshot 2023-09-28 at 10 50 00 AM

d33bs commented 10 months ago

Sorry to hear and thank you for the follow up here @sbamin ! I might be misunderstanding the goals of your work and the possibilities with this dataset.

One thing I feel could help this issue along would be knowing what CellProfiler configuration was used. Do you have a project or cppipe file which was used to generate the csv data? Also, separately, were there specific channels which were used for the work corresponding to live, dead, and bf?

sbamin commented 10 months ago

Absolutely nothing to be sorry about. I believe the parquet failure has more to do with the number of columns or the amount of data we need to gather across 72 384-well plates. I am currently working on the field-level aggregate file, Image.csv (nine fields per well), as that is more tractable than handling each compartment_channel.csv at the single-cell level.
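Rolling field-level rows up to one value per well can be sketched with the standard library alone. The rows below are made up, `Count_nuclei_live` is a hypothetical feature column, and the mean is just one possible aggregate; only Metadata_Plate and Metadata_Well are column names taken from this thread.

```python
import csv
import io
from collections import defaultdict
from statistics import mean

# Stand-in for Image.csv: one row per field, several fields per well.
image_csv = io.StringIO(
    "Metadata_Plate,Metadata_Well,Count_nuclei_live\n"
    "P1,A01,10\n"
    "P1,A01,14\n"
    "P1,A02,7\n"
)

per_well = defaultdict(list)
for row in csv.DictReader(image_csv):
    key = (row["Metadata_Plate"], row["Metadata_Well"])
    per_well[key].append(float(row["Count_nuclei_live"]))

# One aggregate value per (plate, well) pair.
well_means = {key: mean(values) for key, values in per_well.items()}
print(well_means)
```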

I shared the cppipe over email. Yes, there were three separate channels: live (NucBlue), dead or dying (NucGreen), and brightfield. I named primary objects stained with NucGreen as dead but used dying for secondary (plasma membrane) and tertiary (cytoplasm) objects.