
How to turn a known structured RDD into a Vector

Assume I have an RDD containing (Int, Int) tuples. I wish to turn it into a Vector where the first Int in each tuple is the index and the second is the value.

Any idea how I can do that?

I have updated my question and added my solution to clarify: my RDD is already reduced by key, and the number of keys is known. I want a vector so that I can update a single accumulator instead of multiple accumulators.

Therefore, my final solution was:

reducedStream.foreachRDD(rdd => rdd.foreach {
  // foreach is an action, so the accumulator update actually runs;
  // collect(PartialFunction) would only define a lazy transformation.
  case (x: Int, y: Int) =>
    val v = Array(0.0, 0.0, 0.0, 0.0)   // one slot per known key
    v(x) = y
    accumulator += new Vector(v)
})

This uses the Vector class from the accumulator example in the Spark documentation.
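
For reference, here is a sketch of the accumulator setup that example describes, assuming Vector is org.apache.spark.util.Vector from Spark 1.x and sc is an existing SparkContext; the size 4 matches the known number of keys above:

import org.apache.spark.AccumulatorParam
import org.apache.spark.util.Vector

// Accumulator behaviour for Vector, following the programming-guide example.
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = Vector.zeros(initialValue.length)
  def addInPlace(v1: Vector, v2: Vector): Vector = v1 + v2
}

// One slot per known key (4 here, as in the question).
val accumulator = sc.accumulator(Vector.zeros(4))(VectorAccumulatorParam)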

Asked Dec 18 '14 by Noam Shaish

People also ask

How do you reverse an RDD?

One way to reverse an RDD is to: apply zipWithIndex to the RDD, as already suggested; sort it in reverse order and zip the resulting RDD with an index as well; then reduceByKey or groupByKey the union of the RDDs from steps 1 and 2, with the index as the key.
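
A minimal sketch of the core idea, assuming rdd is any RDD[T] (this uses the simpler sort-by-index route rather than the full union recipe above):

// Pair each element with a stable index, sort those indices in descending
// order, then drop the index again to get the reversed RDD.
val reversed = rdd.zipWithIndex()
  .sortBy({ case (_, i) => i }, ascending = false)
  .map { case (v, _) => v }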

Can you index an RDD?

This should be possible by first indexing the RDD. The transformation zipWithIndex provides stable indexing, numbering each element in its original order. If you expect to use lookup often on the same RDD, I'd recommend caching the indexKey RDD to improve performance.
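
For example, a sketch assuming rdd is an RDD[String]:

// Number each element, key it by that index, and cache for repeated lookups.
val indexKey = rdd.zipWithIndex().map { case (v, i) => (i, v) }.cache()

// lookup() returns all values for the key -- here, the element at position 2.
val third: Seq[String] = indexKey.lookup(2L)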

How many ways can you create an RDD in Spark?

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
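
For illustration (the master URL and file path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("rdd-creation").setMaster("local[*]"))

// 1) Parallelize an existing collection in the driver program.
val fromCollection = sc.parallelize(Seq(1, 2, 3, 4))

// 2) Reference a dataset in external storage, e.g. a text file on HDFS.
val fromFile = sc.textFile("hdfs:///data/input.txt")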

What are actions and transformations in Spark?

Transformations create new RDDs from existing ones, but when we want to work with the actual dataset, an action is performed. When an action is triggered, no new RDD is formed, unlike with a transformation. Thus, actions are Spark RDD operations that return non-RDD values.
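
A quick illustration, assuming an existing SparkContext sc:

val nums = sc.parallelize(1 to 4)
val doubled = nums.map(_ * 2)       // transformation: lazy, just defines a new RDD
val total = doubled.reduce(_ + _)   // action: runs the computation, returns an Int (20)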


2 Answers

// Start from a zero vector of the known size: updated() requires an existing index.
rdd.collectAsMap.foldLeft(Vector.fill(4)(0)){ case (acc, (k, v)) => acc.updated(k, v) }

Turn the RDD into a Map. Then iterate over that, building a Vector as we go.

You could use just collect(), but if there are many repetitions of tuples with the same key, the result might not fit in memory.
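
A small end-to-end sketch of this approach (sc is an existing SparkContext; 4 is the known number of keys from the question; Vector here is Scala's immutable Vector):

val rdd = sc.parallelize(Seq((0, 10), (2, 30), (1, 20), (3, 40)))
val vec = rdd.collectAsMap
  .foldLeft(Vector.fill(4)(0)) { case (acc, (k, v)) => acc.updated(k, v) }
// vec: Vector(10, 20, 30, 40)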

Answered Nov 15 '22 by The Archetypal Paul


One key thing: do you really need a Vector? A Map could be much more suitable.

  • If you really need a local Vector, you first have to use .collect() and then do the local transformation into a Vector; of course, you need enough memory for this. The real problem is finding a Vector implementation that can be built efficiently from (index, value) pairs. As far as I know, Spark MLlib has the class org.apache.spark.mllib.linalg.Vectors, which can create a Vector from arrays of indices and values, and even from tuples; under the hood it uses breeze.linalg. So it would probably be the best starting point for you (see the sketch after this list).

  • If you just need ordering, you can use .sortByKey() since you already have an RDD[(K,V)]. This gives you an ordered stream, which does not strictly follow your intention but might suit you even better. You can then drop elements with the same key with .reduceByKey(), producing only the resulting elements.

  • Finally, if you really need a large vector, do .sortByKey() and then produce the actual vector with a .flatMap() that maintains a counter and drops extra elements with the same index / inserts the needed number of 'default' elements for missing indices.
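
As a sketch of the MLlib route from the first bullet (rdd is the reduced RDD[(Int, Int)]; 4 is the known number of keys):

import org.apache.spark.mllib.linalg.Vectors

// Collect the (index, value) pairs locally, then build a sparse vector.
// Vectors.sparse accepts the size plus a Seq of (index, value) tuples.
val pairs = rdd.collect().map { case (i, v) => (i, v.toDouble) }.toSeq
val vec = Vectors.sparse(4, pairs)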

Hope this is clear enough.

Answered Nov 15 '22 by Roman Nikitchenko