OOM in FULL OUTER JOIN of 2x 40k row tables #185

Open
mildbyte opened this issue Oct 31, 2022 · 1 comment

@mildbyte
Contributor

```sql
WITH old AS (SELECT * FROM socrata.dataset_history WHERE sg_image_tag = '20221024-120115'),
  new AS (SELECT * FROM socrata.dataset_history WHERE sg_image_tag = '20221031-000137')
SELECT
  COALESCE(old.domain, new.domain) AS domain,
  COALESCE(old.id, new.id) AS id,
  COALESCE(old.name, new.name) AS name,
  COALESCE(old.description, new.description) AS description,
  COALESCE(old.created_at, new.created_at) AS created_at,
  COALESCE(old.updated_at, new.updated_at) AS updated_at,
  old.id IS NULL AS is_added   -- TRUE if added, FALSE if deleted
FROM old FULL OUTER JOIN new
ON old.domain = new.domain AND old.id = new.id
-- Only include added/deleted datasets
WHERE old.id IS NULL OR new.id IS NULL
ORDER BY domain, name, is_added
```

(The source is the Socrata history dataset, 3.6M rows / 2.7 GB; the `old` and `new` CTEs are supposed to narrow it down to two tables of ~40k rows each.)
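The query is a FULL OUTER JOIN "diff": a row present only in `new` has `old.id IS NULL` (added), a row present only in `old` has `new.id IS NULL` (deleted), and rows present on both sides are filtered out. A minimal in-memory Rust sketch of those semantics, assuming a hypothetical, trimmed-down `Dataset` struct with only `domain`, `id` and `name`; it illustrates what the query computes, not how DataFusion executes it.

```rust
use std::collections::HashSet;

/// Hypothetical, trimmed-down row of `socrata.dataset_history`.
#[derive(Clone, Debug)]
struct Dataset {
    domain: String,
    id: String,
    name: String,
}

/// Returns (row, is_added) for every row whose (domain, id) key appears on
/// only one side, mirroring `WHERE old.id IS NULL OR new.id IS NULL`.
fn diff(old: &[Dataset], new: &[Dataset]) -> Vec<(Dataset, bool)> {
    // Join key: `old.domain = new.domain AND old.id = new.id`.
    let key = |d: &Dataset| (d.domain.clone(), d.id.clone());
    let old_keys: HashSet<_> = old.iter().map(key).collect();
    let new_keys: HashSet<_> = new.iter().map(key).collect();

    let mut out: Vec<(Dataset, bool)> = Vec::new();
    // Left-only rows: no match in `new`, so new.id IS NULL => deleted.
    for d in old {
        if !new_keys.contains(&key(d)) {
            out.push((d.clone(), false));
        }
    }
    // Right-only rows: no match in `old`, so old.id IS NULL => added.
    for d in new {
        if !old_keys.contains(&key(d)) {
            out.push((d.clone(), true));
        }
    }
    // ORDER BY domain, name, is_added (false sorts before true).
    out.sort_by(|(a, a_added), (b, b_added)| {
        (&a.domain, &a.name, a_added).cmp(&(&b.domain, &b.name, b_added))
    });
    out
}

fn main() {
    let row = |domain: &str, id: &str, name: &str| Dataset {
        domain: domain.to_string(),
        id: id.to_string(),
        name: name.to_string(),
    };
    let old = vec![row("a.gov", "1", "Budget"), row("a.gov", "2", "Crime")];
    let new = vec![row("a.gov", "2", "Crime"), row("a.gov", "3", "Parks")];
    // Prints the deleted "Budget" row, then the added "Parks" row.
    for (d, is_added) in diff(&old, &new) {
        println!("{} {} {} is_added={}", d.domain, d.id, d.name, is_added);
    }
}
```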

mildbyte added a commit that referenced this issue Nov 2, 2022
With one partition (the default if `num_cpus=1`), we seem to hit some DataFusion
bugs:

- #186
- potentially #185

As a temporary workaround, pretend we always need at least 2 partitions, which
makes DataFusion use alternative query plans.
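The workaround in the commit message boils down to clamping the partition count from below. A minimal sketch, assuming the count is derived from the number of CPUs; `effective_target_partitions` is a hypothetical helper name, not the actual code from the commit. In DataFusion, the resulting value would be passed to `SessionConfig::with_target_partitions`.

```rust
use std::thread;

/// Hypothetical helper: never plan with fewer than 2 partitions, which
/// (per the commit message) makes DataFusion choose alternative plans.
fn effective_target_partitions(num_cpus: usize) -> usize {
    num_cpus.max(2)
}

fn main() {
    // Fall back to 1 CPU if the available parallelism can't be determined.
    let cpus = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    let partitions = effective_target_partitions(cpus);
    println!("cpus={cpus}, target_partitions={partitions}");
    // With DataFusion, this value would then be used along the lines of:
    // SessionConfig::new().with_target_partitions(partitions)
}
```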
@mildbyte
Contributor Author

mildbyte commented Nov 2, 2022

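This works fine on a multi-core machine, which points to differences in the single-core query plan as the cause. (The plans below are for an aggregated variant of the query that counts added/deleted datasets per domain, joins on `id` only and uses different image tags.)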

1 CORE:

```
SortExec: [domain@0 ASC NULLS LAST]
  ProjectionExec: expr=[coalesce(old.domain,new.domain)@0 as domain, SUM(CASE WHEN old.id IS NULL THEN Int64(1) ELSE Int64(0) END)@1 as added, SUM(CASE WHEN new.id IS NULL THEN Int64(1) ELSE Int64(0) END)@2 as deleted]
    AggregateExec: mode=Final, gby=[coalesce(old.domain,new.domain)@0 as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]
      CoalescePartitionsExec
        AggregateExec: mode=Partial, gby=[coalesce(domain@0, domain@2) as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]
          CoalesceBatchesExec: target_batch_size=4096
            FilterExec: id@1 IS NULL OR id@3 IS NULL
              CoalesceBatchesExec: target_batch_size=4096
                HashJoinExec: mode=CollectLeft, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })]
                  ProjectionExec: expr=[domain@0 as domain, id@1 as id]
                    CoalesceBatchesExec: target_batch_size=4096
                      FilterExec: sg_image_tag@2 = 20220814-000122
                        ParquetExec: limit=None, partitions=[...]
                  ProjectionExec: expr=[domain@0 as domain, id@1 as id]
                    CoalesceBatchesExec: target_batch_size=4096
                      FilterExec: sg_image_tag@2 = 20220815-000129
                        ParquetExec: limit=None, partitions=[...]
```


MULTICORE:

```
SortExec: [domain@0 ASC NULLS LAST]
  CoalescePartitionsExec
    ProjectionExec: expr=[coalesce(old.domain,new.domain)@0 as domain, SUM(CASE WHEN old.id IS NULL THEN Int64(1) ELSE Int64(0) END)@1 as added, SUM(CASE WHEN new.id IS NULL THEN Int64(1) ELSE Int64(0) END)@2 as deleted]
      AggregateExec: mode=FinalPartitioned, gby=[coalesce(old.domain,new.domain)@0 as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]
        CoalesceBatchesExec: target_batch_size=4096
          RepartitionExec: partitioning=Hash([Column { name: "coalesce(old.domain,new.domain)", index: 0 }], 4)
            AggregateExec: mode=Partial, gby=[coalesce(domain@0, domain@2) as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]
              CoalesceBatchesExec: target_batch_size=4096
                FilterExec: id@1 IS NULL OR id@3 IS NULL
                  CoalesceBatchesExec: target_batch_size=4096
                    HashJoinExec: mode=Partitioned, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })]
                      CoalesceBatchesExec: target_batch_size=4096
                        RepartitionExec: partitioning=Hash([Column { name: "id", index: 1 }], 4)
                          ProjectionExec: expr=[domain@0 as domain, id@1 as id]
                            CoalesceBatchesExec: target_batch_size=4096
                              FilterExec: sg_image_tag@2 = 20220814-000122
                                ParquetExec: limit=None, partitions=[...]
                      CoalesceBatchesExec: target_batch_size=4096
                        RepartitionExec: partitioning=Hash([Column { name: "id", index: 1 }], 4)
                          ProjectionExec: expr=[domain@0 as domain, id@1 as id]
                            CoalesceBatchesExec: target_batch_size=4096
                              FilterExec: sg_image_tag@2 = 20220815-000129
                                ParquetExec: limit=None, partitions=[...]
```
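To make the two `HashJoinExec` modes concrete, here is a toy, single-threaded Rust sketch of a FULL OUTER hash join on `(key, payload)` rows. `collect_left` builds one hash table over the whole left input and probes it with the right side, tracking which build rows matched so the unmatched ones can be emitted at the end; `partitioned` first hash-partitions both inputs on the join key (the role of `RepartitionExec: partitioning=Hash(...)` above) and joins each partition independently. The function names and row representation are hypothetical, and this is not DataFusion's implementation; it only shows that, for the same inputs, both strategies should produce the same set of rows.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

type Row = (u64, &'static str); // (join key, payload)
type Joined = (Option<Row>, Option<Row>); // (left, right); None plays the role of NULL

/// FULL OUTER hash join in the style of `mode=CollectLeft`.
fn collect_left(left: &[Row], right: &[Row]) -> Vec<Joined> {
    // Build phase: one hash table over the entire left input.
    let mut table: HashMap<u64, Vec<usize>> = HashMap::new();
    for (i, row) in left.iter().enumerate() {
        table.entry(row.0).or_default().push(i);
    }
    let mut matched = vec![false; left.len()];
    let mut out = Vec::new();
    // Probe phase: stream the right input through the table.
    for r in right {
        match table.get(&r.0) {
            Some(idxs) => {
                for &i in idxs {
                    matched[i] = true;
                    out.push((Some(left[i]), Some(*r)));
                }
            }
            None => out.push((None, Some(*r))), // right-only row
        }
    }
    // Unmatched build rows can only be emitted after the whole probe side is consumed.
    for (i, row) in left.iter().enumerate() {
        if !matched[i] {
            out.push((Some(*row), None));
        }
    }
    out
}

fn partition_of(key: u64, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n as u64) as usize
}

/// `mode=Partitioned`: hash-repartition both sides on the key, then join each
/// partition on its own. Equal keys always land in the same partition.
fn partitioned(left: &[Row], right: &[Row], n: usize) -> Vec<Joined> {
    let split = |rows: &[Row]| {
        let mut parts: Vec<Vec<Row>> = vec![Vec::new(); n];
        for r in rows {
            parts[partition_of(r.0, n)].push(*r);
        }
        parts
    };
    let (lp, rp) = (split(left), split(right));
    (0..n).flat_map(|p| collect_left(&lp[p], &rp[p])).collect()
}

fn main() {
    let left: Vec<Row> = vec![(1, "a"), (2, "b"), (3, "c")];
    let right: Vec<Row> = vec![(2, "x"), (3, "y"), (4, "z")];
    let mut a = collect_left(&left, &right);
    let mut b = partitioned(&left, &right, 4);
    a.sort();
    b.sort();
    assert_eq!(a, b);
    // Keep only one-sided rows, like `FilterExec: id@1 IS NULL OR id@3 IS NULL`.
    let one_sided: Vec<_> = a.iter().filter(|(l, r)| l.is_none() || r.is_none()).collect();
    // Prints "4 joined rows, 2 one-sided".
    println!("{} joined rows, {} one-sided", a.len(), one_sided.len());
}
```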

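In particular, compare the `HashJoinExec` metrics: on one core the join outputs 413,307 rows, roughly 10× the 41,337 rows it outputs on the multi-core machine.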

1 core:

```
HashJoinExec: mode=CollectLeft, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })], metrics=[output_rows=413307, output_batches=16, input_batches=16, input_rows=413307, join_time=8.42958ms]
```

multicore:

```
HashJoinExec: mode=Partitioned, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })], metrics=[output_rows=41337, output_batches=14, input_rows=41337, input_batches=14, join_time=3.456564ms]
```
