
Reduce SortExec memory usage by avoiding constructing a single huge batch #2132

Merged
15 commits merged into apache:master on Apr 5, 2022

Conversation

@yjshen (Member) commented on Apr 1, 2022:

Which issue does this PR close?

Closes #2149.

Rationale for this change

It's a known issue that SortExec has "double memory" usage behavior. To spell it out, while doing an in-memory sort we currently:

  • buffer all incoming record batches in memory, and then
  • combine all the batches into a single batch with combine_batches before sorting it.

This behavior is not critical for pure in-memory processing when memory is sufficient, but when handling datasets much bigger than memory, it becomes crucial not to double memory usage at the point where we are already consuming a lot of memory and need to spill before accepting more incoming batches.

Currently, we are seeing exit code 137 (OOM kill) when using DataFusion's sort in Blaze. When the dataset is much bigger than memory, a sort that doubles memory usage gives queries no chance to finish.

What changes are included in this PR?

  1. Sort each batch as soon as it arrives, before buffering it in memory, for a better cache access pattern (as justified in the inline comments below).
  2. For the in-memory sort, construct "contains-all" arrays for the sort columns only, and produce sorted indices in chunks of batch_size (see the sketch after this list).
  3. In SortedSizedRecordBatchStream, construct and output one reasonably sized batch at a time.
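To make change 2 concrete, here is a minimal sketch of the idea (not the PR's exact code): concatenate only the sort-key columns of the buffered batches and hand them to arrow's lexsort_to_indices, so the payload columns are never gathered into one huge combined batch. The helper name sorted_indices and its argument shapes are illustrative assumptions.

use arrow::array::{Array, UInt32Array};
use arrow::compute::{concat, lexsort_to_indices, SortColumn, SortOptions};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Concatenate only the sort-key columns of the buffered batches and sort
/// them lexicographically; payload columns stay untouched in their batches.
fn sorted_indices(
    batches: &[RecordBatch],
    sort_key_columns: &[usize],
    options: SortOptions,
) -> Result<UInt32Array> {
    let sort_columns = sort_key_columns
        .iter()
        .map(|&col| {
            // gather this sort column from every buffered batch ...
            let pieces: Vec<&dyn Array> =
                batches.iter().map(|b| b.column(col).as_ref()).collect();
            // ... and concatenate just that one column
            Ok(SortColumn {
                values: concat(&pieces)?,
                options: Some(options),
            })
        })
        .collect::<Result<Vec<_>>>()?;

    // row indices into the virtual concatenation of all buffered batches
    lexsort_to_indices(&sort_columns, None)
}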

Are there any user-facing changes?

No.

@github-actions bot added the ballista and datafusion (Changes in the datafusion crate) labels on Apr 1, 2022
@yjshen (Member, Author) commented on Apr 1, 2022:

Hardware Settings:

H/W path                  Device          Class          Description
====================================================================
                                          system         MS-7D53 (To be filled by O.E.M.)
/0                                        bus            MPG X570S EDGE MAX WIFI (MS-7D53)
/0/0                                      memory         64KiB BIOS
/0/11                                     memory         32GiB System Memory
/0/11/0                                   memory         3600 MHz (0.3 ns) [empty]
/0/11/1                                   memory         16GiB DIMM DDR4 Synchronous Unbuffered (Unregistered) 3600 MHz (0.3 
/0/11/2                                   memory         3600 MHz (0.3 ns) [empty]
/0/11/3                                   memory         16GiB DIMM DDR4 Synchronous Unbuffered (Unregistered) 3600 MHz (0.3 
/0/14                                     memory         1MiB L1 cache
/0/15                                     memory         8MiB L2 cache
/0/16                                     memory         64MiB L3 cache
/0/17                                     processor      AMD Ryzen 9 5950X 16-Core Processor

A modified version of TPC-H q1:

select
    l_returnflag,
    l_linestatus,
    l_quantity,
    l_extendedprice,
    l_discount,
    l_tax
from
    lineitem
order by
    l_extendedprice,
    l_discount;
cargo run --release --features "mimalloc" --bin tpch -- benchmark datafusion --iterations 3 --path ../../tpch-parquet/ --format parquet --query 1 --batch-size 4096

Without this PR:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 2, batch_size: 4096, path: "../../tpch-parquet/", file_format: "parquet", mem_table: false, output_path: None }
Query 1 iteration 0 took 2851.7 ms and returned 6001214 rows
Query 1 iteration 1 took 2817.7 ms and returned 6001214 rows
Query 1 iteration 2 took 2735.9 ms and returned 6001214 rows
Query 1 avg time: 2801.75 ms

With this PR:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 2, batch_size: 4096, path: "../../tpch-parquet/", file_format: "parquet", mem_table: false, output_path: None }
Query 1 iteration 0 took 2843.6 ms and returned 6001214 rows
Query 1 iteration 1 took 2824.4 ms and returned 6001214 rows
Query 1 iteration 2 took 2753.0 ms and returned 6001214 rows
Query 1 avg time: 2807.00 ms

Performance is similar.

cargo criterion --bench sort_limit_query_sql

Without this PR:

sort_and_limit_by_int   time:   [869.27 us 870.72 us 872.19 us]                                  

sort_and_limit_by_float time:   [845.95 us 847.36 us 848.77 us]                                    

sort_and_limit_lex_by_int                                                                            
                        time:   [875.07 us 876.80 us 878.59 us]

sort_and_limit_lex_by_string                                                                            
                        time:   [878.78 us 880.40 us 882.06 us]

With this PR:

sort_and_limit_by_int   time:   [856.71 us 858.58 us 860.41 us]                                  

sort_and_limit_by_float time:   [836.55 us 838.40 us 840.29 us]                                    

sort_and_limit_lex_by_int                                                                            
                        time:   [864.83 us 866.33 us 867.83 us]

sort_and_limit_lex_by_string                                                                            
                        time:   [866.89 us 868.25 us 869.62 us]

Similar performance as well.

// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
fn get_sorted_iter(
@yjshen (Member, Author) commented on Apr 2, 2022:

The main changes:

  1. concat all sort columns (instead of all columns)
  2. sort to get the index array (same as the original sort)
  3. use CompositeIndex to avoid constructing a huge batch (to access records scattered across different batches); see the sketch after this list
  4. construct a small batch at a time.
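For illustration only, a minimal sketch of the CompositeIndex idea: a (batch, row) address that lets the sorted index array refer back into the individual buffered batches instead of one combined batch. The field names and the to_composite_index helper are assumptions, not the PR's exact definitions.

/// Where a row lives among the buffered batches: which batch, and which
/// row inside that batch.
#[derive(Clone, Copy, Debug)]
struct CompositeIndex {
    batch_idx: u32,
    row_idx: u32,
}

/// Map a "global" row number (an index into the virtual concatenation of
/// all buffered batches) back to a (batch, row) pair, without ever having
/// materialized that concatenation for the payload columns.
fn to_composite_index(batch_lens: &[usize], global_row: usize) -> CompositeIndex {
    let mut remaining = global_row;
    for (batch_idx, &len) in batch_lens.iter().enumerate() {
        if remaining < len {
            return CompositeIndex {
                batch_idx: batch_idx as u32,
                row_idx: remaining as u32,
            };
        }
        remaining -= len;
    }
    panic!("global row {} is out of bounds", global_row);
}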

Contributor:

What about the extreme case where the number of sort columns is equal or close to the total number of columns?
In that case, are we a bit worse off now than before, because we need to concat the sort columns anyway?

@yjshen (Member, Author):

I think in both cases (this PR and the current master), we need to concat the sort columns before the current lexsort. On the current master, that concat happens while constructing the single huge record batch.

For select a, b from table order by a, b, this PR consumes memory for the Vec<CompositeIndex>, but it also avoids the huge arrays produced by take to do the actual reordering. So I think this PR behaves consistently across the different mixes of sort columns and payload columns.

@alamb (Contributor) commented on Apr 5, 2022:

I would say the peak memory usage is no different in the worst case (all columns are being sorted), as the implementation on master copies the entire input into a new record batch with all columns, and additionally evaluates the SortExprs into their own arrays.

However, in the common case where not all columns are part of the sort key, this implementation will use significantly less peak memory.

Contributor:

Yeah, my intuition is that worst-case memory usage will be similar. My concern was that we use MutableArrayData in the new implementation, but I realize we only use it for the non-sort columns. So in the above case (sorting by all columns) the implementation is almost the same.

// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
let partial = sort_batch(input, self.schema.clone(), &self.expr)?;
@yjshen (Member, Author) commented on Apr 2, 2022:

A change here: sort each batch before buffering it in memory.
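For illustration, a minimal sketch (not the PR's exact sort_batch) of what pre-sorting one incoming batch with the arrow kernels could look like; the helper name presort_batch and its signature are assumptions.

use arrow::compute::{lexsort_to_indices, take, SortColumn};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Sort a single incoming batch by its (already evaluated) sort-key
/// columns before buffering it, so later output reads each batch linearly.
fn presort_batch(batch: RecordBatch, sort_columns: Vec<SortColumn>) -> Result<RecordBatch> {
    // indices that would sort this one batch
    let indices = lexsort_to_indices(&sort_columns, None)?;
    // reorder every column of the batch according to those indices
    let sorted_columns = batch
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &indices, None))
        .collect::<Result<Vec<_>>>()?;
    RecordBatch::try_new(batch.schema(), sorted_columns)
}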

@yjshen (Member, Author):

Performance would deteriorate significantly without this change:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 2, batch_size: 4096, path: "/home/yijie/sort_test/tpch-parquet", file_format: "parquet", mem_table: false, output_path: None }
Query 1 iteration 0 took 4619.9 ms and returned 6001214 rows
Query 1 iteration 1 took 4561.0 ms and returned 6001214 rows
Query 1 iteration 2 took 4527.7 ms and returned 6001214 rows

I think the main cause is random memory access while constructing the output batches. Without the per-batch sort, collecting cells from unsorted batches makes memory access fully random. With the per-batch sort, we access memory linearly for each column in each batch, which results in a much more predictable memory access pattern and benefits the CPU cache.

I think the perf counter confirms the above speculation:

sudo perf stat -a -e cache-misses,cache-references,l3_cache_accesses,l3_misses,dTLB-load-misses,dTLB-loads target/release/tpch benchmark datafusion --iterations 3 --path /home/yijie/sort_test/tpch-parquet --format parquet --query 1 --batch-size 4096

Without this per-batch sort:

Performance counter stats for 'system wide':

     1,340,359,889      cache-misses              #   35.817 % of all cache refs  
     3,742,289,458      cache-references                                          
     1,984,089,839      l3_cache_accesses                                         
       540,429,658      l3_misses                                                 
       303,508,234      dTLB-load-misses          #   49.51% of all dTLB cache accesses
       613,048,439      dTLB-loads                                                

      14.222309739 seconds time elapsed

With this per-batch sort:

 Performance counter stats for 'system wide':

     1,059,913,512      cache-misses              #   30.715 % of all cache refs  
     3,450,839,405      cache-references                                          
     1,388,975,765      l3_cache_accesses                                         
       235,570,805      l3_misses                                                 
       239,390,511      dTLB-load-misses          #   51.36% of all dTLB cache accesses
       466,141,655      dTLB-loads                                                

       8.675278258 seconds time elapsed

Contributor:

It makes sense that sorting a batch in the same thread that produced it (and thus would still be in the cache) improves performance. Nice find @yjshen

cc @tustvold who has been observing similar things while working on scheduling I/O and CPU decoding

@yjshen (Member, Author) commented on Apr 5, 2022:

Yes, I think the performance gains are two-fold:

  • Sorting and reordering each batch in the same thread, while it is still in the cache, as you mentioned.
  • The memory access pattern of the final output phase. We access the columns of each batch serially, so materializing the sort order for each incoming batch turns purely random gathering into sequential access of each column across all batches, which yields better cache behavior.

@yjshen yjshen removed the ballista label Apr 4, 2022
@yjshen yjshen changed the title from "WIP: Reduce sort memory usage v1" to "Reduce SortExec memory usage by avoiding constructing a single huge batch" on Apr 4, 2022
@yjshen yjshen marked this pull request as ready for review April 4, 2022 05:07
@alamb (Contributor) left a review comment:

Very nice @yjshen 👍. It is a pleasure to read.

Is it fair to say that this PR's core change is to only copy data for the "sort keys", rather than all of the columns? If so, I think this is a good approach and state of the art.

There are likely some other improvements that can still be made (I pointed out some below), but this seems like a (great) step in the right direction.

I also tested it out using the IOx suite and that passed 🎉 : https://github.com/influxdata/influxdb_iox/pull/4230

Maybe we can eventually write a blog about your sorting adventures (in the vein of
https://duckdb.org/2021/08/27/external-sorting.html) -- you have just as much good stuff to report.

(0..arrays[0].len())
.map(|r| CompositeIndex {
// since we originally used UInt32Array to index the combined mono batch,
// the component record batches won't overflow either,
Contributor:

👍 yeah I agree this approach is no more prone to overflow than the implementation on master

datafusion/core/src/physical_plan/sorts/sort.rs (outdated review thread, resolved)
.iter()
.map(|b| b.column(i).data())
.collect::<Vec<_>>();
let mut mutable =
Contributor:

this is very clever 👍 👍

@alamb (Contributor) commented on Apr 4, 2022:

cc @Dandandan who I think is also interested in clever database internals :)

MutableArrayData::new(arrays, false, combined.len());
for x in combined.iter() {
// we cannot extend with slice here, since the lexsort is unstable
mutable.extend(
Contributor:

I think MutableArrayData is more optimized for copying larger chunks of data than for copying single rows at a time. I guess this case needs some optimization work on the Arrow side (or a different construction altogether?).

Contributor:

I think #2132 (comment) would result in copying larger chunks

Contributor:

I'm thinking of something like take, but supporting multiple batches.

@yjshen (Member, Author):

I've updated the SortedIterator above to produce a slice each time, to avoid this per-cell copy. A sketch of the idea follows.
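Roughly, the slicing works by coalescing consecutive sorted indices that come from the same batch into runs, so MutableArrayData::extend copies a contiguous chunk instead of one row at a time. A small sketch under that assumption (the CompositeSlice field names and the coalesce helper are illustrative, not the PR's exact code):

/// A run of adjacent rows taken from one buffered batch; copying it with
/// MutableArrayData::extend(batch_idx as usize, start, start + len) moves
/// a whole contiguous chunk at once.
struct CompositeSlice {
    batch_idx: u32,
    start_row_idx: u32,
    len: usize,
}

/// Coalesce a sorted sequence of (batch, row) indices into slices wherever
/// consecutive entries come from the same batch and are adjacent rows.
fn coalesce(indices: &[(u32, u32)]) -> Vec<CompositeSlice> {
    let mut slices = Vec::new();
    let mut iter = indices.iter().copied();
    let (mut batch, mut start) = match iter.next() {
        Some(first) => first,
        None => return slices,
    };
    let mut len = 1usize;
    for (b, r) in iter {
        if b == batch && r == start + len as u32 {
            len += 1; // extend the current run
        } else {
            slices.push(CompositeSlice { batch_idx: batch, start_row_idx: start, len });
            batch = b;
            start = r;
            len = 1;
        }
    }
    slices.push(CompositeSlice { batch_idx: batch, start_row_idx: start, len });
    slices
}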

Contributor:

I filed apache/arrow-rs#1523 to track adding something like this in arrow-rs

@yjshen (Member, Author) commented on Apr 5, 2022:

Thanks @alamb @Dandandan for your review!

Is it fair to say that this PR's core change is to only copy data for the "sort keys", rather than all of the columns? If so, I think this is a good approach and state of the art.

Yes, that's the change in this PR.

I also tested it out using the IOx suite and that passed 🎉 : https://github.com/influxdata/influxdb_iox/pull/4230

I've updated the implementation to extend with slices, as you and Daniël mentioned. Would you mind re-triggering the InfluxDB IOx tests for additional coverage?

Maybe we can eventually write a blog about your sorting adventures (in the vein of
https://duckdb.org/2021/08/27/external-sorting.html) -- you have just as much good stuff to report.

Sounds great!

@alamb (Contributor) left a review comment:

LGTM

len += 1;
// since we have pre-sorted each of the incoming batches,
// if we witness out-of-order indices from the same batch,
// they must have the same key as the row pointed to by start_row_index.
Contributor:

if this happened, does it mean the sort wasn't stable (as in it rearranged inputs that sorted to equal keys)?

@yjshen (Member, Author):

Yes, that's the only case I think.

len = 1;
}
}
slices.push(CompositeSlice {
Contributor:

Is it important to check here that len > 0? I don't think it is possible to try to sort a zero-length RecordBatch, but I wonder what would happen in that case.

@yjshen (Member, Author) commented on Apr 5, 2022:

I think it can't be zero here, since we guard against inserting empty batches into the in-memory sorter in the first place (https://github.com/apache/arrow-datafusion/blob/2325d1aac2230182c4cb496327daa76ea0d80088/datafusion/core/src/physical_plan/sorts/sort.rs#L115), so inner batch merges won't produce empty rows. I will add an assert here in case something unexpected happens.

@alamb (Contributor) commented on Apr 5, 2022:

I updated https://github.com/influxdata/influxdb_iox/pull/4230 to use 2325d1a and the tests still pass 👍

@alamb (Contributor) commented on Apr 5, 2022:

I think this is good to go, so merging it in 🚀

@alamb alamb merged commit ceffb2f into apache:master Apr 5, 2022
@alamb (Contributor) commented on Apr 5, 2022:

Thanks @yjshen

@yjshen yjshen deleted the sort_memory branch April 22, 2022 08:31
Labels: datafusion (Changes in the datafusion crate)
Linked issue: Optimize memory usage pattern to avoid "double memory" behavior
3 participants