GELOG / adam-ibs

Ports the IBS/MDS/IBD functionality of Plink to Spark / ADAM
Apache License 2.0
3 stars 6 forks source link

Persistence of Variant/Genotype with ADAM format #5

Closed davidonlaptop closed 9 years ago

davidonlaptop commented 9 years ago

Description

This feature allow persistence of the PED/MAP fields imported in issue #2.

The Spark models should integrates with the ADAM format. The ADAM models for Variant and Genotypes should be a good start, but other record types may need to be added to the ADAM format.

The ADAM format can be added as a maven dependency. The structure of the ADAM format is defined in an Avro file. The Avro file can be compiled into Java classes.

For updates to the ADAM format, choose the most simple solution for you as long as it is easy to diff the changes to the Avro file.

Analysis

Add a comment to this issue with:

Add a comment to this issue describing how this will be implemented in Spark, and explain how the persistence model will work.

Implementation

The implementation should use:

francois-boyer commented 9 years ago

To convert Avro Schema to Java Classes:

  1. Clone: https://github.com/bigdatagenomics/bdg-formats/
  2. Convert [LOCAL_PATH]/bdg-formats/src/main/resources/avro/bdg.avdl to a avpr file with the following command: $ java -jar avro-tools-1.7.7.jar idl bdg.avdl bdg.avpr
  3. Generate the java classes with the following command: $ java -jar avro-tools-1.7.7.jar compile protocol bdg.avpr .
  4. Classes will then be created here: [LOCAL_PATH]/org/bdgenomics/formats/avro/
francois-boyer commented 9 years ago

