
How to store self-removing futures in a list

Tags: c++, c++17, future

I have some tasks that need to be performed asynchronously, and the server can't close while there are still tasks running. So I'm trying to store the futures returned by std::async in a list, but I also don't want to get an infinitely growing list of those. So I want to remove the futures as they're completed.

Here's roughly what I'm trying to do:

// this is a member of the server class
std::list<std::future<void>> pending;

std::list<std::future<void>>::iterator iter = ???;

pending.push_back( std::async( std::launch::async, [iter]()
{
    doSomething();
    pending.remove( iter );
} ) );

Here, iter needs to be pointing to the newly inserted element, but I can't get it before inserting the element (there is no iterator), nor after (since it is passed to the lambda by value). I could make a shared_ptr to store the iterator, but that seems to be way overkill.

Is there a better pattern for this?

Update: there seems to be another issue with this. When a future attempts to remove itself from the list, it is essentially waiting for itself to complete, which locks everything up. Oops!

On top of that, the list destructor empties the list before calling the element destructors.
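
As an aside to the update above, the blocking behaviour can be seen in isolation. This is only a minimal sketch: the helper name destructor_waits_for_task is made up for illustration and is not part of the question's code. It relies on the fact that the destructor of a future obtained from std::async with std::launch::async waits for the task to finish, which is why a task that destroys its own future ends up waiting on itself.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Hypothetical helper (not from the question): reports whether the task
// had finished by the time the future's destructor returned.
bool destructor_waits_for_task()
{
    std::atomic<bool> done{false};
    {
        auto f = std::async(std::launch::async, [&done] {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            done = true;
        });
    } // ~future runs here and blocks until the task above has completed
    return done.load();
}
```

If the same destructor runs on the task's own thread, there is nobody left to finish the task, so the wait cannot end normally.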

Asked Mar 18 '19 10:03 by riv


2 Answers

It appears you can just append a default std::future to the list, get an iterator to that and then move your future in.

Mind you, that non-mutex-protected remove(iter) looks awfully dangerous.
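
A rough sketch of that idea, with the mutex added, might look like the following. The names task_list, add, finished and reap are assumptions made up for this example. To stay clear of the self-wait problem from the question's update, the task here only reports its iterator as finished; the actual erase is left to whoever owns the list.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <iterator>
#include <list>
#include <mutex>
#include <vector>

// Hypothetical holder type, for illustration only.
struct task_list
{
    using list_type = std::list<std::future<void>>;

    std::mutex mutex;
    list_type pending;
    std::vector<list_type::iterator> finished; // slots whose task has reported completion

    template <class Work>
    void add(Work work)
    {
        std::lock_guard<std::mutex> lock(mutex);
        pending.emplace_back();             // default-constructed (empty) future
        auto it = std::prev(pending.end()); // iterator to the new slot
        // The task cannot take the mutex until add() returns, so the slot
        // is filled before the task ever reports the iterator.
        *it = std::async(std::launch::async, [this, it, work]() mutable {
            work();
            std::lock_guard<std::mutex> inner(mutex);
            finished.push_back(it);         // report only, never erase from the task
        });
    }

    // Call from the owning thread, not from a task: erases every reported slot.
    std::size_t reap()
    {
        std::lock_guard<std::mutex> lock(mutex);
        const std::size_t count = finished.size();
        for (auto it : finished)
            pending.erase(it); // may wait briefly while the task's thread exits
        finished.clear();
        return count;
    }
};
```

std::list iterators stay valid while other elements are inserted or erased, which is what makes it safe to hand the iterator to the task.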

Answered Nov 06 '22 19:11 by MSalters


Here's one way. I don't think this one needs futures:

#include <unordered_set>
#include <condition_variable>
#include <mutex>
#include <thread>

struct server
{
    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    std::unordered_set<unsigned> pending;
    unsigned next_id =  0;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);
        auto id = next_id++;
        auto t = std::thread([this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        t.detach(); // or we could store it somewhere. e.g. pending could be a map
        pending.insert(id);
    }

    void doSomething();

    void notify_complete(unsigned id)
    {
        auto lock = std::unique_lock(pending_mutex);
        pending.erase(id);
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };

        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};


int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}

Here it is with futures, in case that's important:

#include <unordered_map>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

struct server
{
    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    std::unordered_map<unsigned, std::future<void>> pending;
    unsigned next_id =  0;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);
        auto id = next_id++;
        auto f = std::async(std::launch::async, [this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        pending.emplace(id, std::move(f));
    }

    void doSomething();

    void notify_complete(unsigned id)
    {
        auto lock = std::unique_lock(pending_mutex);
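        // Caution: this runs on the task's own thread and destroys that task's
        // future. The destructor of a std::async future waits for its task,
        // so the task ends up waiting on itself (see the question's update).
        pending.erase(id);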
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };

        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};


int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}
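
One possible way around the self-wait in the futures version is to have the finishing task move its future out of the map instead of destroying it, and let another thread destroy it later. The following is a sketch under that assumption, not a drop-in replacement: parking_server, finished and completed are names invented for this example, and doSomething is a stand-in that only counts calls.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <future>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical variant of the server above, for illustration only.
struct parking_server
{
    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    std::unordered_map<unsigned, std::future<void>> pending;
    std::vector<std::future<void>> finished; // futures parked by completed tasks
    unsigned next_id = 0;
    std::atomic<int> completed{0};

    void add_task()
    {
        std::vector<std::future<void>> reaped; // destroyed after the lock is released
        {
            std::unique_lock<std::mutex> lock(pending_mutex);
            reaped.swap(finished);             // keeps `finished` from growing forever
            auto id = next_id++;
            pending.emplace(id, std::async(std::launch::async, [this, id] {
                doSomething();
                notify_complete(id);
            }));
        }
    }

    void doSomething() { ++completed; }        // stand-in for the real work

    void notify_complete(unsigned id)
    {
        std::unique_lock<std::mutex> lock(pending_mutex);
        auto it = pending.find(id);
        finished.push_back(std::move(it->second)); // park the future, do not destroy it here
        pending.erase(it);                         // erases an empty, moved-from future
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        std::vector<std::future<void>> reaped;
        {
            std::unique_lock<std::mutex> lock(pending_mutex);
            pending_condition.wait(lock, [this] { return pending.empty(); });
            reaped.swap(finished);
        }
        // `reaped` is destroyed here, on the waiting thread, not on a task's thread
    }
};
```

The parked futures are destroyed by whichever thread next calls add_task or wait_all_complete, so add_task should not be called from inside a task in this sketch.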

Here's the list version:

#include <list>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

struct server
{
    using pending_list = std::list<std::future<void>>;
    using id_type = pending_list::const_iterator;

    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    pending_list pending;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);

        // redundant construction
        auto id = pending.emplace(pending.end());
        auto f = std::async(std::launch::async, [this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        *id = std::move(f);
    }

    void doSomething();

    void notify_complete(id_type id)
    {
        auto lock = std::unique_lock(pending_mutex);
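        // Caution: this runs on the task's own thread and destroys that task's
        // future. The destructor of a std::async future waits for its task,
        // so the task ends up waiting on itself (see the question's update).
        pending.erase(id);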
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };

        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};


int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}
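
A different angle, which is not part of the versions above: instead of tasks removing themselves, the owner can sweep the list and drop every future that is already ready. This is a sketch under that assumption; sweeping_server, sweep and size are names invented for this example. It polls each future with wait_for and a zero timeout, which reports std::future_status::ready once the task has finished.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <list>
#include <mutex>
#include <utility>

// Hypothetical type, for illustration only.
struct sweeping_server
{
    template <class Work>
    void add_task(Work work)
    {
        std::lock_guard<std::mutex> lock(pending_mutex);
        sweep_locked(); // bound the list: drop finished futures before adding one
        pending.push_back(std::async(std::launch::async, std::move(work)));
    }

    // Removes every future whose task has already finished.
    void sweep()
    {
        std::lock_guard<std::mutex> lock(pending_mutex);
        sweep_locked();
    }

    // Waits for all tasks that were pending at the time of the call.
    void wait_all_complete()
    {
        std::list<std::future<void>> local;
        {
            std::lock_guard<std::mutex> lock(pending_mutex);
            local.swap(pending);
        }
        for (auto& f : local)
            f.wait();
    }

    std::size_t size()
    {
        std::lock_guard<std::mutex> lock(pending_mutex);
        return pending.size();
    }

private:
    void sweep_locked()
    {
        pending.remove_if([](const std::future<void>& f) {
            return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
        });
    }

    std::mutex pending_mutex;
    std::list<std::future<void>> pending;
};
```

The trade-off is that finished futures linger until the next sweep, but no task ever touches the list, so the self-wait problem does not arise.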
Answered Nov 06 '22 18:11 by Richard Hodges