Scala Observable: unify an Observable with a sequence without updating an intermediate data structure

I have code that calls Couchbase to get some rows, as follows:

val gotValues: Observable[JsonDocument] = Observable.from(rowKeys).flatMap(id =>
      couchbaseBucket.async().get(id))

If the input row keys are 1,2,3,4,5,6 and only rows 1,2,3 exist in the DB, the observable is notified only about 1,2,3.

My requirement, however, is to return a map in which 1,2,3 are true (they exist in the DB) and 4,5,6 are false (they do not exist in the DB). I managed to do that with a Scala Observable, but I'm using an intermediate map data structure to build the full map that contains all ids. Below is sample code that simulates my problem:

object Main extends App {
  import java.util.concurrent.ConcurrentHashMap
  import rx.lang.scala.Observable

  val idsToFetch = Seq(1,2,3,4,5,6)

  println(isInDBOrNot()) // {1=true, 2=true, 3=true, 4=false, 5=false, 6=false}

  private def isInDBOrNot(): ConcurrentHashMap[Int, Boolean] = {
    val inAndNotInDB = new java.util.concurrent.ConcurrentHashMap[Int, Boolean]
    // - How can I avoid the additional data structure (in this
    //   case a map) and still have the function return a map
    //   with every number and whether it exists in the DB?
    // - I want the function to return a map, but I don't want
    //   to populate that map inside the observer; that is a
    //   small side effect, and I would rather simply
    //   manipulate the stream.

    Observable.from(idsToFetch)
      .filterNot(x => x == 4 || x == 5 || x == 6) // Simulate fetch from DB, 4,5,6 do not exist in DB, so not returned.
      .subscribe(
      x => inAndNotInDB.put(x, true),
      e => println(e),
      () => idsToFetch.filterNot(inAndNotInDB.containsKey)
        .foreach(inAndNotInDB.put(_, false)) // mark all non-found as false.
    )

    inAndNotInDB
  }

}

Is there any way to do this without the intermediate map, i.e. without populating an intermediate data structure, only by manipulating the stream? The current approach does not look clean. Thanks.

Jas asked Nov 14 '17 12:11


2 Answers

Your problem seems to arise from the fact that you use flatMap: if there is no data in the DB for a given id, you get an empty Observable, and flatMap simply produces no output for that id. So what you need is defaultIfEmpty, which RxScala exposes as orElse. You can use orElse inside flatMap to return a default value. Modifying your example:

import rx.lang.scala.Observable

// Simulated DB lookup: only ids 1..3 exist.
def fetchFromDb(id: Int): Observable[String] = {
  if (id <= 3)
    Observable.just(s"Document #$id")
  else
    Observable.empty
}

def gotValue(idsToFetch: Seq[Int]): Observable[(Int, Boolean)] = {
  Observable.from(idsToFetch).flatMap((id: Int) =>
    fetchFromDb(id).map(_ => (id, true)).orElse((id, false)))
}

println(gotValue(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)

which prints

List((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))
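To see why flatMap silently drops ids 4, 5 and 6, and how a per-id default brings them back, here is a minimal sketch that models the same pipeline with plain Scala Lists instead of Observables, so it runs without RxScala. The names fetchFromDb, defaultIfEmpty, withoutDefault and withDefault are hypothetical; defaultIfEmpty is a hand-written stand-in for RxScala's orElse, not a library call. The final toMap yields the Map[Int, Boolean] the question asks for.

```scala
object DefaultIfEmptyModel {
  // Hypothetical DB stand-in: only ids 1..3 "exist".
  // An empty List plays the role of the empty Observable.
  def fetchFromDb(id: Int): List[String] =
    if (id <= 3) List(s"Document #$id") else Nil

  // Hand-written analogue of Rx defaultIfEmpty / RxScala orElse:
  // return the source elements, or a single default if there are none.
  def defaultIfEmpty[A](xs: List[A], default: => A): List[A] =
    if (xs.isEmpty) List(default) else xs

  // Plain flatMap: an empty inner result contributes nothing,
  // so missing ids vanish from the output.
  def withoutDefault(ids: List[Int]): List[(Int, Boolean)] =
    ids.flatMap(id => fetchFromDb(id).map(_ => (id, true)))

  // Default applied per id *inside* flatMap, then collapsed into a Map.
  def withDefault(ids: List[Int]): Map[Int, Boolean] =
    ids.flatMap(id =>
      defaultIfEmpty(fetchFromDb(id).map(_ => (id, true)), (id, false))
    ).toMap

  def main(args: Array[String]): Unit = {
    val ids = List(1, 2, 3, 4, 5, 6)
    println(withoutDefault(ids))
    // prints List((1,true), (2,true), (3,true))
    // Sorted only for a stable printout; Map iteration order is unspecified.
    println(withDefault(ids).toList.sorted)
    // prints List((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))
  }
}
```

The important detail is where the default is applied: per id, before flattening, so each lookup contributes exactly one pair.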

Alternatively, you can use Option to return Some(JsonDocument) or None, for example:

def gotValueEx(idsToFetch: Seq[Int]): Observable[(Int, Option[String])] = {
  Observable.from(idsToFetch).flatMap((id: Int) =>
    fetchFromDb(id).map(doc => (id, Option(doc))).orElse((id, None)))
}

println(gotValueEx(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)

which prints

List((1,Some(Document #1)), (2,Some(Document #2)), (3,Some(Document #3)), (4,None), (5,None), (6,None))
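If you keep the documents but also need the plain existence map, the (id, Option[doc]) pairs collapse into it with doc.isDefined. A minimal sketch, assuming the pairs have already been collected (for example with toBlocking.toList as above) and modelling them with a plain Seq; OptionPairsModel and its fetchFromDb are hypothetical stand-ins, not Couchbase or RxScala calls.

```scala
object OptionPairsModel {
  // Hypothetical DB stand-in: only ids 1..3 have a document.
  def fetchFromDb(id: Int): Option[String] =
    if (id <= 3) Some(s"Document #$id") else None

  // Same shape as the collected output of gotValueEx:
  // (id, Some(doc)) for found ids, (id, None) for missing ones.
  def pairs(ids: Seq[Int]): Seq[(Int, Option[String])] =
    ids.map(id => (id, fetchFromDb(id)))

  // Collapse the pairs into the existence map the question asks for.
  def existsMap(ids: Seq[Int]): Map[Int, Boolean] =
    pairs(ids).map { case (id, doc) => id -> doc.isDefined }.toMap

  def main(args: Array[String]): Unit = {
    // Sorted only for a stable printout; Map iteration order is unspecified.
    println(existsMap(Seq(1, 2, 3, 4, 5, 6)).toList.sorted)
    // prints List((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))
  }
}
```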

SergGr answered Nov 11 '22 09:11


One way of doing this is the following:

(1) Convert the sequence of ids to an Observable and map it with

id => (id, false)

... so you get an Observable[(Int, Boolean)] (let's call this new observable first).

(2) Fetch data from the database and map every fetched row to the form:

(some_id, true)

... inside an Observable[(Int, Boolean)] (let's call this observable last).

(3) Concatenate first and last.

(4) Call toMap on the result of (3). Because concat emits everything from first before anything from last, the (id, true) entries from last overwrite the (id, false) entries from first for the same id, so the duplicates from first are effectively dropped. (This will be your resultObservable.)

(5) (Possibly) collect the first and only element of the observable (your map). You might not want to do this at all, but if you do, you should really understand the implications of blocking to collect the result at this point. In any case, this step depends on your application specifics (how threading/scheduling/IO is organized), but a brute-force approach looks something like this (refer to this demo for more specifics):

import scala.concurrent.Await
import scala.concurrent.duration._

Await.result(resultObservable.toBlocking.toFuture, 2.seconds)
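Steps (1) to (4) can be modelled with plain Scala collections to check the property the approach depends on: after concatenation, toMap keeps the last pair for each key, so (id, true) from last wins over (id, false) from first. A minimal sketch, assuming a hypothetical in-memory set existing in place of the database; with RxScala Observables the same shape would be a concatenation (++) of first and last followed by toMap.

```scala
object ConcatToMapModel {
  // Hypothetical stand-in for the database: ids that have a row.
  val existing: Set[Int] = Set(1, 2, 3)

  def existsMap(ids: Seq[Int]): Map[Int, Boolean] = {
    // (1) every requested id starts out as "not in DB"
    val first: Seq[(Int, Boolean)] = ids.map(id => (id, false))
    // (2) only ids the lookup actually returns are emitted, marked as found
    val last: Seq[(Int, Boolean)] = ids.filter(existing.contains).map(id => (id, true))
    // (3) concatenate, keeping all of `first` before all of `last`
    // (4) toMap: for a repeated key the later pair overwrites the earlier one
    (first ++ last).toMap
  }

  def main(args: Array[String]): Unit = {
    // Sorted only for a stable printout; Map iteration order is unspecified.
    println(existsMap(Seq(1, 2, 3, 4, 5, 6)).toList.sorted)
    // prints List((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))
  }
}
```

This is why the steps use concat rather than merge: merging the two streams could interleave them, and a late (id, false) could then overwrite an (id, true).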
Eugene Loy answered Nov 11 '22 09:11