Idiomatic way to parallelize function across file lines in C++

I find myself frequently writing C++ code of the form:

while (getline(strm, line)) {
    cout << computationally_intensive_function(line) << endl;
}

I would like to parallelize this code. The best solution I have come up with so far is to build a vector of strings to hold a large number of lines (10000-100000), and then parallelize over this vector with

#pragma omp parallel for

Then I empty the vector and repeat while lines remain. However, this method requires a lot of memory, and the other cores sit idle while the main thread is buffering strings. Is there a better way? Something like Python's multiprocessing.Pool.map or Hadoop? (I would like to avoid Hadoop's C++ API, however, because Hadoop is rather heavyweight and may not be installed everywhere my code will run.)
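For concreteness, the batching version I have now looks roughly like this (only a sketch: computationally_intensive_function stands in for the real work and is assumed here to return a string, and the batch size is arbitrary):

#include <iostream>
#include <string>
#include <vector>

std::string computationally_intensive_function(const std::string& line); // the real work

int main() {
    const std::size_t batch_size = 10000;   // arbitrary
    std::string line;
    std::vector<std::string> batch;

    while (std::cin) {
        // Buffer a batch of lines; the other cores are idle during this phase.
        batch.clear();
        while (batch.size() < batch_size && std::getline(std::cin, line))
            batch.push_back(line);
        if (batch.empty()) break;

        // Process the whole batch in parallel, then print the results in order.
        std::vector<std::string> results(batch.size());
        #pragma omp parallel for
        for (long i = 0; i < (long)batch.size(); ++i)
            results[i] = computationally_intensive_function(batch[i]);

        for (std::size_t i = 0; i < results.size(); ++i)
            std::cout << results[i] << '\n';
    }
}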

asked May 21 '12 by gilesc

2 Answers

OpenMP 3.0 tasks are a not-so-well-known feature, which is quite unfortunate since they were created specifically to cover cases like this one. If your compiler supports that version of the standard, you should definitely go for OpenMP tasks. But keep in mind that writing to stdout (or std::cout) from multiple threads generally mixes their output badly, so you will most likely want to synchronise on it:

#pragma omp parallel
{
    // Only one thread reads the file and creates tasks;
    // the other threads in the team execute the tasks as they are produced.
    #pragma omp master
    while (getline(strm, line))
    #pragma omp task
    {
        result_type result = computationally_intensive_function(line);
        // Serialise output so results from different tasks do not interleave
        #pragma omp critical
        {
            cout << result << endl;
            cout.flush();
        }
    }
    #pragma omp taskwait
}

I leave it up to you to decide what variables should be shared and what should be private.
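For example, here is a minimal self-contained sketch of one common choice: make line firstprivate on the task, so each task keeps its own copy of the line it was created with while the master thread goes on reading (the work function is only a placeholder that reverses the line, and the input file name is made up):

#include <algorithm>
#include <fstream>
#include <iostream>
#include <string>

// Placeholder for the real work: here it just reverses the line.
std::string computationally_intensive_function(std::string line) {
    std::reverse(line.begin(), line.end());
    return line;
}

int main() {
    std::ifstream strm("input.txt");   // hypothetical input file
    std::string line;

    #pragma omp parallel
    {
        #pragma omp master
        while (std::getline(strm, line)) {
            // Each task captures its own copy of `line` at creation time.
            #pragma omp task firstprivate(line)
            {
                std::string result = computationally_intensive_function(line);
                #pragma omp critical
                std::cout << result << std::endl;
            }
        }
        #pragma omp taskwait
    }
}

Compile with OpenMP enabled (e.g. g++ -fopenmp); without OpenMP support the pragmas are simply ignored and the program runs serially.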

answered by Hristo Iliev


You should be overlapping your computations with reading lines from the file. A good way to do that is to use the Threading Building Blocks (TBB) pipeline algorithm. You specify three filters (based on what you are showing in the pseudo-code example): two serial ones and one parallel one. The serial filters are the input and output stages. The first one reads data from the file line by line and passes each line on to the second filter, which is parallel and runs your computation/processing function on multiple threads. The last stage/filter is again serial and does the output. I'm copy-pasting an example from the TBB Tutorial, which seems to do exactly what you want to achieve:

// Holds a slice of text.
/** Instances *must* be allocated/freed using methods herein, because the C++
    declaration represents only the header of a much larger object in memory. */
class TextSlice {
    // Pointer to one past last character in sequence
    char* logical_end;
    // Pointer to one past last available byte in sequence.
    char* physical_end;
public:
    // Allocate a TextSlice object that can hold up to max_size characters.
    static TextSlice* allocate( size_t max_size ) {
        // +1 leaves room for a terminating null character.
        TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate(
            sizeof(TextSlice)+max_size+1 );
        t->logical_end = t->begin();
        t->physical_end = t->begin()+max_size;
        return t;
    }
    // Free this TextSlice object
    void free() {
        tbb::tbb_allocator<char>().deallocate( (char*)this,
            sizeof(TextSlice)+(physical_end-begin())+1 );
    }
    // Pointer to beginning of sequence
    char* begin() {return (char*)(this+1);}
    // Pointer to one past last character in sequence
    char* end() {return logical_end;}
    // Length of sequence
    size_t size() const {return logical_end-(char*)(this+1);}
    // Maximum number of characters that can be appended to sequence
    size_t avail() const {return physical_end-logical_end;}
    // Append sequence [first,last) to this sequence.
    void append( char* first, char* last ) {
        memcpy( logical_end, first, last-first );
        logical_end += last-first;
    }
    // Set end() to given value.
    void set_end( char* p ) {logical_end=p;}
};

And the function to get this to run is:

void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
    tbb::parallel_pipeline(
        ntoken,
        tbb::make_filter<void,TextSlice*>(
            tbb::filter::serial_in_order, MyInputFunc(input_file) )
        &
        tbb::make_filter<TextSlice*,TextSlice*>(
            tbb::filter::parallel, MyTransformFunc() )
        &
        tbb::make_filter<TextSlice*,void>(
            tbb::filter::serial_in_order, MyOutputFunc(output_file) ) );
}
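The example above also relies on three functor classes, MyInputFunc, MyTransformFunc and MyOutputFunc, which are not shown. As a rough sketch of what they can look like (simplified: the tutorial's real input filter splits slices on line boundaries, and the transformation here is just a placeholder copy):

#include <cstdio>
#include "tbb/pipeline.h"
#include "tbb/tbb_allocator.h"

// Serial input filter: reads a chunk of the file into a fresh TextSlice.
// Calling fc.stop() terminates the pipeline at end of file.
class MyInputFunc {
    FILE* input_file;
public:
    MyInputFunc( FILE* f ) : input_file(f) {}
    TextSlice* operator()( tbb::flow_control& fc ) const {
        TextSlice* t = TextSlice::allocate( 10000 );            // arbitrary chunk size
        size_t n = fread( t->begin(), 1, t->avail(), input_file );
        if( n == 0 ) {
            t->free();
            fc.stop();
            return NULL;
        }
        t->set_end( t->begin()+n );
        return t;
    }
};

// Parallel middle filter: produces a new slice holding the processed text.
class MyTransformFunc {
public:
    TextSlice* operator()( TextSlice* input ) const {
        TextSlice* out = TextSlice::allocate( input->size() );
        out->append( input->begin(), input->end() );            // placeholder: copy unchanged
        input->free();
        return out;
    }
};

// Serial output filter: writes each slice in order and releases it.
class MyOutputFunc {
    FILE* output_file;
public:
    MyOutputFunc( FILE* f ) : output_file(f) {}
    void operator()( TextSlice* out ) const {
        fwrite( out->begin(), 1, out->size(), output_file );
        out->free();
    }
};

Only the middle filter is declared tbb::filter::parallel, so reading and writing stay serial and in order while the per-slice processing runs on all available cores; the ntoken argument limits how many slices are in flight at once, which bounds memory use.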
answered by Anton Pegushin