 

Too many map keys causing out of memory exception in Spark

I have an RDD 'inRDD' of the form RDD[(Vector[(Int, Byte)], Vector[(Int, Byte)])], i.e. a pair RDD (key, value) where both the key and the value are of type Vector[(Int, Byte)].

For each element (Int, Byte) in the key vector and each element (Int, Byte) in the value vector, I would like to produce a new (key, value) pair in the output RDD of the form ((Int, Int), (Byte, Byte)).

That should give me an RDD of the form RDD[((Int, Int), (Byte, Byte))].

For example, the contents of inRDD could be:

(Vector((3,2)),Vector((4,2))), (Vector((2,3), (3,3)),Vector((3,1))), (Vector((1,3)),Vector((2,1))), (Vector((1,2)),Vector((2,2), (1,2)))

which would become

((3,4),(2,2)), ((2,3),(3,1)), ((3,3),(3,1)), ((1,2),(3,1)), ((1,2),(2,2)), ((1,1),(2,2))
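
For reference, this sample input can be set up in a spark-shell like so (assuming an existing SparkContext named sc):

// Hypothetical reproduction of the sample data above.
val inRDD = sc.parallelize(Seq(
  (Vector((3, 2.toByte)), Vector((4, 2.toByte))),
  (Vector((2, 3.toByte), (3, 3.toByte)), Vector((3, 1.toByte))),
  (Vector((1, 3.toByte)), Vector((2, 1.toByte))),
  (Vector((1, 2.toByte)), Vector((2, 2.toByte), (1, 2.toByte)))
))
// inRDD: RDD[(Vector[(Int, Byte)], Vector[(Int, Byte)])]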

I have the following code for that.

val outRDD = inRDD.flatMap {
  case (left, right) =>
    for ((ll, li) <- left; (rl, ri) <- right) yield {
      (ll, rl) -> (li, ri)
    }
}

It works when the vectors in inRDD are small. But when the vectors contain many elements, I get an out-of-memory exception. Increasing the memory available to Spark only helps for moderately larger inputs, and the error reappears for inputs larger still. It looks like I am assembling a huge structure in memory, and I have not been able to rewrite this code any other way.

I have implemented similar logic in Java with Hadoop as follows.

// Inside the reducer: emit the cross product of the "from" and "to" value lists,
// keyed by the entity pair, with the attribute pair as the value.
for (String fromValue : fromAssetVals) {
    fromEntity = fromValue.split(":")[0];
    fromAttr = fromValue.split(":")[1];
    for (String toValue : toAssetVals) {
        toEntity = toValue.split(":")[0];
        toAttr = toValue.split(":")[1];
        oKey = new Text(fromEntity.trim() + ":" + toEntity.trim());
        oValue = new Text(fromAttr + ":" + toAttr);
        outputCollector.collect(oKey, oValue);
    }
}

But when I try something similar in Spark, I get nested RDD exceptions.
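
Roughly, what I mean by a nested attempt is something like this (simplified names, not my actual code); Spark does not allow one RDD's operations to be invoked inside another RDD's transformation:

// Hypothetical sketch of the nesting that fails: toRDD is referenced inside
// fromRDD's flatMap, and Spark rejects nested RDD operations at runtime.
val fromRDD = sc.parallelize(Seq((1, 2.toByte)))
val toRDD = sc.parallelize(Seq((3, 4.toByte)))
val broken = fromRDD.flatMap { case (fe, fa) =>
  toRDD.map { case (te, ta) => ((fe, te), (fa, ta)) }.collect()
}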

How do I do this efficiently in Spark using Scala?

asked Feb 24 '16 by CRM

1 Answer

Well, if a Cartesian product is the only option, you can at least make it a little bit lazier:

inRDD.flatMap { case (xs, ys) =>
  xs.toIterator.flatMap(x => ys.toIterator.map(y => (x, y)))
}
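
Because the iterators are consumed lazily inside flatMap, each record's cross product is produced element by element as Spark writes the output, instead of being materialized as one collection per input pair (which is what the for-comprehension over Vectors does). Note that this yields pairs of the original (Int, Byte) elements; if the exact ((Int, Int), (Byte, Byte)) shape from the question is needed, one more map can rearrange them, for example:

// lazyPairs is an assumed name for the result of the snippet above.
// Rearrange each (leftElem, rightElem) pair into ((Int, Int), (Byte, Byte)).
lazyPairs.map { case ((ll, li), (rl, ri)) => ((ll, rl), (li, ri)) }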

You can also handle this at the Spark level:

import org.apache.spark.RangePartitioner

// Tag every record with a unique id and make the id the key.
val indexed = inRDD.zipWithUniqueId.map(_.swap)

// Range-partition by id; flatMapValues preserves this partitioning,
// so the join below does not need an extra shuffle.
val partitioner = new RangePartitioner(indexed.partitions.size, indexed)
val partitioned = indexed.partitionBy(partitioner)

// Explode the key vector and the value vector into separate RDDs, keyed by id.
val lefts = partitioned.flatMapValues(_._1)
val rights = partitioned.flatMapValues(_._2)

// Joining on the id yields every (leftElem, rightElem) combination per record.
lefts.join(rights).values
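
The joined values are still (leftElem, rightElem) pairs, keyed internally by the record id, and the join produces them lazily rather than as one materialized collection per record. If the question's exact output shape is needed, the same rearranging map applies (a sketch, reusing lefts and rights from above):

// Sketch reusing lefts and rights from above: the join emits every
// (leftElem, rightElem) combination per record, .values drops the id,
// and the map rearranges each pair into ((Int, Int), (Byte, Byte)).
val outRDD = lefts.join(rights).values
  .map { case ((ll, li), (rl, ri)) => ((ll, rl), (li, ri)) }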
answered Sep 17 '22 by zero323