Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I use infinite Scala streams as source in Spark Streaming?

Suppose I essentially want Stream.from(0) as an InputDStream. How would I go about this? The only way I can see is to use StreamingContext#queueStream, but I'd have to either enqueue elements from another thread or subclass Queue to create a queue that behaves like an infinite stream, both of which feel like a hack.

What's the correct way to do this?

like image 250
BasilTomato Avatar asked Nov 01 '22 12:11

BasilTomato


1 Answer

I don't think this is available in Spark out of the box, but it's easy to implement with ReceiverInputDStream.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

/**
 * An InputDStream backed by a (possibly infinite, lazily evaluated) Scala
 * Stream. Each batch's data is produced by an [[InfiniteStreamReceiver]]
 * that walks the stream and hands elements to Spark.
 *
 * @param ssc_         the streaming context this DStream belongs to
 *                     (transient: DStreams are serialized without it)
 * @param stream       the source of elements; may be infinite
 * @param storageLevel how Spark should persist the received blocks
 */
class InfiniteStreamInputDStream[T](
       @transient ssc_ : StreamingContext,
       stream: Stream[T],
       storageLevel: StorageLevel
      ) extends ReceiverInputDStream[T](ssc_)  {

  /** Builds the receiver that feeds `stream` into Spark. */
  override def getReceiver(): Receiver[T] =
    new InfiniteStreamReceiver(stream, storageLevel)
}

/**
 * Receiver that pushes the elements of a (possibly infinite) Scala Stream
 * into Spark Streaming. Reading happens on a dedicated background thread so
 * that onStart() returns immediately, as the Receiver contract requires.
 *
 * @param stream       the source of elements; may be infinite
 * @param storageLevel storage level used for the received blocks
 */
class InfiniteStreamReceiver[T](stream: Stream[T], storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

  // Stateful iterator over the stream; advanced only by the reader thread.
  private val streamIterator = stream.iterator

  // Reader thread, created in onStart(). Transient: receivers are serialized
  // to executors and java.lang.Thread is not serializable.
  @transient private var readerThread: Thread = _

  private class ReadAndStore extends Runnable {
    def run(): Unit = {
      // Also stop when Spark asks the receiver to shut down; without the
      // isStopped() check an infinite stream would keep this thread alive
      // forever after the receiver is stopped.
      while (!isStopped() && streamIterator.hasNext) {
        val next = streamIterator.next()
        store(next)
      }
    }
  }

  override def onStart(): Unit = {
    // BUG FIX: the original called run(), which executed the read loop
    // synchronously on the caller's thread — blocking onStart() forever for
    // an infinite stream. start() runs the loop on the new thread instead.
    readerThread = new Thread(new ReadAndStore, "infinite-stream-receiver")
    readerThread.setDaemon(true)
    readerThread.start()
  }

  override def onStop(): Unit = {
    // Wake the reader if it is blocked inside store() so it can observe
    // isStopped() and exit promptly.
    if (readerThread != null) readerThread.interrupt()
  }
}
like image 155
Eugene Zhulenev Avatar answered Nov 15 '22 05:11

Eugene Zhulenev