
C++ std::vector of independent std::threads

I'm building real-time software that has an infinite main loop in main() and threads that read and process data.

One of the issues is keeping a std::vector of the running threads so that I can send signals to them and monitor their execution. So I put together this code:

#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <chrono>

namespace readerThread {

    void start(int id)
    {
        while (1)
        {
            std::cout << "Reader " << id << " running..." <<  std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
    }

}


int main() 
{

        int readers[] = { 1, 2, 3 };
        
        std::vector<std::thread> readerThreads;

        for (int &reader : readers)
        {
            std::thread th(readerThread::start, reader);
            readerThreads.push_back(th);
        }

        while(true)
        {
            std::cout << "Waiting..." << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(10000));
        }
        
        return 0;
}

It doesn't even compile; I get this error:

In file included from /usr/local/include/c++/5.1.0/x86_64-unknown-linux-gnu/bits/c++allocator.h:33:0,
                 from /usr/local/include/c++/5.1.0/bits/allocator.h:46,
                 from /usr/local/include/c++/5.1.0/string:41,
                 from /usr/local/include/c++/5.1.0/bits/locale_classes.h:40,
                 from /usr/local/include/c++/5.1.0/bits/ios_base.h:41,
                 from /usr/local/include/c++/5.1.0/ios:42,
                 from /usr/local/include/c++/5.1.0/ostream:38,
                 from /usr/local/include/c++/5.1.0/iostream:39,
                 from main.cpp:1:
/usr/local/include/c++/5.1.0/ext/new_allocator.h: In instantiation of 'void __gnu_cxx::new_allocator<_Tp>::construct(_Up*, _Args&& ...) [with _Up = std::thread; _Args = {const std::thread&}; _Tp = std::thread]':
/usr/local/include/c++/5.1.0/bits/alloc_traits.h:256:4:   required from 'static std::_Require<std::allocator_traits<_Alloc>::__has_construct<_Tp, _Args ...> > std::allocator_traits<_Alloc>::_S_construct(_Alloc&, _Tp*, _Args&& ...) [with _Tp = std::thread; _Args = {const std::thread&}; _Alloc = std::allocator<std::thread>; std::_Require<std::allocator_traits<_Alloc>::__has_construct<_Tp, _Args ...> > = void]'
/usr/local/include/c++/5.1.0/bits/alloc_traits.h:402:16:   required from 'static decltype (_S_construct(__a, __p, (forward<_Args>)(std::allocator_traits::construct::__args)...)) std::allocator_traits<_Alloc>::construct(_Alloc&, _Tp*, _Args&& ...) [with _Tp = std::thread; _Args = {const std::thread&}; _Alloc = std::allocator<std::thread>; decltype (_S_construct(__a, __p, (forward<_Args>)(std::allocator_traits::construct::__args)...)) = <type error>]'
/usr/local/include/c++/5.1.0/bits/stl_vector.h:917:30:   required from 'void std::vector<_Tp, _Alloc>::push_back(const value_type&) [with _Tp = std::thread; _Alloc = std::allocator<std::thread>; std::vector<_Tp, _Alloc>::value_type = std::thread]'
main.cpp:37:30:   required from here
/usr/local/include/c++/5.1.0/ext/new_allocator.h:120:4: error: use of deleted function 'std::thread::thread(const std::thread&)'
  { ::new((void *)__p) _Up(std::forward<_Args>(__args)...); }
    ^
In file included from main.cpp:4:0:
/usr/local/include/c++/5.1.0/thread:126:5: note: declared here
     thread(const thread&) = delete;
     ^

The threads are independent, so I don't need to call join on the main program nor on any thread...

So, here are my questions:

Why does my code not compile?

Is this the correct way to store the vector of threads?

Thanks for helping...

PS:Original code here:

Mendes asked Jun 10 '15 22:06


3 Answers

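You need to move the thread into the vector:

readerThreads.push_back(std::move(th));

std::move casts th to an rvalue, so push_back calls the move constructor instead of the deleted copy constructor. After the call, th no longer owns a thread. The copy constructor of std::thread is deleted by design (see Anthony Williams' C++ Concurrency in Action).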
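Applied to the loop from the question, the fix could look roughly like the sketch below. To let the example terminate, it assumes each reader does a bounded amount of work instead of looping forever; run_reader, launch_readers, join_all and iterations_done are hypothetical names introduced for illustration and are not part of the original code.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical counter so the effect of the readers can be observed.
std::atomic<int> iterations_done{0};

// Stand-in for readerThread::start: bounded work instead of while (1)
// (an assumption made so this sketch can finish).
void run_reader(int /*id*/, int iterations)
{
    for (int i = 0; i < iterations; ++i)
        ++iterations_done;
}

// Starts one thread per id and moves each std::thread into the vector.
std::vector<std::thread> launch_readers(const std::vector<int>& ids, int iterations)
{
    assert(iterations >= 0);
    std::vector<std::thread> threads;
    for (int id : ids)
    {
        std::thread th(run_reader, id, iterations);
        threads.push_back(std::move(th)); // move, not copy: th is left empty
    }
    return threads;
}

// Joins every thread that is still joinable.
void join_all(std::vector<std::thread>& threads)
{
    for (std::thread& t : threads)
        if (t.joinable())
            t.join();
}
```

A caller would write something like auto threads = launch_readers({1, 2, 3}, 5); and later join_all(threads); before the vector goes out of scope.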

Ami Tavory answered Oct 05 '22 22:10


/usr/local/include/c++/5.1.0/ext/new_allocator.h: In instantiation of 'void __gnu_cxx::new_allocator<_Tp>::construct(_Up*, _Args&& ...) [with _Up = std::thread; _Args = {const std::thread&}; _Tp = std::thread]':
/usr/local/include/c++/5.1.0/bits/alloc_traits.h:256:4:   required from 'static std::_Require<std::allocator_traits<_Alloc>::__has_construct<_Tp, _Args ...> > std::allocator_traits<_Alloc>::_S_construct(_Alloc&, _Tp*, _Args&& ...) [with _Tp = std::thread; _Args = {const std::thread&}; _Alloc = std::allocator<std::thread>; std::_Require<std::allocator_traits<_Alloc>::__has_construct<_Tp, _Args ...> > = void]'
/usr/local/include/c++/5.1.0/bits/alloc_traits.h:402:16:   required from 'static decltype (_S_construct(__a, __p, (forward<_Args>)(std::allocator_traits::construct::__args)...)) std::allocator_traits<_Alloc>::construct(_Alloc&, _Tp*, _Args&& ...) [with _Tp = std::thread; _Args = {const std::thread&}; _Alloc = std::allocator<std::thread>; decltype (_S_construct(__a, __p, (forward<_Args>)(std::allocator_traits::construct::__args)...)) = <type error>]'
/usr/local/include/c++/5.1.0/bits/stl_vector.h:917:30:   required from 'void std::vector<_Tp, _Alloc>::push_back(const value_type&) [with _Tp = std::thread; _Alloc = std::allocator<std::thread>; std::vector<_Tp, _Alloc>::value_type = std::thread]'
main.cpp:37:30:   required from here
/usr/local/include/c++/5.1.0/ext/new_allocator.h:120:4: error: use of deleted function 'std::thread::thread(const std::thread&)'
  { ::new((void *)__p) _Up(std::forward<_Args>(__args)...); }

Let's peel this back a bit.

error: use of deleted function 'std::thread::thread(const std::thread&)'

Your code is doing something that attempts to copy a std::thread.

required from 'void std::vector<_Tp, _Alloc>::push_back(const value_type&)

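push_back is the culprit: called with an lvalue, it selects the const value_type& overload, which copies its argument.

std::thread is not copyable - what would it mean to copy a thread?

std::thread t1([](){});
std::thread t2 = t1;  // error: the copy constructor is deleted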

So each std::thread object is intended to be the unique owner of its thread of execution. If copies were allowed, it would be unclear which copy is responsible for joining the thread, and much pain would ensue.

They are, however, movable.

std::thread t1([](){});
std::thread t2 = std::move(t1);

t1 no longer represents a thread (t1.joinable() returns false); the thread it was managing is now owned by t2.
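The copy/move distinction can be checked directly. The following is a small sketch using only standard type traits and joinable(); the function name ownership_moves is made up for this example.

```cpp
#include <cassert>
#include <thread>
#include <type_traits>
#include <utility>

// Compile-time view of the same rule: movable, but not copyable.
static_assert(!std::is_copy_constructible<std::thread>::value,
              "std::thread must not be copyable");
static_assert(std::is_move_constructible<std::thread>::value,
              "std::thread must be movable");

// Moves a thread from one handle to another and reports whether
// ownership ended up where expected.
bool ownership_moves()
{
    std::thread t1([]() {});
    std::thread t2 = std::move(t1);

    // t1 no longer owns a thread; t2 now does.
    const bool moved = !t1.joinable() && t2.joinable();

    t2.join(); // must join (or detach) before t2 is destroyed
    return moved;
}
```

Calling ownership_moves() should return true: a thread stays joinable until it is joined or detached, even if its function has already returned.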

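To place such things into a container, you can either push_back an rvalue (a temporary, or a named thread passed through std::move) or construct the thread in place with the container's emplace/emplace_back member functions.

std::vector<std::thread> threads;
std::thread t([](){});
threads.push_back(std::move(t));          // named thread: needs std::move
threads.push_back(std::thread([](){}));   // temporary: already an rvalue
threads.emplace_back([](){});             // constructed in place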

While your question focuses on this particular issue, let me point out that the C++ standard requires std::terminate to be called if the destructor of a std::thread runs while the thread is still joinable, that is, neither joined nor detached.

int main() {
    std::thread t1([](){ while (true) { std::this_thread::yield(); } });
}

When main returns, t1.~thread() is called. It detects that the thread is still joinable and calls std::terminate, which aborts the program.

You need to either join() the thread, which waits for it to finish, or detach() it. To use join() you need some way to tell the thread to stop. If you detach() instead, the program may exit while the thread is in the middle of something, such as writing data, which can introduce a serious bug.

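#include <thread>
#include <chrono>
#include <future>

int main () {
  std::promise<void> cnx_promise;
  // Tie the future to the promise; a default-constructed future is never valid.
  std::shared_future<void> cnx_future = cnx_promise.get_future().share();

  std::thread t1([cnx_future]() {
      // Poll without blocking until the promise has been fulfilled.
      while (cnx_future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
        std::this_thread::yield();
      }
  });

  std::this_thread::sleep_for(std::chrono::seconds(1));

  cnx_promise.set_value();  // tell the thread to stop

  t1.join();
}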

Here we use a promise to let the thread know when it's time to stop running, but you could use condition variables, signals, etc., or even just a simple std::atomic<bool> ok_to_run { true }; that the thread checks on each iteration.
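As a rough illustration of the std::atomic<bool> option combined with a vector of threads, here is a minimal sketch. The class ReaderPool and its members are hypothetical names chosen for this example, and the 1 ms sleep stands in for whatever work a real reader would do.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical wrapper: owns the worker threads and the flag that stops them.
class ReaderPool
{
public:
    explicit ReaderPool(int count)
    {
        assert(count >= 0);
        for (int id = 0; id < count; ++id)
            threads_.emplace_back([this]() {
                // Each worker keeps going until the flag is cleared.
                while (ok_to_run_.load())
                {
                    ++ticks_; // placeholder for reading/processing data
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                }
            });
    }

    // Stopping in the destructor means no thread is still joinable
    // when the vector of threads is destroyed.
    ~ReaderPool() { stop(); }

    // Clears the flag, then waits for every worker to finish.
    void stop()
    {
        ok_to_run_ = false;
        for (std::thread& t : threads_)
            if (t.joinable())
                t.join();
    }

    int ticks() const { return ticks_.load(); }
    std::size_t size() const { return threads_.size(); }

private:
    // Declared before threads_ so the flag and counter exist
    // before any worker can touch them.
    std::atomic<bool> ok_to_run_{true};
    std::atomic<int> ticks_{0};
    std::vector<std::thread> threads_;
};
```

With this shape, main() can create a ReaderPool pool(3);, run its own loop, and call pool.stop() (or simply let pool go out of scope) to shut the readers down cleanly instead of detaching them.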

kfsone answered Oct 06 '22 00:10


Another variant that works is to create the thread object directly in the push_back call. There is no need to call std::move in this case because the temporary is already an rvalue (so it will be moved).

for (int &reader : readers)
    readerThreads.push_back(std::thread(readerThread::start, reader));

Seth answered Oct 05 '22 23:10