I have code that calls Couchbase to fetch some rows, as follows:
val gotValues: Observable[JsonDocument] = Observable.from(rowKeys).flatMap(id =>
couchbaseBucket.async().get(id))
If I pass 1,2,3,4,5,6 as input row keys and only rows 1,2,3 exist in the DB, the observable is notified only about 1,2,3.
My requirement, however, is to return a map with 1,2,3 mapped to true (exists in the DB) and 4,5,6 mapped to false (does not exist in the DB). I managed to do that with a Scala observable, but I'm using an intermediate map data structure to build the final map containing all ids. Below is sample code that simulates my problem.
object Main extends App {
import rx.lang.scala.Observable
import java.util.concurrent.ConcurrentHashMap // needed for the return type of isInDBOrNot
val idsToFetch = Seq(1,2,3,4,5,6)
println(isInDBOrNot()) // {1=true, 2=true, 3=true, 4=false, 5=false, 6=false}
private def isInDBOrNot(): ConcurrentHashMap[Int, Boolean] = {
val inAndNotInDB = new java.util.concurrent.ConcurrentHashMap[Int, Boolean]
// - How can I avoid the additional data structure?
// - In this case a map, so that the function will return
// a map with all numbers and for each if exist in DB?
// - I mean I want the function to return a map I don't
// want to populate that map inside the observer,
// it's like a mini side effect I would rather simply
// manipulate the stream.
Observable.from(idsToFetch)
.filterNot(x => x == 4 || x == 5 || x == 6) // Simulate fetch from DB, 4,5,6 do not exist in DB, so not returned.
.subscribe(
x => inAndNotInDB.put(x, true),
e => println(e),
() => idsToFetch.filterNot(inAndNotInDB.containsKey)
.foreach(inAndNotInDB.put(_, false)) // mark all non-found as false.
)
inAndNotInDB
}
}
Is there any way to do that without the intermediate map, that is, without populating an intermediate data structure and only by manipulating the stream? The current version does not look clean. Thanks.
Your problem seems to arise from the fact that you use flatMap: if there is no data in the DB for a given id, you get an empty Observable, and flatMap simply produces no output for that id. So it looks like what you need is defaultIfEmpty, which is called orElse in RxScala. You can use orElse to return a default value inside flatMap. So, to modify your example:
def fetchFromDb(id: Int): Observable[String] = {
if (id <= 3)
Observable.just(s"Document #$id")
else
Observable.empty
}
def gotValue(idsToFetch: Seq[Int]): Observable[(Int, Boolean)] = {
Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(_ => (id, true)).orElse((id, false)))
}
println(gotValue(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)
which prints
List((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))
Or you can use Option to return Some(JsonDocument) or None, such as
def gotValueEx(idsToFetch: Seq[Int]): Observable[(Int, Option[String])] = {
Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(doc => (id, Option(doc))).orElse((id, None)))
}
println(gotValueEx(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)
which prints
List((1,Some(Document #1)), (2,Some(Document #2)), (3,Some(Document #3)), (4,None), (5,None), (6,None))
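Since the question asks for a map rather than a list of pairs, the stream of (id, Boolean) pairs can itself be folded into a single Map, so nothing is populated from inside the subscriber. The following is only a minimal sketch: fetchFromDb is the same stand-in for the database as above, and the names ExistenceMapSketch and existenceMap are made up for illustration. It assumes RxScala's foldLeft, which emits one accumulated value when the source completes.

```scala
import rx.lang.scala.Observable

object ExistenceMapSketch {
  // Stand-in for the database lookup (assumption): only ids 1 to 3 "exist".
  def fetchFromDb(id: Int): Observable[String] =
    if (id <= 3) Observable.just(s"Document #$id") else Observable.empty

  // Emits exactly one Map after all lookups have completed.
  def existenceMap(ids: Seq[Int]): Observable[Map[Int, Boolean]] =
    Observable.from(ids)
      // A found document becomes (id, true); an empty lookup falls back to (id, false).
      .flatMap((id: Int) => fetchFromDb(id).map(_ => (id, true)).orElse((id, false)))
      // Accumulate the pairs into an immutable Map inside the stream.
      .foldLeft(Map.empty[Int, Boolean])(_ + _)

  def main(args: Array[String]): Unit = {
    // Blocking here is only for the demo; a real caller would likely subscribe instead.
    val result = existenceMap(Seq(1, 2, 3, 4, 5, 6)).toBlocking.single
    println(result)
  }
}
```

The caller still decides whether to block (toBlocking.single, as in main) or to keep composing on the returned Observable.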
One way of doing this is the following:
(1) Convert the sequence of ids to an Observable and map it with id => (id, false), so you get an observable of type Observable[(Int, Boolean)] (let's call this new observable first).
(2) Fetch the data from the database and map every fetched row to the form (some_id, true), again inside an Observable[(Int, Boolean)] (let's call this observable last).
(3) Concat first and last.
(4) Call toMap on the result of (3). The duplicate entries coming from first are dropped in the process, because the later (id, true) pairs from last replace them. (This will be your resultObservable.)
(5) (Possibly) collect the first and only element of the observable (your map). You might not want to do this at all, but if you do, you should really understand the implications of blocking to collect the result at this point. In any case, this step depends on your application's specifics (how threading/scheduling/IO is organized), but a brute-force approach should look something like this (refer to this demo for more specifics):
Await.result(resultObservable.toBlocking.toFuture, 2 seconds)
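Steps (1) to (5) can be sketched roughly as follows. This is an illustration under assumptions, not a definitive implementation: fetchExistingIds is a hypothetical stand-in for the database call that emits only the ids that exist, the object and method names are made up, and the sketch assumes the two-selector toMap(keySelector, valueSelector) overload of RxScala, in which a later pair replaces an earlier one with the same key.

```scala
import rx.lang.scala.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object ConcatToMapSketch {
  // Hypothetical stand-in for the database: emits only the ids that exist (1 to 3).
  def fetchExistingIds(ids: Seq[Int]): Observable[Int] =
    Observable.from(ids).filter(_ <= 3)

  def existenceMap(ids: Seq[Int]) = {
    // (1) Every requested id, initially marked as missing.
    val first = Observable.from(ids).map(id => (id, false))
    // (2) Every id actually found, marked as present.
    val last = fetchExistingIds(ids).map(id => (id, true))
    // (3) + (4) Concatenate, then collapse into one Map; pairs from `last`
    // arrive later and therefore replace the (id, false) defaults.
    (first ++ last).toMap((p: (Int, Boolean)) => p._1, (p: (Int, Boolean)) => p._2)
  }

  def main(args: Array[String]): Unit = {
    // (5) Blocking collection of the single emitted map; demo only.
    val result = Await.result(
      existenceMap(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toFuture, 2.seconds)
    println(result)
  }
}
```

The ordering of the concat matters here: putting last before first would let the (id, false) defaults overwrite the found entries.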