populationgenomics / seqr

web-based analysis tool for rare disease genomics
GNU Affero General Public License v3.0
3 stars 1 forks source link

BigQuery backend #79

Open lgruen opened 3 years ago

lgruen commented 3 years ago

This is a high-level proposal for using BigQuery as a query backend instead of Elasticsearch. @hanars, I'd love to hear whether this is something that you'd find worth considering for Broad's seqr deployment.

The main motivation is cost savings: seqr indexes in Elasticsearch are large and deployment nodes are idle most of the time, as the number of seqr queries is so low. Furthermore, most of the indexes aren't actively used, as only a fraction of families in seqr are curated at any given time, even over extended periods of time. Elasticsearch's recent addition of a frozen tier could potentially help with some of this, but BigQuery might still be a cheaper option.

To get some initial cost estimates, @illusional has built a seqr-specific Elasticsearch query translator that produces corresponding SQL queries for BigQuery. To get the data into BigQuery, we've used exactly the same annotated Hail table that would usually be loaded into Elasticsearch. We've converted each partition into a separate Parquet table, using zstd compression. The only change necessary was a minor sanitization of the column names.

The resulting files can be queried in BigQuery directly, using the external table feature to read from GCS directly. With BigQuery's on-demand pricing model, this is actually preferable compared to converting to BigQuery-native / internal tables: slot time per query increases, but billing is based purely on the amount of data read. Since internal tables don't seem to use compression, querying the compressed Parquet tables turns out to be much cheaper.

Storage on GCS is very cheap, but the amount of data read per BigQuery request is comparatively expensive. It therefore pays off to duplicate some data in GCS, if that results in smaller amount of data volume processed per query.

