googleapis / google-cloud-python

Google Cloud Client Library for Python
https://googleapis.github.io/google-cloud-python/
Apache License 2.0
4.84k stars 1.53k forks source link

BigQuery Storage: Disappointing performance when parsing Avro blocks #7805

Closed tswast closed 5 years ago

tswast commented 5 years ago

The google-cloud-bigquery-storage performance of parsing the Avro-encoded blocks of rows is disappointing, especially when compared to the Go implementation.

All three of the following benchmarks read data from the bigquery-public-data.usa_names.usa_1910_current table. They were run on an n1-standard-8 instance (though only a single stream is used).

# coding: utf-8
import concurrent.futures
from google.cloud import bigquery_storage_v1beta1
client = bigquery_storage_v1beta1.BigQueryStorageClient()
project_id = 'swast-scratch'
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = 'bigquery-public-data'
table_ref.dataset_id = 'usa_names'
table_ref.table_id = 'usa_1910_current'
session = client.create_read_session(
    table_ref,
    'projects/{}'.format(project_id),
    requested_streams=1,
)
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(
    stream=stream,
)
rowstream = client.read_rows(position)

Where they differ is in what they do with the blocks.

Parse the proto, but not Avro bytes: print(sum([page.num_items for page in rowstream.rows(session).pages]))

swast@pandas-gbq-test:~/benchmark$ time python3 parse_proto_no_avro.py 
5933561

real    0m12.278s
user    0m3.496s
sys     0m2.376s

Parse the Avro into rows with print(len(list(rowstream.rows(session)))):

swast@pandas-gbq-test:~/benchmark$ time python3 parse_avro.py 
5933561

real    0m42.055s
user    0m37.784s
sys     0m3.504s

Parse the Avro bytes into a pandas DataFrame.

df = rowstream.rows(session).to_dataframe()
print(len(df.index))
swast@pandas-gbq-test:~/benchmark$ time python3 parse_avro_to_dataframe.py 
5933561

real    1m13.449s
user    1m8.180s
sys     0m2.396s

CC @jadekler, since I'd like to track these metrics over time with the benchmarks project you're working on.

jeanbza commented 5 years ago

Ack :+1: We're a bit a ways off from getting to bigquery but good to keep this in our pocket when we start planning for it - thanks for filing!

max-sixty commented 5 years ago

To the extent we can get other formats than Avro, we wouldn't need to go into Python objects. Parquet / Arrow / even CSV (!) can all be decoded directly into an array.

The results above are consistent with that (though 40% of the total towards creating the dataframe from python objects is more than I would have expected)

I'm not really familiar with the relative benefits outside of that of Avro over Parquet / Arrow, though - maybe there are good reasons to favor Avro (streaming?)

tswast commented 5 years ago

To the extent we can get other formats than Avro, we wouldn't need to go into Python objects. Parquet / Arrow / even CSV (!) can all be decoded directly into an array.

Yeah, it's a little embarrassing that it's actually faster to decode the same table as a CSV file. 😳

though 40% of the total towards creating the dataframe from python objects is more than I would have expected

I think that's my fault for adding the dtypes override feature. To do that, I'm converting from row-oriented to column-orient in some Python loops. 🤦‍♂ That can probably be cut down with Cython or Numba.

maybe there are good reasons to favor Avro (streaming?)

Avro is the best supported encoder/decoder on the BigQuery-side. I'd love to have Arrow, since that makes "parsing" just a memory remapping operation (though we'll probably still have some copying due to the protobuf parser.)

tswast commented 5 years ago

I did some experiments with hacking up fastavro, as well as using numba. I can get a few percent speed-up by parsing the whole block at a time in fastavro with a hacked together "schemaless block parser", but it's not enough to really be worth pursuing.

cpdef _read_block(
    rows, fo, int block_count, writer_schema, reader_schema=None
):
    for i in range(block_count):
        rows[i] = _read_data(fo, writer_schema, reader_schema=reader_schema)

