Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Hughes' Fibonacci stream

I am trying to understand the "Streams as arrows" section in John Hughes' famous "Generalising Arrows to Monads". To be more precise, I am interested in writing down the Fibonacci stream.

I tweaked Hughes' definition a bit:

data StreamProcessor a b = Get (a -> StreamProcessor a b) | 
                           Put b (StreamProcessor a b) |
                           Halt
put = Put
get = Get

First of all, I treat stream processors as lists which may block (waiting for input). That is:

  • Put :: b -> StreamProcessor a b -> StreamProcessor a b matches (:) :: a -> [a] -> [a];
  • Halt :: StreamProcessor a b matches [] :: [a];
  • Get :: (a -> StreamProcessor a b) -> StreamProcessor a b helps us block the stream and wait for input.

Therefore, if we drop the Get we get a list-like structure. If we also drop Halt we get an infinite-list-like structure.

Here are two ways I would understand "a stream of Fibonaccis":

  • a non-blocked infinite stream (infinite-list-like):

    zipNonBlockedStreamsWith :: (a -> b -> c) 
                                -> StreamProcessor () a 
                                -> StreamProcessor () b
                                -> StreamProcessor () c
    zipNonBlockedStreamsWith f (Put x sp) (Put y sp') 
     = Put (f x y) (zipNonBlockedStreamsWith f sp sp')
    zipNonBlockedStreamsWith f Halt       sp          = Halt  
    zipNonBlockedStreamsWith f sp         Halt        = Halt
    
    fibs :: StreamProcessor () Int
    fibs = 
     put 0 (put 1 $ zipNonBlockedStreamsWith (+) fibs (tailNonBlockedStream fibs))
    
    -- matches a well-known definition of an infinite Fibonacci list.
    fibsList :: [Int]
    fibsList = 0 : 1 : (zipWith (+) fibsList (tail fibsList))
    
    -- with the 'fibsList', we can use folds to do the same thing.
    putStream :: [b] -> StreamProcessor a b -> StreamProcessor a b
    putStream bs sp = foldr Put sp bs
    
    fibs' :: StreamProcessor () Int
    fibs' = putStream fibsList Halt
    
  • A blocked stream waits for n, outputs the nth Fibonacci number and blocks again. Hughes' Arrow interface helps us express it in a quite concise way:

    instance Category StreamProcessor where
      ...
    
    instance Arrow StreamProcessor where
      arr f = Get $ \ a -> Put (f a) (arr f)
      ...
    
    fibsList :: [Int]
    fibsList = 0 : 1 : (zipWith (+) fibsList (tail fibsList))
    
    blockedFibs :: StreamProcessor Int Int
    blockedFibs = arr (fibsList !!)
    

Yet, in the paper I linked John Hughes shows another solution, Arrowing his way through:

instance Category StreamProcessor where
  id = Get (\ x -> Put x id)
  
  Put c bc . ab = Put c (bc . ab)
  Get bbc . Put b ab = (bbc b) . ab
  Get bbc . Get aab = Get $ \ a -> (Get bbc) . (aab a)
  Get bbc . Halt = Halt
  Halt . ab = Halt

bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f)          = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f)    = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) =      
  Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass bs ds Halt             = Halt

instance Arrow StreamProcessor where
  arr f = Get $ \ a -> Put (f a) (arr f)
  first = bypass [] []

liftArr2 :: Arrow k => (a -> b -> c) -> k r a -> k r b -> k r c
liftArr2 f a b = a &&& b >>^ uncurry f

fibsHughes = let 
              fibsHughes' = put 1 (liftArr2 (+) fibsHughes fibsHughes')
             in put 0 fibsHughes'

But it does not work the way I expect. The following function would help us take the values from the stream until it blocks or halts (using Data.List.unfoldr):

popToTheBlockOrHalt :: StreamProcessor a b -> [b]
popToTheBlockOrHalt = let
                         getOutput (Put x sp) = Just (x, sp)
                         getOutput getOrHalt  = Nothing
                      in unfoldr getOutput

So, here is what we get:

GHCi> popToTheBlockOrHalt fibsHughes
[0, 1]
GHCi> :t fibsHughes
fibsHughes :: StreamProcessor a Integer

If we check the patterns, we would see that it blocks (that is we stumble into Get).

I cannot tell whether it should be that way. If it is what we want, why? If not, what is the problem? I checked and rechecked the code I wrote and it pretty much matches the definitions in Hughes' article (well, I had to add id and patterns for Halt - I cannot see how they could have messed the process up).


Edit: As it is said in the comments, in the later edition of the paper bypass was slightly changed (we use that one). It is able to accumulate both withheld bs and ds (that is has two queues), whereas the bypass from the original paper accumulates just ds (that is one queue).


Edit #2: I just wanted to write down a function which would pop the Fibonacci numbers from the fibsHughes:

popToTheHaltThroughImproperBlocks :: StreamProcessor () b -> [b]
popToTheHaltThroughImproperBlocks = let
                                       getOutput (Put x sp) = Just (x, sp)
                                       getOutput (Get c)    = getOutput $ c ()
                                       getOutput Halt       = Nothing
                                    in unfoldr getOutput

