This is the main R source code repository for the IMPC statistical pipeline.
The IMPC statistical pipeline requires 4 steps to complete:
```mermaid
%%{
init: {
"theme": "default",
"themeVariables": {
"fontSize": "15px"
},
"sequence": {
"useMaxWidth": false
}
}
}%%
flowchart TB
subgraph container[ ]
style container fill:#ffffff
direction TB
subgraph stats_pipeline ["Step 1. Analysis ±2 weeks"]
style stats_pipeline fill:#E6E6FA
style main_ageing fill:#E6FAE6,stroke:#6FC56D
style main_ageing_phase3 fill:#E6FAE6,stroke:#6FC56D
direction LR
subgraph phase1 ["Phase I. Preparing parquet files ±36 min"]
direction TB
inputStatsPipeline[StatsPipeline]-->|DRversion=20.2| step1[far:fa-file Step1MakePar2RdataJobs.R]
step1 --> |Generate file with a list of jobs| step2_parquet2rdata{{jobs_step2_Parquet2Rdata.bch}}
step2_parquet2rdata --> step2[far:fa-file Step2Parquet2Rdata.R]
step2_parquet2rdata --> |Run all jobs in .bch and \nwait until it's finished| step3[far:fa-file Step3MergeRdataFilesJobs.R]
step2 --> step3
step3 --> |Generate file with a list of jobs| step4_merge_rdatas{{jobs_step4_MergeRdatas.bch}}
step4_merge_rdatas --> step4[far:fa-file Step4MergingRdataFiles.R]
step4_merge_rdatas --> |Run all jobs in .bch and \nwait until it's finished| compress_cleaning[Compress log files and clean up]
step4 --> compress_cleaning
compress_cleaning --> |zip -rm| parquet_to_rdata_jobs{{far:fa-folder Parquet2RdataJobs.zip}}
compress_cleaning --> |zip -rm| parquet_to_rdata_logs{{far:fa-folder Parquet2RdataLogs.zip}}
compress_cleaning --> |rm -rf| procedure_scatter_data{{far:fa-folder ProcedureScatterRdata}}
end
subgraph phase2 ["Phase II. Reprocessing the data ±5 days 14 hours"]
direction TB
job_creator[jobCreator from\nsideFunctions.R] --> |Generate file with jobs| data_generation_job_list{{DataGenerationJobList.bch}}
data_generation_job_list --> input_data_generator[far:fa-file InputDataGenerator.R]
data_generation_job_list --> |Run all jobs in .bch and \nwait until it's finished| compress_logs[Compress logs]
input_data_generator --> generate_data[GenerateData from\nInputDataGenerator.R]
generate_data --> |GenerateData run\nmainAgeing function| main_ageing[mainAgeing from\nDRrequiredAgeing]
main_ageing --> |BatchProducer = TRUE| compress_logs
compress_logs --> remove_logs[Remove logs]
end
subgraph phase3 ["Phase III. Initialising the statistical analysis... ±6 days 22 hours"]
direction TB
update_impress[updateImpress from\nsideFunctions.R] --> windowing_pipeline{Is\nwindowingPipeline\nTrue?}
windowing_pipeline --> |"True — default"| window_true[Copy function_windowed.R\n and rename to function.R]
windowing_pipeline --> |Else| window_else[Copy function.R]
window_true --> replace_word[ReplaceWordInFile from\nsideFunctions.R]
window_else --> replace_word
replace_word --> |ReplaceWordInFile use function.R| main_ageing_phase3[mainAgeing from\nDRrequiredAgeing]
main_ageing_phase3 --> |BatchProducer = FALSE\nWait until completion| package_backup[packageBackup from\nsideFunctions.R]
end
end
subgraph further_steps[ ]
direction LR
annotation["Step 2.Annotation\nand transfer pipeline\n±1 Day"] --> report["Step 3. Report\ngenerating pipeline\n±½ day"]
report --> risky["Step 4. Extraction\nof risky genes pipeline\n±30 minutes"]
end
input[/ETL Parquet Files\] --> stats_pipeline --> further_steps
mp_chooser[/mp_chooser\] --> stats_pipeline
phase1 --> phase2
phase2 --> phase3
end
classDef title font-size:30px
class stats_pipeline title
```
These instructions are tailored for Release 21.0.

Start a screen session:

```bash
screen -S stats-pipeline
```

Switch to the mi_stats virtual user:

```bash
become mi_stats
```

Set the necessary variables:

```bash
export VERSION="21.0"
export REMOTE="mpi2"
export BRANCH="master"
export KOMP_PATH="<absolute_path_to_directory>"
```

Create a working directory:

```bash
mkdir --mode=775 ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_dr${VERSION}
cd ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_dr${VERSION}
```
Copy the input parquet files (±80×10⁶ data points) and the mp_chooser JSON:

```bash
cp ${KOMP_PATH}/data-releases/latest-input/dr${VERSION}/output/flatten_observations_parquet/*.parquet ./
cp ${KOMP_PATH}/data-releases/latest-input/dr${VERSION}/output/mp_chooser_json/part-*.txt ./mp_chooser.json
```
Note: be cautious, as the location of the input files may vary. Refer to the Observations Output Schema. In the current dataset, some fields that should be arrays are presented as comma-separated lists.
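To illustrate what that means in practice, here is a minimal R sketch (not part of the pipeline) that reads one of the copied parquet files and splits such a comma-separated column back into a list; the file name and the column name `some_array_field` are hypothetical:

```r
# Minimal sketch: normalise a comma-separated field back into an
# array-like list-column. "some_array_field" is a hypothetical column name.
library(arrow)

df <- read_parquet("part-00000.parquet")  # any one of the copied *.parquet files
df$some_array_field <- strsplit(df$some_array_field, ",", fixed = TRUE)
str(head(df$some_array_field, 3))
```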
Convert the mp_chooser JSON file to Rdata and store its quoted absolute path in MP_CHOOSER_FILE (the quotes let the variable be interpolated directly into R code later):

```bash
R -e "a = jsonlite::fromJSON('mp_chooser.json');save(a,file='mp_chooser.json.Rdata')"
export MP_CHOOSER_FILE=$(echo -n '"'; realpath mp_chooser.json.Rdata | tr -d '\n'; echo -n '"')
```
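Before launching the pipeline, it may be worth a quick sanity check that the Rdata file loads (our suggestion, not a required step):

```r
# Optional check: the conversion above saved the object as `a`.
load("mp_chooser.json.Rdata")
str(a, max.level = 2)  # inspect the top levels of the mp_chooser structure
```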
Update packages to the latest version:

```bash
cd ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_dr${VERSION}
wget https://raw.githubusercontent.com/${REMOTE}/impc_stats_pipeline/${BRANCH}/Late%20adults%20stats%20pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/inst/extdata/StatsPipeline/UpdatePackagesFromGithub.R
Rscript UpdatePackagesFromGithub.R ${REMOTE} ${BRANCH}
rm UpdatePackagesFromGithub.R
```
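One way to confirm the update took effect, from an R session (a suggestion, not part of the documented procedure):

```r
# Confirm the core package was refreshed from GitHub.
packageVersion("DRrequiredAgeing")
packageDescription("DRrequiredAgeing")$Packaged  # build timestamp, if recorded
```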
Run the StatsPipeline function on SLURM:

```bash
cd ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_dr${VERSION}
sbatch \
  --time=30-00:00:00 \
  --mem=8G \
  -o ../stats_pipeline_logs/stats_pipeline_${VERSION}.log \
  -e ../stats_pipeline_logs/stats_pipeline_${VERSION}.err \
  --wrap="R -e 'DRrequiredAgeing:::StatsPipeline(DRversion=${VERSION})'"
```
Note: remember to write down the job ID that appears after submitting the job.
Detach from the screen session with Ctrl + A, D. To reattach later, run (substituting your own session ID):

```bash
screen -r 3507472.stats-pipeline
```

Use squeue to check the job status, and inspect the logs with:

```bash
less ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_logs/stats_pipeline_${VERSION}.log
less ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_logs/stats_pipeline_${VERSION}.err
```
The IMPC_HadoopLoad command uses the power of the cluster to assign the annotations to the StatPackets and transfers the files to the Hadoop cluster, at Hadoop:/hadoop/user/mi_stats/impc/statpackets/DRXX.
Reconnect to the screen session. Make sure to connect to the same login node you used to start the session:

```bash
screen -r 3507472.stats-pipeline
```
Update packages to the latest version:

```bash
wget https://raw.githubusercontent.com/${REMOTE}/impc_stats_pipeline/${BRANCH}/Late%20adults%20stats%20pipeline/DRrequiredAgeing/DRrequiredAgeingPackage/inst/extdata/StatsPipeline/UpdatePackagesFromGithub.R
Rscript UpdatePackagesFromGithub.R ${REMOTE} ${BRANCH}
rm UpdatePackagesFromGithub.R
```
Run the annotation pipeline without exporting to Hadoop by setting transfer=FALSE:

```bash
cd ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_dr${VERSION}/SP/jobs/Results_IMPC_SP_Windowed
sbatch \
  --time=3-00:00:00 \
  --mem=8G \
  -o ../../../../stats_pipeline_logs/annotation_pipeline_${VERSION}.log \
  -e ../../../../stats_pipeline_logs/annotation_pipeline_${VERSION}.err \
  --wrap="R -e 'DRrequiredAgeing:::IMPC_HadoopLoad(prefix=${VERSION},transfer=FALSE,mp_chooser_file=${MP_CHOOSER_FILE})'"
```
This process generates statistical reports typically utilized by the IMPC working groups.

Change to the results directory and start an interactive session:

```bash
cd ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_drXX.y/SP/jobs/Results_IMPC_SP_Windowed
bsub -M 300000 -e errReportGeneratingPipeline -o outReportGeneratingPipeline -Is /bin/bash
```

Start R and run the following, substituting the value of MP_CHOOSER_FILE set earlier:

```r
DRrequiredAgeing:::IMPC_statspipelinePostProcess(mp_chooser_file=${MP_CHOOSER_FILE})
DRrequiredAgeing:::ClearReportsAfterCreation()
```

When it finishes, check the ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_drXX.y/SP/jobs/Results_IMPC_SP_Windowed directory for the unidimensional and categorical results. The files can be gzipped and moved to the FTP directory. You can decorate and format the files by using one of the formatted files from the previous data releases.
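For the gzip-and-move step, a sketch of how it could be scripted from R; `ftp_dir` and the file pattern are placeholders, since the exact report names vary by release:

```r
# Sketch only: gzip the generated report files and stage them for the FTP area.
library(R.utils)  # provides gzip()

reports <- list.files(".", pattern = "\\.(csv|tsv)$", full.names = TRUE)
for (f in reports) gzip(f, overwrite = TRUE, remove = FALSE)  # keeps originals

ftp_dir <- "/path/to/ftp/directory"  # placeholder, not a real location
file.copy(paste0(reports, ".gz"), ftp_dir, overwrite = TRUE)
```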
This process generates a list of risky genes to check manually.

Start an interactive session:

```bash
bsub -M 8000 -Is /bin/bash
```

Start R and run:

```r
DRrequiredAgeing:::extractRiskyGenesFromDRs('path to the gzip report from the NEW release','path to the report from the OLD release')
```

This creates a file named RiskyGenesToCheck_[DATE].txt in the current directory, with one gene per line that should be manually checked.
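For concreteness, a hypothetical invocation; the report file names below are placeholders, not real paths:

```r
# Placeholder paths: the gzipped report from the new release and the
# corresponding report generated on the old release.
new_report <- "report_dr21.0.tsv.gz"  # hypothetical name
old_report <- "report_dr20.2.tsv.gz"  # hypothetical name
DRrequiredAgeing:::extractRiskyGenesFromDRs(new_report, old_report)

# Inspect the resulting gene list (the file name carries the run date).
out <- list.files(pattern = "^RiskyGenesToCheck_.*\\.txt$")
head(readLines(out[1]))
```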
You can check the jobs on the cluster using the bjobs command. During the first 4 days of running the pipeline, there should be fewer than 20 jobs running; after that, there should be 5000+ jobs running on the codon cluster.

The logs are moved to the <stats pipeline directory>/SP/logs directory after the pipeline completes. To collect the raw cluster logs:

```bash
cd <stats pipeline directory>/SP
find ./*/*_RawData/ClusterOut/ -name "*ClusterOut" -type f | xargs cp --backup=numbered -t <path to a log directory>
find ./*/*_RawData/ClusterErr/ -name "*ClusterErr" -type f | xargs cp --backup=numbered -t <path to a log directory>
```
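Once collected, the error logs can be screened from R; a sketch, where the directory is the same placeholder used in the commands above:

```r
# Sketch: flag ClusterErr logs that contain error messages.
log_dir <- "<path to a log directory>"  # same placeholder as above
errs <- list.files(log_dir, pattern = "ClusterErr", full.names = TRUE)
hits <- sapply(errs, function(f)
  any(grepl("error", readLines(f, warn = FALSE), ignore.case = TRUE)))
basename(errs[hits])  # files worth a closer look
```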
If some analyses fail, you can rerun the pipeline by going to the <stats pipeline directory>/SP/jobs directory and executing AllJobs.bch. Before doing so, make sure to edit function.R and set the parameter onlyFillNonExistingResults to TRUE. After making this change, run the pipeline by executing ./AllJobs.bch and wait for it to fill in the missing analyses. Please note that this process may take up to 2 days.
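If helpful, that edit can be scripted; this sketch assumes the parameter appears in function.R as `onlyFillNonExistingResults = FALSE`, which is an assumption on our part:

```r
# Sketch: flip onlyFillNonExistingResults to TRUE in function.R.
# Assumes the parameter is spelled "onlyFillNonExistingResults = FALSE".
txt <- readLines("function.R")
txt <- sub("onlyFillNonExistingResults\\s*=\\s*FALSE",
           "onlyFillNonExistingResults = TRUE", txt)
writeLines(txt, "function.R")
```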
If the transfer to Hadoop fails, the files can be re-transferred manually. The target directory on Hadoop (referred to as YYY below) is /hadoop/user/mi_stats/impc/statpackets/DRXX.YY/, and the annotated files are kept locally (referred to as DDD below) in ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_drXX.YY/SP/jobs/Results_IMPC_SP_Windowed/AnnotationExtractorAndHadoopLoader/tmp.

Change to the DDD directory:

```bash
cd DDD
```

From the DDD directory, run the following R command:

```r
DRrequiredAgeing:::HadoopReTransferSCP(prefix='DRXX.YY/tmpDir')
```

This transfers the files to tmpDir on Hadoop. Then move the contents of tmpDir into the YYY directory. It is important to note that Hadoop HDFS does not allow rewriting files: if a file already exists in the target directory YYY, the process will fail. Hence, the intermediate step of transferring to tmpDir is essential to avoid conflicts.
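The final move from tmpDir into YYY could look like the following, assuming the hdfs command-line client is available on the host and the paths match the layout above (both assumptions):

```r
# Sketch: move the re-transferred files from tmpDir into the final directory.
# Paths are assumptions based on the layout described above; HDFS refuses to
# overwrite existing files, which is exactly why tmpDir is used first.
system("hdfs dfs -mv /hadoop/user/mi_stats/impc/statpackets/DRXX.YY/tmpDir/* /hadoop/user/mi_stats/impc/statpackets/DRXX.YY/")
```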