I've just started learning Haskell, so my question may be a trivial one; apologies in advance if so.
So, imagine I have a genuinely infinite data source (stdin, for instance) and I want to process the data using the parallel facilities of Repa or Accelerate. Both of them use arrays, which are finite data structures. It seems that I will be dealing with a sequence of arrays in my code, but I don't know what to do when I need data from both one array and the next to calculate something.
In other words: how can I calculate y = phase(a[i] * conj(a[i+1])) over the whole input, where a is an infinite stream of complex numbers?
Repa and Accelerate, as you've mentioned, are both designed to work on fixed-size arrays and matrices. They implement batch-oriented algorithms, built around optimizations that are only available when working with fixed blocks of memory.
For "infinite" data you need a streaming (or "online") algorithm. The one you've described is streamable as it only requires a finite window. You could translate a streaming algorithm into a large number of small batch steps by computing the phase using Repa for each and every pair of inputs (a[i], a[i+1]) that stream in, but you may lose out on speed due to the large amount of boxing and copying required to move each small chunk to the contiguous memory array.
By comparison, you can use something like pipes to write a fast streaming algorithm:
import Control.Monad (forever)
import Pipes
import Data.Complex

type ComplexPair = (Complex Double, Complex Double)
-- Pair each element with its successor: (a0,a1), (a1,a2), (a2,a3), ...
pairOff :: Monad m => Pipe a (a, a) m r
pairOff = await >>= go where
  go x = await >>= \y -> yield (x, y) >> go y
-- Compute the phase for each adjacent pair.
compute :: Monad m => Pipe ComplexPair Double m r
compute = forever $ do
  (ai, ai1) <- await
  yield (phase (ai * conjugate ai1))
run :: Monad m => Pipe (Complex Double) Double m r
run = pairOff >-> compute
Then run can be fed by an infinite streaming input source and dump into whatever the next step of your processing pipeline is. These sources can be monadic or, if m ~ Identity, pure.
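For example, a minimal sketch (the cycled source here is artificial) that runs the pipeline over a pure infinite input and prints the first few results:

import qualified Pipes.Prelude as P

main :: IO ()
main = runEffect $
      each (cycle [1 :+ 0, 0 :+ 1])  -- artificial infinite source
  >-> run
  >-> P.take 5                       -- keep only the first five results
  >-> P.print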
As far as I know, you can't use Repa or Accelerate directly with an infinite stream of data. They're designed to operate on vectors, which are a different kind of data structure. There are ways around this, though, and you have a few options:
1. Read in a fixed number of values, construct a vector from them, perform the calculations, and push the values downstream again, iteratively processing one chunk of data at a time (see the sketch after this list). You would have to figure out what you want the chunk size to be, but it would give you optimal performance during the calculation itself. Unfortunately, you'll lose efficiency in all those read-and-construct steps. In fact, those are the steps I would focus on parallelizing, since you could be performing them while the previous chunk is being processed.
2. Forget about using a vector library and turn to the pipes library. It is designed to process streams of data in constant memory and gets pretty good performance. With the pipes-concurrency package, you can write a stream processor that reads in data and processes it in parallel. You won't have the efficiency of vectors, but it'll be easier to handle your infinite stream of data efficiently while using minimal RAM.
3. Implement it with lists first, lazily processing each element with ordinary Haskell functions (also sketched below). You won't get the benefits of speed or memory usage, but it'll be easy to write. Once you have the concept down, you can decide whether you need the efficiency of vectors or streams, and run a benchmarking tool to find your bottlenecks before trying to optimize.
Personally, I would start with option 2 or 3, then figure out whether my program is slow enough to warrant using vectors. It may be that you could get that extra performance from vectors, but unless you're utilizing a GPU, that isn't likely.
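To make options 1 and 3 concrete, here is a minimal sketch; the function names are my own, and the chunked version assumes a non-empty chunk plus a reader loop that carries the last element of each chunk into the next, so the pair straddling a chunk boundary isn't lost.

import qualified Data.Vector.Unboxed as U
import Data.Complex

-- Option 3: plain lazy lists. This works directly on an infinite list.
phasesList :: [Complex Double] -> [Double]
phasesList as = zipWith (\a b -> phase (a * conjugate b)) as (tail as)

-- Option 1: one batch step over a fixed-size, non-empty chunk.
-- Building the chunks (with one element of overlap between consecutive
-- chunks) is left to the surrounding reader loop.
phasesChunk :: U.Vector (Complex Double) -> U.Vector Double
phasesChunk v = U.zipWith (\a b -> phase (a * conjugate b)) v (U.tail v)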