druid-io / pydruid

A Python connector for Druid
Other
506 stars 194 forks source link

Optimize rows_from_chunks #262

Open mauguignard opened 3 years ago

mauguignard commented 3 years ago

My team and I are considering moving into Druid. As part of our tests, we were benchmarking some big queries and we found out that the fetch of the rows was taking longer than expected. After some profiling, we traced back the bottleneck to the loop in rows_from_chunks.

The new version introduced in this PR reduced the wall time of this function from 28.70s to only 4.93s (providing an effective speedup of ~5.8x) when running the following benchmark on my laptop (Ubuntu 20.04 - CPython 3.7.10 - Intel i5-8250U):

import argparse
import datetime
import json
import logging
import timeit
from collections import OrderedDict
from random import Random

from pydruid.db.api import rows_from_chunks

STRING_CHOICES = [
    "alice",
    "bob",
    r"ali\"ce",
    "ali{ce",
    r"b\ob",
    "{bob}",
    "{1: 2}",
    r"{\"id\": 1}",
]

def generate_rows():
    seed = 123
    rnd = Random(seed)

    rows = []
    base_timestamp = datetime.datetime(2021, 1, 1).timestamp()

    for i in range(500_000):
        row = OrderedDict(
            [
                (
                    "__time",
                    datetime.datetime.fromtimestamp(
                        rnd.random() * base_timestamp
                    ).isoformat(),
                ),
                ("dimension0", rnd.choice(STRING_CHOICES)),
                ("dimension1", rnd.choice(STRING_CHOICES)),
                ("dimension2", rnd.choice(STRING_CHOICES)),
                ("dimension3", rnd.choice(STRING_CHOICES)),
                ("dimension4", rnd.choice(STRING_CHOICES)),
                ("counter0", rnd.randrange(10_000_000)),
                ("counter1", rnd.randrange(10_000_000)),
                ("counter2", rnd.randrange(10_000_000)),
                ("counter3", rnd.randrange(10_000_000)),
                ("counter4", rnd.randrange(10_000_000)),
                ("counter5", rnd.randrange(10_000_000)),
                ("counter6", rnd.randrange(10_000_000)),
                ("counter7", rnd.randrange(10_000_000)),
                ("counter8", rnd.randrange(10_000_000)),
                ("counter9", rnd.randrange(10_000_000)),
                ("counter0f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter1f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter2f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter3f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter4f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter5f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter6f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter7f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter8f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter9f", rnd.randrange(10_000_000) / 10_000.0),
            ]
        )
        rows.append(row)

    return rows

def generate_chunks(rows, chunk_size=8192):
    body = json.dumps(rows, separators=(",", ":"))
    return [body[i: i + chunk_size] for i in range(0, len(body), chunk_size)]

def verify():
    logging.info("Generating data...")
    rows = generate_rows()

    logging.info("Generating chunks...")
    chunks = generate_chunks(rows)

    logging.info("Parsing chunks...")
    parsed_rows = list(rows_from_chunks(chunks))

    logging.info("Verifying results...")

    assert len(rows) == len(parsed_rows), "The number of rows is not the expected"

    for row, parsed_row in zip(rows, parsed_rows):
        assert tuple(row.items()) == tuple(
            parsed_row.items()
        ), "Rows differ. %r != %r" % (row, parsed_row)

def main(options):
    if options.verify:
        verify()

    logging.info("Starting benchmark...")
    timer = timeit.Timer(
        setup="chunks = generate_chunks(generate_rows())",
        stmt="list(rows_from_chunks(chunks))",
        globals=globals(),
    )

    timings = timer.repeat(repeat=options.repeat, number=1)
    logging.info("Best timing = %f", min(timings))

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Benchmark rows_from_chunks.")
    parser.add_argument("--verify", dest="verify", action="store_true")
    parser.add_argument("--no-verify", dest="verify", action="store_false")
    parser.set_defaults(verify=True)

    parser.add_argument("--repeat", "-r", type=int, default=20)

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - [%(levelname)s] - %(name)s:%(module)s:%(funcName)s: %(message)s",
    )
    main(parser.parse_args())

In our tests, however, we had an actual speedup closer to 9x when fetching the rows from one of our test queries.

In Python 3.6 and lower, we cannot avoid the overhead of calling OrderedDict. In our tests, though, the speedup is still close to 4x.

As a side effect, this PR should also solve #242 , since the parsing is now completely delegated to the official json module.