Skip to content

Commit

Permalink
fix: incorrect memory usage track for sort (#2135)
Browse files Browse the repository at this point in the history
* fix:  incorrect memory usage track for sort

* tests
  • Loading branch information
yjshen committed Apr 4, 2022
1 parent f99c271 commit 2a4a835
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 7 deletions.
37 changes: 32 additions & 5 deletions datafusion/core/src/execution/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ pub trait MemoryConsumer: Send + Sync {
Ok(())
}

/// Return `freed` memory to the memory manager,
/// may wake up other requesters waiting for their minimum memory quota.
fn shrink(&self, freed: usize) {
self.memory_manager().record_free(freed);
}

/// Spill in-memory buffers to disk, free memory, return the previous used
async fn spill(&self) -> Result<usize>;

Expand Down Expand Up @@ -303,7 +309,8 @@ impl MemoryManager {
));
}

fn get_requester_total(&self) -> usize {
/// Return the total memory usage for all requesters
pub fn get_requester_total(&self) -> usize {
*self.requesters_total.lock()
}

Expand Down Expand Up @@ -342,8 +349,8 @@ impl MemoryManager {
// if we cannot acquire at lease 1/2n memory, just wait for others
// to spill instead spill self frequently with limited total mem
debug!(
"Cannot acquire minimum amount of memory {} on memory manager {}, waiting for others to spill ...",
human_readable_size(min_per_rqt), self);
"Cannot acquire a minimum amount of {} memory from the manager of total {}, waiting for others to spill ...",
human_readable_size(min_per_rqt), human_readable_size(self.pool_size));
let now = Instant::now();
self.cv.wait(&mut rqt_current_used);
let elapsed = now.elapsed();
Expand All @@ -361,12 +368,30 @@ impl MemoryManager {
granted
}

fn record_free_then_acquire(&self, freed: usize, acquired: usize) -> usize {
fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
let mut requesters_total = self.requesters_total.lock();
debug!(
"free_then_acquire: total {}, freed {}, acquired {}",
human_readable_size(*requesters_total),
human_readable_size(freed),
human_readable_size(acquired)
);
assert!(*requesters_total >= freed);
*requesters_total -= freed;
*requesters_total += acquired;
self.cv.notify_all()
self.cv.notify_all();
}

fn record_free(&self, freed: usize) {
let mut requesters_total = self.requesters_total.lock();
debug!(
"free: total {}, freed {}",
human_readable_size(*requesters_total),
human_readable_size(freed)
);
assert!(*requesters_total >= freed);
*requesters_total -= freed;
self.cv.notify_all();
}

/// Drop a memory consumer and reclaim the memory
Expand All @@ -378,6 +403,8 @@ impl MemoryManager {
let mut total = self.requesters_total.lock();
assert!(*total >= mem_used);
*total -= mem_used;
self.cv.notify_all();
return;
}
}
self.shrink_tracker_usage(mem_used);
Expand Down
37 changes: 35 additions & 2 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl ExternalSorter {
&self.expr,
tracking_metrics,
)?;
let prev_used = self.metrics.mem_used().set(0);
let prev_used = self.free_all_memory();
streams.push(SortedStream::new(in_mem_stream, prev_used));
}

Expand Down Expand Up @@ -169,13 +169,19 @@ impl ExternalSorter {
tracking_metrics,
);
// Report to the memory manager we are no longer using memory
self.metrics.mem_used().set(0);
self.free_all_memory();
result
} else {
Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
}
}

fn free_all_memory(&self) -> usize {
let used = self.metrics.mem_used().set(0);
self.shrink(used);
used
}

fn used(&self) -> usize {
self.metrics.mem_used().value()
}
Expand Down Expand Up @@ -678,6 +684,15 @@ mod tests {
assert_eq!(c7.value(0), 15);
assert_eq!(c7.value(c7.len() - 1), 254,);

assert_eq!(
session_ctx
.runtime_env()
.memory_manager
.get_requester_total(),
0,
"The sort should have returned all memory used back to the memory manager"
);

Ok(())
}

Expand Down Expand Up @@ -755,6 +770,15 @@ mod tests {
assert_eq!(c7.value(0), 15);
assert_eq!(c7.value(c7.len() - 1), 254,);

assert_eq!(
session_ctx
.runtime_env()
.memory_manager
.get_requester_total(),
0,
"The sort should have returned all memory used back to the memory manager"
);

Ok(())
}

Expand Down Expand Up @@ -941,6 +965,15 @@ mod tests {
drop(fut);
assert_strong_count_converges_to_zero(refs).await;

assert_eq!(
session_ctx
.runtime_env()
.memory_manager
.get_requester_total(),
0,
"The sort should have returned all memory used back to the memory manager"
);

Ok(())
}
}
8 changes: 8 additions & 0 deletions datafusion/core/tests/order_spill_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
}

assert_eq!(
session_ctx
.runtime_env()
.memory_manager
.get_requester_total(),
0,
"The sort should have returned all memory used back to the memory manager"
);
assert_eq!(expected, actual, "failure in @ pool_size {}", pool_size);
}
}
Expand Down

0 comments on commit 2a4a835

Please sign in to comment.