@ikizema: Where should I commit the generated classes (see point #4 of previous comment)?

Classes are in the org.bdgenomics.formats.avro package. Should I commit the file in ".../src/main/scala/"? So next to the "com" folder?

Thanks.

iki-v commented 9 years ago

@francois-boyer, what do you think about ".../src/main/java/" for now ? Can you please create a new branch for that ? We'll be able to check and merge everything later on.

francois-boyer commented 9 years ago

ok. its done!

davidonlaptop commented 9 years ago

@francois-boyer @ikizema : I think the ideal would be that the generate classes are not included in version control, but rather part of the maven build. So when bdg-format would be a requirement to build the project, and when you build the classes are compiled for you automatically.

Let me know if this avenue is problematic.

iki-v commented 9 years ago

Les dépendances sont ici.

francois-boyer commented 9 years ago

Hey guys:

  1. I was finally able to generate the genomic classes from the .avdl file through maven. (@davidonlaptop, currently, I saved the .avdl file in the project. Do you want me to download the file directly from a repo instead?)
  2. Then I created a bunch of Genotype instances and had them saved in a parquet file.

Now I'm not exactly sure what should I do next. @ikizema, do you want me to load back the hole data that have been saved or you just want to query it?

For example, I'm now able to query the data like this: "SELECT sampleDescription FROM GenotypeTable WHERE sampleId = 15". Warning: even if it looks like an SQL database driven, it's not. I'm using SparkSQL to do that. I guess this is what you want to do in order to create new dataset for the next stages of the program. Can you confirm?

iki-v commented 9 years ago

Well done @francois-boyer !

If my understanding is good, for each original Plink .map and .ped dataset we'll create one .adam dataset in ADAM format. On each execution of comand (such as --genome, --cluster...) we'll querry the dataset and we'll add some extra information to it. At any time we may need to commit changes to dataset and make the persistance.

So :

It well be grate if you could provide for us a kind of gide on how to use it :

Thanks, Ivan

francois-boyer commented 9 years ago

@ikizema, I will pursue my researches but I'm currently not able to update datasets.

See one of the comment I found on the web: parquet is by design a write once format. As hdfs is append only, the metadata for a parquet file is stored at the end, to prevent having to know all of the file statistics by analyzing the data in memory and then Rebuffering all of it in memory again to compress and write it to the disk. If you want to update a file with new data frequently, parquet is not going to be the right format to use, it us designed to hold large datasets that are written once.

davidonlaptop commented 9 years ago

Well done Francois,

Yes, immutability is a feature of a big data set, for performance reason. If you have a 10 GB that is replicated on 3 nodes on the cluster and readers/writers are actively reading/writing to it, updating it is not a trivial task.

Yesterday at the Spark Summit, they've presented IndexedRDD API, a method to do fine-grain updates on in-memory dataset. Still you won't be able to update a persisted dataset. (RDD is the fundamental data structure that is the building block for every dataset in Spark).

So if you make a transformation to an existing dataset, you need to create another file - re-using the same data structure if possible (which plink does not).

SparkSQL is the easiest way to use Spark, and it lets the optimizer decide how to distribute the work on the cluster. That's fine to read data from disk, but I'm not sure if you can do UPDATE or INSERT statements with SparkSQL. For finer control over transformation, you can look at SchemaRDD (now called DataFrame in current Spark version). A DataFrame, is a very famous dataset abstraction that also exists in other Data Science languages like R and Python. You can do basic relational algebra like projections, filtering, aggregation, etc. Like the old name says, it is a RDD + schema (column names, data types, etc.). When using Parquet, the schema is inferred for you.

If you use a plain RDD in Spark, it is just like a regular file; however it could be useful for parsing CSV or custom plink format.

@Francois, do you want to investigate how to use the DataFrames ?

@Ivan, the first step is to import plink data (either text or binary) into an ADAM structure. As I understand Parquet, you can only store one type of record at the root, but each record can store other types of record. See the Genotype structure which holds a Variant record inside. So unlike RDBMS, where you join tables based on a key, the whole dataset is "pre-joined" (better performance) at the cost of denormalization.

Good job guys,

-David

On Mon, Jun 15, 2015 at 8:07 AM, francois-boyer notifications@github.com wrote:

@ikizema https://github.com/ikizema, I will pursue my researches but I'm currently not able to update datasets.

See one of the comment I found on the web: parquet is by design a write once format. As hdfs is append only, the metadata for a parquet file is stored at the end, to prevent having to know all of the file statistics by analyzing the data in memory and then Rebuffering all of it in memory again to compress and write it to the disk. If you want to update a file with new data frequently, parquet is not going to be the right format to use, it us designed to hold large datasets that are written once.

— Reply to this email directly or view it on GitHub https://github.com/GELOG/adam-ibs/issues/5#issuecomment-112100326.

francois-boyer commented 9 years ago

Hi @davidonlaptop, @ikizema,

Ok, I'll investigate with DataFrames.

On the side note, I wanted to confirm that currently I'm able to build, for testing purpose, Genotype objects and save them into a parquet file. For my example, I'm only setting the SampleId, the SampleDescription and the Variant members. Something like this: def createGenotype(idx: Int): Genotype = { return Genotype.newBuilder() .setSampleId(idx.toString()) .setSampleDescription("mySampleDescription" + idx.toString()) .setVariant(Variant.newBuilder() .setStart(idx.toLong) .setEnd(idx.toLong + 10) .build()) .build() }

Now, with SparkSQL I'm able to execute queries like this: SELECT sampleDescription FROM GenotypeTable WHERE sampleId = 20, but I can't filter the results by the variant member. I did tried something like this: SELECT Variant.end FROM GenotypeTable WHERE Variant.start = 20 but it's just not working. I spend more than 6 hours trying to figure out how this can be archive with SparkSQL. The only thing I found would be to switch to Hive and do something like this: SELECT end FROM GenotypeTable LATERAL VIEW explode(variant) v as variant WHERE start = 20. But according to the request, we must use spark.

I have to admit that this starts to go beyond my technicals capabilities. There is really not much documentations/tutorials/examples on the web on the subject (Examples are often too simple which doesn't help me to go any further). Anyway; I'm not giving up but I hope Ivan will be able to help me a little bit when I'll show him the code tomorrow.

francois-boyer commented 9 years ago

Hi @davidonlaptop, @ikizema,

I'm so happy! I managed to make it work!

I think SparkSQL is just not the right thing for more complex queries.

It's as easy as: dataFrame.filter("variant.start = 20").select("variant.start", "variant.end").show()

Now I only need to figure out if querying the data this way is efficient and also if I can get an Genotype object back...

Stay tuned!

iki-v commented 9 years ago

Bien joué @francois-boyer, exellente nouvelle ! Impatient de voir une petite demo ce soir. On va pouvoir commencer à converger nos infos et voir la big picture.

francois-boyer commented 9 years ago

Hi @rbahassi, @karim73, @ikizema, @naceurMh, @sfcwallace, @singemanator, @Aboubakary,

Ok, the example I shown yesterday is now available here: https://github.com/GELOG/example-avro-parquet-spark

I'll keep updating this example project according what we discussed yesterday:

Stay tuned!

francois-boyer commented 9 years ago

Hi @rbahassi, @karim73, @ikizema, @naceurMh, @sfcwallace, @singemanator, @Aboubakary,

Ok, I cleaned up the code so it'll be simpler for you to understand it. I also created a Users > Messages (users sends messages to other users) example to illustrate how we can join 2 parquet files using Spark.

So, now the example can create one parquet file that contains Users and another parquet file that contains the messages. Then, there is a function who load the two parquet files and join them through a Spark DataFrame. Than, the example realize some simple queries.

francois-boyer commented 9 years ago

Salut @davidonlaptop,

Note: "Je vais écrire ce message en français. Si cette idée est conservée, je traduirai en anglais pour la cause."

@ikizema et moi avons eu quelques discussions au sujet de Parquet et Spark au sein de notre projet et à la lumière de nos connaissances combinées, nous avons fait les constats suivant (@ikizema, tu me corrigeras si jamais je dis des bêtises) et aurions une idée à te proposer et voir si elle ferait du sens:

Le constat:

  1. Nous sommes à modifier le format Adam pour l'adapter à nos besoins.
  2. Une fois que la structure sera adaptée, nous serons en mesure de transférer les fichiers (.ped et .map) vers notre nouvelle structure
  3. On sauvegarde le tout dans un fichier parquet
  4. La seconde étape est de calculer les variances des individus et de sauvegarder les résultats au sein des données générées en 3
  5. La troisième étape est de calculer le Genome et de sauvegarder les résultats au sein des données générées en 4.
  6. etc.

L'enjeu ici est que nous ne pouvons pas modifier les contenus d'un fichier Parquet; c'est du read-only. Par contre on pourrait toujours loader en mémoire tous le contenu du fichier Parquet, le modifier et ensuite re-sauvegarder dans un nouveau fichier Parquet, mais on ne trouve pas que ce soit une bonne idée puisque ça affectera grandement les performances.

La proposition: @ikizema et moi pensions qu'il pourrait être intéressant de ne pas remplacer les fichiers Parquet générés, mais de les bonifier à l'aide de nouveaux fichiers Parquet en faisant des "join" à l'aide des DataFrame. Je m'explique: L'étape 3 génère un premier fichier parquet. L'étape 4 pourrait alors loader les informations dont elle a besoin du fichier parquet, effectuer ses calculs, créer les classes qu'elle aura besoin pour persister ses calculs (toujours de la même structure Adam modifiée par les membres du groupe MGL804). Ensuite, lors de l'étape 5, on pourrait joindre le fichier parquet de l'étape 4 à celui de l'étape 3 et obtenir les données nécessaires pour que l'étape 5 puisse réaliser ses calculs. Cette étape pourra alors créer un tout nouveau fichier parquet qui pourra être utile aux étapes succédantes qui elles joindraient les 3 fichiers.

Selon toi, est-ce que ça pourrait être une solution envisageable ou si vraiment tu tiens à ce que chaque étape load les données de son étape précédente et re-sauve tout dans un nouveau fichier parquet? Ça sera important de statuer rapidement car ce choix aura un effet sur la structure Adam que nous sommes à adapter.

J'espère que mes explications auront été assez claires. Si jamais tu as des questions ou besoin d'informations supplémentaires, n'hésite surtout pas à me contacter. Merci,

davidonlaptop commented 9 years ago

Hi Francois and Ivan, it is probably possible to join 2 datasets with the dataframes, but I think it is best to have all the data for an operation in a single dataset.

1) The first operation is to import the PED & MAP files into your Genotypes / Variants data structures in an ADAM dataset.

2) The second operation loads the files from step 1), and generates a new ADAM dataset similar to the GENOME record.

