I am trying to understand the "Streams as arrows" section in John Hughes' famous "Generalising Monads to Arrows". To be more precise, I am interested in writing down the Fibonacci stream.
I tweaked Hughes' definition a bit:
data StreamProcessor a b = Get (a -> StreamProcessor a b)
                         | Put b (StreamProcessor a b)
                         | Halt
put = Put
get = Get
First of all, I treat stream processors as lists which may block (waiting for input). That is:
Put :: b -> StreamProcessor a b -> StreamProcessor a b matches (:) :: a -> [a] -> [a];
Halt :: StreamProcessor a b matches [] :: [a];
Get :: (a -> StreamProcessor a b) -> StreamProcessor a b helps us block the stream and wait for input.
Therefore, if we drop the Get we get a list-like structure. If we also drop Halt we get an infinite-list-like structure.
Here are two ways I would understand "a stream of Fibonaccis":
a non-blocked infinite stream (infinite-list-like):
zipNonBlockedStreamsWith :: (a -> b -> c)
                         -> StreamProcessor () a
                         -> StreamProcessor () b
                         -> StreamProcessor () c
zipNonBlockedStreamsWith f (Put x sp) (Put y sp')
  = Put (f x y) (zipNonBlockedStreamsWith f sp sp')
zipNonBlockedStreamsWith f Halt sp = Halt
zipNonBlockedStreamsWith f sp Halt = Halt
fibs :: StreamProcessor () Int
fibs =
  put 0 (put 1 $ zipNonBlockedStreamsWith (+) fibs (tailNonBlockedStream fibs))
-- matches a well-known definition of an infinite Fibonacci list.
fibsList :: [Int]
fibsList = 0 : 1 : (zipWith (+) fibsList (tail fibsList))
-- with the 'fibsList', we can use folds to do the same thing.
putStream :: [b] -> StreamProcessor a b -> StreamProcessor a b
putStream bs sp = foldr Put sp bs
fibs' :: StreamProcessor () Int
fibs' = putStream fibsList Halt
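The definition of fibs above relies on tailNonBlockedStream, which is not shown. A minimal sketch of how it could look, assuming it simply drops the first Put (the error message and the takeOutputs helper are illustrative additions, not part of the original code):

```haskell
data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

-- Assumed helper: drop the first output of a non-blocked stream.
-- Like 'tail', it is partial and only handles a stream that starts with Put.
tailNonBlockedStream :: StreamProcessor () a -> StreamProcessor () a
tailNonBlockedStream (Put _ sp) = sp
tailNonBlockedStream _ = error "tailNonBlockedStream: stream does not start with Put"

-- Same clauses as in the question; a Get on either side is left unmatched.
zipNonBlockedStreamsWith :: (a -> b -> c)
                         -> StreamProcessor () a
                         -> StreamProcessor () b
                         -> StreamProcessor () c
zipNonBlockedStreamsWith f (Put x sp) (Put y sp')
  = Put (f x y) (zipNonBlockedStreamsWith f sp sp')
zipNonBlockedStreamsWith _ Halt _ = Halt
zipNonBlockedStreamsWith _ _ Halt = Halt

fibs :: StreamProcessor () Int
fibs = Put 0 (Put 1 $ zipNonBlockedStreamsWith (+) fibs (tailNonBlockedStream fibs))

-- Hypothetical helper: collect at most n leading outputs, stopping at Get or Halt.
takeOutputs :: Int -> StreamProcessor a b -> [b]
takeOutputs n (Put x sp) | n > 0 = x : takeOutputs (n - 1) sp
takeOutputs _ _ = []
```

With these definitions, takeOutputs 10 fibs should yield the first ten Fibonacci numbers, in the same way take 10 fibsList does for the list version.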
A blocked stream waits for n, outputs the nth Fibonacci number and blocks again. Hughes' Arrow interface helps us express it in a quite concise way:
instance Category StreamProcessor where
  ...
instance Arrow StreamProcessor where
  arr f = Get $ \ a -> Put (f a) (arr f)
  ...
fibsList :: [Int]
fibsList = 0 : 1 : (zipWith (+) fibsList (tail fibsList))
blockedFibs :: StreamProcessor Int Int
blockedFibs = arr (fibsList !!)
Yet, in the paper I linked, John Hughes shows another solution, Arrowing his way through:
instance Category StreamProcessor where
  id = Get (\ x -> Put x id)
  Put c bc . ab = Put c (bc . ab)
  Get bbc . Put b ab = (bbc b) . ab
  Get bbc . Get aab = Get $ \ a -> (Get bbc) . (aab a)
  Get bbc . Halt = Halt
  Halt . ab = Halt
bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f) = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f) = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) =
  Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass bs ds Halt = Halt
instance Arrow StreamProcessor where
  arr f = Get $ \ a -> Put (f a) (arr f)
  first = bypass [] []
liftArr2 :: Arrow k => (a -> b -> c) -> k r a -> k r b -> k r c
liftArr2 f a b = a &&& b >>^ uncurry f
fibsHughes = let
    fibsHughes' = put 1 (liftArr2 (+) fibsHughes fibsHughes')
  in put 0 fibsHughes'
But it does not work the way I expect. The following function would help us take the values from the stream until it blocks or halts (using Data.List.unfoldr):
popToTheBlockOrHalt :: StreamProcessor a b -> [b]
popToTheBlockOrHalt = let
    getOutput (Put x sp) = Just (x, sp)
    getOutput getOrHalt = Nothing
  in unfoldr getOutput
So, here is what we get:
GHCi> popToTheBlockOrHalt fibsHughes
[0, 1]
GHCi> :t fibsHughes
fibsHughes :: StreamProcessor a Integer
If we check the patterns, we would see that it blocks (that is, we stumble into Get).
I cannot tell whether it should be that way. If it is what we want, why? If not, what is the problem? I checked and rechecked the code I wrote, and it pretty much matches the definitions in Hughes' article (well, I had to add id and patterns for Halt; I cannot see how they could have messed the process up).
Edit: As noted in the comments, bypass was slightly changed in the later edition of the paper (we use that one). It is able to accumulate both withheld bs and ds (that is, it has two queues), whereas the bypass from the original paper accumulates just ds (that is, it has one queue).
Edit #2: I just wanted to write down a function which would pop the Fibonacci numbers from fibsHughes:
popToTheHaltThroughImproperBlocks :: StreamProcessor () b -> [b]
popToTheHaltThroughImproperBlocks = let
    getOutput (Put x sp) = Just (x, sp)
    getOutput (Get c) = getOutput $ c ()
    getOutput Halt = Nothing
  in unfoldr getOutput
And here we go:
GHCi> (take 10 . popToTheHaltThroughImproperBlocks) fibsHughes
[0,1,1,2,3,5,8,13,21,34]
The issue is with the paper. Where exactly the blame lies is largely a matter of subjective interpretation. I think it's an overlooked bug in the paper due to the type StreamProcessor not being as intuitive as it may seem.
To first focus on the concrete example of the fibsHughes stream: it does indeed contain Gets, but they are constant functions. You must feed it some arguments to access the rest of the stream. In a way, the "true" type of fibsHughes is SP () b, whereas what you might intuitively want is SP Void b to encode the absence of Get (which doesn't quite work that way, and that's kind of the source of the problem), and "feeding" it input is how you get from one to the other:
type SP = StreamProcessor
feed :: SP () b -> SP Void b
feed p = produceForever () >>> p
produceForever :: b -> SP Void b
produceForever b = Put b (produceForever b)
fibsHughes :: SP Void Integer
fibsHughes = feed (... {- rest of the definition -})
Now to see how we got into this situation, we have to go back to the definition of first. My opinion is that it is a questionable operation on streams to define in the first place, because it has to introduce Get actions to be able to produce the second component of the pairs as output:
first :: SP a b -> SP (a, c) (b, c)
The problematic part is the following branch in the definition of bypass, which introduces the Get to then be able to Put:
bypass bs [] (Put c sp) =
  Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
It is what you need to do if you want to write something of the expected type, but it is arguably not a natural thing to do.
Having defined first is what leads to defining and using the (&&&) operator, which has unintuitive semantics. To see why it's unintuitive, specialize (&&&) with Void as the stream input type:
(&&&) :: SP Void b -> SP Void c -> SP Void (b, c)
Anyone who looks at this would think that, of course, the result must be a producer, which never Gets anything; that would be absurd. Except that (&&&) does the absurd thing: thus specialized to Void, it is morally equivalent to the following (ignoring the existence of undefined, which can technically be used to tell them apart in Haskell):
_ &&& _ = Get (absurd :: Void -> SP a c)
There is a more natural definition of (&&&) by recursion on streams which avoids that issue: if the two arguments never do any Get, then the result never does any Get either.
As far as I can tell, this "better" (&&&) cannot be defined using first, (>>>) and arr.
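The more natural (&&&) is not spelled out here, so the following is only one possible reading of it: a sketch that zips the two processors by direct recursion and keeps a queue of inputs that one side has read and the other has not yet seen. The names fanout, nats, squares and takeOutputs are hypothetical and exist only for this illustration.

```haskell
data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

-- Sketch of a "zipping" fan-out. A Get appears in the result only when
-- one of the arguments performs a Get and has no queued input left.
fanout :: StreamProcessor a b -> StreamProcessor a c -> StreamProcessor a (b, c)
fanout = go [] []
  where
    -- ls / rs: inputs already read but not yet consumed by the left / right side
    go _ _ Halt _ = Halt
    go _ _ _ Halt = Halt
    go (a : ls) rs (Get f) q = go ls rs (f a) q
    go ls (a : rs) p (Get g) = go ls rs p (g a)
    go ls rs (Put b p) (Put c q) = Put (b, c) (go ls rs p q)
    go [] rs (Get f) q = Get $ \a -> go [] (rs ++ [a]) (f a) q
    go ls [] p (Get g) = Get $ \a -> go (ls ++ [a]) [] p (g a)

-- Two producers that never Get.
nats :: StreamProcessor a Int
nats = from 0 where from n = Put n (from (n + 1))

squares :: StreamProcessor a Int
squares = from 0 where from n = Put (n * n) (from (n + 1))

-- Collect at most n leading outputs, stopping at the first Get or Halt.
takeOutputs :: Int -> StreamProcessor a b -> [b]
takeOutputs n (Put x sp) | n > 0 = x : takeOutputs (n - 1) sp
takeOutputs _ _ = []
```

Under these assumptions, takeOutputs 3 (fanout nats squares) produces pairs without ever hitting a Get, which is the property claimed for the more natural definition.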
However, it comes at a cost: it is not intuitive from the point of view of a graphical interpretation of arrows, because it breaks this equation (which can be drawn graphically as "sliding" f out of &&&):
f &&& g = (id &&& g) >>> first f
Whichever definition of (&&&) you choose, it is going to confuse someone.
IMO it comes down to the type StreamProcessor not being able to rule out the use of Get. Even if the input type is Void, a stream can still do a vacuous Get.
A better type of stream processor without such definitional issues is the one from the pipes library (called Proxy). In particular, it differs from SP because it can forbid the use of Get, and that provides a faithful encoding of true "producers" such as the Fibonacci stream.
I think one confusion you may have is that, while you can find a correlation between stream processors and lists that may block, they are not exactly the same. Morally, a StreamProcessor a b consumes a stream of a and produces a stream of b. (One odd thing about Hughes' paper is that he doesn't explicitly define what a stream is.) What this boils down to is that your popToTheBlockOrHalt function never actually provides an input stream, but it still expects the given stream processor to produce an output stream.
Another thing to keep in mind is that in Hughes' paper, there is no Halt: stream processors are infinite and work on infinite streams. So, for a so-called "producer" like fibsHughes (that is, a stream processor whose input stream is arbitrary), there really is no "blocking" going on even if there are Gets hidden internally, because there is always more input from the input stream: it's infinite!
So, what you need to work with these stream processors is a way to run them, or to turn something of the form StreamProcessor a b into a function from a stream of a to a stream of b. Because your version allows the streams to be finite, it makes sense to use regular old lists as your "stream" type. Thus, you want a function with a type like:
runStreamProcessor :: StreamProcessor a b -> [a] -> [b]
There is a natural definition for this:
runStreamProcessor Halt _ = []
runStreamProcessor (Put x s) xs = x : runStreamProcessor s xs
runStreamProcessor _ [] = []
runStreamProcessor (Get f) (x:xs) = runStreamProcessor (f x) xs
Now, you can consider the type of runStreamProcessor fibsHughes :: [a] -> [Integer] and realize that, naturally, you must supply, e.g., repeat () in order to guarantee an infinite output stream. This works:
> take 10 $ runStreamProcessor fibsHughes (repeat ())
[0,1,1,2,3,5,8,13,21,34]
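To see how runStreamProcessor treats a finite input, here is a small self-contained sketch. mapSP is just the arr definition from the question written as a standalone function, and announceThenDouble is a made-up processor for illustration only:

```haskell
data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

runStreamProcessor :: StreamProcessor a b -> [a] -> [b]
runStreamProcessor Halt _ = []
runStreamProcessor (Put x s) xs = x : runStreamProcessor s xs
runStreamProcessor _ [] = []
runStreamProcessor (Get f) (x : xs) = runStreamProcessor (f x) xs

-- Hypothetical helper: same body as 'arr' in the question.
mapSP :: (a -> b) -> StreamProcessor a b
mapSP f = Get $ \a -> Put (f a) (mapSP f)

-- Hypothetical example: one output up front, then one output per input.
announceThenDouble :: StreamProcessor Int Int
announceThenDouble = Put 0 (mapSP (* 2))

-- Outputs keep coming until a Get meets an exhausted input list.
finiteRun :: [Int]
finiteRun = runStreamProcessor announceThenDouble [1, 2, 3]  -- [0,2,4,6]

-- With no input at all, only the leading Put is produced.
noInputRun :: [Int]
noInputRun = runStreamProcessor announceThenDouble []        -- [0]
```

The second result mirrors what popToTheBlockOrHalt observes for fibsHughes: the outputs before the first Get are available for free, and everything after that needs an input element, even if the processor ignores its value.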