Are there any code examples of using org.reactivestreams libraries to process large data streams using Java NIO (for high performance)? I'm aiming at distributed processing, so examples using Akka would be best, but I can figure that out.
It still seems to be the case that most (I hope not all) examples of reading files in Scala resort to Source (non-binary) or direct Java NIO (and even things like Files.readAllBytes!)
Perhaps there is an Activator template I've missed? (Akka Streams with Scala! comes close to addressing everything I need, except the binary/NIO side.)
Reactive Streams is effectively push-based. It has a single mode of operation, which works like a pull-based system when the subscriber is slower and a push-based system when the subscriber is faster. This is dynamic and changes in near real-time.
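To make the demand signalling concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces (the JDK 9+ copy of the Reactive Streams interfaces) rather than Akka. The names DemandDemo, OneAtATime and run are made up for this illustration. The subscriber asks for one element at a time, so the publisher can never deliver faster than the subscriber requests.

```scala
import java.util.concurrent.{ CountDownLatch, Flow, SubmissionPublisher, TimeUnit }
import scala.collection.mutable.ListBuffer

object DemandDemo {
  // Hypothetical subscriber: signals demand for exactly one element at a time.
  final class OneAtATime(done: CountDownLatch) extends Flow.Subscriber[Int] {
    private var subscription: Flow.Subscription = null
    val received = ListBuffer.empty[Int]

    override def onSubscribe(s: Flow.Subscription): Unit = {
      subscription = s
      s.request(1) // initial demand: nothing is delivered before this call
    }
    override def onNext(item: Int): Unit = {
      received += item
      subscription.request(1) // ask for the next element only once this one is handled
    }
    override def onError(t: Throwable): Unit = done.countDown()
    override def onComplete(): Unit = done.countDown()
  }

  // Publishes 1..5 and returns what the subscriber received, in order.
  def run(): List[Int] = {
    val done = new CountDownLatch(1)
    val subscriber = new OneAtATime(done)
    val publisher = new SubmissionPublisher[Int]()
    publisher.subscribe(subscriber)
    (1 to 5).foreach(i => publisher.submit(i)) // submit blocks if the subscriber's buffer is full
    publisher.close() // triggers onComplete after the buffered elements are delivered
    done.await(5, TimeUnit.SECONDS)
    subscriber.received.toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

A subscriber that can keep up could call request with a larger number, at which point the exchange behaves more like push.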
Akka is a powerful actor / reactive framework for the JVM. Akka is an extremely high-performance library: the project quotes up to 50 million msg/sec on a single machine and a small memory footprint of ~2.5 million actors per GB of heap. Akka is also resilient by design and follows the principles of the Reactive Manifesto.
Reactive Streams is a standard for asynchronous stream processing with non-blocking back pressure. The specification is maintained by the Reactive Streams initiative (it is a separate document from the Reactive Manifesto), and there are various implementations of it, for example RxJava and Akka Streams.
In Project Reactor, Mono is a stream of 0..1 elements, for example: Mono<String> mn = Mono.just("hello"); Both Mono and Flux are implementations of the Reactive Streams Publisher interface.
Do not use scala.collection.immutable.Stream to consume files like this, because it performs memoization: yes, it is lazy, but it keeps every element it has already evaluated buffered (memoized) in memory for as long as the head of the stream is referenced!
This is definitely not what you want when you think about "stream processing a file". Scala's Stream works like this because in a functional setting it makes complete sense: thanks to memoization you can easily avoid calculating Fibonacci numbers again and again, for example. For more details see the ScalaDoc.
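The memoization is easy to observe with a small sketch that needs nothing beyond the standard library. MemoizationDemo and evaluationCounts are names invented for this illustration. A counter records how often an element is actually computed; traversing the same Stream a second time computes nothing new, because the elements from the first pass are still held in memory. (On Scala 2.13+ Stream is deprecated in favour of LazyList, which memoizes as well.)

```scala
object MemoizationDemo {
  /** Returns the number of computed elements after a first and after a second traversal. */
  def evaluationCounts(): (Int, Int) = {
    var evaluations = 0
    // Each element bumps the counter when it is computed for the first time.
    val doubled: Stream[Int] = Stream.from(1).map { n => evaluations += 1; n * 2 }

    doubled.take(3).toList // first traversal forces the first three elements
    val afterFirst = evaluations

    doubled.take(3).toList // second traversal reuses the memoized elements
    val afterSecond = evaluations

    (afterFirst, afterSecond)
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = evaluationCounts()
    println(s"computed after first pass: $first, after second pass: $second")
  }
}
```

The two counts are equal, which is convenient for Fibonacci-style definitions and is exactly the problem for a multi-gigabyte file: every byte read so far stays reachable through the val.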
Akka Streams provides a Reactive Streams implementation, including a FileIO object that you could use here (it will properly back-pressure, pulling data out of the file only when needed and when the rest of the stream is ready to consume it):
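import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ FileIO, Sink }

object ExampleApp extends App {
  implicit val sys = ActorSystem()
  implicit val mat = ActorMaterializer()

  // FileIO emits the file as ByteString chunks and reads more only on downstream demand
  FileIO.fromPath(Paths.get("/example/file.txt"))
    .map(c => { print(c.utf8String); c })
    // FileIO opens and closes the file itself, so only the actor system needs shutting down
    .runWith(Sink.onComplete(_ => sys.terminate()))
}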
Here are more docs about working with IO with Akka Streams. Note that this is for the current-as-of-writing version of Akka, the 2.5.x series.
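For the binary/NIO side of the question, the "read only when asked" idea can also be sketched without Akka, using a plain java.nio FileChannel. This is not how FileIO is implemented; NioChunks, chunks and countBytes are hypothetical names, and the 8192-byte default is an arbitrary choice. The iterator reads the next chunk only when the consumer calls hasNext or next, so at most one chunk is held at a time.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{ Path, StandardOpenOption }

object NioChunks {
  /** Lazily yields the channel's bytes in chunks of at most `chunkSize` bytes. */
  def chunks(channel: FileChannel, chunkSize: Int): Iterator[Array[Byte]] =
    new Iterator[Array[Byte]] {
      private val buffer = ByteBuffer.allocate(chunkSize)
      private var pending: Option[Array[Byte]] = None
      private var finished = false

      // Reads the next chunk from the channel, but only if none is waiting.
      private def fill(): Unit =
        if (pending.isEmpty && !finished) {
          buffer.clear()
          val n = channel.read(buffer)
          if (n < 0) finished = true // end of file
          else {
            buffer.flip()
            val bytes = new Array[Byte](buffer.remaining())
            buffer.get(bytes)
            pending = Some(bytes)
          }
        }

      def hasNext: Boolean = { fill(); pending.isDefined }

      def next(): Array[Byte] = {
        fill()
        val out = pending.getOrElse(throw new NoSuchElementException("end of file"))
        pending = None
        out
      }
    }

  /** Counts the bytes in a file while holding at most one chunk in memory. */
  def countBytes(path: Path, chunkSize: Int = 8192): Long = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    try chunks(channel, chunkSize).map(_.length.toLong).sum
    finally channel.close()
  }
}
```

Unlike FileIO, this reads on the calling thread and has no asynchronous back pressure; it only shows the demand-driven shape of the reads.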
Hope this helps!
We actually use Akka Streams to process binary files. It was a little tricky to get things going as there wasn't any documentation around this, but this is what we came up with:
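import java.io.{ BufferedInputStream, File, FileInputStream }
import akka.stream.scaladsl.Source

val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
// Lazily read one byte at a time until read() signals end of file with -1
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte)
val binSource = Source(binStream)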
Once you have binSource, which is an Akka Source[Byte], you can go ahead and start applying whatever stream transformations (map, flatMap, transform, etc...) you want to it. This functionality leverages the Source companion object's apply that takes an Iterable, passing in a Scala Stream that should read in the data lazily and make it available to your transforms.
EDIT
As Konrad pointed out in the comments section, a Stream can be an issue with large files due to the fact that it performs memoization of the elements it encounters as it's lazily building out the stream. This can lead to out of memory situations if you are not careful. However, if you look at the docs for Stream there is a tip for avoiding memoization building up in memory:
One must be cautious of memoization; you can very quickly eat up large amounts of memory if you're not careful. The reason for this is that the memoization of the Stream creates a structure much like scala.collection.immutable.List. So long as something is holding on to the head, the head holds on to the tail, and so it continues recursively. If, on the other hand, there is nothing holding on to the head (e.g. we used def to define the Stream) then once it is no longer being used directly, it disappears.
So taking that into account, you could modify my original example as follows:
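val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
// A def (not a val), so nothing keeps a reference to the head of the Stream
def binStream(in: BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte)
// On Akka 2.5.x this call is written Source.fromIterator(() => ...)
val binSource = Source(() => binStream(inputStream).iterator)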
So the idea here is to build the Stream via a def and not assign it to a val, and then immediately get the iterator from it and use that to initialize the Akka Source. Setting things up this way should avoid the issues with memoization. I ran the old code against a big file and was able to produce an OutOfMemory situation by doing a foreach on the Source. When I switched it over to the new code I was able to avoid this issue.
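As a variation on the same idea (not the code from the answer above), the Stream can be skipped entirely by building the bytes with Iterator.continually, which never memoizes. A minimal sketch, where ByteIteratorDemo and bytes are names chosen for this illustration and a ByteArrayInputStream stands in for the real file:

```scala
import java.io.{ BufferedInputStream, ByteArrayInputStream, InputStream }

object ByteIteratorDemo {
  // read() returns 0..255 for data and -1 only at end of stream,
  // so a data byte of 0xFF does not end the iteration early.
  def bytes(in: InputStream): Iterator[Byte] =
    Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte)

  def main(args: Array[String]): Unit = {
    // Stand-in for a real file; the last byte is 0xFF on purpose.
    val in = new BufferedInputStream(new ByteArrayInputStream(Array[Byte](1, 2, 3, -1)))
    try bytes(in).foreach(b => println(b & 0xff))
    finally in.close()
  }
}
```

With Akka 2.5.x such an iterator could be handed to Source.fromIterator(() => bytes(inputStream)). The read calls are still blocking and go one byte at a time, so for large files the chunked FileIO source is likely the better fit.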