adjust / parquet_fdw

Parquet foreign data wrapper for PostgreSQL
PostgreSQL License

Parallel querying returns zero rows #27

Closed: sdressler closed this issue 3 years ago

sdressler commented 3 years ago

Description

After running ANALYZE on the FDW-backed tables and enabling parallel querying, the query stops executing properly and always returns zero rows. Disabling parallelism makes the query work again.

Without parallelism enabled:

foo=# explain select sum(id) from table where id > 0;
                                    QUERY PLAN
----------------------------------------------------------------------------------
 Aggregate  (cost=1249875.00..1249875.01 rows=1 width=32)
   ->  Foreign Scan on table  (cost=0.00..999900.00 rows=99990000 width=8)
         Filter: (id > 0)
         Reader: Single File
         Row groups: 1, 2
 JIT:
   Functions: 3
   Options: Inlining true, Optimization true, Expressions true, Deforming true
(8 rows)

Time: 2.994 ms
foo=# select sum(id) from table where id > 0;
       sum
------------------
 5000000050000000
(1 row)

Time: 6437.646 ms (00:06.438)

With parallelism enabled:

foo=# set max_parallel_workers_per_gather = 2;
SET
Time: 0.832 ms
foo=# explain select sum(id) from table where id > 0;
                                              QUERY PLAN
-------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=584283.56..584283.57 rows=1 width=32)
   ->  Gather  (cost=584283.33..584283.54 rows=2 width=32)
         Workers Planned: 2
         ->  Partial Aggregate  (cost=583283.33..583283.34 rows=1 width=32)
               ->  Parallel Foreign Scan on table  (cost=0.00..499950.00 rows=33333333 width=8)
                     Filter: (id > 0)
                     Reader: Single File
                     Row groups: 1, 2
 JIT:
   Functions: 5
   Options: Inlining true, Optimization true, Expressions true, Deforming true
(11 rows)

Time: 4.132 ms
foo=# select sum(id) from table where id > 0;
 sum
-----

(1 row)

Time: 51.703 ms
foo=#

Reproduction

  1. Create parquet file from CSV:
id,value
1,test
2,test
3,test
4,test
5,test
1,test
2,test
3,test
4,test
5,test
1,test
2,test
3,test
4,test
5,test
1,test
2,test
3,test
4,test
5,test
  2. Create table:
create foreign table test(
    id bigint
  , value text
) server parquet_srv options(filename '/data/external_data/test_2.parquet');
  3. Ensure parallelism & run query:
set parallel_tuple_cost = 0;
set parallel_setup_cost = 0;
-- Plan must contain "workers planned > 1"
explain select count(*) from test group by id; 
select count(*) from test group by id;
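
The CSV in step 1 is just the ids 1 to 5 repeated four times, each with the value "test". A minimal sketch that regenerates that text is below; the function name `repro_csv` and its parameters are hypothetical and not part of parquet_fdw. Converting the CSV to Parquet is not covered, because the issue does not say which tool was used.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical helper: rebuilds the CSV from step 1 as a string.
// Defaults reproduce the 20 data rows shown above (ids 1..5, four times).
std::string repro_csv(int cycles = 4, int max_id = 5) {
    assert(cycles > 0 && max_id > 0);
    std::ostringstream out;
    out << "id,value\n";                    // header row
    for (int c = 0; c < cycles; ++c) {
        for (int id = 1; id <= max_id; ++id) {
            out << id << ",test\n";         // every row carries the same value
        }
    }
    return out.str();
}
```

Writing the returned string to a file gives the input for the Parquet conversion. With the defaults, each id occurs four times, which is why the expected result is five groups with a count of 4.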

Observed Result

foo=# select count(*) from test group by id;
 count
-------
(0 rows)

Time: 34.895 ms

Expected Result

foo=# select count(*) from test group by id;
 count
-------
     4
     4
     4
     4
     4
(5 rows)

Time: 1.629 ms

sdressler commented 3 years ago

Also happens with an ORDER BY:

foo=# explain analyze select id, value from test order by id;
                                                       QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
 Gather Merge  (cost=5.10..5.24 rows=12 width=13) (actual time=27.599..31.396 rows=0 loops=1)
   Workers Planned: 2
   Workers Launched: 2
   ->  Sort  (cost=5.08..5.09 rows=6 width=13) (actual time=0.075..0.076 rows=0 loops=3)
         Sort Key: id
         Sort Method: quicksort  Memory: 25kB
         Worker 0:  Sort Method: quicksort  Memory: 25kB
         Worker 1:  Sort Method: quicksort  Memory: 25kB
         ->  Parallel Foreign Scan on test  (cost=0.00..5.00 rows=6 width=13) (actual time=0.002..0.003 rows=0 loops=3)
               Reader: Single File
               Row groups: 1
 Planning Time: 0.851 ms
 Execution Time: 31.638 ms
(13 rows)

Time: 33.196 ms

sdressler commented 3 years ago

What I have found so far: it breaks in parquet_impl.cpp, in read_next_rowgroup. More precisely, here:

if (this->reader_id != (coord->next_reader - 1))
  return false;

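Both reader_id and next_reader are 0 at this point, so reader_id is not equal to next_reader - 1, the condition holds, and the function returns false.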
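
To make the failing comparison concrete, here is a minimal standalone sketch of the check. `Coordinator`, `Reader`, and `passes_reader_check` are hypothetical stand-ins that model only the two fields named above. The field types are an assumption, and this is not the actual parquet_fdw code.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the shared coordinator state (type assumed).
struct Coordinator {
    int32_t next_reader;
};

// Hypothetical stand-in for the per-worker reader state (type assumed).
struct Reader {
    int32_t reader_id;
};

// Inverse of the quoted condition: true when the reader may go on,
// false when read_next_rowgroup would hit the early `return false`.
bool passes_reader_check(const Reader &reader, const Coordinator &coord) {
    assert(coord.next_reader >= 0);
    return reader.reader_id == (coord.next_reader - 1);
}
```

With `reader_id == 0` and `next_reader == 0`, the right-hand side is -1, so the check fails. It would pass only once `next_reader` had been advanced to 1.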

sdressler commented 3 years ago

I might be wrong, but I think the above needs to become:

if (this->row_group != (coord->next_rowgroup - 1))
    return false;

Rationale: the single-file fdw plan has only one reader per worker. Parallelism should be achieved via processing row-groups in parallel.
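
One common way to split row groups between workers is a shared atomic counter that each worker increments to claim the next row group. The sketch below illustrates that idea only. `RowGroupCoordinator` and `claim_next_rowgroup` are hypothetical names, and this is neither the check proposed above nor the fix that was pushed to master.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical shared state: one counter that hands out row-group indexes.
struct RowGroupCoordinator {
    std::atomic<int32_t> next_rowgroup{0};
};

// Claims the next unprocessed row group for the calling worker.
// Returns its zero-based index, or -1 once all row groups are taken.
int32_t claim_next_rowgroup(RowGroupCoordinator &coord, int32_t total_rowgroups) {
    assert(total_rowgroups >= 0);
    // fetch_add is atomic, so no two workers can receive the same index.
    int32_t claimed = coord.next_rowgroup.fetch_add(1);
    return claimed < total_rowgroups ? claimed : -1;
}
```

Each worker would call `claim_next_rowgroup` in a loop and stop at -1. With two row groups and two workers, each worker typically ends up reading one of them.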

zilder commented 3 years ago

Hi @sdressler,

Thank you for reporting the issue. I pushed a bugfix to master. Could you please check whether it works for you?

sdressler commented 3 years ago

Yep, looks good to me. Thanks!