What can be an alternative to dynamically changing a running graph? Here is my situation. I have a graph that ingests articles into a DB. Articles come from 3 plugins in different formats, so I have several flows:
val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]
// These are being created every time I poll plugins
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]
val merged = Source.combine(
  sourceContentProvider.via(converterFlow1),
  sourceNews.via(converterFlow2),
  sourceCit)(Merge(_))
val res = merged
  .buffer(10, OverflowStrategy.backpressure)
  .toMat(sinkDB)(Keep.both)
  .run()
The problem is that I get data from the content provider once every 24 hours, from news once every 2 hours, and the last source may deliver at any time because it comes from humans.
I realize that graphs are immutable, but how can I periodically attach new instances of Source to my graph so that I have a single point of throttling for the ingestion process?
UPDATE: You can say my data is a stream of Source-s, three sources in my case. But I cannot change that, because I get instances of Source from external classes (so-called plugins). These plugins work independently of my ingestion class. I can't combine them into one gigantic class to have a single Source.
Okay, in general the correct way would be to join a stream of sources into a single source, i.e. go from Source[Source[T, _], Whatever] to Source[T, Whatever]. This can be done with flatMapConcat or with flatMapMerge. Therefore, if you can get a Source[Source[Article, NotUsed], NotUsed], you can use one of the flatMap* variants and obtain a final Source[Article, NotUsed]. Do it for each of your sources (no pun intended), and then your original approach should work.
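To make the difference between the two flatMap* variants concrete, here is a minimal sketch. It assumes the same Akka 2.5-style setup used elsewhere in this thread (on 2.6+ the implicit ActorSystem alone should be enough); the object name FlattenSketch, the run helper and the sample strings are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object FlattenSketch {
  // Hypothetical helper: flattens a stream of sources both ways and returns
  // (result of flatMapConcat, result of flatMapMerge).
  def run(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("flatten-sketch")
    // Assumption: Akka 2.5-style materializer, as in the rest of this thread.
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // A stream whose elements are themselves sources.
    def nested: Source[Source[String, NotUsed], NotUsed] =
      Source(List(Source(List("a1", "a2")), Source(List("b1", "b2"))))

    try {
      // flatMapConcat drains each inner source completely before starting the next.
      val concatenated =
        Await.result(nested.flatMapConcat(identity).runWith(Sink.seq[String]), 5.seconds)
      // flatMapMerge runs up to `breadth` inner sources at once, so the
      // interleaving of their elements is not guaranteed.
      val merged =
        Await.result(nested.flatMapMerge(2, identity).runWith(Sink.seq[String]), 5.seconds)
      (concatenated, merged)
    } finally {
      system.terminate()
    }
  }
}
```

With flatMapConcat the inner sources are consumed strictly one after another, so a source that never completes would hold back everything queued behind it; flatMapMerge avoids that at the cost of ordering.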
I've implemented code based on the answer given by Vladimir Matveev and want to share it with others, since it looks like a common use case to me.
I knew about Source.queue, which Viktor Klang mentioned, but I wasn't aware of flatMapConcat. It's pure awesomeness.
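import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()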
case class ImpArticle(text: String)
case class NewsArticle(text: String)
case class Article(text: String)
val converterFlow1: Flow[ImpArticle, Article, NotUsed] = Flow[ImpArticle].map(a => Article("a:" + a.text))
val converterFlow2: Flow[NewsArticle, Article, NotUsed] = Flow[NewsArticle].map(a => Article("a:" + a.text))
val sinkDB: Sink[Article, Future[Done]] = Sink.foreach { a =>
  Thread.sleep(1000) // simulate a slow DB write
  println(a)
}
// These are being created every time I poll plugins
val sourceContentProvider: Source[ImpArticle, NotUsed] = Source(List(ImpArticle("cp1"), ImpArticle("cp2")))
val sourceNews: Source[NewsArticle, NotUsed] = Source(List(NewsArticle("news1"), NewsArticle("news2")))
val sourceCit: Source[Article, NotUsed] = Source(List(Article("a1"), Article("a2")))
val (queue, completionFut) = Source
  .queue[Source[Article, NotUsed]](10, OverflowStrategy.backpressure)
  .flatMapConcat(identity)
  .buffer(2, OverflowStrategy.backpressure)
  .toMat(sinkDB)(Keep.both)
  .run()
queue.offer(sourceContentProvider.via(converterFlow1))
queue.offer(sourceNews.via(converterFlow2))
queue.offer(sourceCit)
queue.complete()
completionFut.onComplete {
  case Success(res) =>
    println(res)
    system.terminate()
  case Failure(ex) =>
    ex.printStackTrace()
    system.terminate()
}
Await.result(system.whenTerminated, Duration.Inf)
I'd still check the success of the Future returned by queue.offer, but in my case these calls will be pretty infrequent.
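For anyone who does want that check, this is a rough sketch of what it could look like. It pattern-matches on the QueueOfferResult that the Future from queue.offer completes with; the object name OfferCheckSketch, the describe helper and the sample data are hypothetical, and the Akka 2.5-style materializer is an assumption carried over from the code above.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object OfferCheckSketch {
  // Hypothetical helper: turns the outcome of an offer into a readable status.
  def describe(result: QueueOfferResult): String = result match {
    case QueueOfferResult.Enqueued    => "enqueued"
    case QueueOfferResult.Dropped     => "dropped"
    case QueueOfferResult.Failure(ex) => s"failed: ${ex.getMessage}"
    case QueueOfferResult.QueueClosed => "queue closed"
  }

  // Offers one inner source, reports the offer status, and returns what reached the sink.
  def run(): (String, Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("offer-check-sketch")
    // Assumption: Akka 2.5-style materializer, as in the code above.
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // execution context for Future.map

    val (queue, collected) = Source
      .queue[Source[String, NotUsed]](10, OverflowStrategy.backpressure)
      .flatMapConcat(identity)
      .toMat(Sink.seq[String])(Keep.both)
      .run()

    try {
      // Wait for the offer to be resolved before doing anything else with the queue.
      val status =
        Await.result(queue.offer(Source(List("a1", "a2"))).map(describe), 5.seconds)
      queue.complete()
      (status, Await.result(collected, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

In a real ingestion class you would probably log or retry on anything other than Enqueued instead of blocking with Await; the blocking here only keeps the sketch short.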