Consider the functions
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
void foo(const uint64_t begin, uint64_t *result)
{
uint64_t prev[] = {begin, 0};
for (uint64_t i = 0; i < 1000000000; ++i)
{
const auto tmp = (prev[0] + prev[1]) % 1000;
prev[1] = prev[0];
prev[0] = tmp;
}
*result = prev[0];
}
void batch(boost::asio::thread_pool &pool, const uint64_t a[])
{
uint64_t r[] = {0, 0};
boost::asio::post(pool, boost::bind(foo, a[0], &r[0]));
boost::asio::post(pool, boost::bind(foo, a[1], &r[1]));
pool.join();
std::cerr << "foo(" << a[0] << "): " << r[0] << " foo(" << a[1] << "): " << r[1] << std::endl;
}
where foo
is a simple "pure" function that performs a calculation on begin
and writes the result to the pointer *result
.
This function gets called with different inputs from batch
. Here dispatching each call to another CPU core might be beneficial.
Now assume the batch function gets called several 10 000 times. Therefore a thread pool would be nice which is shared between all the sequential batch calls.
Trying this with (for the sake of simplicity only 3 calls)
int main(int argn, char **)
{
boost::asio::thread_pool pool(2);
const uint64_t a[] = {2, 4};
batch(pool, a);
const uint64_t b[] = {3, 5};
batch(pool, b);
const uint64_t c[] = {7, 9};
batch(pool, c);
}
leads to the result
foo(2): 2 foo(4): 4
foo(3): 0 foo(5): 0
foo(7): 0 foo(9): 0
Where all three lines appear at the same time, while the computation of foo
takes ~3s.
I assume that only the first join
really waits for the pool to complete all jobs.
The others have invalid results. (The not initialized values)
What is the best practice here to reuse the thread pool?
The best practice is not to reuse the pool (what would be the use of pooling, if you keep creating new pools?).
If you want to be sure you "time" the batches together, I'd suggest using when_all
on futures:
Live On Coliru
#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
uint64_t foo(uint64_t begin) {
uint64_t prev[] = {begin, 0};
for (uint64_t i = 0; i < 1000000000; ++i) {
const auto tmp = (prev[0] + prev[1]) % 1000;
prev[1] = prev[0];
prev[0] = tmp;
}
return prev[0];
}
void batch(boost::asio::thread_pool &pool, const uint64_t a[2])
{
using T = boost::packaged_task<uint64_t>;
T tasks[] {
T(boost::bind(foo, a[0])),
T(boost::bind(foo, a[1])),
};
auto all = boost::when_all(
tasks[0].get_future(),
tasks[1].get_future());
for (auto& t : tasks)
post(pool, std::move(t));
auto [r0, r1] = all.get();
std::cerr << "foo(" << a[0] << "): " << r0.get() << " foo(" << a[1] << "): " << r1.get() << std::endl;
}
int main() {
boost::asio::thread_pool pool(2);
const uint64_t a[] = {2, 4};
batch(pool, a);
const uint64_t b[] = {3, 5};
batch(pool, b);
const uint64_t c[] = {7, 9};
batch(pool, c);
}
Prints
foo(2): 2 foo(4): 4
foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509
I would consider
Make it somewhat more flexible by not hardcoding batch sizes. After all, the pool size is already fixed, we don't need to "make sure batches fit" or something:
Live On Coliru
#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/future.hpp>
struct Result { uint64_t begin, result; };
Result foo(uint64_t begin) {
uint64_t prev[] = {begin, 0};
for (uint64_t i = 0; i < 1000000000; ++i) {
const auto tmp = (prev[0] + prev[1]) % 1000;
prev[1] = prev[0];
prev[0] = tmp;
}
return { begin, prev[0] };
}
void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> const a)
{
using T = boost::packaged_task<Result>;
std::vector<T> tasks;
tasks.reserve(a.size());
for(auto begin : a)
tasks.emplace_back(boost::bind(foo, begin));
std::vector<boost::unique_future<T::result_type> > futures;
for (auto& t : tasks) {
futures.push_back(t.get_future());
post(pool, std::move(t));
}
for (auto& fut : boost::when_all(futures.begin(), futures.end()).get()) {
auto r = fut.get();
std::cerr << "foo(" << r.begin << "): " << r.result << " ";
}
std::cout << std::endl;
}
int main() {
boost::asio::thread_pool pool(2);
batch(pool, {2});
batch(pool, {4, 3, 5});
batch(pool, {7, 9});
}
Prints
foo(2): 2
foo(4): 4 foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509
Contrary to popular believe (and honestly, what usually happens) this time we can leverage variadics to get rid of all the intermediate vectors (every single one of them):
Live On Coliru
void batch(boost::asio::thread_pool &pool, T... a)
{
auto launch = [&pool](uint64_t begin) {
boost::packaged_task<Result> pt(boost::bind(foo, begin));
auto fut = pt.get_future();
post(pool, std::move(pt));
return fut;
};
for (auto& r : {launch(a).get()...}) {
std::cerr << "foo(" << r.begin << "): " << r.result << " ";
}
std::cout << std::endl;
}
If you insist on outputting the results in time, you can still add when_all
into the mix (requiring a bit more heroics to unpack the tuple):
Live On Coliru
template <typename...T>
void batch(boost::asio::thread_pool &pool, T... a)
{
auto launch = [&pool](uint64_t begin) {
boost::packaged_task<Result> pt(boost::bind(foo, begin));
auto fut = pt.get_future();
post(pool, std::move(pt));
return fut;
};
std::apply([](auto&&... rfut) {
Result results[] {rfut.get()...};
for (auto& r : results) {
std::cerr << "foo(" << r.begin << "): " << r.result << " ";
}
}, boost::when_all(launch(a)...).get());
std::cout << std::endl;
}
Both still print the same result
This is very natural to boost, and sort of skips most complexity. If you also want to report per batched group, you'd have to coordinate:
Live On Coliru
#include <iostream>
#include <boost/asio.hpp>
#include <memory>
struct Result { uint64_t begin, result; };
Result foo(uint64_t begin) {
uint64_t prev[] = {begin, 0};
for (uint64_t i = 0; i < 1000000000; ++i) {
const auto tmp = (prev[0] + prev[1]) % 1000;
prev[1] = prev[0];
prev[0] = tmp;
}
return { begin, prev[0] };
}
using Group = std::shared_ptr<size_t>;
void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> begins) {
auto group = std::make_shared<std::vector<Result> >(begins.size());
for (size_t i=0; i < begins.size(); ++i) {
post(pool, [i,begin=begins.at(i),group] {
(*group)[i] = foo(begin);
if (group.unique()) {
for (auto& r : *group) {
std::cout << "foo(" << r.begin << "): " << r.result << " ";
std::cout << std::endl;
}
}
});
}
}
int main() {
boost::asio::thread_pool pool(2);
batch(pool, {2});
batch(pool, {4, 3, 5});
batch(pool, {7, 9});
pool.join();
}
Note this is having concurrent access to
group
, which is safe due to the limitations on element accesses.
Prints:
foo(2): 2
foo(4): 4 foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With