intel / hdk

A low-level execution library for analytic data processing.
Apache License 2.0

[Perf][Bench] Join is slow on big tables. #574

Open Devjiu opened 1 year ago

Devjiu commented 1 year ago

On tables of ~10^8 rows, join performance is very low compared with DuckDB.

A join on a single int column between 2 tables of 10^8 rows each takes about 12 s.

12888ms total duration for executeRelAlgQuery
        12886ms start(1ms) executeWorkUnit RelAlgExecutor.cpp:1389
          569ms start(1ms) compileWorkUnit NativeCodegen.cpp:1403
          5394ms start(571ms) fetchChunks Execute.cpp:3090
          67ms start(5966ms) executePlan Execute.cpp:3462
          9ms start(6487ms) collectAllDeviceResults Execute.cpp:2550
          70ms start(6497ms) compileWorkUnit NativeCodegen.cpp:1403
          3071ms start(6568ms) fetchChunks Execute.cpp:3090
          527ms start(9640ms) executePlan Execute.cpp:3462

Most of the time is spent in fetchChunks. The query also runs as 2 subqueries; the first is a COUNT(*) that collects metadata.
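
For scale, the two fetchChunks calls in the trace above add up to 5394 + 3071 = 8465 ms, which is about 66% of the 12888 ms total. A minimal sketch of that arithmetic follows; the helper name `stageSharePercent` is made up for illustration and is not part of HDK.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Percentage of the total query time spent in one stage, given the duration
// of each call to that stage in milliseconds.
// Hypothetical helper for reading the timer output, not an HDK function.
double stageSharePercent(const std::vector<int>& stage_ms, int total_ms) {
  assert(total_ms > 0);
  const int stage_total = std::accumulate(stage_ms.begin(), stage_ms.end(), 0);
  return 100.0 * stage_total / total_ms;
}
```

Calling `stageSharePercent({5394, 3071}, 12888)` gives the share of the run spent in fetchChunks.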

Devjiu commented 1 year ago

Possible points/hints to increase performance:

  1. There is too much data copying in getAllTableColumnFragments.
    585ms start(597ms) getAllTableColumnFragments ColumnFetcher.cpp:243
        68ms start(597ms) getOneTableColumnFragment ColumnFetcher.cpp:176
        58ms start(666ms) ColumnarResults ColumnarResults.cpp:124
        69ms start(724ms) getOneTableColumnFragment ColumnFetcher.cpp:176
        57ms start(793ms) ColumnarResults ColumnarResults.cpp:124
        69ms start(851ms) getOneTableColumnFragment ColumnFetcher.cpp:176
        57ms start(921ms) ColumnarResults ColumnarResults.cpp:124
        8ms start(979ms) getOneTableColumnFragment ColumnFetcher.cpp:176
        7ms start(987ms) ColumnarResults ColumnarResults.cpp:124
        187ms start(995ms) mergeResults ColumnarResults.cpp:139

    It looks like there are 3 chunk copies.

    • getChunkBuffer: this should be the copy from Arrow that creates the chunks.
    • createZeroCopyBuffer: fast, no problem here.
    • Otherwise we go to the persistent DataMgr and then to ArrowStorage::fetchBuffer <- this is slow.
    • mergeResults is too slow.

1st copy: from a specific fragment into the required format (Arrow). 2nd copy: all fragments into a single common structure (getAllTableColumnFragments in ColumnFetcher). 3rd copy: unknown <- possible issue.
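
To make the cost of the "all fragments into one structure" copy concrete, here is a minimal sketch of linearizing fragments into one contiguous buffer. It is not HDK's mergeResults; `Fragment`, `linearize` and `bytesCopied` are hypothetical names, and a fragment is modeled as a plain vector of 64-bit integers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for one fragment of an int64 column.
using Fragment = std::vector<std::int64_t>;

// Number of bytes a single linearization pass has to copy.
std::size_t bytesCopied(const std::vector<Fragment>& fragments) {
  std::size_t rows = 0;
  for (const auto& fragment : fragments) {
    rows += fragment.size();
  }
  return rows * sizeof(std::int64_t);
}

// Copy every fragment, in order, into one contiguous buffer.
std::vector<std::int64_t> linearize(const std::vector<Fragment>& fragments) {
  std::vector<std::int64_t> merged;
  merged.reserve(bytesCopied(fragments) / sizeof(std::int64_t));
  for (const auto& fragment : fragments) {
    merged.insert(merged.end(), fragment.begin(), fragment.end());
  }
  assert(merged.size() * sizeof(std::int64_t) == bytesCopied(fragments));
  return merged;
}
```

Under these assumptions, one int64 column of 10^8 rows means 8 * 10^8 bytes moved per pass, and every additional copy of the same data repeats that cost.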

  1. Why does getAllTableColumnFragments take ~3 s? In some cases it is waiting to acquire a lock.

    2070ms start(2ms) getAllTableColumnFragments ColumnFetcher.cpp:248
    0ms start(2072ms) lock taken, execution started ColumnFetcher.cpp:262
  2. Columns in getAllTableColumnFragments are not fetched in parallel, even though each fetch accesses different fragments (related to the previous issue). This is a possible point for improvement: I already tried it, and it reduced total time by about 2 s.

  3. The second call of fetchChunks also does some data transfers. Didn't we already save the columns during the previous fetchChunks run? Most of the cache is stored in the Cpu/Gpu BufferMgr.

  4. At the end of the execution, we spent about 3.5 seconds on unknown activity, which looks suspicious.

  5. Fragment data is sometimes available without any copying, because it is already in the correct format.
    Do we keep copying data even when the format is already fine? Currently there is no interface to check DataLayout, but maybe we should add one.
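
The layout check suggested in the last item could look roughly like the sketch below. Everything in it is an assumption made for illustration: `DataLayout`, `ColumnData`, `ColumnView` and `fetchColumn` are hypothetical names, not existing HDK interfaces. The idea is only that a column that is already contiguous is returned as a view, and a copy happens only when the data is split across chunks.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical layout tag; the thread notes there is no such interface today.
enum class DataLayout { kContiguous, kChunked };

// Hypothetical stand-in for the stored data of one column.
struct ColumnData {
  std::vector<std::vector<std::int64_t>> chunks;

  DataLayout layout() const {
    return chunks.size() <= 1 ? DataLayout::kContiguous : DataLayout::kChunked;
  }
};

// Non-owning view of column values; `copied` records whether a copy was made.
struct ColumnView {
  const std::int64_t* data;
  std::size_t size;
  bool copied;
};

// Return a view of the column. Data is copied into `scratch` only when the
// column is not already contiguous.
ColumnView fetchColumn(const ColumnData& column,
                       std::vector<std::int64_t>& scratch) {
  if (column.layout() == DataLayout::kContiguous) {
    if (column.chunks.empty()) {
      return {nullptr, 0, false};
    }
    const auto& only = column.chunks.front();
    return {only.data(), only.size(), false};  // zero-copy path
  }
  scratch.clear();
  for (const auto& chunk : column.chunks) {
    scratch.insert(scratch.end(), chunk.begin(), chunk.end());
  }
  return {scratch.data(), scratch.size(), true};  // linearized copy
}
```

The returned view is valid only while `column` and `scratch` stay alive and unmodified, which is the usual trade-off of a zero-copy path.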

alexbaden commented 1 year ago

Support for multi-fragment joins has always been poor. There is multi-fragment join hash table construction, but the references from the hash table to the actual data are 0-indexed and therefore do not support multi-fragment.

count(*) first is a known pattern; it is used to size output buffers.
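
As a rough illustration of the count-first pattern (not HDK's generated code), the sketch below runs a hash join in two passes: the first pass only counts matches, the second fills an output buffer that was sized from that count. All names are hypothetical, and `std::unordered_multimap` stands in for the real join hash table. Both passes take the same table, which is the reuse the hash table cache is supposed to provide.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

using RowPair = std::pair<std::size_t, std::size_t>;  // (outer row, inner row)
using HashTable = std::unordered_multimap<std::int64_t, std::size_t>;

// Build once over the inner table: key -> inner row index.
HashTable buildHashTable(const std::vector<std::int64_t>& inner_keys) {
  HashTable table;
  table.reserve(inner_keys.size());
  for (std::size_t row = 0; row < inner_keys.size(); ++row) {
    table.emplace(inner_keys[row], row);
  }
  return table;
}

// Pass 1: count matching pairs without materializing anything.
std::size_t countMatches(const std::vector<std::int64_t>& outer_keys,
                         const HashTable& table) {
  std::size_t matches = 0;
  for (const std::int64_t key : outer_keys) {
    matches += table.count(key);
  }
  return matches;
}

// Pass 2: materialize the pairs into a buffer sized by pass 1.
std::vector<RowPair> joinRows(const std::vector<std::int64_t>& outer_keys,
                              const HashTable& table) {
  const std::size_t expected = countMatches(outer_keys, table);
  std::vector<RowPair> out;
  out.reserve(expected);  // no reallocation while filling
  for (std::size_t row = 0; row < outer_keys.size(); ++row) {
    const auto range = table.equal_range(outer_keys[row]);
    for (auto it = range.first; it != range.second; ++it) {
      out.emplace_back(row, it->second);
    }
  }
  assert(out.size() == expected);
  return out;
}
```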

Hash tables should be cached, so I suspect the extra copies are part of the query. It might be worthwhile to check that the hash table cache is working between the count(*) and the actual query.

  1. is interesting - do you know what method this is happening in?

I am curious - what is the fragment size when you run these queries? Can you try running where the inner join table fits into a single fragment?

Devjiu commented 1 year ago

I discussed this issue with Alex. A possible solution is to remove the call to getAllTableColumnFragments (the needFetchAllFragments path). In that call we move and linearize data, which is generally not required; it can be replaced by changing how hash table data is accessed by row id, converting the row id to fragment_idx + offset instead of copying.
So this should be investigated together with fragment_size manipulation and hash table initialization at the compileWorkUnit stage.
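
A minimal sketch of the row id to fragment_idx + offset conversion, assuming the hash table stores global row ids and the per-fragment row counts are known. `FragmentRef`, `fragmentStarts` and `locateRow` are hypothetical names, not HDK functions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

struct FragmentRef {
  std::size_t fragment_idx;
  std::size_t offset;  // row position inside that fragment
};

// Exclusive prefix sums of the fragment sizes: starts[i] is the global row id
// of the first row of fragment i.
std::vector<std::size_t> fragmentStarts(const std::vector<std::size_t>& sizes) {
  std::vector<std::size_t> starts(sizes.size());
  std::size_t running = 0;
  for (std::size_t i = 0; i < sizes.size(); ++i) {
    starts[i] = running;
    running += sizes[i];
  }
  return starts;
}

// Map a global row id to (fragment_idx, offset) with a binary search.
// The caller must pass a row id smaller than the total row count.
FragmentRef locateRow(const std::vector<std::size_t>& starts,
                      std::size_t row_id) {
  assert(!starts.empty());
  // First fragment that starts after row_id; the one before it holds the row.
  const auto it = std::upper_bound(starts.begin(), starts.end(), row_id);
  const std::size_t idx = static_cast<std::size_t>(it - starts.begin()) - 1;
  return {idx, row_id - starts[idx]};
}
```

With equally sized fragments the lookup reduces to a division and a remainder; the binary search also covers a shorter last fragment or fragments of uneven size.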

Devjiu commented 1 year ago

I found how to change the generated fragment_size via the number_fo_fragments variable. In this case number_fo_fragments = 100_000_000 reduces the number of fragments per kernel to 1. For example:

with needFetchAllFragments number of fragments: 25 table_id: 2 fragment_size = 4_000_000

10782ms total duration for executeRelAlgQuery
  10781ms start(0ms) executeRelAlgQueryNoRetry RelAlgExecutor.cpp:216
    1ms start(0ms) Query pre-execution steps RelAlgExecutor.cpp:217
    10779ms start(2ms) execute RelAlgExecutor.cpp:411
      10779ms start(2ms) executeStep RelAlgExecutor.cpp:895
        10779ms start(2ms) executeWorkUnit RelAlgExecutor.cpp:1389
          547ms start(3ms) compileWorkUnit NativeCodegen.cpp:1403
          0ms start(551ms) ExecutionKernel::run ExecutionKernel.cpp:12
          4678ms start(551ms) fetchChunks Execute.cpp:3092
          0ms start(5229ms) create QueryExecutionContext.cpp:94
          108ms start(5229ms) executePlan Execute.cpp:3464
          10ms start(5340ms) collectAllDeviceResults Execute.cpp:2550
          71ms start(5351ms) compileWorkUnit NativeCodegen.cpp:1403
          0ms start(5423ms) ExecutionKernel::run ExecutionKernel.cpp:126
          3542ms start(5423ms) fetchChunks Execute.cpp:3092
          0ms start(8965ms) create QueryExecutionContext.cpp:94
          1023ms start(8965ms) executePlan Execute.cpp:3464

with needFetchAllFragments number of fragments: 1 table_id: 4 fragment_size = 100_000_000

8097ms total duration for executeRelAlgQuery
  8097ms start(0ms) executeRelAlgQueryNoRetry RelAlgExecutor.cpp:216
    0ms start(0ms) Query pre-execution steps RelAlgExecutor.cpp:217
    8097ms start(0ms) execute RelAlgExecutor.cpp:411
      8097ms start(0ms) executeStep RelAlgExecutor.cpp:895
        8096ms start(1ms) executeWorkUnit RelAlgExecutor.cpp:1389
          13ms start(1ms) compileWorkUnit NativeCodegen.cpp:1403
          0ms start(14ms) ExecutionKernel::run ExecutionKernel.cpp:126
          1ms start(14ms) fetchChunks Execute.cpp:3092
          0ms start(16ms) create QueryExecutionContext.cpp:94
          1787ms start(16ms) executePlan Execute.cpp:3464
          0ms start(1803ms) collectAllDeviceResults Execute.cpp:255
          54ms start(1804ms) compileWorkUnit NativeCodegen.cpp:1403
          0ms start(1858ms) ExecutionKernel::run ExecutionKernel.cpp:126
          1ms start(1858ms) fetchChunks Execute.cpp:3092
          0ms start(1860ms) create QueryExecutionContext.cpp:94
          6236ms start(1860ms) executePlan Execute.cpp:3464

So fetchChunks time dropped, but executePlan time increased.
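
The fragment counts in the two runs follow from the table size, assuming every fragment except possibly the last is filled to fragment_size. A small sketch of that calculation; `fragmentCount` is a hypothetical helper.

```cpp
#include <cassert>
#include <cstdint>

// Number of fragments needed for row_count rows when each fragment holds at
// most fragment_size rows (ceiling division).
std::uint64_t fragmentCount(std::uint64_t row_count,
                            std::uint64_t fragment_size) {
  assert(fragment_size > 0);
  return (row_count + fragment_size - 1) / fragment_size;
}
```

For 10^8 rows this gives 25 fragments at fragment_size = 4_000_000 and 1 fragment at fragment_size = 100_000_000, matching the two runs above.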

Devjiu commented 1 year ago

I noticed that count* fetches too many columns. The count is done to estimate the size of the output buffer for the query. Currently all columns that participate in the query are fetched during count*, which in most cases is redundant.

Possible fix at and following commit.
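
The idea can be sketched as follows. This is not the code from the commit mentioned above; `QueryColumn` and `columnsForCountPass` are hypothetical names. The assumption is that only columns that influence the number of output rows (join keys, filter columns) are needed for the count pass, while pure payload columns can wait for the actual query.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical description of a column referenced by the query.
struct QueryColumn {
  std::string name;
  bool affects_row_count;  // true for join keys and filter columns
};

// Columns that have to be fetched for the count pass.
std::vector<std::string> columnsForCountPass(
    const std::vector<QueryColumn>& columns) {
  std::vector<std::string> needed;
  for (const auto& column : columns) {
    if (column.affects_row_count) {
      needed.push_back(column.name);
    }
  }
  return needed;
}
```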

Devjiu commented 1 year ago

Original main on current join

5742 ms start(514ms) fetchChunks Execute.cpp:3087  (count*)
4237 ms start(6891ms) fetchChunks Execute.cpp:3087 (join)

Updated with

 428 ms start(455ms) fetchChunks Execute.cpp:3093  (count*)
4558 ms start(1653ms) fetchChunks Execute.cpp:3093 (join)

Devjiu commented 1 year ago

Current plan on this issue:

[thread 852976 also had an error]
[thread 852381 also had an error]
[thread 853851 also had an error]
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f509cc352d6, pid=726860, tid=853223
#
# JRE version: OpenJDK Runtime Environment (20.0) (build 20-internal-adhoc..src)
# Java VM: OpenJDK 64-Bit Server VM (20-internal-adhoc..src, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-amd64)
# Problematic frame:
2023-09-13T16:57:41.939437 F 726860 0 195 ColumnarResults.cpp:371 Check failed: type->isString()
Aborted (core dumped)

~~Irrelevant Issues~~

- [x] Missing 3 seconds at the end of benchmark. 
486ms start(2904ms) executePlan Execute.cpp:3464
  0ms start(6189ms) resultsUnion Execute.cpp:1134

I was misreading the timings reported for new threads. This time is spent on kernel execution:

New thread(6)
  1561ms start(0ms) fetchChunks Execute.cpp:3095
  3136ms start(1562ms) executePlan Execute.cpp:3464
    3136ms start(1562ms) launchCpuCode QueryExecutionContext.cpp:564
    0ms start(4699ms) getRowSet QueryExecutionContext.cpp:192
End thread(6)
New thread(7)
  1562ms start(0ms) fetchChunks Execute.cpp:3095
  3284ms start(1562ms) executePlan Execute.cpp:3464
    3284ms start(1562ms) launchCpuCode QueryExecutionContext.cpp:564
    0ms start(4846ms) getRowSet QueryExecutionContext.cpp:192
End thread(7)
New thread(8)
  1562ms start(0ms) fetchChunks Execute.cpp:3095
  3129ms start(1562ms) executePlan Execute.cpp:3464
    3129ms start(1562ms) launchCpuCode QueryExecutionContext.cpp:564
    0ms start(4692ms) getRowSet QueryExecutionContext.cpp:192
End thread(8)
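
One way to read the per-thread traces above: assuming the kernels start together and run concurrently, and that each start(...) offset is relative to its own thread, the elapsed time is set by the slowest thread rather than by the sum over threads. A sketch with hypothetical names (`KernelTiming`, `wallTimeMs`, `summedTimeMs`):

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Timings of one kernel thread, taken from its fetchChunks and executePlan lines.
struct KernelTiming {
  int fetch_ms;
  int execute_ms;
};

// Elapsed time if all kernels start together and run concurrently:
// the slowest thread determines when the step finishes.
int wallTimeMs(const std::vector<KernelTiming>& kernels) {
  int longest = 0;
  for (const auto& kernel : kernels) {
    longest = std::max(longest, kernel.fetch_ms + kernel.execute_ms);
  }
  return longest;
}

// Total time across threads; larger than the elapsed time when threads overlap.
int summedTimeMs(const std::vector<KernelTiming>& kernels) {
  int total = 0;
  for (const auto& kernel : kernels) {
    total += kernel.fetch_ms + kernel.execute_ms;
  }
  return total;
}
```

For threads 6 to 8 above, `wallTimeMs` gives 4846 ms (thread 7: 1562 + 3284), while `summedTimeMs` gives 14234 ms.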

Devjiu commented 1 year ago

The issues with copies and hashing are still present here, so this should stay open.