Executor, the Concept
What is an Executor? It’s code is as simple as
class Executor {
public:
virtual ~Executor() {}
/// Enqueue a function to executed by this executor. This and all
/// variants must be threadsafe.
virtual void add(Func) = 0;
};
This definition explains what an executor is: it’s an interface that states add can submit some work.
When does the work get executed? Who is going to execute the work? When we add the work, would add block? It all depends on the implementation of the implementer. The only constraint is that add must be thread-safe.
The real code also has a concept of priority:
class Executor {
public:
virtual ~Executor() {}
/// Enqueue a function to executed by this executor. This and all
/// variants must be threadsafe.
virtual void add(Func) = 0;
/// Enqueue a function with a given priority, where 0 is the medium priority
/// This is up to the implementation to enforce
virtual void addWithPriority(Func, int8_t priority) {
throw std::runtime_error(
"addWithPriority() is not implemented for this Executor");
}
virtual uint8_t getNumPriorities() const {
return 1;
}
And a concept of safe pointers (more on that next).
Is that it? Yes!
That’s an executor? Yes, that is an executor that involves heavily in EventBase and Future!
Keep Alive
When we want different threads to send tasks to one executor, the executor needs to be shared between threads. Later we’ll see that some executors are heavily relies on libevent, which pins on single thread, and thus we need some new memory share model that’s more clear about the ownership of an executor.
- unique_ptr doesn’t share at all, which isn’t what we want
- shared_ptr shares but all owners are equally treated, so the last owner destroys the memory
What we want is something similar to shared_ptr, but the thread who creates Executor always owns the memory, and when the owner wants to destroy the memory, it blocks and waits for other copies to be destroyed.
KeepAlive for rescue!
#include <folly/futures/Future.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/init/Init.h>
#include <glog/logging.h>
int main(int argc, char *argv[]) {
folly::init(&argc, &argv);
auto executor = std::make_unique<folly::IOThreadPoolExecutor>(4);
auto f = folly::futures::sleep(std::chrono::milliseconds{2000})
.via(executor.get())
.thenValue([ka = folly::getKeepAliveToken(executor.get())](folly::Unit) { return 42; })
.semi();
LOG(INFO) << "Start to reset";
executor.reset();
LOG(INFO) << "Finished reset";
LOG(INFO) << std::move(f).get();
}
Run the program:
I0928 16:44:14 2458 executor.cc:16] Start to reset
I0928 16:44:16 2458 executor.cc:18] Finished reset
I0928 16:44:16 2458 executor.cc:19] 42
The executor.reset() blocks waiting for 2s until all KeepAlive instances are destroyed.
If this is a shared_ptr of Executor, reset returns immediately, and the memory will be destroyed in future’s callback.
KeepAlive gives you a predicatble ownership, and that’s why KeepAlive is always preferred when we use executors. To get this safe pointer, it’s as simple as just calling
folly::getKeepAliveToken(executor)
Not all executors by default support the mechanism. The folly library offers DefaultKeepAliveExecutor for executor implementer to opt in the mechanism.
Behind the scene, an executor needs to implement two methods to support KeepAlive mechanism:
// Acquire a keep alive token. Should return false if keep-alive mechanism
// is not supported.
virtual bool keepAliveAcquire();
// Release a keep alive token previously acquired by keepAliveAcquire().
// Will never be called if keepAliveAcquire() returns false.
virtual void keepAliveRelease();
We’ll explain how they support the mechainism when we look at DefaultKeepAliveExecutor below.
Executors
Give this API, let’s see what kind of executors folly provides.
The most important ones are probably CPUThreadPoolExecutor and IOThreadPoolExecutor. We’ll take multiple steps to get there. As of now, let’s look at other executors derived directly from the Executor class, as looking at executor categories gives us broader view of the folly library.
Most executors in this section are abstract public virtual classes of Executor.
InlineExecutor
The most straightforward executor is probably InlineExecutor, where add simply executes the function in the caller thread.
class InlineExecutor : public Executor {
void add(Func f) override {
f();
}
};
Simple, right? The drawback is not that obvious: could you write a program using InlineExecutor to run a job that indefinitely create its own type of jobs? No, you’ll explode the stack!
#include <folly/executors/InlineExecutor.h>
#include <folly/init/Init.h>
#include <glog/logging.h>
void addMoreTasks(folly::Executor &ex) {
ex.add([&ex]() {
LOG(INFO) << "Add task";
addMoreTasks(ex);
LOG(INFO) << "Exit task";
});
}
int main(int argc, char *argv[]) {
folly::init(&argc, &argv);
folly::InlineExecutor executor;
addMoreTasks(executor);
}
addMoreTasks adds one task that creates new tasks using addMoreTasks. The result is not surprising; it keeps adding more tasks without exiting the old one:
I0929 15:21:06.124312 2783 executor.cc:28] Add task
I0929 15:21:06.124320 2783 executor.cc:28] Add task
I0929 15:21:06.124330 2783 executor.cc:28] Add task
I0929 15:21:06.124341 2783 executor.cc:28] Add task
I0929 15:21:06.124349 2783 executor.cc:28] Add task
I0929 15:21:06.124357 2783 executor.cc:28] Add task
I0929 15:21:06.124366 2783 executor.cc:28] Add task
....
QueuedImmediateExecutor
If we change the executor to use QueuedImmediateExecutor. The result is what we expect it to be:
I0929 15:30:05.002698 2720 executor.cc:28] Add task
I0929 15:30:05.002703 2720 executor.cc:31] Exit task
I0929 15:30:05.002708 2720 executor.cc:28] Add task
I0929 15:30:05.002714 2720 executor.cc:31] Exit task
I0929 15:30:05.002719 2720 executor.cc:28] Add task
I0929 15:30:05.002724 2720 executor.cc:31] Exit task
I0929 15:30:05.002729 2720 executor.cc:28] Add task
I0929 15:30:05.002735 2720 executor.cc:31] Exit task
I0929 15:30:05.002740 2720 executor.cc:28] Add task
What’s the difference? We want to run the task immediately, but if there is a task running, we should queue the task instead of blindly call f() as we see in InlineExecutor.
auto& q = *q_;
q.push(std::move(callback));
if (q.size() == 1) {
while (!q.empty()) {
q.front()();
q.pop();
}
}
The top of the frame will push to q and exit, and the bottom of the frame will loop the queue to execute each task.
You might want to protect q_ with a mutex to make the add thread safe. Unfortunately that won’t work; it will cause the mutex locked twice by the same owner and crash the program. The easy solution is to use
folly::ThreadLocal<std::queue<Func>> q_;
to eliminate the shared memory.
ScheduledExecutor
An abstract class. TimekeeperScheduledExecutor derives from it.
If we want to schedule a task, we have two requirements:
- Schedule a task to run after certain amount of time
- Schedule a task to run at certain time
The first case can be transformed to the second case.
ScheduledExecutor defines two member functions for these two use cases:
void schedule(Func&& a, Duration const& dur);
void scheduleAt(Func& a, TimePoint const&);
schedule is implemented using scheduleAt. To subclass this executor requires to implement scheduleAt.
DrivableExecutor
An abstract class. TimedDrivableExecutor derives from it.
Remember at the beginning of this post, we asked when would the executor run the added tasks? DrivableExecutor defines a drive interface to trigger the execution. Its subclass ManualExecutore makes the behavior concrete.
When will the interface be used? We’ll see soon when we start to look at folly::Future. But as of now, we know that this interface adds another Executor‘s category.
SequencedExecutor
An abstract class. This class is simply
class SequencedExecutor : public virtual Executor {};
This class adds an executor category where tasks execution order is the same as submit order.
DefaultKeepAliveExecutor
We’ve mentioned KeepAlive above. DefaultKeepAliveExecutor is one way folly offers to let an executor opt in the mechanism: any subclasses that inherit from DefaultKeepAliveExecutor will get the mechanism for free.
The current DefaultKeepAliveExecutor code has more features than we need to explore the mechanism. So let’s go back to time when this class was first introduced:
class DefaultKeepAliveExecutor : public virtual Executor {
public:
DefaultKeepAliveExecutor() : Executor() {}
virtual ~DefaultKeepAliveExecutor() {
DCHECK(!keepAlive_);
}
protected:
void joinKeepAlive() {
DCHECK(keepAlive_);
keepAlive_.reset();
keepAliveReleaseBaton_.wait();
}
private:
bool keepAliveAcquire() override {
auto keepAliveCounter =
keepAliveCounter_.fetch_add(1, std::memory_order_relaxed);
// We should never increment from 0
DCHECK(keepAliveCounter > 0);
return true;
}
void keepAliveRelease() override {
auto keepAliveCounter = --keepAliveCounter_;
DCHECK(keepAliveCounter >= 0);
if (keepAliveCounter == 0) {
keepAliveReleaseBaton_.post();
}
}
std::atomic<ssize_t> keepAliveCounter_{1};
Baton<> keepAliveReleaseBaton_;
KeepAlive keepAlive_{makeKeepAlive()};
};
In the same commit, ~CPUThreadPoolExecutor starts to call joinKeepAlive, which calls wait on a baton. The wait will be unblocked only when keepAliveRelease is called. When is keepAliveRelease called? The KeepAlive destructor!
class KeepAlive : ... {
~KeepAlive() {
reset();
}
void reset() {
if (Executor* executor = get()) {
auto const flags = std::exchange(storage_, 0) & kFlagMask;
if (!(flags & (kDummyFlag | kAliasFlag))) {
executor->keepAliveRelease();
}
}
}
};
Every aquire increments a counter, and every reset decrements the counter. The last reset unblocks the join, which also unblocks the subclass’ destructor.
IOExecutor
We have seen QueuedImmediateExecutor that’s good for running CPU heavy tasks. Here we need one more executor category to run IO heavy tasks.
What is the difference you may ask? Think about how performing an I/O could block the caller: I/O call traps into the kernel, and then the kernel submits the IO request to hardware controller. The hardware controller interrupts the kernel when the request response is ready, and finally the kernel unblocks the caller.
We need one kind of executor that performs the I/O tasks in an async way!
This IOExecutor is simple:
class IOExecutor : public virtual folly::Executor {
public:
~IOExecutor() override = default;
virtual folly::EventBase* getEventBase() = 0;
};
Because it doesn’t need any more concrete behaviors other than what Executor has provided. The intent of this class is to provide an interface for the subclass to use EventBase to handle async I/O.
Other Executors
There are a few other executors we won’t cover in this post:
- SoftRealTimeExecutor
- ExecutorWithPriority
- ThreadedExecutor
We’ve covered enough categories that now we’re comfortable to read other parts of the library that heavily leverage executors. EventBase and folly::Future are the most important ones that come to my mind.