Notes on Folly: Thread Pool Executors

2020-06-13
c++

ThreadPoolExecutor introduces threads to run tasks. There are two important executors based on ThreadPoolExecutor: IOThreadPoolExecutor for IO bound tasks, and CPUThreadPoolExecutor for CPU bound tasks. They respond to loads differently.

What is an executor? Read Notes on Folly: Executors

ThreadPoolExecutor is an abstract class that introduces threads into executors. For IO executors, threads run in parallel to do async I/O, and for CPU executors, threads run in parallel to execute tasks as fast as they can.

ThreadPoolExecutor has the following helper classes:

  • PoolStats: a struct with the total counts of threads and tasks in different states
  • TaskStats: a struct with the stats of an individual task
  • Observer: allows subscribing to thread start and stop events
  • Thread: stores additional information alongside a std::thread
  • Task: stores the actual task function and the information used for TaskStats
  • ThreadList: the thread pool
  • StoppedThreadQueue: stores stopped threads

Thread Management

ThreadPoolExecutor is an Executor with a thread pool implementation that can dynamically adjust the number of threads, so the core of it is thread management.

How does it manage its threads? All threads are recorded in threadList_, stopped threads are recorded in stoppedThreads_, and the number of threads waiting to be joined is recorded in threadsToJoin_:

ThreadList threadList_;
SharedMutex threadListLock_;
StoppedThreadQueue stoppedThreads_;
...
std::atomic<size_t> threadsToJoin_{0};

ThreadList is a small class that supports adding and removing threads:

class ThreadList {
 public:
  void add(const ThreadPtr& state) {...}
  void remove(const ThreadPtr& state) {...}
  ...

To control the number of threads and ensure there are enough threads taking tasks, ThreadPoolExecutor has the following methods:

public:
 void setNumThreads(size_t numThreads);

protected:
 void addThreads(size_t n);
 void removeThreads(size_t n, bool isJoin);
 virtual void threadRun(ThreadPtr thread) = 0;
 virtual void stopThreads(size_t n) = 0;
 void joinStoppedThreads(size_t n);
 void ensureActiveThreads();
 void ensureJoined();
 bool minActive();
 bool tryTimeoutThread();

First, addThreads(size_t n) adds n threads to the current pool: it creates n Thread objects, each running ThreadPoolExecutor::threadRun on its own thread, and then waits on each thread's startupBaton. Subclasses decide the behavior of threadRun, but every threadRun must call startupBaton.post(); otherwise the caller of addThreads would block forever.
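The startup handshake can be sketched with standard-library primitives. This is not folly code: Baton, Worker, and this addThreads are hypothetical stand-ins (folly has its own folly::Baton), and the `release` baton only exists to keep the workers alive so the example can join them cleanly.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// One-shot baton built on std primitives; a stand-in for folly::Baton.
class Baton {
 public:
  void post() {
    {
      std::lock_guard<std::mutex> lock(m_);
      posted_ = true;
    }
    cv_.notify_all();
  }

  void wait() {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return posted_; });
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  bool posted_ = false;
};

// Hypothetical stand-in for the Thread struct: a handle plus its baton.
struct Worker {
  Baton startupBaton;
  std::thread handle;
};

// Hypothetical addThreads: start n workers, then block until each one has
// posted its startup baton. `release` keeps the workers alive afterwards.
std::vector<std::unique_ptr<Worker>> addThreads(std::size_t n,
                                                std::atomic<int>& started,
                                                Baton& release) {
  std::vector<std::unique_ptr<Worker>> workers;
  for (std::size_t i = 0; i < n; ++i) {
    workers.push_back(std::make_unique<Worker>());
    Worker* w = workers.back().get();  // heap address stays stable
    w->handle = std::thread([w, &started, &release] {
      started.fetch_add(1);    // per-thread setup happens first
      w->startupBaton.post();  // what every threadRun must do
      release.wait();          // stand-in for the task loop
    });
  }
  for (auto& w : workers) {
    w->startupBaton.wait();  // like addThreads waiting on startupBaton
  }
  return workers;
}
```

Because each worker bumps `started` before posting, the caller can rely on all per-thread setup being finished once addThreads returns.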

removeThreads(size_t n, bool isJoin) removes n threads. Because how a thread runs, and therefore how it is stopped, is up to the subclass, removeThreads simply delegates to stopThreads, which subclasses implement.

setNumThreads(size_t numThreads) sets the maximum number of threads and adjusts the pool toward it: if more threads are running than the new maximum, removeThreads stops the extra ones. If fewer threads are running than the maximum, new threads are created only when there are pending tasks. This is what lets the pool grow and shrink dynamically instead of always running numThreads threads.
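The resize rule described above can be written as a small pure function. planResize and Resize are hypothetical names, and the sketch assumes one new thread per pending task up to the cap; the real setNumThreads handles more cases (for example minimum thread counts) that this ignores.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical summary of what a resize would do for a given state.
struct Resize {
  std::size_t toStop;   // threads handed to removeThreads/stopThreads
  std::size_t toStart;  // threads started right away
};

Resize planResize(std::size_t newMax, std::size_t active,
                  std::size_t pendingTasks) {
  if (active > newMax) {
    // Over the new cap: stop the surplus.
    return {active - newMax, 0};
  }
  // Under the cap: start only as many threads as pending work can use.
  return {0, std::min(pendingTasks, newMax - active)};
}
```

With no pending work, raising the maximum starts nothing; threads are created later, on demand, by ensureActiveThreads.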

joinStoppedThreads takes n threads out of the stoppedThreads_ queue and calls join() on each thread's handle. ensureJoined reads threadsToJoin_, resets it to zero, and passes that count to joinStoppedThreads, so exactly the threads that have stopped get joined.

tryTimeoutThread and ensureActiveThreads are hard to explain when they’re separated from the actual use case, so we’ll come back to them later.

CPUThreadPoolExecutor

CPUThreadPoolExecutor is one of the most useful executors in folly. It’s good for running CPU bound tasks. There are a few questions we can use to drive our exploration here:

  1. How does each thread run tasks?
  2. How does the thread pool respond to a sudden increase in the volume of CPU tasks?
  3. How does the thread pool release resources when the flood of CPU tasks goes away?
  4. How does the thread pool shut down?

threadRun

We have mentioned that threadRun is where threads begin their lives, so let's look at what each thread does in this executor:

void CPUThreadPoolExecutor::threadRun(ThreadPtr thread) {
  this->threadPoolHook_.registerThread();

  thread->startupBaton.post();
  while (true) {
    auto task = taskQueue_->try_take_for(threadTimeout_);
    // Handle thread stopping, either by task timeout, or
    // by 'poison' task added in join() or stop().
    if (UNLIKELY(!task || task.value().poison)) {
      // Actually remove the thread from the list.
      SharedMutex::WriteHolder w{&threadListLock_};
      if (taskShouldStop(task)) {
        for (auto& o : observers_) {
          o->threadStopped(thread.get());
        }
        threadList_.remove(thread);
        stoppedThreads_.add(thread);
        return;
      } else {
        continue;
      }
    }

    runTask(thread, std::move(task.value()));

    if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
      SharedMutex::WriteHolder w{&threadListLock_};
      if (tryDecrToStop()) {
        threadList_.remove(thread);
        stoppedThreads_.add(thread);
        return;
      }
    }
  }
}

Posting thread->startupBaton unblocks addThreads as soon as the thread starts running. After that, the thread loops: it takes a task from taskQueue_ and runs it with

runTask(thread, std::move(task.value()))

Note how it pulls tasks. taskQueue_->try_take_for blocks for up to threadTimeout_ while the queue is empty, so an idle thread sleeps instead of busy-looping and wasting CPU.
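A timed, blocking take can be sketched with a mutex and std::condition_variable::wait_for. TimedQueue is a hypothetical class, not folly's task queue, and std::optional stands in for folly::Optional; an empty result plays the role of the timeout signal that threadRun checks.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical blocking queue with a timed take, in the spirit of
// taskQueue_->try_take_for.
template <typename T>
class TimedQueue {
 public:
  void add(T item) {
    {
      std::lock_guard<std::mutex> lock(m_);
      items_.push_back(std::move(item));
    }
    cv_.notify_one();
  }

  // Sleeps (no spinning) until an item arrives or the timeout elapses.
  std::optional<T> try_take_for(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(m_);
    if (!cv_.wait_for(lock, timeout, [this] { return !items_.empty(); })) {
      return std::nullopt;  // timed out: the "quiet system" signal
    }
    T item = std::move(items_.front());
    items_.pop_front();
    return item;
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::deque<T> items_;
};
```

The predicate overload of wait_for also guards against spurious wakeups, so an empty optional really means "nothing arrived within the timeout".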

Respond to Flood of Tasks

When there is a sudden volume increase in tasks, how does the thread pool handle it?

void CPUThreadPoolExecutor::add(Func func) {
  add(std::move(func), std::chrono::milliseconds(0));
}

void CPUThreadPoolExecutor::add(
    Func func,
    std::chrono::milliseconds expiration,
    Func expireCallback) {
  auto result = taskQueue_->add(
      CPUTask(std::move(func), expiration, std::move(expireCallback)));
  if (!result.reusedThread) {
    ensureActiveThreads();
  }
}

taskQueue_->add does the heavy lifting here. Its result tells the caller whether the task was handed to an idle thread that was already waiting (reusedThread). If it was not, ensureActiveThreads may start another thread. What does ensureActiveThreads do?

// If we can't ensure that we were able to hand off a task to a thread,
// attempt to start a thread that handled the task, if we aren't already
// running the maximum number of threads.
void ThreadPoolExecutor::ensureActiveThreads() {
  ensureJoined();

  // Matches barrier in tryTimeoutThread().  Ensure task added
  // is seen before loading activeThreads_ below.
  asymmetricLightBarrier();

  // Fast path assuming we are already at max threads.
  auto active = activeThreads_.load(std::memory_order_relaxed);
  auto total = maxThreads_.load(std::memory_order_relaxed);

  if (active >= total) {
    return;
  }

  SharedMutex::WriteHolder w{&threadListLock_};
  // Double check behind lock.
  active = activeThreads_.load(std::memory_order_relaxed);
  total = maxThreads_.load(std::memory_order_relaxed);
  if (active >= total) {
    return;
  }
  ThreadPoolExecutor::addThreads(1);
  activeThreads_.store(active + 1, std::memory_order_relaxed);
}

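ensureJoined first joins any threads that are waiting to be joined. Then asymmetricLightBarrier() pairs with the barrier in tryTimeoutThread so that the newly added task is visible before activeThreads_ is read. Finally, if there is still room below maxThreads_ (checked once without the lock and again under threadListLock_, since another caller may have started a thread in between), addThreads(1) is called and activeThreads_ is incremented.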
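The double-checked part of ensureActiveThreads can be isolated in a sketch. ActiveThreadGate is a hypothetical class: starting a thread is simulated by bumping a counter, std::mutex replaces SharedMutex, and ensureJoined and the asymmetric barrier are left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical model of the fast path plus locked re-check.
class ActiveThreadGate {
 public:
  explicit ActiveThreadGate(std::size_t maxThreads) : max_(maxThreads) {}

  // Returns true if this call "started" a thread (addThreads(1) stand-in).
  bool ensureActiveThreads() {
    // Fast path: skip the lock when we already look full.
    if (active_.load(std::memory_order_relaxed) >=
        max_.load(std::memory_order_relaxed)) {
      return false;
    }
    std::lock_guard<std::mutex> lock(mutex_);
    // Double check behind the lock: another caller may have just started one.
    std::size_t active = active_.load(std::memory_order_relaxed);
    if (active >= max_.load(std::memory_order_relaxed)) {
      return false;
    }
    active_.store(active + 1, std::memory_order_relaxed);
    return true;
  }

  std::size_t active() const { return active_.load(); }

 private:
  std::mutex mutex_;
  std::atomic<std::size_t> active_{0};
  std::atomic<std::size_t> max_;
};
```

A stale relaxed read on the fast path is harmless: it only sends the caller to the locked re-check, where every increment is serialized, so the count never exceeds the maximum.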

In sum, the pool keeps throughput high by handing new tasks to idle threads when it can and starting new threads, up to the maximum, when it cannot.
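How can a queue know that a task went to an already-idle thread? One way, assumed here and not necessarily how folly's queues do it, is to count consumers blocked in try_take_for. HandoffQueue and its waiters() accessor are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical queue whose add() reports whether an already-waiting
// consumer will pick the task up, loosely mirroring result.reusedThread.
class HandoffQueue {
 public:
  struct AddResult {
    bool reusedThread;
  };

  AddResult add(std::function<void()> task) {
    bool reused;
    {
      std::lock_guard<std::mutex> lock(m_);
      tasks_.push_back(std::move(task));
      // Reused only if every queued task has a blocked consumer for it.
      reused = waiters_ >= tasks_.size();
    }
    cv_.notify_one();
    return {reused};
  }

  std::optional<std::function<void()>> try_take_for(
      std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(m_);
    ++waiters_;  // this consumer is now idle and waiting
    bool ready =
        cv_.wait_for(lock, timeout, [this] { return !tasks_.empty(); });
    --waiters_;
    if (!ready) {
      return std::nullopt;
    }
    std::function<void()> task = std::move(tasks_.front());
    tasks_.pop_front();
    return task;
  }

  std::size_t waiters() {
    std::lock_guard<std::mutex> lock(m_);
    return waiters_;
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  std::size_t waiters_ = 0;
};
```

When add() reports false, a caller shaped like CPUThreadPoolExecutor::add would fall back to ensureActiveThreads().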

Respond to Quiet System

Once the load drops, the pool may be left with many threads that have nothing to do. Each thread has to detect on its own that the system has gone quiet.

In threadRun, each thread blocks while waiting for a task:

auto task = taskQueue_->try_take_for(threadTimeout_);

ThreadPoolExecutor's constructor sets threadTimeout_ to FLAGS_threadtimeout_ms, a gflag whose default value is 60000 ms (60 s).

So a thread blocks for up to 60 s waiting for a task. If the system stays quiet for that long, try_take_for returns folly::none, which evaluates to false in the test condition:

if (UNLIKELY(!task || task.value().poison)) {
  // Actually remove the thread from the list.
  SharedMutex::WriteHolder w{&threadListLock_};
  if (taskShouldStop(task)) {
    for (auto& o : observers_) {
      o->threadStopped(thread.get());
    }
    threadList_.remove(thread);
    stoppedThreads_.add(thread);
    return;
  } else {
    continue;
  }
}

The logic here decides whether the current thread should stop. If so, the thread cleans up (notifying observers and moving itself from threadList_ to stoppedThreads_) and returns; otherwise it goes back to taking tasks.

Let's quickly look at how it decides to stop a thread. There are two cases in which a thread should stop. One is the case we are looking at now: the thread has been idle for too long. The other is when the pool is explicitly asked to stop threads, for example when it shuts down (see Shutdown below).

taskShouldStop looks like this:

bool CPUThreadPoolExecutor::taskShouldStop(folly::Optional<CPUTask>& task) {
  if (tryDecrToStop()) {
    return true;
  }
  if (task) {
    return false;
  } else {
    return tryTimeoutThread();
  }
  return true; // NOT REACHABLE
}

Let's look at tryDecrToStop first. It checks threadsToStop_; if it is positive, it decrements it by 1 and returns true, otherwise it returns false.

// threadListLock_ must be writelocked.
bool CPUThreadPoolExecutor::tryDecrToStop() {
  auto toStop = threadsToStop_.load(std::memory_order_relaxed);
  if (toStop <= 0) {
    return false;
  }
  threadsToStop_.store(toStop - 1, std::memory_order_relaxed);
  return true;
}

Who increases threadsToStop_? Only stopThreads, which is called only by removeThreads, which in turn is called wherever the pool has to shrink, such as setNumThreads and the shutdown paths stop() and join().

During shutdown, the pool has to stop all of its threads. While threadsToStop_ is positive, tryDecrToStop succeeds, so taskShouldStop returns true regardless of whether tasks are still pending.

The task can also be a poison task added during shutdown. In that case taskShouldStop normally returns true because tryDecrToStop returns true. If taskShouldStop ever returns false for a poison task (because the stop count has already been used up), threadRun simply drops the poison task and keeps going.

Back to taskShouldStop. If a task was received (at this point it can only be a poison task), it returns false and the thread keeps running. Otherwise the take timed out, and it calls tryTimeoutThread.

tryTimeoutThread checks whether there are still pending tasks. If there are none, it decrements activeThreads_, increments threadsToJoin_ so that a later ensureJoined call joins this thread, and returns true. Otherwise the thread should stay around to run the pending tasks, so it returns false. It also refuses to time out a thread when minActive() reports that the pool is already at its minimum number of active threads.

In the quiet case, then, taskShouldStop falls through to tryTimeoutThread, which bumps threadsToJoin_ and returns true, so the thread runs the exit path:

threadList_.remove(thread);
stoppedThreads_.add(thread);
return;
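The stop decision can be modeled as plain functions over a snapshot of the counters. This is a single-threaded sketch: PoolState and Task are hypothetical types, the real code uses atomics under threadListLock_, and the minThreads check stands in for minActive().

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

// Hypothetical snapshot of the counters the decision looks at.
struct PoolState {
  std::size_t threadsToStop = 0;  // raised by stopThreads()
  std::size_t threadsToJoin = 0;  // raised when an idle thread times out
  std::size_t activeThreads = 0;
  std::size_t minThreads = 0;
  std::size_t pendingTasks = 0;
};

struct Task {
  bool poison = false;
};

bool tryDecrToStop(PoolState& s) {
  if (s.threadsToStop == 0) return false;
  --s.threadsToStop;
  return true;
}

bool tryTimeoutThread(PoolState& s) {
  if (s.activeThreads <= s.minThreads) return false;  // keep the minimum alive
  if (s.pendingTasks > 0) return false;               // work arrived: stay
  --s.activeThreads;
  ++s.threadsToJoin;  // a later ensureJoined() joins this thread
  return true;
}

// Only called when the take timed out or returned a poison task.
bool taskShouldStop(PoolState& s, const std::optional<Task>& task) {
  if (tryDecrToStop(s)) return true;  // explicit stop request wins
  if (task) return false;             // stray poison task: drop it
  return tryTimeoutThread(s);         // idle timeout
}
```

Ordering matters: the explicit stop request is checked first, so a stopping pool exits threads even while tasks are still pending.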

Shutdown

When the thread pool is shutting down, stopThreads will be called:

void CPUThreadPoolExecutor::stopThreads(size_t n) {
  threadsToStop_ += n;
  for (size_t i = 0; i < n; i++) {
    taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
  }
}

Here taskQueue_ receives n default-constructed CPUTasks, which have poison set to true, each enqueued with Executor::LO_PRI. Once a thread takes a poison task, it follows the exit flow described above.

What about threads that are busy running tasks when the poison tasks are sent? threadsToStop_ is incremented before any poison task is enqueued. When the pool is stopped rather than joined (isJoin_ is false), the check right after runTask sees threadsToStop_ > 0, so a busy thread exits as soon as it finishes its current task, without ever taking a poison task. When the pool is joined, that check is skipped, so threads keep draining the queue and exit when they reach the poison tasks.
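The poison-task shutdown can be reproduced in a small pool. MiniPool is a hypothetical sketch, not folly code: it uses a plain FIFO queue (so LO_PRI is not modeled), std::mutex instead of SharedMutex, and omits observers and idle timeouts. Call stopThreads and joinAll before the pool is destroyed.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical pool showing poison tasks plus a threadsToStop counter.
class MiniPool {
 public:
  explicit MiniPool(std::size_t n) {
    for (std::size_t i = 0; i < n; ++i) threads_.emplace_back([this] { run(); });
  }

  void add(std::function<void()> f) { push(Item{std::move(f), false}); }

  // Like stopThreads(n): raise the counter first, then enqueue n poison tasks.
  void stopThreads(std::size_t n, bool isJoin) {
    isJoin_ = isJoin;
    threadsToStop_ += n;
    for (std::size_t i = 0; i < n; ++i) push(Item{nullptr, true});
  }

  void joinAll() {
    for (auto& t : threads_) t.join();
  }

  std::atomic<int> exited{0};

 private:
  struct Item {
    std::function<void()> fn;
    bool poison;
  };

  void push(Item item) {
    {
      std::lock_guard<std::mutex> lock(m_);
      q_.push_back(std::move(item));
    }
    cv_.notify_one();
  }

  Item take() {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return !q_.empty(); });
    Item item = std::move(q_.front());
    q_.pop_front();
    return item;
  }

  // Caller must hold stopMutex_, so decrements never race with each other.
  bool tryDecrToStop() {
    if (threadsToStop_ == 0) return false;
    --threadsToStop_;
    return true;
  }

  void run() {
    while (true) {
      Item item = take();
      if (item.poison) {
        std::lock_guard<std::mutex> lock(stopMutex_);
        if (tryDecrToStop()) break;
        continue;  // stray poison task: drop it
      }
      item.fn();
      // Stop (not join): exit right after the current task.
      if (threadsToStop_ > 0 && !isJoin_) {
        std::lock_guard<std::mutex> lock(stopMutex_);
        if (tryDecrToStop()) break;
      }
    }
    exited.fetch_add(1);
  }

  std::mutex m_, stopMutex_;
  std::condition_variable cv_;
  std::deque<Item> q_;
  std::atomic<std::size_t> threadsToStop_{0};
  std::atomic<bool> isJoin_{false};
  std::vector<std::thread> threads_;
};
```

With join semantics every task queued ahead of the poison tasks still runs, because threads only exit when they reach a poison task.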

In sum, CPUThreadPoolExecutor maximizes throughput when busy by spawning more threads to run tasks, and shrinks the pool when the system is quiet.

IOThreadPoolExecutor

Unlike CPUThreadPoolExecutor, IOThreadPoolExecutor is built on EventBase: each thread runs its own event loop.

Again, we explore the executor by answering the following questions:

  1. How does each thread run tasks?
  2. How does the thread pool respond to a sudden increase in the volume of I/O tasks?
  3. How does the thread pool save CPU and memory when the flood of I/O tasks goes away?
  4. How does the thread pool shut down?

threadRun

void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
  this->threadPoolHook_.registerThread();

  const auto ioThread = std::static_pointer_cast<IOThread>(thread);
  ioThread->eventBase = eventBaseManager_->getEventBase();
  thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));

  auto idler = std::make_unique<MemoryIdlerTimeout>(ioThread->eventBase);
  ioThread->eventBase->runBeforeLoop(idler.get());

  ioThread->eventBase->runInEventBaseThread(
      [thread] { thread->startupBaton.post(); });
  while (ioThread->shouldRun) {
    ioThread->eventBase->loopForever();
  }
  if (isJoin_) {
    while (ioThread->pendingTasks > 0) {
      ioThread->eventBase->loopOnce();
    }
  }
  idler.reset();
  if (isWaitForAll_) {
    // some tasks, like thrift asynchronous calls, create additional
    // event base hookups, let's wait till all of them complete.
    ioThread->eventBase->loop();
  }

  std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
  ioThread->eventBase = nullptr;
  eventBaseManager_->clearEventBase();
}

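There is a lot of code, but the idea is simple because EventBase does the heavy lifting: each thread gets its own EventBase from eventBaseManager_, posts startupBaton from inside that event loop (so addThreads returns only once the loop is actually running), and then processes tasks in loopForever until shouldRun becomes false.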
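The shape of this threadRun can be imitated with a toy event loop. ToyLoop and ioThreadRun are hypothetical: ToyLoop only drains a callback queue (no poll/epoll), and its method names merely mirror runInEventBaseThread, loopForever, and terminateLoopSoon. std::promise stands in for the startup baton.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>

// Toy stand-in for EventBase: one thread drains a queue of callbacks.
class ToyLoop {
 public:
  void runInLoopThread(std::function<void()> fn) {
    {
      std::lock_guard<std::mutex> lock(m_);
      queue_.push_back(std::move(fn));
    }
    cv_.notify_one();
  }

  void terminateLoopSoon() {
    {
      std::lock_guard<std::mutex> lock(m_);
      stop_ = true;
    }
    cv_.notify_one();
  }

  // Runs callbacks until terminateLoopSoon() is called.
  void loopForever() {
    std::unique_lock<std::mutex> lock(m_);
    while (true) {
      cv_.wait(lock, [this] { return stop_ || !queue_.empty(); });
      if (stop_) {
        stop_ = false;  // allow the loop to be entered again
        return;
      }
      std::function<void()> fn = std::move(queue_.front());
      queue_.pop_front();
      lock.unlock();  // never run user code while holding the lock
      fn();
      lock.lock();
    }
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  bool stop_ = false;
};

// Shaped like IOThreadPoolExecutor::threadRun: the startup signal goes
// through the loop, so the caller knows the loop is really running.
void ioThreadRun(ToyLoop& loop, std::atomic<bool>& shouldRun,
                 std::promise<void>& started) {
  loop.runInLoopThread([&started] { started.set_value(); });
  while (shouldRun) {
    loop.loopForever();
  }
}
```

The while (shouldRun) wrapper means a loop that returns while shouldRun is still true is simply re-entered; only clearing shouldRun and then terminating the loop ends the thread.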

Respond to Flood of Tasks

Let's see what add does:

void IOThreadPoolExecutor::add(
    Func func,
    std::chrono::milliseconds expiration,
    Func expireCallback) {
  ensureActiveThreads();
  SharedMutex::ReadHolder r{&threadListLock_};
  if (threadList_.get().empty()) {
    throw std::runtime_error("No threads available");
  }
  auto ioThread = pickThread();

  auto task = Task(std::move(func), expiration, std::move(expireCallback));
  auto wrappedFunc = [ioThread, task = std::move(task)]() mutable {
    runTask(ioThread, std::move(task));
    ioThread->pendingTasks--;
  };

  ioThread->pendingTasks++;
  ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc));
}

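add first calls ensureActiveThreads, which can start another thread if fewer than the maximum are running. It then picks a thread with pickThread, increments that thread's pendingTasks, and runs the wrapped function in the chosen thread's EventBase via runInEventBaseThread; the wrapper decrements pendingTasks once the task has run.

How does it pick a thread? It's fairly simple: round robin.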

std::shared_ptr<IOThreadPoolExecutor::IOThread>
IOThreadPoolExecutor::pickThread() {
  auto& ths = threadList_.get();
  ...
  auto thread = ths[nextThread_.fetch_add(1, std::memory_order_relaxed) % n];
  return std::static_pointer_cast<IOThread>(thread);
}

nextThread_ is an atomic counter used only in this method. Each call takes the next value with fetch_add and reduces it modulo the number of threads, so pickThread walks threadList_ in round-robin order.
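The same round-robin selection works over any list. RoundRobin is a hypothetical class; like add(), the caller is assumed to have checked that the list is not empty before picking.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical round-robin picker in the style of pickThread.
template <typename T>
class RoundRobin {
 public:
  explicit RoundRobin(std::vector<T> items) : items_(std::move(items)) {}

  // Precondition: items_ is not empty.
  const T& pick() {
    std::size_t n = items_.size();
    // Relaxed is enough: we only need each caller to get a distinct ticket.
    return items_[next_.fetch_add(1, std::memory_order_relaxed) % n];
  }

 private:
  std::vector<T> items_;
  std::atomic<std::size_t> next_{0};
};
```

No ordering with other memory is needed here, which is why a relaxed fetch_add is a reasonable choice: concurrent callers still get distinct tickets and therefore spread across the list.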

Respond to Quiet System

When the system becomes quiet, what happens to threads that have nothing to do?

threadRun registers a MemoryIdlerTimeout to run before the event loop iterates. Let's see the code again:

void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
  this->threadPoolHook_.registerThread();

  const auto ioThread = std::static_pointer_cast<IOThread>(thread);
  ioThread->eventBase = eventBaseManager_->getEventBase();
  thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));

  auto idler = std::make_unique<MemoryIdlerTimeout>(ioThread->eventBase);
  ioThread->eventBase->runBeforeLoop(idler.get());
  ...

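runBeforeLoop adds a LoopCallback to a queue that the EventBase drains before its next loop iteration. Each registration runs only once, which is why MemoryIdlerTimeout re-registers itself at the end of runLoopCallback.

What is a MemoryIdlerTimeout?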
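The run-once behavior and self-re-registration can be shown with a single-threaded toy. ToyBase is hypothetical, and it assumes callbacks registered while the queue is being drained wait for the following iteration.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Toy model of a "run before loop" queue: each registered callback runs
// once, before the next iteration.
class ToyBase {
 public:
  void runBeforeLoop(std::function<void()> cb) {
    before_.push_back(std::move(cb));
  }

  // One loop iteration: drain the callbacks registered so far.
  void loopOnce() {
    std::vector<std::function<void()>> batch;
    batch.swap(before_);  // re-registrations land in the next iteration
    for (auto& cb : batch) cb();
    // A real event loop would now wait for I/O and timers.
  }

 private:
  std::vector<std::function<void()>> before_;
};
```

Usage: a plain callback fires once, while a self-re-registering one (the MemoryIdlerTimeout pattern) fires on every iteration.

```cpp
ToyBase base;
int once = 0, every = 0;
base.runBeforeLoop([&once] { ++once; });
std::function<void()> rearm = [&] { ++every; base.runBeforeLoop(rearm); };
base.runBeforeLoop(rearm);
for (int i = 0; i < 3; ++i) base.loopOnce();
// once == 1, every == 3
```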

/* Class that will free jemalloc caches and madvise the stack away
 * if the event loop is unused for some period of time
 */
class MemoryIdlerTimeout : public AsyncTimeout, public EventBase::LoopCallback {
 public:
  explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}

  void timeoutExpired() noexcept override {
    idled = true;
  }

  void runLoopCallback() noexcept override {
    if (idled) {
      MemoryIdler::flushLocalMallocCaches();
      MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);

      idled = false;
    } else {
      std::chrono::steady_clock::duration idleTimeout =
          MemoryIdler::defaultIdleTimeout.load(std::memory_order_acquire);

      idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);

      scheduleTimeout(static_cast<uint32_t>(
          std::chrono::duration_cast<std::chrono::milliseconds>(idleTimeout)
              .count()));
    }

    // reschedule this callback for the next event loop.
    base_->runBeforeLoop(this);
  }

 private:
  EventBase* base_;
  bool idled{false};
};

You'll need to know two constructs, AsyncTimeout and LoopCallback, to follow the logic here.

  1. AsyncTimeout schedules a timeout event; when it expires, timeoutExpired runs.
  2. LoopCallback runs runLoopCallback in the event base thread; when registered with runBeforeLoop, it runs before the next loop iteration.

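So MemoryIdlerTimeout works like a watchdog. Before every loop iteration, runLoopCallback schedules the timeout for idleTimeout (5 s by default, adjusted by a random variation from getVariationTimeout). Scheduling again replaces the pending timeout, so as long as tasks keep waking the loop, the deadline keeps moving out. If no task arrives for the whole interval, the timeout fires and timeoutExpired sets idled to true. Before the next iteration, runLoopCallback sees idled, flushes the local malloc caches and unmaps the unused part of the stack, since the thread has not received any tasks for a while, and clears the flag without rescheduling the timeout. It does not stop the thread, because the thread is still needed to wait for I/O (EventBase waits on poll or epoll internally).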
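The watchdog behavior can be traced on a simulated clock. flushTimes is a hypothetical function, not folly code: times are in milliseconds, each entry in `wakes` is a loop iteration caused by work, the random variation is ignored, and the real flush calls are replaced by recording the time.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Simulated-clock model of MemoryIdlerTimeout; returns when flushes happen.
std::vector<std::int64_t> flushTimes(const std::vector<std::int64_t>& wakes,
                                     std::int64_t idleTimeout) {
  std::vector<std::int64_t> flushes;
  std::optional<std::int64_t> deadline;  // pending idle timeout, if any
  bool idled = false;

  // What runLoopCallback does before each iteration.
  auto beforeLoop = [&](std::int64_t now) {
    if (idled) {
      flushes.push_back(now);  // stand-in for flushing caches and stack
      idled = false;           // no new timeout is scheduled here
    } else {
      deadline = now + idleTimeout;  // replaces any pending timeout
    }
  };

  beforeLoop(0);  // registered when the thread starts
  for (std::int64_t wake : wakes) {
    if (deadline && *deadline <= wake) {
      // Timer fired first: timeoutExpired sets idled, and the following
      // before-loop callback (at the same moment) performs the flush.
      std::int64_t fired = *deadline;
      deadline.reset();
      idled = true;
      beforeLoop(fired);
    }
    beforeLoop(wake);
  }
  if (deadline) {  // trailing quiet period after the last wake
    idled = true;
    beforeLoop(*deadline);
  }
  return flushes;
}
```

With wakes at 1000, 2000 and 9000 ms and a 5000 ms timeout, the model flushes at 7000 (5 s after the 2000 ms wake) and at 14000 (5 s after the last wake). A steady stream of work every second only flushes once the stream stops.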

Shutdown

Remember that every thread is blocked running EventBase::loopForever in threadRun?

while (ioThread->shouldRun) {
  ioThread->eventBase->loopForever();
}

During shutdown, stopThreads sets shouldRun to false and calls terminateLoopSoon() on the thread's EventBase, which makes loopForever return:

void IOThreadPoolExecutor::stopThreads(size_t n) {
  ...
  for (size_t i = 0; i < n; i++) {
    ...
    ioThread->shouldRun = false;
    stoppedThreads.push_back(ioThread);
    std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
    if (ioThread->eventBase) {
      ioThread->eventBase->terminateLoopSoon();
    }
  }

  for (auto thread : stoppedThreads) {
    stoppedThreads_.add(thread);
    threadList_.remove(thread);
  }
}

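When the EventBase loop terminates, the while loop in threadRun exits because shouldRun is now false. If the pool is being joined, the thread first keeps calling loopOnce until its pendingTasks drops to zero; then threadRun returns and the thread exits.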
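The drain-on-join step can be modeled in a single thread. ToyIoThread is hypothetical: its loopOnce just runs whatever callbacks are queued at the time of the call, and the isWaitForAll_ branch of the real threadRun is not modeled.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Single-threaded toy of the tail of IOThreadPoolExecutor::threadRun.
struct ToyIoThread {
  std::deque<std::function<void()>> queued;  // callbacks posted to the loop
  int pendingTasks = 0;                      // counted by add()

  void add(std::function<void()> fn) {
    ++pendingTasks;
    queued.push_back([this, fn = std::move(fn)] {
      fn();
      --pendingTasks;  // mirrors the decrement in wrappedFunc
    });
  }

  // Runs the callbacks that are queued when the call starts.
  void loopOnce() {
    std::deque<std::function<void()>> batch = std::move(queued);
    queued.clear();
    for (auto& cb : batch) cb();
  }

  // After the main loop has stopped: a joining pool drains pending tasks.
  void finish(bool isJoin) {
    if (isJoin) {
      while (pendingTasks > 0) loopOnce();
    }
  }
};
```

Because pendingTasks is counted at add() time, a task scheduled while draining is still waited for; when the pool is merely stopped, queued tasks are left unrun by this path.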

We’ve learned how CPUThreadPoolExecutor and IOThreadPoolExecutor are implemented in this post.

They have different strategies for a sudden increase in load. For CPU-intensive tasks, parallel processing increases throughput, so CPUThreadPoolExecutor spawns more threads. For IO-intensive tasks, extra threads help far less, because an IO thread spends most of its time waiting and a single event loop can multiplex many in-flight operations, so IOThreadPoolExecutor mainly distributes the load evenly across its threads.

Because they respond to loads differently, they behave differently when the load is gone: CPU thread pool releases threads, whereas IO thread pool releases only memory and stacks.