
How can I close a file after the stream ends?

I have a stream built from a sequence of java.io.File objects, and I use flatMapConcat to create a new Source for each file, something like this:

def func(files: List[File]) =
  Source(files)
    .map(f => openFile(f))
    .flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)

Is there a simple way to close each file once its stream has ended? SomePublisher() cannot close it.

Leonard asked Oct 19 '22 17:10



2 Answers

I found one good way (among several) to solve my problem, but if you have another approach I would like to see it too.

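def someSource(file: File) = {
  val f = openFile(file)

  Source
    .fromPublisher(SomePublisher(f))
    // Pass-through stage: forwards every element unchanged and exists only to close the file.
    .transform(() => new PushStage[?, ?] {
      override def onPush(elem: ?, ctx: Context[?]): SyncDirective = ctx.push(elem)

      // Runs when this stage stops, so the file is closed once its stream is over.
      override def postStop(): Unit = {
        f.close()
        super.postStop()
      }
    })
}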

def func(files: List[File]) =
  Source(files)
    .flatMapConcat(someSource)
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)
Leonard answered Nov 15 '22 07:11



If I understand you correctly, you do the following: for each file, you create a database object that opens the file. Since you are the one opening these db connections in your code, you are responsible for closing them. Because you are working with a finite list of files, you can store all the db connections in a sequence, run your stream, and close every connection after streaming ends.
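
A minimal sketch of this first option, assuming Akka 2.6 or later (where an implicit ActorSystem also supplies the materializer). `Handle`, its `records` method, and `closedCount` are hypothetical stand-ins for the asker's database object, for `Source.fromPublisher(SomePublisher(f))`, and for a way to observe the closes; `SomeFlow` is left out because its type is not shown in the question.

```scala
import java.io.File
import java.util.concurrent.atomic.AtomicInteger

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

object CloseAfterRun {
  // Counts close() calls so the behaviour can be observed (illustration only).
  val closedCount = new AtomicInteger(0)

  // Hypothetical stand-in for whatever openFile returns in the question.
  final class Handle(file: File) extends AutoCloseable {
    // Hypothetical stand-in for Source.fromPublisher(SomePublisher(f)).
    def records: Source[String, NotUsed] = Source(List(file.getName))

    override def close(): Unit = {
      closedCount.incrementAndGet()
      ()
    }
  }

  def openFile(file: File): Handle = new Handle(file)

  def func(files: List[File])(implicit system: ActorSystem): Future[Done] = {
    import system.dispatcher // execution context for the completion callback

    // Every file is opened here, before the stream starts.
    val handles = files.map(openFile)

    Source(handles)
      .flatMapConcat(_.records)
      .grouped(10)
      .runWith(Sink.ignore)
      // Runs on success and on failure; the returned Future completes afterwards.
      .andThen { case _ => handles.foreach(_.close()) }
  }
}
```

The trade-off is that all files stay open for the whole run, so this only suits a finite and reasonably short list.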

The alternative is to write your own publisher that takes a file name, opens the db connection, streams from it, and then closes the db connection. This second option also lets you stream from an infinite list of files.
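
One way to get this open/stream/close behaviour without writing a publisher by hand is Akka's `Source.unfoldResource`, which takes a function to open a resource, one to read the next element, and one to close it. The sketch below assumes Akka 2.6 or later and, purely for illustration, reads each file line by line with a `BufferedReader`; that is a swapped-in stand-in for the asker's `SomePublisher`, whose element type is not shown.

```scala
import java.io.{BufferedReader, File, FileReader}

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

object PerFileSource {
  // One Source per file. The reader is opened when the Source is materialized
  // and closed when the Source completes, fails, or is cancelled downstream.
  def fileSource(file: File): Source[String, NotUsed] =
    Source.unfoldResource[String, BufferedReader](
      () => new BufferedReader(new FileReader(file)), // open
      reader => Option(reader.readLine()),            // read; None (null line) ends the Source
      reader => reader.close()                        // close
    )

  def func(files: List[File])(implicit system: ActorSystem): Future[Done] =
    Source(files)
      .flatMapConcat(fileSource) // only one file is open at a time
      .grouped(10)
      .runWith(Sink.ignore)
}
```

Because `flatMapConcat` consumes the inner sources one after another, each file is opened only when its turn comes, which is why this variant also works for an unbounded stream of files.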

If you want code snippets from me, post the full source of the function and I'll update my answer.

Yury Sukhoverkhov answered Nov 15 '22 07:11
