intel / hdk

A low-level execution library for analytic data processing.
Apache License 2.0
31 stars 14 forks source link

H2O: checksum computation is 100 times slower for HDK vs Pandas (or Polars) #703

Open Egor-Krivov opened 11 months ago

Egor-Krivov commented 11 months ago

Currently, HDK is very slow at checksum computation for the H2O benchmark: it is significantly slower than pandas at computing the checksum. Compared with polars, it is also significantly slower once query execution and checksum computation are combined.

Here are my results:

| Query | HDK execution time, s | HDK checksum time, s | Pandas checksum time, s |
|---|---|---|---|
| Groupby Q10 | 9.7 | 22.5 | 0.2 |
| Join Q1 | 1.1 | 9.0 | 0.2 |
| Join Q2 | 1.0 | 9.6 | 0.1 |
| Join Q3 | 0.7 | 2.9 | 0.3 |
| Join Q4 | 1.0 | 10.3 | 0.2 |
| Join Q5 | 6.5 | 10.8 | 0.1 |

I will attach csv files with raw results in H2O format to this issue. results_groupby.csv results_join.csv

Might be related to https://github.com/intel-ai/hdk/issues/696

#!/usr/bin/env python
# Groupby source code (mainly provided by HDK team).
#
# H2O benchmark "groupby" task for pyhdk. Each question is executed twice;
# for every run the script logs the query time, the memory usage reported by
# the helper, and a checksum of the answer together with the time it took to
# compute that checksum.

print("# groupby-pyhdk.py", flush=True)

import os
import gc
import sys
import timeit

import pyhdk

sys.path.append(os.path.join(os.path.dirname(__file__), "..", "_helpers"))
from helpers import memory_usage, write_log, make_chk

ver = pyhdk.__version__
git = pyhdk.__version__
task = "groupby"
solution = "pyhdk_nonlazy"
fun = ".groupby"
cache = "TRUE"
on_disk = "FALSE"

data_name = os.environ["SRC_DATANAME"]
src_grp = os.path.join("data", data_name + ".csv")
print("loading dataset %s" % data_name, flush=True)

# NOTE: other init options that were tried while investigating performance:
# enable_cpu_groupby_multifrag_kernels=False, enable_non_lazy_data_import=True.
pyhdk.initLogger(debug_logs=True)
hdk = pyhdk.init(enable_debug_timer=True)
x = hdk.import_csv(
    src_grp,
    schema={
        "id1": "dict",
        "id2": "dict",
        "id3": "dict",
        "id4": "int32",
        "id5": "int32",
        "id6": "int32",
        "v1": "int32",
        "v2": "int32",
        "v3": "fp64",
    },
)

# TODO: use warm-up SQL query if using SQL in bench


def _checksum(*exprs, **named_exprs):
    """Return a callable that reduces an answer table to its checksum row.

    The expressions are forwarded unchanged to ``ans.agg([], ...)``, i.e. an
    aggregation without group keys, and row 0 of its result is returned.
    """

    def compute(ans):
        return ans.agg([], *exprs, **named_exprs).run().row(0)

    return compute


def _run_question(question, run_query, run_checksum, runs=2):
    """Execute one benchmark question ``runs`` times and log every run.

    question     -- label written to the log.
    run_query    -- zero-argument callable that builds AND executes the query
                    and returns the answer table; all of it is timed.
    run_checksum -- callable taking the answer and returning the checksum row.
    runs         -- number of repetitions; head/tail of the answer are printed
                    after the last one only.
    """
    for run in range(1, runs + 1):
        gc.collect()
        t_start = timeit.default_timer()
        ans = run_query()
        # Stop the clock before printing so console I/O is never part of the
        # reported query time.
        t = timeit.default_timer() - t_start
        print(ans.shape, flush=True)
        m = memory_usage()
        t_start = timeit.default_timer()
        chk = run_checksum(ans)
        chkt = timeit.default_timer() - t_start
        write_log(
            task=task,
            data=data_name,
            in_rows=x.shape[0],
            question=question,
            out_rows=ans.shape[0],
            out_cols=ans.shape[1],
            solution=solution,
            version=ver,
            git=git,
            fun=fun,
            run=run,
            time_sec=t,
            mem_gb=m,
            cache=cache,
            chk=make_chk(chk),
            chk_time_sec=chkt,
            on_disk=on_disk,
        )
        if run == runs:
            print(ans.head(3), flush=True)
            print(ans.tail(3), flush=True)
        # Drop the answer so the gc.collect() of the next run can reclaim it.
        del ans


