Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Commit sst in dedicated threads #1275

Open
wants to merge 2 commits into
base: fb-mysql-8.0.28
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions storage/rocksdb/rdb_sst_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ Rdb_sst_info::Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename,

Rdb_sst_info::~Rdb_sst_info() {
assert(m_sst_file == nullptr);
SHIP_ASSERT(m_commiting_threads.empty());

for (const auto &sst_file : m_committed_files) {
// In case something went wrong attempt to delete the temporary file.
Expand Down Expand Up @@ -392,13 +393,23 @@ int Rdb_sst_info::open_new_sst_file() {
}

void Rdb_sst_info::commit_sst_file(Rdb_sst_file_ordered *sst_file) {
m_commiting_threads_mutex.lock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider std::lock_guardstd::mutex? we use RDB_MUTEX_LOCK_CHECK macros elsewhere in this file

m_commiting_threads.emplace_back(
&Rdb_sst_info::commit_sst_file_func, this, sst_file);
m_commiting_threads_mutex.unlock();
}

void Rdb_sst_info::commit_sst_file_func(Rdb_sst_file_ordered* sst_file) {
const rocksdb::Status s = sst_file->commit();

m_commiting_threads_mutex.lock();
if (!s.ok()) {
set_error_msg(sst_file->get_name(), s);
set_background_error(HA_ERR_ROCKSDB_BULK_LOAD);
}

m_committed_files.push_back(sst_file->get_name());
m_commiting_threads_mutex.unlock();

delete sst_file;
}
Expand Down Expand Up @@ -479,6 +490,11 @@ int Rdb_sst_info::finish(Rdb_sst_commit_info *commit_info,
close_curr_sst_file();
}

for (auto& thr : m_commiting_threads) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to grab a log here too?

thr.join();
}
m_commiting_threads.clear();

// This checks out the list of files so that the caller can collect/group
// them and ingest them all in one go, and any racing calls to commit
// won't see them at all
Expand Down
3 changes: 3 additions & 0 deletions storage/rocksdb/rdb_sst_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,16 @@ class Rdb_sst_info {

// List of committed SST files - we'll ingest them later in one single batch
std::vector<std::string> m_committed_files;
std::vector<std::thread> m_commiting_threads;
std::mutex m_commiting_threads_mutex;

const bool m_tracing;
bool m_print_client_error;

int open_new_sst_file();
void close_curr_sst_file();
void commit_sst_file(Rdb_sst_file_ordered *sst_file);
void commit_sst_file_func(Rdb_sst_file_ordered*);

void set_error_msg(const std::string &sst_file_name,
const rocksdb::Status &s);
Expand Down