Executors

A standard executor represents a policy for how, when, and where a piece of code should be executed. The standard library does not yet include executors, not even for its parallel algorithms.

Boost.Asio provides a complete implementation of the proposed standard executors.
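
The snippets below use standalone Asio. They assume roughly the following headers; the exact set varies by snippet:

#include <asio.hpp> // standalone Asio umbrella header
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <vector>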

Creating an execution context, such as a thread pool

  • Execution context: place where we can execute functions
  • A thread pool is an execution context.

An execution context:

  • Is usually long-lived.
  • Is non-copyable.
  • May contain additional state, such as timers and threads.
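
A minimal sketch of the above (the thread count of 4 is illustrative):

asio::thread_pool pool(4); // execution context owning 4 worker threads
// ... submit work through its executors ...
pool.join(); // block until all submitted work has finished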

Creating an executor from an execution context:

  • Executor: set of rules governing where, when and how to run a function object
  • A thread pool has executors that send tasks to it.
  • Its executor rule is: Run function objects in the pool and nowhere else.

An executor:

  • May be long- or short-lived.
  • Is lightweight and copyable.
  • May be customized on a fine-grained basis, such as exception
    behavior and ordering guarantees.
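
Fine-grained customization goes through Asio's property mechanism. For example, asio::require can produce a copy of an executor whose rule additionally forbids running submitted functions on the caller's thread (a sketch, given a thread_pool named pool like the one created below):

// Same context, stricter rule: never run submitted functions inline.
auto never_inline = asio::require(pool.executor(),
                                  asio::execution::blocking.never);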

Some references:

  • https://think-async.com/Asio/
  • https://think-async.com/Asio/asio-1.18.1/doc/asio/std_executors.html
  • https://github.com/chriskohlhoff/executors
  • https://taskflow.github.io/taskflow/index.html
  • Read the unit tests, such as asio/src/tests/unit/thread_pool.cpp

A CMake setup that fetches standalone Asio when it is not already installed:

include(FetchContent)          # provides the FetchContent_* commands
find_package(Threads REQUIRED) # provides Threads::Threads
find_package(Asio 1.21.0 QUIET)
if (NOT Asio_FOUND)
    FetchContent_Declare(asio GIT_REPOSITORY https://github.com/chriskohlhoff/asio.git GIT_TAG asio-1-21-0)
    FetchContent_GetProperties(asio)
    if (NOT asio_POPULATED)
        FetchContent_Populate(asio)
        add_library(asio INTERFACE)
        target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include)
        target_compile_definitions(asio INTERFACE ASIO_STANDALONE ASIO_NO_DEPRECATED)
        target_link_libraries(asio INTERFACE Threads::Threads)
    endif ()
endif()

add_executable(executors executors.cpp)
target_link_libraries(executors asio)

// A thread pool execution context; the default constructor chooses a
// thread count based on the number of available cores.
asio::thread_pool pool;

// A lightweight, copyable executor that submits work to the pool.
auto ex = pool.executor();

// The executor refers back to its owning context; this prints 1 (true).
std::cout << (&pool == &ex.context()) << '\n';

// Executing directly in the thread pool
// Execution behaviour according to eagerness:
// - https://github.com/chriskohlhoff/executors
// - Dispatch: Run the function object immediately if possible.
//             Most eager operation.
//             Might run before dispatch returns.
//             If inside pool, run immediately.
//             If outside pool, add to queue.
asio::dispatch(ex, [&ex] {
    // We are already inside the pool, so this nested dispatch may run
    // immediately: "dispatch b" can print before "dispatch a".
    asio::dispatch(ex, [] { std::cout << "dispatch b" << '\n'; });
    std::cout << "dispatch a" << '\n';
});

// - Post: Submit the function for later execution.
//         Never immediately in the same thread.
//         Always adds to pool queue.
//         Never blocking.
asio::post(ex, [&ex] {
    // This will all run in parallel
    asio::post(ex, [] { std::cout << "post b" << '\n'; });
    asio::post(ex, [] { std::cout << "post c" << '\n'; });
    std::cout << "post a" << '\n';
});

// - Defer: Submit the function for later execution.
//          Least eager.
//          Implies relationship between calling thread and function.
//          Used when function is a continuation to the calling function.
//          The function is added to the queue only after the current
//          function ends. If inside the pool, it goes to a thread-local
//          queue; if outside, to the pool queue. The posting thread may
//          end up running it itself, so defer is potentially blocking.
asio::defer(ex, [&ex] {
    // This will all run only when this function is over
    asio::defer(ex, [] { std::cout << "defer b" << '\n'; });
    std::cout << "defer a" << '\n';
});
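
dispatch, post, and defer all return without waiting for the submitted function to finish, so a short program may exit before any of the output above appears. Draining the pool, e.g. at the end of main, prevents that:

pool.join(); // block until all outstanding work has completed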

std::future<int> r1 = asio::post(ex, asio::use_future([]() { return 2; }));
std::cout << "Result = " << r1.get() << '\n';

std::future<void> r2 = asio::post(
    ex, asio::use_future([]() { std::cout << "Print message"; }));
r2.get();
std::cout << " -> Message printed" << '\n';

// A strand is an executor and an executor adapter.
// Its rule is: Run function objects according to the underlying
// executor’s rules, but also run them in FIFO order and not
// concurrently.
asio::strand<asio::thread_pool::executor_type> st(ex);

st.execute([] { std::cout << "FIFO-1a" << '\n'; });

asio::post(st, [] { std::cout << "FIFO-1b" << '\n'; });

auto fifo_r = asio::post(
    st, asio::use_future([] { std::cout << "FIFO-1c" << '\n'; }));
fifo_r.wait();
std::cout << "FIFO tasks done" << '\n';
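
A sketch of what these guarantees buy us (the counter is illustrative, not part of the original snippets): unsynchronized increments would race on the plain pool executor, but they are safe when every access goes through the same strand:

int counter = 0;
for (int i = 0; i < 1000; ++i)
    asio::post(st, [&counter] { ++counter; }); // never run concurrently
// Read the result through the same strand, so the read is ordered
// after all of the increments.
auto total = asio::post(st, asio::use_future([&counter] { return counter; }));
std::cout << "counter: " << total.get() << '\n'; // prints 1000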

std::promise<int> p;
std::future<int> f = p.get_future();
auto fn = [&p]() {
    std::cout << "Task 2 executes asynchronously" << '\n';
    // "return" 2 by setting the promise value
    p.set_value(2);
};
asio::post(fn); // no executor argument: submits to the system executor
std::cout << "f.get(): " << f.get() << '\n';
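
A std::packaged_task bundles the promise/future pair from the snippet above into a single callable. Posting it should work because Asio only requires handlers to be move-constructible (a sketch):

std::packaged_task<int()> task([] { return 2; });
std::future<int> result = task.get_future();
asio::post(ex, std::move(task)); // move-only handlers are fine
std::cout << "result.get(): " << result.get() << '\n';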

auto task1 = asio::post(asio::use_future(
    [] { std::cout << "Task 1 executes asynchronously" << '\n'; }));

auto task2 = asio::post(asio::use_future([]() {
    std::cout << "Task 2 executes in parallel with task 1" << '\n';
    return 42;
}));

// something like task3 = task2.then([](int task2_output){...});
auto task3 = asio::post(asio::use_future([&]() {
    // block until task2's result is ready
    int task2_output = task2.get();
    std::cout << "Task 3 executes after task 2, which returned "
              << task2_output << '\n';
    return task2_output * 3;
}));

// something like task4 = when_all(task1, task3);
auto task4 = asio::post(asio::use_future([&]() {
    task1.wait();
    auto task3_output = task3.get();
    return task3_output;
}));

// something like task5 = task4.then([](std::tuple<void, int>))
auto task5 = asio::post(asio::use_future([&]() {
    auto task4_output = task4.get();
    std::cout << "Task 5 executes after tasks 1 and 3. Task 3 returned "
              << task4_output << "." << '\n';
}));
task5.get();
std::cout << "Task 5 has completed" << '\n';

for (int i = 0; i < 20; ++i) {
    asio::post(ex, [i] {
        std::cout << "Task " << i << " going to sleep" << '\n';
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "Task " << i << " awake" << '\n';
    });
}
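
Because the pool owns a fixed number of threads, the 20 one-second tasks run in batches rather than all at once. A sketch with an explicitly sized pool (the size 4 is illustrative):

asio::thread_pool small_pool(4); // 4 workers for 20 one-second tasks
for (int i = 0; i < 20; ++i) {
    asio::post(small_pool, [] {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    });
}
small_pool.join(); // finishes in roughly 20 / 4 = 5 seconds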

auto parallel_invoke = [](auto ex, auto fn1, auto fn2) {
    asio::post(ex, fn1);
    asio::post(ex, fn2);
};

parallel_invoke(
    ex, [] { std::cout << "parallel_invoke a" << '\n'; },
    [] { std::cout << "parallel_invoke b" << '\n'; });

auto parallel_for = [](auto ex, auto begin, auto end, auto fn) {
    while (begin != end) {
        // Copy fn into the task: the posted function may run after
        // parallel_for has returned and its parameters are gone.
        asio::post(ex, [begin, fn] { fn(*begin); });
        ++begin;
    }
};

std::vector<int> v = {0, 1, 2, 3, 4, 5};
// The tasks may run in any order, so the digits can print in any order
// (and possibly after the newline below).
parallel_for(ex, v.begin(), v.end(), [](int x) { std::cout << x; });
std::cout << '\n';

template <class T> auto make_ready_future(T &&value) {
    std::promise<std::decay_t<T>> result_promise;
    std::future<std::decay_t<T>> result_future = result_promise.get_future();
    result_promise.set_value(std::forward<T>(value));
    return result_future;
}

template <class FN, class Iterator>
std::future<typename std::iterator_traits<Iterator>::value_type>
parallel_reduce(auto ex, Iterator begin, Iterator end, FN fn) {
    auto second = std::next(begin);
    const bool is_single_element = second == end;
    const bool is_single_pair =
        !is_single_element && (std::next(second) == end);
    if (is_single_element) {
        return make_ready_future(*begin);
    } else if (is_single_pair) {
        // Copy fn into the task: the returned future may outlive this call.
        return asio::post(ex, asio::use_future([begin, second, fn] {
                              return fn(*begin, *second);
                          }));
    } else {
        // we would probably add a heuristic here for small ranges
        size_t n = std::distance(begin, end);
        auto half = std::next(begin, n / 2);
        auto lhs = parallel_reduce(ex, begin, half, fn);
        auto rhs = parallel_reduce(ex, half, end, fn);
        // combine the two halves with fn, not with a hard-coded "+"
        return make_ready_future(fn(lhs.get(), rhs.get()));
    }
}
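
A usage sketch mirroring the parallel_for example above:

std::vector<int> w = {1, 2, 3, 4, 5, 6, 7, 8};
auto sum = parallel_reduce(ex, w.begin(), w.end(),
                           [](int a, int b) { return a + b; });
std::cout << "sum: " << sum.get() << '\n'; // prints 36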
