
RxJava: Turn Observable into Iterator, Stream, or Sequence

I know this breaks a lot of Rx rules, but I really like RxJava-JDBC and so do my teammates. Relational databases are very core to what we do and so is Rx.

However, there are some occasions where we do not want to emit an Observable<ResultSet> but would rather have a pull-based Java 8 Stream<ResultSet> or Kotlin Sequence<ResultSet>. The trouble is that we are very accustomed to the RxJava-JDBC library, which only returns an Observable<ResultSet>.

Therefore, I am wondering if there is a way to turn an Observable<ResultSet> into a Sequence<ResultSet> using an extension function, without any intermediate collection or toBlocking() calls. Below is all I have so far, but my head is spinning trying to connect push-based and pull-based systems, and I cannot buffer either, because the ResultSet is stateful and changes with each onNext() call. Is this an impossible task?

import rx.Observable
import rx.Subscriber
import java.sql.ResultSet

fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {

    private var isComplete = false

    override fun onCompleted() {
        isComplete = true
    }

    override fun onError(e: Throwable?) {
        throw UnsupportedOperationException()
    }

    override fun onNext(rs: ResultSet?) {
        throw UnsupportedOperationException()
    }


    override fun hasNext(): Boolean {
        throw UnsupportedOperationException()
    }

    override fun next(): ResultSet {
        throw UnsupportedOperationException()
    }

}.asSequence()

asked Mar 13 '23 05:03 by tmn


1 Answer

I'm not sure this is the easiest way to achieve what you want, but you can try the code below. It converts an Observable to an Iterator by creating a blocking queue and publishing all events from the Observable to that queue. The Iterator pulls events from the queue, blocking when there are none, and then updates its own state according to the event it received.

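import java.io.Closeable
import java.util.concurrent.LinkedBlockingQueue
import rx.Notification
import rx.Observable
import rx.Scheduler

class ObservableIterator<T>(
    observable: Observable<T>,
    scheduler: Scheduler
) : Iterator<T>, Closeable {

  // Events pushed by the Observable wait here until the iterator pulls them.
  private val queue = LinkedBlockingQueue<Notification<T>>()
  // The next onNext notification, fetched by cacheNext() but not yet returned by next().
  private var cached: Notification<T>? = null
  // True once onCompleted or onError has been seen, or close() has been called.
  private var completed: Boolean = false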

  private val subscription =
      observable
          .materialize()
          .subscribeOn(scheduler)
          .subscribe({ queue.put(it) })

  override fun hasNext(): Boolean {
    cacheNext()
    return !completed
  }

  override fun next(): T {
    cacheNext()
    val notification = cached ?: throw NoSuchElementException()
    check(notification.isOnNext)
    cached = null
    return notification.value
  }

  private fun cacheNext() {
    if (completed) {
      return
    }

    if (cached == null) {
      queue.take().let { notification ->
        if (notification.isOnError) {
          completed = true
          throw RuntimeException(notification.throwable)
        } else if (notification.isOnCompleted) {
          completed = true
        } else {
          cached = notification
        }
      }
    }
  }

  override fun close() {
    subscription.unsubscribe()
    completed = true
    cached = null
  }
}
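
If it helps to see the queue hand-off on its own, here is a minimal sketch of the same idea using only the Kotlin and Java standard libraries, with no RxJava involved. The names Event and pushToSequence are made up for this illustration (Event only stands in for rx.Notification), and the producer is a plain callback running on its own thread, which is an assumption rather than how RxJava-JDBC works.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Hypothetical stand-in for rx.Notification<T>: a value, an error, or completion.
sealed class Event<T> {
    class Next<T>(val value: T) : Event<T>()
    class Error<T>(val cause: Throwable) : Event<T>()
    class Done<T> : Event<T>()
}

// Hypothetical helper: runs a push-style producer on a background thread and
// exposes whatever it emits as a pull-based Sequence.
fun <T> pushToSequence(producer: (emit: (T) -> Unit) -> Unit): Sequence<T> = sequence {
    val queue = LinkedBlockingQueue<Event<T>>()

    // Push side: every emitted value goes into the queue, followed by a terminal event.
    thread(isDaemon = true) {
        try {
            producer { value -> queue.put(Event.Next(value)) }
            queue.put(Event.Done())
        } catch (e: Throwable) {
            queue.put(Event.Error(e))
        }
    }

    // Pull side: take() blocks until the producer has pushed something.
    while (true) {
        when (val event = queue.take()) {
            is Event.Next -> yield(event.value)
            is Event.Error -> throw RuntimeException(event.cause)
            is Event.Done -> return@sequence
        }
    }
}

fun main() {
    val rows = pushToSequence<Int> { emit -> (1..5).forEach(emit) }
    println(rows.map { it * 10 }.toList()) // prints [10, 20, 30, 40, 50]
}
```

Note that the queue is unbounded, so the producer can run ahead of the consumer. That is harmless for immutable values, but it is worth keeping in mind for a stateful object such as a ResultSet.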

answered Mar 15 '23 17:03 by Michael