cpdef schemaless_block_reader(fo, int block_count, writer_schema, reader_schema=None):
    if writer_schema == reader_schema:
        # No need for the reader schema if they are the same
        reader_schema = None

    writer_schema = parse_schema(writer_schema)

    if reader_schema:
        reader_schema = parse_schema(reader_schema)

    rows = [None] * block_count
    _read_block(
        rows, fo, block_count, writer_schema, reader_schema=reader_schema
    )
    return rows

More promising are the numba experiments. RTB House published a post about their Avro parsing logic in which they got a big speed-up from generating parser code and then jitting it. To see if this would work in Python, I took the Avro schema for usa_names and converted it to handwritten Python code.


@numba.jit(nopython=True)
def _read_long(position, block):
    """int and long values are written using variable-length, zig-zag
    coding.

    Returns (new position, long integer)

    Derived from fastavro's implementation.
    """
    b = block[position]
    n = b & 0x7F
    shift = 7

    while (b & 0x80) != 0:
        position += 1
        b = block[position]
        n |= (b & 0x7F) << shift
        shift += 7

    return (position + 1, (n >> 1) ^ -(n & 1))

@numba.jit(nopython=True)
def _avro_rows(row_count, block):  #, avro_schema):
    """Parse all rows in a stream block.

    Args:
        block ( \
            ~google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse \
        ):
            A block containing Avro bytes to parse into rows.
        avro_schema (fastavro.schema):
            A parsed Avro schema, used to deserialized the bytes in the
            block.

    Returns:
        Iterable[Mapping]:
            A sequence of rows, represented as dictionaries.
    """
    position = 0
    rows = []
    for _ in range(row_count):
        # {
        #     "name": "state",
        #     "type": [
        #         "null",
        #         "string"
        #     ],
        #     "doc": "2-digit state code"
        # }
        position, union_type = _read_long(position, block)
        if union_type == 0:
            state = None
        else:
            position, strlen = _read_long(position, block)
            #state = str(block[position:position + strlen], encoding="utf-8")
            #state = block[position:position + strlen].decode("utf-8")
            #state = block[position:position + strlen]
            #state = b"" + block[position:position + strlen]
            state = None
            position = position + strlen
        # {
        #     "name": "gender",
        #     "type": [
        #         "null",
        #         "string"
        #     ],
        #     "doc": "Sex (M=male or F=female)"
        # }
        position, union_type = _read_long(position, block)
        if union_type == 0:
            gender = None
        else:
            position, strlen = _read_long(position, block)
            #gender = str(block[position:position + strlen], encoding="utf-8")
            #gender = block[position:position + strlen].decode("utf-8")
            #gender = block[position:position + strlen]
            #gender = b"" + block[position:position + strlen]
            gender = None
            position = position + strlen
        #gender = block[position:position + strlen]
        # {
        #     "name": "year",
        #     "type": [
        #         "null",
        #         "long"
        #     ],
        #     "doc": "4-digit year of birth"
        # }
        position, union_type = _read_long(position, block)
        if union_type == 0:
            year = None
        else:
            position, year = _read_long(position, block)
        # {
        #     "name": "name",
        #     "type": [
        #         "null",
        #         "string"
        #     ],
        #     "doc": "Given name of a person at birth"
        # }
        position, union_type = _read_long(position, block)
        if union_type == 0:
            name = None
        else:
            position, strlen = _read_long(position, block)
            #name = str(block[position:position + strlen], encoding="utf-8")
            #name = block[position:position + strlen].decode("utf-8")
            #name = block[position:position + strlen]
            #name = b"" + block[position:position + strlen]
            name = None
            position = position + strlen

        #name = block[position:position + strlen]
        # {
        #     "name": "number",
        #     "type": [
        #         "null",
        #         "long"
        #     ],
        #     "doc": "Number
        # }
        position, union_type = _read_long(position, block)
        if union_type == 0:
            number = None
        else:
            position, number = _read_long(position, block)

        rows.append((state, gender, year, name, number))
    return rows

With nopython=True, I can download and parse the whole table in 11.8 seconds! That's close to the same speed as the Go implementation.

