Closed: ErinWeisbart closed this 10 months ago
Adds an option to the 1./2.normalize function to output, for each guide in the experiment, a CSV containing all of that guide's normalized single-cell profiles.
Implemented to enable https://github.com/broadinstitute/pooled-cell-painting-data-analysis/issues/81 but hopefully will be helpful for other circumstances too!
I'm open to any feedback on the newly added function, but ideas that would make it more efficient would be especially welcome! I think increasing speed would be the most helpful improvement.
The normalizing step on single-cell profiles normalized by plate uses ~20% of the memory available on an r4.16xlarge (488 GiB). The single_cell_profiles_by_guide step I added looks to use ~10%. So even if we scale down the machine, we could double the memory used in this step at no added cost, if creating the single_cell_profiles_by_guide CSVs in a more memory-intensive manner speeds things up.
@gwaygenomics Mr. Python Expert, any chance you want to take a look at my new function?
I can't promise anything, but dask might help with this part:
df = read_csvs_with_chunksize(output_file)
for guide in set(df["Metadata_Foci_Barcode_MatchedTo_Barcode"]):
gene = df[df["Metadata_Foci_Barcode_MatchedTo_Barcode"] == guide][
    "Metadata_Foci_Barcode_MatchedTo_GeneCode"
].tolist()[0]
Seems like a very similar pattern to
import dask.dataframe as dd
df = dd.read_csv('2014-*.csv')
df.head()
x y
0 1 a
1 2 b
2 3 c
3 4 a
4 5 b
5 6 c
df2 = df[df.y == 'a'].x + 1
# As with all Dask collections, one triggers computation by calling the .compute() method:
df2.compute()
0 2
3 5
Name: x, dtype: int64
This is Dask's list of things which work fast:
Trivially parallelizable operations (fast):
Element-wise operations: df.x + df.y, df * df
Row-wise selections: df[df.x > 0]
Loc: df.loc[4.0:10.5]
Common aggregations: df.x.max(), df.max()
Is in: df[df.x.isin([1, 2, 3])]
Date time/string accessors: df.timestamp.month
Cleverly parallelizable operations (fast):
groupby-aggregate (with common aggregations): df.groupby(df.x).y.max(), df.groupby('x').max()
groupby-apply on index: df.groupby(['idx', 'x']).apply(myfunc), where idx is the index level name
value_counts: df.x.value_counts()
Drop duplicates: df.x.drop_duplicates()
Join on index: dd.merge(df1, df2, left_index=True, right_index=True) or dd.merge(df1, df2, on=['idx', 'x']) where idx is the index name for both df1 and df2
Join with Pandas DataFrames: dd.merge(df1, df2, on='id')
Element-wise operations with different partitions / divisions: df1.x + df2.y
Date time resampling: df.resample(...)
Rolling averages: df.rolling(...)
Pearson’s correlation: df[['col1', 'col2']].corr()
Concretely, the proposal is that this:
df = read_csvs_with_chunksize(output_file)
for guide in set(df["Metadata_Foci_Barcode_MatchedTo_Barcode"]):
gene = df[df["Metadata_Foci_Barcode_MatchedTo_Barcode"] == guide][
"Metadata_Foci_Barcode_MatchedTo_GeneCode"
].tolist()[0]
guide_file_name = f"{str(output_file).split('__')[0].split('/')[-1]}__{guide}_{gene}.csv.gz"
guide_path = os.path.join(sc_by_guide_folder, guide_file_name)
if not os.path.exists(guide_path):
guide_df = pd.DataFrame()
else:
guide_df = read_csvs_with_chunksize(guide_path)
append_df = df.loc[
df["Metadata_Foci_Barcode_MatchedTo_Barcode"] == guide
]
Becomes
#earlier
import dask.dataframe as dd
df = dd.read_csv(output_file)
for guide in set(df["Metadata_Foci_Barcode_MatchedTo_Barcode"]):
append_df = df.loc[
df["Metadata_Foci_Barcode_MatchedTo_Barcode"] == guide
]
append_df = append_df.compute()
gene = append_df[
"Metadata_Foci_Barcode_MatchedTo_GeneCode"
].unique()[0]
guide_file_name = f"{str(output_file).split('__')[0].split('/')[-1]}__{guide}_{gene}.csv.gz"
guide_path = os.path.join(sc_by_guide_folder, guide_file_name)
if not os.path.exists(guide_path):
guide_df = pd.DataFrame()
else:
guide_df = read_csvs_with_chunksize(guide_path)
Even if you don't switch to dask, right now you're filtering the large DF down to the smaller one twice (once to get the gene name, once to get all the values); doing it just once (get all the values, then pull the gene name from the result) can't hurt.
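To make the filter-once suggestion concrete, here's a minimal pandas sketch (toy data; only the Metadata_* column names come from the PR, everything else is invented):

```python
import pandas as pd

# Toy frame: Metadata_* column names are from the PR; the values are invented.
df = pd.DataFrame({
    "Metadata_Foci_Barcode_MatchedTo_Barcode": ["g1", "g1", "g2"],
    "Metadata_Foci_Barcode_MatchedTo_GeneCode": ["ATF4", "ATF4", "AKT1"],
    "Cells_AreaShape_Extent": [0.1, 0.2, 0.3],
})

genes = {}
for guide in set(df["Metadata_Foci_Barcode_MatchedTo_Barcode"]):
    # Filter the large frame once per guide...
    append_df = df.loc[df["Metadata_Foci_Barcode_MatchedTo_Barcode"] == guide]
    # ...then pull the gene name from the already-filtered result.
    genes[guide] = append_df["Metadata_Foci_Barcode_MatchedTo_GeneCode"].unique()[0]
    # (the per-guide CSV write would happen here)
```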
@ErinWeisbart Here's the thing I was talking about, not critical to do this now but it could potentially help us in the future.
The change you'd need to make is sketched in the notebook below.
In this notebook, I read a profile data frame and then save it out in a hive-partitioned layout to show what it looks like
library(tidyverse)
Read profiles
profiles_file <-
"https://raw.githubusercontent.com/broadinstitute/cell-health/master/1.generate-profiles/data/consensus/cell_painting_median.tsv.gz"
profiles <- readr::read_tsv(profiles_file)
## Rows: 357 Columns: 952
## ── Column specification ────────────────────────────────────────────────────────────────────────────────────────────────────────
## Delimiter: "\t"
## chr (3): Metadata_profile_id, Metadata_cell_line, Metadata_pert_name
## dbl (949): Cells_AreaShape_Center_Y, Cells_AreaShape_Compactness, Cells_AreaShape_Eccentricity, Cells_AreaShape_Extent, Cell...
##
## ℹ Use `spec()` to retrieve the full column specification for this data.
## ℹ Specify the column types or set `show_col_types = FALSE` to quiet this message.
profiles %>% select(1:4)
Group by Metadata_cell_line, Metadata_pert_name and write it as a partitioned dataset
profiles %>%
group_by(Metadata_cell_line, Metadata_pert_name) %>%
arrow::write_dataset("~/Downloads/cell_painting_median")
The dataset directory will look like this (called the hive-partitioned layout)
/Users/shsingh/Downloads/cell_painting_median
├── Metadata_cell_line=A549
│ ├── Metadata_pert_name=AKT1-1
│ │ └── part-0.parquet
│ ├── Metadata_pert_name=AKT1-2
│ │ └── part-1.parquet
│ ├── Metadata_pert_name=ARID1B-1
│ │ └── part-2.parquet
...
This directory structure was generated by write_dataset
but we could have just done it by hand.
The data in this directory can now be read in at any level, without needing to iterate across files.
This has a bunch of other advantages, like being interoperable with BigQuery https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs#supported_data_layouts
Here's reading it at the per-guide level (the most granular):
arrow::open_dataset("~/Downloads/cell_painting_median/Metadata_cell_line=A549/Metadata_pert_name=AKT1-1/part-0.parquet") %>%
collect() %>%
head() %>%
select(1:4)
(This below is the same as above, reading the directory instead of the file)
arrow::open_dataset("~/Downloads/cell_painting_median/Metadata_cell_line=A549/Metadata_pert_name=AKT1-1/") %>%
collect() %>%
head() %>%
select(1:4)
Per cell line level
df <-
arrow::open_dataset("~/Downloads/cell_painting_median/Metadata_cell_line=A549/") %>%
collect()
df %>%
head() %>%
select(1:4)
Number of rows
df %>% count()
The whole dataset
df <-
arrow::open_dataset("~/Downloads/cell_painting_median/") %>%
collect()
df %>%
head() %>%
select(1:4)
Number of rows
df %>% count()
It looks like pandas has a to_parquet method that accepts columns as partitioning levels, so from an "implementing the code" point of view this is trivial. It doesn't answer "is it better or worse for our potential stakeholders to do this?", though. Although then I guess the argument is that we don't NEED to save out a full copy and then a per-guide copy; we just have a parquet structure that saves things out per gene and per guide.
One question that is not clear to me, @shntnu (I can almost certainly dig in, but if you already know, so much the better): right now we have data that is individually normalized per plate. Are either of the following two easy to do?
1. Write out Plate A in a per_gene/per_guide structure, then append to each per_gene/per_guide the data from Plate B?
2. Keep the data in a per_plate/per_gene/per_guide structure, but then only read in gene X or guide Y from all plates?
Although then I guess the argument is that we don't NEED to save out a full copy and then a per-guide copy, we just have a parquet structure that saves things out per gene and per guide
Indeed, this is the argument
Are either of the following two easy to do?
- Write out Plate A in a per_gene / per_guide structure, then append to each per_gene/per_guide the data from Plate B?
- Keep the data in a per_plate/per_gene/per_guide structure, but then only read in gene X or guide Y from all plates?
You can definitely do 1., but it is better to do 2., to keep the hierarchical structure the same as the experimental hierarchy.
library(tidyverse)
Lazy-load the whole dataset, then filter to read only guide ATF4-2
df <-
arrow::open_dataset("~/Downloads/cell_painting_median/")
object.size(df)
## 504 bytes
Filter to read only guide ATF4-2
df_guide <-
df %>%
filter(Metadata_pert_name == "ATF4-2") %>%
collect()
object.size(df_guide)
## 178888 bytes
We get what we want - one guide per cell line, and we didn’t need to load the whole dataset to get to this
df_guide %>%
select(matches("Metadata"))
Since there is only one step downstream of this (feature select), switching the output of normalize to be saved as a parquet file instead really only means changing that import step and the normalize export step... but that is a pycytominer function, and it will be a non-zero amount of work to add parquet support all across pycytominer.
non-zero amount of work to add parquet support all across pycytominer.
Oh, it does not need to be parquet for this functionality to work. CSV is fine too (just not csv.gz I think)
Although it can still be cumbersome, haven’t seen the code.
It’s perfectly fine to skip this suggestion.
Ultimately, we really want to be able to do this in a tool upstream, i.e. cytominer-transport, so one could argue that it's a bad idea to try to implement it here.
I was unable to get Dask to work in a local test, even with Beth's help setting it up. Quite possibly we could figure it out if we dug into it, but with the following simple setup it never finished the test (I don't understand why, but didn't pursue it further):
import dask.dataframe as dd
...
df = dd.read_csv(os.path.join(single_cell_input_dir,'nrows100000.csv'))
for guide in set(df["Metadata_Foci_Barcode_MatchedTo_Barcode"]):
append_df = df.loc[
df["Metadata_Foci_Barcode_MatchedTo_Barcode"] == guide
]
append_df = append_df.compute()
...
Working with a local test I was able to dramatically speed up the script using joblib Parallel. But running it on an EC2 r4.16xlarge instance (488 GiB memory) on an actual dataset, using the following:
from joblib import Parallel, delayed
...
Parallel(n_jobs=-2)(delayed(append_to_guide_csv)(guide, image_df) for guide in set(df["Metadata_Foci_Barcode_MatchedTo_Barcode"]))
it seemed to run fine only if I set n_jobs to 1. Anything larger gave me memory problems. With n_jobs = 4 I could see memory spike and the machine crash. With n_jobs = 2 I got the following error, and it looked like the script just hung, holding memory but never actually crashing or completing.
/home/ubuntu/miniconda3/envs/pooled-cell-painting/lib/python3.7/site-packages/joblib/externals/loky/process_executor.py:705: UserWarning: A worker stopped while some jobs were given to the executor. This can be caused by a too short worker timeout or by a memory leak.
"timeout or by a memory leak.", UserWarning
So for now I've removed my attempts at parallelizing this script and, given that this is a once-per-dataset operation that is optional, I'm not going further down that rabbit hole at the moment.
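For what it's worth, a single-pass groupby sketch (not the PR's implementation; toy data, and the file naming here is hypothetical) that avoids both parallelism and re-filtering the full frame once per guide:

```python
import os
import tempfile
import pandas as pd

# Toy frame: Metadata_* column names are from the PR; values are invented.
# sc_by_guide_folder stands in for the real output folder.
df = pd.DataFrame({
    "Metadata_Foci_Barcode_MatchedTo_Barcode": ["g1", "g2", "g1"],
    "Metadata_Foci_Barcode_MatchedTo_GeneCode": ["ATF4", "AKT1", "ATF4"],
    "Cells_AreaShape_Extent": [1.0, 2.0, 3.0],
})

sc_by_guide_folder = tempfile.mkdtemp()
written = []
# groupby walks the frame once, yielding each guide's rows together,
# instead of scanning the whole frame once per guide.
for guide, guide_df in df.groupby("Metadata_Foci_Barcode_MatchedTo_Barcode"):
    gene = guide_df["Metadata_Foci_Barcode_MatchedTo_GeneCode"].iloc[0]
    guide_path = os.path.join(sc_by_guide_folder, f"{guide}_{gene}.csv.gz")
    guide_df.to_csv(guide_path, index=False)
    written.append(os.path.basename(guide_path))
```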
Corresponding update to pooled-cell-painting-profiling-template is here https://github.com/broadinstitute/pooled-cell-painting-profiling-template/pull/33