I have a stream of sequence of java.io.File. And I use flatMapConcat
for create new Source
of file, something like that:
def func(files: List[File]) =
Source(files)
.map(f => openFile(f))
.flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
.grouped(10)
.via(SomeFlow)
.runWith(Sink.ignore)
Is there a simple way to close each file after stream ending? SomePublisher()
cannot close it.
So I found one good way of many ways to solve my problem, but if you have another way I would like to see it also.
def someSource(file: File) = {
val f = openFile(file)
Source
.fromPublisher(SomePublisher(f))
.transform(() => new PushStage[?, ?] {
override def onPush(elem: ?, ctx: Context[?]): SyncDirective = ctx.push(elem)
override def postStop(): Unit = {
f.close()
super.postStop()
}
}
}
def func(files: List[File]) =
Source(files)
.flatMapConcat(someSource)
.grouped(10)
.via(SomeFlow)
.runWith(Sink.ignore)
So, if I understand you correctly you do the following: for each file YOU create database object what opens the file. So since you are opening db connections in your code you are responsible for closing it. Since you are working with finite list of files you may store all db connections in a sequence, run your stream and close all connection after streaming ends.
Alternative way is to make your own publisher what will get file name, open db connection, stream from it, close db connection. Second option will allow you to stream from infinite list of files.
If you want code sniplets from me give me your full source for the function and I'll update it.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With