def _q7_range_v1_v2_by_id3():
    """q7: max(v1) - min(v2) per id3."""
    tmp = x.agg("id3", "max(v1)", "min(v2)")
    return tmp.proj("id3", range_v1_v2=tmp["v1_max"] - tmp["v2_min"]).run()


def _q8_largest_two_v3_by_id6():
    """q8: keep rows whose descending-v3 row number within id6 is below 3."""
    tmp = x.proj(
        "id6",
        "v3",
        row_no=hdk.row_number().over(x.ref("id6")).order_by((x.ref("v3"), "desc")),
    )
    return tmp.filter(tmp.ref("row_no") < 3).proj("id6", "v3").run()


def _q9_r2_by_id2_id4():
    """q9: squared correlation of v1 and v2 per (id2, id4)."""
    tmp = x.agg(["id2", "id4"], r2="corr(v1, v2)")
    return tmp.proj(r2=tmp["r2"] * tmp["r2"]).run()


# (question label, query runner, checksum runner), in benchmark order q1..q10.
QUESTIONS = [
    (
        "sum v1 by id1",
        lambda: x.agg("id1", v1="sum(v1)").run(),
        _checksum(v1="sum(v1)"),
    ),
    (
        "sum v1 by id1:id2",
        lambda: x.agg(["id1", "id2"], v1="sum(v1)").run(),
        _checksum(v1="sum(v1)"),
    ),
    (
        "sum v1 mean v3 by id3",
        lambda: x.agg("id3", v1="sum(v1)", v3="avg(v3)").run(),
        _checksum(v1="sum(v1)", v3="sum(v3)"),
    ),
    (
        "mean v1:v3 by id4",
        lambda: x.agg("id4", v1="avg(v1)", v2="avg(v2)", v3="avg(v3)").run(),
        _checksum(v1="sum(v1)", v2="sum(v2)", v3="sum(v3)"),
    ),
    (
        "sum v1:v3 by id6",
        lambda: x.agg("id6", v1="sum(v1)", v2="sum(v2)", v3="sum(v3)").run(),
        _checksum(v1="sum(v1)", v2="sum(v2)", v3="sum(v3)"),
    ),
    (
        "median v3 sd v3 by id4 id5",
        lambda: x.agg(
            ["id4", "id5"],
            v3_median="approx_quantile(v3, 0.5)",
            v3_stddev="stddev(v3)",
        ).run(),
        _checksum("sum(v3_median)", "sum(v3_stddev)"),
    ),
    (
        "max v1 - min v2 by id3",
        _q7_range_v1_v2_by_id3,
        _checksum(range_v1_v2="sum(range_v1_v2)"),
    ),
    (
        "largest two v3 by id6",
        _q8_largest_two_v3_by_id6,
        _checksum("sum(v3)"),
    ),
    (
        "regression v1 v2 by id2 id4",
        _q9_r2_by_id2_id4,
        _checksum(r2="sum(r2)"),
    ),
    (
        "sum v3 count by id1:id6",
        lambda: x.agg(
            ["id1", "id2", "id3", "id4", "id5", "id6"], v3="sum(v3)", v1="count"
        ).run(),
        _checksum(v3="sum(v3)", v1="sum(v1)"),
    ),
]

task_init = timeit.default_timer()
print("grouping...", flush=True)

for question, run_query, run_checksum in QUESTIONS:
    _run_question(question, run_query, run_checksum)

print(
    "grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True
)

# sys.exit instead of the builtin exit(): the latter is only injected by the
# `site` module and is meant for interactive use.
sys.exit(0)
#!/usr/bin/env python
# Join source code (mainly provided by HDK team).
#
# H2O benchmark "join" task for pyhdk. Each question joins the main table `x`
# with one of three right-hand tables and is executed twice; for every run the
# script logs the query time, the memory usage reported by the helper, and a
# checksum (sum of v1 and v2) together with the time it took to compute it.

print("# join-pyhdk.py", flush=True)

import os
import gc
import timeit
import sys

import pyhdk

sys.path.append(os.path.join(os.path.dirname(__file__), "..", "_helpers"))
from helpers import memory_usage, write_log, make_chk, join_to_tbls

ver = pyhdk.__version__
git = pyhdk.__version__
task = "join"
solution = "pyhdk"
fun = ".join"
cache = "TRUE"
on_disk = "FALSE"

