I know this breaks a lot of Rx rules, but I really like RxJava-JDBC and so do my teammates. Relational databases are very core to what we do and so is Rx.
However, there are occasions where we do not want to emit an Observable<ResultSet> but would rather have a pull-based Java 8 Stream<ResultSet> or Kotlin Sequence<ResultSet>. We are very accustomed to the RxJava-JDBC library, though, and it only returns an Observable<ResultSet>.
Therefore, I am wondering whether there is a way to turn an Observable<ResultSet> into a Sequence<ResultSet> using an extension function, without any intermediary collection or toBlocking() calls. Below is all I have so far, but my head is spinning from trying to connect push-based and pull-based systems, and I cannot buffer either, because the ResultSet is stateful and changes with each onNext() call. Is this an impossible task?
import rx.Observable
import rx.Subscriber
import java.sql.ResultSet

fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {

    private var isComplete = false

    override fun onCompleted() {
        isComplete = true
    }

    override fun onError(e: Throwable?) {
        throw UnsupportedOperationException()
    }

    override fun onNext(rs: ResultSet?) {
        throw UnsupportedOperationException()
    }

    override fun hasNext(): Boolean {
        throw UnsupportedOperationException()
    }

    override fun next(): ResultSet {
        throw UnsupportedOperationException()
    }
}.asSequence()
I'm not sure this is the easiest way to achieve what you want, but you can try the code below. It converts an Observable to an Iterator by creating a blocking queue and publishing every event from the Observable to that queue. The Iterator pulls events from the queue, blocking if there are none, and then updates its own state depending on the event it received.
import rx.Notification
import rx.Observable
import rx.Scheduler
import java.io.Closeable
import java.util.concurrent.LinkedBlockingQueue

class ObservableIterator<T>(
    observable: Observable<T>,
    scheduler: Scheduler
) : Iterator<T>, Closeable {

    // Every event (onNext, onError, onCompleted) arrives here as a Notification.
    private val queue = LinkedBlockingQueue<Notification<T>>()
    // The next onNext notification, fetched by hasNext() and not yet returned by next().
    private var cached: Notification<T>? = null
    private var completed: Boolean = false

    // materialize() turns terminal events into values, so a single callback handles everything.
    private val subscription =
        observable
            .materialize()
            .subscribeOn(scheduler)
            .subscribe { queue.put(it) }

    override fun hasNext(): Boolean {
        cacheNext()
        return !completed
    }

    override fun next(): T {
        cacheNext()
        val notification = cached ?: throw NoSuchElementException()
        check(notification.isOnNext)
        cached = null
        return notification.value
    }

    // Blocks until the next event is available, then records it.
    private fun cacheNext() {
        if (completed) {
            return
        }
        if (cached == null) {
            queue.take().let { notification ->
                if (notification.isOnError) {
                    completed = true
                    throw RuntimeException(notification.throwable)
                } else if (notification.isOnCompleted) {
                    completed = true
                } else {
                    cached = notification
                }
            }
        }
    }

    override fun close() {
        subscription.unsubscribe()
        completed = true
        cached = null
    }
}
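The same queue-based bridge can also be packaged as the extension function the question asks for. This is only a minimal sketch, assuming RxJava 1.x and Kotlin 1.3 or later (for the sequence builder). The asSequence name and the default Schedulers.io() scheduler are illustrative choices, not part of RxJava or RxJava-JDBC.

```kotlin
import rx.Notification
import rx.Observable
import rx.Scheduler
import rx.schedulers.Schedulers
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical helper: bridges a push-based Observable to a pull-based Sequence.
// Each iteration of the returned Sequence subscribes to the source again.
fun <T> Observable<T>.asSequence(scheduler: Scheduler = Schedulers.io()): Sequence<T> {
    val source = this
    return sequence {
        val queue = LinkedBlockingQueue<Notification<T>>()
        // materialize() wraps onNext/onError/onCompleted as Notification values.
        val subscription = source
            .materialize()
            .subscribeOn(scheduler)
            .subscribe { queue.put(it) }
        try {
            while (true) {
                val notification = queue.take() // blocks until an event arrives
                if (notification.isOnError) throw RuntimeException(notification.throwable)
                if (notification.isOnCompleted) break
                yield(notification.value)
            }
        } finally {
            subscription.unsubscribe()
        }
    }
}
```

A call such as Observable.just(1, 2, 3).asSequence().toList() should then pull the values one at a time. Two caveats apply to this sketch: the queue is unbounded, so the source can run ahead of the consumer, which means that with a cursor-like ResultSet it is probably safer to map each row to an immutable value first (for example observable.map { it.getInt(1) }.asSequence()); and if the sequence is abandoned before completion, the finally block never runs, so the subscription is not released.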