
How do I dynamically add Source to existing Graph?

What are the alternatives to dynamically changing a running graph? Here is my situation: I have a graph that ingests articles into a DB. The articles come from 3 plugins in different formats, so I have several flows:

val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]

// These are being created every time I poll plugins    
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]

val merged = Source.combine(
    sourceContentProvider.via(converterFlow1),
    sourceNews.via(converterFlow2),
    sourceCit)(Merge(_))

val res = merged
  .buffer(10, OverflowStrategy.backpressure)
  .toMat(sinkDB)(Keep.both)
  .run()

The problem is that I get data from the content provider once every 24 hours and from news once every 2 hours, while the last source may produce data at any time because it comes from humans.

I realize that graphs are immutable, but how can I periodically attach new Source instances to my graph so that I have a single point of throttling for the ingestion process?

UPDATE: You could say my data is a stream of Sources, three in my case. I cannot change that, because I get the Source instances from external classes (so-called plugins). These plugins work independently of my ingestion class, and I can't combine them into one gigantic class to get a single Source.

asked Jun 21 '16 12:06 by expert


2 Answers

Okay, in general the correct way would be to join a stream of sources into a single source, i.e. go from Source[Source[T, _], Whatever] to Source[T, Whatever]. This can be done with flatMapConcat (which consumes the inner sources one after another) or with flatMapMerge (which consumes up to a given number of them concurrently). Therefore, if you can get a Source[Source[Article, NotUsed], NotUsed], you can use one of the flatMap* variants and obtain a final Source[Article, NotUsed]. Do it for each of your sources (no pun intended), and then your original approach should work.
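To make the flattening step concrete, here is a minimal sketch that assumes Akka Streams is on the classpath and uses integers in place of articles. The object name FlattenSourcesSketch and its run method are hypothetical names made up for illustration. It flattens the same stream of sources once with flatMapConcat and once with flatMapMerge:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical illustration object, not part of the question's code.
object FlattenSourcesSketch {
  def run(): (immutable.Seq[Int], immutable.Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("flatten-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // A stream whose elements are themselves sources (two small batches).
      def batches: Source[Source[Int, NotUsed], NotUsed] =
        Source(List(Source(1 to 2), Source(3 to 4)))

      // flatMapConcat drains each inner source fully before starting the next,
      // so the order of the batches is preserved.
      val concatenated = batches.flatMapConcat(identity).runWith(Sink.seq)

      // flatMapMerge runs up to `breadth` (here 2) inner sources concurrently,
      // so elements of different batches may interleave.
      val merged = batches.flatMapMerge(2, identity).runWith(Sink.seq)

      (Await.result(concatenated, 5.seconds), Await.result(merged, 5.seconds))
    } finally system.terminate()
  }
}
```

With flatMapConcat the batches come out in the order they were emitted. With flatMapMerge only the set of elements is predictable, so it fits better when ordering across plugins does not matter.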

answered Nov 15 '22 06:11 by Vladimir Matveev


I've implemented code based on the answer given by Vladimir Matveev and want to share it with others, since it looks like a common use case to me.

I knew about Source.queue, which Viktor Klang mentioned, but I wasn't aware of flatMapConcat. It's pure awesomeness.

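import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()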

case class ImpArticle(text: String)
case class NewsArticle(text: String)
case class Article(text: String)

val converterFlow1: Flow[ImpArticle, Article, NotUsed] = Flow[ImpArticle].map(a => Article("a:" + a.text))
val converterFlow2: Flow[NewsArticle, Article, NotUsed] = Flow[NewsArticle].map(a => Article("a:" + a.text))
// Stand-in for the real DB sink; Thread.sleep only simulates a slow write
val sinkDB: Sink[Article, Future[Done]] = Sink.foreach { a =>
  Thread.sleep(1000)
  println(a)
}

// These are being created every time I poll plugins
val sourceContentProvider: Source[ImpArticle, NotUsed] = Source(List(ImpArticle("cp1"), ImpArticle("cp2")))
val sourceNews: Source[NewsArticle, NotUsed] = Source(List(NewsArticle("news1"), NewsArticle("news2")))
val sourceCit: Source[Article, NotUsed] = Source(List(Article("a1"), Article("a2")))

val (queue, completionFut) = Source
  .queue[Source[Article, NotUsed]](10, OverflowStrategy.backpressure)
  .flatMapConcat(identity)
  .buffer(2, OverflowStrategy.backpressure)
  .toMat(sinkDB)(Keep.both)
  .run()

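// Each offered Source is drained in order by flatMapConcat; completing the queue
// lets the stream (and completionFut) finish once the last Source is exhausted
queue.offer(sourceContentProvider.via(converterFlow1))
queue.offer(sourceNews.via(converterFlow2))
queue.offer(sourceCit)
queue.complete()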

completionFut.onComplete {
  case Success(res) =>
    println(res)
    system.terminate()
  case Failure(ex) =>
    ex.printStackTrace()
    system.terminate()
}

Await.result(system.whenTerminated, Duration.Inf)

I'd still check the success of the Future returned by queue.offer, but in my case these calls will be pretty infrequent.
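For completeness, checking the offer result could look roughly like the sketch below. It assumes the same Akka Streams setup as above, with integers standing in for articles. OfferCheckSketch, describe and run are hypothetical names used only for illustration. The four QueueOfferResult cases are what the Future returned by queue.offer can complete with:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical illustration object, not part of the answer's code.
object OfferCheckSketch {
  // Hypothetical helper: maps an offer result to a log-friendly message.
  def describe(result: QueueOfferResult): String = result match {
    case QueueOfferResult.Enqueued       => "enqueued"
    case QueueOfferResult.Dropped        => "dropped"
    case QueueOfferResult.Failure(cause) => s"failed: ${cause.getMessage}"
    case QueueOfferResult.QueueClosed    => "queue closed"
  }

  def run(): (String, immutable.Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("offer-check")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try {
      val (queue, done) = Source
        .queue[Source[Int, NotUsed]](10, OverflowStrategy.backpressure)
        .flatMapConcat(identity)
        .toMat(Sink.seq)(Keep.both)
        .run()

      // Inspect the outcome of the offer instead of discarding the Future.
      val status = Await.result(queue.offer(Source(1 to 3)).map(describe), 5.seconds)

      // Completing the queue lets the stream finish after the offered source is drained.
      queue.complete()
      (status, Await.result(done, 5.seconds))
    } finally system.terminate()
  }
}
```

In a real ingestion class one would probably react to Dropped or Failure by retrying or logging rather than just describing the result; which reaction is appropriate depends on how the plugins are polled.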

answered Nov 15 '22 07:11 by expert