MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com
Other
5.71k stars 465 forks source link

CH-benCHmark Query 20 #543

Open frankmcsherry opened 4 years ago

frankmcsherry commented 4 years ago
SELECT su_name, su_address
FROM supplier, nation
WHERE su_suppkey IN (
    SELECT mod(s_i_id * s_w_id, 10000)
    FROM stock, orderline
    WHERE s_i_id IN (SELECT i_id FROM item WHERE i_data LIKE 'co%')
    AND ol_i_id = s_i_id
    AND ol_delivery_d > TIMESTAMP '2010-05-23 12:00:00'
    GROUP BY s_i_id, s_w_id, s_quantity
    HAVING 2 * s_quantity > sum(ol_quantity)
)
AND su_nationkey = n_nationkey
AND n_name = 'GERMANY'
ORDER BY su_name
cuongdo commented 4 years ago

The execution plan for this is nasty because of the mod(s_i_id * s_w_id, 10000). OOMs on my setup.

frankmcsherry commented 4 years ago

This is a thing that I can land with an hours work or so. Shouldn't be too hard, but might not be especially broad. If we don't have an issue we should file one, and assign it lots of points!

cuongdo commented 4 years ago

@frankmcsherry Could you please file an issue? I think that someone else on the team should work on this (for bus factor reasons). This unlocks a bunch of chbenchmark queries, which allows us to tell a more compelling story.

frankmcsherry commented 4 years ago

Filed! https://github.com/MaterializeInc/materialize/issues/569

frankmcsherry commented 4 years ago

Currently plans as:

