PySpark has an API, queueStream, that constructs a DStream from a series of RDDs.
queueStream(rdds, oneAtATime=True, default=None)
Create an input stream from a queue of RDDs or a list. In each batch, it will process either one or all of the RDDs returned by the queue.
NOTE: changes to the queue after the stream is created will not be recognized.
Parameters:
rdds – Queue of RDDs
oneAtATime – pick one RDD each time, or pick all of them at once.
default – the default RDD to use if no more RDDs are left in rdds
Question 1:
In a distributed environment, if I define a queue object q1 and perform the operation q1.add(RDD), will the q1 object be transferred to all worker nodes? Will there be a problem with the operation q1.add(RDD) if this object is copied to other nodes?
Question 2:
After I run dstream = queueStream(q1), if I continue to put RDDs into the queue, will these RDDs be added to the dstream?
I believe that the following note:
changes to the queue after the stream is created will not be recognized.
pretty much answers the question, but to understand why this is the case you'll have to look at the PySpark code, in particular the following line:
queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
If this is not enough, you can take a look at the corresponding Scala code to see that it takes a static list:
def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]])
and converts it to a queue:
val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
rdds.asScala.foreach(queue.add)
So there is simply no way for any changes on the Python side to be reflected in the stream.
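The copy-at-creation semantics can be mimicked in plain Python (this is an analogy for the toRDDQueue behavior above, not actual Spark code):

```python
from collections import deque

def to_rdd_queue(rdds):
    # Mirrors what toRDDQueue does: copy the incoming list, element by
    # element, into a freshly created queue.
    queue = deque()
    for r in rdds:
        queue.append(r)
    return queue

source = ["rdd0", "rdd1"]
queue = to_rdd_queue(source)

# Later additions to the original list are invisible to the copied queue.
source.append("rdd2")
assert list(queue) == ["rdd0", "rdd1"]
```

Because the stream holds its own copy of the elements, mutating the original Python-side queue afterwards cannot affect it.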
Regarding the first question, the answer is negative. The queue won't be distributed, because RDDs are simply not meaningful outside the driver context.
Note:
To be clear, the Scala queueStream will reflect additions to the queue. There is even an example of this in the Spark source.