adjust / parquet_fdw

Parquet foreign data wrapper for PostgreSQL
PostgreSQL License
333 stars 37 forks source link

Multifile merge strategy fix #50

Open trollfred opened 1 year ago

trollfred commented 1 year ago
  1. Fixes sorting pathkeys
  2. Introduces force_multifile_merge GUC variable
jrogov commented 1 year ago

See initial observation and following discussion from here: https://github.com/adjust/segmentation_api/issues/485#issuecomment-1162767603

cabecada commented 1 year ago

corrected after artur comment of wrong sort order in create table. it looks like it now skips sorting.


postgres@parquet:~/demo$ more ~/one.py 
parquet generation for pre sorted data
import numpy
import pyarrow as pa
import pyarrow.parquet as pq

for i in range(4):
    data = pa.table({"col1": [i + 10 for x in range(10000)],
                     "col2": [i + 10 for x in range(10000)],
                     "col3": [i + 100 for x in range(10000)]})
    pq.write_table(data, "example.parquet." + str(i), compression=None)

create foreign table t1 ( col1 int, col2 int, col3 int ) server parquet_srv options ( filename '/var/lib/postgresql/example.parquet.0 /var/lib/postgresql/example.parquet.1 /var/lib/postgresql/example.parquet.2 /var/lib/postgresql/example.parquet.3', sorted 'col1 col2 col3', files_in_order 'true');

foo=# select count(*) from t1;
 count 
-------
 40000
(1 row)

foo=# select distinct col1,col2,col3 from t1;
 col1 | col2 | col3 
------+------+------
   10 |   10 |  100
   11 |   11 |  101
   12 |   12 |  102
   13 |   13 |  103
(4 rows)

foo=# select col1,col2,col3,count(*) from t1 group by col1,col2,col3;
 col1 | col2 | col3 | count 
------+------+------+-------
   10 |   10 |  100 | 10000
   11 |   11 |  101 | 10000
   12 |   12 |  102 | 10000
   13 |   13 |  103 | 10000

foo=# set parquet_fdw.force_multifile_merge TO true;

foo=# explain analyze select col1, col2  from t1 where col1 > 1 and col2 > 1 order by col1, col2;
                                               QUERY PLAN                                               
--------------------------------------------------------------------------------------------------------
 Foreign Scan on t1  (cost=0.00..0.00 rows=4444 width=8) (actual time=1.904..17.811 rows=40000 loops=1)
   Filter: ((col1 > 1) AND (col2 > 1))
   Reader: Multifile Merge
   Row groups: 
     example.parquet.0: 1
     example.parquet.1: 1
     example.parquet.2: 1
     example.parquet.3: 1
 Planning Time: 1.243 ms
 Execution Time: 20.740 ms
(10 rows)

foo=# explain analyze select col1, col2, count(*)  from t1 where col1 > 1 and col2 > 1 group by col1, col2 order by col1, col2;
                                                  QUERY PLAN                                                  
--------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=0.00..61.01 rows=2768 width=16) (actual time=14.094..34.549 rows=4 loops=1)
   Group Key: col1, col2
   ->  Foreign Scan on t1  (cost=0.00..0.00 rows=4444 width=8) (actual time=2.040..26.037 rows=40000 loops=1)
         Filter: ((col1 > 1) AND (col2 > 1))
         Reader: Multifile Merge
         Row groups: 
           example.parquet.0: 1
           example.parquet.1: 1
           example.parquet.2: 1
           example.parquet.3: 1
 Planning Time: 1.899 ms
 Execution Time: 35.649 ms
(12 rows)

but when the predicate spans multiple files and the order by does not include primary col1 but col2,col3 ...

foo=# explain analyze select * from t1 where col1 < 15 order by col2, col3;
                                                   QUERY PLAN                                                   
----------------------------------------------------------------------------------------------------------------
 Sort  (cost=913.52..946.85 rows=13333 width=12) (actual time=35.151..37.386 rows=40000 loops=1)
   Sort Key: col2, col3
   Sort Method: quicksort  Memory: 3417kB
   ->  Foreign Scan on t1  (cost=0.00..0.00 rows=13333 width=12) (actual time=2.394..25.772 rows=40000 loops=1)
         Filter: (col1 < 15)
         Reader: Multifile Merge
         Row groups: 
           example.parquet.0: 1
           example.parquet.1: 1
           example.parquet.2: 1
           example.parquet.3: 1
 Planning Time: 1.537 ms
 Execution Time: 40.369 ms
(13 rows)

but ..
foo=# explain analyze select * from t1 where col1 < 15 order by col2, col3;
                                                   QUERY PLAN                                                   
----------------------------------------------------------------------------------------------------------------
 Sort  (cost=913.52..946.85 rows=13333 width=12) (actual time=35.151..37.386 rows=40000 loops=1)
   Sort Key: col2, col3
   Sort Method: quicksort  Memory: 3417kB
   ->  Foreign Scan on t1  (cost=0.00..0.00 rows=13333 width=12) (actual time=2.394..25.772 rows=40000 loops=1)
         Filter: (col1 < 15)
         Reader: Multifile Merge
         Row groups: 
           example.parquet.0: 1
           example.parquet.1: 1
           example.parquet.2: 1
           example.parquet.3: 1
 Planning Time: 1.537 ms
 Execution Time: 40.369 ms
(13 rows)

-- if it does include the primary col, then ok.

foo=# explain analyze select * from t1 where col1 < 15 order by col1, col2, col3;
                                                QUERY PLAN                                                
