Flowminder / FlowKit

FlowKit: Flowminder CDR analytics toolkit
https://flowminder.github.io/FlowKit/
Mozilla Public License 2.0

Update `LastLocation` to use `SELECT DISTINCT ON` #6603

Closed: jc-harrison closed this 3 months ago

jc-harrison commented 4 months ago

It appears we could get a modest performance improvement in `LastLocation` queries by re-implementing the query as

SELECT DISTINCT ON (subscriber_locs.subscriber) subscriber_locs.subscriber, time, {rc}
FROM ({subscriber_locs}) AS subscriber_locs
ORDER BY subscriber_locs.subscriber, time DESC

instead of the current implementation:

SELECT final_time.subscriber, {rc}
FROM
     (SELECT subscriber_locs.subscriber, time, {rc},
     row_number() OVER (PARTITION BY subscriber_locs.subscriber ORDER BY time DESC)
         AS rank
     FROM ({subscriber_locs}) AS subscriber_locs) AS final_time
WHERE rank = 1
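As a sanity check that the two forms select the same rows, the Python sketch below mimics both queries on a handful of hypothetical events. The `EVENTS` data and the function names are illustrative only, not flowmachine code. Both keep, for each subscriber, the row with the latest `time`. Two caveats are visible in the SQL itself: the proposed query also returns `time`, which the current one does not; and in both forms, if a subscriber has several rows sharing the latest `time`, PostgreSQL does not guarantee which of them is kept (the sketch deliberately avoids ties).

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical toy events: (subscriber, time, location).
# Timestamps in the same ISO-8601 format sort chronologically as strings.
EVENTS = [
    ("alice", "2024-03-01 08:15", "admin3_A"),
    ("bob", "2024-03-01 09:00", "admin3_B"),
    ("alice", "2024-03-01 17:40", "admin3_C"),
    ("bob", "2024-03-01 07:30", "admin3_A"),
    ("carol", "2024-03-01 12:00", "admin3_B"),
]


def last_location_distinct_on(events):
    """Mimic SELECT DISTINCT ON (subscriber) ... ORDER BY subscriber, time DESC."""
    # Two stable sorts give ORDER BY subscriber ASC, time DESC.
    ordered = sorted(events, key=itemgetter(1), reverse=True)
    ordered = sorted(ordered, key=itemgetter(0))
    # DISTINCT ON emits only the first row of each subscriber group.
    return [next(group) for _, group in groupby(ordered, key=itemgetter(0))]


def last_location_row_number(events):
    """Mimic row_number() OVER (PARTITION BY subscriber ORDER BY time DESC), then WHERE rank = 1."""
    ranked = []
    by_subscriber = sorted(events, key=itemgetter(0))
    for _, partition in groupby(by_subscriber, key=itemgetter(0)):
        # Every row in the partition gets a rank before any filtering happens.
        newest_first = sorted(partition, key=itemgetter(1), reverse=True)
        for rank, row in enumerate(newest_first, start=1):
            ranked.append((rank, row))
    return [row for rank, row in ranked if rank == 1]


if __name__ == "__main__":
    assert last_location_distinct_on(EVENTS) == last_location_row_number(EVENTS)
    for subscriber, time, location in last_location_distinct_on(EVENTS):
        print(subscriber, time, location)
```

Running it prints `alice 2024-03-01 17:40 admin3_C`, `bob 2024-03-01 09:00 admin3_B` and `carol 2024-03-01 12:00 admin3_B`. The structural difference mirrors the EXPLAIN output for this test: the window form ranks every row before filtering (the WindowAgg emits 12,803,864 rows, of which 10,968,194 are then removed by the filter), whereas `DISTINCT ON` can drop non-first rows as they stream out of the sort.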

I ran a test on one day of calls data (~14M events, ~2M subscribers) and saw a ~10% speed-up, from 34.4s to 30.8s, for the following query:

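import flowmachine as fm

# Connect to FlowDB before constructing any queries
fm.connect()

# Last admin3 location per subscriber, from one day of calls
fm.features.subscriber.last_location.LastLocation(
    start='2024-03-01',
    stop='2024-03-02',
    spatial_unit=fm.core.spatial_unit.make_spatial_unit(
        spatial_unit_type="admin",
        level=3,
        mapping_table="geography.precomputed_cell_to_admin_mapping",
    ),
    table=["events.calls"],
)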

`EXPLAIN ANALYZE` output for the current implementation:

Subquery Scan on final_time  (cost=837293.82..883625.99 rows=7128 width=50) (actual time=24182.071..34179.851 rows=1835670 loops=1)
   Filter: (final_time.rank = 1)
   Rows Removed by Filter: 10968194
   ->  WindowAgg  (cost=837293.82..865805.92 rows=1425605 width=66) (actual time=24182.069..33433.680 rows=12803864 loops=1)
         ->  Sort  (cost=837293.82..840857.84 rows=1425605 width=58) (actual time=24182.049..28152.223 rows=12803864 loops=1)
               Sort Key: calls_20240301.msisdn, calls_20240301.datetime DESC
               Sort Method: external merge  Disk: 952288kB
               ->  Gather  (cost=8733.70..645629.59 rows=1425605 width=58) (actual time=945.625..3623.561 rows=12803864 loops=1)
                     Workers Planned: 6
                     Workers Launched: 6
                     ->  Parallel Hash Join  (cost=7733.70..502069.09 rows=237601 width=58) (actual time=882.983..3431.828 rows=1829123 loops=7)
                           Hash Cond: (calls_20240301.location_id = loc_table.id)
                           Join Filter: (((calls_20240301.datetime)::date >= COALESCE((loc_table.date_of_first_service)::timestamp with time zone, '-infinity'::timestamp with time zone)) AND ((calls_20240301.datetime)::date <= COALESCE((loc_table.date_of_last_service)::timestamp with time zone, 'infinity'::timestamp with time zone)))
                           ->  Parallel Append  (cost=0.00..454828.47 rows=2149445 width=59) (actual time=0.046..1062.639 rows=1842069 loops=7)
                                 ->  Parallel Seq Scan on calls_20240301  (cost=0.00..444081.24 rows=2149445 width=59) (actual time=0.023..835.496 rows=1842069 loops=7)
                                       Filter: ((location_id IS NOT NULL) AND (datetime >= '2024-03-01 00:00:00+00'::timestamp with time zone) AND (datetime < '2024-03-02 00:00:00+00'::timestamp with time zone) AND (location_id <> ''::text))
                                       Rows Removed by Filter: 149306
                                 ->  Parallel Seq Scan on calls  (cost=0.00..0.00 rows=1 width=72) (actual time=0.002..0.002 rows=0 loops=2)
                                       Filter: ((location_id IS NOT NULL) AND (datetime >= '2024-03-01 00:00:00+00'::timestamp with time zone) AND (datetime < '2024-03-02 00:00:00+00'::timestamp with time zone) AND (location_id <> ''::text))
                           ->  Parallel Hash  (cost=7481.59..7481.59 rows=20169 width=39) (actual time=882.716..882.722 rows=4898 loops=7)
                                 Buckets: 65536  Batches: 1  Memory Usage: 3040kB
                                 ->  Merge Join  (cost=7006.73..7481.59 rows=20169 width=39) (actual time=815.473..821.887 rows=4898 loops=7)
                                       Merge Cond: (precomputed_cell_to_admin_mapping.id = loc_table.id)
                                       ->  Sort  (cost=3493.65..3544.08 rows=20169 width=20) (actual time=694.039..694.515 rows=4898 loops=7)
                                             Sort Key: precomputed_cell_to_admin_mapping.id
                                             Sort Method: quicksort  Memory: 827kB
                                             Worker 0:  Sort Method: quicksort  Memory: 2734kB
                                             Worker 1:  Sort Method: quicksort  Memory: 40kB
                                             Worker 2:  Sort Method: quicksort  Memory: 40kB
                                             Worker 3:  Sort Method: quicksort  Memory: 40kB
                                             Worker 4:  Sort Method: quicksort  Memory: 40kB
                                             Worker 5:  Sort Method: quicksort  Memory: 40kB
                                             ->  Hash Left Join  (cost=285.13..2051.59 rows=20169 width=20) (actual time=689.184..691.967 rows=4898 loops=7)
                                                   Hash Cond: (precomputed_cell_to_admin_mapping.admin3pcod = (admin3.admin3pcod)::text)
                                                   ->  Parallel Index Only Scan using precomputed_cell_to_admin_mapping_id_admin3pcod_idx on precomputed_cell_to_admin_mapping  (cost=0.29..1489.42 rows=20169 width=21) (actual time=0.036..1.744 rows=4898 loops=7)
                                                         Heap Fetches: 34288
                                                   ->  Hash  (cost=277.71..277.71 rows=570 width=9) (actual time=689.129..689.131 rows=570 loops=7)
                                                         Buckets: 1024  Batches: 1  Memory Usage: 32kB
                                                         ->  Subquery Scan on admin3  (cost=0.00..277.71 rows=570 width=9) (actual time=687.613..688.965 rows=570 loops=7)
                                                               ->  Seq Scan on geoms  (cost=0.00..272.01 rows=570 width=81) (actual time=687.611..688.877 rows=570 loops=7)
                                                                     Filter: (spatial_resolution = 3)
                                                                     Rows Removed by Filter: 151
                                       ->  Sort  (cost=3513.08..3599.24 rows=34465 width=19) (actual time=121.244..122.172 rows=10547 loops=7)
                                             Sort Key: loc_table.id
                                             Sort Method: quicksort  Memory: 3152kB
                                             Worker 0:  Sort Method: quicksort  Memory: 3152kB
                                             Worker 1:  Sort Method: quicksort  Memory: 3152kB
                                             Worker 2:  Sort Method: quicksort  Memory: 3152kB
                                             Worker 3:  Sort Method: quicksort  Memory: 3152kB
                                             Worker 4:  Sort Method: quicksort  Memory: 3152kB
                                             Worker 5:  Sort Method: quicksort  Memory: 3152kB
                                             ->  Seq Scan on cells loc_table  (cost=0.00..915.65 rows=34465 width=19) (actual time=0.027..8.693 rows=34465 loops=7)
 Planning Time: 123.532 ms
 JIT:
   Functions: 258
   Options: Inlining true, Optimization true, Expressions true, Deforming true
   Timing: Generation 34.692 ms, Inlining 454.648 ms, Optimization 2546.012 ms, Emission 1807.859 ms, Total 4843.212 ms
 Execution Time: 34416.014 ms

And for the modified implementation using SELECT DISTINCT ON:

Unique  (cost=837293.82..844421.85 rows=1425605 width=58) (actual time=26080.790..30641.673 rows=1835670 loops=1)
   ->  Sort  (cost=837293.82..840857.84 rows=1425605 width=58) (actual time=26080.789..29374.052 rows=12803864 loops=1)
         Sort Key: calls_20240301.msisdn, calls_20240301.datetime DESC
         Sort Method: external merge  Disk: 952288kB
         ->  Gather  (cost=8733.70..645629.59 rows=1425605 width=58) (actual time=952.785..3794.816 rows=12803864 loops=1)
               Workers Planned: 6
               Workers Launched: 6
               ->  Parallel Hash Join  (cost=7733.70..502069.09 rows=237601 width=58) (actual time=890.494..3493.537 rows=1829123 loops=7)
                     Hash Cond: (calls_20240301.location_id = loc_table.id)
                     Join Filter: (((calls_20240301.datetime)::date >= COALESCE((loc_table.date_of_first_service)::timestamp with time zone, '-infinity'::timestamp with time zone)) AND ((calls_20240301.datetime)::date <= COALESCE((loc_table.date_of_last_service)::timestamp with time zone, 'infinity'::timestamp with time zone)))
                     ->  Parallel Append  (cost=0.00..454828.47 rows=2149445 width=59) (actual time=0.052..1081.464 rows=1842069 loops=7)
                           ->  Parallel Seq Scan on calls_20240301  (cost=0.00..444081.24 rows=2149445 width=59) (actual time=0.027..853.991 rows=1842069 loops=7)
                                 Filter: ((location_id IS NOT NULL) AND (datetime >= '2024-03-01 00:00:00+00'::timestamp with time zone) AND (datetime < '2024-03-02 00:00:00+00'::timestamp with time zone) AND (location_id <> ''::text))
                                 Rows Removed by Filter: 149306
                           ->  Parallel Seq Scan on calls  (cost=0.00..0.00 rows=1 width=72) (actual time=0.002..0.002 rows=0 loops=2)
                                 Filter: ((location_id IS NOT NULL) AND (datetime >= '2024-03-01 00:00:00+00'::timestamp with time zone) AND (datetime < '2024-03-02 00:00:00+00'::timestamp with time zone) AND (location_id <> ''::text))
                     ->  Parallel Hash  (cost=7481.59..7481.59 rows=20169 width=39) (actual time=890.217..890.223 rows=4898 loops=7)
                           Buckets: 65536  Batches: 1  Memory Usage: 3040kB
                           ->  Merge Join  (cost=7006.73..7481.59 rows=20169 width=39) (actual time=804.613..810.019 rows=4898 loops=7)
                                 Merge Cond: (precomputed_cell_to_admin_mapping.id = loc_table.id)
                                 ->  Sort  (cost=3493.65..3544.08 rows=20169 width=20) (actual time=698.188..698.687 rows=4898 loops=7)
                                       Sort Key: precomputed_cell_to_admin_mapping.id
                                       Sort Method: quicksort  Memory: 4120kB
                                       Worker 0:  Sort Method: quicksort  Memory: 40kB
                                       Worker 1:  Sort Method: quicksort  Memory: 40kB
                                       Worker 2:  Sort Method: quicksort  Memory: 40kB
                                       Worker 3:  Sort Method: quicksort  Memory: 40kB
                                       Worker 4:  Sort Method: quicksort  Memory: 40kB
                                       Worker 5:  Sort Method: quicksort  Memory: 40kB
                                       ->  Hash Left Join  (cost=285.13..2051.59 rows=20169 width=20) (actual time=693.764..696.258 rows=4898 loops=7)
                                             Hash Cond: (precomputed_cell_to_admin_mapping.admin3pcod = (admin3.admin3pcod)::text)
                                             ->  Parallel Index Only Scan using precomputed_cell_to_admin_mapping_id_admin3pcod_idx on precomputed_cell_to_admin_mapping  (cost=0.29..1489.42 rows=20169 width=21) (actual time=0.040..1.492 rows=4898 loops=7)
                                                   Heap Fetches: 34288
                                             ->  Hash  (cost=277.71..277.71 rows=570 width=9) (actual time=693.702..693.704 rows=570 loops=7)
                                                   Buckets: 1024  Batches: 1  Memory Usage: 32kB
                                                   ->  Subquery Scan on admin3  (cost=0.00..277.71 rows=570 width=9) (actual time=692.423..693.581 rows=570 loops=7)
                                                         ->  Seq Scan on geoms  (cost=0.00..272.01 rows=570 width=81) (actual time=692.421..693.511 rows=570 loops=7)
                                                               Filter: (spatial_resolution = 3)
                                                               Rows Removed by Filter: 151
                                 ->  Sort  (cost=3513.08..3599.24 rows=34465 width=19) (actual time=106.217..106.833 rows=5708 loops=7)
                                       Sort Key: loc_table.id
                                       Sort Method: quicksort  Memory: 3152kB
                                       Worker 0:  Sort Method: quicksort  Memory: 3152kB
                                       Worker 1:  Sort Method: quicksort  Memory: 3152kB
                                       Worker 2:  Sort Method: quicksort  Memory: 3152kB
                                       Worker 3:  Sort Method: quicksort  Memory: 3152kB
                                       Worker 4:  Sort Method: quicksort  Memory: 3152kB
                                       Worker 5:  Sort Method: quicksort  Memory: 3152kB
                                       ->  Seq Scan on cells loc_table  (cost=0.00..915.65 rows=34465 width=19) (actual time=0.028..6.899 rows=34465 loops=7)
 Planning Time: 106.511 ms
 JIT:
   Functions: 253
   Options: Inlining true, Optimization true, Expressions true, Deforming true
   Timing: Generation 36.442 ms, Inlining 551.970 ms, Optimization 2649.197 ms, Emission 1641.969 ms, Total 4879.578 ms
 Execution Time: 30874.473 ms
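A rough reading of the two plans: everything from the Sort downwards is identical (same sort key, same external merge spilling 952288kB to disk), so the difference lies in the nodes above the Sort. The sketch below approximates each top-level node's own time as its "actual time" end value minus that of its child. This is only a reasonable approximation for a single-child chain like the top of these plans; the figures are copied from the outputs above and the helper names are illustrative.

```python
# End-of-node "actual time" values (ms) copied from the two plans above,
# listed from the bottom of the top-level chain (Gather) upwards.
CURRENT_PLAN = [
    ("Gather", 3623.561),
    ("Sort", 28152.223),
    ("WindowAgg", 33433.680),
    ("Subquery Scan (rank = 1 filter)", 34179.851),
]
DISTINCT_ON_PLAN = [
    ("Gather", 3794.816),
    ("Sort", 29374.052),
    ("Unique", 30641.673),
]
CURRENT_EXECUTION_MS = 34416.014
DISTINCT_ON_EXECUTION_MS = 30874.473


def approx_self_times(chain):
    """Approximate each node's own time as its end time minus its child's end time.

    Only meaningful when each node has a single child; the first entry
    keeps its full (inclusive) time.
    """
    result = {}
    previous_end = 0.0
    for name, end_ms in chain:
        result[name] = end_ms - previous_end
        previous_end = end_ms
    return result


def after_sort_ms(chain):
    """Time spent in the nodes above the Sort, i.e. the deduplication work."""
    names = [name for name, _ in chain]
    sort_end = chain[names.index("Sort")][1]
    return chain[-1][1] - sort_end


if __name__ == "__main__":
    for label, chain in (("current", CURRENT_PLAN), ("DISTINCT ON", DISTINCT_ON_PLAN)):
        for name, ms in approx_self_times(chain).items():
            print(f"{label:12} {name:32} {ms:10.1f} ms")
        print(f"{label:12} {'total after Sort':32} {after_sort_ms(chain):10.1f} ms")
    saving = CURRENT_EXECUTION_MS - DISTINCT_ON_EXECUTION_MS
    print(f"execution time saved: {saving:.1f} ms ({saving / CURRENT_EXECUTION_MS:.1%})")
```

On these figures, the work above the Sort drops from roughly 6.0 s (WindowAgg plus the rank = 1 filter) to roughly 1.3 s (Unique), while the Sort itself took about 1 s longer in the second run, which presumably reflects run-to-run variation since the two sorts are otherwise identical. The net saving in execution time is about 3.5 s, or 10.3%.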

Of particular note:

There are other instances in flowmachine where the "rank within window; select where rank = 1" approach is used (e.g. `ModalLocation`); it would be worth checking whether the same modification would help in those cases too.
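If the rewrite is tried elsewhere, the transformation is mechanical: the `PARTITION BY` expression becomes the `DISTINCT ON` expression and the leading `ORDER BY` key, and the window's ordering follows it. PostgreSQL requires the `DISTINCT ON` expressions to match the leftmost `ORDER BY` expressions, which is why the grouping column is repeated at the start of `ORDER BY`. The helper below is a hypothetical sketch that emits either form; its name and parameters are illustrative and not part of flowmachine.

```python
def top_row_per_group_sql(source_sql, group_col, order_by, columns, distinct_on=True):
    """Build SQL keeping one row per `group_col`: the first row under `order_by`.

    Hypothetical helper. `order_by` is an SQL ordering such as "time DESC";
    `columns` are the other columns to return alongside `group_col`.
    """
    cols = ", ".join(columns)
    if distinct_on:
        # DISTINCT ON keeps the first row of each group, so its expressions
        # must be the leftmost ORDER BY expressions.
        return (
            f"SELECT DISTINCT ON ({group_col}) {group_col}, {cols}\n"
            f"FROM ({source_sql}) AS src\n"
            f"ORDER BY {group_col}, {order_by}"
        )
    # Window-function form: rank every row, then keep rank = 1.
    return (
        f"SELECT {group_col}, {cols}\n"
        f"FROM (SELECT {group_col}, {cols},\n"
        f"      row_number() OVER (PARTITION BY {group_col} ORDER BY {order_by}) AS rank\n"
        f"      FROM ({source_sql}) AS src) AS ranked\n"
        f"WHERE rank = 1"
    )


if __name__ == "__main__":
    # Hypothetical source query standing in for {subscriber_locs}
    source = "SELECT subscriber, time, pcod FROM subscriber_locs_example"
    print(top_row_per_group_sql(source, "subscriber", "time DESC", ["time", "pcod"]))
    print(top_row_per_group_sql(source, "subscriber", "time DESC", ["time", "pcod"], distinct_on=False))
```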

greenape commented 4 months ago

Ooo, that's interesting. We explicitly avoided use of DISTINCT because, as I recall, it used a slower code path. This test is still on a pg12 flowdb, right?

jc-harrison commented 4 months ago

COUNT(DISTINCT ...) uses a slower code path. I'm not sure the same is true of SELECT DISTINCT ON.

Yes, this is still on a pg12 flowdb.