It therefore makes sense to create per-project tables, similar to the previous scheme for Elasticsearch:

  1. Produce a joint callset across all projects.
  2. Annotate that callset as a Hail MT, which takes advantage of sharing variant annotations across projects.
  3. Generate per-project subsets of the above Hail MT that only include variants that are relevant for each project.
  4. Write compressed Parquet tables for each project.
  5. Update the seqr project "index" to point at the new BigQuery table (there's no lag for indexing anymore).

Cross-project variant searches could be implemented by not subsetting the sample identifiers in the samples_num_alt column. Alternatively, for something like gene-specific cross-project searches, separate tables could be written for genomic intervals, to reduce the amount of data that needs to be scanned.

A query over a cohort with 170 samples requires reading 25 GB of compressed Parquet columns. It takes 12s (wallclock time) to process, with significantly higher slot time. Cost is based only on data read though, which corresponds to 0.20 AUD. Standard Storage on GCS for the whole cohort would cost 0.70 AUD per month.

In other words, cost now increases linearly with the number of queries, as opposed to a "flat rate" with Elasticsearch. Each query is relatively cheap though.

Adding more seqr projects only increases the cost on GCS, which is comparatively negligible. Consequently there's almost no penalty for keeping around older projects that are rarely used.

In the on-demand model, wallclock time per query is hard to control explicitly, as there are no knobs exposed to control query concurrency. 12s per query isn't ideal from a UI perspective, but still seems to be acceptable.

There are a number of ways a migration from Elasticsearch to BigQuery could be implemented. The Elasticsearch query construction is fairly involved, so it might be difficult to move that to a BigQuery-specific implementation, although it could result in some optimizations. Another option would be to leave the query implementation as is, but point the Django app to a proxy, which in turn could either forward the query to Elasticsearch or translate the query for BigQuery. This could also be used in a transparent validation mode, to compare results between both backends.

BigQuery also supports paging of results, but that requires the frontend to provide a token.

hanars commented 3 years ago

So this is something we have considered strongly, for many of the reasons you outlined. My hope is in late fall of this year to research, select, and begin migration to a new, non-Elasticsearch backend that will scale better.

My major concern with BigQuery is that seqr is currently platform-agnostic and free. So my biggest worry is if we switch to BigQuery, seqr would only work on GCP and I'm not sure that is consistent with the open source component of our mission. If you have any ideas about how we could (sanely) support multiple backends or how we could create an open-source compatible BigQuey implementation, I would be really interested to hear them!

A big impetus for us wanting to change backends is we wanted a more relational model, and the ability to duplicate less data. So having the "variant" level data stored once across all projects and only the "sample" level information (num_alt, gq, etc) being stored per-project. This sounds really different from what you are doing. I know you say that storage cost is low and query cost is high so keeping the flat data model is the best option, but I wonder if there would be a way to change the queries to be better optimized for a relational model. There are some non-cost-related advantages for this as well, including the ability to update ClinVar or add in silico predictors across all projects in one go, instead of updating data on a per-project basis and needing to reload all the data. Also, we are having some scaling problems reloading our entire large WGS projects, so the ability to only add a few new samples as we get them instead of the whole thing could be really valuable.

I also do worry a bit about performance - 12s is acceptable if not great, but what about if there were 1000 WGS samples, instead of 170 WES samples? I would assume the time would increase pretty significantly.

lgruen commented 3 years ago

Those are all excellent points, @hanars! Thank you very much! It would be great to work on this together. A few additional comments interspersed below.

My major concern with BigQuery is that seqr is currently platform-agnostic and free.

That's a totally fair point. While both alternatives you point out (supporting multiple backends or implementing yet another one) come with additional complexity, either of them could be manageable.

If some of the logic from EsSearch were lifted to a backend-agnostic query representation, the implementation overhead of adding another backend could be much smaller. Still, supporting multiple backends always runs the risk of feature disparity between backends and it's hard to keep all of them tested sufficiently.

If we can define a relatively small set of backend requirements, it might not be too difficult to implement a custom backend. I've experimented with Cloud Run a bit and I think we should be able to spin up a moderate amount of instances quickly enough to answer seqr queries on demand. That should be super cheap on GCP and in other environments one could either have an equivalent serverless implementation (Lambda on AWS) or potentially standing workers in an HPC environment.

I know you say that storage cost is low and query cost is high so keeping the flat data model is the best option, but I wonder if there would be a way to change the queries to be better optimized for a relational model.

The alternative to a flat model would be to run queries on a subset of those columns and then join that result with additional annotation tables (like ClinVar), right? With columnar formats (like BigQuery or Parquet), you don't actually pay much for the additional columns at query time. Instead I think of it format as a "predefined join", which generally is much cheaper than joining two tables with a different set of rows. That's why I'm a bit skeptical you'd actually get better performance from a relational model.

There are some non-cost-related advantages for this as well, including the ability to update ClinVar or add in silico predictors across all projects in one go, instead of updating data on a per-project basis and needing to reload all the data.

Aren't you annotating the joint callset table before you split data into multiple projects? I.e. wouldn't that annotation be across all projects and therefore annotate any shared row between projects only once already? Or are you referring mostly to the overhead of ingesting that data multiple times in ES? Note that this would completely disappear in a model where you just scan tables stored as blobs on GCS.

Another optimization we've been thinking about is annotating incrementally: i.e. instead of reannotating all columns, only update those that are dependent on e.g. a new ClinVar version. Alternatively, when the annotation sources stay the same, only annotate new rows when samples are added.

Also, we are having some scaling problems reloading our entire large WGS projects, so the ability to only add a few new samples as we get them instead of the whole thing could be really valuable.

That's really interesting to hear! Are you referring to the annotation stage in Hail or the loading into ES itself? We're of course very far from your scale, but I'd be keen to hear more details about these issues.

I also do worry a bit about performance - 12s is acceptable if not great, but what about if there were 1000 WGS samples, instead of 170 WES samples?

Great question :). Those were actually 170 WGS (not WES) samples, but more importantly I simulated them very crudely, by concatenating the annotated table for 17 WGS 10 times. For BigQuery, that's not completely silly, as there are no index structures, so it just increases the amount of data that needs to be scanned by 10x.

However, that of course completely disregards shared rows between samples. I've run a small subsampling experiment on the HGDP + 1KG dataset to see how many "new variants" are discovered relative to the existing number of samples. Above 1k samples, it's only around 25k new variants (and therefore rows) per additional sample, but even before that it drops quite quickly: image

It would be good to know if those numbers roughly correspond to what you're seeing in practice (I'm still a bit surprised by how quickly that curve drops, so I hope it's not a bug). However, it also means that 10 x 17 WGS corresponds to something closer to 5k WGS. That's also why the incremental annotations could save a large amount of cost.

hanars commented 3 years ago

I know you say that storage cost is low and query cost is high so keeping the flat data model is the best option, but I wonder if there would be a way to change the queries to be better optimized for a relational model.

