
Preload data from a file using a separate thread

I have a small application that processes a large number of (relatively small) files. It runs sequentially: it loads data from a file, performs operations on it, and moves on to the next file. I noticed that at run time the CPU usage is not 100%, and I guess this is due to the time taken by the I/O operations on the hard drive.

So the idea would be to load the next file's data into memory in parallel with the processing of the current data, using a separate thread (the data in question would simply be a sequence of int, stored in a vector). This seems like a very common problem, but I have a hard time finding a simple, plain C++ example that does it! And now that C++0x is on its way, a simple demo using the new thread facilities, with no external library, would be very nice.

Also, although I know this depends on a lot of things, is it possible to make an educated guess about the benefits (or drawbacks) of such an approach, with respect to the size of the data files to load, for example? I guess that with large files the actual disk I/O operations are infrequent anyway, since the data is already buffered (by fstream?).

Olivier

OlivierB asked Aug 20 '11 12:08


2 Answers

Here is a toy program showing how to use some of the C++0x threading and synchronization facilities. I have no idea how well it performs (I recommend Matt's answer on that front); my focus is on clarity and correctness, for the sake of making an example.

The files are read on a separate thread, as you requested. They're not converted to a sequence of int, however, as I feel that belongs to processing rather than to strict I/O. So each file is dumped into a plain std::string.

#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <deque>
#include <future>
#include <mutex>
#include <condition_variable>
#include <utility> // std::move

int
main()
{
    // this is shared
    std::mutex mutex;
    std::condition_variable condition;
    bool more_to_process = true;
    std::deque<std::string> to_process;

    /* Reading the files is done asynchronously */
    std::vector<std::string> filenames = { /* initialize */ };
    auto process = std::async(std::launch::async, [&](std::vector<std::string> filenames)
    {
        typedef std::lock_guard<std::mutex> lock_type;
        for(auto&& filename: filenames) {
            std::ifstream file(filename);
            if(file) {
                std::ostringstream stream;
                stream << file.rdbuf();
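                // inserting from rdbuf() sets failbit when nothing was copied,
                // so empty files are skipped here as well
                if(stream) {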
                    lock_type lock(mutex);
                    to_process.push_back(stream.str());
                    condition.notify_one();
                }
            }
        }
        lock_type lock(mutex);
        more_to_process = false;
        condition.notify_one();
    }, std::move(filenames));

    /* processing is synchronous */
    for(;;) {
        std::string file;
        {
            std::unique_lock<std::mutex> lock(mutex);
            condition.wait(lock, [&]
            { return !more_to_process || !to_process.empty(); });

            if(!more_to_process && to_process.empty())
                break;
            else if(to_process.empty())
                continue;

            file = std::move(to_process.front());
            to_process.pop_front();
        }

        // use file here
    }

    process.get();
}

Some notes:

  • The mutex, condition variable, stop flag and std::string container are all logically related. You may as well replace them with a thread-safe container/channel.
  • I use std::async instead of std::thread because it has better exception-safety characteristics: an exception thrown by the task is stored in the future and rethrown by get(), whereas an exception escaping a std::thread function calls std::terminate.
  • There is no error handling to speak of; if a file can't be read for some reason, it is silently skipped. You have several options: signal that there is no more to process and throw, so the error is handled as soon as possible; or use a boost::variant<std::string, std::exception_ptr> to pass the error on to the processing side of things (here the error is passed as an exception, but you can use an error_code or anything you fancy). This is not an exhaustive list by any means.
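
As a rough illustration of the first note, the shared state could be bundled into a small channel type along these lines. This is only a sketch: `Channel`, `push`, `close` and `pop` are hypothetical names chosen for the example, not standard library facilities, and the queue is unbounded.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical helper (not a standard type): a minimal unbounded,
// closable queue shared between a producer and a consumer.
template <typename T>
class Channel {
public:
    // Producer side: enqueue one item and wake one waiting consumer.
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!closed_ && "push after close");
            queue_.push_back(std::move(value));
        }
        condition_.notify_one();
    }

    // Producer side: signal that no more items will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        condition_.notify_all();
    }

    // Consumer side: block until an item is available or the channel is
    // closed. Returns false only once the channel is closed and drained.
    bool pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        condition_.wait(lock, [this] { return closed_ || !queue_.empty(); });
        if (queue_.empty())
            return false;
        out = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::deque<T> queue_;
    bool closed_ = false;
};
```

With something like this, the reading task would call push(stream.str()) for each file and close() when it is done, and the processing loop would shrink to `while(channel.pop(file)) { /* use file here */ }`. Note that close() must still be reached if reading fails, otherwise the consumer waits forever.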
Luc Danton answered Oct 18 '22 02:10


Using threads for an IO-bound problem like this will give you negligible performance gains. At best, you may fill some "gaps" and get closer to saturating the available IO resources by opening several files in advance and by overlapping system calls via threads, as you've indicated.
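
For reference, the kind of overlap in question can be sketched in a few lines with std::async, loading one file ahead of the one being processed. This is a minimal sketch, not a tuned implementation: `slurp` and `process_with_prefetch` are hypothetical names introduced here, and the `load` and `process` callables are assumed to be supplied by the caller.

```cpp
#include <cstddef>
#include <fstream>
#include <future>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: read a whole file into a string
// (the result is empty if the file cannot be opened).
std::string slurp(const std::string& filename) {
    std::ifstream file(filename);
    std::ostringstream stream;
    stream << file.rdbuf();
    return stream.str();
}

// Hypothetical helper: process files in order while the next one is being
// loaded on another thread. `load` maps a filename to its contents.
template <typename Load, typename Process>
void process_with_prefetch(const std::vector<std::string>& filenames,
                           Load load, Process process) {
    if (filenames.empty())
        return;
    // Start loading the first file.
    std::future<std::string> next =
        std::async(std::launch::async, load, filenames[0]);
    for (std::size_t i = 0; i < filenames.size(); ++i) {
        std::string current = next.get();  // wait for the pending load
        if (i + 1 < filenames.size())      // kick off the following load
            next = std::async(std::launch::async, load, filenames[i + 1]);
        process(current);                  // overlaps with that load
    }
}
```

A call such as `process_with_prefetch(filenames, slurp, [](const std::string& data) { /* parse and process */ });` keeps at most one load in flight. Whether this helps at all depends on how much processing time there is to hide the read behind.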

I would recommend that you instead look at giving the kernel hints about how you intend to do IO, which will improve read-ahead, and at improving the physical read bandwidth, for example by verifying that the file system, kernel, and hard drive (or whatever your storage source is) are as fast as possible.

  • posix_fadvise()
  • posix_madvise()
  • readahead()
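
As an illustration of the first of these, here is a minimal sketch that asks the kernel to start pulling a batch of files into the page cache before they are read. It assumes a platform that provides posix_fadvise() (for example Linux); `hint_will_need` is a hypothetical helper name, and the hint is advisory only, so the kernel is free to ignore it.

```cpp
#include <cstddef>
#include <fcntl.h>   // open, posix_fadvise, POSIX_FADV_WILLNEED
#include <unistd.h>  // close
#include <string>
#include <vector>

// Hypothetical helper: hint that each file will be needed soon.
// Returns the number of files for which the hint was accepted.
std::size_t hint_will_need(const std::vector<std::string>& filenames) {
    std::size_t accepted = 0;
    for (const auto& name : filenames) {
        int fd = ::open(name.c_str(), O_RDONLY);
        if (fd < 0)
            continue;  // cannot open: skip, the later read will report it
        // Offset 0 with length 0 covers the file through to its end.
        // posix_fadvise returns 0 on success, an error number otherwise.
        if (::posix_fadvise(fd, 0, 0, POSIX_FADV_WILLNEED) == 0)
            ++accepted;
        ::close(fd);
    }
    return accepted;
}
```

One way to use it would be to call it on the next few filenames just before processing the current file, so the reads are already under way by the time the program opens them. POSIX_FADV_SEQUENTIAL on the descriptor actually used for reading is another hint worth trying.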
Matt Joiner answered Oct 18 '22 01:10