Task graphs
It is quite common to compose task graphs with future adaptors. In many of these graphs, some continuation tasks might need to go back to a task that has already been executed in the graph.
Directed acyclic graphs
Say we want to execute the following task graph:
This is not an uncommon pattern in asynchronous applications:
We can combine future adaptors to directly express task graphs without cycles. In this case, the problem is that the then adaptor supports only a single continuation. What we need is a continuation that decides which task to launch next. We achieve this by launching new tasks from inside the continuation.
cfuture<int> A = async([]() { return 2; });
cfuture<bool> B = then(A, [](int a) {
return try_operation(a);
});
inline_executor ex = make_inline_executor();
auto C_or_D = then(ex, B, [](bool ok) {
return ok ? async(handle_success) : async(handle_error);
});
int r = C_or_D.get().get();
assert(r == 0 || r == 1);
Task A can attach its continuation B as usual. However, task B needs to check a condition before launching C or D. The process of checking this condition becomes the continuation to B: a continuation to decide which continuation to launch.
However, note that we use an inline_executor for checking the condition. Deciding what task to launch is a cheap operation, and we do not want to send another task to the underlying executor to do that. This inline execution effectively makes the continuation behave as a light callback.
This pattern works because futures and promises enable representations equivalent to an implicit task queue. Any task we launch is going to this implicit queue. Continuations make reference to earlier objects in the queue. Earlier tasks can decide what to push to this queue.
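For readers without the library at hand, the branching step can be approximated with the standard library alone. This is only a sketch under assumptions: std::future has no continuations, so task B waits on A inside its own body, and std::launch::deferred stands in for the inline executor by running the decision in the thread that calls get(). The bodies of try_operation, handle_success, and handle_error are placeholders, and run_conditional_graph is a hypothetical name.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Placeholder application functions (assumed bodies, for illustration only).
inline bool try_operation(int a) { return a % 2 == 0; }
inline int handle_success() { return 0; }
inline int handle_error() { return 1; }

// Runs A, then B, then launches either C or D depending on B's result.
inline int run_conditional_graph() {
    std::future<int> A = std::async(std::launch::async, [] { return 2; });

    // std::future has no then(), so B blocks on A inside its own task.
    std::future<bool> B = std::async(
        std::launch::async,
        [a = std::move(A)]() mutable { return try_operation(a.get()); });

    // The decision is deferred: it runs in the thread that calls get(),
    // which roughly mimics a light callback on an inline executor.
    std::future<std::future<int>> C_or_D = std::async(
        std::launch::deferred,
        [b = std::move(B)]() mutable {
            return b.get() ? std::async(std::launch::async, handle_success)
                           : std::async(std::launch::async, handle_error);
        });

    assert(C_or_D.valid());
    // First get() runs the decision, second get() waits for C or D.
    return C_or_D.get().get();
}
```

Calling run_conditional_graph() returns the result of whichever branch was launched. Unlike the version with continuations, task B occupies a thread while it waits for A, which is the cost the adaptors avoid.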
Rescheduling tasks
Say we want to execute the following task graph:
This kind of pattern is not uncommon for tasks that might fail or tasks that should be split into smaller homogeneous tasks:
The previous pattern will not work for this graph because B needs recursive access to the function that launches it. The simplest way to store these recursive functions is in a struct or class.
struct graph_launcher {
promise<int> end_;
// ...
The graph contains a single promise whose value we will set when the complete subgraph is executed. Note that we can't simply wait for C outside the graph because its future instance is not valid until B succeeds. Until then, the future for C does not exist at all, which is different from a future that exists but is not ready. So we start the task graph by launching A.
// struct graph_launcher {
// ...
cfuture<int>
start() {
cfuture<int> A = async([]() { return 2; });
inline_executor ex = make_inline_executor();
then(ex, A, [this](int a) { schedule_B(a); }).detach();
return end_.get_future();
}
// ...
A is launched as soon as we start. We use the inline_executor for light callbacks as in the previous example. We detach the callback because we don't need to wait for this task. We only need to wait for the final promise.
The process of scheduling B is modularized into another function because we need to access it recursively. This works as usual:
// struct graph_launcher {
// ...
void
schedule_B(int a) {
cfuture<bool> B = async(
[](int ra) { return try_operation(ra); },
a);
inline_executor ex = make_inline_executor();
then(ex, B, [this, a](bool ok) {
if (ok) {
schedule_C();
} else {
handle_error();
schedule_B(a);
}
}).detach();
}
// ...
The light callback for B works as in the previous example for DAGs. The only difference here is that schedule_B might need to call itself when the operation fails. The struct makes this recursion easier to access.
If the operation is successful and we schedule task C, it simply handles the operation result and sets the promise we created.
// struct graph_launcher {
// ...
void
schedule_C() {
async([this]() {
int r = handle_success();
end_.set_value(1);
return r;
}).detach();
}
};
Outside the graph, we can just wait for the promise to be set.
graph_launcher g;
cfuture<int> f = g.start();
assert(f.get() == 1);
In practice, we would probably attempt to reschedule B a number of times and a stop token could be attached to the graph to allow us to request it to stop at any time. These scheduling functions are usually going to be interleaved with the application logic. For instance, a web client would also use this object to store variables related to the state of the request.
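A bounded number of attempts and a stop request could be sketched as follows. This is not the library's API: manual_queue is a hypothetical single-threaded executor, bounded_retry_graph and request_stop are illustrative names, and a std::atomic<bool> stands in for a real stop token. When B gives up, the promise receives an exception instead of a value.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <exception>
#include <functional>
#include <future>
#include <stdexcept>
#include <utility>

// Hypothetical single-threaded executor: tasks are stored and run later.
struct manual_queue {
    std::deque<std::function<void()>> tasks;
    void post(std::function<void()> t) { tasks.push_back(std::move(t)); }
    void run() {
        while (!tasks.empty()) {
            std::function<void()> t = std::move(tasks.front());
            tasks.pop_front();
            t();  // a task may post more tasks
        }
    }
};

struct bounded_retry_graph {
    manual_queue& ex_;
    std::function<bool(int)> try_operation_;  // operation that may fail
    int attempts_left_;
    int attempts_made_ = 0;
    std::atomic<bool> stop_requested_{false};  // stand-in for a stop token
    std::promise<int> end_;

    bounded_retry_graph(manual_queue& ex, std::function<bool(int)> op,
                        int max_attempts)
        : ex_(ex), try_operation_(std::move(op)), attempts_left_(max_attempts) {
        assert(max_attempts > 0);
    }

    std::future<int> start() {
        ex_.post([this] { schedule_B(2); });  // task A produces the value 2
        return end_.get_future();
    }

    void request_stop() { stop_requested_.store(true); }

    void schedule_B(int a) {
        ex_.post([this, a] {
            // Give up if asked to stop or if no attempts remain.
            if (stop_requested_.load() || attempts_left_ == 0) {
                end_.set_exception(std::make_exception_ptr(
                    std::runtime_error("task B gave up")));
                return;
            }
            --attempts_left_;
            ++attempts_made_;
            if (try_operation_(a)) {
                schedule_C();
            } else {
                schedule_B(a);  // reschedule the same task
            }
        });
    }

    void schedule_C() {
        ex_.post([this] { end_.set_value(1); });
    }
};
```

A caller would construct the graph with an operation and an attempt limit, call start(), and then run the queue. The returned future either holds 1 or rethrows the exception on get().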
When we compare this model with implicit queues, the complete asynchronous operation has become a subgraph that effectively represents a single subtask in this task queue. In fact, if we know the executor we are going to use is modelled as an explicit task queue, such as asio::io_context, we don't even need the promise because we can just pop tasks from the queue until there are no tasks left. At this point, we implicitly know task C has been executed.
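The idea of draining an explicit queue instead of waiting on a promise can be sketched with a plain std::deque of type-erased tasks. The names task_queue, queued_graph, and drain are hypothetical, and failures_before_success is an artificial knob that makes B fail a few times. The drain function plays the role that a run loop would play on a real executor.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Explicit task queue: a FIFO of type-erased tasks.
using task_queue = std::deque<std::function<void()>>;

struct queued_graph {
    task_queue& queue_;
    int failures_before_success_;  // artificial: B fails this many times
    int b_runs_ = 0;
    int result_ = -1;  // written by task C, no promise involved

    queued_graph(task_queue& q, int failures_before_success)
        : queue_(q), failures_before_success_(failures_before_success) {}

    void start() {
        queue_.push_back([this] { schedule_B(2); });  // task A
    }

    void schedule_B(int a) {
        queue_.push_back([this, a] {
            ++b_runs_;
            bool ok = b_runs_ > failures_before_success_;
            if (ok) {
                schedule_C();
            } else {
                schedule_B(a);  // try again later
            }
        });
    }

    void schedule_C() {
        queue_.push_back([this] { result_ = 1; });
    }
};

// Pops and runs tasks until none are left. Tasks may push new tasks,
// so an empty queue means the whole subgraph has finished.
inline void drain(task_queue& q) {
    while (!q.empty()) {
        std::function<void()> t = std::move(q.front());
        q.pop_front();
        t();
    }
}
```

After drain returns, the result written by C can be read directly, because no task is left that could still be running. This only holds when the caller owns the queue and nothing else posts to it.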
The promise makes the subgraph itself behave as a single future in the implicit task queue. The graph members could be encapsulated into a class, and the functions get/wait could be provided to request the value of the promise. In this case, we would have one more complete future_like object. This type would be able to interact with other futures through the library future adaptors.
Loops in graphs
The pattern above can also be reused for asynchronous loops. Say we want to execute the following task graph:
This kind of pattern is not uncommon for tasks that run continuously:
In this example, we have rescheduling because reading or writing a longer message needs to be split into smaller tasks. The server also needs to launch another listening task while it serves that client. We also have loops because after writing a response we might need to read more requests or disconnect the client.
What we have here is a conditional continuation that might move backwards in case of failure. The logic for recursively rescheduling A is the same as the logic for rescheduling B. We define this logic in a separate function and recursively call it. The new struct would be:
struct graph_launcher {
promise<int> end_;
cfuture<int>
start() {
schedule_A();
return end_.get_future();
}
// ...
The logic to schedule A is now moved into another function because we need to reuse it. A schedules B as usual.
// struct graph_launcher {
// ...
void
schedule_A() {
cfuture<int> A = async([]() { return 2; });
inline_executor ex = make_inline_executor();
then(ex, A, [this](int a) { schedule_B(a); }).detach();
}
// ...
This time, in case of failure, task B moves back to task A instead of rescheduling itself.
// struct graph_launcher {
// ...
void
schedule_B(int a) {
cfuture<bool> B = async(
[](int ra) { return try_operation(ra); },
a);
inline_executor ex = make_inline_executor();
then(ex, B, [this](bool ok) {
if (ok) {
schedule_C();
} else {
handle_error();
schedule_A();
}
}).detach();
}
// ...
Task C sets the promise as usual.
// struct graph_launcher {
// ...
void
schedule_C() {
async([this]() {
int r = handle_success();
end_.set_value(1);
return r;
}).detach();
}
};
We also wait for the graph as usual.
graph_launcher g;
cfuture<int> f = g.start();
assert(f.get() == 1);
Note that this pattern of implicit task graphs is easy enough to generalize as an explicit task graph object. An explicit task graph needs to be aware of the tasks it might launch (its vertices) and the connections between tasks (its edges).
However, there are many reasons not to reuse such a graph object. A graph object would make it more difficult to interleave data related to the application logic with the tasks, and the intermediary tasks usually have different types. We could either resort to template instantiations for each possible graph combination or type-erase these differences. Both alternatives are more expensive and verbose than directly creating functions to recursively reschedule tasks.
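To make the trade-off concrete, here is a minimal sketch of what an explicit, type-erased graph object could look like. It is not part of the library and does not reflect how any existing graph library is implemented: simple_task_graph, add_task, precede, and run are hypothetical names. Every vertex is erased to std::function<void()>, so tasks cannot pass typed results to each other directly and must share state through captures.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Explicit task graph: vertices are type-erased callables and edges are
// "runs before" relationships.
class simple_task_graph {
public:
    // Adds a vertex and returns its index.
    std::size_t add_task(std::function<void()> fn) {
        tasks_.push_back(std::move(fn));
        successors_.emplace_back();
        pending_.push_back(0);
        return tasks_.size() - 1;
    }

    // Adds an edge: `before` must finish before `after` starts.
    void precede(std::size_t before, std::size_t after) {
        assert(before < tasks_.size() && after < tasks_.size());
        successors_[before].push_back(after);
        ++pending_[after];
    }

    // Runs each task once on the calling thread, respecting the edges.
    // Returns the number of tasks executed, which is smaller than the
    // number of vertices if the edges form a cycle.
    std::size_t run() {
        std::vector<std::size_t> pending = pending_;
        std::queue<std::size_t> ready;
        for (std::size_t i = 0; i < tasks_.size(); ++i) {
            if (pending[i] == 0) ready.push(i);
        }
        std::size_t executed = 0;
        while (!ready.empty()) {
            std::size_t v = ready.front();
            ready.pop();
            tasks_[v]();
            ++executed;
            // A successor becomes ready when its last predecessor is done.
            for (std::size_t s : successors_[v]) {
                if (--pending[s] == 0) ready.push(s);
            }
        }
        return executed;
    }

private:
    std::vector<std::function<void()>> tasks_;
    std::vector<std::vector<std::size_t>> successors_;
    std::vector<std::size_t> pending_;  // number of unfinished predecessors
};
```

Note that this object fixes all vertices and edges before run() is called, so it cannot express the conditional rescheduling shown in the previous sections without further machinery.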
Task graphs in C++
Libraries such as Taskflow and TBB provide facilities to compose complete task graphs:
// Futures and std::async: dependencies are expressed by waiting
std::future A = std::async([] () {
std::cout << "TaskA\n";
});
// A runs before B and C
std::future B = std::async([&A] () {
A.wait(); // Polling :(
std::cout << "TaskB\n";
});
std::future C = std::async([&A] () {
A.wait(); // Polling :(
std::cout << "TaskC\n";
});
// D runs after B and C
std::future D = std::async([&B, &C] () {
B.wait(); // Polling :(
C.wait(); // Polling :(
std::cout << "TaskD\n";
});
D.wait();
// Futures with continuations (std::future has no then(); some_future_lib is a placeholder)
std::future A = std::async([] () {
std::cout << "TaskA\n";
});
// A runs before B and C
std::future B = A.then([] () { // Synchronization cost :(
std::cout << "TaskB\n"; // No polling :)
});
std::future C = A.then([] () { // Synchronization cost :(
std::cout << "TaskC\n"; // No polling :)
});
// D runs after B and C
std::future D = some_future_lib::when_all(B, C).then([] () {
std::cout << "TaskD\n";
});
D.wait();
// Taskflow
tf::Executor executor;
tf::Taskflow g;
auto [A, B, C, D] = g.emplace(
[] () {
std::cout << "TaskA\n"; // No eager execution
},
[] () {
std::cout << "TaskB\n"; // No eager execution
},
[] () {
std::cout << "TaskC\n"; // No eager execution
},
[] () {
std::cout << "TaskD\n"; // No eager execution
}
);
A.precede(B, C); // No synchronization cost :)
D.succeed(B, C); // No synchronization cost :)
executor.run(g).wait();
// TBB flow graph
graph g;
function_node<void> A( g, 1, [] () {
std::cout << "TaskA\n"; // No eager execution
} );
function_node<void> B( g, 1, [] () {
std::cout << "TaskB\n"; // No eager execution
} );
function_node<void> C( g, 1, [] () {
std::cout << "TaskC\n"; // No eager execution
} );
function_node<void> D( g, 1, [] () {
std::cout << "TaskD\n"; // No eager execution
} );
make_edge(A, B); // No synchronization cost :)
make_edge(A, C); // No synchronization cost :)
make_edge(B, D); // No synchronization cost :)
make_edge(C, D); // No synchronization cost :)
g.wait_for_all();
Tasks in a task graph are analogous to deferred futures whose continuations are defined before the execution starts. However, we need to explicitly define all relationships between tasks before any execution starts, which might be inconvenient in some applications. Futures and async functions, on the other hand, allow us to 1) combine eager and lazy tasks, and 2) directly express their relationships in code without any explicit graph containers.
P1055, in turn, proposed the concept of deferred work, as opposed to eager futures such as std::future. The idea is that a task related to a future should not start before its continuation is applied. This eliminates the race between the result and the continuation in eager futures. Futures with deferred work are also easier to implement (example).
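The deferred idea can be illustrated with a few lines of C++14. This sketch is not taken from P1055 or from the library: deferred_task, make_deferred, then, and get are hypothetical names, and everything runs on the calling thread. The point is only that attaching a continuation composes callables and starts no work, so there is no result that could race with the continuation.

```cpp
#include <cassert>
#include <utility>

// Hypothetical lazy task: stores a callable and runs it only on get().
template <class F>
class deferred_task {
public:
    explicit deferred_task(F f) : f_(std::move(f)) {}

    // Attaches a continuation before any work starts. The result is a new
    // lazy task that runs the original callable and then the continuation.
    template <class G>
    auto then(G g) && {
        auto composed = [f = std::move(f_), g = std::move(g)]() mutable {
            return g(f());
        };
        return deferred_task<decltype(composed)>(std::move(composed));
    }

    // Starts the whole chain on the calling thread and returns its result.
    auto get() { return f_(); }

private:
    F f_;
};

// Helper that deduces the callable type.
template <class F>
deferred_task<F> make_deferred(F f) {
    return deferred_task<F>(std::move(f));
}
```

A chain such as make_deferred(f).then(g) does nothing until get() is called, at which point f and g run in order. No shared state or synchronization is needed, which is one reason deferred designs tend to be simpler to implement.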