To give some background information: I am processing a saved file, and after using a regular expression to split the file into its component objects, I need to process each object's data based on which type of object it is.
My current thought is to use parallelism to get a bit of a performance gain, as loading each object is independent of the others. So I was going to define a LoadObject function accepting a std::string for each type of object I'll be handling, and then call std::async as follows:
#include <fstream>
#include <future>
#include <iterator>
#include <regex>
#include <stdexcept>
#include <string>
#include <vector>

// One loader per object type, each accepting the object's raw data:
void LoadType1( std::string szData );
void LoadType2( std::string szData );

void LoadFromFile( const std::string& szFileName )
{
    static const std::regex regexObject(
        "=== ([^=]+) ===\\n((?:.|\\n)*)\\n=== END \\1 ===",
        std::regex_constants::ECMAScript | std::regex_constants::optimize );

    std::ifstream inFile( szFileName );
    inFile.exceptions( std::ifstream::failbit | std::ifstream::badbit );
    std::string szFileData( (std::istreambuf_iterator<char>( inFile )),
                            (std::istreambuf_iterator<char>()) );
    inFile.close();

    std::vector<std::future<void>> vecFutures;

    for( std::sregex_iterator itObject( szFileData.cbegin(), szFileData.cend(), regexObject ), end;
         itObject != end; ++itObject )
    {
        // Determine what type of object we're loading:
        if( (*itObject)[1] == "Type1" )
        {
            vecFutures.emplace_back( std::async( LoadType1, (*itObject)[2].str() ) );
        }
        else if( (*itObject)[1] == "Type2" )
        {
            vecFutures.emplace_back( std::async( LoadType2, (*itObject)[2].str() ) );
        }
        else
        {
            throw std::runtime_error( "Unexpected type encountered whilst reading data file." );
        }
    }

    // Make sure all our tasks completed:
    for( auto& future : vecFutures )
    {
        future.get();
    }
}
Note that there will be more than 2 types in the application (this was just a short example) and potentially thousands of objects in the file to be read.
I am aware that creating too many threads is often bad for performance once the count exceeds the maximum hardware concurrency, due to context switches. However, if memory serves, the C++ runtime is supposed to monitor the number of threads created and schedule std::async appropriately (I believe in Microsoft's case their ConcRT library is responsible for this?), so the above code may still result in a performance improvement?
Thanks in advance!
So if you want to make sure that the work is done asynchronously, use std::launch::async.

@user2485710: it needs to block when you retrieve the result, if you need the result in the launching thread. It cannot use the result if the result is not ready, so if you go to get the result, you have to wait until it is ready.
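As a quick illustration of that point (this snippet is mine, not from the question): std::launch::async forces the task onto another thread, and future::get() then blocks the launching thread until the result is ready.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

int main() {
    auto fut = std::async(std::launch::async, [] {
        std::this_thread::sleep_for(std::chrono::seconds(2)); // simulate work
        return 42;
    });

    // ... the launching thread is free to do other work here ...

    int result = fut.get(); // blocks until the task has produced its value
    std::cout << result << '\n';
}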
the C++ runtime is supposed to monitor the number of threads created and schedule std::async appropriately
No. If the asynchronous tasks are in fact run asynchronously (rather than deferred) then all that's required is that they are run as if on a new thread. It is perfectly valid for a new thread to be created and started for every task, without any regard for the hardware's limited capacity for parallelism.
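As a sketch of how you might observe this on your own implementation (the example is mine, not part of the original answer): launch many std::launch::async tasks that overlap in time and count how many distinct thread ids they ran on.

#include <chrono>
#include <future>
#include <iostream>
#include <set>
#include <thread>
#include <vector>

int main() {
    std::vector<std::future<std::thread::id>> futures;
    for (int i = 0; i < 100; ++i) {
        futures.push_back(std::async(std::launch::async, [] {
            // sleep so the tasks overlap and thread ids are not reused
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            return std::this_thread::get_id();
        }));
    }

    std::set<std::thread::id> ids;
    for (auto &f : futures)
        ids.insert(f.get());

    // On an implementation that spawns a fresh thread per task this can
    // print 100, regardless of std::thread::hardware_concurrency().
    std::cout << ids.size() << " distinct threads\n";
}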
There's a note:
[ Note: If this policy is specified together with other policies, such as when using a policy value of launch::async | launch::deferred, implementations should defer invocation or the selection of the policy when no more concurrency can be effectively exploited. —end note ]
However, this is non-normative and in any case it indicates that once no more concurrency can be exploited the tasks may become deferred, and therefore get executed when someone waits on the result, rather than still being asynchronous and running immediately after one of the previous asynchronous tasks is finished, as would be desirable for maximum parallelism.
That is, if we have 10 long running tasks and the implementation can only execute 4 in parallel, then the first 4 will be asynchronous and then the last 6 may be deferred. Waiting on the futures in sequence would execute the deferred tasks on a single thread in sequence, eliminating parallel execution for those tasks.
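If you want to check which of your futures actually ended up deferred (again, my addition, not part of the original answer), std::future::wait_for with a zero timeout reports std::future_status::deferred for a deferred task:

#include <chrono>
#include <future>
#include <iostream>

int main() {
    auto fut = std::async(std::launch::async | std::launch::deferred, [] { return 1; });

    if (fut.wait_for(std::chrono::seconds(0)) == std::future_status::deferred)
        std::cout << "task was deferred; it will run on the thread that calls get()\n";
    else
        std::cout << "task is running (or has already run) asynchronously\n";

    fut.get();
}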
The note does also say that, instead of deferring invocation, the selection of the policy may be deferred. That is, the function may still run asynchronously, but that decision may be delayed, say, until one of the earlier tasks completes, freeing up a core for a new task. But again, this is not required, the note is non-normative, and as far as I know Microsoft's implementation is the only one that behaves this way. When I looked at another implementation, libc++, it simply ignores this note altogether, so that using either the std::launch::async or std::launch::any policy results in asynchronous execution on a new thread.
(I believe in Microsoft's case their ConcRT library is responsible for this?)
Microsoft's implementation does indeed behave as you describe; however, this is not required, and a portable program cannot rely on that behavior.
One way to portably limit how many threads are actually running is to use something like a semaphore:
#include <condition_variable>
#include <cstdio>
#include <future>
#include <mutex>
#include <vector>

// a semaphore class
//
// All threads can wait on this object. When a waiting thread
// is woken up, it does its work and then notifies another waiting thread.
// In this way only n threads will be doing work at any time.
class Semaphore {
private:
    std::mutex m;
    std::condition_variable cv;
    unsigned int count;

public:
    Semaphore(int n) : count(n) {}
    void notify() {
        std::unique_lock<std::mutex> l(m);
        ++count;
        cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this]{ return count != 0; });
        --count;
    }
};

// an RAII class to handle waiting and notifying the next thread
// Work is done between when the object is created and destroyed
class Semaphore_waiter_notifier {
    Semaphore &s;
public:
    Semaphore_waiter_notifier(Semaphore &s) : s{s} { s.wait(); }
    ~Semaphore_waiter_notifier() { s.notify(); }
};

// some inefficient work for our threads to do
int fib(int n) {
    if (n < 2) return n;
    return fib(n-1) + fib(n-2);
}

// for_each algorithm for iterating over a container but also
// making an integer index available.
//
// f is called like f(index, element)
template<typename Container, typename F>
F for_each(Container &c, F f) {
    typename Container::size_type i = 0;
    for (auto &e : c)
        f(i++, e);
    return f;
}

// global semaphore so that lambdas don't have to capture it
Semaphore thread_limiter(4);

int main() {
    std::vector<int> input(100);
    for_each(input, [](int i, int &e) { e = (i % 10) + 35; });

    std::vector<std::future<int>> output;
    for_each(input, [&output](int i, int e) {
        output.push_back(std::async(std::launch::async, [](int task, int n) -> int {
            Semaphore_waiter_notifier w(thread_limiter);
            std::printf("Starting task %d\n", task);
            int res = fib(n);
            std::printf("\t\t\t\t\t\tTask %d finished\n", task);
            return res;
        }, i, e));
    });

    for_each(output, [](int i, std::future<int> &e) {
        std::printf("\t\t\tWaiting on task %d\n", i);
        int res = e.get();
        std::printf("\t\t\t\t\t\t\t\t\tTask %d result: %d\n", i, res);
    });
}
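Note the design choice here: the semaphore is acquired inside each task, so a thread is still created for every task up front, but only four of them ever execute fib at a time; the rest simply block in Semaphore::wait until a slot frees up. This caps CPU oversubscription, though not the number of threads themselves.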