(fileNameToCharStream "bigfile"
 |>> fuse [length;
           splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> length;
           splitBy (fun x -> x = '\n') keepEmpty |>> length;
          ])
  (*fuse "fuses" the three functions to run concurrently*)
 |> run 2 (*forces to run in parallel on two threads*)
 |> (fun [num_chars; num_words; num_lines] ->
       printfn "%d %d %d" num_chars num_words num_lines)
I want to make this code work in the following way: split the original stream into two exactly in the middle; then for each half run a separate computation that computes 3 things: the length (i.e. number of chars), the number of words, the number of lines. However, I do not want to have a problem if I erroneously split over a word. This has to be taken care of. The file should be read only once.
How should I program the functions specified and the operator |>>? Is it possible?
It looks like you're asking for quite a bit. I'll leave it up to you to figure out the string manipulation, but I'll show you how to define an operator which executes a series of operations in parallel.
Step 1: Write a fuse function
Your fuse function appears to map a single input using multiple functions, which is easy enough to write as follows:
//val fuse : seq<('a -> 'b)> -> 'a -> 'b list
let fuse functionList input = [ for f in functionList -> f input]
Note that all of your mapping functions need to have the same type.
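To make the "same type" restriction concrete, here is a minimal sketch. The lambdas and the helper `fuse3` are hypothetical names introduced only for illustration; `fuse` is repeated so the snippet runs on its own.

```fsharp
// Same definition as above, repeated so this snippet is self-contained.
let fuse functionList input = [ for f in functionList -> f input ]

// All three functions are int -> int, so they can live in one list.
let results = fuse [ (fun x -> x + 1); (fun x -> x * 2); (fun x -> x * x) ] 5
// results = [6; 10; 25]

// Hypothetical alternative (not part of the answer): if the functions return
// different types, a tuple-returning combinator avoids the shared-type rule.
let fuse3 f g h input = (f input, g input, h input)
let mixed = fuse3 string (fun x -> x > 3) float 5
// mixed = ("5", true, 5.0)
```

The list version scales to any number of functions, while the tuple version is fixed at three but keeps each result's own type.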
Step 2: Define operator to execute functions in parallel
The standard parallel map function can be written as follows:
//val pmap : ('a -> 'b) -> seq<'a> -> 'b array
let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously
To my knowledge, Async.Parallel starts the async operations on the .NET thread pool, so the number of tasks actually running at any given time is roughly equal to the number of cores on the machine (someone can correct me if I'm wrong; the pool may add threads under load). On a dual-core machine, we should therefore see about 2 threads doing work when this function is called. This is a good thing, since we don't expect any speedup from running more than one CPU-bound thread per core (in fact the extra context switching might slow things down).
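If you want an explicit cap on the number of concurrent tasks (the `run 2` in the question), one option is the optional `maxDegreeOfParallelism` argument of `Async.Parallel`. This is only a sketch: it assumes FSharp.Core 4.7 or later, where that argument is available, and `pmapN` is a hypothetical name that does not appear in the answer.

```fsharp
// Hypothetical variant of pmap with an explicit concurrency limit.
// Assumes FSharp.Core 4.7+ (Async.Parallel with maxDegreeOfParallelism).
let pmapN degree f l =
    let work = [ for a in l -> async { return f a } ]
    Async.Parallel(work, maxDegreeOfParallelism = degree)
    |> Async.RunSynchronously

// At most two computations run at a time; results keep the input order.
let squares = pmapN 2 (fun x -> x * x) [ 1; 2; 3; 4 ]
// squares = [|1; 4; 9; 16|]
```

The result array follows the order of the inputs regardless of which computation finishes first, so later aggregation steps can rely on positions.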
We can define an operator |>> in terms of pmap and fuse:
//val ( |>> ) : seq<'a> -> seq<('a -> 'b)> -> 'b list array
let (|>>) input functionList = pmap (fuse functionList) input
So the |>> operator takes a bunch of inputs and maps each of them using several different functions. So far, if we put all this together, we get the following (in fsi):
> let countOccurrences compareChar source =
      source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

  let length (s : string) = s.Length

  let testData = "Juliet is awesome|Someone should give her a medal".Split('|')
  let testOutput =
      testData
      |>> [length; countOccurrences 'J'; countOccurrences 'o'];;

val countOccurrences : 'a -> seq<'a> -> int
val length : string -> int
val testData : string [] =
  [|"Juliet is awesome"; "Someone should give her a medal"|]
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|]
testOutput contains two elements, both of which were computed in parallel.
Step 3: Aggregate elements into a single output
Alright, so now we have partial results represented by each element in our array, and we want to merge our partial results into a single aggregate. I assume each element in the array should be merged with the same function, since each element in the input has the same datatype.
Here's a really ugly function I wrote for the job:
> let reduceMany f input =
      input
      |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);;

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list

> reduceMany (+) testOutput;;
val it : int list = [48; 1; 4]
reduceMany takes a sequence of n-length lists and returns a single n-length list as its output. If you can think of a better way to write this function, be my guest :)
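One possible tidier formulation, offered as a sketch rather than a drop-in replacement, uses `List.map2` to combine two lists element by element. `reduceManyMap2` is a hypothetical name chosen to avoid clashing with the function above.

```fsharp
// Hypothetical alternative to reduceMany: List.map2 f applies f pairwise
// to two equal-length lists, so it can be used directly as the reducer.
let reduceManyMap2 f input =
    input |> Seq.reduce (List.map2 f)

// Same partial results as in the fsi session above.
let merged = reduceManyMap2 (+) [ [17; 1; 1]; [31; 0; 3] ]
// merged = [48; 1; 4]
```

One behavioural difference to be aware of: `List.map2` raises an exception when the lists differ in length, whereas the `Seq.zip` version silently truncates to the shorter one.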
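To decode the output [48; 1; 4]: 48 is the sum of the lengths of the two input strings (17 + 31), 1 is the total number of 'J' characters, and 4 is the total number of 'o' characters.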
Step 4: Put everything together
let pmap f l =
    seq [for a in l -> async { return f a } ]
    |> Async.Parallel
    |> Async.RunSynchronously

let fuse functionList input = [ for f in functionList -> f input]

let (|>>) input functionList = pmap (fuse functionList) input

let reduceMany f input =
    input
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ])

let countOccurrences compareChar source =
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0)

let length (s : string) = s.Length

let testData = "Juliet is awesome|Someone should give her a medal".Split('|')

let testOutput =
    testData
    |>> [length; countOccurrences 'J'; countOccurrences 'o']
    |> reduceMany (+)
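For the part of the question about a split landing in the middle of a word, one common approach is to have each chunk also report whether it starts and ends inside a word, and to subtract one word when two adjacent chunks meet mid-word. The sketch below is only an illustration under several assumptions: the text is already in memory as a string (it does not address reading the file once), "lines" means the number of '\n' characters, and `ChunkStats`, `statsOf` and `merge` are hypothetical names.

```fsharp
// Partial statistics for one chunk of text (hypothetical record for this sketch).
type ChunkStats =
    { Chars : int
      Words : int
      Lines : int            // assumption: counted as number of '\n' characters
      StartsInWord : bool    // first character is not a separator
      EndsInWord : bool }    // last character is not a separator

let isSeparator c = c = ' ' || c = '\n'

// Compute the statistics of a single chunk independently of the others.
let statsOf (chunk : string) : ChunkStats =
    let pieces =
        chunk.Split([| ' '; '\n' |], System.StringSplitOptions.RemoveEmptyEntries)
    { Chars = chunk.Length
      Words = pieces.Length
      Lines = chunk |> Seq.filter (fun c -> c = '\n') |> Seq.length
      StartsInWord = chunk.Length > 0 && not (isSeparator chunk.[0])
      EndsInWord = chunk.Length > 0 && not (isSeparator chunk.[chunk.Length - 1]) }

// Merge two adjacent chunks (a comes before b in the original text).
let merge (a : ChunkStats) (b : ChunkStats) : ChunkStats =
    if a.Chars = 0 then b
    elif b.Chars = 0 then a
    else
        // A word cut by the split was counted once in each chunk.
        let straddles = a.EndsInWord && b.StartsInWord
        { Chars = a.Chars + b.Chars
          Words = a.Words + b.Words - (if straddles then 1 else 0)
          Lines = a.Lines + b.Lines
          StartsInWord = a.StartsInWord
          EndsInWord = b.EndsInWord }

// Split exactly in the middle, which here cuts the word "three" in two.
let text = "one two\nthree four"
let mid = text.Length / 2
let total =
    [ text.Substring(0, mid); text.Substring(mid) ]
    |> List.map (fun half -> async { return statsOf half })
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.reduce merge
// total.Chars = 18, total.Words = 4, total.Lines = 1
```

Because `merge` only looks at the boundary flags, the two halves can be processed without knowing anything about each other, and the same function works for more than two chunks as long as they are merged in their original order.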