$ time python benchmark/parse_avro.py
5552452
python benchmark/parse_avro.py  8.39s user 1.43s system 82% cpu 11.841 total

Without nopython=True, the speed is about 1/2 that (takes twice as long) of fastavro.

$ time python benchmark/parse_avro.py
5552452
python benchmark/parse_avro.py  13.15s user 3.32s system 28% cpu 57.406 total

There's some big problems with nopython=True, though. I can't construct strings in numba nopython=true and I can't even return bytes.

This experiment definitely gives me some hope that we can speed it up, though. I think it'd be interesting to try writing an Avro parser in a jitted language with good interoperability with Python (Julia? Or even continue with Numba if I figure out how to decode utf-8). Maybe it could write to Arrow blocks, since those are definitely more optimized for cross-language interoperability than Python dictionaries / tuples.

tswast commented 5 years ago

There is one other difference between my hand-written table-specific schema parser: it uses the block.avro_rows.serialized_binary_rows as a bytes buffer directly, whereas to use fastavro, I have to wrap with a BytesIO object. I imagine the bytes buffer is more easily optimized by numba or Cython than the BytesIO file-like object.

Before I give up on fastavro, I'll experiment with making it use a bytes buffer instead of a file-like object and see how that affects the speed.

max-sixty commented 5 years ago

That's v interesting. I think I understand most of it; though there's a lot there! A few thoughts:

Avro is the best supported encoder/decoder on the BigQuery-side. I'd love to have Arrow, since that makes "parsing" just a memory remapping operation

From what I read, Avro is row-orientated, and can be easily streamed. Parquet & arrow are column-orientated, and so would have to be read in blocks and then merged. Given that (as I understand it), the storage API is going to stream rows, and we want columns, something is going to have to buffer the rows and then transform into columns.

I can get a few percent speed-up by parsing the whole block at a time in fastavro

I think this is still creating python objects, which is likely contributing to it being only very slightly faster?

RTB House published a post about their Avro parsing logic in which they got a big speed-up from generating parser code and then jitting it.

That's v interesting. Am I right in thinking they are creating parsing code for the specific schema and JITing each time? That is another level of optimization!

There's some big problems with nopython=True, though. I can't construct strings in numba nopython=true and I can't even return bytes.

That does sound a bit awkward. I've had some success with numba, with admittedly easier tasks. I don't know enough to be helpful on this specific issue, but happy to help research / ask ppl who would.

rows.append((state, gender, year, name, number))

One thing to note is that we really don't want to be returning python objects / anything containing python objects (except with strings, which is unavoidable - pandas uses python strings in arrays). We want to be returning an array of say, floats. Numba handles arrays v well. NB: an array of floats is much smaller and cheaper to create than a list of those floats in python. e.g.


In [2]: sys.getsizeof(3.)
Out[2]: 24

In [8]: a=np.array(range(100)).astype(float)
In [9]: sys.getsizeof(a)
Out[9]: 896

# would be 2400 if python objects

Writing a partial Avro decoder in Numba would be pretty awesome. It is ambitious! Potentially it would be worth floating on the numba mailing list to see if people have good ideas.

Let me know how I can be helpful. Some of this is at the limit of my understanding, but more than happy to help where I can.

tswast commented 5 years ago

I did some more microbenchmarks today. I think the main reason for the slowness in parsing is the need to make extra copies.

Making 5,552,452 Python objects (one for each row) takes 8 seconds (or 5 seconds if using __slots__). This is indeed slow compared to constructing the same number of tuples, which only takes a fraction of a second.

But, what slowed parsing down more was creating copies of the strings as numpy arrays in numpy (to workaround inability to return byte slices). No copies: 12.9 seconds, Copies: 27.5 seconds. Then, when I pass those objects numba gives me into a Python object constructor, I think it has to make copies yet again (maybe because of reference counting tricks numba does in nopython mode?), bringing parsing time to about 1 minute.

