bigdatagenomics / adam

ADAM is a genomics analysis platform with specialized file formats built using Apache Avro, Apache Spark, and Apache Parquet. Apache 2 licensed.

Investigate sorted join in dataset api #1223

Closed jpdna closed 7 years ago

jpdna commented 7 years ago

Related to #1216

It looks like the DataFrame/Dataset API has, or will have, some built-in support for sorted join operations: https://issues.apache.org/jira/browse/SPARK-12394

Needs further investigation to see whether this is useful for ADAM's goals of persisting and joining pre-sorted data.

jpdna commented 7 years ago

@devin-petersohn This answer claims that sortedness will be preserved by "bucketing" in Spark 2.0's Spark SQL: https://forums.databricks.com/questions/7717/how-can-i-preserve-a-sorted-table.html

More on this: https://issues.apache.org/jira/browse/SPARK-11512

This sort-merge-bucket join definitely seems to be a real thing, at least in Spark SQL - I'd sure like to see or make a simple demo of it in action...
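Something along these lines might serve as that demo; a minimal sketch assuming Spark 2.x, with made-up table/column names (contigName, start) and a plain equi-join rather than anything ADAM-specific:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bucketed-join-demo").getOrCreate()

// Hypothetical inputs keyed by (contigName, start).
val reads = spark.read.parquet("reads.parquet")
val features = spark.read.parquet("features.parquet")

// Persist both sides bucketed and sorted on the join key, so a later equi-join
// can use a sort-merge plan on co-bucketed data without re-shuffling/re-sorting.
reads.write
  .bucketBy(256, "contigName", "start")
  .sortBy("contigName", "start")
  .saveAsTable("reads_bucketed")
features.write
  .bucketBy(256, "contigName", "start")
  .sortBy("contigName", "start")
  .saveAsTable("features_bucketed")

val joined = spark.table("reads_bucketed")
  .join(spark.table("features_bucketed"), Seq("contigName", "start"))
joined.explain() // ideally shows SortMergeJoin with no Exchange on either side
```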

jpdna commented 7 years ago

I guess the trouble is that we likely want an interval join, not a join on exact key matches.

devin-petersohn commented 7 years ago

The current build's ShuffleRegionJoin uses a bucketing method.

The new method in #1216 that I am working on is a bit different. Instead of bucketing, if we know that one of the RDDs is sorted, we just repartition the other one so that its partitions match the regions held by the sorted RDD's partitions. We move things around in sorted blocks rather than individually, to maintain the sort post-shuffle (and possibly improve the runtime of the shuffle itself). We can make some optimizations on this model, but I think the core idea (maintaining the sort through the shuffle) should outperform something like this because we have state information about the RDD.
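A minimal sketch of that repartition-to-match step, not the actual #1216 code: it assumes both RDDs are keyed by a comparable position key, and the names (PosKey, BoundsPartitioner, matchPartitioning) are made up for illustration.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical key type: (reference index, start position), compared by the tuple ordering.
type PosKey = (Int, Long)

/** Routes a key to the partition of the sorted RDD whose key range contains it. */
class BoundsPartitioner(bounds: Array[PosKey]) extends Partitioner {
  private val ord = Ordering[PosKey]
  override def numPartitions: Int = bounds.length
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[PosKey]
    val idx = bounds.indexWhere(b => ord.lteq(k, b)) // linear scan for clarity
    if (idx < 0) bounds.length - 1 else idx
  }
}

def matchPartitioning(sortedReads: RDD[(PosKey, String)],
                      other: RDD[(PosKey, String)]): RDD[(PosKey, String)] = {
  // Upper key bound of each non-empty partition of the already-sorted RDD.
  val bounds: Array[PosKey] = sortedReads
    .mapPartitionsWithIndex { (i, iter) =>
      if (iter.isEmpty) Iterator.empty else Iterator((i, iter.map(_._1).max))
    }
    .collect()
    .sortBy(_._1)
    .map(_._2)

  // Shuffle only the unsorted side so its partitions line up with the sorted side's ranges.
  other.partitionBy(new BoundsPartitioner(bounds))
}
```

After this, each partition of the repartitioned RDD covers the same key range as the corresponding partition of the sorted RDD, so the join can proceed partition by partition.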

fnothaft commented 7 years ago

OOC, what do you mean by "sorted blocks rather than individually"? Just wanted to check my understanding.

devin-petersohn commented 7 years ago

Once we know where the data on a partition is going, we can easily split the partition into "blocks" (lists) such that each block is still sorted and is entirely bound for the same destination. To merge the blocks on arrival, we simply order them by their source partition number (there are actually a number of things we could sort on that would give the same outcome), so the sort is at most over the number of partitions.

The general idea here is that, because we want a global sort, the first (or last, or some combination of the two) n - k tuples on each partition will be shuffled, where n is the number of tuples on the partition and k is the number of tuples staying put.

This will hopefully improve performance over sending tuples individually and sorting after the shuffle. It will likely be data dependent, but that is my next test.
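A rough sketch of that block-based shuffle (not the actual #1216 code; `destinationFor` and the other names are placeholders for illustration):

```scala
import scala.reflect.ClassTag
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

/** Trivial partitioner that routes each block to the destination id it is tagged with. */
class DestinationPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

/**
 * Repartitions an RDD that is already globally sorted, preserving the sort.
 * `destinationFor` decides where each element should live after the shuffle
 * (e.g. derived from the target partition bounds).
 */
def shuffleSortedBlocks[T: ClassTag](sorted: RDD[T],
                                     numDest: Int,
                                     destinationFor: T => Int): RDD[T] = {
  // 1. On each source partition, group the (already sorted) elements into blocks that
  //    share a destination, and tag each block with its source partition index.
  val blocks: RDD[(Int, (Int, Array[T]))] = sorted.mapPartitionsWithIndex { (src, iter) =>
    iter.toArray
      .groupBy(destinationFor)    // each group keeps its original (sorted) order
      .iterator
      .map { case (dest, block) => (dest, (src, block)) }
  }

  // 2. Shuffle whole blocks, not individual tuples, to their destination partitions.
  val shuffled = blocks.partitionBy(new DestinationPartitioner(numDest))

  // 3. On arrival, order the blocks by source partition and concatenate: because the input
  //    was globally sorted, every key from source i precedes every key from source i + 1,
  //    so this restores the sort without re-sorting individual tuples.
  shuffled.mapPartitions { iter =>
    iter.toArray
      .sortBy { case (_, (src, _)) => src }
      .iterator
      .flatMap { case (_, (_, block)) => block }
  }
}
```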

devin-petersohn commented 7 years ago

@fnothaft To further explain: the problem with repartitioning a sorted RDD is that there is no way of maintaining the sort if individual tuples are sent; another sort must be done on each partition in that case. I have worked to reduce the sort cost (because some sort is required no matter what) such that the worst-case sort is over the number of partitions. The worst case is only hit if you repartition everything onto a single node.

fnothaft commented 7 years ago

Ah, thanks for explaining it further! That makes a fair bit of sense to me.

ryan-williams commented 7 years ago

I'm missing context here, but a few possibly-related things:

Sorry if these are not relevant to the problem at hand here; I'd be curious to see an example of the expected inputs and outputs if so!

devin-petersohn commented 7 years ago

@ryan-williams That is definitely relevant to the conversation.

The code you wrote to repartition a sorted RDD would definitely work; however, it would have to re-sort each partition's data once it all arrives, because there is no guarantee that the data arrives in sorted order. My work is just an optimization that decreases the amount of sorting done once all the data has arrived.

I think I will need to create some kind of animation to illustrate what is happening. It is difficult to put into words.

ryan-williams commented 7 years ago

Cool, I think I understand what you're proposing: on each partition P, group the elements that are going to each destination partition Q (into an Array, say), shuffle those Arrays, and concatenate them in order of their source partition P on the other side?

Or are you trying to do something else/further in terms of recognizing which data will not even end up on a different partition than it started on?

devin-petersohn commented 7 years ago

Currently, the repartitioner doesn't do anything special with the data that isn't moving. I will have to dig deeper into Spark's repartitionBy method to see if there is any optimization that we can do on that front.

ryan-williams commented 7 years ago

Off-hand, I'm pretty bearish on the potential for optimizations based on a presumption that some data will stay on the same executor (or node, or partition index), having attempted to make some optimizations based on that in the past.

The short story is that Spark very much does not guarantee any relationship between the {node, executor} any two tasks run on; I think some wishful assumptions related to that on our part led to some of the staggering amount of headache on #676; specifically, https://github.com/bigdatagenomics/adam/issues/676#issuecomment-219347677 feels relevant.

So, is my first paragraph above what you are imagining? Or is something more restricted to a "join" context in play here? Is there a small code sample of the naive/"before" case that you're trying to optimize that I could look at?

devin-petersohn commented 7 years ago

This functionality won't be specific to joins, but I am doing work to optimize the joins. There are a lot of optimizations we can make with the knowledge that RDDs are sorted (#1216). I just pushed my most recent work:

https://github.com/devin-petersohn/adam/blob/partitioner/adam-core/src/main/scala/org/bdgenomics/adam/rdd/SortedGenomicRDD.scala#L62

I tried to adequately document and comment things, so please let me know if something isn't clear.

ryan-williams commented 7 years ago

Cool; if you want any comments on specific pieces of the code, let me know where to do that (e.g. on a PR you could open).

One comment, and I am happy to be proven wrong about this, is that I would think the evenlyRepartition function you pointed me to above would perform worse than the equivalent .map(_ -> null).sortByKey(partitions).values.
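In other words, something like the following sketch (assuming the RDD's element type has an implicit Ordering, and with `partitions` standing in for the desired output partition count):

```scala
// Repartition and globally sort in a single shuffle using Spark's built-in range-partitioned sort.
val repartitioned = rdd
  .map(_ -> null)                         // key each element by itself
  .sortByKey(numPartitions = partitions)  // range-partitions, then sorts within partitions
  .values
```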

At first glance, evenlyRepartition triggers at least 5 Spark jobs (collect, zipWithIndex (= 2), partitionBy, collect, with a couple of .maps left over for the next stage); the first 4 of these each require a full pass over the data, and several of them pull all the data into memory at once.

Sorry if I am missing something but given some of our past Spark woes I have developed some heuristics that this runs afoul of, so I'm happy to discuss further if you like / if I'm not totally missing the mark here!

devin-petersohn commented 7 years ago

It might perform worse; that is one of the things I am testing. As it stands, it is definitely not perfect or optimal. There are a couple of things that I think could be better, but it has to work first.

There are other considerations as well, though, such as data skew. If this is an expensive up-front cost that saves a lot of downstream time, we can live with that too.

jpdna commented 7 years ago

Maybe tangential, but I remain a fan of ideas around both broadcast joins and map-side joins that use mapPartitions plus data access to another source from inside that mapPartitions call. That way we do work parallelized by partition, but otherwise escape the main Spark framework, if appropriate, once we have what's needed in each partition.


fnothaft commented 7 years ago

Broadcast joins, yes, but I look at broadcast joins as a way to eliminate shuffles when joining against a low-cardinality table. If you're doing a map-side join where you do data access outside of Spark inside a mapPartitions call, you're really fighting against both the Spark machinery and the RDD-based abstractions that we provide in the ADAM core. For context, I'll quote what I'd written recently in an email:

TINSTAAFL; if you move the load into the map, you still have to do a remote call/read if the data you are accessing is not on the node you’re running on. If Spark can schedule the logically co-partitioned tasks on the same node, it will. There’s a variety of reasons (data placement, being blocked during scheduling, etc) that Spark will not do that. You get the illusion of control if you move the load into the mapPartitions call, but actually, all you’re doing is fighting against Spark’s scheduler which knows where data is stored (for data in HDFS, that is).

Also, the performance costs at play here depend totally on what your network looks like, how evenly data is distributed in HDFS, how long your tasks are, etc. In https://arxiv.org/pdf/1507.03325.pdf / https://amplab.cs.berkeley.edu/wp-content/uploads/2016/08/Kira-19.pdf, we saw decreased end-to-end performance on astro workloads when task locality went from 92% to 99%, because the delay scheduling penalty to get the additional 7% read locality was longer than the cost of doing a remote read from HDFS.

jpdna commented 7 years ago

Broadcast joins, yes, but I look at Broadcast joins as a way to eliminate shuffles when joining against a low cardinality table

Thanks for your feedback @fnothaft

Would you say the set of all exons in the genome would be such a low-cardinality table you'd consider for broadcast? I guess the type of "join" I'm thinking of is any one that involves joining a 10 GB to 1000s-of-GB read/genotype dataset to a 1-20 MB, or maybe even up to 500 MB, genomic annotation dataset. This would account for 99% of all traditional genome annotation "track" data and the reference genome itself. My thinking here is very focused on the use case of joining to, and filtering on, genome features (genes, exons, transcription factor binding sites).

My main point is to make sure we consider broadcast (and a more efficient one, as discussed in #1224) when appropriate - which may be most genome feature annotation. If the data fits comfortably on my laptop, it can probably be broadcast...

If we can solve the sorting/shuffle problem and ensure that such a join to a small RDD is cheap, then fine - otherwise, broadcast-then-join methods are a good way to control adding in this annotation data prior to a filter, while avoiding shuffles/sorts.

As for the map-side data access, I'd argue that data locality doesn't matter much, as the data being joined is small anyway and thus by definition will likely NOT be co-located with a large distributed genotype/read dataset. I'm not sure what other costs there are to this within-partition data access. I see within-partition data access as more of a "targeted broadcast" (I guess an oxymoron): a minor optimization for when you know exactly what external data the partition needs, but you don't want to do an RDD-level join.

fnothaft commented 7 years ago

Would you say the set of all exons in the genome would be such a low-cardinality table you'd consider for broadcast? I guess the type of "join" I'm thinking of is any one that involves joining a 10 GB to 1000s-of-GB read/genotype dataset to a 1-20 MB, or maybe even up to 500 MB, genomic annotation dataset. This would account for 99% of all traditional genome annotation "track" data and the reference genome itself. My thinking here is very focused on the use case of joining to, and filtering on, genome features (genes, exons, transcription factor binding sites).

I consider broadcast joins appropriate on RDDs <10GB in size (i.e., the size of a dbSNP sites-only VCF). Obviously, this depends on the heap space on each machine, but we routinely broadcast an ~O(10GB) table for BQSR.

As for the map-side data access, I'd argue that data locality doesn't matter much, as the data being joined is small anyway and thus by definition will likely NOT be co-located with a large distributed genotype/read dataset. I'm not sure what other costs there are to this within-partition data access. I see within-partition data access as more of a "targeted broadcast" (I guess an oxymoron): a minor optimization for when you know exactly what external data the partition needs, but you don't want to do an RDD-level join.

I guess I just don't really see any wins to this approach. If the data is so small that locality is irrelevant, then it's so small that the costs of doing a collect followed by a broadcast are trivial. If that is the case, then both approaches (map-side load vs. broadcast join) have equivalent performance, but the map-side load sacrifices the join abstraction and is less general (you need novel map-side load code for each data type/source you want to load from). I would also add that you aren't going to get the partition-level data locality that is necessary for:

that is a minor optimization if you know what external data the partition needs exactly

unless you do a sort first, and if you've already sorted your data, a sort-merge join is likely to beat the performance of a broadcast join anyway. Thoughts? Let me know if I'm missing something.
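For concreteness, a minimal sketch of the collect-then-broadcast style of interval join being discussed here (this is not ADAM's region join implementation; Region, Read, and Feature are simplified stand-ins, and the per-contig linear scan is only for illustration):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Simplified stand-ins for genomic records and annotation features.
case class Region(contig: String, start: Long, end: Long) {
  def overlaps(other: Region): Boolean =
    contig == other.contig && start < other.end && other.start < end
}
case class Read(region: Region, name: String)
case class Feature(region: Region, label: String)

/** Broadcast the small feature table and join map-side, with no shuffle of the large read RDD. */
def broadcastIntervalJoin(sc: SparkContext,
                          reads: RDD[Read],
                          features: Seq[Feature]): RDD[(Read, Feature)] = {
  // The annotation set (exons, TF binding sites, ...) fits on the driver, so broadcast it,
  // grouped by contig to cut down the search space on each lookup.
  val byContig = sc.broadcast(features.groupBy(_.region.contig))

  reads.flatMap { read =>
    // Linear scan per contig for clarity; an interval tree or a sorted array with
    // binary search would be used for anything non-trivial.
    byContig.value.getOrElse(read.region.contig, Nil).collect {
      case f if f.region.overlaps(read.region) => (read, f)
    }
  }
}
```

The large read RDD is never shuffled here, which is the "collect followed by a broadcast" cost trade-off described above.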

fnothaft commented 7 years ago

Realistically, I don't think we'd be able to make progress on this without involving the Spark SQL team. Our approach requires a bit more logic than a plain sort-merge equi-join. I move that we close this ticket for now in favor of #1216/#1324; any objections?

fnothaft commented 7 years ago

Actually, I'm just going to close this for now. We can reopen at a later date, as needed.