[EventEngine] Refactor ThreadManager to leverage a shared ThreadPool (#31392)

* Refactor ThreadManager to leverage a shared ThreadPool

This is a step towards a work-stealing thread pool implementation:
unifying the thread pools so that work stealing applies to timer
callbacks as well (a sketch of the resulting dispatch shape follows
this commit list).

A subsequent step will expose the timeout logic so that the timer wakeup
and check can be triggered externally (by pollers, in the common case).

* fix atomic uint64_t type missing on some platforms

* sanitize + platform fixes

* ->quiesce

* shut down the timer manager to release the main thread

* roll back atomics

* use a dedicated thread for timer_manager to prevent local execution (work stealing)

* drain the pools after timer manager tests; sanitize

* iwyu

* reintroduce fork handling

* sanitize

* fix
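
Before this commit, expired timer closures ran inline on dedicated timer-manager threads; after it, the manager forwards each expired closure to the shared pool, so work stealing covers timer callbacks too. A minimal self-contained sketch of that dispatch shape, using a stand-in pool rather than gRPC's internal ThreadPool:

#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Stand-in for the shared pool; the real grpc ThreadPool queues work
// onto reusable threads instead of spawning one per closure.
struct SketchPool {
  void Run(std::function<void()> fn) {
    std::thread t(std::move(fn));
    t.join();  // a real pool would enqueue; joining keeps the sketch tame
  }
};

// After the refactor, the timer manager's dispatch step reduces to
// handing each expired closure to the pool.
void RunSomeTimers(SketchPool& pool,
                   std::vector<std::function<void()>> timers) {
  for (auto& timer : timers) pool.Run(std::move(timer));
}

int main() {
  SketchPool pool;
  RunSomeTimers(pool, {[] {}, [] {}});
}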
drfloob committed Oct 20, 2022
1 parent 987b50a commit 66df2c6
Showing 8 changed files with 101 additions and 221 deletions.
2 changes: 2 additions & 0 deletions BUILD
@@ -2515,9 +2515,11 @@ grpc_cc_library(
],
deps = [
"event_engine_base_hdrs",
"event_engine_thread_pool",
"forkable",
"gpr",
"grpc_trace",
"notification",
"posix_event_engine_timer",
"time",
],
6 changes: 4 additions & 2 deletions src/core/lib/event_engine/posix_engine/posix_engine.cc
@@ -17,6 +17,7 @@

#include <atomic>
#include <chrono>
#include <memory>
#include <string>
#include <utility>

@@ -91,12 +92,12 @@ PosixEnginePollerManager::~PosixEnginePollerManager() {
}

PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)
: executor_(std::make_shared<ThreadPool>()) {
: executor_(std::make_shared<ThreadPool>()), timer_manager_(executor_) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(poller);
}

PosixEventEngine::PosixEventEngine()
: executor_(std::make_shared<ThreadPool>()) {
: executor_(std::make_shared<ThreadPool>()), timer_manager_(executor_) {
if (grpc_core::IsPosixEventEngineEnablePollingEnabled()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
if (poller_manager_->Poller() != nullptr) {
@@ -174,6 +175,7 @@ PosixEventEngine::~PosixEventEngine() {
}
GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
}
timer_manager_.Shutdown();
#ifdef GRPC_POSIX_SOCKET_TCP
if (poller_manager_ != nullptr) {
poller_manager_->TriggerShutdown();
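The added timer_manager_.Shutdown() call runs before poller teardown: the timer manager's dedicated thread has to be released before the rest of the engine unwinds. A sketch of that ordering with illustrative stand-in types (the real ones are gRPC-internal):

#include <cstdio>

struct TimerManagerSketch {
  void Shutdown() { std::puts("timer loop stopped and thread released"); }
};
struct PollerManagerSketch {
  void TriggerShutdown() { std::puts("poller shut down"); }
};

struct EngineSketch {
  ~EngineSketch() {
    timer_manager_.Shutdown();          // stop the timer thread first...
    poller_manager_.TriggerShutdown();  // ...then tear down the poller
  }
  TimerManagerSketch timer_manager_;
  PollerManagerSketch poller_manager_;
};

int main() { EngineSketch engine; }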
4 changes: 2 additions & 2 deletions src/core/lib/event_engine/posix_engine/posix_engine.h
@@ -106,7 +106,7 @@ class PosixEventEngine final : public EventEngine,
grpc_event_engine::posix_engine::PosixEventPoller* poller);
PosixEventEngine();
#else // GRPC_POSIX_SOCKET_TCP
PosixEventEngine() = default;
PosixEventEngine();
#endif // GRPC_POSIX_SOCKET_TCP

~PosixEventEngine() override;
@@ -160,8 +160,8 @@ class PosixEventEngine final : public EventEngine,
grpc_core::Mutex mu_;
TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_);
std::atomic<intptr_t> aba_token_{0};
posix_engine::TimerManager timer_manager_;
std::shared_ptr<ThreadPool> executor_;
posix_engine::TimerManager timer_manager_;
#ifdef GRPC_POSIX_SOCKET_TCP
std::shared_ptr<PosixEnginePollerManager> poller_manager_;
#endif // GRPC_POSIX_SOCKET_TCP
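Note the reorder in the header above: executor_ is now declared before timer_manager_, because data members are initialized in declaration order and the constructors shown earlier pass executor_ into timer_manager_. A minimal illustration of the pitfall the reorder avoids (names are illustrative):

#include <memory>
#include <utility>

struct Pool {};

struct Timers {
  explicit Timers(std::shared_ptr<Pool> p) : pool_(std::move(p)) {}
  std::shared_ptr<Pool> pool_;
};

struct Engine {
  Engine() : pool_(std::make_shared<Pool>()), timers_(pool_) {}
  // Members are initialized in declaration order, not initializer-list
  // order: pool_ must be declared before timers_, or timers_ would read
  // a not-yet-constructed pool_ (undefined behavior).
  std::shared_ptr<Pool> pool_;
  Timers timers_;
};

int main() { Engine engine; }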
217 changes: 55 additions & 162 deletions src/core/lib/event_engine/posix_engine/timer_manager.cc
@@ -40,47 +40,10 @@ namespace posix_engine {

grpc_core::DebugOnlyTraceFlag grpc_event_engine_timer_trace(false, "timer");

void TimerManager::StartThread() {
++waiter_count_;
++thread_count_;
auto* thread = new RunThreadArgs();
thread->self = this;
thread->thread = grpc_core::Thread(
"timer_manager", &TimerManager::RunThread, thread, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(false));
thread->thread.Start();
}

void TimerManager::RunSomeTimers(
std::vector<experimental::EventEngine::Closure*> timers) {
// if there's something to execute...
{
grpc_core::MutexLock lock(&mu_);
if (shutdown_ || forking_) return;
// remove a waiter from the pool, and start another thread if necessary
--waiter_count_;
if (waiter_count_ == 0) {
// The number of timer threads is always increasing until all the threads
// are stopped, with the exception that all threads are shut down on fork
// events. In rare cases, if a large number of timers fire simultaneously,
// we may end up using a large number of threads.
// TODO(ctiller): We could avoid this by exiting threads in WaitUntil().
StartThread();
} else {
// if there's no thread waiting with a timeout, kick an existing untimed
// waiter so that the next deadline is not missed
if (!has_timed_waiter_) {
cv_wait_.Signal();
}
}
}
for (auto* timer : timers) {
timer->Run();
}
{
grpc_core::MutexLock lock(&mu_);
// get ready to wait again
++waiter_count_;
thread_pool_->Run(timer);
}
}
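
Because this hunk interleaves removed and added lines, the post-change body is easier to read whole. Reassembled from the diff, RunSomeTimers reduces to:

void TimerManager::RunSomeTimers(
    std::vector<experimental::EventEngine::Closure*> timers) {
  for (auto* timer : timers) {
    thread_pool_->Run(timer);
  }
}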

@@ -89,64 +52,18 @@ void TimerManager::RunSomeTimers(
// shutdown)
bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
grpc_core::MutexLock lock(&mu_);

if (shutdown_) return false;
if (forking_) return false;

// TODO(ctiller): if there are too many waiting threads, this would be a good
// place to exit the current thread.

// If kicked_ is true at this point, it means there was a kick from the timer
// system that the timer-manager threads here missed. We cannot trust 'next'
// here any longer (since there might be an earlier deadline). So if kicked_
// is true at this point, we should quickly exit this and get the next
// deadline from the timer system

if (!kicked_) {
// if there's no timed waiter, we should become one: that waiter waits
// only until the next timer should expire. All other timers wait forever
//
// 'timed_waiter_generation_' is a global generation counter. The idea here
// is that the thread becoming a timed-waiter increments and stores this
// global counter locally in 'my_timed_waiter_generation' before going to
// sleep. After waking up, if my_timed_waiter_generation ==
// timed_waiter_generation_, it can be sure that it was the timed_waiter
// thread (and that no other thread took over while this was asleep)
//
// Initialize my_timed_waiter_generation to some value that is NOT equal to
// timed_waiter_generation_
uint64_t my_timed_waiter_generation = timed_waiter_generation_ - 1;

/* If there's no timed waiter, we should become one: that waiter waits only
until the next timer should expire. All other timer threads wait forever
unless their 'next' is earlier than the current timed-waiter's deadline
(in which case the thread with earlier 'next' takes over as the new timed
waiter) */
if (next != grpc_core::Timestamp::InfFuture()) {
if (!has_timed_waiter_ || (next < timed_waiter_deadline_)) {
my_timed_waiter_generation = ++timed_waiter_generation_;
has_timed_waiter_ = true;
timed_waiter_deadline_ = next;
} else { // timed_waiter_ == true && next >= timed_waiter_deadline_
next = grpc_core::Timestamp::InfFuture();
}
}

cv_wait_.WaitWithTimeout(&mu_,
absl::Milliseconds((next - host_.Now()).millis()));

// if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if
// there's work to do after checking timers (code above)
if (my_timed_waiter_generation == timed_waiter_generation_) {
++wakeups_;
has_timed_waiter_ = false;
timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture();
}
++wakeups_;
}

kicked_ = false;

return true;
}
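
With a single main-loop thread, the old timed-waiter generation bookkeeping disappears: WaitUntil now sleeps until the next deadline unless a kick already arrived, and a kick just sets a flag and signals. A self-contained sketch of this kick-aware timed wait using standard primitives (illustrative names, not the gRPC types):

#include <chrono>
#include <condition_variable>
#include <mutex>

class KickableWaiter {
 public:
  // Mirrors the simplified WaitUntil above: returns false once shut
  // down; otherwise sleeps until `deadline` unless a kick already
  // arrived. Spurious wakeups are fine because the caller re-checks
  // the timer list on every iteration.
  bool WaitUntil(std::chrono::steady_clock::time_point deadline) {
    std::unique_lock<std::mutex> lock(mu_);
    if (shutdown_) return false;
    if (!kicked_) cv_.wait_until(lock, deadline);
    kicked_ = false;
    return true;
  }

  // Wakes the waiter early, e.g. when an earlier timer is scheduled.
  void Kick() {
    std::lock_guard<std::mutex> lock(mu_);
    kicked_ = true;
    cv_.notify_one();
  }

  void Shutdown() {
    std::lock_guard<std::mutex> lock(mu_);
    shutdown_ = true;
    cv_.notify_one();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool kicked_ = false;
  bool shutdown_ = false;
};

int main() {
  KickableWaiter waiter;
  waiter.Kick();  // a pending kick makes the next wait return immediately
  waiter.WaitUntil(std::chrono::steady_clock::now() +
                   std::chrono::seconds(1));
}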

@@ -155,54 +72,37 @@ void TimerManager::MainLoop() {
grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
absl::optional<std::vector<experimental::EventEngine::Closure*>>
check_result = timer_list_->TimerCheck(&next);
if (check_result.has_value()) {
if (!check_result->empty()) {
RunSomeTimers(std::move(*check_result));
continue;
}
} else {
/* This case only happens under contention, meaning more than one timer
manager thread checked timers concurrently.
If that happens, we're guaranteed that some other thread has just
checked timers, and this will avalanche into some other thread seeing
empty timers and doing a timed sleep.
Consequently, we can just sleep forever here and be happy at some
saved wakeup cycles. */
next = grpc_core::Timestamp::InfFuture();
GPR_ASSERT(check_result.has_value() &&
"ERROR: More than one MainLoop is running.");
if (!check_result->empty()) {
RunSomeTimers(std::move(*check_result));
continue;
}
if (!WaitUntil(next)) return;
if (!WaitUntil(next)) break;
}
main_loop_exit_signal_->Notify();
}
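
MainLoop now exits with break so that one notification marks loop exit, and Shutdown() blocks on that signal instead of counting threads on a condition variable. A minimal stand-in for the notification type (the `notification` BUILD dependency added above supplies the real one):

#include <condition_variable>
#include <mutex>
#include <thread>

class Notification {
 public:
  void Notify() {
    std::lock_guard<std::mutex> lock(mu_);
    notified_ = true;
    cv_.notify_all();
  }
  void WaitForNotification() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return notified_; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool notified_ = false;
};

int main() {
  Notification exit_signal;
  // The main loop breaks out on shutdown and notifies...
  std::thread loop([&] { exit_signal.Notify(); });
  // ...while Shutdown() blocks until the loop has really exited.
  exit_signal.WaitForNotification();
  loop.join();
}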

void TimerManager::RunThread(void* arg) {
g_timer_thread = true;
std::unique_ptr<RunThreadArgs> thread(static_cast<RunThreadArgs*>(arg));
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p starting thread::%p", thread->self,
&thread->thread);
}
thread->self->Run();
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p thread::%p finished", thread->self,
&thread->thread);
}
}
bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }

void TimerManager::Run() {
MainLoop();
grpc_core::MutexLock lock(&mu_);
thread_count_--;
if (thread_count_ == 0) cv_threadcount_.Signal();
void TimerManager::StartMainLoopThread() {
main_thread_ = grpc_core::Thread(
"timer_manager",
[](void* arg) {
auto self = static_cast<TimerManager*>(arg);
self->MainLoop();
},
this, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(false));
main_thread_.Start();
}
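
StartMainLoopThread uses the classic void* trampoline: a captureless lambda decays to a function pointer and recovers `this` from the argument, and the thread starts detached, matching set_joinable(false). The same pattern in portable C++ (Runner is illustrative):

#include <thread>

class Runner {
 public:
  void Start() {
    // A captureless lambda converts to void (*)(void*), the shape a
    // C-style thread API expects; `this` rides in through the argument.
    void (*trampoline)(void*) = [](void* arg) {
      static_cast<Runner*>(arg)->MainLoop();
    };
    std::thread(trampoline, this).detach();  // non-joinable, as above
  }

 private:
  void MainLoop() {}
};

int main() {
  auto* runner = new Runner;  // leaked deliberately: the detached thread
  runner->Start();            // must not outlive the object it uses
}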

bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }

TimerManager::TimerManager() : host_(this) {
TimerManager::TimerManager(
std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)
: host_(this), thread_pool_(std::move(thread_pool)) {
timer_list_ = std::make_unique<TimerList>(&host_);
grpc_core::MutexLock lock(&mu_);
StartThread();
main_loop_exit_signal_.emplace();
StartMainLoopThread();
}
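
The constructor now takes the engine's pool by value and moves it into the member, the usual sink-argument idiom for shared ownership. A small illustration (names are illustrative):

#include <memory>
#include <utility>

struct ThreadPool {};

class TimerManagerSketch {
 public:
  // Sink argument: callers can copy or move their handle in; either
  // way the manager co-owns the pool for its whole lifetime.
  explicit TimerManagerSketch(std::shared_ptr<ThreadPool> pool)
      : pool_(std::move(pool)) {}

 private:
  std::shared_ptr<ThreadPool> pool_;
};

int main() {
  auto pool = std::make_shared<ThreadPool>();
  TimerManagerSketch manager(pool);  // pool stays valid here as well
}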

grpc_core::Timestamp TimerManager::Host::Now() {
@@ -212,70 +112,63 @@ grpc_core::Timestamp TimerManager::Host::Now() {

void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline,
experimental::EventEngine::Closure* closure) {
if (grpc_event_engine_timer_trace.enabled()) {
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
gpr_log(GPR_ERROR,
"WARNING: TimerManager::%p: scheduling Closure::%p after "
"TimerManager has been shut down.",
this, closure);
}
}
timer_list_->TimerInit(timer, deadline, closure);
}

bool TimerManager::TimerCancel(Timer* timer) {
return timer_list_->TimerCancel(timer);
}

TimerManager::~TimerManager() {
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this);
}
grpc_core::MutexLock lock(&mu_);
shutdown_ = true;
cv_wait_.SignalAll();
while (thread_count_ > 0) {
void TimerManager::Shutdown() {
{
grpc_core::MutexLock lock(&mu_);
if (shutdown_) return;
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p waiting for %zu threads to finish",
this, thread_count_);
gpr_log(GPR_DEBUG, "TimerManager::%p shutting down", this);
}
cv_threadcount_.Wait(&mu_);
shutdown_ = true;
// Wait on the main loop to exit.
cv_wait_.Signal();
}
main_loop_exit_signal_->WaitForNotification();
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p shutdown complete", this);
}
}

TimerManager::~TimerManager() { Shutdown(); }

void TimerManager::Host::Kick() { timer_manager_->Kick(); }

void TimerManager::Kick() {
grpc_core::MutexLock lock(&mu_);
has_timed_waiter_ = false;
timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture();
++timed_waiter_generation_;
kicked_ = true;
cv_wait_.Signal();
}

void TimerManager::PrepareFork() {
void TimerManager::RestartPostFork() {
grpc_core::MutexLock lock(&mu_);
forking_ = true;
prefork_thread_count_ = thread_count_;
cv_wait_.SignalAll();
while (thread_count_ > 0) {
cv_threadcount_.Wait(&mu_);
}
}

void TimerManager::PostforkParent() {
grpc_core::MutexLock lock(&mu_);
for (int i = 0; i < prefork_thread_count_; i++) {
StartThread();
GPR_ASSERT(GPR_LIKELY(shutdown_));
if (grpc_event_engine_timer_trace.enabled()) {
gpr_log(GPR_DEBUG, "TimerManager::%p restarting after shutdown", this);
}
prefork_thread_count_ = 0;
forking_ = false;
shutdown_ = false;
main_loop_exit_signal_.emplace();
StartMainLoopThread();
}

void TimerManager::PostforkChild() {
grpc_core::MutexLock lock(&mu_);
for (int i = 0; i < prefork_thread_count_; i++) {
StartThread();
}
prefork_thread_count_ = 0;
forking_ = false;
}
void TimerManager::PrepareFork() { Shutdown(); }
void TimerManager::PostforkParent() { RestartPostFork(); }
void TimerManager::PostforkChild() { RestartPostFork(); }
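
Fork handling now reuses the shutdown/restart pair: the loop thread is fully stopped before fork, and each side of the fork restarts it fresh. The real code routes through gRPC's Forkable interface; with raw POSIX hooks the same mapping would look like:

#include <pthread.h>

#include <cstdio>

// Illustrative handlers mirroring the mapping above.
void PrepareForkHook() { std::puts("stop the timer loop before fork"); }
void RestartHook() { std::puts("restart the timer loop after fork"); }

int main() {
  // pthread_atfork takes prepare/parent/child callbacks; both post-fork
  // sides restart the loop, matching PostforkParent/PostforkChild.
  pthread_atfork(PrepareForkHook, RestartHook, RestartHook);
}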

} // namespace posix_engine
} // namespace grpc_event_engine