And here we go:

GHCi> (take 10 . popToTheHaltThroughImproperBlocks) fibsHughes 
[0,1,1,2,3,5,8,13,21,34]

like image 402
Zhiltsoff Igor Avatar asked Aug 08 '20 21:08

Zhiltsoff Igor


2 Answers

The issue is with the paper. Where exactly the blame lies is largely a matter of subjective interpretation. I think it's an overlooked bug in the paper due to the type StreamProcessor not being as intuitive as it may seem.

To first focus on the concrete example of the fibsHughes stream, it indeed contains Get, but they are constant functions. You must feed some arguments to access the rest of the stream. In a way, the "true" type of fibsHughes is SP () b whereas what you might intuitively want is SP Void b to encode the absence of Get (which doesn't quite work that way, and that's kinda the source of the problem), and "feeding" it input is how you get from one to the other:

type SP = StreamProcessor

feed :: SP () b -> SP Void b
feed p = produceForever () >>> p

produceForever :: b -> SP Void b
produceForever b = Put b (produceForever b)

fibsHughes :: SP Void b
fibsHughes = feed (... {- rest of the definition -})

Now to see how we got into this situation, we have to go back to the definition of first. My opinion is that it is a questionable operation on streams to define in the first place, because it has to introduce Get actions to be able to produce the second component of the pairs as output:

first ::: SP a b -> SP (a, c) (b, c)

The problematic part is the following branch in the definition of bypass, which introduces the Get to then be able to Put:

bypass bs [] (Put c sp) =      
  Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)

It is what you need to do if you want to write something of the expected type, but it is arguably not a natural thing to do.

Having defined first is what leads to defining and using the (&&&) operator, which has unintuitive semantics. To see why it's unintuitive, specialize (&&&) with Void as the stream input type:

(&&&) :: SP Void b -> SP Void c -> SP Void (b, c)

Anyone who looks at this would think that, of course, the result must be a producer, which never Gets anything, that would be absurd. Except that (&&&) does the absurd thing; thus specialized to Void, it is morally equivalent to the following (ignoring the existence of undefined which can technically be used to tell them apart in Haskell):

_ &&& _ = Get (absurd :: Void -> SP a c)

There is a more natural definition of (&&&) by recursion on streams which avoids that issue: if the two arguments never do any Get, then the result never does any Get either.

As far as I can tell, this "better" (&&&) cannot be defined using first, (>>>) and arr.

However, it comes at cost: it is not intuitive from the point of view of a graphical interpretation of arrows, because it breaks this equation (which can be drawn graphically as "sliding" f out of &&&):

f &&& g   =   (id &&& g) >>> first f

Whichever definition of (&&&) you choose, it is going to confuse someone.


IMO it comes down to the type StreamProcessor not being able to rule out the use of Get. Even if the input type is Void, a stream can still do a vacuous Get.

A better type of stream processor without such definitional issues is the one from the pipes library (called Proxy). In particular, it differs from SP because it can forbid the use of Get, and that provides a faithful encoding of true "producers" such as the Fibonacci stream.

like image 149
Li-yao Xia Avatar answered Sep 21 '22 01:09

Li-yao Xia


I think one confusion you may have is that, while you can find a correlation between stream processors and lists that may block, they are not exactly the same. Morally, a StreamProcessor a b consumes a stream of a and produces a stream of b. (One odd thing about Hughes' paper is that he doesn't explicitly define what a stream is.) What this boils down to is that your popToTheBlockOrHalt function never actually provides an input stream, but it still expects the given stream processor to produce an output stream.

Another thing to keep in mind is that in Hughes' paper, there is no Halt — stream processors are infinite and work on infinite streams. So, for a so-called "producer" like fibsHughes (that is, a stream processor whose input stream is arbitrary), there really is no "blocking" going on even if there are Gets hidden internally because there is always more input from the input stream — it's infinite!

So, what you need to work with these stream processors is a way to run them, or to turn something of the form StreamProcessor a b into a function from a stream of a to a stream of b. Because your version allows the streams to be finite, it makes sense to use regular old lists as your "stream" type. Thus, you want a function with a type like:

runStreamProcessor :: StreamProcessor a b -> [a] -> [b]

There is a natural definition for this:

runStreamProcessor Halt _ = []
runStreamProcessor (Put x s) xs = x : runStreamProcessor s xs
runStreamProcessor _ [] = []
runStreamProcessor (Get f) (x:xs) = runStreamProcessor (f x) xs

Now, you can consider the type of runStreamProcessor fibsHughes :: [a] -> [Integer] and realize that, naturally, you must supply, e.g., repeat () in order to guarantee an infinite output stream. This works:

> take 10 $ runStreamProcessor fibsHughes (repeat ())
[0,1,1,2,3,5,8,13,21,34]
like image 37
DDub Avatar answered Sep 22 '22 01:09

DDub