There are times when we need to work with async C libraries in our C++ programs. The async calls in those libraries typically have a prototype similar to this:
int async_rpc(rpcArgs* args, callbackFn callback, void* cb_data);
where cb_data is the data you want to pass into the callback function, and callbackFn is the function the library calls when the rpc receives a reply. The callback often has this signature:
typedef void (*callbackFn)(rpcReply* reply, void* cb_data);
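To make the role of cb_data concrete, here is a minimal, self-contained sketch. Nothing in it comes from a real library: the layouts of rpcArgs and rpcReply, the single-threaded queue behind async_rpc, and the helper names rpc_poll, storeValue and callOnce are all assumptions made for illustration, and the mock "server" simply echoes the request back as the reply value.

```cpp
#include <cassert>
#include <queue>

// Hypothetical stand-ins for the C library's types; the real layouts are unknown.
struct rpcArgs { int request; };
struct rpcReply { int value; };
typedef void (*callbackFn)(rpcReply* reply, void* cb_data);

// Mock library state: calls wait here until rpc_poll() delivers their replies.
struct PendingCall { int request; callbackFn callback; void* cb_data; };
static std::queue<PendingCall> g_pending;

// Mock async_rpc: records the call and returns immediately (0 means success).
int async_rpc(rpcArgs* args, callbackFn callback, void* cb_data) {
  assert(args != nullptr && callback != nullptr);
  g_pending.push(PendingCall{args->request, callback, cb_data});
  return 0;
}

// Hypothetical event-loop step: the mock "server" echoes each request back.
void rpc_poll() {
  while (!g_pending.empty()) {
    PendingCall call = g_pending.front();
    g_pending.pop();
    rpcReply reply{call.request};
    call.callback(&reply, call.cb_data);  // cb_data comes back untouched
  }
}

// The callback recovers its typed state from the opaque pointer.
void storeValue(rpcReply* reply, void* cb_data) {
  int* out = static_cast<int*>(cb_data);
  *out = reply->value;
}

// Issues one call, drives the mock loop, and returns what the callback stored.
int callOnce(int request) {
  rpcArgs args{request};
  int result = 0;
  async_rpc(&args, storeValue, &result);
  rpc_poll();
  return result;
}
```

With this mock, callOnce(7) hands the address of a local int to the library and gets the echoed value back through the callback. A real library would more likely invoke the callback from its own thread or event loop, which is why the caller needs some way to wait for it.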
This old-style callback works well when the control flow is relatively simple. However, when callbacks are nested within other callbacks, it’s easy to end up in callback hell.
Take this example. For a given rpcArgs, the server returns rpcReply containing an integer member value. Let’s suppose we want to implement this function:
int addValue(rpcArgs* first_args, rpcArgs* second_args);
It should call async_rpc twice in sequence, once with each rpcArgs, add up the two reply values, and return the sum.
Unfortunately we’ll have to add extra complexity into the program:
- We’ll have to write two callback functions even though they are doing almost the same thing: the first one adds up the reply value and immediately calls async_rpc again, the second one just adds up the reply value.
- We’ll need a way for the main thread to wait for the second callback to finish before returning.
- We’ll need to create a struct to encapsulate the data needed for all following actions. In this case, we need the rpcArgs* for initiating the second async_rpc, a member for storing the sum, and a baton for signaling completion.
#include <folly/synchronization/Baton.h>

// Shared state passed through both callbacks via cb_data.
struct ArgsAndResult {
  rpcArgs* second_call_args;
  int total;
  folly::Baton<> baton;
};

// Second callback: adds the second value and wakes up the waiting thread.
void second_callback(rpcReply* reply, void* cb_data) {
  auto r = static_cast<ArgsAndResult*>(cb_data);
  r->total += reply->value;
  r->baton.post();
}

// First callback: adds the first value, then issues the second call.
void first_callback(rpcReply* reply, void* cb_data) {
  auto r = static_cast<ArgsAndResult*>(cb_data);
  r->total += reply->value;
  async_rpc(r->second_call_args, second_callback, cb_data);
}

int start(rpcArgs first_args, rpcArgs second_args) {
  ArgsAndResult data{};
  data.second_call_args = &second_args;
  async_rpc(&first_args, first_callback, &data);
  // Block until second_callback calls data.baton.post().
  data.baton.wait();
  return data.total;
}
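The example above relies on folly::Baton to block start until the last callback has run. For readers who have not used it, the following is a rough standard-library stand-in for the post/wait handshake. It is only a sketch: SimpleBaton and waitForWorker are hypothetical names, this is not how folly implements Baton, and the worker thread merely plays the role of the library thread that would invoke the callback.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a one-shot baton; not folly's implementation.
class SimpleBaton {
 public:
  // Marks the baton as posted and wakes any waiter.
  void post() {
    std::lock_guard<std::mutex> lock(mutex_);
    posted_ = true;
    cv_.notify_all();
  }

  // Blocks until post() has been called; returns at once if it already was.
  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return posted_; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool posted_ = false;
};

// Hypothetical demo: a worker thread plays the role of the last callback.
int waitForWorker(int value) {
  SimpleBaton baton;
  int result = 0;
  std::thread worker([&] {
    result = value;  // written before post(), so it is visible after wait()
    baton.post();
  });
  baton.wait();
  int seen = result;
  worker.join();  // keep baton alive until the worker has left post()
  assert(seen == value);
  return seen;
}
```

The write to result happens before post(), and wait() only returns after post(), so the waiting thread reads the finished value. That is the same ordering start depends on when it reads the total after the baton wait.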
As more callbacks are nested, the program quickly becomes unmanageable: the control flow is scattered across callbacks and hard to follow.
By using folly::Promise, we can turn nested callbacks into chained callbacks in only two steps:
The first step is to wrap the raw async_rpc into a function that returns folly::SemiFuture of the reply type. In this wrapper, create a folly::Promise, pass the promise to the callback, and return the SemiFuture associated with the promise:
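#include <folly/futures/Future.h>

// Defined in the second step below.
void callback(rpcReply* reply, void* cb_data);

folly::SemiFuture<std::unique_ptr<rpcReply>> asyncRpc(rpcArgs* args) {
  // Heap-allocate the promise so it outlives this call; the callback deletes it.
  auto p = new folly::Promise<std::unique_ptr<rpcReply>>();
  auto f = p->getSemiFuture();
  async_rpc(args, callback, p);
  return f;
}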
The second step is to call promise->setValue in the callback to pass the reply to the SemiFuture, and then delete the promise.
void callback(rpcReply* reply, void* cb_data) {
  auto p = static_cast<folly::Promise<std::unique_ptr<rpcReply>>*>(cb_data);
  // Assumes the library hands ownership of reply to the callback.
  p->setValue(std::unique_ptr<rpcReply>(reply));
  delete p;
}
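The same two steps can be tried without folly or a real RPC library. The sketch below swaps in std::promise and std::future for folly::Promise and folly::SemiFuture, and uses a mock async_rpc that queues calls until a hypothetical rpc_poll function delivers the replies. The type layouts, the names rpc_poll, fulfil and addValue, the rule that the callback owns the reply, and the meaning of a nonzero return code are all assumptions made for illustration. Since std::future has no continuation chaining, the two calls are simply issued one after the other.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <memory>
#include <queue>
#include <stdexcept>

// Hypothetical stand-ins for the C library; the real layouts are unknown.
struct rpcArgs { int request; };
struct rpcReply { int value; };
typedef void (*callbackFn)(rpcReply* reply, void* cb_data);

struct PendingCall { int request; callbackFn callback; void* cb_data; };
static std::queue<PendingCall> g_pending;

// Mock async_rpc: records the call and returns immediately (0 means success).
int async_rpc(rpcArgs* args, callbackFn callback, void* cb_data) {
  assert(args != nullptr && callback != nullptr);
  g_pending.push(PendingCall{args->request, callback, cb_data});
  return 0;
}

// Hypothetical event-loop step. Assumption: the callback owns the reply.
void rpc_poll() {
  while (!g_pending.empty()) {
    PendingCall call = g_pending.front();
    g_pending.pop();
    call.callback(new rpcReply{call.request}, call.cb_data);
  }
}

using ReplyPromise = std::promise<std::unique_ptr<rpcReply>>;

// Step 2: the callback fulfils the promise, then frees it.
void fulfil(rpcReply* reply, void* cb_data) {
  std::unique_ptr<ReplyPromise> p(static_cast<ReplyPromise*>(cb_data));
  p->set_value(std::unique_ptr<rpcReply>(reply));
}

// Step 1: wrap the raw call and return the future tied to the promise.
std::future<std::unique_ptr<rpcReply>> asyncRpc(rpcArgs* args) {
  auto* p = new ReplyPromise();
  auto f = p->get_future();
  if (async_rpc(args, fulfil, p) != 0) {
    // Assumption: a nonzero return means the callback will never fire.
    p->set_exception(
        std::make_exception_ptr(std::runtime_error("async_rpc failed")));
    delete p;
  }
  return f;
}

// Sequential use of the wrapper: no hand-written callbacks at the call site.
int addValue(rpcArgs* first_args, rpcArgs* second_args) {
  auto first = asyncRpc(first_args);
  rpc_poll();  // let the mock deliver the first reply
  int total = first.get()->value;
  auto second = asyncRpc(second_args);
  rpc_poll();  // let the mock deliver the second reply
  total += second.get()->value;
  return total;
}
```

The promise lives on the heap because it must outlive asyncRpc and stay valid until the library invokes the callback. The callback is the last user of the promise, so it is the natural place to free it.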
With these two simple steps, we have enabled callback chains for any async library written in C. We can convert the above start function:
int start(rpcArgs first_args, rpcArgs second_args) {
  int total = 0;
  return asyncRpc(&first_args)
      .deferValue([&](std::unique_ptr<rpcReply> reply) {
        total += reply->value;
        return asyncRpc(&second_args);
      })
      .deferValue([&](std::unique_ptr<rpcReply> reply) {
        total += reply->value;
        return total;
      })
      .get();
}
Once we have wrapped the library to return SemiFuture, it’s easy to schedule callbacks based on the nature of the computation: we can schedule all rpc calls on an IO executor and all reply handlers on a CPU executor.
For example, in the following chained callbacks, cpuExecutor is used to handle replies, and ioExecutor is used to send the second rpc.
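int start(rpcArgs first_args, rpcArgs second_args) {
  int total = 0;
  return asyncRpc(&first_args)
      .via(cpuExecutor)  // handle the first reply on the CPU executor
      .thenValue([&](std::unique_ptr<rpcReply> reply) {
        total += reply->value;
      })
      .via(ioExecutor)  // send the second rpc from the IO executor
      .thenValue([&](folly::Unit) {
        return asyncRpc(&second_args);
      })
      .via(cpuExecutor)  // back to the CPU executor for the second reply
      .thenValue([&](std::unique_ptr<rpcReply> reply) {
        total += reply->value;
        return total;
      })
      .get();
}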