I have event stream as follows:
sealed trait Event
val eventStream: fs2.Stream[IO, Event] = //...
I want to group this events received within a single minute (i.e from 0 sec to 59 sec of every minute). This sounds pretty straightforward with fs2
val groupedEventsStream = eventStream groupAdjacentBy {event =>
TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}
The problem is that the grouping function is not pure. It uses currentTimeMillis
. I can workaroud this as follows:
stream.evalMap(t => IO(System.currentTimeMillis(), t))
.groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))
The thing is that adds clumsy boilerplate with tuples I'd like to avoid. Is there any other solutions?
Or maybe using impure function is not that bad for such a case?
FS2 Stream FS2 provides a functional way to define a stream that involves incrementally loading and transforming data. An FS2 stream is defined by the type Stream [+F [_], +O], which defines a stream that requires an environment (also called a context) of type F and outputs a value of O.
FS2 provides a functional way to define a stream that involves incrementally loading and transforming data. An FS2 stream is defined by the type Stream [+F [_], +O], which defines a stream that requires an environment (also called a context) of type F and outputs a value of O.
Unlike Akka Streams that always require an actor system and a materializer, an fs2 stream can be created straight away: So an fs2 stream has 2 type parameters. The second one is the one you expect as it represents the type of the elements of the stream – e.g. Int. The first type is a type constructor which corresponds to the effect type.
The PullType As we said more than once, the fs2 defines streams as a pulltype, which means that the stream effectively computes the next stream element just in time. Under the hood, the library implements the Streamtype functions using the Pulltype to honor this behavior.
You could remove some of the boilerplate by using cats.effect.Clock:
def groupedEventsStream[A](stream: fs2.Stream[IO, A])
(implicit clock: Clock[IO], eq: Eq[Long]): fs2.Stream[IO, (Long, Chunk[(Long, A)])] =
stream.evalMap(t => clock.realTime(TimeUnit.MINUTES).map((_, t)))
.groupAdjacentBy(_._1)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With