 

Parallel pipelining

 (fileNameToCharStream "bigfile"
 |>> fuse [length;
           splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> length;
           splitBy (fun x -> x = '\n') keepEmpty |>> length;
         ])
  (*fuse "fuses" the three functions to run concurrently*)
 |> run 2  (*forces to run in parallel on two threads*)
 |> (fun [num_chars; num_words; num_lines] -> 
       printfn "%d %d %d"
            num_chars num_words num_lines)

I want to make this code work in the following way: split the original stream into two halves exactly in the middle; then, for each half, run a separate computation that computes three things: the length (i.e. the number of chars), the number of words, and the number of lines. However, the split must not break a word in two; if the middle falls inside a word, that case has to be handled. The file should be read only once.

How should I implement the functions used above and the operator |>>? Is it possible?

Stefan Savev asked Sep 30 '09 02:09


1 Answer

It looks like you're asking for quite a bit. I'll leave the string manipulation up to you, but I'll show you how to define an operator that executes a series of operations in parallel.

Step 1: Write a fuse function

Your fuse function appears to map a single input using multiple functions, which is easy enough to write as follows:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list
let fuse functionList input = [ for f in functionList -> f input]

Note that all of your mapping functions need to have the same type.
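As a small illustration (the input string and the countSpaces helper are made up for this example), here is fuse applied to two functions that both have type string -> int, which is what lets them share one list:

```fsharp
// fuse as defined above: apply every function in the list to the same input
let fuse functionList input = [ for f in functionList -> f input ]

// Hypothetical helper for this example; it has the same type (string -> int)
// as String.length, so both can sit in one list.
let countSpaces (s: string) = s |> Seq.filter (fun c -> c = ' ') |> Seq.length

let fused = fuse [ String.length; countSpaces ] "hello big world"
printfn "%A" fused // [15; 2]
```

Adding, say, a string -> bool function to that list would be a compile-time type error, because F# list elements must all have the same type.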

Step 2: Define operator to execute functions in parallel

A parallel map function can be written as follows:

//val pmap : ('a -> 'b) -> seq<'a> -> 'b array
let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

To my knowledge, Async.Parallel runs the async operations in parallel on the .NET thread pool, and for CPU-bound work the number of tasks executing at any given time tends to stay close to the number of cores on the machine (someone can correct me if I'm wrong). That is a thread-pool heuristic rather than a hard limit. So on a dual-core machine we should typically see about 2 threads doing work when this function is called. This is a good thing, since we don't expect any speedup from running more than one CPU-bound thread per core (in fact, the extra context switching might slow things down).
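If you want to cap the parallelism explicitly, as the question's `run 2` does, newer versions of FSharp.Core (4.7 and later, to my knowledge) let Async.Parallel take an optional maxDegreeOfParallelism argument. A sketch assuming such a version; pmapLimited is a hypothetical name:

```fsharp
// Assumes FSharp.Core 4.7 or later, where Async.Parallel accepts an optional
// maxDegreeOfParallelism argument. pmapLimited is a hypothetical helper.
let pmapLimited (maxThreads: int) f (inputs: seq<'a>) =
    let work = [ for a in inputs -> async { return f a } ]
    // At most maxThreads of the computations run at the same time
    Async.Parallel(work, maxDegreeOfParallelism = maxThreads)
    |> Async.RunSynchronously

// Results come back in input order, regardless of which finished first
let squares = pmapLimited 2 (fun x -> x * x) [1; 2; 3; 4]
printfn "%A" squares // [|1; 4; 9; 16|]
```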

We can define an operator |>> in terms of pmap and fuse:

//val ( |>> ) : seq<'a> -> seq<('a -> 'b)> -> 'b list array
let (|>>) input functionList = pmap (fuse functionList) input

So the |>> operator takes a collection of inputs and maps each of them with several different functions. Putting all of this together so far, we get the following (in fsi):

> let countOccurrences compareChar source =
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

let length (s : string) = s.Length

let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
let testOutput =
    testData
    |>> [length; countOccurrences 'J'; countOccurrences 'o'];;

val countOccurrences : 'a -> seq<'a> -> int
val length : string -> int
val testData : string [] =
  [|"Juliet is awesome"; "Someone should give her a medal"|]
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|]

testOutput contains two elements, both of which were computed in parallel.

Step 3: Aggregate elements into a single output

Alright, so now we have partial results represented by each element in our array, and we want to merge our partial results into a single aggregate. I assume each element in the array should be merged using the same function, since every element in the input has the same data type.

Here's a really ugly function I wrote for the job:

> let reduceMany f input =
    input
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);;

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list

> reduceMany (+) testOutput;;
val it : int list = [48; 1; 4]

reduceMany takes a sequence of n-element lists and returns a single n-element list as output. If you can think of a better way to write this function, be my guest :)
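One possibly tidier alternative, offered as a suggestion rather than a drop-in replacement: List.map2 already combines two lists element by element, so it can serve directly as the reducer.

```fsharp
// Merge a sequence of equal-length lists column by column with f.
// List.map2 f has type 'a list -> 'a list -> 'a list, which is what Seq.reduce needs.
let reduceMany f (input: seq<'a list>) = input |> Seq.reduce (List.map2 f)

let merged = reduceMany (+) [ [17; 1; 1]; [31; 0; 3] ]
printfn "%A" merged // [48; 1; 4]
```

Unlike the Seq.zip version, List.map2 throws an ArgumentException when two partial results have different lengths instead of silently truncating, which is arguably the safer behavior here. Both versions throw on an empty input, because Seq.reduce does.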

To decode the output above:

  • 48 = sum of the lengths of my two input strings. Note the original string was 49 chars, but splitting it on the "|" ate up one char per "|".
  • 1 = sum of all instances of 'J' in my input
  • 4 = sum of all instances of 'o'.

Step 4: Put everything together

let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

let fuse functionList input = [ for f in functionList -> f input]

let (|>>) input functionList = pmap (fuse functionList) input

let reduceMany f input =
    input
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ])

let countOccurrences compareChar source =
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

let length (s : string) = s.Length

let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
let testOutput =
    testData
    |>> [length; countOccurrences 'J'; countOccurrences 'o']
    |> reduceMany (+)
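The string handling from the question is left open above. As one possible sketch (not the answerer's code): the helpers splitNearMiddle, countChars, countWords and countLines are hypothetical, and the text is assumed to already be in memory, so this does not address reading a large file as a stream. The split moves forward from the middle to the next space or newline, so no word is cut in two, and since the separator lands in the second half, the per-half counts add up to the whole-text counts.

```fsharp
open System

// Definitions from Step 4
let pmap f l =
    seq [ for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

let fuse functionList input = [ for f in functionList -> f input ]

let (|>>) input functionList = pmap (fuse functionList) input

let reduceMany f input =
    input |> Seq.reduce (fun acc x -> [ for (a, b) in Seq.zip acc x -> f a b ])

// Word separators, matching the question's splitBy predicate (tabs are not included)
let isSeparator c = c = ' ' || c = '\n'

// Hypothetical helper: split at the first separator at or after the middle,
// so no word is cut in two; the separator itself goes to the second half.
let splitNearMiddle (s: string) =
    let mutable i = s.Length / 2
    while i < s.Length && not (isSeparator s.[i]) do
        i <- i + 1
    [| s.Substring(0, i); s.Substring(i) |]

// Hypothetical per-chunk counters; all three have type string -> int
let countChars (s: string) = s.Length
let countWords (s: string) =
    s.Split([| ' '; '\n' |], StringSplitOptions.RemoveEmptyEntries).Length
let countLines (s: string) = s |> Seq.filter (fun c -> c = '\n') |> Seq.length

let text = "one two three\nfour five\nsix seven eight nine\n"

let totals =
    splitNearMiddle text                          // two halves, no word split
    |>> [ countChars; countWords; countLines ]    // each half counted in parallel
    |> reduceMany (+)                             // add up the partial counts

printfn "%A" totals // [45; 9; 3]
```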

Juliet answered Oct 04 '22 03:10