Okasaki describes persistent real-time queues which can be realized in Haskell using the type
data Queue a = forall x . Queue
  { front :: [a]
  , rear :: [a]
  , schedule :: [x]
  }
where incremental rotations maintain the invariant
length schedule = length front - length rear
If you're familiar with the queues involved, you can skip this section.
The rotation function looks like
rotate :: [a] -> [a] -> [a] -> [a]
rotate [] (y : _) a = y : a
rotate (x : xs) (y : ys) a =
  x : rotate xs ys (y : a)
and it's called by a smart constructor
exec :: [a] -> [a] -> [x] -> Queue a
exec f r (_ : s) = Queue f r s
exec f r [] = Queue f' [] f' where
  f' = rotate f r []
after each queue operation. The smart constructor is always called when length s = length f - length r + 1, ensuring that the pattern match in rotate will succeed.
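To make the calling convention concrete, here is a minimal sketch of the list-based queue with its two operations. The names emptyQ, snoc, pop, fromList and toList are my own choices for illustration (they do not appear above), and the constructor is written positionally rather than with record fields. Each operation hands its result to exec, so the invariant is restored after every step.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

import Data.List (foldl', unfoldr)

-- Front list, rear list (stored reversed), and the schedule.
data Queue a = forall x. Queue [a] [a] [x]

-- Partial on purpose: relies on length r = length f + 1.
rotate :: [a] -> [a] -> [a] -> [a]
rotate [] (y : _) a = y : a
rotate (x : xs) (y : ys) a = x : rotate xs ys (y : a)

-- Smart constructor: advance the schedule, or start a new rotation.
exec :: [a] -> [a] -> [x] -> Queue a
exec f r (_ : s) = Queue f r s
exec f r [] = Queue f' [] f'
  where
    f' = rotate f r []

-- Hypothetical helper names from here on.
emptyQ :: Queue a
emptyQ = Queue [] [] ([] :: [()])

-- Enqueue at the rear, then restore the invariant.
snoc :: Queue a -> a -> Queue a
snoc (Queue f r s) x = exec f (x : r) s

-- Dequeue from the front, then restore the invariant.
pop :: Queue a -> Maybe (a, Queue a)
pop (Queue [] _ _) = Nothing
pop (Queue (x : f) r s) = Just (x, exec f r s)

fromList :: [a] -> Queue a
fromList = foldl' snoc emptyQ

toList :: Queue a -> [a]
toList = unfoldr pop
```

In both snoc and pop the schedule is one element longer than the invariant allows at the moment exec is called, which is the situation described above.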
I hate partial functions! I'd love to find a way to express the structural invariant in the types. The usual dependent vector seems a likely choice:
data Nat = Z | S Nat
data Vec n a where
  Nil :: Vec 'Z a
  Cons :: a -> Vec n a -> Vec ('S n) a
and then (perhaps)
data Queue a = forall x rl sl . Queue
  { front :: Vec (sl :+ rl) a
  , rear :: Vec rl a
  , schedule :: Vec sl x
  }
The trouble is that I haven't been able to figure out how to juggle the types. It seems extremely likely that some amount of unsafeCoerce will be needed to make this efficient. However, I haven't been able to come up with an approach that's even vaguely manageable. Is it possible to do this nicely in Haskell?
Here is what I got:
open import Function
open import Data.Nat.Base
open import Data.Vec
grotate : ∀ {n m} {A : Set}
        -> (B : ℕ -> Set)
        -> (∀ {n} -> A -> B n -> B (suc n))
        -> Vec A n
        -> Vec A (suc n + m)
        -> B m
        -> B (suc n + m)
grotate B cons [] (y ∷ ys) a = cons y a
grotate B cons (x ∷ xs) (y ∷ ys) a = grotate (B ∘ suc) cons xs ys (cons y a)
rotate : ∀ {n m} {A : Set} -> Vec A n -> Vec A (suc n + m) -> Vec A m -> Vec A (suc n + m)
rotate = grotate (Vec _) _∷_
record Queue (A : Set) : Set₁ where
  constructor queue
  field
    {X} : Set
    {n m} : ℕ
    front : Vec A (n + m)
    rear : Vec A m
    schedule : Vec X n
open import Relation.Binary.PropositionalEquality
open import Data.Nat.Properties.Simple
exec : ∀ {m n A} -> Vec A (n + m) -> Vec A (suc m) -> Vec A n -> Queue A
exec {m} {suc n} f r (_ ∷ s) = queue (subst (Vec _) (sym (+-suc n m)) f) r s
exec {m} f r [] = queue (with-zero f') [] f' where
  with-zero = subst (Vec _ ∘ suc) (sym (+-right-identity m))
  without-zero = subst (Vec _ ∘ suc) (+-right-identity m)
  f' = without-zero (rotate f (with-zero r) [])
rotate is defined in terms of grotate for the same reason reverse is defined in terms of foldl (or enumerate in terms of genumerate): because Vec A (suc n + m) is not definitionally Vec A (n + suc m), while (B ∘ suc) m is definitionally B (suc m).
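For readers who think in Haskell, the reverse-via-foldl analogy can be sketched as follows. This is an illustration only, not part of the Agda development: the names Shift, gfoldl and vreverse are hypothetical, and the newtype Shift stands in for the motive B ∘ suc, since Haskell has no type-level composition. The accumulator's index grows by one at each step, and no arithmetic proof is needed because the index is shifted by wrapping rather than by rewriting n + suc m.

```haskell
{-# LANGUAGE DataKinds, GADTs, KindSignatures, RankNTypes, ScopedTypeVariables #-}

import Data.Kind (Type)

data Nat = Z | S Nat

data Vec (a :: Type) (n :: Nat) where
  Nil   :: Vec a 'Z
  (:::) :: a -> Vec a n -> Vec a ('S n)
infixr 5 :::

-- Plays the role of the Agda motive (B ∘ suc): Shift p n is p (S n).
newtype Shift (p :: Nat -> Type) (n :: Nat) = Shift { unShift :: p ('S n) }

-- Generalised left fold: the result is indexed by the vector's length.
gfoldl :: forall p a n. (forall k. p k -> a -> p ('S k)) -> p 'Z -> Vec a n -> p n
gfoldl _ z Nil = z
gfoldl step z (x ::: xs) = unShift (gfoldl step' (Shift (step z x)) xs)
  where
    -- The same step function, viewed at the shifted motive.
    step' :: forall k. Shift p k -> a -> Shift p ('S k)
    step' (Shift acc) y = Shift (step acc y)

-- Length-preserving reverse as a special case, with p instantiated to Vec a.
vreverse :: Vec a n -> Vec a n
vreverse = gfoldl (\acc x -> x ::: acc) Nil

toList :: Vec a n -> [a]
toList Nil = []
toList (x ::: xs) = x : toList xs
```

For example, toList (vreverse (1 ::: 2 ::: 3 ::: Nil)) evaluates to [3,2,1].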
exec has the same implementation as you provided (modulo those substs), but I'm not sure about the types: is it OK that r must be non-empty?
The other answer is super clever (please take a moment to upvote it), but as someone not familiar with Agda, I did not find it obvious how this would be implemented in Haskell. Here's a full Haskell version. We'll need a whole slew of extensions, as well as Data.Type.Equality (since we will need to do a limited amount of type-level proving).
{-# LANGUAGE GADTs, ScopedTypeVariables, RankNTypes,
             TypeInType, TypeFamilies, TypeOperators #-}
import Data.Type.Equality
Nat, Vec, and Queue
Next, we define the usual type-level natural numbers (this looks like just a regular data definition, but because we have TypeInType enabled, it will get automatically promoted when we use it in a type) and a type function (a type family) for addition. Note that although there are multiple ways of defining +, our choice here will impact what follows. We'll also define the usual Vec which is very much like a list except that it encodes its length in the phantom type n. With that, we can go ahead and define the type of our queue.
data Nat = Z | S Nat
type family n + m where
  Z + m = m
  S n + m = S (n + m)
data Vec a n where
  Nil :: Vec a Z
  (:::) :: a -> Vec a n -> Vec a (S n)
data Queue a where
  Queue :: { front :: Vec a (n + m)
           , rear :: Vec a m
           , schedule :: Vec x n } -> Queue a
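To illustrate why the choice of + matters, here is a small self-contained sketch. The names vappend, leftId and toList are hypothetical, and I use DataKinds with explicit ticks instead of TypeInType. Because + recurses on its first argument, GHC can reduce Z + m and S n + m by itself, so appending vectors needs no proofs. Equalities that require looking at the second argument, such as m + Z ~ m, do not reduce and must be proved separately.

```haskell
{-# LANGUAGE DataKinds, GADTs, KindSignatures, TypeFamilies, TypeOperators #-}

import Data.Kind (Type)

data Nat = Z | S Nat

-- Addition by recursion on the first argument, as above.
type family (n :: Nat) + (m :: Nat) :: Nat where
  'Z   + m = m
  'S n + m = 'S (n + m)

data Vec (a :: Type) (n :: Nat) where
  Nil   :: Vec a 'Z
  (:::) :: a -> Vec a n -> Vec a ('S n)
infixr 5 :::

-- Each clause type-checks by unfolding one equation of (+).
vappend :: Vec a n -> Vec a m -> Vec a (n + m)
vappend Nil ys = ys
vappend (x ::: xs) ys = x ::: vappend xs ys

-- Accepted: 'Z + m reduces to m by the first equation.
leftId :: Vec a ('Z + m) -> Vec a m
leftId = id

-- Rejected if uncommented: m + 'Z is stuck until m is known.
-- rightId :: Vec a (m + 'Z) -> Vec a m
-- rightId = id

toList :: Vec a n -> [a]
toList Nil = []
toList (x ::: xs) = x : toList xs
```

Had + been defined by recursion on its second argument instead, the situation would flip: m + Z would reduce and Z + m would need a proof.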
rotate
Now, things start to get hairier. We want to define a function rotate that has type rotate :: Vec a n -> Vec a (S n + m) -> Vec a m -> Vec a (S n + m), but you quickly run into a variety of proof-related problems with just defining this recursively. The solution is instead to define a slightly more general grotate, which can be defined recursively, and for which rotate is a special case.
The point of Bump is to circumvent the fact that there is no such thing as type-level composition in Haskell. There is no way of writing an operator like (∘) such that (S ∘ S) x is S (S x). The workaround is to continuously wrap/unwrap with Bump/lower.
newtype Bump p n = Bump { lower :: p (S n) }
grotate :: forall p n m a.
  (forall n. a -> p n -> p (S n)) ->
  Vec a n ->
  Vec a (S n + m) ->
  p m ->
  p (S n + m)
grotate cons Nil (y ::: _) zs = cons y zs
grotate cons (x ::: xs) (y ::: ys) zs = lower (grotate consS xs ys (Bump (cons y zs)))
  where
    consS :: forall n. a -> Bump p n -> Bump p (S n)
    consS = \a -> Bump . cons a . lower
rotate :: Vec a n -> Vec a (S n + m) -> Vec a m -> Vec a (S n + m)
rotate = grotate (:::)
We need explicit foralls here to make it very clear which type variables are getting captured and which aren't, as well as to denote higher-rank types.
SNat
Before we proceed to exec, we set up some machinery that will allow us to prove some type-level arithmetic claims (which we need to get exec to typecheck). We start by making an SNat type (which is a singleton type corresponding to Nat). SNat reflects its value in a phantom type variable.
data SNat n where
  SZero :: SNat Z
  SSucc :: SNat n -> SNat (S n)
We can then make a couple of useful functions to do things with SNat.
sub1 :: SNat (S n) -> SNat n
sub1 (SSucc x) = x
size :: Vec a n -> SNat n
size Nil = SZero
size (_ ::: xs) = SSucc (size xs)
Finally, we are prepared to prove some arithmetic, namely that n + S m ~ S (n + m) and n + Z ~ n.
plusSucc :: (SNat n) -> (SNat m) -> (n + S m) :~: S (n + m)
plusSucc SZero _ = Refl
plusSucc (SSucc n) m = gcastWith (plusSucc n m) Refl
plusZero :: SNat n -> (n + Z) :~: n
plusZero SZero = Refl
plusZero (SSucc n) = gcastWith (plusZero n) Refl
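As a small illustration of how such a proof is consumed, here is a sketch of an accumulator-style reverse that uses plusSucc instead of the Bump trick. It is self-contained, so it repeats the definitions above (with DataKinds ticks instead of TypeInType). The names revOnto and toList are hypothetical. Note that calling size at every step makes this version do extra work, so treat it as a demonstration of gcastWith rather than as an efficient implementation.

```haskell
{-# LANGUAGE DataKinds, GADTs, KindSignatures, TypeFamilies, TypeOperators #-}

import Data.Kind (Type)
import Data.Type.Equality ((:~:) (Refl), gcastWith)

data Nat = Z | S Nat

type family (n :: Nat) + (m :: Nat) :: Nat where
  'Z   + m = m
  'S n + m = 'S (n + m)

data Vec (a :: Type) (n :: Nat) where
  Nil   :: Vec a 'Z
  (:::) :: a -> Vec a n -> Vec a ('S n)
infixr 5 :::

data SNat (n :: Nat) where
  SZero :: SNat 'Z
  SSucc :: SNat n -> SNat ('S n)

size :: Vec a n -> SNat n
size Nil = SZero
size (_ ::: xs) = SSucc (size xs)

plusSucc :: SNat n -> SNat m -> (n + 'S m) :~: 'S (n + m)
plusSucc SZero _ = Refl
plusSucc (SSucc n) m = gcastWith (plusSucc n m) Refl

-- Reverse xs onto acc. The recursive call has type Vec a (n' + 'S m),
-- and the proof lets GHC accept it as Vec a ('S (n' + m)).
revOnto :: Vec a n -> Vec a m -> Vec a (n + m)
revOnto Nil acc = acc
revOnto (x ::: xs) acc =
  gcastWith (plusSucc (size xs) (size acc)) (revOnto xs (x ::: acc))

toList :: Vec a n -> [a]
toList Nil = []
toList (x ::: xs) = x : toList xs
```

For example, toList (revOnto (1 ::: 2 ::: 3 ::: Nil) (0 ::: Nil)) evaluates to [3,2,1,0].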
exec
Now that we have rotate, we can define exec. This definition looks almost identical to the one in the question (with lists), except annotated with gcastWith <some-proof>.
exec :: Vec a (n + m) -> Vec a (S m) -> Vec a n -> Queue a
exec f r (_ ::: s) = gcastWith (plusSucc (size s) (sub1 (size r))) $ Queue f r s
exec f r Nil = gcastWith (plusZero (sub1 (size r))) $
  let f' = rotate f r Nil in (Queue f' Nil f')
It is probably worth noting that we can get some stuff for free by using singletons. With the right extensions enabled, the following more readable code
import Data.Singletons.TH
singletons [d|
  data Nat = Z | S Nat
  (+) :: Nat -> Nat -> Nat
  Z + n = n
  S m + n = S (m + n)
  |]
defines Nat, the type family :+ (equivalent to my +), and the singleton type SNat (with constructors SZ and SS equivalent to my SZero and SSucc) all in one.