The alternative to a flat model would be to run queries on a subset of those columns and then join that result with additional annotation tables (like ClinVar), right? With columnar formats (like BigQuery or Parquet), you don't actually pay much for the additional columns at query time. Instead I think of it format as a "predefined join", which generally is much cheaper than joining two tables with a different set of rows. That's why I'm a bit skeptical you'd actually get better performance from a relational model.

So I think you misunderstand what I mean by relational. I mean one record for all variant-level information: that would include clinvar, predictor scores, population frequencies, VEP annotations, anything that isn't sample-specific. Then there would be small models representing sample-specific data such as num alt and gq and other quality metrics.. These would be relational to that single variant model. So instead of having each variant represented once per project, each variant would be represented once, total. This way, doing the incremental loading you discuss is actually pretty doable. The graph you show supports this as being a pretty efficient approach, as since we already have thousands of samples in seqr when we go to add a new project or new samples to a project we would add very few new variants

There are some non-cost-related advantages for this as well, including the ability to update ClinVar or add in silico predictors across all projects in one go, instead of updating data on a per-project basis and needing to reload all the data.

Aren't you annotating the joint callset table before you split data into multiple projects? I.e. wouldn't that annotation be across all projects and therefore annotate any shared row between projects only once already? Or are you referring mostly to the overhead of ingesting that data multiple times in ES? Note that this would completely disappear in a model where you just scan tables stored as blobs on GCS.

Yes we annotate the joint callset before loading, but we have over 200 projects. Unless we reload all of them every couple of months (currently prohibitively expensive and labor intensive), the data gets stale really fast. Just because a project hasn't received any new samples in the past year doesn't mean it should have year old clinvar data, or miss out on new predictor scores we added.

Also, we are having some scaling problems reloading our entire large WGS projects, so the ability to only add a few new samples as we get them instead of the whole thing could be really valuable.

That's really interesting to hear! Are you referring to the annotation stage in Hail or the loading into ES itself? We're of course very far from your scale, but I'd be keen to hear more details about these issues.

I was referring to the loading to ES and the data storage size once it is there - it takes about 2 days to export our largest project using a pretty beefed up cluster, and its using well over a terabyte of disk space

I also do worry a bit about performance - 12s is acceptable if not great, but what about if there were 1000 WGS samples, instead of 170 WES samples?

Great question :). Those were actually 170 WGS (not WES) samples, but more importantly I simulated them very crudely, by concatenating the annotated table for 17 WGS 10 times. For BigQuery, that's not completely silly, as there are no index structures, so it just increases the amount of data that needs to be scanned by 10x.

How much does a data size increase of 10x convert to in terms of search time?

It would be good to know if those numbers roughly correspond to what you're seeing in practice (I'm still a bit surprised by how quickly that curve drops, so I hope it's not a bug). However, it also means that 10 x 17 WGS corresponds to something closer to 5k WGS. That's also why the incremental annotations could save a large amount of cost.

We've never done this analysis, but this does track with our intuition so its really cool to see this worked out!

lgruen commented 3 years ago

So instead of having each variant represented once per project, each variant would be represented once, total.

I can definitely see the appeal for not having to duplicate the storage for a common variant across 200 projects :). However, at search time this means you'd always have to take this shared (and therefore large) table into account. In the case of a backend like BigQuery, this means a linear scan across all variants that you've discovered across all projects, even if your sample-specific data would be small and you're just running a join with the large table.

In other words, you'd save a lot of disk space, but the amount of data that needs to be processed at query time would be significantly larger.

With a columnar format, having the sample-specific information in separate columns would essentially give you the same benefits as having two relational tables, unless there's an application in searching the sample-specific table by itself (without requiring a join with the variant-level table).