----------------------------------------------------------------------------------------------------------
 Foreign Scan on t1  (cost=0.00..0.00 rows=13333 width=12) (actual time=2.135..23.925 rows=40000 loops=1)
   Filter: (col1 < 15)
   Reader: Multifile Merge
   Row groups: 
     example.parquet.0: 1
     example.parquet.1: 1
     example.parquet.2: 1
     example.parquet.3: 1
 Planning Time: 1.496 ms
 Execution Time: 26.344 ms
(10 rows)
trollfred commented 1 year ago

Hi @cabecada! This was not the use case we were solving. The one which we need is explain analyze select * from abc where col1 = 10 order by col2, col3; If we ever need to fix your case can we do that in a separate PR?

za-arthur commented 1 year ago

Order of columns in your ORDER BY doesn't match sorted option.

cabecada commented 1 year ago

@za-arthur yep, that was one of the cases i was testing. sorry for wrong order. sorry for the noise. i'll paste the relevant cases just to correct above scenario.

cabecada commented 1 year ago

@trollfred so yes your use case as well as the group by option works as well (did not pick up sort) atleast for small workload.

cabecada commented 1 year ago

i dont have much data but

we might end up reduce number of parallel workers due to this, was my concern.

foo=# set enable_hashagg TO off;
SET
foo=# explain analyze select col1, col2, col3, count(*)  from t1 where col1 > 1 and col2 > 1 and col3 > 0 group by col1, col2,col3 having last(col1) > 1 and last(col2) > 1 order by col1, col2, col3;
                                                              QUERY PLAN                                                               
---------------------------------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=1000.06..3652.21 rows=1397 width=20) (actual time=170.017..173.702 rows=4 loops=1)
   Group Key: col1, col2, col3
   Filter: ((last(col1) > 1) AND (last(col2) > 1))
   ->  Gather Merge  (cost=1000.06..3285.82 rows=11852 width=28) (actual time=170.008..173.691 rows=4 loops=1)
         Workers Planned: 4
         Workers Launched: 4
         ->  Partial GroupAggregate  (cost=0.00..874.07 rows=2963 width=28) (actual time=80.400..80.401 rows=1 loops=5)
               Group Key: col1, col2, col3
               ->  Parallel Foreign Scan on t1  (cost=0.00..800.00 rows=2963 width=12) (actual time=11.328..49.506 rows=80000 loops=5)
                     Filter: ((col1 > 1) AND (col2 > 1) AND (col3 > 0))
                     Reader: Multifile
                     Row groups: 
                       example.parquet.0: 1
                       example.parquet.1: 1
                       example.parquet.2: 1
                       example.parquet.3: 1
 Planning Time: 2.543 ms
 Execution Time: 173.859 ms
(18 rows)

foo=# set parquet_fdw.force_multifile_merge TO true;
SET
foo=# explain analyze select col1, col2, col3, count(*)  from t1 where col1 > 1 and col2 > 1 and col3 > 0 group by col1, col2,col3 having last(col1) > 1 and last(col2) > 1 order by col1, col2, col3;
                                                    QUERY PLAN                                                     
-------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=0.00..410.83 rows=1397 width=20) (actual time=80.175..183.156 rows=4 loops=1)
   Group Key: col1, col2, col3
   Filter: ((last(col1) > 1) AND (last(col2) > 1))
   ->  Foreign Scan on t1  (cost=0.00..0.00 rows=14815 width=12) (actual time=21.616..134.243 rows=400000 loops=1)
         Filter: ((col1 > 1) AND (col2 > 1) AND (col3 > 0))
         Reader: Multifile Merge
         Row groups: 
           example.parquet.0: 1
           example.parquet.1: 1
           example.parquet.2: 1
           example.parquet.3: 1
 Planning Time: 2.264 ms
 Execution Time: 183.758 ms
(13 rows)

foo=# set parallel_setup_cost TO 0;
SET
foo=# set parallel_tuple_cost TO 0;
SET
foo=# set force_parallel_mode TO on;
SET
foo=# explain analyze select col1, col2, col3, count(*)  from t1 where col1 > 1 and col2 > 1 and col3 > 0 group by col1, col2,col3 having last(col1) > 1 and last(col2) > 1 order by col1, col2, col3;
                                                       QUERY PLAN                                                       
------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=0.00..410.83 rows=1397 width=20) (actual time=75.247..180.483 rows=4 loops=1)
   Workers Planned: 1
   Workers Launched: 1
   Single Copy: true
   ->  GroupAggregate  (cost=0.00..410.83 rows=1397 width=20) (actual time=48.532..151.743 rows=4 loops=1)
         Group Key: col1, col2, col3
         Filter: ((last(col1) > 1) AND (last(col2) > 1))
         ->  Foreign Scan on t1  (cost=0.00..0.00 rows=14815 width=12) (actual time=5.048..106.292 rows=400000 loops=1)
               Filter: ((col1 > 1) AND (col2 > 1) AND (col3 > 0))
               Reader: Multifile Merge
               Row groups: 
                 example.parquet.0: 1
                 example.parquet.1: 1
                 example.parquet.2: 1
                 example.parquet.3: 1
 Planning Time: 2.087 ms
 Execution Time: 180.920 ms
(17 rows)
cabecada commented 1 year ago

as per the expectation of the PR to ensure multimerge files is picked up by the planner, this hack works and is also reversible incase something unexpected comes up. so lgtm.

jrogov commented 1 year ago

@cabecada do I incorrectly remember that you've experienced crashes when using Multifile Merge (when using 1+ parallel workers?) If it is so, how can it be good to merge, if it's prone to failures?

jrogov commented 1 year ago

@za-arthur waiting on your feedback on this

Either way this needs to be deployed to some server for testing with possibility of revert.

llamanna commented 1 year ago

Any possibility of taking care of this PR before the end of the quarter? @za-arthur