I have some tasks that need to be performed asynchronously, and the server can't close while there are still tasks running. So I'm trying to store the futures returned by std::async in a list, but I also don't want that list to grow without bound, so I want to remove each future as its task completes.
Here's roughly what I'm trying to do:
// this is a member of the server class
std::list<std::future<void>> pending;

std::list<std::future<void>>::iterator iter = ???;
pending.push_back( std::async( std::launch::async, [iter]()
{
    doSomething();
    pending.remove( iter );
} ) );
Here, iter needs to point to the newly inserted element, but I can't get it before inserting the element (there is no iterator yet), nor after (since it is captured by the lambda by value). I could use a shared_ptr to hold the iterator, but that seems like overkill.
Is there a better pattern for this?
Update: there seems to be another issue with this. When a future attempts to remove itself from the list, it is essentially waiting for itself to complete, which locks everything up. Oops!
On top of that, the list destructor empties the list before calling the element destructors.
It appears you can just append a default-constructed std::future to the list, get an iterator to that, and then move your future in.
Mind you, that non-mutex-protected remove(iter) looks awfully dangerous.
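To make that suggestion concrete, here is a minimal sketch, not code from this thread. It assumes a hypothetical `task_list` class with made-up members `add`, `collect` and `size`. The placeholder future is inserted under a mutex so the iterator exists before `std::async` is called, and the task only records its iterator as finished. The owning thread does the actual erase later, which avoids a future being destroyed from inside its own task.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <list>
#include <mutex>
#include <vector>

// Hypothetical helper (all names are assumptions, not from the thread).
class task_list {
    using list_type = std::list<std::future<void>>;

    std::mutex mutex_;
    // Declared before pending_ so it is destroyed after pending_: tasks that
    // are still finishing during destruction can safely push into it.
    std::vector<list_type::iterator> finished_;
    list_type pending_;  // one future per task not yet collected

public:
    void add(std::function<void()> work) {
        std::lock_guard<std::mutex> lock(mutex_);
        // Insert a default-constructed placeholder so an iterator exists
        // before the task is launched. std::list iterators stay valid when
        // other elements are inserted or erased.
        auto it = pending_.emplace(pending_.end());
        assert(!it->valid());  // the placeholder has no shared state yet
        *it = std::async(std::launch::async, [this, it, work] {
            work();
            // Only report completion. Erasing *it here would destroy the
            // future from inside its own task.
            std::lock_guard<std::mutex> lock(mutex_);
            finished_.push_back(it);
        });
    }

    // Call from the owning thread: erases every task that has reported in
    // and returns how many futures were removed.
    std::size_t collect() {
        std::lock_guard<std::mutex> lock(mutex_);
        std::size_t removed = finished_.size();
        for (auto it : finished_)
            pending_.erase(it);  // the task has already released mutex_
        finished_.clear();
        return removed;
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_.size();
    }
};
```

Calling `collect()` from `add()` or from a periodic housekeeping step would keep the list bounded by the number of tasks that are actually outstanding.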
Here's one way. I don't think this one needs futures:
#include <unordered_set>
#include <condition_variable>
#include <mutex>
#include <thread>

struct server
{
    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    std::unordered_set<unsigned> pending;
    unsigned next_id = 0;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);
        auto id = next_id++;
        auto t = std::thread([this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        t.detach(); // or we could store it somewhere. e.g. pending could be a map
        pending.insert(id);
    }

    void doSomething();

    void notify_complete(unsigned id)
    {
        auto lock = std::unique_lock(pending_mutex);
        pending.erase(id);
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };
        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};

int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}
Here it is with futures, in case that's important:
#include <unordered_map>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

struct server
{
    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    std::unordered_map<unsigned, std::future<void>> pending;
    unsigned next_id = 0;

    void add_task()
    {
        // the lock is held until the future is stored, so the task cannot
        // reach notify_complete() before its entry exists
        auto lock = std::unique_lock(pending_mutex);
        auto id = next_id++;
        auto f = std::async(std::launch::async, [this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        pending.emplace(id, std::move(f));
    }

    void doSomething();

    void notify_complete(unsigned id)
    {
        auto lock = std::unique_lock(pending_mutex);
        // caution: this runs inside the task, and destroying a std::async
        // future whose task has not finished may block on that task
        // (the self-wait described in the question's update)
        pending.erase(id);
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };
        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};

int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}
Here's the list version:
#include <list>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

struct server
{
    using pending_list = std::list<std::future<void>>;
    using id_type = pending_list::const_iterator;

    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    pending_list pending;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);
        // redundant construction: a default-constructed placeholder future,
        // inserted only so that an iterator exists before the task starts
        auto id = pending.emplace(pending.end());
        auto f = std::async(std::launch::async, [this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        *id = std::move(f);
    }

    void doSomething();

    void notify_complete(id_type id)
    {
        auto lock = std::unique_lock(pending_mutex);
        // caution: this runs inside the task, and destroying a std::async
        // future whose task has not finished may block on that task
        // (the self-wait described in the question's update)
        pending.erase(id);
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };
        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};

int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}
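If polling is acceptable, another option is to keep the tasks away from the container entirely, which avoids the self-wait described in the question's update. The following is a rough sketch under that assumption, not a drop-in replacement: `polling_server`, `reap_locked` and the `work` parameter are names invented here. Finished futures are erased whenever a new task is added, using `wait_for` with a zero timeout to test readiness.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <list>
#include <mutex>
#include <utility>

// Hypothetical variant of the server: tasks never touch the container.
struct polling_server
{
    std::mutex pending_mutex;
    std::list<std::future<void>> pending;

    void add_task(std::function<void()> work)
    {
        auto f = std::async(std::launch::async, std::move(work));
        std::lock_guard<std::mutex> lock(pending_mutex);
        reap_locked();                    // drop futures that are already done
        pending.push_back(std::move(f));
    }

    // Caller must hold pending_mutex. Only futures that report ready are
    // erased, so no destructor here waits on work that is still running.
    void reap_locked()
    {
        for (auto it = pending.begin(); it != pending.end();)
        {
            if (it->wait_for(std::chrono::seconds(0)) == std::future_status::ready)
                it = pending.erase(it);
            else
                ++it;
        }
    }

    // Block until every outstanding task has finished, then empty the list.
    void wait_all_complete()
    {
        std::lock_guard<std::mutex> lock(pending_mutex);
        for (auto& f : pending)
            f.wait();
        pending.clear();
    }
};
```

With this shape the list only holds futures for tasks that were still outstanding at the last call. The trade-off is that completed futures linger until the next `add_task` or `wait_all_complete`.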