deephaven / benchmark

Deephaven Benchmarking
Apache License 2.0
0 stars 3 forks source link

PartitionedBy Benchmarks #341

Closed stanbrub closed 2 weeks ago

stanbrub commented 1 month ago

JJ has been running into performance issues with partitioned tables being slower than their non-partitioned counterparts for the same operations (e.g. raj). This points out the need for some partition_by benchmarks. (A version of this was attempted previously but had problems because, with small data sets, a partition may or may not contain a value needed by the test.)

Related:

Performance is better with partitions:

from deephaven.execution_context import get_exec_ctx
from string import ascii_uppercase
from deephaven import empty_table
from deephaven import agg
from random import choice
from time import time

def rand_sym(n, m) -> str:
    """Return one uppercase letter chosen uniformly from ascii_uppercase[n:m]."""
    pool = ascii_uppercase[n:m]
    return choice(pool)

# Capture the script's execution context so transform_func can re-enter it.
# NOTE(review): presumably needed because partitioned-table transform
# callbacks may run on engine worker threads without an active context —
# confirm against the Deephaven execution-context docs.
ctx = get_exec_ctx()

def transform_func(table):
    """Average Value1/Value2 grouped by Sym1/Sym2 for one constituent table.

    Runs inside the captured execution context so the aggregation is legal
    when invoked from PartitionedTable.transform.
    """
    with ctx:
        aggregated = table.agg_by(agg.avg(["Value1", "Value2"]), ["Sym1", "Sym2"])
        return aggregated

# Row counts to benchmark; each size is timed three different ways below.
ns = [100_000, 500_000, 1_000_000, 5_000_000, 10_000_000]

for row_count in ns:
    print(f"{row_count} rows:")

    # Random symbol/value columns — rand_sym and randomDouble are evaluated
    # per row by the query engine.
    source = empty_table(row_count).update([
        "Sym1 = rand_sym(0, 10)",
        "Sym2 = rand_sym(10, 20)",
        "Value1 = randomDouble(0, 10)",
        "Value2 = randomDouble(-100, 100)"
    ])
    partitioned = source.partition_by(["Sym1", "Sym2"])
    proxy = partitioned.proxy()

    # 1) Aggregation on the flat table.
    t0 = time()
    flat_agged = source.agg_by(agg.avg(["Value1", "Value2"]), ["Sym1", "Sym2"])
    elapsed = time() - t0
    print(f"\tTable agg_by: {elapsed:.4f} seconds.")

    # 2) Same aggregation applied per-partition via transform().
    t0 = time()
    transformed = partitioned.transform(transform_func)
    elapsed = time() - t0
    print(f"\tPartitioned table agg_by transform: {elapsed:.4f} seconds.")

    # 3) Same aggregation through the partitioned-table proxy.
    t0 = time()
    proxy_agged = proxy.agg_by(agg.avg(["Value1", "Value2"]), ["Sym1", "Sym2"]).target
    elapsed = time() - t0
    print(f"\tPartitioned table proxy agg_by: {elapsed:.4f} seconds.")

Performance is worse with partitions:

from deephaven.time import to_j_instant
from string import ascii_uppercase
from deephaven import empty_table
from random import choice
from time import time

def rand_exchange() -> str:
    """Return a random exchange name, "Exchange_A" through "Exchange_G".

    Equivalent to choosing from the list
    [f"Exchange_{letter}" for letter in ascii_uppercase[:7]], but picks the
    single random letter first instead of rebuilding that 7-element list on
    every call — same uniform distribution, same possible outputs.
    """
    return f"Exchange_{choice(ascii_uppercase[:7])}"

def rand_ticker() -> str:
    """Return one of the six fake ticker symbols: Q, W, E, R, T, or Y."""
    tickers = ("Q", "W", "E", "R", "T", "Y")
    return choice(tickers)

# Anchor timestamp; referenced BY NAME inside the query strings below, so the
# variable must stay named start_time.
start_time = to_j_instant("2024-03-01T00:00:00 ET")

n = 5_000_000

print(f"{n} rows:")

# Shared driver table: two offset timestamps plus random exchange/symbol keys.
base = empty_table(n).update([
    "Timestamp1 = start_time + ii * SECOND",
    "Timestamp2 = Timestamp1 + (long)(SECOND / 10)",
    "Exchange = rand_exchange()",
    "Symbol = rand_ticker()"
])

quotes = base.select(["Timestamp = Timestamp1", "Exchange", "Symbol"]).update(["QuotePrice = randomDouble(100.0, 200.0)"])
trades = base.select(["Timestamp = Timestamp2", "Exchange", "Symbol"]).update(["TradePrice = randomDouble(100.0, 200.0)", "TradeSize = randomInt(1, 50)"])

# Partition both sides on the join keys, and grab proxies for the proxy test.
quotes_by_key = quotes.partition_by(["Exchange", "Symbol"])
trades_by_key = trades.partition_by(["Exchange", "Symbol"])

quotes_proxy = quotes_by_key.proxy()
trades_proxy = trades_by_key.proxy()

# 1) Reverse-as-of join on the flat tables.
t0 = time()
flat_result = trades.raj(quotes, ["Exchange", "Symbol", "Timestamp"])
elapsed = time() - t0
print(f"\tNormal tables: {elapsed:.4f} seconds.")

# 2) The same raj routed through the partitioned-table proxies.
t0 = time()
proxy_result = trades_proxy.raj(quotes_proxy, ["Exchange", "Symbol", "Timestamp"]).target
elapsed = time() - t0
print(f"\tPartitioned table proxies: {elapsed:.4f} seconds.")

Suggestion for an S3 parquet partitioned test

A useful resource is the S3ParquetTestBase class.
Look at its readWriteKeyValuePartitionedParquetData test.