Can you share how many variants your joint callset currently contains? Comparing that with the number of variants in your largest project would be really instructive. If those differ by maybe just 2-3x from one another, just having one large table (even if it's not relational) might indeed be the way to go.

I was referring to the loading to ES and the data storage size once it is there - it takes about 2 days to export our largest project using a pretty beefed up cluster, and its using well over a terabyte of disk space

Wow, that's impressive. I'm hoping that all of this would go away if we just wrote tables to GCS though (but I get your point that replicating even a subset of those writes 200 times wouldn't be ideal). There's no additional loading involved and Parquet compression seems fairly good.

How much does a data size increase of 10x convert to in terms of search time?

BigQuery tries to keep query time pretty much constant -- it would just throw 10x more workers at the data, which would make the query also 10x more expensive. (In the on-demand model there's a limit of ~2000 slots though.)

I've been looking a bit at Apache Arrow and it seems like it could potentially be a good fit if we wanted to build a small self-contained backend. However, I'm not quite sure how to handle the sort scripts that are passed to ES. Do you think it would be possible to run these calculations at annotation time to create a column that can be sorted by, or are some of these sorts truly dependent on additional user parameters and can't be expressed "statically"?

hanars commented 3 years ago

Can you share how many variants your joint callset currently contains? Comparing that with the number of variants in your largest project would be really instructive. If those differ by maybe just 2-3x from one another, just having one large table (even if it's not relational) might indeed be the way to go.

I'd have to look into this, I would be curious as well. But I see your point that for our smaller exome projects, having their own tables would be a lot less data to scan through

BigQuery tries to keep query time pretty much constant -- it would just throw 10x more workers at the data, which would make the query also 10x more expensive. (In the on-demand model there's a limit of ~2000 slots though.)

Cool. We would need to look into this more to make sure this approach wouldn't get prohibitively expensive for us

I've been looking a bit at Apache Arrow and it seems like it could potentially be a good fit if we wanted to build a small self-contained backend. However, I'm not quite sure how to handle the sort scripts that are passed to ES. Do you think it would be possible to run these calculations at annotation time to create a column that can be sorted by, or are some of these sorts truly dependent on additional user parameters and can't be expressed "statically"?

The big issue is for the gene-specific sorts (in_omim and constraint) we have intentionally kept gene-level metadata out of our loading pipeline and stored separately in seqr. The main reason is because it means we can update the data in seqr and it propagates to all variants and projects immediately instead of needing to reload data in order to get the latest gene annotations. We update OMIM weekly so data that we've loaded will basically always be stale relative to the OMIM we have in our reference database.

illusional commented 3 years ago

Hi @hanars, building on from Leo's work - I've been exploring a couple of methods to support multiple backends in seqr, and wanted to ask you what you think, and whether this is an approach you agree with, and could potentially see upstreaming in the future. This is a proof of concept, definitely not asking for a code-review at this stage :)

At the moment, I'm intercepting the search_model that the seqr UI produces, and instead of building an ES expression directly, it builds an abstract expression in a new set of classes which can translate themselves to elastic-search, our testing protobuff format, and a rough SQL query as a test (SQL couldn't be a full viable replacement anyway).

I've opted for trying to extract the search_model -> query module into a new section, to try and reduce the amount of data and state that's pulled within the query building.

I've put together a DRAFT pr: https://github.com/populationgenomics/seqr/pull/91

But some rough examples, the following search_model:

{"pathogenicity": {"clinvar": ["pathogenic", "likely_pathogenic"]}}

Produces this expression tree:

CallAnd(
    CallFieldIsOneOf(
        field=Field(name="clinvar_clinical_significance"),
        values=[
            "Likely_pathogenic",
            "Pathogenic",
            "Pathogenic/Likely_pathogenic",
        ],
    )
)

Which generates this elastic-search query:

{
  "query": {
    "bool": {
      "must": [{
        "terms": {
          "clinvar_clinical_significance": [
            "Likely_pathogenic", 
            "Pathogenic", 
            "Pathogenic/Likely_pathogenic"
          ]
        }
      }]
    }
  },
  "sort": ["xpos", "variantId"],
  "from": 0,
  "size": 100,
  "_source": ["<source-fields>"]
}

And this rough SQL query:

SELECT * FROM variants 
    WHERE (clinvar_clinical_significance in ("Likely_pathogenic", "Pathogenic", "Pathogenic/Likely_pathogenic"))

It's producing equivalent queries to some more complex examples:

  1. https://github.com/populationgenomics/seqr/blob/c200771a98b21277ceddc8fd61b898e98b0e5394/seqr/utils/search/test_constructor.py#L72-L96
  2. https://github.com/populationgenomics/seqr/blob/c200771a98b21277ceddc8fd61b898e98b0e5394/seqr/utils/search/test_constructor.py#L279-L286
Example of the expression produced by (1):
CallAnd(
    CallOr(
        CallFieldIsOneOf(
            field=Field(name="transcriptConsequenceTerms"),
            values=[""],
        ),
        CallFieldIsOneOf(
            field=Field(name="clinvar_clinical_significance"),
            values=[
                "Likely_pathogenic",
                "Pathogenic",
                "Pathogenic/Likely_pathogenic",
            ],
        ),
    ),
    CallAnd(
        CallAnd(
            CallOr(
                CallEqual(
                    Field(name="samples_num_alt_2"),
                    Literal(literal="NA12878", literal_type="string_value"),
                )
            ),
            CallNegate(
                CallOr(
                    CallEqual(
                        Field(name="samples_no_call"),
                        Literal(literal="NA12891", literal_type="string_value"),
                    ),
                    CallEqual(
                        Field(name="samples_num_alt_2"),
                        Literal(literal="NA12891", literal_type="string_value"),
                    ),
                )
            ),
        )
    ),
)

It's unlikely to produce identical elastic-search expressions, eg: Negate(Should(...)) ~ MustNot(...), and I'm sure there will be more obstacles, but promising results so far.

Sorting

Sorting is the only element that the backend that we're testing doesn't handle, our plan is to max the variant search at 10,000, and sort in the browser. We're confident that the browser should be able to handle the size of the objects, and most JS libraries offer ~10ms search times with 10,000 elements. This matches the default max_result_window for elastic search.

I see there are some more complex predefined sorts within seqr, but I'm confident they could be ported to the web where appropriate.

The main problem I see with this, storing the results of a non-paged search in VariantSearchResults.

Next steps

If you're happy with this approach, my next steps are to:

FYI @lgruen.

Thanks in advance!

hanars commented 3 years ago

So at a high level, I guess I'm confused why we need yet another intermediate representation of the query. It seems to me that that just adds another level of complexity without adding a lot of utility, although maybe I'm misunderstanding the approach. Moreover, a fundamental assumption you are making is that we will represent all fields identically in all backends, so the and/ or/ terms/ range etc behaviors would all be extensible. But to me thats a really limiting approach. Many of the fields we store in elasticsearch are stored and queried in an honestly horrible way that was just necessary to work for the limitations of elasticsearch - In particular genotypes. In your example above, you show the following example intermediate representation:

CallAnd(
        CallAnd(
            CallOr(
                CallEqual(
                    Field(name="samples_num_alt_2"),
                    Literal(literal="NA12878", literal_type="string_value"),
                )
            ),
            CallNegate(
                CallOr(
                    CallEqual(
                        Field(name="samples_no_call"),
                        Literal(literal="NA12891", literal_type="string_value"),
                    ),
                    CallEqual(
                        Field(name="samples_num_alt_2"),
                        Literal(literal="NA12891", literal_type="string_value"),
                    ),
                )
            ),
        )
    ),

This maps quite neatly to the elasticsearch query, and I guess it works for you BigQuery representation, but honestly this is a terrible way to query genotypes and my hope for our next seqr backend is that this query could be totally different. For example, a theoretical pseudo-query might look like

SELECT * FROM variants 
    AGG (SELECT * from genotypes WHERE sample_id in SAMPLE_IDS ) AS gt 
    WHERE COUNT(gt.sample_id = 'NA12878' AND gt.num_alt == 2) == 1
    AND COUNT(gt. sample_id = 'NA12891' AND gt.num_alt in [0, 1]) == 1

To me, theres no intermediate representation you could write that would be a helpful intermediary for those 2 totally different queries, and in fact having that intermediary could really do more harm than good. My hope in creating an abstract representation for query backends would be to free us to represent data totally differently in different backends. Different field names, different structures, different everything if need be, and creating an intermediate representation that essentially duplicates the data model we picked for elasticsearch really limits that goal

To me, a much cleaner solution would be that we create a python base class or abstract class that has utility functions to take in the json query as represented by the json in the ui and then convert it for any given backend, without the intermediate representation step.

Sorting I think its fine to query all the variants and then sort and paginate in seqr itself and not in the database, although given that almost every database in the world has good sorting functionality, it seems like a real waste to do a less performant post-facto sort and also to really slow down the query due to the network lag of sending 10,0000 documents instead of 100. If I were you I would do a hybrid approach where simple sorts are performed at the database level, and complex sorts are performed in seqr itself, but thats just personal preference.

I would also choose to have sort and pagination all performed in a server side proxy rather than in the javascript, because otherwise the javascript behavior would have to be completely different depending on which backend you have running, which strikes me as really complex to maintain. I guess if you were doing a true switchover and did not intend to support the multiple backends that would be okay, but for the main seqr code base I don't think I would ever accept a search implementation that had such severe UI implications