googleapis / google-cloud-go

Google Cloud Client Libraries for Go.
https://cloud.google.com/go/docs/reference
Apache License 2.0
3.74k stars 1.29k forks source link

bigquery: enable transparent storage api usage #3880

Closed derekperkins closed 1 year ago

derekperkins commented 3 years ago

Is your feature request related to a problem? Please describe. Retrieving data from larger queries is painfully slow. In the trace below, the query completed in < 1s, but then it took over 2 minutes to retrieve 180 MB. Typically transferring that inside GCP should take no more than a few seconds, if not sub-second.

Describe the solution you'd like The Python libraries have already enabled usage by default. Not sure if there are any downsides to that, but it seems like it could be an easy win without exposing any more knobs. (just noticed that @shollyman approved it) https://github.com/googleapis/python-bigquery/commit/e75ff8297c65981545b097f75a17cf9e78ac6772

Benchmark copied from the Python PR https://github.com/googleapis/python-bigquery/pull/55#issuecomment-610352614

image

If for some reason it shouldn't be enabled automatically, adding an option to QueryConfig would also be a good option.

Describe alternatives you've considered Using the storage api manually, but that's a completely different paradigm with Avro/Arrow schemas and has a significant mental overhead compared to just using the standard BigQuery package.

Additional context

trace
derekperkins commented 3 years ago

Thinking about it a little more, here's some proposed logic / pseudocode for when to use the storage api. Open to suggestions or clarifications of my assumptions.

var useStorageAPI bool
var tableRef *Table
var wg sync.WaitGroup

// the jobs.query response doesn't return the table reference we'll need for the storage api,
// so we should be able to fetch it concurrently while we wait for the query to complete (not tested)
// we may not need it, but it should be a simple call that will often reduce latency 
if jobsQuery {
    wg.Add(1)
    go func() {
      j, err := getJob()
      tableRef = j.QueryConfig.Dst
      wg.Done()
    }()
}

switch {
  // in this case, we either have all the rows or it's one more RPC to get the remainder, so no need
  // for added concurrency / complexity that will likely increase latency
  case jobsQuery && queryResp.TotalRows <= len(queryResp.Rows) * 2:
    useStorageAPI = false

  // for all other response sizes on jobs.query, use storage api to parallelize it
  case jobsQuery:
    useStorageAPI = true

  // for all jobs.insert calls, including when jobs.query falls back to polling, use the storage api
  // it should be the same number of RPC calls as getQueryRows, but can be parallelized.
  // There shouldn't be a latency penalty even in the case where getQueryRows only needs to be called once
  case jobsInsert:
    useStorageAPI = true
}

wg.Wait()

if useStorageAPI {
  // get data
} else {
  // get data
}
derekperkins commented 3 years ago

More thoughts:

First class support for Storage API

This semi-exceeds the original scope of this issue, but seems like the logical outcome of adding support. Right now, using the storage api kicks you into raw mode, with none of the conveniences of iterating, unmarshaling, etc. Since all of those will necessarily need to be solved, then it should be simple to add a public api like client.ReadRows or something that handles session creation / unmarshaling for you, returning an iterator like a standard query. That would need the filter/column selection options, but those are pretty straightforward, and the transparent query could just set those on the user's behalf, so it all shares the same codepath.

shollyman commented 3 years ago

Yeah, there's a lot of moving parts here: much more complexity in schema mangling code, more external dependencies, lots of corner cases, and need for multiple knobs (stream concurrency, decoder concurrency, etc).

It's definitely something I'd like to improve, but I've been pushing on other efforts recently.

derekperkins commented 3 years ago

Would you accept a PR roughly following that outline?

shollyman commented 3 years ago

This is going to be pretty invasive and touches a lot, and we're also working on some relevant changes in the server contract that may impact here. Let me see about getting this more formalized first; I'd rather save you frustration on some things that will end up being opaque to you.

derekperkins commented 3 years ago

I appreciate the transparency. Any rough idea on timelines for that? We've been very happy with what BI Engine has done for our query latency, but that becomes irrelevant when it takes 10-1000x the actual query time to retrieve the results of those queries. I'm guessing we should just plan on using the storage api directly ourselves?

shollyman commented 3 years ago

Unclear on timelines, so forging ahead with direct storage interactions doesn't block you on me.

The expectation has generally been that BI query patterns don't tend to return large volumes of row data, but your use case seems to be in contrast with that. I've not tested BI-based results in this size/shape before, so it's not clear to me if its written in a way where we can create a read session with more than a single stream (and if you need the rows to retain ordering, you can only use a single stream anyway).

derekperkins commented 3 years ago

I tried implementing the read api myself and wow, it is not simple. After spending time using it, I understand why it is the way it is, since protobuf doesn't really allow for dynamic schemas, but it could and really should be abstracted by this library. There were some optimizations I couldn't do because using query.Read only returns the iterator, leaving you without any of the job metadata.

I was also very disappointed at the performance. It took a few hundred ms to create a read session and wasn't any faster for single page reads. There's still an opportunity for parallelization, though as you mentioned, that would introduce a need for client-side ordering.

derekperkins commented 2 years ago

Any updates on this?

shollyman commented 2 years ago

This hasn't gotten as much attention, my attention has been elsewhere.

The iterator does now expose a reference to the job so that you can go get query statistics while still using the direct query->read path to getting an iterator.

One of the other prerequisites I think we need to address before making an opinionated abstraction over the query using the storage API is around the arrow format. There's some support in the arrow project, but the IPC reader doesn't yet expose a good way of reading multiple independent RecordBatch instances while reusing a common reference schema. I did some hacking testing and it seemed much more performant than avro.

Then the followup issues get into how we want to present the iterator in this case given arrow being a columnar format. Do we need a row-based approach and transpose, or allow users to access column segments directly? How much data conversion do we do vs exposing the arrow types in arrow directly?

alvarowolfx commented 1 year ago

Resolved by #6822 and soon to be released by #7307

jameshartig commented 1 year ago

@alvarowolfx from what I can tell the Storage Read API is free for this usage, correct? The pricing page says

Bytes scanned as part of reads from temporary tables are free

and my best understanding for how this was implemented is that it just does a ReadRows call on the temporary table from the query.