3) The third operation loads the files from step 2) and generates a new ADAM dataset similar to the CLUSTER record.

Sure, it would be great if we can jump from 1) to 3) in one operation, but the risk of failure is higher. Once the 3 operations above are completed, it shouldn't be that hard to add this flexibility.

The tradeoff with this approach is that we are duplicating a bit the data when going from 1) to 2), and 2) to 3). But we gain portability (a single dataset to move around and share with the world) and efficiency (no join to do).

If you're coming from an OLTP databases mindset, it may seems you're violating the law of normalisations (3FN), and you're entirely right. In OLTP databases, you want to do this so that you can do frequent atomic update at single location and lock a single table / row.

However, in genomic use cases, the source data changes very rarely - maybe every few months. So as long as the source data is kept, it is ok to duplicate the data.

Does it answer your question?

iki-v commented 9 years ago

Bonjour, cela peut être également fait de cette manière là...

Chaque approche a ses avantages et ses inconvénients. L’inconvénient majeur de cette approche est bien entendu la limitation de dataset en taille pour pouvoir être traité. Alors dans le cas précédent, la limitation de la taille de dataset est contrainte uniquement par la taille d'HDFS, dans le cas pressent la contrainte est bien plus forte (la RAM disponible sur les nœuds)... Aucun traitement ne serait permis si l'ensemble de l'information ne sont pas chargés dans la RAM.

Comme indiqué dans le CR de la dernière réunion, on semblait avoir l'approbation de notre démarche. @davidonlaptop, pourrai-tu être pressent mercredi prochain ? A ce stade, pour que nous puissions avancer, nous avons besoin de 2 validations suivantes :