Skip to content

Commit

Permalink
Parallelize SetState in LaunchParallelMemTableWriters
Browse files Browse the repository at this point in the history
We found that for writers in STATE_LOCKED_WAITING, the notify-one
function needs to be called, and the cost of calling this function is
very high, especially when there are many writers that need to be
awakened. So, we parallelize this process.

To wake up each writer to write its own memtable, the leader writer
first wakes up (n^0.5 - 1) caller writers, and then those callers
and the leader each wake up about n/x writers (where x = n^0.5 is the
stride) to write to the memtable.
This reduces the number of sequential SetState calls made by the leader
from n-1 to about 2*(n^0.5).

vcpu=160, benchmark=db_bench
The score is normalized:
| case name         | optimized/base |
|-------------------|----------------|
| fillrandom        | 182%           |
| fillseq           | 184%           |
| fillsync          | 136%           |
| overwrite         | 179%           |
| randomreplacekeys | 180%           |
| randomtransaction | 161%           |
| updaterandom      | 163%           |
| xorupdaterandom   | 165%           |
  • Loading branch information
casualwind committed May 17, 2024
1 parent b75438f commit 6f20406
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 6 deletions.
7 changes: 6 additions & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

write_thread_.JoinBatchGroup(&w);
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_CALLER) {
write_thread_.SetMemWritersEachStride(&w);
}
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group

Expand Down Expand Up @@ -826,7 +829,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
// so we need to set its status to pass ASSERT_STATUS_CHECKED
memtable_write_group.status.PermitUncheckedError();
}

if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_CALLER) {
write_thread_.SetMemWritersEachStride(&w);
}
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time);
Expand Down
56 changes: 51 additions & 5 deletions db/write_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ void WriteThread::JoinBatchGroup(Writer* w) {
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
AwaitState(w,
STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
STATE_PARALLEL_MEMTABLE_CALLER |
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&jbg_ctx);
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
Expand Down Expand Up @@ -656,12 +657,57 @@ void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
SetState(leader, STATE_COMPLETED);
}

// Starting at `w`, walks the write group towards its last writer and moves
// `w` itself plus every stride-th writer after it into
// STATE_PARALLEL_MEMTABLE_WRITER. The stride is the integer part of the
// square root of the group size, so it is identical for every thread that
// runs this function on the same group.
void WriteThread::SetMemWritersEachStride(Writer* w) {
  WriteGroup* const group = w->write_group;
  Writer* const tail = group->last_writer;

  // Same stride for every caller; each caller handles the writers whose
  // offset from its own starting writer is a multiple of the stride.
  const size_t stride = static_cast<size_t>(sqrt(group->size));

  for (size_t offset = 0; w != nullptr; ++offset) {
    const bool is_selected = (offset % stride == 0);
    if (is_selected) {
      SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
    }
    // Stop once the group's last writer has been handled; never follow the
    // link past the end of this group.
    if (w == tail) {
      break;
    }
    w = w->link_newer;
  }
}

// Starts the parallel memtable-write phase for `write_group`.
//
// Records the group size in `write_group->running`, then moves the members
// of the group into STATE_PARALLEL_MEMTABLE_WRITER. For small groups the
// calling thread sets every member's state itself. For larger groups it
// promotes the first (stride - 1) followers to
// STATE_PARALLEL_MEMTABLE_CALLER so that they share the work of setting the
// remaining members' state (see SetMemWritersEachStride), and then handles
// its own share.
//
// write_group: the group to launch; must not be null.
void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
  assert(write_group != nullptr);
  const size_t group_size = write_group->size;
  write_group->running.store(group_size);

  // The minimum group size for which the parallel caller mode is used.
  // This number must be no lower than 3.
  constexpr size_t kMinParallelSize = 20;
  static_assert(kMinParallelSize >= 3,
                "parallel caller mode needs a group of at least 3 writers");

  // The group is too small to benefit from parallel callers: set every
  // member's state directly from this thread.
  if (group_size < kMinParallelSize) {
    for (auto w : *write_group) {
      SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
    }
    return;
  }

  // The stride is sqrt(group_size), which minimizes the total number of
  // SetState calls made by the leader. This expression must stay identical
  // to the one in SetMemWritersEachStride so that all threads agree on it.
  const size_t stride = static_cast<size_t>(sqrt(group_size));

  // The leader itself becomes a STATE_PARALLEL_MEMTABLE_WRITER ...
  Writer* w = write_group->leader;
  SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);

  // ... and the next (stride - 1) writers become
  // STATE_PARALLEL_MEMTABLE_CALLER.
  for (size_t i = 1; i < stride; i++) {
    w = w->link_newer;
    SetState(w, STATE_PARALLEL_MEMTABLE_CALLER);
  }

  // After setting all STATE_PARALLEL_MEMTABLE_CALLER writers, the leader
  // also does a caller's job, starting from the first writer after them.
  w = w->link_newer;
  SetMemWritersEachStride(w);
}

static WriteThread::AdaptationContext cpmtw_ctx(
Expand Down Expand Up @@ -788,8 +834,8 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
}

AwaitState(leader,
STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER |
STATE_COMPLETED,
STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_CALLER |
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&eabgl_ctx);
} else {
Writer* head = newest_writer_.load(std::memory_order_acquire);
Expand Down
15 changes: 15 additions & 0 deletions db/write_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ class WriteThread {
// A state indicating that the thread may be waiting using StateMutex()
// and StateCondVar()
STATE_LOCKED_WAITING = 32,

// The state used to inform a waiting writer that it has become a
// caller to call some other waiting writers to write to memtable
// by calling SetMemWritersEachStride. After doing
// this, it will also write to memtable.
STATE_PARALLEL_MEMTABLE_CALLER = 64,
};

struct Writer;
Expand Down Expand Up @@ -323,10 +329,19 @@ class WriteThread {
// Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all of
// the non-leader members of this write batch group. Sets Writer::sequence
// before waking them up.
// If the size n of write_group is not small, the leader will set n^0.5 - 1
// members of the write_group to be PARALLEL_MEMTABLE_CALLER so that they help
// to set the other members' state in parallel. This ensures that the number
// of SetState calls made sequentially is about 2(n^0.5) rather than n.
//
// WriteGroup* write_group: Extra state used to coordinate the parallel add
void LaunchParallelMemTableWriters(WriteGroup* write_group);

// Starting from w, every stride-th writer in the WriteGroup is set to be a
// MemTableWriter, where the stride is equal to the square root of the total
// number of writers in this write_group, and all of these MemTableWriters
// will write to memtable.
void SetMemWritersEachStride(Writer* w);

// Reports the completion of w's batch to the parallel group leader, and
// waits for the rest of the parallel batch to complete. Returns true
// if this thread is the last to complete, and hence should advance
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Parallelize SetState in LaunchParallelMemTableWriters
To wake up each writer to write its own memtable, the leader writer first wakes up (n^0.5 - 1) caller writers, and then those callers and the leader each wake up about n/x writers (where x = n^0.5 is the stride) to write to the memtable. This reduces the number of sequential SetState calls made by the leader from n-1 to about 2*(n^0.5).

---

vcpu=160, benchmark=db_bench
The score is normalized:
| case name | optimized/base |
|-------------------|----------------|
| fillrandom | 182% |
| fillseq | 184% |
| fillsync | 136% |
| overwrite | 179% |
| randomreplacekeys | 180% |
| randomtransaction | 161% |
| updaterandom | 163% |
| xorupdaterandom | 165% |

0 comments on commit 6f20406

Please sign in to comment.