Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Parallel top ten algorithm for distributed data

This is an interview question. Suppose there are a few computers and each computer keeps a very large log file of visited URLs. Find the top ten most visited URLs.

For example: Suppose there are only 3 computers and we need the top two most visited URLs.

Computer A: url1, url2, url1, url3
Computer B: url4, url2, url1, url1
Computer C: url3, url4, url1, url3

url1 appears 5 times in all logs
url2 2
url3 3
url4 2 

So the answer is url1, url3

The log files are too large to fit in RAM and copy them by network. As I understand, it is important also to make the computation parallel and use all given computers.

How would you solve it?

like image 347
Michael Avatar asked Mar 25 '13 11:03

Michael


People also ask

What is parallel and distributed algorithms in data mining?

The enormity and high dimensionality of datasets typically available as input to the problem of association rule discovery, makes it an ideal problem for solving multiple processors in parallel. The primary reasons are the memory and CPU speed limitations faced by single processors.

What is parallel algorithm example?

Examples include many algorithms to solve Rubik's Cubes and find values which result in a given hash. Some problems cannot be split up into parallel portions, as they require the results from a preceding step to effectively carry on with the next step – these are called inherently serial problems.

Which algorithm is a distributed algorithm?

Distributed algorithms are a sub-type of parallel algorithm, typically executed concurrently, with separate parts of the algorithm being run simultaneously on independent processors, and having limited information about what the other parts of the algorithm are doing.

What is the relationship of parallel and distributed computing to algorithm?

Distributed computing is often used in tandem with parallel computing. Parallel computing on a single computer uses multiple processors to process tasks in parallel, whereas distributed parallel computing uses multiple computing devices to process those tasks.


2 Answers

This is a pretty standard problem for which there is a well-known solution. You simply sort the log files on each computer by URL and then merge them through a priority queue of size k (the number of items you want) on the "master" computer. This technique has been around since the 1960s, and is still in use today (although slightly modified) in the form of MapReduce.

On each computer, extract the URL and the count from the log file, and sort by URL. Because the log files are larger than will fit into memory, you need to do an on-disk merge. That entails reading a chunk of the log file, sorting by URL, writing the chunk to disk. Reading the next chunk, sorting, writing to disk, etc. At some point, you have M log file chunks, each sorted. You can then do an M-way merge. But instead of writing items to disk, you present them, in sorted order (sorted by URL, that is), to the "master".

Each machine sorts its own log.

The "master" computer merges the data from the separate computers and does the top K selection. This is actually two problems, but can be combined into one.

The master creates two priority queues: one for the merge, and one for the top K selection. The first is of size N, where N is the number of computers it's merging data from. The second is of size K: the number of items you want to select. I use a min heap for this, as it's easy and reasonably fast.

To set up the merge queue, initialize the queue and get the first item from each of the "worker" computers. In the pseudo-code below, "get lowest item from merge queue" means getting the root item from the merge queue and then getting the next item from whichever working computer presented that item. So if the queue contains [1, 2, 3], and the items came from computers B, C, A (in that order), then taking the lowest item would mean getting the next item from computer B and adding it to the priority queue.

The master then does the following:

working = get lowest item from merge queue
while (items left to merge)
{
    temp = get lowest item from merge queue
    while (temp.url == working.url)
    {
        working.count += temp.count
        temp = get lowest item from merge queue
    }
    // Now have merged counts for one url.
    if (topK.Count < desired_count)
    {
        // topK queue doesn't have enough items yet.
        // so add this one.
        topK.Add(working);
    }
    else if (topK.Peek().count < working.count)
    {
        // the count for this url is larger
        // than the smallest item on the heap
        // replace smallest on the heap with this one
        topK.RemoveRoot()
        topK.Add(working)
    }
    working = temp;
}
// Here you need to check the last item:
if (topK.Peek().count < working.count)
{
    // the count for this url is larger
    // than the smallest item on the heap
    // replace smallest on the heap with this one
    topK.RemoveRoot()
    topK.Add(working)
}

At this point, the topK queue has the K items with the highest counts.

So each computer has to do a merge sort, which is O(n log n), where n is the number of items in that computer's log. The merge on the master is O(n), where n is the sum of all the items from the individual computers. Picking the top k items is O(n log k), where n is the number of unique urls.

The sorts are done in parallel, of course, with each computer preparing its own sorted list. But the "merge" part of the sort is done at the same time the master computer is merging, so there is some coordination, and all machines are involved at that stage.

like image 54
Jim Mischel Avatar answered Oct 17 '22 15:10

Jim Mischel


Given the scale of the log files and the generic nature of the question, this is quite a difficult problem to solve. I do not think that there is one best algorithm for all situations. It depends on the nature of the contents of the log files. For example, take the corner case that all URLs are all unique in all log files. In that case, basically any solution will take a long time to draw that conclusion (if it even gets that far...), and there is not even an answer to your question because there is no top-ten.

I do not have a watertight algorithm that I can present, but I would explore a solution that uses histograms of hash values of the URLs as opposed to the URLs themselves. These histograms can be calculated by means of one-pass file reads, so it can deal with arbitrary size log files. In pseudo-code, I would go for something like this:

  • Use a hash function with a limited target space (say 10,000, note that colliding hash-values are expected) to calculate the hash value of each item in the log file and count how many times each of the has value occurs. Communicate the resulting histogram to a server (although it is probably also possible to avoid a central server at all by multicasting the result to every other node -- but I will stick with the more obvious server-approach here)
  • The server should merge the histograms and communicate the result back. Depending on the distribution of the URLs, there might be a number of clearly visible peaks already, containing the top-visited URLs.
  • Each of the nodes should then focus on the peaks in the histogram. It should go trough its log file again, use an additional hash function (again with a limited target space) to calculate a new hash-histogram for those URLs that have their first hash value in one of the peaks (the number of peaks to focus on would be a parameter to be tuned in the algorithm, depending on the distribution of the URLs), and calculate a second histogram with the new hash values. The result should be communicated to the server.
  • The server should merge the results again and analyse the new histogram versus the original histogram. Depending on clearly visible peaks, it might be able to draw conclusions about the two hash values of the top ten URLs already. Or it might have to instruct the machines to calculate more hash values with the second hash function, and probably after that go through a third pass of hash-calculations with yet another hash function. This has to continue until a conclusion can be drawn from the collective group of histograms what the hash values of the peak URLs are, and then the nodes can identify the different URLs from that.

Note that this mechanism will require tuning and optimization with regard to several aspects of the algorithm and hash-functions. It will also need orchestration by the server as to which calculations should be done at any time. It probably will also need to set some boundaries in order to conclude when no conclusion can be drawn, in other words when the "spectrum" of URL hash values is too flat to make it worth the effort to continue calculations.

This approach should work well if there is a clear distribution in the URLs though. I suspect that, practically speaking, the question only makes sense in that case anyway.

like image 23
Reinier Torenbeek Avatar answered Oct 17 '22 14:10

Reinier Torenbeek