dolthub / dolt

Dolt – Git for Data

Slow Join + Filter query #6180

Closed. kuza55 closed this issue 6 months ago.

kuza55 commented 1 year ago

Hi,

The following query took several seconds to resolve on an m5.2xlarge:

SELECT instances.uid, instances.extra
FROM instances
WHERE (EXISTS (SELECT 1
FROM queries
WHERE instances.uid = queries.instance_uid AND queries.`sql` = '...' AND (EXISTS (SELECT 1
FROM query_tags
WHERE queries.uid = query_tags.query_uid AND query_tags.tag = 'primary')))) AND (EXISTS (SELECT 1
FROM query_tags, queries
WHERE queries.uid = query_tags.query_uid AND query_tags.tag = 'bird_gpt35_kg_cot'))

In particular, the two query_tags filters made this much slower than the alternative of filtering on the sql column alone, loading the relevant instance by uid, loading all of its queries by uid, then loading all of the tags by uid, and doing the tag filtering manually in Python.

timsehn commented 1 year ago

Thanks. @JCOR11599 or @max-hoffman will dig in today.

max-hoffman commented 1 year ago

Which version of Dolt are you using?

kuza55 commented 1 year ago

dolt version 1.3.0

max-hoffman commented 1 year ago

There are two analyzer problems here.

The first relates to the independent EXISTS subquery, which acts as a return switch. If the subquery conditional returns a row, we return the join result; if the conditional returns no rows, we do not.

  SELECT 1
  FROM query_tags, queries
  WHERE
    queries.uid = query_tags.query_uid AND
    query_tags.tag = 'bird_gpt35_kg_cot'
  LIMIT 1

We execute SEMI_JOIN(join_body, conditional, true) without caching the join-RHS:

+-----------------------------------------------------------------------------------------------------------------+
| plan                                                                                                            |
+-----------------------------------------------------------------------------------------------------------------+
| Project                                                                                                         |
|  ├─ columns: [instances.uid, instances.extra]                                                                   |
|  └─ SemiJoin                                                                                                    |
|      ├─ true                                                                                                    |
|      ├─ Project                                                                                                 |
|      │   ├─ columns: [instances.extra, instances.uid]                                                           |
|      │   └─ MergeJoin                                                                                           |
|      │       ├─ cmp: (queries.instance_uid = instances.uid)                                                     |
|      │       ├─ Distinct                                                                                        |
|      │       │   └─ Project                                                                                     |
|      │       │       ├─ columns: [queries.instance_uid]                                                         |
|      │       │       └─ Filter                                                                                  |
|      │       │           ├─ ((queries.sql = '...') AND EXISTS Subquery                                          |
|      │       │           │   ├─ cacheable: false                                                                |
|      │       │           │   └─ Project                                                                         |
|      │       │           │       ├─ columns: [1]                                                                |
|      │       │           │       └─ Filter                                                                      |
|      │       │           │           ├─ ((queries.uid = query_tags.query_uid) AND (query_tags.tag = 'primary')) |
|      │       │           │           └─ IndexedTableAccess(query_tags)                                          |
|      │       │           │               ├─ index: [query_tags.query_uid]                                       |
|      │       │           │               └─ columns: [tag query_uid]                                            |
|      │       │           │  )                                                                                   |
|      │       │           └─ IndexedTableAccess(queries)                                                         |
|      │       │               ├─ index: [queries.instance_uid]                                                   |
|      │       │               └─ filters: [{[NULL, ∞)}]                                                          |
|      │       └─ IndexedTableAccess(instances)                                                                   |
|      │           ├─ index: [instances.uid]                                                                      |
|      │           └─ filters: [{[NULL, ∞)}]                                                                      |
|      └─ Limit(1)                                                                                                |
|          └─ MergeJoin                                                                                           |
|              ├─ cmp: (queries_1.uid = query_tags.query_uid)                                                     |
|              ├─ TableAlias(queries_1)                                                                           |
|              │   └─ IndexedTableAccess(queries)                                                                 |
|              │       ├─ index: [queries.uid]                                                                    |
|              │       ├─ filters: [{[NULL, ∞)}]                                                                  |
|              │       └─ columns: [uid]                                                                          |
|              └─ Filter                                                                                          |
|                  ├─ (query_tags.tag = 'bird_gpt35_kg_cot')                                                      |
|                  └─ IndexedTableAccess(query_tags)                                                              |
|                      ├─ index: [query_tags.query_uid]                                                           |
|                      ├─ filters: [{[NULL, ∞)}]                                                                  |
|                      └─ columns: [tag query_uid]                                                                |
+-----------------------------------------------------------------------------------------------------------------+

This causes us to run the merge join n times. A degenerate SEMI_JOIN RHS like this one could be cached in general.

Another option is to represent this as a LATERAL join, where we execute the subquery once and then invoke the join body for every row the subquery returns:

  select instances.*
  from (SELECT 1 ... LIMIT 1) conditional,
  LATERAL (
    select * from instances join ...
  ) join_body

An empty conditional would not trigger the inner loop.

A second problem is that we miss a scope decorrelation:

|      │       │       └─ Filter                                                                                  |
|      │       │           ├─ ((queries.sql = '...') AND EXISTS Subquery                                          |
|      │       │           │   ├─ cacheable: false                                                                |
|      │       │           │   └─ Project                                                                         |
|      │       │           │       ├─ columns: [1]                                                                |
|      │       │           │       └─ Filter                                                                      |
|      │       │           │           ├─ ((queries.uid = query_tags.query_uid) AND (query_tags.tag = 'primary')) |
|      │       │           │           └─ IndexedTableAccess(query_tags)                                          |
|      │       │           │               ├─ index: [query_tags.query_uid]                                       |
|      │       │           │               └─ columns: [tag query_uid]                                            |
|      │       │           │  )                                                                                   |
|      │       │           └─ IndexedTableAccess(queries)                                                         |
|      │       │               ├─ index: [queries.instance_uid]                                                   |
|      │       │               └─ filters: [{[NULL, ∞)}]

This should be collapsed into a join like select queries.* from queries join query_tags on queries.uid = query_tags.query_uid.
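
Using the filters visible in the plan above, a sketch of that collapsed form might look like the following (the DISTINCT is only there to preserve the EXISTS semantics when a query has more than one matching tag; this is an illustration, not the analyzer's actual rewrite):

  SELECT DISTINCT queries.instance_uid
  FROM queries
  JOIN query_tags
    ON queries.uid = query_tags.query_uid
  WHERE queries.`sql` = '...'
    AND query_tags.tag = 'primary'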

max-hoffman commented 1 year ago

Some progress in this PR https://github.com/dolthub/go-mysql-server/pull/1839 that I should be able to clean up tomorrow. Running into issues with TEXT column types now.

before:

(profile screenshot)

after:

(profile screenshot)

If the TEXT/JSON columns can be converted to VARCHAR, that will fix this performance issue. Otherwise I will try to add a query hint that pushes the queries table filter last, to minimize the BLOB deserialization cost.

*edit: forgot to add after profile, fixed now
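
For reference, the conversion being suggested would be something like the statement below (the VARCHAR length is an arbitrary illustration):

  ALTER TABLE queries MODIFY COLUMN `sql` VARCHAR(8191);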

kuza55 commented 1 year ago

VARCHAR is too small; TEXT/JSON are both necessary for my use case.

max-hoffman commented 1 year ago

Some observations after looking over this more:

1) There are two components to the query: a main join body (queries x query_tags x instances) and an independent, cross-joined subquery (queries x query_tags). The way you're running the logic in Python suggests you're mostly interested in the first, which returns the instance info for a query. I don't know whether sqlalchemy is adding the extra subquery for a good reason, but it interferes with my join hints and turns a 3-table join into a 5-table join (with a cross join). Some of that is fixable on our end, but it adds a net 1-2 seconds to the overall query.

2) If we focus on queries x query_tags x instances, the join is bottlenecked by a tablescan on queries to find sql = '...'. Reading blobs is very slow, so reading every row in queries takes 3-15 seconds for me locally depending on cold/hot caching. Barring extra indexing to narrow down the fraction of queries we read, the cost of select * from queries is a floor for the join latency.

3) Adding an index might make this fast, and mimics what you're describing in Python. With hints forcing a lookup join, queries x query_tags x instances takes 40ms. This would scale poorly if more than ~25% of the rows in queries shared the same sql string, since we would still read much of the table in spite of the index. This also isn't taking into account the 1-2 second queries x query_tags existence check.

alter table queries add column sqlhash varchar(32);
update queries set sqlhash = md5(queries.sql);
alter table queries add index sqlhash (sqlhash);

We will work on smoothing out the effect of cross joins and on making sure we prune columns to avoid reading unused blobs. I'm interested to hear whether extra indexing is feasible for you or if you're limited to a tablescan for select * from queries where sql = '...'.

max-hoffman commented 1 year ago

Here is the fastest I could make the core logic, for reference:


training> explain
SELECT /*+ LOOKUP_JOIN(queries, query_tags) LOOKUP_JOIN(queries, instances) JOIN_ORDER(queries,query_tags,instances) */
  count(*)
FROM instances
WHERE EXISTS (
    SELECT 1
    FROM queries
    WHERE
      instances.uid = queries.instance_uid AND
      queries.`sqlhash` = '489efae328c451837f82ec293c1db117' AND
      EXISTS (
        SELECT 1
        FROM query_tags
        WHERE
          queries.uid = query_tags.query_uid AND
          query_tags.tag = 'primary'
       )
   );
+---------------------------------------------------------------------------------------------------------------------------------------+
| plan                                                                                                                                  |
+---------------------------------------------------------------------------------------------------------------------------------------+
| GroupBy                                                                                                                               |
|  ├─ SelectedExprs(COUNT(1))                                                                                                           |
|  ├─ Grouping()                                                                                                                        |
|  └─ Project                                                                                                                           |
|      ├─ columns: [instances.extra, instances.uid]                                                                                     |
|      └─ LookupJoin                                                                                                                    |
|          ├─ (instances.uid = queries.instance_uid)                                                                                    |
|          ├─ Distinct                                                                                                                  |
|          │   └─ Project                                                                                                               |
|          │       ├─ columns: [queries.instance_uid]                                                                                   |
|          │       └─ Filter                                                                                                            |
|          │           ├─ (queries.sqlhash = '489efae328c451837f82ec293c1db117')                                                        |
|          │           └─ Project                                                                                                       |
|          │               ├─ columns: [queries.sql, queries.data, queries.results, queries.uid, queries.instance_uid, queries.sqlhash] |
|          │               └─ LookupJoin                                                                                                |
|          │                   ├─ (queries.uid = query_tags.query_uid)                                                                  |
|          │                   ├─ Filter                                                                                                |
|          │                   │   ├─ (queries.sqlhash = '489efae328c451837f82ec293c1db117')                                            |
|          │                   │   └─ IndexedTableAccess(queries)                                                                       |
|          │                   │       ├─ index: [queries.sqlhash]                                                                      |
|          │                   │       └─ filters: [{[489efae328c451837f82ec293c1db117, 489efae328c451837f82ec293c1db117]}]             |
|          │                   └─ Distinct                                                                                              |
|          │                       └─ Project                                                                                           |
|          │                           ├─ columns: [query_tags.query_uid]                                                               |
|          │                           └─ Filter                                                                                        |
|          │                               ├─ (query_tags.tag = 'primary')                                                              |
|          │                               └─ IndexedTableAccess(query_tags)                                                            |
|          │                                   ├─ index: [query_tags.query_uid]                                                         |
|          │                                   └─ columns: [tag query_uid]                                                              |
|          └─ IndexedTableAccess(instances)                                                                                             |
|              └─ index: [instances.uid]                                                                                                |
+---------------------------------------------------------------------------------------------------------------------------------------+
+----------+
| count(*) |
+----------+
| 1        |
+----------+
1 row in set (0.02 sec)

kuza55 commented 1 year ago

To be super clear, the high level sqlalchemy ORM query I had was

        query = query.filter(
            db.and_(
                models.TrainingInstance.queries.any(
                    db.and_(
                        models.Query.sql == sql,
                        models.Query.tags.any(models.QueryTag.tag == "primary")
                    )
                ),
                models.Query.tags.any(models.QueryTag.tag == "bird_gpt35_kg_cot")
            )
        )

And I replaced it with

        query = query.filter(
            models.TrainingInstance.queries.any(models.Query.sql == sql)
        )

And then performed the additional tag checks in Python. The sql column should be largely unique; most values will appear only once and generally no more than twice.

Adding an index sounds like a good idea, I'd somehow got the impression Dolt didn't support indexes, but clearly I was just misinformed.

Is there a way to index the sql column directly without needing to maintain a separate hash column?

Reading blobs is very slow, so reading every row in queries takes 3-15 seconds for me locally depending on cold/hot caching.

Maybe this is a hot cache situation, but I have found this to take <1s (I don't have more granular timestamps to see more detail).

max-hoffman commented 1 year ago

Is there a way to index the sql column directly without needing to maintain a separate hash column?

Would a trigger that populates the column be low enough maintenance overhead? There are also generated columns, which would do the same thing internally, but we have not spent much time thinking about how we would implement those.

create trigger ins_sql_md5 
    before insert on queries 
    for each row
    set new.sqlhash = md5(new.sql);
create trigger update_sql_md5 
    before update on queries 
    for each row
    set new.sqlhash = md5(new.sql);

example:

training> create table test (a int primary key, b text, c varchar(32), key (c));
training> create trigger add_md5
       ->     before insert on test
       ->     for each row
       ->     set new.c = md5(new.b);
training> create trigger update_md5
       ->     before update on test
       ->     for each row
       ->     set new.c = md5(new.b);
training> insert into test (a,b) values (0, '0'), (1,'1');
Query OK, 2 rows affected (0.00 sec)
training> select * from test;
+---+---+----------------------------------+
| a | b | c                                |
+---+---+----------------------------------+
| 0 | 0 | cfcd208495d565ef66e7dff9f98764da |
| 1 | 1 | c4ca4238a0b923820dcc509a6f75849b |
+---+---+----------------------------------+
2 rows in set (0.00 sec)

training> update test set b = '2' where a = 1;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0
training> select * from test;
+---+---+----------------------------------+
| a | b | c                                |
+---+---+----------------------------------+
| 0 | 0 | cfcd208495d565ef66e7dff9f98764da |
| 1 | 2 | c81e728d9d4c2f636f067f89cc14862c |
+---+---+----------------------------------+
2 rows in set (0.00 sec)
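
For comparison, the generated-column alternative mentioned above would look roughly like the standard MySQL syntax below, in place of the manual column plus triggers (a sketch only; Dolt's support for generated columns was an open question at the time):

  ALTER TABLE queries
    ADD COLUMN sqlhash varchar(32) AS (md5(`sql`)) STORED,
    ADD INDEX sqlhash (sqlhash);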

Your query would then do queries.sqlhash = md5('...') rather than inlining the hash, so all you ever see are the sql strings.
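
For example (a sketch; the '...' stands for the original sql text):

  SELECT queries.uid, queries.instance_uid
  FROM queries
  WHERE queries.sqlhash = md5('...')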

the high level sqlalchemy ORM query I had

This check is expensive and independent of the main join; it might be worth keeping it separate and caching the result in Python if writes are infrequent, as sketched below.

models.Query.tags.any(models.QueryTag.tag == "bird_gpt35_kg_cot")
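
Run on its own, that check is just the second EXISTS from the original query, which could be evaluated once and cached application-side (a sketch):

  SELECT EXISTS (
    SELECT 1
    FROM queries
    JOIN query_tags ON queries.uid = query_tags.query_uid
    WHERE query_tags.tag = 'bird_gpt35_kg_cot'
  ) AS has_tag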

Maybe this is a hot cache situation, but I have found this to take <1s

Your machine probably has a lot more memory than my laptop. In that case you'd see even more dramatic speedups with the lookup join.

max-hoffman commented 6 months ago

I read through this again; we are happy to reopen and address specific performance issues if there are still concerns. We have added a lot of features in the last year, including index and join statistics, that may change the optimal approach. Closing for now for lack of more specific action items.