Let {
  id-1 = Join {
    variables: [[(0, 3), (1, 0)]],
    Get { supplier },
    Filter { predicates: [#1 = "GERMANY"], Get { nation } }
  }
} in
Let { id-2 = Distinct { group_key: [0], Get { id-1 } } } in
Let {
  id-3 = Project {
    outputs: [0, 12 .. 29, 2 .. 11],
    Join {
      variables: [[(0, 0), (1, 0)]],
      Get { id-2 },
      Get { id-2 },
      Get { orderline },
      Get { stock }
    }
  }
} in
Project {
  outputs: [1, 2, 1],
  Join {
    variables: [[(0, 0), (1, 0)]],
    Get { id-1 },
    Reduce {
      group_key: [0],
      aggregates: [any(true)],
      Filter {
        predicates: [#5 = i32toi64 #0],
        Map {
          scalars: [(#1 * #2) % 10000],
          Filter {
            predicates: [(2 * i32toi64 #3) > i32toi64 #4],
            Reduce {
              group_key: [0 .. 3],
              aggregates: [sum(#26)],
              Join {
                variables: [[(0, 1), (1, 0)]],
                Filter {
                  predicates: [
                    #1 = #23,
                    datetots #25 > 2010-05-23 12:00:00
                  ],
                  Get { id-3 }
                },
                Project {
                  outputs: [0, 6],
                  Map {
                    scalars: [true],
                    Join {
                      variables: [[(0, 0), (1, 0)]],
                      Distinct { group_key: [1], Get { id-3 } },
                      Filter {
                        predicates: [#4 ~ /^co.*$/],
                        Get { item }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
frankmcsherry commented 4 years ago

This is probably not the right plan.

frankmcsherry commented 4 years ago

Something is probably wrong here:

    Join {
      variables: [
        [(0, 0), (1, 0)],
        [(0, 1), (1, 1)],
        [(0, 2), (1, 2)],
        [(0, 3), (1, 3)],
        [(0, 4), (1, 4)],
        [(0, 5), (1, 5)],
        [(0, 6), (1, 6)],
        [(0, 7), (1, 7)],
        [(0, 8), (1, 8)],
        [(0, 9), (1, 9)],
        [(0, 10), (1, 10)]
      ],
      Get { id-2 },
      Get { id-2 },
      Get { orderline },
      Get { stock }
    }
rjnn commented 4 years ago

We should take a stab at diagnosing this query again in beta 2.

wangandi commented 4 years ago

Good news! Query 20 runs now and results match.

What I did: 1) Turn on the chbench demo 2) Wait until the one warehouse of records was fully loaded. 3) Often times Query 20 returns the empty set, especially since the Materialize LIKE is case-sensitive, unlike the MySql LIKE. I assumed that just as in #524 and #537 we intend to test the case-sensitive LIKE, so in MySql, I replace LIKE with LIKE BINARY. I run query 20 returning an extra n_name column and without the n_name = 'GERMANY' filter so that I can pick the country that there are the most rows of results for. 4) To get expected results, I run Query 20 with 'GERMANY' in MySql changed to whatever country has the most rows of results. 5) In materialize, I create a materialized view q20 as <insert q20 statement here with 'GERMANY' replaced with country used in step 4>. 6) In materialize I run select * from q20 order by su_name (since order bys in query creation don't propagate to the results being select). 7) Repeat steps 1-6 several times since normally only 2 rows of results are returned, so I figured I should run this several times to ensure that results actually match.

Note that removing the filter AND n_name = 'GERMANY' would normally increase number of expected results to 24, but materialize would always falls over before any meaningful results is returned.

wangandi commented 4 years ago

Current plan when the materialized view is made on raw unmaterialized sources:

|Let {                                                      |
|   l0 = Join {                                              |
|     variables: [[(0, 3), (1, 0)]],                         |
|     implementation: DifferentialLinear,                    |
|     Get { materialize.public.src_supplier (u5) },          |
|     ArrangeBy {                                            |
|       keys: [[#0]],                                        |
|       Filter {                                             |
|         predicates: [#1 = "GERMANY"],                      |
|         Get { materialize.public.src_nation (u2) }         |
|       }                                                    |
|     }                                                      |
|   }                                                        |
| } in                                                       |
| Let {                                                      |
|   l1 = Join {                                              |
|     variables: [[(0, 0), (1, 4)]],                         |
|     implementation: DifferentialLinear,                    |
|     Get { materialize.public.src_stock (u4) },             |
|     ArrangeBy {                                            |
|       keys: [[#4]],                                        |
|       Filter {                                             |
|         predicates: [datetots #6 > 2010-05-23 12:00:00],
|         Get { materialize.public.src_orderline (u3) }      |
|       }                                                    |
|     },                                                     |
|     ArrangeBy { keys: [[]], Get { l0 } }                   |
|   }                                                        |
| } in                                                       |
| Project {                                                  |
|   outputs: [1, 2],                                         |
|   Join {                                                   |
|     variables: [[(0, 0), (1, 0)]],                         |
|     implementation: DifferentialLinear,                    |
|     Get { l0 },                                            |
|     ArrangeBy {                                            |
|       keys: [[#0]],                                        |
|       Reduce {                                             |
|         group_key: [#0],                                   |
|         aggregates: [any(true)],                           |
|         Filter {                                           |
|           predicates: [(2 * #3) > #4],                     |
|           Reduce {                                         |
|             group_key: [#0, #11, #12, #13],                |
|             aggregates: [sum(#36)],                        |
|             Join { 
|               variables: [[(0, 11), (1, 0), (2, 0)]],      |
|               implementation: DifferentialLinear,          |
|               Filter {                                     |
|                 predicates: [#0 = ((#11 * #12) % 10000)],  |
|                 Get { l1 }                                 |
|               },                                           |
|               ArrangeBy {                                  |
|                 keys: [[#0]],                              |
|                 Distinct { group_key: [#11], Get { l1 } }  |
|               },                                           |
|               ArrangeBy {                                  |
|                 keys: [[#0]],                              |
|                 Filter {                                   |
|                   predicates: [^co.*$ ~ #4],               |
|                   Get { materialize.public.src_item (u1) } |
|                 }                                          |
|               }                                            |
|             }                                              |
|           }                                                |
|         }                                                  |
|       }                                                    |
|     }                                                      |
|   } 
| }                                                          |
+------------------------------------------------------------+

Baseline memory: 58 mib Largest spike when materializing the view: 916 mib (+858 mib from baseline) Steady state memory usage: 308 mib (+250 mib from baseline)

wangandi commented 4 years ago

Current plan when all views used by q20 are materialized, but no additional indexes are created:

| Let {                                                                  |
|   l0 = Filter {                                                        |
|     predicates: [#8 = "GERMANY"],                                      |
|     Join {                                                             |
|       variables: [[(0, 3), (1, 0)]],                                   |
|       implementation: DifferentialLinear,                              |
|       Get { materialize.public.tpcch_supplier (u12) },                 |
|       ArrangeBy {                                                      |
|         keys: [[#0]],                                                  |
|         Get { materialize.public.tpcch_nation (u8) }                   |
|       }                                                                |
|     }                                                                  |
|   }                                                                    |
| } in                                                                   |
| Let {                                                                  |
|   l1 = Join {                                                          |
|     variables: [[(0, 0), (1, 4)]],                                     |
|     implementation: DifferentialLinear,                                |
|     Get { materialize.public.tpcch_stock (u14) },                      |
|     ArrangeBy {                                                        |
|       keys: [[#4]],                                                    |
|       Filter {                                                         |
|         predicates: [datetots #6 > 2010-05-23 12:00:00],               |
|         Get { materialize.public.tpcch_orderline (u10) }               |
|       }                                                                |
|     },                                                                 |
|     ArrangeBy { keys: [[]], Distinct { group_key: [#0], Get { l0 } } } |
|   }                                                                    |
| } in                                                                   |
| Project {                                                              |
|   outputs: [1, 2],                                                     |
|   Join {                                                               |
|     variables: [[(0, 0), (1, 0)]],                                     |
|     implementation: DifferentialLinear,                                |
|     Get { l0 },                                                        |
|     ArrangeBy {                                                        |
|       keys: [[#0]],                                                    |
|       Reduce {                                                         |
|         group_key: [#0],                                               |
|         aggregates: [any(true)],                                       |
|         Filter {                                                       |
|           predicates: [(2 * #3) > #4],                                 |
|           Reduce {                                                     |
|             group_key: [#0, #1, #2, #3],                               |
|             aggregates: [sum(#26)],                                    |
|             Join {                                                     |
|               variables: [[(0, 1), (1, 0)]],                           |
|               implementation: DifferentialLinear,                      |
|               Filter {                                                 |
|                 predicates: [#0 = ((#1 * #2) % 10000)],                |
|                 Get { l1 }                                             |
|               },                                                       |
|               ArrangeBy {                                              |
|                 keys: [[#0]],                                          |
|                 Reduce {                                               |
|                   group_key: [#0],                                     |
|                   aggregates: [any(true)],                             |
|                   Filter {                                             |
|                     predicates: [^co.*$ ~ #5],                         |
|                     Join {                                             |
|                       variables: [[(0, 0), (1, 0)]],                   |
|                       implementation: DeltaQuery,                      |
|                       ArrangeBy {                                      |
|                         keys: [[#0]],                                  |
|                         Distinct { group_key: [#1], Get { l1 } }       |
|                       },                                               |
|                       ArrangeBy {                                      |
|                         keys: [[#0]],                                  |
|                         Get { materialize.public.tpcch_item (u6) }     |
|                       }                                                |
|                     }                                                  |
|                   }                                                    |
|                 }                                                      |
|               }                                                        |
|             }                                                          |
|           }                                                            |
|         }                                                              |
|       }                                                                |
|     }                                                                  |
|   }                                                                    |
| }                                                                      |
+------------------------------------------------------------------------+

Baseline memory: 396 mib Largest spike when materializing the view: 3.335 gib (+ ~3gib from baseline) Steady state memory usage: 595 mib (+200 mib from baseline)

wangandi commented 4 years ago

Current plan when the same additional foreign key indexes are created as in chbench.slt. (This weirdly differs from the plan produced in chbench.slt)

| Let {                                                              |
|   l0 = Filter {                                                    |
|     predicates: [#8 = "GERMANY"],                                  |
|     Join {                                                         |
|       variables: [[(0, 3), (1, 0)]],                               |
|       implementation: DeltaQuery,                                  |
|       ArrangeBy {                                                  |
|         keys: [[#3]],                                              |
|         Get { materialize.public.tpcch_supplier (u14) }            |
|       },                                                           |
|       ArrangeBy {                                                  |
|         keys: [[#0]],                                              |
|         Get { materialize.public.tpcch_nation (u8) }               |
|       }                                                            |
|     }                                                              |
|   }                                                                |
| } in                                                               |
| Let {                                                              |
|   l1 = Filter {                                                    |
|     predicates: [datetots #25 > 2010-05-23 12:00:00],              |
|     Join {                                                         |
|       variables: [[(0, 0), (1, 4)]],                               |
|       implementation: DifferentialLinear,                          |
|       Get { materialize.public.tpcch_stock (u12) },                |
|       ArrangeBy {                                                  |
|         keys: [[#4]],                                              |
|         Get { materialize.public.tpcch_orderline (u10) }           |
|       },                                                           |
|       ArrangeBy {                                                  |
|         keys: [[]],                                                |
|         Distinct { group_key: [#0], Get { l0 } }                   |
|       }                                                            |
|     }                                                              |
|   }                                                                |
| } in                                                               |
| Project {                                                          |
|   outputs: [1, 2],                                                 |
|   Join {                                                           |
|     variables: [[(0, 0), (1, 0)]],                                 |
|     implementation: DifferentialLinear,                            |
|     Get { l0 },                                                    |
|     ArrangeBy {                                                    |
|       keys: [[#0]],                                                |
|       Reduce {                                                     |
|         group_key: [#0],                                           |
|         aggregates: [any(true)],                                   |
|         Filter {                                                   |
|           predicates: [(2 * #3) > #4],                             |
|           Reduce {                                                 |
|             group_key: [#0, #1, #2, #3],                           |
|             aggregates: [sum(#26)],                                |
|             Join {                                                 |
|               variables: [[(0, 1), (1, 0)]],                       |
|               implementation: DifferentialLinear,                  |
|               Filter {                                             |
|                 predicates: [#0 = ((#1 * #2) % 10000)],            |
|                 Get { l1 }                                         |
|               },                                                   |
|               ArrangeBy {                                          |
|                 keys: [[#0]],                                      |
|                 Reduce {                                           |
|                   group_key: [#0],                                 |
|                   aggregates: [any(true)],                         |
|                   Filter {                                         |
|                     predicates: [^co.*$ ~ #5],                     |
|                     Join {                                         |
|                       variables: [[(0, 0), (1, 0)]],               |
|                       implementation: DeltaQuery,                  |
|                       ArrangeBy {                                  |
|                         keys: [[#0]],                              |
|                         Distinct { group_key: [#1], Get { l1 } }   |
|                       },                                           |
|                       ArrangeBy {                                  |
|                         keys: [[#0]],                              |
|                         Get { materialize.public.tpcch_item (u6) } |
|                       }                                            |
|                     }                                              |
|                   }                                                |
|                 }                                                  |
|               }                                                    |
|             }                                                      |
|           }                                                        |
|         }                                                          |
|       }                                                            |
|     }                                                              |
|   }                                                                |
| }                                                                  |
+--------------------------------------------------------------------+

Baseline memory: 650 mib Largest spike when materializing the view: 4.383 gib (+ ~3.7 gib from baseline) Steady state memory usage: 868 mib (+218 mib from baseline)

cuongdo commented 4 years ago

During a chat with @frankmcsherry today, we want to shift our attention away from this query. q20 has multiple issues that we may not address soon. Setting milestone to Later.

frankmcsherry commented 4 years ago

Mostly, until we tidy up decorrelation, the cross-join in l1 is going to confound a fair bit of analysis. We could certainly try to carve out other mysteries about the query, but doing the whole thing might be a big reach at the moment.

sploiselle commented 3 years ago

Update on this query available here, tl;dr: this query now matches the DB and seemingly has reasonable performance.

edited