splitgraph / seafowl

Analytical database for data-driven Web applications 🪶
https://seafowl.io
Apache License 2.0

OOM in FULL OUTER JOIN of 2x 40k row tables #185

Open mildbyte opened 1 year ago

mildbyte commented 1 year ago
WITH old AS (SELECT * FROM socrata.dataset_history WHERE sg_image_tag = '20221024-120115'),
  new AS (SELECT * FROM socrata.dataset_history WHERE sg_image_tag = '20221031-000137')
SELECT
  COALESCE(old.domain, new.domain) AS domain,
  COALESCE(old.id, new.id) AS id,
  COALESCE(old.name, new.name) AS name,
  COALESCE(old.description, new.description) AS description,
  COALESCE(old.created_at, new.created_at) AS created_at,
  COALESCE(old.updated_at, new.updated_at) AS updated_at,
  old.id IS NULL AS is_added   -- TRUE if added, FALSE if deleted
FROM old FULL OUTER JOIN new
ON old.domain = new.domain AND old.id = new.id
-- Only include added/deleted datasets
WHERE old.id IS NULL OR new.id IS NULL
ORDER BY domain, name, is_added

(Context: the Socrata history dataset is 3.6M / 2.7GB; the old and new CTEs are supposed to narrow it down to two 40k-row tables.)
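
For reference, here is a minimal Python sketch of what the query above is meant to compute: the rows that appear in exactly one of the two snapshots, flagged with `is_added`. The function name `added_or_deleted` and the list-of-dicts row format are hypothetical, and the sketch assumes that `(domain, id)` is unique and non-null within each snapshot and that `name` is never null. It only illustrates the intended result, not how Seafowl executes the join.

```python
def added_or_deleted(old_rows, new_rows):
    """Return rows present in exactly one snapshot, keyed by (domain, id).

    Mirrors FULL OUTER JOIN ... WHERE old.id IS NULL OR new.id IS NULL,
    under the assumption that the join keys are unique and non-null.
    """
    old = {(r["domain"], r["id"]): r for r in old_rows}
    new = {(r["domain"], r["id"]): r for r in new_rows}

    result = []
    # Symmetric difference: keys that exist on only one side of the join.
    for key in old.keys() ^ new.keys():
        row = old[key] if key in old else new[key]
        # is_added is True when the row has no counterpart in the old snapshot.
        result.append({**row, "is_added": key not in old})

    # Same ordering as ORDER BY domain, name, is_added.
    result.sort(key=lambda r: (r["domain"], r["name"], r["is_added"]))
    return result
```

With two 40k-row inputs this needs only two small dictionaries, which is the scale of work the query is expected to involve.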

mildbyte commented 1 year ago

This works fine on a multi-core machine, which suggests the OOM is caused by differences in the single-core plan:

1 CORE:

 SortExec: [domain@0 ASC NULLS LAST]                                                                                                                                                                                                                  
   ProjectionExec: expr=[coalesce(old.domain,new.domain)@0 as domain, SUM(CASE WHEN old.id IS NULL THEN Int64(1) ELSE Int64(0) END)@1 as added, SUM(CASE WHEN new.id IS NULL THEN Int64(1) ELSE Int64(0) END)@2 as deleted]                           
     AggregateExec: mode=Final, gby=[coalesce(old.domain,new.domain)@0 as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]     
       CoalescePartitionsExec                                                                                                                                                                                                                         
         AggregateExec: mode=Partial, gby=[coalesce(domain@0, domain@2) as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]    
           CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                                
             FilterExec: id@1 IS NULL OR id@3 IS NULL                                                                                                                                                                                                 
               CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                            
                 HashJoinExec: mode=CollectLeft, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })]                                                                                                              
                   ProjectionExec: expr=[domain@0 as domain, id@1 as id]                                                                                                                                                                              
                     CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                      
                       FilterExec: sg_image_tag@2 = 20220814-000122                                                                                                                                                                                   
                         ParquetExec: limit=None, partitions=[...]
                   ProjectionExec: expr=[domain@0 as domain, id@1 as id]                                                                                                                                                                              
                     CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                      
                       FilterExec: sg_image_tag@2 = 20220815-000129                                                                                                                                                                                   
                         ParquetExec: limit=None, partitions=[...]

MULTICORE:

 SortExec: [domain@0 ASC NULLS LAST]                                                                                                                                                                                                                              
   CoalescePartitionsExec                                                                                                                                                                                                                                         
     ProjectionExec: expr=[coalesce(old.domain,new.domain)@0 as domain, SUM(CASE WHEN old.id IS NULL THEN Int64(1) ELSE Int64(0) END)@1 as added, SUM(CASE WHEN new.id IS NULL THEN Int64(1) ELSE Int64(0) END)@2 as deleted]                                     
       AggregateExec: mode=FinalPartitioned, gby=[coalesce(old.domain,new.domain)@0 as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]    
         CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                                              
           RepartitionExec: partitioning=Hash([Column { name: "coalesce(old.domain,new.domain)", index: 0 }], 4)                                                                                                                                                  
             AggregateExec: mode=Partial, gby=[coalesce(domain@0, domain@2) as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]            
               CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                                        
                 FilterExec: id@1 IS NULL OR id@3 IS NULL                                                                                                                                                                                                         
                   CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                                    
                     HashJoinExec: mode=Partitioned, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })]                                                                                                                      
                       CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                                
                         RepartitionExec: partitioning=Hash([Column { name: "id", index: 1 }], 4)                                                                                                                                                                 
                           ProjectionExec: expr=[domain@0 as domain, id@1 as id]                                                                                                                                                                                  
                             CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                          
                               FilterExec: sg_image_tag@2 = 20220814-000122                                                                                                                                                                                       
                                 ParquetExec: limit=None, partitions=[...]
                       CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                                
                         RepartitionExec: partitioning=Hash([Column { name: "id", index: 1 }], 4)                                                                                                                                                                 
                           ProjectionExec: expr=[domain@0 as domain, id@1 as id]                                                                                                                                                                                  
                             CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                          
                               FilterExec: sg_image_tag@2 = 20220815-000129                                                                                                                                                                                       
                                 ParquetExec: limit=None, partitions=[...]

In particular, the HashJoinExec metrics differ (note output_rows and input_rows):

1 core:

HashJoinExec: mode=CollectLeft, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })], metrics=[output_rows=413307, output_batches=16, input_batches=16, input_rows=413307, join_time=8.42958ms]      

multicore:

HashJoinExec: mode=Partitioned, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })], metrics=[output_rows=41337, output_batches=14, input_rows=41337, input_batches=14, join_time=3.456564ms]
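
To make the two join modes in the plans easier to compare, here is a toy Python model of them. It is a simplified sketch and not DataFusion's implementation: the function names, the list-of-dicts row format, and the use of Python's built-in `hash` are all assumptions made for illustration. The "collect left" variant builds one hash table over the entire left input, while the "partitioned" variant first hashes both inputs on the join key into N partitions and joins each pair of partitions separately.

```python
from collections import defaultdict


def full_join_collect_left(left, right, key):
    """Toy full outer join: one hash table over the whole left side."""
    table = defaultdict(list)
    for row in left:
        table[row[key]].append(row)

    matched = set()
    out = []
    for r in right:
        hits = table.get(r[key], [])
        if hits:
            matched.add(r[key])
            out.extend((l, r) for l in hits)
        else:
            out.append((None, r))  # right row with no left match

    # Emit left rows that never matched any right row.
    for k, rows in table.items():
        if k not in matched:
            out.extend((l, None) for l in rows)
    return out


def full_join_partitioned(left, right, key, partitions=4):
    """Toy full outer join: hash-partition both sides, then join pairwise."""
    def split(rows):
        parts = [[] for _ in range(partitions)]
        for row in rows:
            parts[hash(row[key]) % partitions].append(row)
        return parts

    out = []
    # Equal keys always land in the same partition index on both sides.
    for l_part, r_part in zip(split(left), split(right)):
        out.extend(full_join_collect_left(l_part, r_part, key))
    return out
```

In this model both functions return the same set of joined rows for the same inputs; only the way the work is split differs. A correct full outer join should behave the same way regardless of mode, so if both runs above used the same inputs, the differing output_rows values are worth checking.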