Assuming I have an RDD containing (Int, Int) tuples, I wish to turn it into a Vector where the first Int in the tuple is the index and the second is the value.
Any idea how I can do that?
I've updated my question and added my solution to clarify: my RDD is already reduced by key, and the number of keys is known. I want a vector in order to update a single accumulator instead of multiple accumulators.
My final solution was therefore:
reducedStream.foreachRDD { rdd =>
  // foreach is an action, so the accumulator update actually runs;
  // collect(PartialFunction) is a lazy transformation and would do nothing here.
  rdd.foreach { case (x: Int, y: Int) =>
    val v = Array.fill(4)(0.0) // one slot per known key
    v(x) = y.toDouble
    accumulator += new Vector(v)
  }
}
This uses the Vector class from the accumulator example in the Spark documentation.
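For completeness, the accumulator itself can be declared as in that documentation example. This is a sketch assuming a Vector class with zeros, size, and += as shown there; in Spark 2.x you would use AccumulatorV2 instead:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = Vector.zeros(initialValue.size) // zero vector of the same size
  def addInPlace(v1: Vector, v2: Vector): Vector = v1 += v2                // element-wise merge
}

val accumulator = sc.accumulator(new Vector(Array.fill(4)(0.0)))(VectorAccumulatorParam)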
One way to pair elements by a reversed index is to:
1. Apply zipWithIndex to the RDD, as already suggested.
2. Sort it in reversed order and zip the resulting RDD with an index as well.
3. reduceByKey or groupByKey the union of the RDDs from steps 1 and 2, with the index as the key.
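A minimal sketch of those three steps (names are illustrative; grouping on the shared index pairs each element with its reverse-order counterpart):

val forward = rdd.zipWithIndex().map { case (elem, i) => (i, elem) }  // step 1
val backward = rdd.sortBy(identity, ascending = false)                // step 2
  .zipWithIndex().map { case (elem, i) => (i, elem) }
val byIndex = forward.union(backward).groupByKey()                    // step 3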
This should be possible by first indexing the RDD. The transformation zipWithIndex provides stable indexing, numbering each element in its original order. If you expect to use lookup often on the same RDD, I'd recommend caching the indexKey RDD to improve performance.
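A minimal sketch of that idea (indexKey as named above; the lookup key is arbitrary):

val indexKey = rdd.zipWithIndex().map { case (elem, i) => (i, elem) }.cache() // key by stable index, cache for reuse
val elemsAt3 = indexKey.lookup(3L)                                            // Seq of elements at index 3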
There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
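For example (the data and path here are placeholders):

val fromCollection = sc.parallelize(Seq((0, 10), (1, 20), (2, 30))) // parallelize a driver-side collection
val fromStorage = sc.textFile("hdfs:///path/to/data.txt")           // reference an external dataset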
Transformations create new RDDs from existing ones; an action is performed when we want to work with the actual dataset. Unlike a transformation, triggering an action does not form a new RDD. Actions, then, are the Spark RDD operations that return non-RDD values.
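To illustrate the distinction:

val doubled = rdd.mapValues(_ * 2) // transformation: lazy, just describes a new RDD
val n = doubled.count()            // action: runs the job and returns a Long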
val n = 4 // the number of keys is known, so pre-size the Vector ('updated' past the end would throw)
rdd.collectAsMap().foldLeft(Vector.fill(n)(0)) { case (acc, (k, v)) => acc.updated(k, v) }
Turn the RDD into a Map, then fold over it, filling in a pre-sized Vector as we go.
You could just use collect(), but if there are many repetitions of tuples with the same key, that might not fit in memory.
One key question: do you really need a Vector? A Map could be much more suitable.
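For instance, since the RDD is already reduced by key, it collects straight into a map:

val asMap: scala.collection.Map[Int, Int] = rdd.collectAsMap() // one entry per key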
If you really need a local Vector, you first have to use .collect() and then do local transformations into the Vector. Of course, you must have enough memory for this. But the real problem here is finding a Vector that can be built efficiently from (index, value) pairs. As far as I know, Spark MLlib has the class org.apache.spark.mllib.linalg.Vectors, which can create a Vector from an array of indices and values, and even from tuples. Under the hood it uses breeze.linalg, so that is probably the best starting point for you.
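A minimal sketch using that MLlib factory (the size of 4 matches the known number of keys above):

import org.apache.spark.mllib.linalg.Vectors

val pairs = rdd.collect().toSeq.map { case (i, v) => (i, v.toDouble) } // (index, value) pairs, locally
val vec = Vectors.sparse(4, pairs)                                     // sparse vector of the given size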
If you just need ordering, you can simply use .sortByKey(), as you already have an RDD[(K,V)]. This way you get an ordered stream, which does not strictly follow your intention but might suit you even better. You can then drop extra elements with the same key via .reduceByKey(), producing only the resulting elements.
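A minimal sketch of that combination (the merge function here arbitrarily keeps the last value seen per key):

val ordered = rdd.reduceByKey((a, b) => b) // collapse duplicate keys
  .sortByKey()                             // then order by key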
Finally, if you really need a large vector, do .sortByKey and then produce the real vector with a .flatMap() that maintains a counter: it drops all but one element for the same index and inserts the needed number of 'default' elements for missing indices.
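A hedged sketch of that flatMap-with-counter step, done locally after collecting the sorted pairs (the stateful counter only works on a local collection, and the default value is an assumption of this sketch):

val default = 0
val sorted = rdd.sortByKey().collect() // Array[(Int, Int)], ordered by index
var next = 0                           // the next index the output vector expects
val vec = sorted.toVector.flatMap { case (i, v) =>
  if (i < next) Vector.empty                         // duplicate index: drop
  else {
    val padded = Vector.fill(i - next)(default) :+ v // pad missing indices
    next = i + 1
    padded
  }
}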
Hope this is clear enough.