dna0ff opened 7 years ago
Ingesting data into HBase consists of three steps: transform, load (into the archive table) and merge (into the variants table).

In each execution, transform and load will be executed in parallel, by default 2 files at a time. This can be changed with:

```
-Dhadoop.load.archive.batch.size=<number>
```
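For instance, a minimal sketch (the `variant index` subcommand and the `10_input`/`30_load` paths mirror the commands quoted later in this thread; the batch size value is illustrative):

```bash
# Sketch: transform + load (the default behaviour) with a larger archive batch size.
./opencga.sh variant index --file 10_input --outdir 30_load \
    -Dhadoop.load.archive.batch.size=4
```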
These two steps can also be executed in separate processes; the number of parallel executions will depend on your hardware:

```
--transform
--load -Dhadoop.load.archive=true -Dhadoop.load.variant=false
```
The third step processes a batch of files each time. The batch size also depends on the hardware; we have tried different sizes, and with decent hardware it can merge up to 200 files at a time. This step cannot be executed concurrently: it will raise an exception if you do so.

```
--load -Dhadoop.load.archive=false -Dhadoop.load.variant=true
```

The batch size is controlled with:

```
-Dhadoop.load.variant.batch.size=<number>
```
You can combine the parameters as you want; a sketch combining them follows.
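For illustration only, a hypothetical driver script tying the three steps together under the constraints above (file names and batch sizes are placeholders; only the flags quoted in this thread are used, and parallel archive-only loads are assumed safe per the description above):

```bash
#!/usr/bin/env bash
# Hypothetical wrapper; sampleN.g.vcf.gz paths are placeholders.

# Steps 1+2: transform and load into the archive table, one process per file.
for f in sample1.g.vcf.gz sample2.g.vcf.gz sample3.g.vcf.gz; do
    ./opencga.sh variant index --file "$f" \
        --transform --load \
        -Dhadoop.load.archive=true -Dhadoop.load.variant=false &
done
wait  # archive loads may run in parallel; the merge below must not

# Step 3: a single merge run over everything pending in the archive table.
# Never start a second copy of this while one is running.
./opencga.sh variant index --load \
    -Dhadoop.load.archive=false -Dhadoop.load.variant=true \
    -Dhadoop.load.variant.batch.size=200
```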
Just to add: run

```
bcftools norm -cw -cs
```

on the gVCFs first for left alignment.
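A fuller sketch of that normalization call (the reference FASTA and file names here are placeholders; `bcftools norm` left-aligns indels when given a reference via `-f`, and `--check-ref ws` combines the warn/set behaviour of the quoted flags):

```bash
# Sketch: warn about and fix REF allele mismatches, left-align indels
# against the reference, and write bgzipped output. Paths are placeholders.
bcftools norm --check-ref ws -f reference.fa \
    -Oz -o sample.norm.g.vcf.gz sample.g.vcf.gz
```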
We also raise `hadoop.load.archive.batch.size`; that allows loading ~100 files per hour. Too many parallel loads can crash the HBase cluster ;)

Thanks for the detailed response, really appreciate it.
I'll try to clarify the question. It wasn't about running the 3 phases separately (though it would be good to know if there are any reasons to do it this way, i.e. start Transform, Load and Merge from separate processes sequentially for a single file).
I was interested in running all 3 phases in one go (this worked well for a single process/file), but with several processes in parallel (i.e. each processing a different gVCF file through all 3 phases).
Based on your reply it seems that's not possible, because phase 3 (Merge/MapReduce) can't be executed for more than one file concurrently, correct?
Curiously, in my case concurrent loading of 3 files by 3 separate processes failed in phase 2 (loading) for all the files but the first (not in phase 3, as might be expected based on your reply), with the following exception:
```
....
2017-01-29 16:00:24 [main] INFO PhoenixHelper:199 - Creating index: CREATE LOCAL INDEX IF NOT EXISTS "OPENCGA_U_P_A_SIFT2_IDX" ON "opencga_u_p" ( "A_SIFT"[2] ) INCLUDE("A_GENES", "A_SO" )
2017-01-29 16:00:24 [main] INFO PhoenixHelper:199 - Creating index: CREATE LOCAL INDEX IF NOT EXISTS "OPENCGA_U_P_TYPE_IDX" ON "opencga_u_p" ( "TYPE" ) INCLUDE("A_GENES", "A_SO" )
2017-01-29 16:00:24 [main] INFO HBaseLock:130 - Won the lock with token S10iEHpaMt (997786194) from lock: [S10iEHpaMt:1485705634722]
2017-01-29 16:00:24 [main] INFO HadoopDirectVariantStoragePipeline:413 - Found files in Archive DB: [37]
2017-01-29 16:00:24 [main] INFO HadoopDirectVariantStoragePipeline:417 - Found registered indexed files: []
2017-01-29 16:00:25 [main] INFO HadoopDirectVariantStoragePipeline:427 - Found source for file id 37 with registered id 37
2017-01-29 16:00:25 [main] INFO HadoopDirectVariantStoragePipeline:445 - Found pending in DB: [37]
2017-01-29 16:00:25 [main] INFO HBaseLock:162 - Unlock lock with token 997786194
2017-01-29 16:00:25 [main] ERROR VariantFileIndexerStorageOperation:270 - Error executing INDEX
org.opencb.opencga.storage.core.exceptions.StoragePipelineException: Exception executing load: Unable to process a new batch. Ongoing batch operation: BatchFileOperation{operationName='Load', fileIds=[37], timestamp=1, status={Sun Jan 29 13:40:34 UTC 2017=RUNNING}}
	at org.opencb.opencga.storage.core.StorageEngine.loadFile(StorageEngine.java:127)
	at org.opencb.opencga.storage.core.StorageEngine.index(StorageEngine.java:98)
	at org.opencb.opencga.storage.core.variant.VariantStorageEngine.index(VariantStorageEngine.java:239)
	at org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine.index(HadoopVariantStorageEngine.java:117)
	at org.opencb.opencga.storage.core.manager.variant.operations.VariantFileIndexerStorageOperation.index(VariantFileIndexerStorageOperation.java:268)
	at org.opencb.opencga.storage.core.manager.variant.VariantStorageManager.index(VariantStorageManager.java:163)
	at org.opencb.opencga.storage.core.manager.variant.VariantStorageManager.index(VariantStorageManager.java:154)
	at org.opencb.opencga.app.cli.analysis.VariantCommandExecutor.index(VariantCommandExecutor.java:223)
	at org.opencb.opencga.app.cli.analysis.VariantCommandExecutor.execute(VariantCommandExecutor.java:91)
	at org.opencb.opencga.app.cli.analysis.AnalysisMain.privateMain(AnalysisMain.java:92)
	at org.opencb.opencga.app.cli.analysis.AnalysisMain.main(AnalysisMain.java:31)
Caused by: org.opencb.opencga.storage.hadoop.exceptions.StorageHadoopException: Unable to process a new batch. Ongoing batch operation: BatchFileOperation{operationName='Load', fileIds=[37], timestamp=1, status={Sun Jan 29 13:40:34 UTC 2017=RUNNING}}
	at org.opencb.opencga.storage.hadoop.variant.AbstractHadoopVariantStoragePipeline.addBatchOperation(AbstractHadoopVariantStoragePipeline.java:527)
	at org.opencb.opencga.storage.hadoop.variant.AbstractHadoopVariantStoragePipeline.securePreMerge(AbstractHadoopVariantStoragePipeline.java:481)
	at org.opencb.opencga.storage.hadoop.variant.AbstractHadoopVariantStoragePipeline.preMerge(AbstractHadoopVariantStoragePipeline.java:381)
	at org.opencb.opencga.storage.hadoop.variant.AbstractHadoopVariantStoragePipeline.preLoad(AbstractHadoopVariantStoragePipeline.java:350)
	at org.opencb.opencga.storage.core.StorageEngine.loadFile(StorageEngine.java:115)
	... 10 more
```
The first process that entered the loading phase finished successfully.
It might be relevant to mention that these were 3 combined gVCF files with 30 samples each, split by chromosome, i.e. all 3 files contained the same sample IDs but different chromosomes.
> Based on your reply it seems that's not possible, because phase 3 (Merge/MapReduce) can't be executed for more than one file concurrently, correct?
Correct. What you can do is execute the first and second phases with:

```
--transform --load -Dhadoop.load.archive=true -Dhadoop.load.variant=false
```

(Note that the params `--transform` and `--load` can be excluded, since executing these two steps is the default behaviour, unless we specify one or the other.)

Then, in a separate execution, run only the third step with:

```
--load -Dhadoop.load.archive=false -Dhadoop.load.variant=true
```
The given error is because it was going to start a merge step, and there was already an ongoing merge step.
Loading batches of samples split horizontally (i.e. by chromosome) is one of the untested features in Storage-Hadoop. There is a typo in the error message. It should say:

```
Unable to merge a new batch. Ongoing batch operation: BatchFileOperation{operationName='Merge', fileIds=[37], timestamp=1, status={Sun Jan 29 13:40:34 UTC 2017=RUNNING}}
```
Regarding the type of data, the best scenario is to load gVCF files containing all the chromosomes, with one or multiple samples. Plain VCF files, or files split by chromosome, may fail or produce incorrect results.
> Correct. What you can do is execute the first and second phases with `--transform --load -Dhadoop.load.archive=true -Dhadoop.load.variant=false` (note that the params `--transform` and `--load` can be excluded, since executing these two steps is the default behaviour, unless we specify one or the other).
>
> Then, in a separate execution, run only the third step with `--load -Dhadoop.load.archive=false -Dhadoop.load.variant=true`
Is this how you parallelize indexing/loading at Genomics England?
> The given error is because it was going to start a merge step, and there was already an ongoing merge step.
As mentioned above, it wasn't a Merge step, it was Load. I also included some context before the exception: it happened just after Transform, before Load.
> There is a typo in the error message. It should say:
It doesn't seem so. It was Load (not Merge).
> Then, in a separate execution, run only the third step with `--load -Dhadoop.load.archive=false -Dhadoop.load.variant=true`
Does it process all variants from the archive table at once, for all the files previously loaded? I.e. does it need to be run only once, regardless of how many files were loaded?
> `-Dhadoop.load.archive=false -Dhadoop.load.variant=true`
I developed this system for the BRIDGE project (10,000 samples) and would not use these options for the process. My configuration looks like this in the `study-configuration.yml`:
```yaml
hadoop.load.archive.batch.size: 3      # controls the number of samples to load into ARCHIVE in parallel
hadoop.load.variant.batch.size: 500    # controls the number of samples to merge in ONE mapreduce job
opencga.storage.hadoop.hbase.merge.archive.scan.batchsize: 500   # controls the batch size when scanning through already merged variants
```
For example, I 'register' 500 converted (single-sample gVCF) files, that is: gvcf, json and proto (in that order), using:
```
opencga.sh files link -s $study --path 10_input -i $files
```

(once for each file type)
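For instance, a hypothetical loop over the three file types (the glob patterns and extensions are assumptions about the transform outputs, not confirmed names):

```bash
# Sketch: one link call per converted artefact type for the study.
study=myStudy   # placeholder study name
for pattern in '10_input/*.gvcf.gz' '10_input/*.json.gz' '10_input/*.proto.gz'; do
    # $pattern is left unquoted so the shell expands the glob into the file list
    ./opencga.sh files link -s "$study" --path 10_input -i $pattern
done
```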
followed by:

```
opencga.sh variant index --file 10_input --outdir 30_load --load
```
The `index` command with the above configuration takes care of it all in one go. This process is incremental and only scans the already loaded files if there is missing information.
The available memory for mapreduce containers can be controlled with the options below. In short: more memory -> fewer running containers -> takes longer to finish. E.g.:

```yaml
mapreduce.map.memory.mb: 2500
opencga.variant.table.mapreduce.map.java.opts: -Xmx2048m,-XX:+UseG1GC
```
Hint: use `-XX:+UseG1GC` for the HBase region servers as well; it works much better.
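For example, a sketch of how that could be set in `hbase-env.sh` (the heap size is an illustrative placeholder, not a recommendation from this thread):

```bash
# hbase-env.sh sketch: enable G1GC for the region servers.
# -Xmx8g is a placeholder heap size; tune it for your cluster.
export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -Xmx8g -XX:+UseG1GC"
```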
Is it designed to load into HBase concurrently from the same host? All loading processes but one fail when running concurrently.