I'm beginning to think that fastavro is about as fast as can be reasonably expected for getting dictionaries of rows. I think the way forward is as you suggest: to optimize creating dataframes for each block of rows from the BQ Storage API without having the intermediate step of Python objects.

max-sixty commented 5 years ago

Awesome, that's great research. Please could you explain in slightly more detail this part:

But, what slowed parsing down more was creating copies of the strings as numpy arrays in numpy (to workaround inability to return byte slices).

Is numba returning a 0d numpy array with a string inside?

Then, when I pass those objects numba gives me into a Python object constructor, I think it has to make copies yet again (maybe because of reference counting tricks numba does in nopython mode?), bringing parsing time to about 1 minute.

Is the Python object constructor in the numba routine?

--

to optimize creating dataframes for each block of rows from the BQ Storage API without having the intermediate step of Python objects.

To what extent could numba receive an avro blob and return an array? I might have thought that was pushing numba outside its comfort zone of numerical computing, but if it can, that could be a new frontier

tswast commented 5 years ago

Is numba returning a 0d numpy array with a string inside?

It's a byte array with utf-8 bytes. I haven't figured out how to efficiently convert that to a string, which is why I was considering a Row wrapper class to convert to string only when necessary.

@numba.jit(nopython=True)
def _read_bytes(position, block):
    position, strlen = _read_long(position, block)
    value = numpy.empty(strlen, dtype=numpy.uint8)
    for i in range(strlen):
        value[i] = block[position + i]  # Copy the bytes, ugh.
    return (position + strlen, value)

Is the Python object constructor in the numba routine?

Outside of it. I put it in the next() function. Potentially I could do it in a Python jit function, but since I still have to iterate over the row block in Python code, I didn't measure any benefits from doing that.

def next(self):
    """Get the next row in the page."""
    self._parse_block()  # Exits immediately if the block is already parsed.
    if self._remaining > 0:
        self._remaining -= 1
    return Row(self._schema, six.next(self._iter_rows))

To what extent could numba receive an avro blob and return an array?

Actually, lists and list comprehensions are in the list of supported nopython features, so that's what I'm returning: a list of tuples. Based on the slowdown I'm seeing, I suspect they aren't true lists/tuples and convert themselves into one if you try to use them in an unsupported way, like pass into an object constructor, but I haven't traced through to verify that.

max-sixty commented 5 years ago

so that's what I'm returning: a list of tuples.

I mean an array (likely a numpy array), which can hold raw floats, laid out in contiguous memory. Rather than a python list or tuple, which holds references to python floats, and so requires all the python object construction

tswast commented 5 years ago

Do you mean an array per column? Or somehow identify where each row/column starts/ends in a single array?

One idea I keep coming back to is making a uint8 (bytes) numpy array that represents an Arrow in-memory layout. That should be fast to construct in numba nopython mode, and I think Arrow does no copy operations if you create a pyarrow.Buffer from such an array. It still means making at least one copy, but that's impossible to get away from if you're translating to different formats.