data_name = os.environ["SRC_DATANAME"]
src_jn_x = os.path.join("data", data_name + ".csv")
y_data_name = join_to_tbls(data_name)
print("pyhdk data_name: ", data_name)

# Build the paths from whatever join_to_tbls returned so that the length check
# below can actually fail; with a hard-coded three-element list it never could.
src_jn_y = [os.path.join("data", name + ".csv") for name in y_data_name]
if len(src_jn_y) != 3:
    raise Exception("Something went wrong in preparing files used for join")

print(
    "loading datasets "
    + data_name
    + ", "
    + y_data_name[0]
    + ", "
    + y_data_name[1]
    + ", "
    + y_data_name[2],
    flush=True,
)

# NOTE: other init options that were tried while investigating performance:
# enable_debug_timer=True, enable-non-lazy-data-import=True.
pyhdk_init_args = {}
pyhdk_init_args["enable_cpu_groupby_multifrag_kernels"] = False

pyhdk.initLogger(debug_logs=True)

fragment_size = int(os.environ["FRAGMENT_SIZE"])
print(f"Using fragment size {fragment_size}")

hdk = pyhdk.init(**pyhdk_init_args)

# TODO: use 32-bit integers for less memory consumption and better perf
x = hdk.import_csv(
    src_jn_x,
    schema={
        "id1": "int32",
        "id2": "int32",
        "id3": "int32",
        "id4": "dict",
        "id5": "dict",
        "id6": "dict",
        "v1": "fp64",
    },
    fragment_size=fragment_size,
)

small = hdk.import_csv(
    src_jn_y[0],
    schema={"id1": "int32", "id4": "dict", "v2": "fp64"},
    fragment_size=fragment_size,
)
medium = hdk.import_csv(
    src_jn_y[1],
    schema={"id1": "int32", "id2": "int32", "id4": "dict", "id5": "dict", "v2": "fp64"},
    fragment_size=fragment_size,
)
big = hdk.import_csv(
    src_jn_y[2],
    schema={
        "id1": "int32",
        "id2": "int32",
        "id3": "int32",
        "id4": "dict",
        "id5": "dict",
        "id6": "dict",
        "v2": "fp64",
    },
    fragment_size=fragment_size,
)

print(x.shape[0], flush=True)
print(small.shape[0], flush=True)
print(medium.shape[0], flush=True)
print(big.shape[0], flush=True)


def _run_question(question, run_query, runs=2):
    """Execute one join question ``runs`` times and log every run.

    question  -- label written to the log.
    run_query -- zero-argument callable that executes the join and returns the
                 answer table; this call is what gets timed.
    runs      -- number of repetitions.
    """
    for run in range(1, runs + 1):
        gc.collect()
        t_start = timeit.default_timer()
        ans = run_query()
        # Stop the clock before printing so console I/O is never part of the
        # reported query time.
        t = timeit.default_timer() - t_start
        print(ans.shape, flush=True)
        m = memory_usage()
        t_start = timeit.default_timer()
        chk = ans.agg([], v1="sum(v1)", v2="sum(v2)").run().row(0)
        chkt = timeit.default_timer() - t_start
        write_log(
            task=task,
            data=data_name,
            in_rows=x.shape[0],
            question=question,
            out_rows=ans.shape[0],
            out_cols=ans.shape[1],
            solution=solution,
            version=ver,
            git=git,
            fun=fun,
            run=run,
            time_sec=t,
            mem_gb=m,
            cache=cache,
            chk=make_chk(chk),
            chk_time_sec=chkt,
            on_disk=on_disk,
        )
        # Drop the answer so the gc.collect() of the next run can reclaim it.
        del ans


# (question label, query runner), in benchmark order q1..q5.
QUESTIONS = [
    ("small inner on int", lambda: x.join(small, "id1").run()),
    ("medium inner on int", lambda: x.join(medium, "id2").run()),
    ("medium outer on int", lambda: x.join(medium, "id2", how="left").run()),
    ("medium inner on factor", lambda: x.join(medium, "id5").run()),
    ("big inner on int", lambda: x.join(big, "id3").run()),
]

task_init = timeit.default_timer()
print("joining...", flush=True)

for question, run_query in QUESTIONS:
    _run_question(question, run_query)

print("joining finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True)

# sys.exit instead of the builtin exit(): the latter is only injected by the
# `site` module and is meant for interactive use.
sys.exit(0)