facebookincubator / velox

A C++ vectorized database acceleration library aimed to optimizing query engines and data processing systems.
https://velox-lib.io/
Apache License 2.0
3.4k stars 1.11k forks source link

VectorHasher's value ID caching logic makes certain queries unnecessarily slow #10057

Closed zeodtr closed 2 months ago

zeodtr commented 3 months ago

Description

Hi,

(I believe this issue is more of a performance bug report rather than an enhancement suggestion. However, since it is not a functional bug, I have chosen to classify it under the 'enhancement' category.)

I am building an OLAP DBMS system that uses Velox for the execution engine. In this issue, two executor processes exchange intermediate results. The query, which has been slightly modified to hide the real table name, is as follows:

SELECT ip, name
FROM ( 
    SELECT name
    FROM t1 
    WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
    AND tm_col < timestamp '2022-07-11 03:10:00' 
) JOIN ( 
    SELECT ip, name AS name2
    FROM t2 
    WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
    AND tm_col < timestamp '2022-09-30 06:00:00' 
) ON name = name2
GROUP BY ip, name
;

The subqueries' resulting record count is 2,763,772 and 5,600 respectively.

The query became very slow after applying #7404 to my local Velox repository.

I investigated it and found the source code line that causes the problem is in ExchangeQueue.cpp. The line is as follows:

 if (pageBytes > 0 && pageBytes + queue_.front()->size() > maxBytes) {

The code link is as follows: https://github.com/facebookincubator/velox/blob/3a7f8a88afa21cd482fe2bc970473ddd296fcf0a/velox/exec/ExchangeQueue.cpp#L121

After I changed the code to the following, the query became significantly faster (from 176 secs to 43 secs).

if (pageBytes > 0 && pageBytes + queue_.front()->size() > 1) {

(I will call this modification as M1.) The modified code effectively disables what #7404 tries to achieve. Strange.

So, I've run valgrind with callgrind on the original code and the resulting performance graph of one of the two executor processes was as follows:

원복_out 224335

memset() was taking a big portion of runtime. It is called by std::fill() by VectorHasher::makeValueIdsDecoded().

The code link is as follows: https://github.com/facebookincubator/velox/blob/3a7f8a88afa21cd482fe2bc970473ddd296fcf0a/velox/exec/VectorHasher.cpp#L252

M1's valgrind result graph is as follows:

임시_out 271447

memset()'s portion is now negligible.

Upon further investigation, I found that the DecodedVector's base vector size becomes disproportionately large relative to the SelectivityVector's size in the unmodified code. For example, the DecodedVector's base vector size is 1,066,500, while the SelectivityVector's size is 1024. For M1, the DecodedVector's base vector size is 11,900, while the SelectivityVector's size is 1024.

As a result, the cost of clearing the value ID cache outweighs the benefits of caching.

When I removed the caching logic, the query performed as quickly as M1.

Since I am working with a modified version of Velox's source code and not the current official source, I cannot be completely certain that this issue is present in the current version. However, I believe there is a high probability that it exists.

It would be nice if VectorHasher could be made more intelligent to avoid this kind of issue. (For example, disable the caching logic if DecodedVector's size is too big for the SelectivityVector's size.)

Thank you.

mbasmanova commented 3 months ago

@zeodtr Thank you for reporting this issue with so much detail.

I found that the DecodedVector's base vector size becomes disproportionately large relative to the SelectivityVector's size in the unmodified code. For example, the DecodedVector's base vector size is 1,066,500, while the SelectivityVector's size is 1024.

This sounds similar to https://github.com/facebookincubator/velox/pull/9843

CC: @Yuhta @xiaoxmeng

Yuhta commented 3 months ago

If VectorHasher is the place we spend most of the time, this can be fixed with a similar approach as #7150. I can prepare something so you can try.

mbasmanova commented 3 months ago

@Yuhta Jimmy, let's first find out which code produced such a dictionary vector. It might be better to change that code to avoid producing such vectors (similar to Unnest).

Yuhta commented 2 months ago

@mbasmanova Agree that we should find out the code producing this dictionary and selectivity in this case (out of exchange), because peeling can be inefficient as well on the data.

But as general case I see this can happen legitimately, for example whenever we use dictionary to filter rows (remaining filter / join filter). As in VectorHasher there is no downside to handle this case separately and it is much easier to implement than encoding peeling, we should fix it in VectorHasher in addition to investigating how the data is generated.

Yuhta commented 2 months ago

@zeodtr #10084 is a fix for VectorHasher you can try it. But as @mbasmanova pointed out this is probably not the only place it is inefficient. Can you

zeodtr commented 2 months ago

@Yuhta Thank you very much for your fix. Since I am out of the office this week, I will try it next week.

I have this for the plan printout for now (I have changed names and deleted the column names in ROW<>). Two executor processes were given the same plans. They each have their portion of the table data. It's a shared-nothing configuration.

velox plan[0] :
 -- PartitionedOutput[partitionFunction: HASH(0) with 2 partitions] -> n2_2:VARCHAR
  -- Project[expressions: (n2_2:VARCHAR, ROW["n0_1"])] -> n2_2:VARCHAR
    -- Filter[expression: and(and(not(is_null(ROW["n0_1"])),gte(ROW["n0_0"],"2022-07-11T03:00:00.000000000")),lt(ROW["n0_0"],"2022-07-11T03:10:00.000000000"))] -> n0_0:TIMESTAMP, n0_1:VARCHAR
      -- TableScan[table: hive_table, data columns: ROW<...>] -> n0_0:TIMESTAMP, n0_1:VARCHAR

velox plan[1] :
 -- PartitionedOutput[partitionFunction: HASH(1) with 2 partitions] -> n2_3:VARCHAR, n2_4:VARCHAR
  -- Project[expressions: (n2_3:VARCHAR, ROW["n0_0"]), (n2_4:VARCHAR, ROW["n0_2"])] -> n2_3:VARCHAR, n2_4:VARCHAR
    -- Filter[expression: and(and(not(is_null(ROW["n0_2"])),gte(ROW["n0_1"],"2022-09-30T05:30:00.000000000")),lt(ROW["n0_1"],"2022-09-30T06:00:00.000000000"))] -> n0_0:VARCHAR, n0_1:TIMESTAMP, n0_2:VARCHAR
      -- TableScan[table: hive_table, data columns: ROW<...>] -> n0_0:VARCHAR, n0_1:TIMESTAMP, n0_2:VARCHAR

velox plan[2] :
 -- PartitionedOutput[partitionFunction: HASH(0, 1) with 2 partitions] -> n3_3:VARCHAR, n3_4:VARCHAR
  -- Aggregation[SINGLE [n3_3, n3_4] ] -> n3_3:VARCHAR, n3_4:VARCHAR
    -- Project[expressions: (n3_3:VARCHAR, ROW["t2.1.ip#0"]), (n3_4:VARCHAR, ROW["t1.0.name#0"])] -> n3_3:VARCHAR, n3_4:VARCHAR
      -- HashJoin[INNER t1.0.name#0=t2.1.name2#1] -> "t1.0.name#0":VARCHAR, "t2.1.ip#0":VARCHAR, "t2.1.name2#1":VARCHAR
        -- Exchange[] -> "t1.0.name#0":VARCHAR
        -- Exchange[] -> "t2.1.ip#0":VARCHAR, "t2.1.name2#1":VARCHAR

velox plan[3] :
 -- Project[expressions: (ip:VARCHAR, "#anonymous#.2.ip#0"), (name:VARCHAR, "#anonymous#.2.name#1")] -> ip:VARCHAR, name:VARCHAR
  -- Aggregation[SINGLE [#anonymous#.2.ip#0, #anonymous#.2.name#1] ] -> "#anonymous#.2.ip#0":VARCHAR, "#anonymous#.2.name#1":VARCHAR
    -- Exchange[] -> "#anonymous#.2.ip#0":VARCHAR, "#anonymous#.2.name#1":VARCHAR

Also, I will try to diagnose the dictionary vector. However, given my current knowledge of Velox's internals, I may have difficulty locating the place.

Creating a unit test case might not be possible due to the large amount of data in the tables.

Thank you.

Yuhta commented 2 months ago

@zeodtr Thanks for the detail. Just a question, which plan tree are you observing the slowness? Is it plan[2] or plan[3]? Both are taking data from exchange.

zeodtr commented 2 months ago

@Yuhta I'm not entirely sure, but I guess it's plan[2]. The task for plan[2] ran for more than 2 minutes logging the following debugging messages many times. (I've added code to print the message each second.)

{Driver: running Exchange(0)<xdb_cpu_executor_task_3:0.0 0x7f91a9896700> HashProbe(1)<xdb_cpu_executor_task_3:0.0 0x7f91a9845a00> FilterProject(2)<xdb_cpu_executor_task_3:0.0 0x7f91a9835c00> Aggregation(3)<xdb_cpu_executor_task_3:0.0 0x7f91a9834000> PartitionedOutput(4)<xdb_cpu_executor_task_3:0.0 0x7f91a9865900> CallbackSink(5)<xdb_cpu_executor_task_3:0.0 0x7f91a9896a80> {OpCallStatus: executing HashProbe::getOutput for 0ms}}

{Driver: running Exchange(0)<xdb_cpu_executor_task_3:0.0 0x7f91a9896700> HashProbe(1)<xdb_cpu_executor_task_3:0.0 0x7f91a9845a00> FilterProject(2)<xdb_cpu_executor_task_3:0.0 0x7f91a9835c00> Aggregation(3)<xdb_cpu_executor_task_3:0.0 0x7f91a9834000> PartitionedOutput(4)<xdb_cpu_executor_task_3:0.0 0x7f91a9865900> CallbackSink(5)<xdb_cpu_executor_task_3:0.0 0x7f91a9896a80> {OpCallStatus: executing Aggregation::addInput for 0ms}}

Each executor process spent its most time on different plans, (maybe) one process was busy executing plan[2] while the other process was waiting for the results of its peer.

Thank you.

zeodtr commented 2 months ago

@Yuhta For your information, the version of Velox for my executor process is taken from the upstream code as of January 2024, specifically from the commit titled "Rename getDataType and getDataChannels funcs in HiveDataSink (#8404)" on January 17, 2024. Therefore, it's possible that the issue related to plan[2] has already been resolved in the latest upstream version.

mbasmanova commented 2 months ago

Plan 2 has Aggregation and Join. It seems likely that "bad" dictionary was produced by the Join and is causing trouble during Aggregation.

Yuhta commented 2 months ago

I think it's likely due to HashProbe plan[2] wrapping input in dictionary to filter out non-matching rows. The ratio of 5600/2763772 has the same order of magnitude as 1024/1066500. @mbasmanova I think the solution should belong to the same story as #7801

zeodtr commented 2 months ago

@Yuhta I've tried #10084. I've run the code with the combinations of the following modifications:

The execution times for the query are as follows:

Here are my thoughts:

Thank you.

Yuhta commented 2 months ago

The exact condition to use cache is tricky and depending on the data. The current solution makes sure there is no large regression if we are using dictionary for filtering.

For more investigation, it would be nice if you can find out what the dictionary vector is wrapping around. If it is from the probe side rows, I would imagine it's used exclusively for filtering so cache should not improve performance here. For build side rows, they are extracted from row container so should not be in dictionary. So it's a little mystery here why cache is beneficial. Maybe the join duplicates the probe side rows in some cases?

zeodtr commented 2 months ago

I've investigated further.

The sparse dictionary vectors are returned from HashProbe::getOutput(). I checked the shape of the data of the two tables. In short, the query was executed on poorly shaped data, resulting in many join mismatches and many duplications in the join results. Therefore, this query might not be suitable for Velox code optimization. I believe the fix in #10084 is sufficient.

The shape of the data for the two tables (t1, t2) is as follows. The record counts are filtered by the query's WHERE clause.

  1. t1's count(name): 2,763,772
  2. t1's count(distinct name): 457
  3. t2's count(name): 5,600
  4. t2's count(distinct name): 54
  5. count(name) after join: 533,305,844
  6. count(distinct name) after join: 52
  7. t1's count(name) that do not exist in t2: 837,094
  8. t1's count(distinct name) that do not exist in t2: 405
  9. t1's count(name) that exist in t2: 1,926,678
  10. t1's count(distinct name) that exist in t2: 52

The queries for each count result (slightly modified to hide the real table and column name) are as follows:

-- 1.
SELECT count(name)
FROM t1 
WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
AND tm_col < timestamp '2022-07-11 03:10:00' 
;

-- 2.
SELECT count(DISTINCT name)
FROM t1 
WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
AND tm_col < timestamp '2022-07-11 03:10:00' 
;

-- 3.
SELECT count(name)
FROM t2 
WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
AND tm_col < timestamp '2022-09-30 06:00:00' 
;

-- 4.
SELECT count(DISTINCT name)
FROM t2 
WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
AND tm_col < timestamp '2022-09-30 06:00:00' 
;

-- 5., 6.
SELECT count(name), count(DISTINCT name)
FROM ( 
    SELECT name AS name
    FROM t1 
    WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
    AND tm_col < timestamp '2022-07-11 03:10:00' 
) JOIN ( 
    SELECT ip, name AS name2
    FROM t2 
    WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
    AND tm_col < timestamp '2022-09-30 06:00:00' 
) ON name = name2 
;

-- 7., 8
SELECT count(name1), count(DISTINCT name1)
FROM ( 
    SELECT name AS name1
    FROM t1 
    WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
    AND tm_col < timestamp '2022-07-11 03:10:00' 
)
WHERE NOT EXISTS 
( 
    SELECT 1
    FROM t2 
    WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
    AND tm_col < timestamp '2022-09-30 06:00:00'
    AND name = name1    
) 
;

-- 9., 10
SELECT count(name1), count(DISTINCT name1)
FROM ( 
    SELECT name AS name1
    FROM t1 
    WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
    AND tm_col < timestamp '2022-07-11 03:10:00' 
)
WHERE EXISTS 
( 
    SELECT 1
    FROM t2 
    WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
    AND tm_col < timestamp '2022-09-30 06:00:00'
    AND name = name1    
) 
;
Yuhta commented 2 months ago

I see so the build side are both duplicating and filtering. Agree that the current solution should be enough, unless there is very important use case that requires us to optimize for this data shape.