pluralsight / spavro

Spavro is a (sp)eedier avro library -- Spavro is a fork of the official Apache AVRO python 2 implementation with the goal of greatly improving data read deserialization and write serialization performance.
Apache License 2.0
26 stars 15 forks source link

low performance when parsing many short documents #14

Closed hrabalp closed 3 years ago

hrabalp commented 3 years ago

I have a kafka stream of small avro documents, often only 1 entry but with very complex schema... performance is realy bad (same for fastavro) root cause is schema parsing ... I understand that for standart spavro usage it is irelevant but on stream of small avro documents its devastating.

There are several options how to improve ...

statlelss aproach:

statefull aproach -

performance would be dramaticaly improved but behaviour would be little bit different

I am looking into statefull aproach but I am curious on your perspective

mikepk commented 3 years ago

Hi there... yeah this isn't a use case spavro was optimized for but you should be able to use the "parts" of Spavro to build something that works for your case.

I don't think changing the json parsing is going to save you much (I could be wrong) -- I assume if performance is bad you're doing this reading of a few records a lot, in which case the work in schema parsing would eat your time.

Like you suggest, you could build your own reader implementation that parses the schema once and re-uses the reader or parser for each file/stream.

The code should be fairly straightforward.

import json
import sys
import spavro.schema
import spavro.io
import struct

def make_spavro_reader(json_schema):
    parsed_schema = spavro.schema.parse(json_schema)
    reader = spavro.io.DatumReader(parsed_schema)
    def read_func(record):
        '''Take a file-like binary stream record and return a decoded datum'''
        decoder = spavro.io.BinaryDecoder(record)
        return reader.read(decoder)
    return read_func

reader = make_spavro_reader(schema)
for single_record_in_binary in kafka_stream:
    record_bytestream = io.BytesIO(single_record_in_binary)
    datum = reader(record_bytestream)

You would need to build a small cache keyed off schema names containing the read functions.

Then you would need to pass streamIO binary data to the reader function. If you know the schema you could just reuse the datum reader. You would pass your Kafka records to the datum reader function and just keep the process running continuously to re-process your small records.

I'm not sure if I completely understand your use case, but I think for continuously processing small counts of records of the same schema type this would probably be your best bet.

Also one side note -- if you're using Confluent's schema registry and record format you'll need to take care of the "magic bytes" at the beginning of each avro encoded kafka record. Confluent's model is to encode metadata about the schema in each record using a schema_id. You could use the schema_id as your cache key.

The following would take the binary of a single kafka message -- avro encoded in confluent's format -- and read out the datum. Traditionally you might use the schema_id to look up the schema in the confluent schema_registry and cache that... similar idea. In your case you would just keep the same process running regardless of the number of topics you read (I assume).

reader_cache = {}

def confluent_format_read(confluent_format_avro_record):
    bytes_reader = io.BytesIO(confluent_format_avro_record)
    # first 5 bytes are magic byte and schema id (Confluent format)
    magic_byte, schema_id = struct.unpack("!bi", bytes_reader.read(5))

    reader = cache.get(schema_id, None)
    if reader is not None:
        return reader(bytes_reader)

    reader = reader_cache[schema_id] = make_spavro_reader(schema)
    return reader(bytes_reader)  

Huge caveat -- I haven't tested the above at all so it may not work exactly as written but it should be pretty close. Hope this helps.

hrabalp commented 3 years ago

Hi, I was playing with this few weeks ago and I can confirm that swaping parser had only minimal effect. json parsing is expensive but thereare much more expensive parts within reader / writer schema merge.

Generaly there are 3 significant CPU hogs 1) schema parse + schema merge (merge is more significant) 2) header schema parsing 3) datum extraction

all numbers are depending on your schema complexity but for my case it was 45% for schema parse, 5%header schema parse 45% for datum extraction, 5% everything else. if you have small datums than header schema parse will be much more significant

we are not using confluent and I was not realy sure about avro internals so I created cache inside reader itself where spavro already handles schema bytestring extraction.

anyway I was able to get major speedup by just cachng parsed data schemas, another improvement might be achieved by header schema caching.

to be sure I am doing it correctly I used schema string itself as a key, works fast and its 100% reliable :) still your aproach seems to be much more clear and would handle also the header issue.

I will close this issue but I will use it as a reference how to aproach this problem in the future