Suppose I essentially want Stream.from(0) as InputDStream. How would I go about this? The only way I can see is to use StreamingContext#queueStream, but I'd have to either enqueue elements from another thread or subclass Queue to create a queue that behaves like an infinite stream, both of which feel like a hack.
What's the correct way to do this?
I don't think this is available in Spark out of the box, but it's easy to implement by extending ReceiverInputDStream:
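import scala.reflect.ClassTag

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

// ReceiverInputDStream requires a ClassTag for its element type.
class InfiniteStreamInputDStream[T: ClassTag](
    @transient ssc_ : StreamingContext,
    stream: Stream[T],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {

  override def getReceiver(): Receiver[T] =
    new InfiniteStreamReceiver(stream, storageLevel)
}

class InfiniteStreamReceiver[T](stream: Stream[T], storageLevel: StorageLevel)
    extends Receiver[T](storageLevel) {

  private class ReadAndStore extends Runnable {
    def run(): Unit = {
      // Stateful iterator, created on the receiver thread so it is never
      // serialized along with the receiver.
      val streamIterator = stream.iterator
      // Stop pulling elements once the receiver has been asked to stop.
      while (!isStopped() && streamIterator.hasNext) {
        store(streamIterator.next())
      }
    }
  }

  // onStart must return quickly, so do the work on a separate thread.
  // Thread#start is required here: Thread#run would execute the loop on
  // the calling thread and never return for an infinite stream.
  override def onStart(): Unit = {
    new Thread(new ReadAndStore).start()
  }

  // Nothing to clean up: the worker thread exits once isStopped() is true.
  override def onStop(): Unit = { }
}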
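To show how the pieces fit together, here is a minimal usage sketch. It assumes the two classes above are compiled alongside it and that Spark Streaming is on the classpath. The object name, master URL, batch interval, rate limit and timeout are illustrative choices, not anything required by the approach.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver program; all concrete values are assumptions.
object InfiniteStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // A receiver permanently occupies one thread, so local mode needs
      // at least two: one for the receiver, one for processing batches.
      .setMaster("local[2]")
      .setAppName("InfiniteStreamExample")
      // Stream.from(0) produces elements as fast as it is read, so cap
      // the number of records per second the receiver may store.
      .set("spark.streaming.receiver.maxRate", "1000")

    val ssc = new StreamingContext(conf, Seconds(1))

    val numbers = new InfiniteStreamInputDStream[Int](
      ssc, Stream.from(0), StorageLevel.MEMORY_ONLY)

    // Print how many elements arrived in each batch.
    numbers.count().print()

    ssc.start()
    // Run for roughly ten seconds, then shut down.
    ssc.awaitTerminationOrTimeout(10000L)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

Because the source never runs dry, some form of rate limiting is probably worth having. The receiver rate setting used here is one option; sleeping inside the receiver loop would be another.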