
Grouping events with fs2.Stream

I have an event stream as follows:

sealed trait Event

val eventStream: fs2.Stream[IO, Event] = //...

I want to group the events received within a single minute (i.e. from second 0 to second 59 of every minute). This sounds pretty straightforward with fs2:

val groupedEventsStream = eventStream groupAdjacentBy {event => 
    TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}

The problem is that the grouping function is not pure: it uses currentTimeMillis. I can work around this as follows:

eventStream.evalMap(t => IO((System.currentTimeMillis(), t)))
  .groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))

This adds clumsy boilerplate with tuples that I'd like to avoid. Are there any other solutions?

Or maybe using an impure function is not that bad in such a case?

Some Name asked Mar 08 '19 21:03




1 Answer

You could remove some of the boilerplate by using cats.effect.Clock:

import java.util.concurrent.TimeUnit
import cats.Eq
import cats.effect.{Clock, IO}
import fs2.Chunk
def groupedEventsStream[A](stream: fs2.Stream[IO, A])
                          (implicit clock: Clock[IO], eq: Eq[Long]): fs2.Stream[IO, (Long, Chunk[(Long, A)])] =
  stream.evalMap(t => clock.realTime(TimeUnit.MINUTES).map((_, t)))
    .groupAdjacentBy(_._1)
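
If the timestamps inside each chunk are not needed downstream, the tuples can be stripped again right after grouping, so callers only see the minute key and the events. The following is a minimal sketch of that idea, not a definitive implementation. It assumes cats-effect 3 and fs2 3, where the clock returns a FiniteDuration instead of taking a TimeUnit argument, and the name groupByMinute is hypothetical.

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}

// Hypothetical helper (name not from the original post).
// Assumes cats-effect 3 / fs2 3: IO.realTime yields a FiniteDuration.
def groupByMinute[A](stream: Stream[IO, A]): Stream[IO, (Long, Chunk[A])] =
  stream
    // Tag each element with the wall-clock minute at which it passes this stage.
    .evalMap(a => IO.realTime.map(d => (d.toMinutes, a)))
    // Group adjacent elements that share the same minute.
    .groupAdjacentBy(_._1)
    // Drop the per-element tag; only the group key keeps the minute.
    .map { case (minute, tagged) => (minute, tagged.map(_._2)) }
```

Two things to keep in mind: the minute is read when the element reaches evalMap, not when the event was produced, and groupAdjacentBy can only emit a group once an element with a different key arrives or the stream ends.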
kosii answered Sep 22 '22 00:09