Only problem with Arrow is it's easy / fast to then convert to pandas array (could even use Fletcher to keep the advantages of having an Arrow table), but it's still not optimized for getting individual rows. (https://stackoverflow.com/a/53168554/101923) I might keep fastavro around for that use case, since it seems to do an okay job of it.

max-sixty commented 5 years ago

Do you mean an array per column?

Yes, exacly. An array per column is the underlying pandas data structure (sometimes it uses a single array for multiple identically-typed columns, but we can ignore that)

I think you'd want to return a list of arrays, because you only want to parse the row-oriented blocks once. You can't return a 2D-array unless all the types are the same.

One idea I keep coming back to is making a uint8 (bytes) numpy array that represents an Arrow in-memory layout.

Right, I think that's the same thing I'm thinking.

Why uint8 though? Presumably it depends on the datatype (and it's basically never an 8 bit int - or am I misunderstanding something?) Once it's in (almost) any array format (array as in contiguous array), it should be very fast to transform to what we need - numpy / arrow / even parquet given the availability of libraries to transform between them. Do you agree?

It still means making at least one copy, but that's impossible to get away from if you're translating to different formats.

I agree - given the row -> column orientation, we're always going to have to make one copy at a minimum

jkgenser commented 5 years ago

It seems that this thread is largely around the io of getting BQ storage API results from the their binary format sent over the wire into Python memory in an efficient way. First of all, I've already started using the enhancement to the bigquery client package that @tswast committed a few weeks ago and there's already a marked speedup in reading in Avro vs. JSON. So hat tip for that.

However, just wondering what this thread's thoughts are on implementing something like (n_jobs=2) parameter or something like that around reading in the Avro streams in parallel? While it's definitely worthwhile to make a given stream read in faster, are there thoughts on speeding up reads through parallelism?

max-sixty commented 5 years ago

Multiprocessing is an option. It's a fairly heavy duty one for the size of the problem, so I think it would be preferable to speed up the single-process version...

tswast commented 5 years ago

I actually had some thoughts about how to make a parallel Avro parser. We'd have to do it in two passes, since there are a lot of integers that need to be parsed to identify where the record boundaries are. From my notes:

Avro could be parsed in parallel if we do a first pass to identify the offset of each record. We can even keep track of the record order so that when written to Arrow, it doesn't matter the actual execution order.

Also, having a first pass solves the array sizing problem for strings. I need to parse how long each string is in order to find record boundaries, so can track the total bytes in the first pass. Could precalculate Arrow offsets for strings in the first pass, too.

I have tested with processing blocks/pages in parallel within a stream, too, and got a 5-15% speedup. I haven't contributed that change yet, though because I do want to retain the block/page order, especially in the single stream case, so I'd need to do something to make sure the blocks are actually concatenated in the right order once we get dataframes from them.

tswast commented 5 years ago

Also, while not trivial 5-15% speedup is nothing compared to the experiments with avoiding creating Python objects and going straight to dataframe from Avro (maybe with Arrow as intermediary?), which can double the speed, so my efforts have been focussed there first.

jkgenser commented 5 years ago

I know the arrow project is working on compatibility with Avro for 2019, not sure how far along they've gotten. I've achieved very large order of magnitude speedsup in reading data over ODBC into python using turbodbc's fetchallarrow() method.

dkapitan commented 5 years ago

@jkgenser I have had similar experience whilst testing different file-formats from cloud storage, part of our internal workflow. Arrow/parquet is the fastest in reading into pandas, at the cost of longer write times (in our case export from SQL server).

tswast commented 5 years ago

It will take some refactoring (just-in-time codegen!) and testing before it's production-ready, but I want to give an update. I've built an implementation at https://github.com/tswast/google-cloud-python/blob/issue7805-block_reader/bigquery_storage/google/cloud/bigquery_storage_v1beta1/_avro_to_arrow.py which can parse a specific 10,000,000 rows & 162.12 MB table containing INT64, FLOAT64, and BOOL columns (the three easiest scalar types to handle).

With this custom parser, powered by Numba and PyArrow, I can download and parse the whole table in 16 seconds

$ time python benchmark/parse_avro_to_arrow.py
10000000
python benchmark/parse_avro_to_arrow.py  7.65s user 1.68s system 59% cpu 15.560 total

versus 1 minute, 10 seconds with fastavro

$ time python benchmark/parse_df.py
10000000
python benchmark/parse_df.py  57.36s user 2.35s system 85% cpu 1:10.20 total

I want to handle strings next, but string support will require solving https://issues.apache.org/jira/browse/ARROW-5531 in pyarrow. Instead, I plan to get a preliminary implementation ready that supports tables with only INT64, FLOAT64, and BOOL columns. From there, we can improve the parser to add support for more types and maybe add a hidden flag to use the fast path when available.

dkapitan commented 5 years ago

@tswast that's a significant performance improvement, nice work. Will have a look myself to learn more about arrow.

tswast commented 5 years ago

Closing as obsolete now that we are using Arrow as the default wire format in google-cloud-bigquery's to_dataframe methods.