I find myself frequently writing C++ code of the form:
while (getline(strm, line)) {
    cout << computationally_intensive_function(line) << endl;
}
I would like to parallelize this code. The best solution I have come up with so far is to build a vector of strings holding a large number (10,000-100,000) of lines, parallelize over this vector with #pragma omp parallel for, then empty the vector and repeat while lines remain. However, this method requires a lot of memory, and the other cores sit idle while the main thread is buffering strings. Is there a better way? Something like Python's multiprocessing.Pool.map or Hadoop? (I would like to avoid Hadoop's C++ API, however, because Hadoop is rather heavyweight and may not be installed everywhere my code would be running.)
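For concreteness, the buffering approach I am describing looks roughly like this (the batch size, result_type, and the way results are collected are just placeholders):

std::vector<std::string> buffer;
std::string line;
while (getline(strm, line)) {
    buffer.push_back(line);
    if (buffer.size() == 10000) {               // placeholder batch size
        std::vector<result_type> results(buffer.size());
        #pragma omp parallel for
        for (long i = 0; i < (long)buffer.size(); ++i)
            results[i] = computationally_intensive_function(buffer[i]);
        for (const result_type& r : results)    // write out in order
            cout << r << endl;
        buffer.clear();
    }
}
// any lines left in buffer after the loop would be handled the same way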
OpenMP 3.0 tasks are a not-so-well-known feature, which is quite unfortunate since they were created specifically to cover cases like this one. If your compiler supports that version of the standard, you should definitely go for OpenMP tasks. But keep in mind that writing to stdout (or std::cout) from multiple threads generally mixes their output badly, and you would most likely want to synchronise on it:
#pragma omp parallel
{
    // Only the master thread reads the input and creates tasks;
    // the other threads pick the tasks up and execute them.
    #pragma omp master
    while (getline(strm, line))
        #pragma omp task
        {
            result_type result = computationally_intensive_function(line);
            // Serialise the output so lines from different tasks don't interleave.
            #pragma omp critical
            {
                cout << result << endl;
                cout.flush();
            }
        }
    #pragma omp taskwait
}
I leave it up to you to decide what variables should be shared and what should be private.
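For example (my sketch, not part of the answer above), a common choice is to make line firstprivate on the task, so each task works on its own copy while the master thread reads ahead:

#pragma omp parallel
{
    #pragma omp master
    while (getline(strm, line))
        // firstprivate gives each task its own copy of the current line,
        // so the master thread can safely overwrite 'line' with the next one
        #pragma omp task firstprivate(line)
        {
            result_type result = computationally_intensive_function(line);
            #pragma omp critical
            cout << result << endl;
        }
}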
You should be overlapping your computation with reading lines from the file. A good way to do that is with the Threading Building Blocks pipeline algorithm. You specify three filters (based on what you're showing in the pseudo-code example): two serial ones and one parallel. The serial filters are the input and output ones. The first reads data from a file line by line and passes each line to the second filter, which is parallel and runs your computation/processing function in multi-threaded mode. The last stage/filter is also serial and does the output. I'm copy-pasting an example from the TBB Tutorial, which seems to do exactly what you want to achieve:
#include <cstring>
#include "tbb/tbb_allocator.h"

// Holds a slice of text.
/** Instances *must* be allocated/freed using methods herein, because the
    C++ declaration represents only the header of a much larger object in memory. */
class TextSlice {
    // Pointer to one past last character in sequence
    char* logical_end;
    // Pointer to one past last available byte in sequence.
    char* physical_end;
public:
    // Allocate a TextSlice object that can hold up to max_size characters.
    static TextSlice* allocate( size_t max_size ) {
        // +1 leaves room for a terminating null character.
        TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate( sizeof(TextSlice)+max_size+1 );
        t->logical_end = t->begin();
        t->physical_end = t->begin()+max_size;
        return t;
    }
    // Free this TextSlice object
    void free() {
        tbb::tbb_allocator<char>().deallocate((char*)this,
                                              sizeof(TextSlice)+(physical_end-begin())+1);
    }
    // Pointer to beginning of sequence
    char* begin() {return (char*)(this+1);}
    // Pointer to one past last character in sequence
    char* end() {return logical_end;}
    // Length of sequence
    size_t size() const {return logical_end-(char*)(this+1);}
    // Maximum number of characters that can be appended to sequence
    size_t avail() const {return physical_end-logical_end;}
    // Append sequence [first,last) to this sequence.
    void append( char* first, char* last ) {
        memcpy( logical_end, first, last-first );
        logical_end += last-first;
    }
    // Set end() to given value.
    void set_end( char* p ) {logical_end=p;}
};
And the function that gets this to run is:
void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
    tbb::parallel_pipeline(
        ntoken,
        tbb::make_filter<void,TextSlice*>(
            tbb::filter::serial_in_order, MyInputFunc(input_file) )
        &
        tbb::make_filter<TextSlice*,TextSlice*>(
            tbb::filter::parallel, MyTransformFunc() )
        &
        tbb::make_filter<TextSlice*,void>(
            tbb::filter::serial_in_order, MyOutputFunc(output_file) ) );
}
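The tutorial's MyInputFunc, MyTransformFunc and MyOutputFunc functors are not reproduced above. As a rough illustration (my sketch, not the tutorial's code), the parallel middle filter is just a functor that takes a TextSlice*, produces a new one, and frees its input; your per-line computation would go where the plain copy is:

// Hypothetical middle filter: the real computation is replaced by a verbatim copy.
class MyTransformFunc {
public:
    TextSlice* operator()( TextSlice* input ) const {
        TextSlice* out = TextSlice::allocate( input->size() );
        // Run the processing function over [input->begin(), input->end())
        // and append its result to 'out'; here we just copy the slice.
        out->append( input->begin(), input->end() );
        input->free();
        return out;
    }
};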